From 6d1bb8fabb9683c443ac46d914841f993b6cd3ba Mon Sep 17 00:00:00 2001 From: Dennis Schwerdel Date: Sat, 19 Dec 2020 10:54:55 +0000 Subject: [PATCH] Exchange all peer addresses --- src/cloud.rs | 15 +++++---- src/messages.rs | 84 ++++++++++++++++++++++++++++++++++++++----------- src/util.rs | 2 +- 3 files changed, 75 insertions(+), 26 deletions(-) diff --git a/src/cloud.rs b/src/cloud.rs index 6cab3a3..1a3533a 100644 --- a/src/cloud.rs +++ b/src/cloud.rs @@ -26,7 +26,7 @@ use crate::{ device::{Device, Type}, error::Error, messages::{ - NodeInfo, PeerInfo, MESSAGE_TYPE_CLOSE, MESSAGE_TYPE_DATA, MESSAGE_TYPE_KEEPALIVE, MESSAGE_TYPE_NODE_INFO + AddrList, NodeInfo, PeerInfo, MESSAGE_TYPE_CLOSE, MESSAGE_TYPE_DATA, MESSAGE_TYPE_KEEPALIVE, MESSAGE_TYPE_NODE_INFO }, net::{mapped_addr, Socket}, payload::Protocol, @@ -47,6 +47,7 @@ const OWN_ADDRESS_RESET_INTERVAL: Time = 300; const SPACE_BEFORE: usize = 100; struct PeerData { + addrs: AddrList, last_seen: Time, timeout: Time, peer_timeout: u16, @@ -57,7 +58,7 @@ struct PeerData { #[derive(Clone)] pub struct ReconnectEntry { address: Option<(String, Time)>, - resolved: SmallVec<[SocketAddr; 3]>, + resolved: AddrList, tries: u16, timeout: u16, next: Time, @@ -72,7 +73,7 @@ pub struct GenericCloud { broadcast: bool, peers: HashMap, reconnect_peers: SmallVec<[ReconnectEntry; 3]>, - own_addresses: SmallVec<[SocketAddr; 3]>, + own_addresses: AddrList, pending_inits: HashMap, Hash>, table: ClaimTable, socket: S, @@ -288,8 +289,8 @@ impl GenericCloud NodeInfo { let mut peers = smallvec![]; - for (addr, peer) in &self.peers { - peers.push(PeerInfo { node_id: Some(peer.node_id), addrs: smallvec![*addr] }) + for peer in self.peers.values() { + peers.push(PeerInfo { node_id: Some(peer.node_id), addrs: peer.addrs.clone() }) } if peers.len() > 20 { let mut rng = rand::thread_rng(); @@ -300,7 +301,8 @@ impl GenericCloud GenericCloud; pub type PeerList = SmallVec<[PeerInfo; 16]>; + #[derive(Debug, PartialEq)] pub struct PeerInfo { pub node_id: Option, @@ -36,7 +37,8 @@ pub struct NodeInfo { pub node_id: NodeId, pub peers: PeerList, pub claims: RangeList, - pub peer_timeout: Option + pub peer_timeout: Option, + pub addrs: AddrList } impl NodeInfo { @@ -45,35 +47,46 @@ impl NodeInfo { const PART_NODEID: u8 = 4; const PART_PEERS: u8 = 1; const PART_PEER_TIMEOUT: u8 = 3; + const PART_ADDRS: u8 = 5; + fn read_addr_list(r: &mut Take) -> Result { + let flags = r.read_u8()?; + Self::read_addr_list_inner(r, flags) + } + + fn read_addr_list_inner(r: &mut Take, flags: u8) -> Result { + let num_ipv4_addrs = (flags & 0x07) as usize; + let num_ipv6_addrs = (flags & 0x38) as usize / 8; + let mut addrs = SmallVec::with_capacity(num_ipv4_addrs + num_ipv6_addrs); + for _ in 0..num_ipv6_addrs { + let mut ip = [0u8; 16]; + r.read_exact(&mut ip)?; + let port = r.read_u16::()?; + let addr = SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::from(ip), port, 0, 0)); + addrs.push(addr); + } + for _ in 0..num_ipv4_addrs { + let mut ip = [0u8; 4]; + r.read_exact(&mut ip)?; + let port = r.read_u16::()?; + let addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::from(ip), port)); + addrs.push(addr); + } + Ok(addrs) + } + fn decode_peer_list_part(r: &mut Take) -> Result { let mut peers = smallvec![]; while r.limit() > 0 { let flags = r.read_u8()?; let has_node_id = (flags & 0x80) != 0; - let num_ipv4_addrs = (flags & 0x07) as usize; - let num_ipv6_addrs = (flags & 0x38) as usize / 8; let mut node_id = None; if has_node_id { let mut id = [0; NODE_ID_BYTES]; r.read_exact(&mut id)?; node_id = Some(id) } - let mut addrs = SmallVec::with_capacity(num_ipv4_addrs + num_ipv6_addrs); - for _ in 0..num_ipv6_addrs { - let mut ip = [0u8; 16]; - r.read_exact(&mut ip)?; - let port = r.read_u16::()?; - let addr = SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::from(ip), port, 0, 0)); - addrs.push(addr); - } - for _ in 0..num_ipv4_addrs { - let mut ip = [0u8; 4]; - r.read_exact(&mut ip)?; - let port = r.read_u16::()?; - let addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::from(ip), port)); - addrs.push(addr); - } + let addrs = Self::read_addr_list_inner(r, flags)?; peers.push(PeerInfo { addrs, node_id }) } Ok(peers) @@ -92,6 +105,7 @@ impl NodeInfo { let mut claims = smallvec![]; let mut peer_timeout = None; let mut node_id = None; + let mut addrs = smallvec![]; loop { let part = r.read_u8().map_err(|_| Error::Message("Truncated message"))?; if part == Self::PART_END { @@ -113,6 +127,9 @@ impl NodeInfo { rp.read_exact(&mut data).map_err(|_| Error::Message("Truncated message"))?; node_id = Some(data); } + Self::PART_ADDRS => { + addrs = Self::read_addr_list(&mut rp).map_err(|_| Error::Message("Truncated message"))?; + } _ => { let mut data = vec![0; part_len]; rp.read_exact(&mut data).map_err(|_| Error::Message("Truncated message"))?; @@ -124,7 +141,7 @@ impl NodeInfo { Some(node_id) => node_id, None => return Err(Error::Message("Payload without node_id")) }; - Ok(Self { node_id, peers, claims, peer_timeout }) + Ok(Self { node_id, peers, claims, peer_timeout, addrs }) } pub fn decode(r: R) -> Result { @@ -167,6 +184,34 @@ impl NodeInfo { Ok(()) } + fn encode_addrs_part(&self, mut out: W) -> Result<(), io::Error> { + let mut addr_ipv4: SmallVec<[SocketAddrV4; 16]> = smallvec![]; + let mut addr_ipv6: SmallVec<[SocketAddrV6; 16]> = smallvec![]; + for a in &self.addrs { + match a { + SocketAddr::V4(addr) => addr_ipv4.push(*addr), + SocketAddr::V6(addr) => addr_ipv6.push(*addr) + } + } + while addr_ipv4.len() >= 8 { + addr_ipv4.pop(); + } + while addr_ipv6.len() >= 8 { + addr_ipv6.pop(); + } + let flags = addr_ipv6.len() as u8 * 8 + addr_ipv4.len() as u8; + out.write_u8(flags)?; + for a in addr_ipv6 { + out.write_all(&a.ip().octets())?; + out.write_u16::(a.port())?; + } + for a in addr_ipv4 { + out.write_all(&a.ip().octets())?; + out.write_u16::(a.port())?; + } + Ok(()) + } + fn encode_part) -> Result<(), io::Error>>( cursor: &mut Cursor<&mut [u8]>, part: u8, f: F ) -> Result<(), io::Error> { @@ -199,6 +244,7 @@ impl NodeInfo { cursor.write_u16::(timeout) })? } + Self::encode_part(&mut cursor, Self::PART_ADDRS, |cursor| self.encode_addrs_part(cursor))?; cursor.write_u8(Self::PART_END)?; len = cursor.position() as usize; } diff --git a/src/util.rs b/src/util.rs index bbd4dfd..42a53b1 100644 --- a/src/util.rs +++ b/src/util.rs @@ -217,7 +217,7 @@ pub fn get_internal_ip() -> Ipv4Addr { #[allow(unknown_lints, clippy::needless_pass_by_value)] -pub fn resolve(addr: Addr) -> Result, Error> { +pub fn resolve(addr: Addr) -> Result, Error> { let mut addrs = addr.to_socket_addrs().map_err(|_| Error::NameUnresolvable(format!("{:?}", addr)))?.collect::>(); // Try IPv4 first as it usually is faster