Handling multiple addresses per peer

This commit is contained in:
Dennis Schwerdel 2016-05-02 09:05:34 +02:00
parent 2f2f7b725f
commit 8d3ab9cc4a
2 changed files with 62 additions and 19 deletions

View File

@ -4,6 +4,7 @@ This project follows [semantic versioning](http://semver.org).
### Unreleased ### Unreleased
- [changed] Listening on IPv4 and IPv6
- [changed] Using SO_REUSEADDR to allow frequent rebinding - [changed] Using SO_REUSEADDR to allow frequent rebinding
- [changed] Building and using local libsodium library automatically - [changed] Building and using local libsodium library automatically
- [changed] Updated dependencies - [changed] Updated dependencies

View File

@ -2,8 +2,8 @@
// Copyright (C) 2015-2016 Dennis Schwerdel // Copyright (C) 2015-2016 Dennis Schwerdel
// This software is licensed under GPL-3 or newer (see LICENSE.md) // This software is licensed under GPL-3 or newer (see LICENSE.md)
use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs}; use std::net::{SocketAddr, ToSocketAddrs};
use std::collections::{HashSet, HashMap}; use std::collections::{HashMap, HashSet};
use std::hash::Hasher; use std::hash::Hasher;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::io::Read; use std::io::Read;
@ -31,44 +31,77 @@ type Hash = BuildHasherDefault<FnvHasher>;
struct PeerList { struct PeerList {
timeout: Duration, timeout: Duration,
peers: HashMap<SocketAddr, Time, Hash> peers: HashMap<SocketAddr, (Time, NodeId, Vec<SocketAddr>), Hash>,
nodes: HashMap<NodeId, SocketAddr, Hash>,
addresses: HashSet<SocketAddr, Hash>
} }
impl PeerList { impl PeerList {
fn new(timeout: Duration) -> PeerList { fn new(timeout: Duration) -> PeerList {
PeerList{peers: HashMap::default(), timeout: timeout} PeerList{
peers: HashMap::default(),
timeout: timeout,
nodes: HashMap::default(),
addresses: HashSet::default()
}
} }
fn timeout(&mut self) -> Vec<SocketAddr> { fn timeout(&mut self) -> Vec<SocketAddr> {
let now = now(); let now = now();
let mut del: Vec<SocketAddr> = Vec::new(); let mut del: Vec<SocketAddr> = Vec::new();
for (&addr, &timeout) in &self.peers { for (&addr, &(timeout, _nodeid, ref _alt_addrs)) in &self.peers {
if timeout < now { if timeout < now {
del.push(addr); del.push(addr);
} }
} }
for addr in &del { for addr in &del {
debug!("Forgot peer: {}", addr); debug!("Forgot peer: {}", addr);
self.peers.remove(addr); if let Some((_timeout, nodeid, alt_addrs)) = self.peers.remove(addr) {
self.nodes.remove(&nodeid);
self.addresses.remove(addr);
for addr in &alt_addrs {
self.addresses.remove(addr);
}
}
} }
del del
} }
#[inline(always)] #[inline(always)]
fn contains(&mut self, addr: &SocketAddr) -> bool { fn contains_addr(&mut self, addr: &SocketAddr) -> bool {
self.peers.contains_key(addr) self.addresses.contains(addr)
}
#[inline(always)]
fn contains_node(&mut self, node_id: &NodeId) -> bool {
self.nodes.contains_key(node_id)
}
#[inline]
fn add(&mut self, node_id: NodeId, addr: SocketAddr) {
if self.nodes.insert(node_id, addr).is_none() {
info!("New peer: {}", addr);
self.peers.insert(addr, (now()+self.timeout as Time, node_id, vec![]));
}
} }
#[inline] #[inline]
fn add(&mut self, addr: &SocketAddr) { fn add_alt_addr(&mut self, node_id: NodeId, addr: SocketAddr) {
if self.peers.insert(*addr, now()+self.timeout as Time).is_none() { if let Some(main_addr) = self.nodes.get(&node_id) {
info!("New peer: {}", addr); if let Some(&mut (_timeout, _node_id, ref mut alt_addrs)) = self.peers.get_mut(main_addr) {
alt_addrs.push(addr);
} else {
error!("Main address for node is not connected");
}
} else {
error!("Node not connected");
} }
} }
#[inline] #[inline]
fn as_vec(&self) -> Vec<SocketAddr> { fn as_vec(&self) -> Vec<SocketAddr> {
self.peers.keys().map(|addr| *addr).collect() self.addresses.iter().map(|addr| *addr).collect()
} }
#[inline(always)] #[inline(always)]
@ -83,8 +116,13 @@ impl PeerList {
#[inline] #[inline]
fn remove(&mut self, addr: &SocketAddr) { fn remove(&mut self, addr: &SocketAddr) {
if self.peers.remove(&addr).is_some() { if let Some((_timeout, node_id, alt_addrs)) = self.peers.remove(&addr) {
info!("Removed peer: {}", addr); info!("Removed peer: {}", addr);
self.nodes.remove(&node_id);
self.addresses.remove(addr);
for addr in alt_addrs {
self.addresses.remove(&addr);
}
} }
} }
} }
@ -207,7 +245,7 @@ impl<P: Protocol> GenericCloud<P> {
pub fn connect<Addr: ToSocketAddrs+fmt::Display>(&mut self, addr: Addr, reconnect: bool) -> Result<(), Error> { pub fn connect<Addr: ToSocketAddrs+fmt::Display>(&mut self, addr: Addr, reconnect: bool) -> Result<(), Error> {
if let Ok(mut addrs) = addr.to_socket_addrs() { if let Ok(mut addrs) = addr.to_socket_addrs() {
while let Some(a) = addrs.next() { while let Some(a) = addrs.next() {
if self.peers.contains(&a) || self.blacklist_peers.contains(&a) { if self.peers.contains_addr(&a) || self.blacklist_peers.contains(&a) {
return Ok(()); return Ok(());
} }
} }
@ -264,7 +302,7 @@ impl<P: Protocol> GenericCloud<P> {
match self.table.lookup(&dst) { match self.table.lookup(&dst) {
Some(addr) => { Some(addr) => {
debug!("Found destination for {} => {}", dst, addr); debug!("Found destination for {} => {}", dst, addr);
if self.peers.contains(&addr) { if self.peers.contains_addr(&addr) {
try!(self.send_msg(addr, &mut Message::Data(payload, start, end))) try!(self.send_msg(addr, &mut Message::Data(payload, start, end)))
} else { } else {
warn!("Destination for {} not found in peers: {}", dst, addr); warn!("Destination for {} not found in peers: {}", dst, addr);
@ -309,7 +347,7 @@ impl<P: Protocol> GenericCloud<P> {
}, },
Message::Peers(peers) => { Message::Peers(peers) => {
for p in &peers { for p in &peers {
if ! self.peers.contains(p) && ! self.blacklist_peers.contains(p) { if ! self.peers.contains_addr(p) && ! self.blacklist_peers.contains(p) {
try!(self.connect(p, false)); try!(self.connect(p, false));
} }
} }
@ -319,9 +357,13 @@ impl<P: Protocol> GenericCloud<P> {
self.blacklist_peers.push(peer); self.blacklist_peers.push(peer);
return Ok(()) return Ok(())
} }
self.peers.add(&peer); if self.peers.contains_node(&node_id) {
for range in ranges { self.peers.add_alt_addr(node_id, peer);
self.table.learn(range.base, Some(range.prefix_len), peer.clone()); } else {
self.peers.add(node_id, peer);
for range in ranges {
self.table.learn(range.base, Some(range.prefix_len), peer.clone());
}
} }
if stage == 0 { if stage == 0 {
let peers = self.peers.as_vec(); let peers = self.peers.as_vec();