From dd168139f02a6ca71ff90575ae932c96e4efb8f8 Mon Sep 17 00:00:00 2001 From: Dennis Schwerdel Date: Mon, 15 Feb 2021 13:46:55 +0100 Subject: [PATCH] Compiles but does not work --- src/crypto/common.rs | 40 +-- src/crypto/core.rs | 10 +- src/engine/device_thread.rs | 6 +- src/engine/mod.rs | 642 +----------------------------------- src/engine/shared.rs | 61 +++- src/engine/socket_thread.rs | 382 +++++++++++++++++++-- src/traffic.rs | 2 +- 7 files changed, 437 insertions(+), 706 deletions(-) diff --git a/src/crypto/common.rs b/src/crypto/common.rs index c948ca2..e9e50cf 100644 --- a/src/crypto/common.rs +++ b/src/crypto/common.rs @@ -1,7 +1,7 @@ use super::{core::test_speed, rotate::RotationState}; pub use super::{ core::{CryptoCore, EXTRA_LEN, TAG_LEN}, - init::{is_init_message, INIT_MESSAGE_FIRST_BYTE, InitState, InitResult} + init::{is_init_message, InitResult, InitState, INIT_MESSAGE_FIRST_BYTE} }; use crate::{ error::Error, @@ -182,6 +182,14 @@ impl Crypto { Ok(keypair) } + pub fn public_key_from_private_key(privkey: &str) -> Result { + let privkey = from_base62(privkey).map_err(|_| Error::InvalidConfig("Failed to parse private key"))?; + let keypair = Ed25519KeyPair::from_seed_unchecked(&privkey) + .map_err(|_| Error::InvalidConfig("Key rejected by crypto library"))?; + let pubkey = to_base62(keypair.public_key().as_ref()); + Ok(pubkey) + } + fn parse_public_key(pubkey: &str) -> Result { let pubkey = from_base62(pubkey).map_err(|_| Error::InvalidConfig("Failed to parse public key"))?; if pubkey.len() != ED25519_PUBLIC_KEY_LEN { @@ -295,7 +303,7 @@ impl PeerCrypto { } } - pub fn every_second(&mut self, out: &mut MsgBuffer) -> MessageResult { + pub fn every_second(&mut self, out: &mut MsgBuffer) { out.clear(); if let PeerCrypto::Encrypted { core, rotation, rotate_counter, algorithm, .. } = self { core.every_second(); @@ -309,11 +317,9 @@ impl PeerCrypto { if !out.is_empty() { out.prepend_byte(MESSAGE_TYPE_ROTATION); self.encrypt_message(out); - return MessageResult::Reply } } } - MessageResult::None } } @@ -357,9 +363,9 @@ mod tests { assert_eq!(res, InitResult::Success { peer_payload: vec![], is_initiator: true }); assert!(msg.is_empty()); - let node1 = node1.finish(&mut msg); + let mut node1 = node1.finish(&mut msg); assert!(msg.is_empty()); - let node2 = node2.finish(&mut msg); + let mut node2 = node2.finish(&mut msg); assert!(msg.is_empty()); debug!("Node1 <- Node2"); @@ -377,21 +383,15 @@ mod tests { let res = node2.handle_message(&mut buffer).unwrap(); assert_eq!(res, MessageResult::Message(1)); - match node1.every_second(&mut msg) { - MessageResult::None => (), - MessageResult::Reply => { - let res = node2.handle_message(&mut msg).unwrap(); - assert_eq!(res, MessageResult::None); - } - other => assert_eq!(other, MessageResult::None) + node1.every_second(&mut msg); + if !msg.is_empty() { + let res = node2.handle_message(&mut msg).unwrap(); + assert_eq!(res, MessageResult::None); } - match node2.every_second(&mut msg) { - MessageResult::None => (), - MessageResult::Reply => { - let res = node1.handle_message(&mut msg).unwrap(); - assert_eq!(res, MessageResult::None); - } - other => assert_eq!(other, MessageResult::None) + node2.every_second(&mut msg); + if !msg.is_empty() { + let res = node1.handle_message(&mut msg).unwrap(); + assert_eq!(res, MessageResult::None); } } } diff --git a/src/crypto/core.rs b/src/crypto/core.rs index 1999256..1ffe826 100644 --- a/src/crypto/core.rs +++ b/src/crypto/core.rs @@ -258,7 +258,7 @@ pub fn create_dummy_pair(algo: &'static aead::Algorithm) -> (CryptoCore, CryptoC pub fn test_speed(algo: &'static aead::Algorithm, max_time: &Duration) -> f64 { let mut buffer = MsgBuffer::new(EXTRA_LEN); buffer.set_length(1000); - let (mut sender, mut receiver) = create_dummy_pair(algo); + let (sender, receiver) = create_dummy_pair(algo); let mut iterations = 0; let start = Instant::now(); while (Instant::now() - start).as_nanos() < max_time.as_nanos() { @@ -290,7 +290,7 @@ mod tests { } fn test_encrypt_decrypt(algo: &'static aead::Algorithm) { - let (mut sender, mut receiver) = create_dummy_pair(algo); + let (sender, receiver) = create_dummy_pair(algo); let plain = random_data(1000); let mut buffer = MsgBuffer::new(EXTRA_LEN); buffer.clone_from(&plain); @@ -318,7 +318,7 @@ mod tests { fn test_tampering(algo: &'static aead::Algorithm) { - let (mut sender, mut receiver) = create_dummy_pair(algo); + let (sender, receiver) = create_dummy_pair(algo); let plain = random_data(1000); let mut buffer = MsgBuffer::new(EXTRA_LEN); buffer.clone_from(&plain); @@ -358,7 +358,7 @@ mod tests { } fn test_nonce_pinning(algo: &'static aead::Algorithm) { - let (mut sender, mut receiver) = create_dummy_pair(algo); + let (sender, receiver) = create_dummy_pair(algo); let plain = random_data(1000); let mut buffer = MsgBuffer::new(EXTRA_LEN); buffer.clone_from(&plain); @@ -399,7 +399,7 @@ mod tests { } fn test_key_rotation(algo: &'static aead::Algorithm) { - let (mut sender, mut receiver) = create_dummy_pair(algo); + let (sender, receiver) = create_dummy_pair(algo); let plain = random_data(1000); let mut buffer = MsgBuffer::new(EXTRA_LEN); buffer.clone_from(&plain); diff --git a/src/engine/device_thread.rs b/src/engine/device_thread.rs index 07e4ced..ffb05db 100644 --- a/src/engine/device_thread.rs +++ b/src/engine/device_thread.rs @@ -51,6 +51,8 @@ impl DeviceThread Result<(), Error> { debug!("Broadcasting message type {}, {:?} bytes to {} peers", type_, msg.len(), self.peer_crypto.count()); let mut msg_data = MsgBuffer::new(100); + let traffic = &mut self.traffic; + let socket = &mut self.socket; self.peer_crypto.for_each(|addr, crypto| { msg_data.set_start(msg.get_start()); msg_data.set_length(msg.len()); @@ -59,8 +61,8 @@ impl DeviceThread Ok(()), Ok(_) => Err(Error::Socket("Sent out truncated packet")), Err(e) => Err(Error::SocketIo("IOError when sending", e)) diff --git a/src/engine/mod.rs b/src/engine/mod.rs index ae36975..c77821a 100644 --- a/src/engine/mod.rs +++ b/src/engine/mod.rs @@ -167,7 +167,6 @@ impl GenericCloud GenericCloud(&mut self, addr: Addr) -> Result<(), Error> { let addrs = resolve(&addr)?.into_iter().map(mapped_addr).collect::>(); for addr in &addrs { @@ -293,627 +284,9 @@ impl GenericCloud NodeInfo { - let mut peers = smallvec![]; - for peer in self.peers.values() { - peers.push(PeerInfo { node_id: Some(peer.node_id), addrs: peer.addrs.clone() }) - } - if peers.len() > 20 { - let mut rng = rand::thread_rng(); - peers.partial_shuffle(&mut rng, 20); - peers.truncate(20); - } - NodeInfo { - node_id: self.node_id, - peers, - claims: self.claims.clone(), - peer_timeout: Some(self.peer_timeout_publish), - addrs: self.own_addresses.clone() - } - } - - fn connect_sock(&mut self, addr: SocketAddr) -> Result<(), Error> { - let addr = mapped_addr(addr); - if self.peers.contains_key(&addr) - || self.own_addresses.contains(&addr) - || self.pending_inits.contains_key(&addr) - { - return Ok(()) - } - debug!("Connecting to {:?}", addr); - let payload = self.create_node_info(); - let mut peer_crypto = self.crypto.peer_instance(payload); - let mut msg = MsgBuffer::new(SPACE_BEFORE); - peer_crypto.send_ping(&mut msg); - self.pending_inits.insert(addr, peer_crypto); - self.send_to(addr, &mut msg) - } - - fn crypto_housekeep(&mut self) -> Result<(), Error> { - let mut msg = MsgBuffer::new(SPACE_BEFORE); - let mut del: SmallVec<[SocketAddr; 4]> = smallvec![]; - for addr in self.pending_inits.keys().copied().collect::>() { - msg.clear(); - match self.pending_inits.get_mut(&addr).unwrap().every_second(&mut msg) { - Err(_) => del.push(addr), - Ok(MessageResult::None) => (), - Ok(MessageResult::Reply) => self.send_to(addr, &mut msg)?, - Ok(_) => unreachable!() - } - } - for addr in self.peers.keys().copied().collect::>() { - msg.clear(); - match self.peers.get_mut(&addr).unwrap().crypto.every_second(&mut msg) { - Err(_) => del.push(addr), - Ok(MessageResult::None) => (), - Ok(MessageResult::Reply) => self.send_to(addr, &mut msg)?, - Ok(_) => unreachable!() - } - } - for addr in del { - self.pending_inits.remove(&addr); - if self.peers.remove(&addr).is_some() { - self.connect_sock(addr)?; - } - } - Ok(()) - } - - fn reconnect_to_peers(&mut self) -> Result<(), Error> { - let now = TS::now(); - // Connect to those reconnect_peers that are due - for entry in self.reconnect_peers.clone() { - if entry.next > now { - continue - } - self.connect(&entry.resolved as &[SocketAddr])?; - } - for entry in &mut self.reconnect_peers { - // Schedule for next second if node is connected - for addr in &entry.resolved { - if self.peers.contains_key(&addr) { - entry.tries = 0; - entry.timeout = 1; - entry.next = now + 1; - continue - } - } - // Resolve entries anew - if let Some((ref address, ref mut next_resolve)) = entry.address { - if *next_resolve <= now { - match resolve(address as &str) { - Ok(addrs) => entry.resolved = addrs, - Err(_) => { - match resolve(&format!("{}:{}", address, DEFAULT_PORT)) { - Ok(addrs) => entry.resolved = addrs, - Err(err) => warn!("Failed to resolve {}: {}", address, err) - } - } - } - *next_resolve = now + RESOLVE_INTERVAL; - } - } - // Ignore if next attempt is already in the future - if entry.next > now { - continue - } - // Exponential back-off: every 10 tries, the interval doubles - entry.tries += 1; - if entry.tries > 10 { - entry.tries = 0; - entry.timeout *= 2; - } - // Maximum interval is one hour - if entry.timeout > MAX_RECONNECT_INTERVAL { - entry.timeout = MAX_RECONNECT_INTERVAL; - } - // Schedule next connection attempt - entry.next = now + Time::from(entry.timeout); - } - self.reconnect_peers.retain(|e| e.final_timeout.unwrap_or(now) >= now); - Ok(()) - } - - fn housekeep(&mut self) -> Result<(), Error> { - let now = TS::now(); - let mut buffer = MsgBuffer::new(SPACE_BEFORE); - let mut del: SmallVec<[SocketAddr; 3]> = SmallVec::new(); - for (&addr, ref data) in &self.peers { - if data.timeout < now { - del.push(addr); - } - } - for addr in del { - info!("Forgot peer {} due to timeout", addr_nice(addr)); - self.peers.remove(&addr); - self.table.remove_claims(addr); - self.connect_sock(addr)?; // Try to reconnect - } - self.table.housekeep(); - self.crypto_housekeep()?; - // Periodically extend the port-forwarding - if let Some(ref mut pfw) = self.port_forwarding { - pfw.check_extend(); - } - let now = TS::now(); - // Periodically reset own peers - if self.next_own_address_reset <= now { - self.reset_own_addresses().map_err(|err| Error::SocketIo("Failed to get own addresses", err))?; - self.next_own_address_reset = now + OWN_ADDRESS_RESET_INTERVAL; - } - // Periodically send peer list to peers - if self.next_peers <= now { - debug!("Send peer list to all peers"); - let info = self.create_node_info(); - info.encode(&mut buffer); - self.broadcast_msg(MESSAGE_TYPE_NODE_INFO, &mut buffer)?; - // Reschedule for next update - let min_peer_timeout = self.peers.iter().map(|p| p.1.peer_timeout).min().unwrap_or(DEFAULT_PEER_TIMEOUT); - let interval = min(self.update_freq as u16, max(min_peer_timeout / 2 - 60, 1)); - self.next_peers = now + Time::from(interval); - } - self.reconnect_to_peers()?; - if self.next_stats_out < now { - // Write out the statistics - self.write_out_stats().map_err(|err| Error::FileIo("Failed to write stats file", err))?; - self.send_stats_to_statsd()?; - self.next_stats_out = now + STATS_INTERVAL; - self.traffic.period(Some(5)); - } - if let Some(peers) = self.beacon_serializer.get_cmd_results() { - debug!("Loaded beacon with peers: {:?}", peers); - for peer in peers { - self.connect_sock(peer)?; - } - } - if self.next_beacon < now { - self.store_beacon()?; - self.load_beacon()?; - self.next_beacon = now + Time::from(self.config.beacon_interval); - } - Ok(()) - } - - /// Stores the beacon - fn store_beacon(&mut self) -> Result<(), Error> { - if let Some(ref path) = self.config.beacon_store { - let peers: SmallVec<[SocketAddr; 3]> = - self.own_addresses.choose_multiple(&mut thread_rng(), 3).cloned().collect(); - if let Some(path) = path.strip_prefix('|') { - self.beacon_serializer - .write_to_cmd(&peers, path) - .map_err(|e| Error::BeaconIo("Failed to call beacon command", e))?; - } else { - self.beacon_serializer - .write_to_file(&peers, &path) - .map_err(|e| Error::BeaconIo("Failed to write beacon to file", e))?; - } - } - Ok(()) - } - - /// Loads the beacon - fn load_beacon(&mut self) -> Result<(), Error> { - let peers; - if let Some(ref path) = self.config.beacon_load { - if let Some(path) = path.strip_prefix('|') { - self.beacon_serializer - .read_from_cmd(path, Some(50)) - .map_err(|e| Error::BeaconIo("Failed to call beacon command", e))?; - return Ok(()) - } else { - peers = self - .beacon_serializer - .read_from_file(&path, Some(50)) - .map_err(|e| Error::BeaconIo("Failed to read beacon from file", e))?; - } - } else { - return Ok(()) - } - debug!("Loaded beacon with peers: {:?}", peers); - for peer in peers { - self.connect_sock(peer)?; - } - Ok(()) - } - - /// Writes out the statistics to a file - fn write_out_stats(&mut self) -> Result<(), io::Error> { - if let Some(ref mut f) = self.stats_file { - debug!("Writing out stats"); - f.seek(SeekFrom::Start(0))?; - f.set_len(0)?; - writeln!(f, "peers:")?; - let now = TS::now(); - for (addr, data) in &self.peers { - writeln!( - f, - " - \"{}\": {{ ttl_secs: {}, crypto: {} }}", - addr_nice(*addr), - data.timeout - now, - data.crypto.algorithm_name() - )?; - } - writeln!(f)?; - self.table.write_out(f)?; - writeln!(f)?; - self.traffic.write_out(f)?; - writeln!(f)?; - } - Ok(()) - } - - /// Sends the statistics to a statsd endpoint - fn send_stats_to_statsd(&mut self) -> Result<(), Error> { - if let Some(ref endpoint) = self.statsd_server { - let peer_traffic = self.traffic.total_peer_traffic(); - let payload_traffic = self.traffic.total_payload_traffic(); - let dropped = &self.traffic.dropped; - let prefix = self.config.statsd_prefix.as_ref().map(|s| s as &str).unwrap_or("vpncloud"); - let msg = StatsdMsg::new() - .with_ns(prefix, |msg| { - msg.add("peer_count", self.peers.len(), "g"); - msg.add("table_cache_entries", self.table.cache_len(), "g"); - msg.add("table_claims", self.table.claim_len(), "g"); - msg.with_ns("traffic", |msg| { - msg.with_ns("protocol", |msg| { - msg.with_ns("inbound", |msg| { - msg.add("bytes", peer_traffic.in_bytes, "c"); - msg.add("packets", peer_traffic.in_packets, "c"); - }); - msg.with_ns("outbound", |msg| { - msg.add("bytes", peer_traffic.out_bytes, "c"); - msg.add("packets", peer_traffic.out_packets, "c"); - }); - }); - msg.with_ns("payload", |msg| { - msg.with_ns("inbound", |msg| { - msg.add("bytes", payload_traffic.in_bytes, "c"); - msg.add("packets", payload_traffic.in_packets, "c"); - }); - msg.with_ns("outbound", |msg| { - msg.add("bytes", payload_traffic.out_bytes, "c"); - msg.add("packets", payload_traffic.out_packets, "c"); - }); - }); - }); - msg.with_ns("invalid_protocol_traffic", |msg| { - msg.add("bytes", dropped.in_bytes, "c"); - msg.add("packets", dropped.in_packets, "c"); - }); - msg.with_ns("dropped_payload", |msg| { - msg.add("bytes", dropped.out_bytes, "c"); - msg.add("packets", dropped.out_packets, "c"); - }); - }) - .build(); - let msg_data = msg.as_bytes(); - let addrs = resolve(endpoint)?; - if let Some(addr) = addrs.first() { - match self.socket.send(msg_data, *addr) { - Ok(written) if written == msg_data.len() => Ok(()), - Ok(_) => Err(Error::Socket("Sent out truncated packet")), - Err(e) => Err(Error::SocketIo("IOError when sending", e)) - }? - } else { - error!("Failed to resolve statsd server {}", endpoint); - } - } - Ok(()) - } - - pub fn handle_interface_data(&mut self, data: &mut MsgBuffer) -> Result<(), Error> { - // HOT PATH - let (src, dst) = P::parse(data.message())?; - debug!("Read data from interface: src: {}, dst: {}, {} bytes", src, dst, data.len()); - self.traffic.count_out_payload(dst, src, data.len()); - match self.table.lookup(dst) { - Some(addr) => { - // HOT PATH - // Peer found for destination - debug!("Found destination for {} => {}", dst, addr); - self.send_msg(addr, MESSAGE_TYPE_DATA, data)?; - if !self.peers.contains_key(&addr) { - // COLD PATH - // If the peer is not actually connected, remove the entry in the table and try - // to reconnect. - warn!("Destination for {} not found in peers: {}", dst, addr_nice(addr)); - self.table.remove_claims(addr); - self.connect_sock(addr)?; - } - } - None => { - // COLD PATH - if self.broadcast { - debug!("No destination for {} found, broadcasting", dst); - self.broadcast_msg(MESSAGE_TYPE_DATA, data)?; - } else { - debug!("No destination for {} found, dropping", dst); - self.traffic.count_dropped_payload(data.len()); - } - } - } - Ok(()) - } - - fn add_new_peer(&mut self, addr: SocketAddr, info: NodeInfo) -> Result<(), Error> { - info!("Added peer {}", addr_nice(addr)); - self.config.call_hook( - "peer_connected", - vec![ - ("PEER", format!("{:?}", addr_nice(addr))), - ("IFNAME", self.device.ifname().to_owned()), - ("CLAIMS", info.claims.iter().map(|r| format!("{:?}", r)).collect::>().join(" ")), - ("NODE_ID", bytes_to_hex(&info.node_id)), - ], - true - ); - if let Some(init) = self.pending_inits.remove(&addr) { - self.peers.insert(addr, PeerData { - addrs: info.addrs.clone(), - crypto: init, - node_id: info.node_id, - peer_timeout: info.peer_timeout.unwrap_or(DEFAULT_PEER_TIMEOUT), - last_seen: TS::now(), - timeout: TS::now() + self.config.peer_timeout as Time - }); - self.update_peer_info(addr, Some(info))?; - } else { - error!("No init for new peer {}", addr_nice(addr)); - } - Ok(()) - } - - fn remove_peer(&mut self, addr: SocketAddr) { - if let Some(peer) = self.peers.remove(&addr) { - info!("Closing connection to {}", addr_nice(addr)); - self.table.remove_claims(addr); - self.config.call_hook( - "peer_disconnected", - vec![ - ("PEER", format!("{:?}", addr)), - ("IFNAME", self.device.ifname().to_owned()), - ("NODE_ID", bytes_to_hex(&peer.node_id)), - ], - true - ); - } - } - - fn connect_to_peers(&mut self, peers: &[PeerInfo]) -> Result<(), Error> { - 'outer: for peer in peers { - for addr in &peer.addrs { - if self.peers.contains_key(addr) { - continue 'outer - } - } - if let Some(node_id) = peer.node_id { - if self.node_id == node_id { - continue 'outer - } - for p in self.peers.values() { - if p.node_id == node_id { - continue 'outer - } - } - } - self.connect(&peer.addrs as &[SocketAddr])?; - } - Ok(()) - } - - fn update_peer_info(&mut self, addr: SocketAddr, info: Option) -> Result<(), Error> { - if let Some(peer) = self.peers.get_mut(&addr) { - peer.last_seen = TS::now(); - peer.timeout = TS::now() + self.config.peer_timeout as Time - } else { - error!("Received peer update from non peer {}", addr_nice(addr)); - return Ok(()) - } - if let Some(info) = info { - debug!("Adding claims of peer {}: {:?}", addr_nice(addr), info.claims); - self.table.set_claims(addr, info.claims); - debug!("Received {} peers from {}: {:?}", info.peers.len(), addr_nice(addr), info.peers); - self.connect_to_peers(&info.peers)?; - } - Ok(()) - } - - fn handle_payload_from(&mut self, peer: SocketAddr, data: &mut MsgBuffer) -> Result<(), Error> { - // HOT PATH - let (src, dst) = P::parse(data.message())?; - let len = data.len(); - debug!("Writing data to device: {} bytes", len); - self.traffic.count_in_payload(src, dst, len); - if let Err(e) = self.device.write(data) { - error!("Failed to send via device: {}", e); - return Err(e) - } - if self.learning { - // Learn single address - self.table.cache(src, peer); - } - Ok(()) - } - - fn handle_message( - &mut self, src: SocketAddr, msg_result: MessageResult, data: &mut MsgBuffer - ) -> Result<(), Error> { - // HOT PATH - match msg_result { - MessageResult::Message(type_) => { - // HOT PATH - match type_ { - MESSAGE_TYPE_DATA => { - // HOT PATH - self.handle_payload_from(src, data)? - } - MESSAGE_TYPE_NODE_INFO => { - // COLD PATH - let info = match NodeInfo::decode(Cursor::new(data.message())) { - Ok(val) => val, - Err(err) => { - self.traffic.count_invalid_protocol(data.len()); - return Err(err) - } - }; - self.update_peer_info(src, Some(info))? - } - MESSAGE_TYPE_KEEPALIVE => { - // COLD PATH - self.update_peer_info(src, None)? - } - MESSAGE_TYPE_CLOSE => { - // COLD PATH - self.remove_peer(src) - } - _ => { - // COLD PATH - self.traffic.count_invalid_protocol(data.len()); - return Err(Error::Message("Unknown message type")) - } - } - } - MessageResult::Initialized(info) => { - // COLD PATH - self.add_new_peer(src, info)? - } - MessageResult::InitializedWithReply(info) => { - // COLD PATH - self.add_new_peer(src, info)?; - self.send_to(src, data)? - } - MessageResult::Reply => { - // COLD PATH - self.send_to(src, data)? - } - MessageResult::None => { - // COLD PATH - } - } - Ok(()) - } - - pub fn handle_net_message(&mut self, src: SocketAddr, data: &mut MsgBuffer) -> Result<(), Error> { - // HOT PATH - let src = mapped_addr(src); - debug!("Received {} bytes from {}", data.len(), src); - let msg_result = if let Some(init) = self.pending_inits.get_mut(&src) { - // COLD PATH - init.handle_message(data) - } else if is_init_message(data.message()) { - // COLD PATH - let mut result = None; - if let Some(peer) = self.peers.get_mut(&src) { - if peer.crypto.has_init() { - result = Some(peer.crypto.handle_message(data)) - } - } - if let Some(result) = result { - result - } else { - let mut init = self.crypto.peer_instance(self.create_node_info()); - let msg_result = init.handle_message(data); - match msg_result { - Ok(res) => { - self.config.call_hook( - "peer_connecting", - vec![ - ("PEER", format!("{:?}", addr_nice(src))), - ("IFNAME", self.device.ifname().to_owned()), - ], - true - ); - self.pending_inits.insert(src, init); - Ok(res) - } - Err(err) => { - self.traffic.count_invalid_protocol(data.len()); - return Err(err) - } - } - } - } else if let Some(peer) = self.peers.get_mut(&src) { - // HOT PATH - peer.crypto.handle_message(data) - } else { - // COLD PATH - info!("Ignoring non-init message from unknown peer {}", addr_nice(src)); - self.traffic.count_invalid_protocol(data.len()); - return Ok(()) - }; - // HOT PATH - match msg_result { - Ok(val) => { - // HOT PATH - self.handle_message(src, val, data) - }, - Err(err) => { - // COLD PATH - self.traffic.count_invalid_protocol(data.len()); - Err(err) - } - } - } - - fn initialize(&mut self) { - if let Err(err) = self.reset_own_addresses() { - error!("Failed to obtain local addresses: {}", err) - } - } - - fn handle_socket_event(&mut self, buffer: &mut MsgBuffer) { - // HOT PATH - let src = try_fail!(self.socket.receive(buffer), "Failed to read from network socket: {}"); - self.traffic.count_in_traffic(src, buffer.len()); - match self.handle_net_message(src, buffer) { - Err(e @ Error::CryptoInitFatal(_)) => { - // COLD PATH - debug!("Fatal crypto init error from {}: {}", src, e); - info!("Closing pending connection to {} due to error in crypto init", addr_nice(src)); - self.pending_inits.remove(&src); - self.config.call_hook( - "peer_disconnected", - vec![("PEER", format!("{:?}", addr_nice(src))), ("IFNAME", self.device.ifname().to_owned())], - true - ); - } - Err(e @ Error::CryptoInit(_)) => { - // COLD PATH - debug!("Recoverable init error from {}: {}", src, e); - info!("Ignoring invalid init message from peer {}", addr_nice(src)); - } - Err(e) => { - // COLD PATH - error!("{}", e); - } - Ok(_) => {} // HOT PATH - } - } - - fn handle_device_event(&mut self, buffer: &mut MsgBuffer) { - // HOT PATH - try_fail!(self.device.read(buffer), "Failed to read from device: {}"); - if let Err(e) = self.handle_interface_data(buffer) { - error!("{}", e); - } - } - - /// The main method of the node - /// - /// This method will use epoll to wait in the sockets and the device at the same time. - /// It will read from the sockets, decode and decrypt the message and then call the - /// `handle_net_message` method. It will also read from the device and call - /// `handle_interface_data` for each packet read. - /// Also, this method will call `housekeep` every second. pub fn run(&mut self) { let ctrlc = CtrlC::new(); let waiter = try_fail!(WaitImpl::new(self.socket.as_raw_fd(), self.device.as_raw_fd(), 1000), "Failed to setup poll: {}"); @@ -932,8 +305,8 @@ impl GenericCloud {} - WaitResult::Socket => self.handle_socket_event(&mut buffer), - WaitResult::Device => self.handle_device_event(&mut buffer) + WaitResult::Socket => unimplemented!(), + WaitResult::Device => unimplemented!() } if self.next_housekeep < TS::now() { // COLD PATH @@ -941,9 +314,6 @@ impl GenericCloud GenericCloud { pub fn trigger_socket_event(&mut self) { let mut buffer = MsgBuffer::new(SPACE_BEFORE); - self.handle_socket_event(&mut buffer); + unimplemented!() } pub fn trigger_device_event(&mut self) { let mut buffer = MsgBuffer::new(SPACE_BEFORE); - self.handle_device_event(&mut buffer); + unimplemented!() } pub fn trigger_housekeep(&mut self) { - assert!(self.housekeep().is_ok()) + unimplemented!() } pub fn is_connected(&self, addr: &SocketAddr) -> bool { diff --git a/src/engine/shared.rs b/src/engine/shared.rs index 74e1b6d..e07781a 100644 --- a/src/engine/shared.rs +++ b/src/engine/shared.rs @@ -1,15 +1,20 @@ -use crate::error::Error; use crate::{ crypto::CryptoCore, engine::{Hash, PeerData, TimeSource}, + error::Error, messages::NodeInfo, table::ClaimTable, - traffic::TrafficStats, + traffic::{TrafficStats, TrafficEntry}, types::{Address, NodeId, RangeList}, util::MsgBuffer }; use parking_lot::Mutex; -use std::{collections::HashMap, net::SocketAddr, sync::Arc}; +use std::{ + collections::HashMap, + io::{self, Write}, + net::SocketAddr, + sync::Arc +}; pub struct SharedPeerCrypto { peers: Arc>, Hash>>> @@ -26,10 +31,12 @@ impl SharedPeerCrypto { None => Err(Error::InvalidCryptoState("No crypto found for peer")), Some(None) => Ok(()), Some(Some(crypto)) => Ok(crypto.encrypt(data)) - } + } } - pub fn for_each(&mut self, mut callback: impl FnMut(SocketAddr, Option>) -> Result<(), Error>) -> Result<(), Error> { + pub fn for_each( + &mut self, mut callback: impl FnMut(SocketAddr, Option>) -> Result<(), Error> + ) -> Result<(), Error> { let mut peers = self.peers.lock(); for (k, v) in peers.iter_mut() { callback(*k, v.clone())? @@ -75,6 +82,26 @@ impl SharedTraffic { pub fn count_invalid_protocol(&self, bytes: usize) { self.traffic.lock().count_invalid_protocol(bytes); } + + pub fn period(&mut self, cleanup_idle: Option) { + self.traffic.lock().period(cleanup_idle) + } + + pub fn write_out(&self, out: &mut W) -> Result<(), io::Error> { + self.traffic.lock().write_out(out) + } + + pub fn total_peer_traffic(&self) -> TrafficEntry { + self.traffic.lock().total_peer_traffic() + } + + pub fn total_payload_traffic(&self) -> TrafficEntry { + self.traffic.lock().total_payload_traffic() + } + + pub fn dropped(&self) -> TrafficEntry { + self.traffic.lock().dropped.clone() + } } @@ -87,19 +114,35 @@ impl SharedTable { // TODO sync if needed } - pub fn lookup(&self, addr: Address) -> Option { + pub fn lookup(&mut self, addr: Address) -> Option { self.table.lock().lookup(addr) } - pub fn set_claims(&self, peer: SocketAddr, claims: RangeList) { + pub fn set_claims(&mut self, peer: SocketAddr, claims: RangeList) { self.table.lock().set_claims(peer, claims) } - pub fn remove_claims(&self, peer: SocketAddr) { + pub fn remove_claims(&mut self, peer: SocketAddr) { self.table.lock().remove_claims(peer) } - pub fn cache(&self, addr: Address, peer: SocketAddr) { + pub fn cache(&mut self, addr: Address, peer: SocketAddr) { self.table.lock().cache(addr, peer) } + + pub fn housekeep(&mut self) { + self.table.lock().housekeep() + } + + pub fn write_out(&self, out: &mut W) -> Result<(), io::Error> { + self.table.lock().write_out(out) + } + + pub fn cache_len(&self) -> usize { + self.table.lock().cache_len() + } + + pub fn claim_len(&self) -> usize { + self.table.lock().claim_len() + } } diff --git a/src/engine/socket_thread.rs b/src/engine/socket_thread.rs index 87900ad..1a3f9ea 100644 --- a/src/engine/socket_thread.rs +++ b/src/engine/socket_thread.rs @@ -4,26 +4,48 @@ use super::{ }; use crate::{ - config::DEFAULT_PEER_TIMEOUT, - crypto::{is_init_message, MessageResult, PeerCrypto, InitState, InitResult}, + beacon::BeaconSerializer, + config::{DEFAULT_PEER_TIMEOUT, DEFAULT_PORT}, + crypto::{is_init_message, InitResult, InitState, MessageResult}, engine::{addr_nice, resolve, Hash, PeerData}, error::Error, - messages::{AddrList, NodeInfo, PeerInfo}, + messages::{AddrList, NodeInfo, PeerInfo, MESSAGE_TYPE_NODE_INFO}, net::{mapped_addr, Socket}, + port_forwarding::PortForwarding, types::{NodeId, RangeList}, - util::{MsgBuffer, Time, TimeSource}, + util::{MsgBuffer, StatsdMsg, Time, TimeSource}, Config, Crypto, Device, Protocol }; -use rand::{seq::SliceRandom}; +use rand::{random, seq::SliceRandom, thread_rng}; use smallvec::{smallvec, SmallVec}; use std::{ + cmp::{max, min}, collections::HashMap, fmt, - io::Cursor, + fs::File, + io, + io::{Write, Cursor, Seek, SeekFrom}, marker::PhantomData, - net::{SocketAddr, ToSocketAddrs}, + net::{SocketAddr, ToSocketAddrs} }; + +const MAX_RECONNECT_INTERVAL: u16 = 3600; +const RESOLVE_INTERVAL: Time = 300; +const OWN_ADDRESS_RESET_INTERVAL: Time = 300; +pub const STATS_INTERVAL: Time = 60; + + +#[derive(Clone)] +pub struct ReconnectEntry { + address: Option<(String, Time)>, + resolved: AddrList, + tries: u16, + timeout: u16, + next: Time, + final_timeout: Option