Exchange all peer addresses

pull/123/head
Dennis Schwerdel 2020-12-19 10:54:55 +00:00
parent 70ad35932c
commit 6d1bb8fabb
3 changed files with 75 additions and 26 deletions

View File

@ -26,7 +26,7 @@ use crate::{
device::{Device, Type},
error::Error,
messages::{
NodeInfo, PeerInfo, MESSAGE_TYPE_CLOSE, MESSAGE_TYPE_DATA, MESSAGE_TYPE_KEEPALIVE, MESSAGE_TYPE_NODE_INFO
AddrList, NodeInfo, PeerInfo, MESSAGE_TYPE_CLOSE, MESSAGE_TYPE_DATA, MESSAGE_TYPE_KEEPALIVE, MESSAGE_TYPE_NODE_INFO
},
net::{mapped_addr, Socket},
payload::Protocol,
@ -47,6 +47,7 @@ const OWN_ADDRESS_RESET_INTERVAL: Time = 300;
const SPACE_BEFORE: usize = 100;
struct PeerData {
addrs: AddrList,
last_seen: Time,
timeout: Time,
peer_timeout: u16,
@ -57,7 +58,7 @@ struct PeerData {
#[derive(Clone)]
pub struct ReconnectEntry {
address: Option<(String, Time)>,
resolved: SmallVec<[SocketAddr; 3]>,
resolved: AddrList,
tries: u16,
timeout: u16,
next: Time,
@ -72,7 +73,7 @@ pub struct GenericCloud<D: Device, P: Protocol, S: Socket, TS: TimeSource> {
broadcast: bool,
peers: HashMap<SocketAddr, PeerData, Hash>,
reconnect_peers: SmallVec<[ReconnectEntry; 3]>,
own_addresses: SmallVec<[SocketAddr; 3]>,
own_addresses: AddrList,
pending_inits: HashMap<SocketAddr, PeerCrypto<NodeInfo>, Hash>,
table: ClaimTable<TS>,
socket: S,
@ -288,8 +289,8 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
fn create_node_info(&self) -> NodeInfo {
let mut peers = smallvec![];
for (addr, peer) in &self.peers {
peers.push(PeerInfo { node_id: Some(peer.node_id), addrs: smallvec![*addr] })
for peer in self.peers.values() {
peers.push(PeerInfo { node_id: Some(peer.node_id), addrs: peer.addrs.clone() })
}
if peers.len() > 20 {
let mut rng = rand::thread_rng();
@ -300,7 +301,8 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
node_id: self.node_id,
peers,
claims: self.claims.clone(),
peer_timeout: Some(self.peer_timeout_publish)
peer_timeout: Some(self.peer_timeout_publish),
addrs: self.own_addresses.clone()
}
}
@ -628,6 +630,7 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
info!("Added peer {}", addr_nice(addr));
if let Some(init) = self.pending_inits.remove(&addr) {
self.peers.insert(addr, PeerData {
addrs: info.addrs.clone(),
crypto: init,
node_id: info.node_id,
peer_timeout: info.peer_timeout.unwrap_or(DEFAULT_PEER_TIMEOUT),

View File

@ -25,6 +25,7 @@ pub const MESSAGE_TYPE_CLOSE: u8 = 0xff;
pub type AddrList = SmallVec<[SocketAddr; 4]>;
pub type PeerList = SmallVec<[PeerInfo; 16]>;
#[derive(Debug, PartialEq)]
pub struct PeerInfo {
pub node_id: Option<NodeId>,
@ -36,7 +37,8 @@ pub struct NodeInfo {
pub node_id: NodeId,
pub peers: PeerList,
pub claims: RangeList,
pub peer_timeout: Option<u16>
pub peer_timeout: Option<u16>,
pub addrs: AddrList
}
impl NodeInfo {
@ -45,35 +47,46 @@ impl NodeInfo {
const PART_NODEID: u8 = 4;
const PART_PEERS: u8 = 1;
const PART_PEER_TIMEOUT: u8 = 3;
const PART_ADDRS: u8 = 5;
fn read_addr_list<R: Read>(r: &mut Take<R>) -> Result<AddrList, io::Error> {
let flags = r.read_u8()?;
Self::read_addr_list_inner(r, flags)
}
fn read_addr_list_inner<R: Read>(r: &mut Take<R>, flags: u8) -> Result<AddrList, io::Error> {
let num_ipv4_addrs = (flags & 0x07) as usize;
let num_ipv6_addrs = (flags & 0x38) as usize / 8;
let mut addrs = SmallVec::with_capacity(num_ipv4_addrs + num_ipv6_addrs);
for _ in 0..num_ipv6_addrs {
let mut ip = [0u8; 16];
r.read_exact(&mut ip)?;
let port = r.read_u16::<NetworkEndian>()?;
let addr = SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::from(ip), port, 0, 0));
addrs.push(addr);
}
for _ in 0..num_ipv4_addrs {
let mut ip = [0u8; 4];
r.read_exact(&mut ip)?;
let port = r.read_u16::<NetworkEndian>()?;
let addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::from(ip), port));
addrs.push(addr);
}
Ok(addrs)
}
fn decode_peer_list_part<R: Read>(r: &mut Take<R>) -> Result<PeerList, io::Error> {
let mut peers = smallvec![];
while r.limit() > 0 {
let flags = r.read_u8()?;
let has_node_id = (flags & 0x80) != 0;
let num_ipv4_addrs = (flags & 0x07) as usize;
let num_ipv6_addrs = (flags & 0x38) as usize / 8;
let mut node_id = None;
if has_node_id {
let mut id = [0; NODE_ID_BYTES];
r.read_exact(&mut id)?;
node_id = Some(id)
}
let mut addrs = SmallVec::with_capacity(num_ipv4_addrs + num_ipv6_addrs);
for _ in 0..num_ipv6_addrs {
let mut ip = [0u8; 16];
r.read_exact(&mut ip)?;
let port = r.read_u16::<NetworkEndian>()?;
let addr = SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::from(ip), port, 0, 0));
addrs.push(addr);
}
for _ in 0..num_ipv4_addrs {
let mut ip = [0u8; 4];
r.read_exact(&mut ip)?;
let port = r.read_u16::<NetworkEndian>()?;
let addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::from(ip), port));
addrs.push(addr);
}
let addrs = Self::read_addr_list_inner(r, flags)?;
peers.push(PeerInfo { addrs, node_id })
}
Ok(peers)
@ -92,6 +105,7 @@ impl NodeInfo {
let mut claims = smallvec![];
let mut peer_timeout = None;
let mut node_id = None;
let mut addrs = smallvec![];
loop {
let part = r.read_u8().map_err(|_| Error::Message("Truncated message"))?;
if part == Self::PART_END {
@ -113,6 +127,9 @@ impl NodeInfo {
rp.read_exact(&mut data).map_err(|_| Error::Message("Truncated message"))?;
node_id = Some(data);
}
Self::PART_ADDRS => {
addrs = Self::read_addr_list(&mut rp).map_err(|_| Error::Message("Truncated message"))?;
}
_ => {
let mut data = vec![0; part_len];
rp.read_exact(&mut data).map_err(|_| Error::Message("Truncated message"))?;
@ -124,7 +141,7 @@ impl NodeInfo {
Some(node_id) => node_id,
None => return Err(Error::Message("Payload without node_id"))
};
Ok(Self { node_id, peers, claims, peer_timeout })
Ok(Self { node_id, peers, claims, peer_timeout, addrs })
}
pub fn decode<R: Read>(r: R) -> Result<Self, Error> {
@ -167,6 +184,34 @@ impl NodeInfo {
Ok(())
}
fn encode_addrs_part<W: Write>(&self, mut out: W) -> Result<(), io::Error> {
let mut addr_ipv4: SmallVec<[SocketAddrV4; 16]> = smallvec![];
let mut addr_ipv6: SmallVec<[SocketAddrV6; 16]> = smallvec![];
for a in &self.addrs {
match a {
SocketAddr::V4(addr) => addr_ipv4.push(*addr),
SocketAddr::V6(addr) => addr_ipv6.push(*addr)
}
}
while addr_ipv4.len() >= 8 {
addr_ipv4.pop();
}
while addr_ipv6.len() >= 8 {
addr_ipv6.pop();
}
let flags = addr_ipv6.len() as u8 * 8 + addr_ipv4.len() as u8;
out.write_u8(flags)?;
for a in addr_ipv6 {
out.write_all(&a.ip().octets())?;
out.write_u16::<NetworkEndian>(a.port())?;
}
for a in addr_ipv4 {
out.write_all(&a.ip().octets())?;
out.write_u16::<NetworkEndian>(a.port())?;
}
Ok(())
}
fn encode_part<F: FnOnce(&mut Cursor<&mut [u8]>) -> Result<(), io::Error>>(
cursor: &mut Cursor<&mut [u8]>, part: u8, f: F
) -> Result<(), io::Error> {
@ -199,6 +244,7 @@ impl NodeInfo {
cursor.write_u16::<NetworkEndian>(timeout)
})?
}
Self::encode_part(&mut cursor, Self::PART_ADDRS, |cursor| self.encode_addrs_part(cursor))?;
cursor.write_u8(Self::PART_END)?;
len = cursor.position() as usize;
}

View File

@ -217,7 +217,7 @@ pub fn get_internal_ip() -> Ipv4Addr {
#[allow(unknown_lints, clippy::needless_pass_by_value)]
pub fn resolve<Addr: ToSocketAddrs + fmt::Debug>(addr: Addr) -> Result<SmallVec<[SocketAddr; 3]>, Error> {
pub fn resolve<Addr: ToSocketAddrs + fmt::Debug>(addr: Addr) -> Result<SmallVec<[SocketAddr; 4]>, Error> {
let mut addrs =
addr.to_socket_addrs().map_err(|_| Error::NameUnresolvable(format!("{:?}", addr)))?.collect::<SmallVec<_>>();
// Try IPv4 first as it usually is faster