From 2ea70ca1cd53fd3c64df0c3f9496bde302df2993 Mon Sep 17 00:00:00 2001 From: Dennis Schwerdel Date: Tue, 9 Mar 2021 08:42:39 +0000 Subject: [PATCH] First tests work --- src/crypto/common.rs | 7 +++++ src/engine/device_thread.rs | 2 +- src/engine/mod.rs | 17 +++++------ src/engine/shared.rs | 18 ++++++++---- src/engine/socket_thread.rs | 58 +++++++++++++++++++++++++------------ src/tests/common.rs | 18 ++++++------ src/tests/nat.rs | 20 ++++++------- src/tests/payload.rs | 28 +++++++++--------- src/tests/peers.rs | 16 +++++----- 9 files changed, 109 insertions(+), 75 deletions(-) diff --git a/src/crypto/common.rs b/src/crypto/common.rs index d884953..374a1d9 100644 --- a/src/crypto/common.rs +++ b/src/crypto/common.rs @@ -242,6 +242,13 @@ impl PeerCrypto { } } + pub fn get_core(&self) -> Option> { + match self { + PeerCrypto::Encrypted { core, .. } => Some(core.clone()), + PeerCrypto::Unencrypted { .. } => None + } + } + fn handle_init_message(&mut self, buffer: &mut MsgBuffer) -> Result { // TODO: parse message stage // TODO: depending on stage resend last message diff --git a/src/engine/device_thread.rs b/src/engine/device_thread.rs index 89134a7..c3a61d4 100644 --- a/src/engine/device_thread.rs +++ b/src/engine/device_thread.rs @@ -120,7 +120,7 @@ impl DeviceThread Result<(), Error> { - self.peer_crypto.sync(); + self.peer_crypto.load(); self.table.sync(); self.traffic.sync(); Ok(()) diff --git a/src/engine/mod.rs b/src/engine/mod.rs index 5225764..a1145cd 100644 --- a/src/engine/mod.rs +++ b/src/engine/mod.rs @@ -34,7 +34,7 @@ pub type Hash = BuildHasherDefault; pub const STATS_INTERVAL: Time = 60; const SPACE_BEFORE: usize = 100; -struct PeerData { +pub struct PeerData { addrs: AddrList, #[allow(dead_code)] // TODO: export in status last_seen: Time, @@ -122,8 +122,8 @@ impl GenericCloud { &mut self.device_thread.device } - pub fn connect(&mut self, addr: SocketAddr) -> Result<(), Error> { - unimplemented!() + pub async fn connect(&mut self, addr: SocketAddr) -> Result<(), Error> { + self.socket_thread.connect(addr).await } pub async fn trigger_socket_event(&mut self) { @@ -140,17 +140,14 @@ impl GenericCloud { } pub fn is_connected(&self, addr: &SocketAddr) -> bool { - unimplemented!() - // self.peers.contains_key(addr) + self.socket_thread.peers.contains_key(addr) } pub fn own_addresses(&self) -> &[SocketAddr] { - unimplemented!() - //&self.own_addresses + &self.socket_thread.own_addresses } - pub fn get_num(&self) -> usize { - unimplemented!() - // self.socket.address().unwrap().port() as usize + pub async fn get_num(&self) -> usize { + self.socket_thread.socket.address().await.unwrap().port() as usize } } diff --git a/src/engine/shared.rs b/src/engine/shared.rs index 683d9b0..2a97b42 100644 --- a/src/engine/shared.rs +++ b/src/engine/shared.rs @@ -16,6 +16,8 @@ use std::{ sync::Arc }; +use super::PeerData; + #[derive(Clone)] pub struct SharedPeerCrypto { peers: Arc>, Hash>>> @@ -26,11 +28,7 @@ impl SharedPeerCrypto { SharedPeerCrypto { peers: Arc::new(Mutex::new(HashMap::default())) } } - pub fn sync(&mut self) { - // TODO sync if needed - } - - pub fn encrypt_for(&mut self, peer: SocketAddr, data: &mut MsgBuffer) -> Result<(), Error> { + pub fn encrypt_for(&self, peer: SocketAddr, data: &mut MsgBuffer) -> Result<(), Error> { let mut peers = self.peers.lock(); match peers.get_mut(&peer) { None => Err(Error::InvalidCryptoState("No crypto found for peer")), @@ -42,6 +40,16 @@ impl SharedPeerCrypto { } } + pub fn store(&self, data: &HashMap) { + let mut peers = self.peers.lock(); + peers.clear(); + peers.extend(data.iter().map(|(k, v)| (*k, v.crypto.get_core()))); + } + + pub fn load(&mut self) { + // TODO sync if needed + } + pub fn get_snapshot(&self) -> HashMap>, Hash> { self.peers.lock().clone() } diff --git a/src/engine/socket_thread.rs b/src/engine/socket_thread.rs index 10dde03..0135e7d 100644 --- a/src/engine/socket_thread.rs +++ b/src/engine/socket_thread.rs @@ -64,11 +64,11 @@ pub struct SocketThread { pub socket: S, device: D, next_housekeep: Time, - own_addresses: AddrList, + pub own_addresses: AddrList, next_own_address_reset: Time, pending_inits: HashMap, Hash>, crypto: Crypto, - peers: HashMap, + pub peers: HashMap, next_peers: Time, next_stats_out: Time, next_beacon: Time, @@ -111,7 +111,6 @@ impl SocketThread SocketThread Ok(()), + Ok(written) if written == size => { + self.buffer.clear(); + Ok(()) + } Ok(_) => Err(Error::Socket("Sent out truncated packet")), Err(e) => Err(Error::SocketIo("IOError when sending", e)), } @@ -174,6 +176,7 @@ impl SocketThread Err(Error::SocketIo("IOError when sending", e)), }? } + self.buffer.clear(); Ok(()) } @@ -267,6 +270,7 @@ impl SocketThread SocketThread SocketThread SocketThread { + self.update_peer_info(src, None).await?; + self.buffer.clear(); + } + MESSAGE_TYPE_CLOSE => { + self.remove_peer(src); + self.buffer.clear(); } - MESSAGE_TYPE_KEEPALIVE => self.update_peer_info(src, None).await?, - MESSAGE_TYPE_CLOSE => self.remove_peer(src), _ => { self.traffic.count_invalid_protocol(self.buffer.len()); + self.buffer.clear(); return Err(Error::Message("Unknown message type")); } }, MessageResult::Reply => self.send_to(src).await?, - MessageResult::None => (), + MessageResult::None => { + self.buffer.clear(); + } } Ok(()) } @@ -362,19 +378,18 @@ impl SocketThread { - if !buffer.is_empty() { - self.send_to(src).await? - } - } - InitResult::Success { peer_payload, .. } => self.add_new_peer(src, peer_payload).await?, + if !buffer.is_empty() { + self.send_to(src).await? + } + if let InitResult::Success { peer_payload, .. } = result? { + self.add_new_peer(src, peer_payload).await? } return Ok(()); } if !is_init_message(self.buffer.message()) { info!("Ignoring non-init message from unknown peer {}", addr_nice(src)); self.traffic.count_invalid_protocol(self.buffer.len()); + self.buffer.clear(); return Ok(()); } let mut init = self.crypto.peer_instance(self.create_node_info()); @@ -447,10 +462,11 @@ impl SocketThread Result<(), Error> { @@ -673,23 +689,28 @@ impl SocketThread { debug!("Fatal crypto init error from {}: {}", src, e); info!("Closing pending connection to {} due to error in crypto init", addr_nice(src)); self.pending_inits.remove(&src); + self.buffer.clear(); } Err(e @ Error::CryptoInit(_)) => { debug!("Recoverable init error from {}: {}", src, e); info!("Ignoring invalid init message from peer {}", addr_nice(src)); + self.buffer.clear(); } Err(e) => { error!("{}", e); + self.buffer.clear(); } Ok(_) => {} } + debug_assert!(self.buffer.is_empty()); } let now = TS::now(); if self.next_housekeep < now { @@ -698,6 +719,7 @@ impl SocketThread Simulator

{ } #[allow(dead_code)] - pub fn get_node(&mut self, addr: SocketAddr) -> &mut TestNode

{ + pub async fn get_node(&mut self, addr: SocketAddr) -> &mut TestNode

{ let node = self.nodes.get_mut(&addr).unwrap(); - DebugLogger::set_node(node.get_num()); + DebugLogger::set_node(node.get_num().await); node } @@ -109,7 +109,7 @@ impl Simulator

{ if let Some((src, dst, data)) = self.messages.pop_front() { if let Some(node) = self.nodes.get_mut(&dst) { if node.socket().put_inbound(src, data) { - DebugLogger::set_node(node.get_num()); + DebugLogger::set_node(node.get_num().await); node.trigger_socket_event().await; DebugLogger::set_node(0); let sock = node.socket(); @@ -132,7 +132,7 @@ impl Simulator

{ pub async fn trigger_node_housekeep(&mut self, addr: SocketAddr) { let node = self.nodes.get_mut(&addr).unwrap(); - DebugLogger::set_node(node.get_num()); + DebugLogger::set_node(node.get_num().await); node.trigger_housekeep().await; DebugLogger::set_node(0); let sock = node.socket(); @@ -143,7 +143,7 @@ impl Simulator

{ pub async fn trigger_housekeep(&mut self) { for (src, node) in &mut self.nodes { - DebugLogger::set_node(node.get_num()); + DebugLogger::set_node(node.get_num().await); node.trigger_housekeep().await; DebugLogger::set_node(0); let sock = node.socket(); @@ -167,10 +167,10 @@ impl Simulator

{ } } - pub fn connect(&mut self, src: SocketAddr, dst: SocketAddr) { + pub async fn connect(&mut self, src: SocketAddr, dst: SocketAddr) { let node = self.nodes.get_mut(&src).unwrap(); - DebugLogger::set_node(node.get_num()); - node.connect(dst).unwrap(); + DebugLogger::set_node(node.get_num().await); + node.connect(dst).await.unwrap(); DebugLogger::set_node(0); let sock = node.socket(); while let Some((dst, data)) = sock.pop_outbound() { @@ -195,7 +195,7 @@ impl Simulator

{ pub async fn put_payload(&mut self, addr: SocketAddr, data: Vec) { let node = self.nodes.get_mut(&addr).unwrap(); node.device().put_inbound(data); - DebugLogger::set_node(node.get_num()); + DebugLogger::set_node(node.get_num().await); node.trigger_device_event().await; DebugLogger::set_node(0); let sock = node.socket(); diff --git a/src/tests/nat.rs b/src/tests/nat.rs index 6537bf7..62a1452 100644 --- a/src/tests/nat.rs +++ b/src/tests/nat.rs @@ -11,8 +11,8 @@ async fn connect_nat_2_peers() { let node1 = sim.add_node(true, &config).await; let node2 = sim.add_node(true, &config).await; - sim.connect(node1, node2); - sim.connect(node2, node1); + sim.connect(node1, node2).await; + sim.connect(node2, node1).await; sim.simulate_time(60).await; @@ -28,10 +28,10 @@ async fn connect_nat_3_peers() { let node2 = sim.add_node(true, &config).await; let node3 = sim.add_node(true, &config).await; - sim.connect(node1, node2); - sim.connect(node2, node1); - sim.connect(node1, node3); - sim.connect(node3, node1); + sim.connect(node1, node2).await; + sim.connect(node2, node1).await; + sim.connect(node1, node3).await; + sim.connect(node3, node1).await; sim.simulate_time(300).await; assert!(sim.is_connected(node1, node2)); @@ -50,10 +50,10 @@ async fn nat_keepalive() { let node2 = sim.add_node(true, &config).await; let node3 = sim.add_node(true, &config).await; - sim.connect(node1, node2); - sim.connect(node2, node1); - sim.connect(node1, node3); - sim.connect(node3, node1); + sim.connect(node1, node2).await; + sim.connect(node2, node1).await; + sim.connect(node1, node3).await; + sim.connect(node3, node1).await; sim.simulate_time(1000).await; assert!(sim.is_connected(node1, node2)); diff --git a/src/tests/payload.rs b/src/tests/payload.rs index 72ee7d6..073e94b 100644 --- a/src/tests/payload.rs +++ b/src/tests/payload.rs @@ -11,7 +11,7 @@ async fn switch_delivers() { let node1 = sim.add_node(false, &config).await; let node2 = sim.add_node(false, &config).await; - sim.connect(node1, node2); + sim.connect(node1, node2).await; sim.simulate_all_messages().await; assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node2, node1)); @@ -32,9 +32,9 @@ async fn switch_learns() { let node2 = sim.add_node(false, &config).await; let node3 = sim.add_node(false, &config).await; - sim.connect(node1, node2); - sim.connect(node1, node3); - sim.connect(node2, node3); + sim.connect(node1, node2).await; + sim.connect(node1, node3).await; + sim.connect(node2, node3).await; sim.simulate_all_messages().await; assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node2, node1)); @@ -72,9 +72,9 @@ async fn switch_honours_vlans() { let node2 = sim.add_node(false, &config).await; let node3 = sim.add_node(false, &config).await; - sim.connect(node1, node2); - sim.connect(node1, node3); - sim.connect(node2, node3); + sim.connect(node1, node2).await; + sim.connect(node1, node3).await; + sim.connect(node2, node3).await; sim.simulate_all_messages().await; assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node2, node1)); @@ -139,17 +139,17 @@ async fn size() { let mut sim = TunSimulator::new(); let node1 = sim.add_node(false, &config1).await; let node2 = sim.add_node(false, &config2).await; - - sim.connect(node1, node2); + + sim.connect(node1, node2).await; sim.simulate_all_messages().await; assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node2, node1)); - + let payload = vec![0x40, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2]; - + sim.put_payload(node1, payload.clone()).await; sim.simulate_all_messages().await; - + assert_eq!(Some(payload), sim.pop_payload(node2)); }; assert_eq!(std::mem::size_of_val(&future), 100); @@ -174,7 +174,7 @@ async fn router_delivers() { let node1 = sim.add_node(false, &config1).await; let node2 = sim.add_node(false, &config2).await; - sim.connect(node1, node2); + sim.connect(node1, node2).await; sim.simulate_all_messages().await; assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node2, node1)); @@ -205,7 +205,7 @@ async fn router_drops_unknown_dest() { let node1 = sim.add_node(false, &config1).await; let node2 = sim.add_node(false, &config2).await; - sim.connect(node1, node2); + sim.connect(node1, node2).await; sim.simulate_all_messages().await; assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node2, node1)); diff --git a/src/tests/peers.rs b/src/tests/peers.rs index 9c888e4..ce65df4 100644 --- a/src/tests/peers.rs +++ b/src/tests/peers.rs @@ -11,7 +11,7 @@ async fn direct_connect() { let node1 = sim.add_node(false, &config).await; let node2 = sim.add_node(false, &config).await; - sim.connect(node1, node2); + sim.connect(node1, node2).await; sim.simulate_all_messages().await; assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node2, node1)); @@ -27,7 +27,7 @@ async fn direct_connect_unencrypted() { let node1 = sim.add_node(false, &config).await; let node2 = sim.add_node(false, &config).await; - sim.connect(node1, node2); + sim.connect(node1, node2).await; sim.simulate_all_messages().await; assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node2, node1)); @@ -41,8 +41,8 @@ async fn cross_connect() { let node2 = sim.add_node(false, &config).await; let node3 = sim.add_node(false, &config).await; - sim.connect(node1, node2); - sim.connect(node1, node3); + sim.connect(node1, node2).await; + sim.connect(node1, node3).await; sim.simulate_all_messages().await; sim.simulate_time(120).await; @@ -80,7 +80,7 @@ async fn reconnect_after_timeout() { let node1 = sim.add_node(false, &config).await; let node2 = sim.add_node(false, &config).await; - sim.connect(node1, node2); + sim.connect(node1, node2).await; sim.simulate_all_messages().await; assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node2, node1)); @@ -102,7 +102,7 @@ async fn lost_init_ping() { let node1 = sim.add_node(false, &config).await; let node2 = sim.add_node(false, &config).await; - sim.connect(node1, node2); + sim.connect(node1, node2).await; sim.drop_message(); // drop init ping sim.simulate_time(120).await; @@ -117,7 +117,7 @@ async fn lost_init_pong() { let node1 = sim.add_node(false, &config).await; let node2 = sim.add_node(false, &config).await; - sim.connect(node1, node2); + sim.connect(node1, node2).await; sim.simulate_next_message().await; // init ping sim.drop_message(); // drop init pong @@ -133,7 +133,7 @@ async fn lost_init_peng() { let node1 = sim.add_node(false, &config).await; let node2 = sim.add_node(false, &config).await; - sim.connect(node1, node2); + sim.connect(node1, node2).await; sim.simulate_next_message().await; // init ping sim.simulate_next_message().await; // init pong sim.drop_message(); // drop init peng