Exponential backoff for reconnect timeouts

pull/9/head
Dennis Schwerdel 2016-05-24 10:32:03 +02:00
parent 6367cb933d
commit 2119bbcae5
2 changed files with 66 additions and 12 deletions

View File

@ -4,6 +4,7 @@ This project follows [semantic versioning](http://semver.org).
### Unreleased
- [added] Exponential backoff for reconnect timeouts
- [added] Systemd compatible startup scripts
- [changed] Repeatedly resolving connect addresses to allow DynDNS
- [changed] Listening on IPv4 and IPv6

View File

@ -72,6 +72,17 @@ impl PeerList {
self.addresses.contains(addr)
}
#[inline]
fn is_connected<Addr: ToSocketAddrs+fmt::Display>(&self, addr: Addr) -> Result<bool, Error> {
let mut addrs = try!(addr.to_socket_addrs().map_err(|_| Error::SocketError("Error looking up name")));
while let Some(a) = addrs.next() {
if self.contains_addr(&a) {
return Ok(true);
}
}
Ok(false)
}
#[inline(always)]
fn contains_node(&self, node_id: &NodeId) -> bool {
self.nodes.contains_key(node_id)
@ -127,6 +138,13 @@ impl PeerList {
}
}
#[derive(Clone)]
pub struct ReconnectEntry {
address: String,
tries: u16,
timeout: u16,
next: Time
}
pub struct GenericCloud<P: Protocol> {
node_id: NodeId,
@ -134,7 +152,7 @@ pub struct GenericCloud<P: Protocol> {
addresses: Vec<Range>,
learning: bool,
broadcast: bool,
reconnect_peers: Vec<String>,
reconnect_peers: Vec<ReconnectEntry>,
blacklist_peers: Vec<SocketAddr>,
table: Box<Table>,
socket4: UdpSocket,
@ -243,17 +261,28 @@ impl<P: Protocol> GenericCloud<P> {
}
pub fn add_reconnect_peer(&mut self, add: String) {
self.reconnect_peers.push(add)
self.reconnect_peers.push(ReconnectEntry {
address: add,
tries: 0,
timeout: 1,
next: now()
})
}
pub fn connect<Addr: ToSocketAddrs+fmt::Display>(&mut self, addr: Addr) -> Result<(), Error> {
if let Ok(mut addrs) = addr.to_socket_addrs() {
while let Some(a) = addrs.next() {
if self.peers.contains_addr(&a) || self.blacklist_peers.contains(&a) {
return Ok(());
}
fn is_blacklisted<Addr: ToSocketAddrs+fmt::Display>(&self, addr: Addr) -> Result<bool, Error> {
let mut addrs = try!(addr.to_socket_addrs().map_err(|_| Error::SocketError("Error looking up name")));
while let Some(a) = addrs.next() {
if self.blacklist_peers.contains(&a) {
return Ok(true);
}
}
Ok(false)
}
pub fn connect<Addr: ToSocketAddrs+fmt::Display+Clone>(&mut self, addr: Addr) -> Result<(), Error> {
if try!(self.peers.is_connected(addr.clone())) || try!(self.is_blacklisted(addr.clone())) {
return Ok(())
}
debug!("Connecting to {}", addr);
let subnets = self.addresses.clone();
let node_id = self.node_id.clone();
@ -270,7 +299,8 @@ impl<P: Protocol> GenericCloud<P> {
fn housekeep(&mut self) -> Result<(), Error> {
self.peers.timeout();
self.table.housekeep();
if self.next_peerlist <= now() {
let now = now();
if self.next_peerlist <= now {
debug!("Send peer list to all peers");
let mut peer_num = self.peers.len();
if peer_num > 10 {
@ -285,10 +315,33 @@ impl<P: Protocol> GenericCloud<P> {
let peers = self.peers.subset(peer_num);
let mut msg = Message::Peers(peers);
try!(self.broadcast_msg(&mut msg));
self.next_peerlist = now() + self.update_freq as Time;
self.next_peerlist = now + self.update_freq as Time;
}
for addr in self.reconnect_peers.clone() {
try!(self.connect(&addr as &str));
for entry in self.reconnect_peers.clone() {
if entry.next > now {
continue
}
try!(self.connect(&entry.address as &str));
}
for entry in &mut self.reconnect_peers {
if try!(self.peers.is_connected(&entry.address as &str)) {
entry.tries = 0;
entry.timeout = 1;
entry.next = now + 1;
continue
}
if entry.next > now {
continue
}
entry.tries += 1;
if entry.tries > 10 {
entry.tries = 0;
entry.timeout *= 2;
}
if entry.timeout > 3600 {
entry.timeout = 3600;
}
entry.next = now + entry.timeout as Time;
}
Ok(())
}