From 2119bbcae53c5e41efa41f1b17f8275ce4ce686b Mon Sep 17 00:00:00 2001 From: Dennis Schwerdel Date: Tue, 24 May 2016 10:32:03 +0200 Subject: [PATCH] Exponential backoff for reconnect timeouts --- CHANGELOG.md | 1 + src/cloud.rs | 77 ++++++++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 66 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 80b980a..66a1957 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ This project follows [semantic versioning](http://semver.org). ### Unreleased +- [added] Exponential backoff for reconnect timeouts - [added] Systemd compatible startup scripts - [changed] Repeatedly resolving connect addresses to allow DynDNS - [changed] Listening on IPv4 and IPv6 diff --git a/src/cloud.rs b/src/cloud.rs index 8d59c73..8705ecb 100644 --- a/src/cloud.rs +++ b/src/cloud.rs @@ -72,6 +72,17 @@ impl PeerList { self.addresses.contains(addr) } + #[inline] + fn is_connected(&self, addr: Addr) -> Result { + let mut addrs = try!(addr.to_socket_addrs().map_err(|_| Error::SocketError("Error looking up name"))); + while let Some(a) = addrs.next() { + if self.contains_addr(&a) { + return Ok(true); + } + } + Ok(false) + } + #[inline(always)] fn contains_node(&self, node_id: &NodeId) -> bool { self.nodes.contains_key(node_id) @@ -127,6 +138,13 @@ impl PeerList { } } +#[derive(Clone)] +pub struct ReconnectEntry { + address: String, + tries: u16, + timeout: u16, + next: Time +} pub struct GenericCloud { node_id: NodeId, @@ -134,7 +152,7 @@ pub struct GenericCloud { addresses: Vec, learning: bool, broadcast: bool, - reconnect_peers: Vec, + reconnect_peers: Vec, blacklist_peers: Vec, table: Box, socket4: UdpSocket, @@ -243,17 +261,28 @@ impl GenericCloud

{ } pub fn add_reconnect_peer(&mut self, add: String) { - self.reconnect_peers.push(add) + self.reconnect_peers.push(ReconnectEntry { + address: add, + tries: 0, + timeout: 1, + next: now() + }) } - pub fn connect(&mut self, addr: Addr) -> Result<(), Error> { - if let Ok(mut addrs) = addr.to_socket_addrs() { - while let Some(a) = addrs.next() { - if self.peers.contains_addr(&a) || self.blacklist_peers.contains(&a) { - return Ok(()); - } + fn is_blacklisted(&self, addr: Addr) -> Result { + let mut addrs = try!(addr.to_socket_addrs().map_err(|_| Error::SocketError("Error looking up name"))); + while let Some(a) = addrs.next() { + if self.blacklist_peers.contains(&a) { + return Ok(true); } } + Ok(false) + } + + pub fn connect(&mut self, addr: Addr) -> Result<(), Error> { + if try!(self.peers.is_connected(addr.clone())) || try!(self.is_blacklisted(addr.clone())) { + return Ok(()) + } debug!("Connecting to {}", addr); let subnets = self.addresses.clone(); let node_id = self.node_id.clone(); @@ -270,7 +299,8 @@ impl GenericCloud

{ fn housekeep(&mut self) -> Result<(), Error> { self.peers.timeout(); self.table.housekeep(); - if self.next_peerlist <= now() { + let now = now(); + if self.next_peerlist <= now { debug!("Send peer list to all peers"); let mut peer_num = self.peers.len(); if peer_num > 10 { @@ -285,10 +315,33 @@ impl GenericCloud

{ let peers = self.peers.subset(peer_num); let mut msg = Message::Peers(peers); try!(self.broadcast_msg(&mut msg)); - self.next_peerlist = now() + self.update_freq as Time; + self.next_peerlist = now + self.update_freq as Time; } - for addr in self.reconnect_peers.clone() { - try!(self.connect(&addr as &str)); + for entry in self.reconnect_peers.clone() { + if entry.next > now { + continue + } + try!(self.connect(&entry.address as &str)); + } + for entry in &mut self.reconnect_peers { + if try!(self.peers.is_connected(&entry.address as &str)) { + entry.tries = 0; + entry.timeout = 1; + entry.next = now + 1; + continue + } + if entry.next > now { + continue + } + entry.tries += 1; + if entry.tries > 10 { + entry.tries = 0; + entry.timeout *= 2; + } + if entry.timeout > 3600 { + entry.timeout = 3600; + } + entry.next = now + entry.timeout as Time; } Ok(()) }