mirror of https://github.com/dswd/vpncloud.git
Compiles but does not work
This commit is contained in:
parent
caedec6fae
commit
dd168139f0
|
@ -1,7 +1,7 @@
|
||||||
use super::{core::test_speed, rotate::RotationState};
|
use super::{core::test_speed, rotate::RotationState};
|
||||||
pub use super::{
|
pub use super::{
|
||||||
core::{CryptoCore, EXTRA_LEN, TAG_LEN},
|
core::{CryptoCore, EXTRA_LEN, TAG_LEN},
|
||||||
init::{is_init_message, INIT_MESSAGE_FIRST_BYTE, InitState, InitResult}
|
init::{is_init_message, InitResult, InitState, INIT_MESSAGE_FIRST_BYTE}
|
||||||
};
|
};
|
||||||
use crate::{
|
use crate::{
|
||||||
error::Error,
|
error::Error,
|
||||||
|
@ -182,6 +182,14 @@ impl Crypto {
|
||||||
Ok(keypair)
|
Ok(keypair)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn public_key_from_private_key(privkey: &str) -> Result<String, Error> {
|
||||||
|
let privkey = from_base62(privkey).map_err(|_| Error::InvalidConfig("Failed to parse private key"))?;
|
||||||
|
let keypair = Ed25519KeyPair::from_seed_unchecked(&privkey)
|
||||||
|
.map_err(|_| Error::InvalidConfig("Key rejected by crypto library"))?;
|
||||||
|
let pubkey = to_base62(keypair.public_key().as_ref());
|
||||||
|
Ok(pubkey)
|
||||||
|
}
|
||||||
|
|
||||||
fn parse_public_key(pubkey: &str) -> Result<Ed25519PublicKey, Error> {
|
fn parse_public_key(pubkey: &str) -> Result<Ed25519PublicKey, Error> {
|
||||||
let pubkey = from_base62(pubkey).map_err(|_| Error::InvalidConfig("Failed to parse public key"))?;
|
let pubkey = from_base62(pubkey).map_err(|_| Error::InvalidConfig("Failed to parse public key"))?;
|
||||||
if pubkey.len() != ED25519_PUBLIC_KEY_LEN {
|
if pubkey.len() != ED25519_PUBLIC_KEY_LEN {
|
||||||
|
@ -295,7 +303,7 @@ impl PeerCrypto {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn every_second(&mut self, out: &mut MsgBuffer) -> MessageResult {
|
pub fn every_second(&mut self, out: &mut MsgBuffer) {
|
||||||
out.clear();
|
out.clear();
|
||||||
if let PeerCrypto::Encrypted { core, rotation, rotate_counter, algorithm, .. } = self {
|
if let PeerCrypto::Encrypted { core, rotation, rotate_counter, algorithm, .. } = self {
|
||||||
core.every_second();
|
core.every_second();
|
||||||
|
@ -309,11 +317,9 @@ impl PeerCrypto {
|
||||||
if !out.is_empty() {
|
if !out.is_empty() {
|
||||||
out.prepend_byte(MESSAGE_TYPE_ROTATION);
|
out.prepend_byte(MESSAGE_TYPE_ROTATION);
|
||||||
self.encrypt_message(out);
|
self.encrypt_message(out);
|
||||||
return MessageResult::Reply
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
MessageResult::None
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -357,9 +363,9 @@ mod tests {
|
||||||
assert_eq!(res, InitResult::Success { peer_payload: vec![], is_initiator: true });
|
assert_eq!(res, InitResult::Success { peer_payload: vec![], is_initiator: true });
|
||||||
assert!(msg.is_empty());
|
assert!(msg.is_empty());
|
||||||
|
|
||||||
let node1 = node1.finish(&mut msg);
|
let mut node1 = node1.finish(&mut msg);
|
||||||
assert!(msg.is_empty());
|
assert!(msg.is_empty());
|
||||||
let node2 = node2.finish(&mut msg);
|
let mut node2 = node2.finish(&mut msg);
|
||||||
assert!(msg.is_empty());
|
assert!(msg.is_empty());
|
||||||
|
|
||||||
debug!("Node1 <- Node2");
|
debug!("Node1 <- Node2");
|
||||||
|
@ -377,21 +383,15 @@ mod tests {
|
||||||
let res = node2.handle_message(&mut buffer).unwrap();
|
let res = node2.handle_message(&mut buffer).unwrap();
|
||||||
assert_eq!(res, MessageResult::Message(1));
|
assert_eq!(res, MessageResult::Message(1));
|
||||||
|
|
||||||
match node1.every_second(&mut msg) {
|
node1.every_second(&mut msg);
|
||||||
MessageResult::None => (),
|
if !msg.is_empty() {
|
||||||
MessageResult::Reply => {
|
let res = node2.handle_message(&mut msg).unwrap();
|
||||||
let res = node2.handle_message(&mut msg).unwrap();
|
assert_eq!(res, MessageResult::None);
|
||||||
assert_eq!(res, MessageResult::None);
|
|
||||||
}
|
|
||||||
other => assert_eq!(other, MessageResult::None)
|
|
||||||
}
|
}
|
||||||
match node2.every_second(&mut msg) {
|
node2.every_second(&mut msg);
|
||||||
MessageResult::None => (),
|
if !msg.is_empty() {
|
||||||
MessageResult::Reply => {
|
let res = node1.handle_message(&mut msg).unwrap();
|
||||||
let res = node1.handle_message(&mut msg).unwrap();
|
assert_eq!(res, MessageResult::None);
|
||||||
assert_eq!(res, MessageResult::None);
|
|
||||||
}
|
|
||||||
other => assert_eq!(other, MessageResult::None)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -258,7 +258,7 @@ pub fn create_dummy_pair(algo: &'static aead::Algorithm) -> (CryptoCore, CryptoC
|
||||||
pub fn test_speed(algo: &'static aead::Algorithm, max_time: &Duration) -> f64 {
|
pub fn test_speed(algo: &'static aead::Algorithm, max_time: &Duration) -> f64 {
|
||||||
let mut buffer = MsgBuffer::new(EXTRA_LEN);
|
let mut buffer = MsgBuffer::new(EXTRA_LEN);
|
||||||
buffer.set_length(1000);
|
buffer.set_length(1000);
|
||||||
let (mut sender, mut receiver) = create_dummy_pair(algo);
|
let (sender, receiver) = create_dummy_pair(algo);
|
||||||
let mut iterations = 0;
|
let mut iterations = 0;
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
while (Instant::now() - start).as_nanos() < max_time.as_nanos() {
|
while (Instant::now() - start).as_nanos() < max_time.as_nanos() {
|
||||||
|
@ -290,7 +290,7 @@ mod tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn test_encrypt_decrypt(algo: &'static aead::Algorithm) {
|
fn test_encrypt_decrypt(algo: &'static aead::Algorithm) {
|
||||||
let (mut sender, mut receiver) = create_dummy_pair(algo);
|
let (sender, receiver) = create_dummy_pair(algo);
|
||||||
let plain = random_data(1000);
|
let plain = random_data(1000);
|
||||||
let mut buffer = MsgBuffer::new(EXTRA_LEN);
|
let mut buffer = MsgBuffer::new(EXTRA_LEN);
|
||||||
buffer.clone_from(&plain);
|
buffer.clone_from(&plain);
|
||||||
|
@ -318,7 +318,7 @@ mod tests {
|
||||||
|
|
||||||
|
|
||||||
fn test_tampering(algo: &'static aead::Algorithm) {
|
fn test_tampering(algo: &'static aead::Algorithm) {
|
||||||
let (mut sender, mut receiver) = create_dummy_pair(algo);
|
let (sender, receiver) = create_dummy_pair(algo);
|
||||||
let plain = random_data(1000);
|
let plain = random_data(1000);
|
||||||
let mut buffer = MsgBuffer::new(EXTRA_LEN);
|
let mut buffer = MsgBuffer::new(EXTRA_LEN);
|
||||||
buffer.clone_from(&plain);
|
buffer.clone_from(&plain);
|
||||||
|
@ -358,7 +358,7 @@ mod tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn test_nonce_pinning(algo: &'static aead::Algorithm) {
|
fn test_nonce_pinning(algo: &'static aead::Algorithm) {
|
||||||
let (mut sender, mut receiver) = create_dummy_pair(algo);
|
let (sender, receiver) = create_dummy_pair(algo);
|
||||||
let plain = random_data(1000);
|
let plain = random_data(1000);
|
||||||
let mut buffer = MsgBuffer::new(EXTRA_LEN);
|
let mut buffer = MsgBuffer::new(EXTRA_LEN);
|
||||||
buffer.clone_from(&plain);
|
buffer.clone_from(&plain);
|
||||||
|
@ -399,7 +399,7 @@ mod tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn test_key_rotation(algo: &'static aead::Algorithm) {
|
fn test_key_rotation(algo: &'static aead::Algorithm) {
|
||||||
let (mut sender, mut receiver) = create_dummy_pair(algo);
|
let (sender, receiver) = create_dummy_pair(algo);
|
||||||
let plain = random_data(1000);
|
let plain = random_data(1000);
|
||||||
let mut buffer = MsgBuffer::new(EXTRA_LEN);
|
let mut buffer = MsgBuffer::new(EXTRA_LEN);
|
||||||
buffer.clone_from(&plain);
|
buffer.clone_from(&plain);
|
||||||
|
|
|
@ -51,6 +51,8 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS
|
||||||
fn broadcast_msg(&mut self, type_: u8, msg: &mut MsgBuffer) -> Result<(), Error> {
|
fn broadcast_msg(&mut self, type_: u8, msg: &mut MsgBuffer) -> Result<(), Error> {
|
||||||
debug!("Broadcasting message type {}, {:?} bytes to {} peers", type_, msg.len(), self.peer_crypto.count());
|
debug!("Broadcasting message type {}, {:?} bytes to {} peers", type_, msg.len(), self.peer_crypto.count());
|
||||||
let mut msg_data = MsgBuffer::new(100);
|
let mut msg_data = MsgBuffer::new(100);
|
||||||
|
let traffic = &mut self.traffic;
|
||||||
|
let socket = &mut self.socket;
|
||||||
self.peer_crypto.for_each(|addr, crypto| {
|
self.peer_crypto.for_each(|addr, crypto| {
|
||||||
msg_data.set_start(msg.get_start());
|
msg_data.set_start(msg.get_start());
|
||||||
msg_data.set_length(msg.len());
|
msg_data.set_length(msg.len());
|
||||||
|
@ -59,8 +61,8 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS
|
||||||
if let Some(crypto) = crypto {
|
if let Some(crypto) = crypto {
|
||||||
crypto.encrypt(&mut msg_data);
|
crypto.encrypt(&mut msg_data);
|
||||||
}
|
}
|
||||||
self.traffic.count_out_traffic(addr, msg_data.len());
|
traffic.count_out_traffic(addr, msg_data.len());
|
||||||
match self.socket.send(msg_data.message(), addr) {
|
match socket.send(msg_data.message(), addr) {
|
||||||
Ok(written) if written == msg_data.len() => Ok(()),
|
Ok(written) if written == msg_data.len() => Ok(()),
|
||||||
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
|
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
|
||||||
Err(e) => Err(Error::SocketIo("IOError when sending", e))
|
Err(e) => Err(Error::SocketIo("IOError when sending", e))
|
||||||
|
|
|
@ -167,7 +167,6 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
|
||||||
_dummy_p: PhantomData,
|
_dummy_p: PhantomData,
|
||||||
_dummy_ts: PhantomData
|
_dummy_ts: PhantomData
|
||||||
};
|
};
|
||||||
res.initialize();
|
|
||||||
res
|
res
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -268,14 +267,6 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Connects to a node given by its address
|
|
||||||
///
|
|
||||||
/// This method connects to node by sending a `Message::Init` to it. If `addr` is a name that
|
|
||||||
/// resolves to multiple addresses, one message is sent to each of them.
|
|
||||||
/// If the node is already a connected peer or the address is blacklisted, no message is sent.
|
|
||||||
///
|
|
||||||
/// # Errors
|
|
||||||
/// This method returns `Error::NameError` if the address is a name that fails to resolve.
|
|
||||||
pub fn connect<Addr: ToSocketAddrs + fmt::Debug + Clone>(&mut self, addr: Addr) -> Result<(), Error> {
|
pub fn connect<Addr: ToSocketAddrs + fmt::Debug + Clone>(&mut self, addr: Addr) -> Result<(), Error> {
|
||||||
let addrs = resolve(&addr)?.into_iter().map(mapped_addr).collect::<SmallVec<[SocketAddr; 3]>>();
|
let addrs = resolve(&addr)?.into_iter().map(mapped_addr).collect::<SmallVec<[SocketAddr; 3]>>();
|
||||||
for addr in &addrs {
|
for addr in &addrs {
|
||||||
|
@ -293,627 +284,9 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
|
||||||
true
|
true
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
// Send a message to each resolved address
|
unimplemented!()
|
||||||
for a in addrs {
|
|
||||||
// Ignore error this time
|
|
||||||
self.connect_sock(a).ok();
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn create_node_info(&self) -> NodeInfo {
|
|
||||||
let mut peers = smallvec![];
|
|
||||||
for peer in self.peers.values() {
|
|
||||||
peers.push(PeerInfo { node_id: Some(peer.node_id), addrs: peer.addrs.clone() })
|
|
||||||
}
|
|
||||||
if peers.len() > 20 {
|
|
||||||
let mut rng = rand::thread_rng();
|
|
||||||
peers.partial_shuffle(&mut rng, 20);
|
|
||||||
peers.truncate(20);
|
|
||||||
}
|
|
||||||
NodeInfo {
|
|
||||||
node_id: self.node_id,
|
|
||||||
peers,
|
|
||||||
claims: self.claims.clone(),
|
|
||||||
peer_timeout: Some(self.peer_timeout_publish),
|
|
||||||
addrs: self.own_addresses.clone()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn connect_sock(&mut self, addr: SocketAddr) -> Result<(), Error> {
|
|
||||||
let addr = mapped_addr(addr);
|
|
||||||
if self.peers.contains_key(&addr)
|
|
||||||
|| self.own_addresses.contains(&addr)
|
|
||||||
|| self.pending_inits.contains_key(&addr)
|
|
||||||
{
|
|
||||||
return Ok(())
|
|
||||||
}
|
|
||||||
debug!("Connecting to {:?}", addr);
|
|
||||||
let payload = self.create_node_info();
|
|
||||||
let mut peer_crypto = self.crypto.peer_instance(payload);
|
|
||||||
let mut msg = MsgBuffer::new(SPACE_BEFORE);
|
|
||||||
peer_crypto.send_ping(&mut msg);
|
|
||||||
self.pending_inits.insert(addr, peer_crypto);
|
|
||||||
self.send_to(addr, &mut msg)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn crypto_housekeep(&mut self) -> Result<(), Error> {
|
|
||||||
let mut msg = MsgBuffer::new(SPACE_BEFORE);
|
|
||||||
let mut del: SmallVec<[SocketAddr; 4]> = smallvec![];
|
|
||||||
for addr in self.pending_inits.keys().copied().collect::<SmallVec<[SocketAddr; 4]>>() {
|
|
||||||
msg.clear();
|
|
||||||
match self.pending_inits.get_mut(&addr).unwrap().every_second(&mut msg) {
|
|
||||||
Err(_) => del.push(addr),
|
|
||||||
Ok(MessageResult::None) => (),
|
|
||||||
Ok(MessageResult::Reply) => self.send_to(addr, &mut msg)?,
|
|
||||||
Ok(_) => unreachable!()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for addr in self.peers.keys().copied().collect::<SmallVec<[SocketAddr; 16]>>() {
|
|
||||||
msg.clear();
|
|
||||||
match self.peers.get_mut(&addr).unwrap().crypto.every_second(&mut msg) {
|
|
||||||
Err(_) => del.push(addr),
|
|
||||||
Ok(MessageResult::None) => (),
|
|
||||||
Ok(MessageResult::Reply) => self.send_to(addr, &mut msg)?,
|
|
||||||
Ok(_) => unreachable!()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for addr in del {
|
|
||||||
self.pending_inits.remove(&addr);
|
|
||||||
if self.peers.remove(&addr).is_some() {
|
|
||||||
self.connect_sock(addr)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn reconnect_to_peers(&mut self) -> Result<(), Error> {
|
|
||||||
let now = TS::now();
|
|
||||||
// Connect to those reconnect_peers that are due
|
|
||||||
for entry in self.reconnect_peers.clone() {
|
|
||||||
if entry.next > now {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
self.connect(&entry.resolved as &[SocketAddr])?;
|
|
||||||
}
|
|
||||||
for entry in &mut self.reconnect_peers {
|
|
||||||
// Schedule for next second if node is connected
|
|
||||||
for addr in &entry.resolved {
|
|
||||||
if self.peers.contains_key(&addr) {
|
|
||||||
entry.tries = 0;
|
|
||||||
entry.timeout = 1;
|
|
||||||
entry.next = now + 1;
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Resolve entries anew
|
|
||||||
if let Some((ref address, ref mut next_resolve)) = entry.address {
|
|
||||||
if *next_resolve <= now {
|
|
||||||
match resolve(address as &str) {
|
|
||||||
Ok(addrs) => entry.resolved = addrs,
|
|
||||||
Err(_) => {
|
|
||||||
match resolve(&format!("{}:{}", address, DEFAULT_PORT)) {
|
|
||||||
Ok(addrs) => entry.resolved = addrs,
|
|
||||||
Err(err) => warn!("Failed to resolve {}: {}", address, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
*next_resolve = now + RESOLVE_INTERVAL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Ignore if next attempt is already in the future
|
|
||||||
if entry.next > now {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// Exponential back-off: every 10 tries, the interval doubles
|
|
||||||
entry.tries += 1;
|
|
||||||
if entry.tries > 10 {
|
|
||||||
entry.tries = 0;
|
|
||||||
entry.timeout *= 2;
|
|
||||||
}
|
|
||||||
// Maximum interval is one hour
|
|
||||||
if entry.timeout > MAX_RECONNECT_INTERVAL {
|
|
||||||
entry.timeout = MAX_RECONNECT_INTERVAL;
|
|
||||||
}
|
|
||||||
// Schedule next connection attempt
|
|
||||||
entry.next = now + Time::from(entry.timeout);
|
|
||||||
}
|
|
||||||
self.reconnect_peers.retain(|e| e.final_timeout.unwrap_or(now) >= now);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn housekeep(&mut self) -> Result<(), Error> {
|
|
||||||
let now = TS::now();
|
|
||||||
let mut buffer = MsgBuffer::new(SPACE_BEFORE);
|
|
||||||
let mut del: SmallVec<[SocketAddr; 3]> = SmallVec::new();
|
|
||||||
for (&addr, ref data) in &self.peers {
|
|
||||||
if data.timeout < now {
|
|
||||||
del.push(addr);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for addr in del {
|
|
||||||
info!("Forgot peer {} due to timeout", addr_nice(addr));
|
|
||||||
self.peers.remove(&addr);
|
|
||||||
self.table.remove_claims(addr);
|
|
||||||
self.connect_sock(addr)?; // Try to reconnect
|
|
||||||
}
|
|
||||||
self.table.housekeep();
|
|
||||||
self.crypto_housekeep()?;
|
|
||||||
// Periodically extend the port-forwarding
|
|
||||||
if let Some(ref mut pfw) = self.port_forwarding {
|
|
||||||
pfw.check_extend();
|
|
||||||
}
|
|
||||||
let now = TS::now();
|
|
||||||
// Periodically reset own peers
|
|
||||||
if self.next_own_address_reset <= now {
|
|
||||||
self.reset_own_addresses().map_err(|err| Error::SocketIo("Failed to get own addresses", err))?;
|
|
||||||
self.next_own_address_reset = now + OWN_ADDRESS_RESET_INTERVAL;
|
|
||||||
}
|
|
||||||
// Periodically send peer list to peers
|
|
||||||
if self.next_peers <= now {
|
|
||||||
debug!("Send peer list to all peers");
|
|
||||||
let info = self.create_node_info();
|
|
||||||
info.encode(&mut buffer);
|
|
||||||
self.broadcast_msg(MESSAGE_TYPE_NODE_INFO, &mut buffer)?;
|
|
||||||
// Reschedule for next update
|
|
||||||
let min_peer_timeout = self.peers.iter().map(|p| p.1.peer_timeout).min().unwrap_or(DEFAULT_PEER_TIMEOUT);
|
|
||||||
let interval = min(self.update_freq as u16, max(min_peer_timeout / 2 - 60, 1));
|
|
||||||
self.next_peers = now + Time::from(interval);
|
|
||||||
}
|
|
||||||
self.reconnect_to_peers()?;
|
|
||||||
if self.next_stats_out < now {
|
|
||||||
// Write out the statistics
|
|
||||||
self.write_out_stats().map_err(|err| Error::FileIo("Failed to write stats file", err))?;
|
|
||||||
self.send_stats_to_statsd()?;
|
|
||||||
self.next_stats_out = now + STATS_INTERVAL;
|
|
||||||
self.traffic.period(Some(5));
|
|
||||||
}
|
|
||||||
if let Some(peers) = self.beacon_serializer.get_cmd_results() {
|
|
||||||
debug!("Loaded beacon with peers: {:?}", peers);
|
|
||||||
for peer in peers {
|
|
||||||
self.connect_sock(peer)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if self.next_beacon < now {
|
|
||||||
self.store_beacon()?;
|
|
||||||
self.load_beacon()?;
|
|
||||||
self.next_beacon = now + Time::from(self.config.beacon_interval);
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Stores the beacon
|
|
||||||
fn store_beacon(&mut self) -> Result<(), Error> {
|
|
||||||
if let Some(ref path) = self.config.beacon_store {
|
|
||||||
let peers: SmallVec<[SocketAddr; 3]> =
|
|
||||||
self.own_addresses.choose_multiple(&mut thread_rng(), 3).cloned().collect();
|
|
||||||
if let Some(path) = path.strip_prefix('|') {
|
|
||||||
self.beacon_serializer
|
|
||||||
.write_to_cmd(&peers, path)
|
|
||||||
.map_err(|e| Error::BeaconIo("Failed to call beacon command", e))?;
|
|
||||||
} else {
|
|
||||||
self.beacon_serializer
|
|
||||||
.write_to_file(&peers, &path)
|
|
||||||
.map_err(|e| Error::BeaconIo("Failed to write beacon to file", e))?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Loads the beacon
|
|
||||||
fn load_beacon(&mut self) -> Result<(), Error> {
|
|
||||||
let peers;
|
|
||||||
if let Some(ref path) = self.config.beacon_load {
|
|
||||||
if let Some(path) = path.strip_prefix('|') {
|
|
||||||
self.beacon_serializer
|
|
||||||
.read_from_cmd(path, Some(50))
|
|
||||||
.map_err(|e| Error::BeaconIo("Failed to call beacon command", e))?;
|
|
||||||
return Ok(())
|
|
||||||
} else {
|
|
||||||
peers = self
|
|
||||||
.beacon_serializer
|
|
||||||
.read_from_file(&path, Some(50))
|
|
||||||
.map_err(|e| Error::BeaconIo("Failed to read beacon from file", e))?;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return Ok(())
|
|
||||||
}
|
|
||||||
debug!("Loaded beacon with peers: {:?}", peers);
|
|
||||||
for peer in peers {
|
|
||||||
self.connect_sock(peer)?;
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Writes out the statistics to a file
|
|
||||||
fn write_out_stats(&mut self) -> Result<(), io::Error> {
|
|
||||||
if let Some(ref mut f) = self.stats_file {
|
|
||||||
debug!("Writing out stats");
|
|
||||||
f.seek(SeekFrom::Start(0))?;
|
|
||||||
f.set_len(0)?;
|
|
||||||
writeln!(f, "peers:")?;
|
|
||||||
let now = TS::now();
|
|
||||||
for (addr, data) in &self.peers {
|
|
||||||
writeln!(
|
|
||||||
f,
|
|
||||||
" - \"{}\": {{ ttl_secs: {}, crypto: {} }}",
|
|
||||||
addr_nice(*addr),
|
|
||||||
data.timeout - now,
|
|
||||||
data.crypto.algorithm_name()
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
writeln!(f)?;
|
|
||||||
self.table.write_out(f)?;
|
|
||||||
writeln!(f)?;
|
|
||||||
self.traffic.write_out(f)?;
|
|
||||||
writeln!(f)?;
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Sends the statistics to a statsd endpoint
|
|
||||||
fn send_stats_to_statsd(&mut self) -> Result<(), Error> {
|
|
||||||
if let Some(ref endpoint) = self.statsd_server {
|
|
||||||
let peer_traffic = self.traffic.total_peer_traffic();
|
|
||||||
let payload_traffic = self.traffic.total_payload_traffic();
|
|
||||||
let dropped = &self.traffic.dropped;
|
|
||||||
let prefix = self.config.statsd_prefix.as_ref().map(|s| s as &str).unwrap_or("vpncloud");
|
|
||||||
let msg = StatsdMsg::new()
|
|
||||||
.with_ns(prefix, |msg| {
|
|
||||||
msg.add("peer_count", self.peers.len(), "g");
|
|
||||||
msg.add("table_cache_entries", self.table.cache_len(), "g");
|
|
||||||
msg.add("table_claims", self.table.claim_len(), "g");
|
|
||||||
msg.with_ns("traffic", |msg| {
|
|
||||||
msg.with_ns("protocol", |msg| {
|
|
||||||
msg.with_ns("inbound", |msg| {
|
|
||||||
msg.add("bytes", peer_traffic.in_bytes, "c");
|
|
||||||
msg.add("packets", peer_traffic.in_packets, "c");
|
|
||||||
});
|
|
||||||
msg.with_ns("outbound", |msg| {
|
|
||||||
msg.add("bytes", peer_traffic.out_bytes, "c");
|
|
||||||
msg.add("packets", peer_traffic.out_packets, "c");
|
|
||||||
});
|
|
||||||
});
|
|
||||||
msg.with_ns("payload", |msg| {
|
|
||||||
msg.with_ns("inbound", |msg| {
|
|
||||||
msg.add("bytes", payload_traffic.in_bytes, "c");
|
|
||||||
msg.add("packets", payload_traffic.in_packets, "c");
|
|
||||||
});
|
|
||||||
msg.with_ns("outbound", |msg| {
|
|
||||||
msg.add("bytes", payload_traffic.out_bytes, "c");
|
|
||||||
msg.add("packets", payload_traffic.out_packets, "c");
|
|
||||||
});
|
|
||||||
});
|
|
||||||
});
|
|
||||||
msg.with_ns("invalid_protocol_traffic", |msg| {
|
|
||||||
msg.add("bytes", dropped.in_bytes, "c");
|
|
||||||
msg.add("packets", dropped.in_packets, "c");
|
|
||||||
});
|
|
||||||
msg.with_ns("dropped_payload", |msg| {
|
|
||||||
msg.add("bytes", dropped.out_bytes, "c");
|
|
||||||
msg.add("packets", dropped.out_packets, "c");
|
|
||||||
});
|
|
||||||
})
|
|
||||||
.build();
|
|
||||||
let msg_data = msg.as_bytes();
|
|
||||||
let addrs = resolve(endpoint)?;
|
|
||||||
if let Some(addr) = addrs.first() {
|
|
||||||
match self.socket.send(msg_data, *addr) {
|
|
||||||
Ok(written) if written == msg_data.len() => Ok(()),
|
|
||||||
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
|
|
||||||
Err(e) => Err(Error::SocketIo("IOError when sending", e))
|
|
||||||
}?
|
|
||||||
} else {
|
|
||||||
error!("Failed to resolve statsd server {}", endpoint);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn handle_interface_data(&mut self, data: &mut MsgBuffer) -> Result<(), Error> {
|
|
||||||
// HOT PATH
|
|
||||||
let (src, dst) = P::parse(data.message())?;
|
|
||||||
debug!("Read data from interface: src: {}, dst: {}, {} bytes", src, dst, data.len());
|
|
||||||
self.traffic.count_out_payload(dst, src, data.len());
|
|
||||||
match self.table.lookup(dst) {
|
|
||||||
Some(addr) => {
|
|
||||||
// HOT PATH
|
|
||||||
// Peer found for destination
|
|
||||||
debug!("Found destination for {} => {}", dst, addr);
|
|
||||||
self.send_msg(addr, MESSAGE_TYPE_DATA, data)?;
|
|
||||||
if !self.peers.contains_key(&addr) {
|
|
||||||
// COLD PATH
|
|
||||||
// If the peer is not actually connected, remove the entry in the table and try
|
|
||||||
// to reconnect.
|
|
||||||
warn!("Destination for {} not found in peers: {}", dst, addr_nice(addr));
|
|
||||||
self.table.remove_claims(addr);
|
|
||||||
self.connect_sock(addr)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
// COLD PATH
|
|
||||||
if self.broadcast {
|
|
||||||
debug!("No destination for {} found, broadcasting", dst);
|
|
||||||
self.broadcast_msg(MESSAGE_TYPE_DATA, data)?;
|
|
||||||
} else {
|
|
||||||
debug!("No destination for {} found, dropping", dst);
|
|
||||||
self.traffic.count_dropped_payload(data.len());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn add_new_peer(&mut self, addr: SocketAddr, info: NodeInfo) -> Result<(), Error> {
|
|
||||||
info!("Added peer {}", addr_nice(addr));
|
|
||||||
self.config.call_hook(
|
|
||||||
"peer_connected",
|
|
||||||
vec![
|
|
||||||
("PEER", format!("{:?}", addr_nice(addr))),
|
|
||||||
("IFNAME", self.device.ifname().to_owned()),
|
|
||||||
("CLAIMS", info.claims.iter().map(|r| format!("{:?}", r)).collect::<Vec<String>>().join(" ")),
|
|
||||||
("NODE_ID", bytes_to_hex(&info.node_id)),
|
|
||||||
],
|
|
||||||
true
|
|
||||||
);
|
|
||||||
if let Some(init) = self.pending_inits.remove(&addr) {
|
|
||||||
self.peers.insert(addr, PeerData {
|
|
||||||
addrs: info.addrs.clone(),
|
|
||||||
crypto: init,
|
|
||||||
node_id: info.node_id,
|
|
||||||
peer_timeout: info.peer_timeout.unwrap_or(DEFAULT_PEER_TIMEOUT),
|
|
||||||
last_seen: TS::now(),
|
|
||||||
timeout: TS::now() + self.config.peer_timeout as Time
|
|
||||||
});
|
|
||||||
self.update_peer_info(addr, Some(info))?;
|
|
||||||
} else {
|
|
||||||
error!("No init for new peer {}", addr_nice(addr));
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn remove_peer(&mut self, addr: SocketAddr) {
|
|
||||||
if let Some(peer) = self.peers.remove(&addr) {
|
|
||||||
info!("Closing connection to {}", addr_nice(addr));
|
|
||||||
self.table.remove_claims(addr);
|
|
||||||
self.config.call_hook(
|
|
||||||
"peer_disconnected",
|
|
||||||
vec![
|
|
||||||
("PEER", format!("{:?}", addr)),
|
|
||||||
("IFNAME", self.device.ifname().to_owned()),
|
|
||||||
("NODE_ID", bytes_to_hex(&peer.node_id)),
|
|
||||||
],
|
|
||||||
true
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn connect_to_peers(&mut self, peers: &[PeerInfo]) -> Result<(), Error> {
|
|
||||||
'outer: for peer in peers {
|
|
||||||
for addr in &peer.addrs {
|
|
||||||
if self.peers.contains_key(addr) {
|
|
||||||
continue 'outer
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if let Some(node_id) = peer.node_id {
|
|
||||||
if self.node_id == node_id {
|
|
||||||
continue 'outer
|
|
||||||
}
|
|
||||||
for p in self.peers.values() {
|
|
||||||
if p.node_id == node_id {
|
|
||||||
continue 'outer
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
self.connect(&peer.addrs as &[SocketAddr])?;
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn update_peer_info(&mut self, addr: SocketAddr, info: Option<NodeInfo>) -> Result<(), Error> {
|
|
||||||
if let Some(peer) = self.peers.get_mut(&addr) {
|
|
||||||
peer.last_seen = TS::now();
|
|
||||||
peer.timeout = TS::now() + self.config.peer_timeout as Time
|
|
||||||
} else {
|
|
||||||
error!("Received peer update from non peer {}", addr_nice(addr));
|
|
||||||
return Ok(())
|
|
||||||
}
|
|
||||||
if let Some(info) = info {
|
|
||||||
debug!("Adding claims of peer {}: {:?}", addr_nice(addr), info.claims);
|
|
||||||
self.table.set_claims(addr, info.claims);
|
|
||||||
debug!("Received {} peers from {}: {:?}", info.peers.len(), addr_nice(addr), info.peers);
|
|
||||||
self.connect_to_peers(&info.peers)?;
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn handle_payload_from(&mut self, peer: SocketAddr, data: &mut MsgBuffer) -> Result<(), Error> {
|
|
||||||
// HOT PATH
|
|
||||||
let (src, dst) = P::parse(data.message())?;
|
|
||||||
let len = data.len();
|
|
||||||
debug!("Writing data to device: {} bytes", len);
|
|
||||||
self.traffic.count_in_payload(src, dst, len);
|
|
||||||
if let Err(e) = self.device.write(data) {
|
|
||||||
error!("Failed to send via device: {}", e);
|
|
||||||
return Err(e)
|
|
||||||
}
|
|
||||||
if self.learning {
|
|
||||||
// Learn single address
|
|
||||||
self.table.cache(src, peer);
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn handle_message(
|
|
||||||
&mut self, src: SocketAddr, msg_result: MessageResult<NodeInfo>, data: &mut MsgBuffer
|
|
||||||
) -> Result<(), Error> {
|
|
||||||
// HOT PATH
|
|
||||||
match msg_result {
|
|
||||||
MessageResult::Message(type_) => {
|
|
||||||
// HOT PATH
|
|
||||||
match type_ {
|
|
||||||
MESSAGE_TYPE_DATA => {
|
|
||||||
// HOT PATH
|
|
||||||
self.handle_payload_from(src, data)?
|
|
||||||
}
|
|
||||||
MESSAGE_TYPE_NODE_INFO => {
|
|
||||||
// COLD PATH
|
|
||||||
let info = match NodeInfo::decode(Cursor::new(data.message())) {
|
|
||||||
Ok(val) => val,
|
|
||||||
Err(err) => {
|
|
||||||
self.traffic.count_invalid_protocol(data.len());
|
|
||||||
return Err(err)
|
|
||||||
}
|
|
||||||
};
|
|
||||||
self.update_peer_info(src, Some(info))?
|
|
||||||
}
|
|
||||||
MESSAGE_TYPE_KEEPALIVE => {
|
|
||||||
// COLD PATH
|
|
||||||
self.update_peer_info(src, None)?
|
|
||||||
}
|
|
||||||
MESSAGE_TYPE_CLOSE => {
|
|
||||||
// COLD PATH
|
|
||||||
self.remove_peer(src)
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
// COLD PATH
|
|
||||||
self.traffic.count_invalid_protocol(data.len());
|
|
||||||
return Err(Error::Message("Unknown message type"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
MessageResult::Initialized(info) => {
|
|
||||||
// COLD PATH
|
|
||||||
self.add_new_peer(src, info)?
|
|
||||||
}
|
|
||||||
MessageResult::InitializedWithReply(info) => {
|
|
||||||
// COLD PATH
|
|
||||||
self.add_new_peer(src, info)?;
|
|
||||||
self.send_to(src, data)?
|
|
||||||
}
|
|
||||||
MessageResult::Reply => {
|
|
||||||
// COLD PATH
|
|
||||||
self.send_to(src, data)?
|
|
||||||
}
|
|
||||||
MessageResult::None => {
|
|
||||||
// COLD PATH
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn handle_net_message(&mut self, src: SocketAddr, data: &mut MsgBuffer) -> Result<(), Error> {
|
|
||||||
// HOT PATH
|
|
||||||
let src = mapped_addr(src);
|
|
||||||
debug!("Received {} bytes from {}", data.len(), src);
|
|
||||||
let msg_result = if let Some(init) = self.pending_inits.get_mut(&src) {
|
|
||||||
// COLD PATH
|
|
||||||
init.handle_message(data)
|
|
||||||
} else if is_init_message(data.message()) {
|
|
||||||
// COLD PATH
|
|
||||||
let mut result = None;
|
|
||||||
if let Some(peer) = self.peers.get_mut(&src) {
|
|
||||||
if peer.crypto.has_init() {
|
|
||||||
result = Some(peer.crypto.handle_message(data))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if let Some(result) = result {
|
|
||||||
result
|
|
||||||
} else {
|
|
||||||
let mut init = self.crypto.peer_instance(self.create_node_info());
|
|
||||||
let msg_result = init.handle_message(data);
|
|
||||||
match msg_result {
|
|
||||||
Ok(res) => {
|
|
||||||
self.config.call_hook(
|
|
||||||
"peer_connecting",
|
|
||||||
vec![
|
|
||||||
("PEER", format!("{:?}", addr_nice(src))),
|
|
||||||
("IFNAME", self.device.ifname().to_owned()),
|
|
||||||
],
|
|
||||||
true
|
|
||||||
);
|
|
||||||
self.pending_inits.insert(src, init);
|
|
||||||
Ok(res)
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
self.traffic.count_invalid_protocol(data.len());
|
|
||||||
return Err(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else if let Some(peer) = self.peers.get_mut(&src) {
|
|
||||||
// HOT PATH
|
|
||||||
peer.crypto.handle_message(data)
|
|
||||||
} else {
|
|
||||||
// COLD PATH
|
|
||||||
info!("Ignoring non-init message from unknown peer {}", addr_nice(src));
|
|
||||||
self.traffic.count_invalid_protocol(data.len());
|
|
||||||
return Ok(())
|
|
||||||
};
|
|
||||||
// HOT PATH
|
|
||||||
match msg_result {
|
|
||||||
Ok(val) => {
|
|
||||||
// HOT PATH
|
|
||||||
self.handle_message(src, val, data)
|
|
||||||
},
|
|
||||||
Err(err) => {
|
|
||||||
// COLD PATH
|
|
||||||
self.traffic.count_invalid_protocol(data.len());
|
|
||||||
Err(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn initialize(&mut self) {
|
|
||||||
if let Err(err) = self.reset_own_addresses() {
|
|
||||||
error!("Failed to obtain local addresses: {}", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn handle_socket_event(&mut self, buffer: &mut MsgBuffer) {
|
|
||||||
// HOT PATH
|
|
||||||
let src = try_fail!(self.socket.receive(buffer), "Failed to read from network socket: {}");
|
|
||||||
self.traffic.count_in_traffic(src, buffer.len());
|
|
||||||
match self.handle_net_message(src, buffer) {
|
|
||||||
Err(e @ Error::CryptoInitFatal(_)) => {
|
|
||||||
// COLD PATH
|
|
||||||
debug!("Fatal crypto init error from {}: {}", src, e);
|
|
||||||
info!("Closing pending connection to {} due to error in crypto init", addr_nice(src));
|
|
||||||
self.pending_inits.remove(&src);
|
|
||||||
self.config.call_hook(
|
|
||||||
"peer_disconnected",
|
|
||||||
vec![("PEER", format!("{:?}", addr_nice(src))), ("IFNAME", self.device.ifname().to_owned())],
|
|
||||||
true
|
|
||||||
);
|
|
||||||
}
|
|
||||||
Err(e @ Error::CryptoInit(_)) => {
|
|
||||||
// COLD PATH
|
|
||||||
debug!("Recoverable init error from {}: {}", src, e);
|
|
||||||
info!("Ignoring invalid init message from peer {}", addr_nice(src));
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
// COLD PATH
|
|
||||||
error!("{}", e);
|
|
||||||
}
|
|
||||||
Ok(_) => {} // HOT PATH
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn handle_device_event(&mut self, buffer: &mut MsgBuffer) {
|
|
||||||
// HOT PATH
|
|
||||||
try_fail!(self.device.read(buffer), "Failed to read from device: {}");
|
|
||||||
if let Err(e) = self.handle_interface_data(buffer) {
|
|
||||||
error!("{}", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The main method of the node
|
|
||||||
///
|
|
||||||
/// This method will use epoll to wait in the sockets and the device at the same time.
|
|
||||||
/// It will read from the sockets, decode and decrypt the message and then call the
|
|
||||||
/// `handle_net_message` method. It will also read from the device and call
|
|
||||||
/// `handle_interface_data` for each packet read.
|
|
||||||
/// Also, this method will call `housekeep` every second.
|
|
||||||
pub fn run(&mut self) {
|
pub fn run(&mut self) {
|
||||||
let ctrlc = CtrlC::new();
|
let ctrlc = CtrlC::new();
|
||||||
let waiter = try_fail!(WaitImpl::new(self.socket.as_raw_fd(), self.device.as_raw_fd(), 1000), "Failed to setup poll: {}");
|
let waiter = try_fail!(WaitImpl::new(self.socket.as_raw_fd(), self.device.as_raw_fd(), 1000), "Failed to setup poll: {}");
|
||||||
|
@ -932,8 +305,8 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
|
||||||
poll_error = true;
|
poll_error = true;
|
||||||
}
|
}
|
||||||
WaitResult::Timeout => {}
|
WaitResult::Timeout => {}
|
||||||
WaitResult::Socket => self.handle_socket_event(&mut buffer),
|
WaitResult::Socket => unimplemented!(),
|
||||||
WaitResult::Device => self.handle_device_event(&mut buffer)
|
WaitResult::Device => unimplemented!()
|
||||||
}
|
}
|
||||||
if self.next_housekeep < TS::now() {
|
if self.next_housekeep < TS::now() {
|
||||||
// COLD PATH
|
// COLD PATH
|
||||||
|
@ -941,9 +314,6 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
|
||||||
if ctrlc.was_pressed() {
|
if ctrlc.was_pressed() {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
if let Err(e) = self.housekeep() {
|
|
||||||
error!("{}", e)
|
|
||||||
}
|
|
||||||
self.next_housekeep = TS::now() + 1
|
self.next_housekeep = TS::now() + 1
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -980,16 +350,16 @@ impl<P: Protocol> GenericCloud<MockDevice, P, MockSocket, MockTimeSource> {
|
||||||
|
|
||||||
pub fn trigger_socket_event(&mut self) {
|
pub fn trigger_socket_event(&mut self) {
|
||||||
let mut buffer = MsgBuffer::new(SPACE_BEFORE);
|
let mut buffer = MsgBuffer::new(SPACE_BEFORE);
|
||||||
self.handle_socket_event(&mut buffer);
|
unimplemented!()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn trigger_device_event(&mut self) {
|
pub fn trigger_device_event(&mut self) {
|
||||||
let mut buffer = MsgBuffer::new(SPACE_BEFORE);
|
let mut buffer = MsgBuffer::new(SPACE_BEFORE);
|
||||||
self.handle_device_event(&mut buffer);
|
unimplemented!()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn trigger_housekeep(&mut self) {
|
pub fn trigger_housekeep(&mut self) {
|
||||||
assert!(self.housekeep().is_ok())
|
unimplemented!()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn is_connected(&self, addr: &SocketAddr) -> bool {
|
pub fn is_connected(&self, addr: &SocketAddr) -> bool {
|
||||||
|
|
|
@ -1,15 +1,20 @@
|
||||||
use crate::error::Error;
|
|
||||||
use crate::{
|
use crate::{
|
||||||
crypto::CryptoCore,
|
crypto::CryptoCore,
|
||||||
engine::{Hash, PeerData, TimeSource},
|
engine::{Hash, PeerData, TimeSource},
|
||||||
|
error::Error,
|
||||||
messages::NodeInfo,
|
messages::NodeInfo,
|
||||||
table::ClaimTable,
|
table::ClaimTable,
|
||||||
traffic::TrafficStats,
|
traffic::{TrafficStats, TrafficEntry},
|
||||||
types::{Address, NodeId, RangeList},
|
types::{Address, NodeId, RangeList},
|
||||||
util::MsgBuffer
|
util::MsgBuffer
|
||||||
};
|
};
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
use std::{collections::HashMap, net::SocketAddr, sync::Arc};
|
use std::{
|
||||||
|
collections::HashMap,
|
||||||
|
io::{self, Write},
|
||||||
|
net::SocketAddr,
|
||||||
|
sync::Arc
|
||||||
|
};
|
||||||
|
|
||||||
pub struct SharedPeerCrypto {
|
pub struct SharedPeerCrypto {
|
||||||
peers: Arc<Mutex<HashMap<SocketAddr, Option<Arc<CryptoCore>>, Hash>>>
|
peers: Arc<Mutex<HashMap<SocketAddr, Option<Arc<CryptoCore>>, Hash>>>
|
||||||
|
@ -26,10 +31,12 @@ impl SharedPeerCrypto {
|
||||||
None => Err(Error::InvalidCryptoState("No crypto found for peer")),
|
None => Err(Error::InvalidCryptoState("No crypto found for peer")),
|
||||||
Some(None) => Ok(()),
|
Some(None) => Ok(()),
|
||||||
Some(Some(crypto)) => Ok(crypto.encrypt(data))
|
Some(Some(crypto)) => Ok(crypto.encrypt(data))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn for_each(&mut self, mut callback: impl FnMut(SocketAddr, Option<Arc<CryptoCore>>) -> Result<(), Error>) -> Result<(), Error> {
|
pub fn for_each(
|
||||||
|
&mut self, mut callback: impl FnMut(SocketAddr, Option<Arc<CryptoCore>>) -> Result<(), Error>
|
||||||
|
) -> Result<(), Error> {
|
||||||
let mut peers = self.peers.lock();
|
let mut peers = self.peers.lock();
|
||||||
for (k, v) in peers.iter_mut() {
|
for (k, v) in peers.iter_mut() {
|
||||||
callback(*k, v.clone())?
|
callback(*k, v.clone())?
|
||||||
|
@ -75,6 +82,26 @@ impl SharedTraffic {
|
||||||
pub fn count_invalid_protocol(&self, bytes: usize) {
|
pub fn count_invalid_protocol(&self, bytes: usize) {
|
||||||
self.traffic.lock().count_invalid_protocol(bytes);
|
self.traffic.lock().count_invalid_protocol(bytes);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn period(&mut self, cleanup_idle: Option<usize>) {
|
||||||
|
self.traffic.lock().period(cleanup_idle)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn write_out<W: Write>(&self, out: &mut W) -> Result<(), io::Error> {
|
||||||
|
self.traffic.lock().write_out(out)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn total_peer_traffic(&self) -> TrafficEntry {
|
||||||
|
self.traffic.lock().total_peer_traffic()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn total_payload_traffic(&self) -> TrafficEntry {
|
||||||
|
self.traffic.lock().total_payload_traffic()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn dropped(&self) -> TrafficEntry {
|
||||||
|
self.traffic.lock().dropped.clone()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -87,19 +114,35 @@ impl<TS: TimeSource> SharedTable<TS> {
|
||||||
// TODO sync if needed
|
// TODO sync if needed
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn lookup(&self, addr: Address) -> Option<SocketAddr> {
|
pub fn lookup(&mut self, addr: Address) -> Option<SocketAddr> {
|
||||||
self.table.lock().lookup(addr)
|
self.table.lock().lookup(addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn set_claims(&self, peer: SocketAddr, claims: RangeList) {
|
pub fn set_claims(&mut self, peer: SocketAddr, claims: RangeList) {
|
||||||
self.table.lock().set_claims(peer, claims)
|
self.table.lock().set_claims(peer, claims)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn remove_claims(&self, peer: SocketAddr) {
|
pub fn remove_claims(&mut self, peer: SocketAddr) {
|
||||||
self.table.lock().remove_claims(peer)
|
self.table.lock().remove_claims(peer)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn cache(&self, addr: Address, peer: SocketAddr) {
|
pub fn cache(&mut self, addr: Address, peer: SocketAddr) {
|
||||||
self.table.lock().cache(addr, peer)
|
self.table.lock().cache(addr, peer)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn housekeep(&mut self) {
|
||||||
|
self.table.lock().housekeep()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn write_out<W: Write>(&self, out: &mut W) -> Result<(), io::Error> {
|
||||||
|
self.table.lock().write_out(out)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn cache_len(&self) -> usize {
|
||||||
|
self.table.lock().cache_len()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn claim_len(&self) -> usize {
|
||||||
|
self.table.lock().claim_len()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,26 +4,48 @@ use super::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
config::DEFAULT_PEER_TIMEOUT,
|
beacon::BeaconSerializer,
|
||||||
crypto::{is_init_message, MessageResult, PeerCrypto, InitState, InitResult},
|
config::{DEFAULT_PEER_TIMEOUT, DEFAULT_PORT},
|
||||||
|
crypto::{is_init_message, InitResult, InitState, MessageResult},
|
||||||
engine::{addr_nice, resolve, Hash, PeerData},
|
engine::{addr_nice, resolve, Hash, PeerData},
|
||||||
error::Error,
|
error::Error,
|
||||||
messages::{AddrList, NodeInfo, PeerInfo},
|
messages::{AddrList, NodeInfo, PeerInfo, MESSAGE_TYPE_NODE_INFO},
|
||||||
net::{mapped_addr, Socket},
|
net::{mapped_addr, Socket},
|
||||||
|
port_forwarding::PortForwarding,
|
||||||
types::{NodeId, RangeList},
|
types::{NodeId, RangeList},
|
||||||
util::{MsgBuffer, Time, TimeSource},
|
util::{MsgBuffer, StatsdMsg, Time, TimeSource},
|
||||||
Config, Crypto, Device, Protocol
|
Config, Crypto, Device, Protocol
|
||||||
};
|
};
|
||||||
use rand::{seq::SliceRandom};
|
use rand::{random, seq::SliceRandom, thread_rng};
|
||||||
use smallvec::{smallvec, SmallVec};
|
use smallvec::{smallvec, SmallVec};
|
||||||
use std::{
|
use std::{
|
||||||
|
cmp::{max, min},
|
||||||
collections::HashMap,
|
collections::HashMap,
|
||||||
fmt,
|
fmt,
|
||||||
io::Cursor,
|
fs::File,
|
||||||
|
io,
|
||||||
|
io::{Write, Cursor, Seek, SeekFrom},
|
||||||
marker::PhantomData,
|
marker::PhantomData,
|
||||||
net::{SocketAddr, ToSocketAddrs},
|
net::{SocketAddr, ToSocketAddrs}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
const MAX_RECONNECT_INTERVAL: u16 = 3600;
|
||||||
|
const RESOLVE_INTERVAL: Time = 300;
|
||||||
|
const OWN_ADDRESS_RESET_INTERVAL: Time = 300;
|
||||||
|
pub const STATS_INTERVAL: Time = 60;
|
||||||
|
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct ReconnectEntry {
|
||||||
|
address: Option<(String, Time)>,
|
||||||
|
resolved: AddrList,
|
||||||
|
tries: u16,
|
||||||
|
timeout: u16,
|
||||||
|
next: Time,
|
||||||
|
final_timeout: Option<Time>
|
||||||
|
}
|
||||||
|
|
||||||
pub struct SocketThread<S: Socket, D: Device, P: Protocol, TS: TimeSource> {
|
pub struct SocketThread<S: Socket, D: Device, P: Protocol, TS: TimeSource> {
|
||||||
// Read-only fields
|
// Read-only fields
|
||||||
node_id: NodeId,
|
node_id: NodeId,
|
||||||
|
@ -31,6 +53,7 @@ pub struct SocketThread<S: Socket, D: Device, P: Protocol, TS: TimeSource> {
|
||||||
config: Config,
|
config: Config,
|
||||||
peer_timeout_publish: u16,
|
peer_timeout_publish: u16,
|
||||||
learning: bool,
|
learning: bool,
|
||||||
|
update_freq: u16,
|
||||||
_dummy_ts: PhantomData<TS>,
|
_dummy_ts: PhantomData<TS>,
|
||||||
_dummy_p: PhantomData<P>,
|
_dummy_p: PhantomData<P>,
|
||||||
// Socket-only fields
|
// Socket-only fields
|
||||||
|
@ -38,12 +61,23 @@ pub struct SocketThread<S: Socket, D: Device, P: Protocol, TS: TimeSource> {
|
||||||
device: D,
|
device: D,
|
||||||
next_housekeep: Time,
|
next_housekeep: Time,
|
||||||
own_addresses: AddrList,
|
own_addresses: AddrList,
|
||||||
|
next_own_address_reset: Time,
|
||||||
pending_inits: HashMap<SocketAddr, InitState<NodeInfo>, Hash>,
|
pending_inits: HashMap<SocketAddr, InitState<NodeInfo>, Hash>,
|
||||||
crypto: Crypto,
|
crypto: Crypto,
|
||||||
peers: HashMap<SocketAddr, PeerData, Hash>,
|
peers: HashMap<SocketAddr, PeerData, Hash>,
|
||||||
|
next_peers: Time,
|
||||||
|
next_stats_out: Time,
|
||||||
|
next_beacon: Time,
|
||||||
|
beacon_serializer: BeaconSerializer<TS>,
|
||||||
|
stats_file: Option<File>,
|
||||||
|
statsd_server: Option<String>,
|
||||||
|
reconnect_peers: SmallVec<[ReconnectEntry; 3]>,
|
||||||
// Shared fields
|
// Shared fields
|
||||||
|
peer_crypto: SharedPeerCrypto,
|
||||||
traffic: SharedTraffic,
|
traffic: SharedTraffic,
|
||||||
table: SharedTable<TS>
|
table: SharedTable<TS>,
|
||||||
|
// Should not be here
|
||||||
|
port_forwarding: Option<PortForwarding> // TODO: 3rd thread
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS> {
|
impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS> {
|
||||||
|
@ -58,6 +92,26 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn broadcast_msg(&mut self, type_: u8, msg: &mut MsgBuffer) -> Result<(), Error> {
|
||||||
|
debug!("Broadcasting message type {}, {:?} bytes to {} peers", type_, msg.len(), self.peers.len());
|
||||||
|
let mut msg_data = MsgBuffer::new(100);
|
||||||
|
for (addr, peer) in &mut self.peers {
|
||||||
|
msg_data.set_start(msg.get_start());
|
||||||
|
msg_data.set_length(msg.len());
|
||||||
|
msg_data.message_mut().clone_from_slice(msg.message());
|
||||||
|
msg_data.prepend_byte(type_);
|
||||||
|
peer.crypto.encrypt_message(&mut msg_data);
|
||||||
|
self.traffic.count_out_traffic(*addr, msg_data.len());
|
||||||
|
match self.socket.send(msg_data.message(), *addr) {
|
||||||
|
Ok(written) if written == msg_data.len() => Ok(()),
|
||||||
|
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
|
||||||
|
Err(e) => Err(Error::SocketIo("IOError when sending", e))
|
||||||
|
}?
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
fn connect_sock(&mut self, addr: SocketAddr) -> Result<(), Error> {
|
fn connect_sock(&mut self, addr: SocketAddr) -> Result<(), Error> {
|
||||||
let addr = mapped_addr(addr);
|
let addr = mapped_addr(addr);
|
||||||
if self.peers.contains_key(&addr)
|
if self.peers.contains_key(&addr)
|
||||||
|
@ -133,7 +187,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
info!("Added peer {}", addr_nice(addr));
|
info!("Added peer {}", addr_nice(addr));
|
||||||
if let Some(init) = self.pending_inits.remove(&addr) {
|
if let Some(init) = self.pending_inits.remove(&addr) {
|
||||||
msg.clear();
|
msg.clear();
|
||||||
let crypto = init.finish(&mut msg);
|
let crypto = init.finish(msg);
|
||||||
self.peers.insert(addr, PeerData {
|
self.peers.insert(addr, PeerData {
|
||||||
addrs: info.addrs.clone(),
|
addrs: info.addrs.clone(),
|
||||||
crypto,
|
crypto,
|
||||||
|
@ -231,12 +285,10 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
fn handle_message(&mut self, src: SocketAddr, data: &mut MsgBuffer) -> Result<(), Error> {
|
fn handle_message(&mut self, src: SocketAddr, data: &mut MsgBuffer) -> Result<(), Error> {
|
||||||
let src = mapped_addr(src);
|
let src = mapped_addr(src);
|
||||||
debug!("Received {} bytes from {}", data.len(), src);
|
debug!("Received {} bytes from {}", data.len(), src);
|
||||||
if let Some(result) = self.peers.get_mut(&src).map(|peer| {
|
if let Some(result) = self.peers.get_mut(&src).map(|peer| peer.crypto.handle_message(data)) {
|
||||||
peer.crypto.handle_message(data)
|
|
||||||
}) {
|
|
||||||
return self.process_message(src, result?, data)
|
return self.process_message(src, result?, data)
|
||||||
}
|
}
|
||||||
let is_init = is_init_message(data.message());
|
let is_init = is_init_message(data.message());
|
||||||
if let Some(result) = self.pending_inits.get_mut(&src).map(|init| {
|
if let Some(result) = self.pending_inits.get_mut(&src).map(|init| {
|
||||||
if is_init {
|
if is_init {
|
||||||
init.handle_init(data)
|
init.handle_init(data)
|
||||||
|
@ -251,10 +303,8 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
if !data.is_empty() {
|
if !data.is_empty() {
|
||||||
self.send_to(src, data)?
|
self.send_to(src, data)?
|
||||||
}
|
}
|
||||||
},
|
|
||||||
InitResult::Success { peer_payload, is_initiator } => {
|
|
||||||
self.add_new_peer(src, peer_payload, data)?
|
|
||||||
}
|
}
|
||||||
|
InitResult::Success { peer_payload, .. } => self.add_new_peer(src, peer_payload, data)?
|
||||||
}
|
}
|
||||||
return Ok(())
|
return Ok(())
|
||||||
}
|
}
|
||||||
|
@ -266,7 +316,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
let mut init = self.crypto.peer_instance(self.create_node_info());
|
let mut init = self.crypto.peer_instance(self.create_node_info());
|
||||||
let msg_result = init.handle_init(data);
|
let msg_result = init.handle_init(data);
|
||||||
match msg_result {
|
match msg_result {
|
||||||
Ok(res) => {
|
Ok(_) => {
|
||||||
self.pending_inits.insert(src, init);
|
self.pending_inits.insert(src, init);
|
||||||
self.send_to(src, data)
|
self.send_to(src, data)
|
||||||
}
|
}
|
||||||
|
@ -278,24 +328,290 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
}
|
}
|
||||||
|
|
||||||
fn housekeep(&mut self) -> Result<(), Error> {
|
fn housekeep(&mut self) -> Result<(), Error> {
|
||||||
// self.shared.sync();
|
let now = TS::now();
|
||||||
// * = can be in different thread, ** only with caching/sync
|
let mut buffer = MsgBuffer::new(SPACE_BEFORE);
|
||||||
//TODO: peers: timeout **
|
let mut del: SmallVec<[SocketAddr; 3]> = SmallVec::new();
|
||||||
//TODO: table: timeout **
|
for (&addr, ref data) in &self.peers {
|
||||||
//TODO: rotate crypto keys
|
if data.timeout < now {
|
||||||
//TODO: time out pending inits
|
del.push(addr);
|
||||||
//TODO: extend port forwarding *
|
}
|
||||||
//TODO: reset own address **
|
}
|
||||||
//TODO: send peer lists **
|
for addr in del {
|
||||||
//TODO: reconnect to peers **
|
info!("Forgot peer {} due to timeout", addr_nice(addr));
|
||||||
//TODO: write to statsd **
|
self.peers.remove(&addr);
|
||||||
//TODO: write to stats file **
|
self.table.remove_claims(addr);
|
||||||
//TODO: read beacon **
|
self.connect_sock(addr)?; // Try to reconnect
|
||||||
//TODO: write beacon **
|
}
|
||||||
// TODO: sync
|
self.table.housekeep();
|
||||||
|
self.crypto_housekeep()?;
|
||||||
|
// Periodically extend the port-forwarding
|
||||||
|
if let Some(ref mut pfw) = self.port_forwarding {
|
||||||
|
pfw.check_extend();
|
||||||
|
}
|
||||||
|
let now = TS::now();
|
||||||
|
// Periodically reset own peers
|
||||||
|
if self.next_own_address_reset <= now {
|
||||||
|
self.reset_own_addresses().map_err(|err| Error::SocketIo("Failed to get own addresses", err))?;
|
||||||
|
self.next_own_address_reset = now + OWN_ADDRESS_RESET_INTERVAL;
|
||||||
|
}
|
||||||
|
// Periodically send peer list to peers
|
||||||
|
if self.next_peers <= now {
|
||||||
|
debug!("Send peer list to all peers");
|
||||||
|
let info = self.create_node_info();
|
||||||
|
info.encode(&mut buffer);
|
||||||
|
self.broadcast_msg(MESSAGE_TYPE_NODE_INFO, &mut buffer)?;
|
||||||
|
// Reschedule for next update
|
||||||
|
let min_peer_timeout = self.peers.iter().map(|p| p.1.peer_timeout).min().unwrap_or(DEFAULT_PEER_TIMEOUT);
|
||||||
|
let interval = min(self.update_freq as u16, max(min_peer_timeout / 2 - 60, 1));
|
||||||
|
self.next_peers = now + Time::from(interval);
|
||||||
|
}
|
||||||
|
self.reconnect_to_peers()?;
|
||||||
|
if self.next_stats_out < now {
|
||||||
|
// Write out the statistics
|
||||||
|
self.write_out_stats().map_err(|err| Error::FileIo("Failed to write stats file", err))?;
|
||||||
|
self.send_stats_to_statsd()?;
|
||||||
|
self.next_stats_out = now + STATS_INTERVAL;
|
||||||
|
self.traffic.period(Some(5));
|
||||||
|
}
|
||||||
|
if let Some(peers) = self.beacon_serializer.get_cmd_results() {
|
||||||
|
debug!("Loaded beacon with peers: {:?}", peers);
|
||||||
|
for peer in peers {
|
||||||
|
self.connect_sock(peer)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if self.next_beacon < now {
|
||||||
|
self.store_beacon()?;
|
||||||
|
self.load_beacon()?;
|
||||||
|
self.next_beacon = now + Time::from(self.config.beacon_interval);
|
||||||
|
}
|
||||||
|
// TODO: sync peer_crypto
|
||||||
|
self.table.sync();
|
||||||
|
self.traffic.sync();
|
||||||
unimplemented!();
|
unimplemented!();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn crypto_housekeep(&mut self) -> Result<(), Error> {
|
||||||
|
let mut msg = MsgBuffer::new(SPACE_BEFORE);
|
||||||
|
let mut del: SmallVec<[SocketAddr; 4]> = smallvec![];
|
||||||
|
for addr in self.pending_inits.keys().copied().collect::<SmallVec<[SocketAddr; 4]>>() {
|
||||||
|
msg.clear();
|
||||||
|
if self.pending_inits.get_mut(&addr).unwrap().every_second(&mut msg).is_err() {
|
||||||
|
del.push(addr)
|
||||||
|
} else if !msg.is_empty() {
|
||||||
|
self.send_to(addr, &mut msg)?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for addr in self.peers.keys().copied().collect::<SmallVec<[SocketAddr; 16]>>() {
|
||||||
|
msg.clear();
|
||||||
|
self.peers.get_mut(&addr).unwrap().crypto.every_second(&mut msg);
|
||||||
|
if !msg.is_empty() {
|
||||||
|
self.send_to(addr, &mut msg)?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for addr in del {
|
||||||
|
self.pending_inits.remove(&addr);
|
||||||
|
if self.peers.remove(&addr).is_some() {
|
||||||
|
self.connect_sock(addr)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn reset_own_addresses(&mut self) -> io::Result<()> {
|
||||||
|
self.own_addresses.clear();
|
||||||
|
self.own_addresses.push(self.socket.address().map(mapped_addr)?);
|
||||||
|
if let Some(ref pfw) = self.port_forwarding {
|
||||||
|
self.own_addresses.push(pfw.get_internal_ip().into());
|
||||||
|
self.own_addresses.push(pfw.get_external_ip().into());
|
||||||
|
}
|
||||||
|
debug!("Own addresses: {:?}", self.own_addresses);
|
||||||
|
// TODO: detect address changes and call event
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Stores the beacon
|
||||||
|
fn store_beacon(&mut self) -> Result<(), Error> {
|
||||||
|
if let Some(ref path) = self.config.beacon_store {
|
||||||
|
let peers: SmallVec<[SocketAddr; 3]> =
|
||||||
|
self.own_addresses.choose_multiple(&mut thread_rng(), 3).cloned().collect();
|
||||||
|
if let Some(path) = path.strip_prefix('|') {
|
||||||
|
self.beacon_serializer
|
||||||
|
.write_to_cmd(&peers, path)
|
||||||
|
.map_err(|e| Error::BeaconIo("Failed to call beacon command", e))?;
|
||||||
|
} else {
|
||||||
|
self.beacon_serializer
|
||||||
|
.write_to_file(&peers, &path)
|
||||||
|
.map_err(|e| Error::BeaconIo("Failed to write beacon to file", e))?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Loads the beacon
|
||||||
|
fn load_beacon(&mut self) -> Result<(), Error> {
|
||||||
|
let peers;
|
||||||
|
if let Some(ref path) = self.config.beacon_load {
|
||||||
|
if let Some(path) = path.strip_prefix('|') {
|
||||||
|
self.beacon_serializer
|
||||||
|
.read_from_cmd(path, Some(50))
|
||||||
|
.map_err(|e| Error::BeaconIo("Failed to call beacon command", e))?;
|
||||||
|
return Ok(())
|
||||||
|
} else {
|
||||||
|
peers = self
|
||||||
|
.beacon_serializer
|
||||||
|
.read_from_file(&path, Some(50))
|
||||||
|
.map_err(|e| Error::BeaconIo("Failed to read beacon from file", e))?;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return Ok(())
|
||||||
|
}
|
||||||
|
debug!("Loaded beacon with peers: {:?}", peers);
|
||||||
|
for peer in peers {
|
||||||
|
self.connect_sock(peer)?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Writes out the statistics to a file
|
||||||
|
fn write_out_stats(&mut self) -> Result<(), io::Error> {
|
||||||
|
if let Some(ref mut f) = self.stats_file {
|
||||||
|
debug!("Writing out stats");
|
||||||
|
f.seek(SeekFrom::Start(0))?;
|
||||||
|
f.set_len(0)?;
|
||||||
|
writeln!(f, "peers:")?;
|
||||||
|
let now = TS::now();
|
||||||
|
for (addr, data) in &self.peers {
|
||||||
|
writeln!(
|
||||||
|
f,
|
||||||
|
" - \"{}\": {{ ttl_secs: {}, crypto: {} }}",
|
||||||
|
addr_nice(*addr),
|
||||||
|
data.timeout - now,
|
||||||
|
data.crypto.algorithm_name()
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
writeln!(f)?;
|
||||||
|
self.table.write_out(f)?;
|
||||||
|
writeln!(f)?;
|
||||||
|
self.traffic.write_out(f)?;
|
||||||
|
writeln!(f)?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Sends the statistics to a statsd endpoint
|
||||||
|
fn send_stats_to_statsd(&mut self) -> Result<(), Error> {
|
||||||
|
if let Some(ref endpoint) = self.statsd_server {
|
||||||
|
let peer_traffic = self.traffic.total_peer_traffic();
|
||||||
|
let payload_traffic = self.traffic.total_payload_traffic();
|
||||||
|
let dropped = &self.traffic.dropped();
|
||||||
|
let prefix = self.config.statsd_prefix.as_ref().map(|s| s as &str).unwrap_or("vpncloud");
|
||||||
|
let msg = StatsdMsg::new()
|
||||||
|
.with_ns(prefix, |msg| {
|
||||||
|
msg.add("peer_count", self.peers.len(), "g");
|
||||||
|
msg.add("table_cache_entries", self.table.cache_len(), "g");
|
||||||
|
msg.add("table_claims", self.table.claim_len(), "g");
|
||||||
|
msg.with_ns("traffic", |msg| {
|
||||||
|
msg.with_ns("protocol", |msg| {
|
||||||
|
msg.with_ns("inbound", |msg| {
|
||||||
|
msg.add("bytes", peer_traffic.in_bytes, "c");
|
||||||
|
msg.add("packets", peer_traffic.in_packets, "c");
|
||||||
|
});
|
||||||
|
msg.with_ns("outbound", |msg| {
|
||||||
|
msg.add("bytes", peer_traffic.out_bytes, "c");
|
||||||
|
msg.add("packets", peer_traffic.out_packets, "c");
|
||||||
|
});
|
||||||
|
});
|
||||||
|
msg.with_ns("payload", |msg| {
|
||||||
|
msg.with_ns("inbound", |msg| {
|
||||||
|
msg.add("bytes", payload_traffic.in_bytes, "c");
|
||||||
|
msg.add("packets", payload_traffic.in_packets, "c");
|
||||||
|
});
|
||||||
|
msg.with_ns("outbound", |msg| {
|
||||||
|
msg.add("bytes", payload_traffic.out_bytes, "c");
|
||||||
|
msg.add("packets", payload_traffic.out_packets, "c");
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
msg.with_ns("invalid_protocol_traffic", |msg| {
|
||||||
|
msg.add("bytes", dropped.in_bytes, "c");
|
||||||
|
msg.add("packets", dropped.in_packets, "c");
|
||||||
|
});
|
||||||
|
msg.with_ns("dropped_payload", |msg| {
|
||||||
|
msg.add("bytes", dropped.out_bytes, "c");
|
||||||
|
msg.add("packets", dropped.out_packets, "c");
|
||||||
|
});
|
||||||
|
})
|
||||||
|
.build();
|
||||||
|
let msg_data = msg.as_bytes();
|
||||||
|
let addrs = resolve(endpoint)?;
|
||||||
|
if let Some(addr) = addrs.first() {
|
||||||
|
match self.socket.send(msg_data, *addr) {
|
||||||
|
Ok(written) if written == msg_data.len() => Ok(()),
|
||||||
|
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
|
||||||
|
Err(e) => Err(Error::SocketIo("IOError when sending", e))
|
||||||
|
}?
|
||||||
|
} else {
|
||||||
|
error!("Failed to resolve statsd server {}", endpoint);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn reconnect_to_peers(&mut self) -> Result<(), Error> {
|
||||||
|
let now = TS::now();
|
||||||
|
// Connect to those reconnect_peers that are due
|
||||||
|
for entry in self.reconnect_peers.clone() {
|
||||||
|
if entry.next > now {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
self.connect(&entry.resolved as &[SocketAddr])?;
|
||||||
|
}
|
||||||
|
for entry in &mut self.reconnect_peers {
|
||||||
|
// Schedule for next second if node is connected
|
||||||
|
for addr in &entry.resolved {
|
||||||
|
if self.peers.contains_key(&addr) {
|
||||||
|
entry.tries = 0;
|
||||||
|
entry.timeout = 1;
|
||||||
|
entry.next = now + 1;
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Resolve entries anew
|
||||||
|
if let Some((ref address, ref mut next_resolve)) = entry.address {
|
||||||
|
if *next_resolve <= now {
|
||||||
|
match resolve(address as &str) {
|
||||||
|
Ok(addrs) => entry.resolved = addrs,
|
||||||
|
Err(_) => {
|
||||||
|
match resolve(&format!("{}:{}", address, DEFAULT_PORT)) {
|
||||||
|
Ok(addrs) => entry.resolved = addrs,
|
||||||
|
Err(err) => warn!("Failed to resolve {}: {}", address, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*next_resolve = now + RESOLVE_INTERVAL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Ignore if next attempt is already in the future
|
||||||
|
if entry.next > now {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Exponential back-off: every 10 tries, the interval doubles
|
||||||
|
entry.tries += 1;
|
||||||
|
if entry.tries > 10 {
|
||||||
|
entry.tries = 0;
|
||||||
|
entry.timeout *= 2;
|
||||||
|
}
|
||||||
|
// Maximum interval is one hour
|
||||||
|
if entry.timeout > MAX_RECONNECT_INTERVAL {
|
||||||
|
entry.timeout = MAX_RECONNECT_INTERVAL;
|
||||||
|
}
|
||||||
|
// Schedule next connection attempt
|
||||||
|
entry.next = now + Time::from(entry.timeout);
|
||||||
|
}
|
||||||
|
self.reconnect_peers.retain(|e| e.final_timeout.unwrap_or(now) >= now);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub fn run(mut self) {
|
pub fn run(mut self) {
|
||||||
let mut buffer = MsgBuffer::new(SPACE_BEFORE);
|
let mut buffer = MsgBuffer::new(SPACE_BEFORE);
|
||||||
loop {
|
loop {
|
||||||
|
@ -316,7 +632,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
Ok(_) => {}
|
Ok(_) => {}
|
||||||
}
|
}
|
||||||
let now = TS::now();
|
let now = TS::now();
|
||||||
if self.next_housekeep < TS::now() {
|
if self.next_housekeep < now {
|
||||||
if let Err(e) = self.housekeep() {
|
if let Err(e) = self.housekeep() {
|
||||||
error!("{}", e)
|
error!("{}", e)
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,7 @@ use super::{
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default, Clone)]
|
||||||
pub struct TrafficEntry {
|
pub struct TrafficEntry {
|
||||||
pub out_bytes_total: u64,
|
pub out_bytes_total: u64,
|
||||||
pub out_packets_total: usize,
|
pub out_packets_total: usize,
|
||||||
|
|
Loading…
Reference in New Issue