From ee8542307f907a824a31c71e3ecdb85ecf9a73d1 Mon Sep 17 00:00:00 2001 From: Dennis Schwerdel Date: Wed, 4 Dec 2019 10:04:42 +0100 Subject: [PATCH] Exchange peer_timeout and adapt keepalive --- src/cloud.rs | 30 +++++++++++++++++++---------- src/tests/peers.rs | 17 ++++++++++------ src/udpmessage.rs | 48 ++++++++++++++++++++++++++++++---------------- 3 files changed, 62 insertions(+), 33 deletions(-) diff --git a/src/cloud.rs b/src/cloud.rs index 359f601..cea9ecb 100644 --- a/src/cloud.rs +++ b/src/cloud.rs @@ -40,6 +40,7 @@ pub const STATS_INTERVAL: Time = 60; struct PeerData { timeout: Time, + peer_timeout: u16, node_id: NodeId, alt_addrs: Vec } @@ -84,6 +85,10 @@ impl PeerList { del } + pub fn min_peer_timeout(&self) -> u16 { + self.peers.iter().map(|p| p.1.peer_timeout).min().unwrap_or(1800) + } + #[inline] pub fn contains_addr(&self, addr: &SocketAddr) -> bool { self.addresses.contains_key(addr) @@ -106,13 +111,14 @@ impl PeerList { #[inline] - fn add(&mut self, node_id: NodeId, addr: SocketAddr) { + fn add(&mut self, node_id: NodeId, addr: SocketAddr, peer_timeout: u16) { if self.nodes.insert(node_id, addr).is_none() { info!("New peer: {}", addr); self.peers.insert(addr, PeerData { timeout: TS::now() + Time::from(self.timeout), node_id, - alt_addrs: vec![] + alt_addrs: vec![], + peer_timeout }); self.addresses.insert(addr, node_id); } @@ -221,7 +227,7 @@ pub struct GenericCloud GenericCloud GenericCloud GenericCloud GenericCloud GenericCloud GenericCloud { + Message::Init(stage, node_id, ranges, peer_timeout) => { // Avoid connecting to self if node_id == self.node_id { self.own_addresses.push(peer); @@ -707,7 +714,7 @@ impl GenericCloud GenericCloud Node 2: Init 0 - assert_message4!(node1, node1_addr, node2, node2_addr, Message::Init(0, node1.node_id(), vec![])); + assert_message4!(node1, node1_addr, node2, node2_addr, Message::Init(0, node1.node_id(), vec![], 1800)); assert_clean!(node1); assert!(node2.peers().contains_node(&node1.node_id())); // Node 2 -> Node 1: Init 1 | Node 2 -> Node 1: Peers - assert_message4!(node2, node2_addr, node1, node1_addr, Message::Init(1, node2.node_id(), vec![])); + assert_message4!(node2, node2_addr, node1, node1_addr, Message::Init(1, node2.node_id(), vec![], 1800)); assert!(node1.peers().contains_node(&node2.node_id())); assert_message4!(node2, node2_addr, node1, node1_addr, Message::Peers(vec![node1_addr])); assert_clean!(node2); // Node 1 -> Node 2: Peers | Node 1 -> Node 1: Init 0 assert_message4!(node1, node1_addr, node2, node2_addr, Message::Peers(vec![node2_addr])); - assert_message4!(node1, node1_addr, node1, node1_addr, Message::Init(0, node1.node_id(), vec![])); + assert_message4!(node1, node1_addr, node1, node1_addr, Message::Init(0, node1.node_id(), vec![], 1800)); assert!(node1.own_addresses().contains(&node1_addr)); assert_clean!(node1); // Node 2 -> Node 2: Init 0 - assert_message4!(node2, node2_addr, node2, node2_addr, Message::Init(0, node2.node_id(), vec![])); + assert_message4!(node2, node2_addr, node2, node2_addr, Message::Init(0, node2.node_id(), vec![], 1800)); assert_clean!(node2); assert!(node2.own_addresses().contains(&node2_addr)); @@ -156,7 +156,7 @@ fn lost_init1() { node1.connect("2.3.4.5:6789").unwrap(); // Node 1 -> Node 2: Init 0 - assert_message4!(node1, node1_addr, node2, node2_addr, Message::Init(0, node1.node_id(), vec![])); + assert_message4!(node1, node1_addr, node2, node2_addr, Message::Init(0, node1.node_id(), vec![], 1800)); assert_clean!(node1); // Node 2 -> Node 1: Init 1 | Node 2 -> Node 1: Peers @@ -177,7 +177,7 @@ fn wrong_magic() { let node2_addr = addr!("2.3.4.5:6789"); node1.connect("2.3.4.5:6789").unwrap(); - assert_message4!(node1, node1_addr, node2, node2_addr, Message::Init(0, node1.node_id(), vec![])); + assert_message4!(node1, node1_addr, node2, node2_addr, Message::Init(0, node1.node_id(), vec![], 1800)); assert_clean!(node1, node2); @@ -204,3 +204,8 @@ fn remove_dead_peers() { fn update_primary_address() { // TODO } + +#[test] +fn automatic_peer_timeout() { + // TODO +} diff --git a/src/udpmessage.rs b/src/udpmessage.rs index 463def6..d1f5da1 100644 --- a/src/udpmessage.rs +++ b/src/udpmessage.rs @@ -53,9 +53,9 @@ impl TopHeader { } pub enum Message<'a> { - Data(&'a mut [u8], usize, usize), // data, start, end - Peers(Vec), // peers - Init(u8, NodeId, Vec), // step, node_id, ranges + Data(&'a mut [u8], usize, usize), // data, start, end + Peers(Vec), // peers + Init(u8, NodeId, Vec, u16), // step, node_id, ranges Close } @@ -64,7 +64,7 @@ impl<'a> Message<'a> { match self { Message::Data(_, start, end) => Message::Data(&mut [], start, end), Message::Peers(peers) => Message::Peers(peers), - Message::Init(step, node_id, ranges) => Message::Init(step, node_id, ranges), + Message::Init(step, node_id, ranges, timeout) => Message::Init(step, node_id, ranges, timeout), Message::Close => Message::Close } } @@ -86,8 +86,15 @@ impl<'a> fmt::Debug for Message<'a> { } write!(formatter, "]") } - Message::Init(stage, ref node_id, ref peers) => { - write!(formatter, "Init(stage={}, node_id={}, {:?})", stage, bytes_to_hex(node_id), peers) + Message::Init(stage, ref node_id, ref peers, ref peer_timeout) => { + write!( + formatter, + "Init(stage={}, node_id={}, peer_timeout={}, {:?})", + stage, + bytes_to_hex(node_id), + peer_timeout, + peers + ) } Message::Close => write!(formatter, "Close") } @@ -183,7 +190,12 @@ pub fn decode<'a>(data: &'a mut [u8], magic: HeaderMagic, crypto: &Crypto) -> Re pos += read; addrs.push(range); } - Message::Init(stage, node_id, addrs) + let mut peer_timeout = 1800; + if data.len() >= pos + 2 { + peer_timeout = Encoder::read_u16(&data[pos..]); + // pos += 2; never read + } + Message::Init(stage, node_id, addrs, peer_timeout) } 3 => Message::Close, _ => return Err(Error::Parse("Unknown message type")) @@ -198,7 +210,7 @@ pub fn encode<'a>( let header_type = match msg { Message::Data(_, _, _) => 0, Message::Peers(_) => 1, - Message::Init(_, _, _) => 2, + Message::Init(_, _, _, _) => 2, Message::Close => 3 }; let mut start = 64; @@ -244,7 +256,7 @@ pub fn encode<'a>( } end = pos; } - Message::Init(stage, ref node_id, ref ranges) => { + Message::Init(stage, ref node_id, ref ranges, peer_timeout) => { let mut pos = start; assert!(buf.len() >= pos + 2 + NODE_ID_BYTES); buf[pos] = stage; @@ -257,6 +269,8 @@ pub fn encode<'a>( for range in ranges { pos += range.write_to(&mut buf[pos..]); } + Encoder::write_u16(peer_timeout, &mut buf[pos..]); + pos += 2; end = pos; } Message::Close => {} @@ -299,9 +313,9 @@ impl<'a> PartialEq for Message<'a> { false } } - Message::Init(step1, node_id1, ref ranges1) => { - if let Message::Init(step2, node_id2, ref ranges2) = *other { - step1 == step2 && node_id1 == node_id2 && ranges1 == ranges2 + Message::Init(step1, node_id1, ref ranges1, peer_timeout1) => { + if let Message::Init(step2, node_id2, ref ranges2, peer_timeout2) = *other { + step1 == step2 && node_id1 == node_id2 && ranges1 == ranges2 && peer_timeout1 == peer_timeout2 } else { false } @@ -421,15 +435,15 @@ fn udpmessage_init() { Range { base: Address { data: [0, 1, 2, 3, 4, 5, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], len: 6 }, prefix_len: 16 }, ]; let node_id = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]; - let mut msg = Message::Init(0, node_id, addrs); + let mut msg = Message::Init(0, node_id, addrs, 1800); let mut should = [ 118, 112, 110, 1, 0, 0, 0, 2, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 2, 4, 0, 1, 2, 3, 24, 6, - 0, 1, 2, 3, 4, 5, 16 + 0, 1, 2, 3, 4, 5, 16, 7, 8 ]; { let mut buf = [0; 1024]; let res = encode(&mut msg, &mut buf[..], MAGIC, &mut crypto); - assert_eq!(res.len(), 40); + assert_eq!(res.len(), 42); for i in 0..res.len() { assert_eq!(res[i], should[i]); } @@ -502,9 +516,9 @@ fn message_fmt() { base: Address { data: [0, 1, 2, 3, 4, 5, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], len: 6 }, prefix_len: 16 } - ]) + ], 1800) ), - "Init(stage=0, node_id=000102030405060708090a0b0c0d0e0f, [0.1.2.3/24, 00:01:02:03:04:05/16])" + "Init(stage=0, node_id=000102030405060708090a0b0c0d0e0f, peer_timeout=1800, [0.1.2.3/24, 00:01:02:03:04:05/16])" ); assert_eq!(format!("{:?}", Message::Close), "Close"); }