Partial peer list exchange for scaling

This commit is contained in:
Dennis Schwerdel 2015-11-20 13:34:54 +01:00
parent 9dc796d52b
commit 58f4d58d66
3 changed files with 40 additions and 4 deletions

View File

@ -6,7 +6,7 @@ use std::io::Read;
use std::fmt; use std::fmt;
use std::os::unix::io::AsRawFd; use std::os::unix::io::AsRawFd;
use time::{Duration, SteadyTime}; use time::{Duration, SteadyTime, precise_time_ns};
use epoll; use epoll;
use super::{ethernet, udpmessage}; use super::{ethernet, udpmessage};
@ -60,20 +60,41 @@ impl PeerList {
del del
} }
#[inline(always)]
fn contains(&mut self, addr: &SocketAddr) -> bool { fn contains(&mut self, addr: &SocketAddr) -> bool {
self.peers.contains_key(addr) self.peers.contains_key(addr)
} }
#[inline]
fn add(&mut self, addr: &SocketAddr) { fn add(&mut self, addr: &SocketAddr) {
if self.peers.insert(*addr, SteadyTime::now()+self.timeout).is_none() { if self.peers.insert(*addr, SteadyTime::now()+self.timeout).is_none() {
info!("New peer: {:?}", addr); info!("New peer: {:?}", addr);
} }
} }
#[inline]
fn as_vec(&self) -> Vec<SocketAddr> { fn as_vec(&self) -> Vec<SocketAddr> {
self.peers.keys().map(|addr| *addr).collect() self.peers.keys().map(|addr| *addr).collect()
} }
#[inline(always)]
fn len(&self) -> usize {
self.peers.len()
}
#[inline]
fn subset(&self, size: usize, seed: u32) -> Vec<SocketAddr> {
let mut peers = self.as_vec();
let mut psrng = seed;
let len = peers.len();
for i in size..len {
peers.swap_remove(psrng as usize % (len - i));
psrng = ((1664525 as u64) * (psrng as u64) + (1013904223 as u64)) as u32;
}
peers
}
#[inline]
fn remove(&mut self, addr: &SocketAddr) { fn remove(&mut self, addr: &SocketAddr) {
if self.peers.remove(&addr).is_some() { if self.peers.remove(&addr).is_some() {
info!("Removed peer: {:?}", addr); info!("Removed peer: {:?}", addr);
@ -117,6 +138,7 @@ impl MacTable {
} }
} }
#[inline]
fn learn(&mut self, mac: &Mac, vlan: u16, addr: &SocketAddr) { fn learn(&mut self, mac: &Mac, vlan: u16, addr: &SocketAddr) {
let key = MacTableKey{mac: *mac, vlan: vlan}; let key = MacTableKey{mac: *mac, vlan: vlan};
let value = MacTableValue{address: *addr, timeout: SteadyTime::now()+self.timeout}; let value = MacTableValue{address: *addr, timeout: SteadyTime::now()+self.timeout};
@ -125,6 +147,7 @@ impl MacTable {
} }
} }
#[inline]
fn lookup(&self, mac: &Mac, vlan: u16) -> Option<SocketAddr> { fn lookup(&self, mac: &Mac, vlan: u16) -> Option<SocketAddr> {
let key = MacTableKey{mac: *mac, vlan: vlan}; let key = MacTableKey{mac: *mac, vlan: vlan};
match self.table.get(&key) { match self.table.get(&key) {
@ -207,9 +230,18 @@ impl EthCloud {
self.mactable.timeout(); self.mactable.timeout();
if self.next_peerlist <= SteadyTime::now() { if self.next_peerlist <= SteadyTime::now() {
debug!("Send peer list to all peers"); debug!("Send peer list to all peers");
let peers = self.peers.as_vec(); let mut peer_num = self.peers.len();
let msg = udpmessage::Message::Peers(peers.clone()); if peer_num > 10 {
for addr in &peers { peer_num = (peer_num as f32).sqrt().ceil() as usize;
if peer_num < 10 {
peer_num = 10;
}
}
let mut seed = precise_time_ns() as u32;
let peers = self.peers.subset(peer_num, seed);
let msg = udpmessage::Message::Peers(peers);
seed ^= (precise_time_ns() >> 32) as u32;
for addr in &self.peers.subset(peer_num, seed) {
try!(self.send_msg(addr, &msg)); try!(self.send_msg(addr, &msg));
} }
self.next_peerlist = SteadyTime::now() + self.update_freq; self.next_peerlist = SteadyTime::now() + self.update_freq;

View File

@ -24,6 +24,7 @@ impl TapDevice {
} }
} }
#[inline(always)]
pub fn ifname(&self) -> &str { pub fn ifname(&self) -> &str {
&self.ifname &self.ifname
} }
@ -40,6 +41,7 @@ impl TapDevice {
} }
impl AsRawFd for TapDevice { impl AsRawFd for TapDevice {
#[inline(always)]
fn as_raw_fd(&self) -> RawFd { fn as_raw_fd(&self) -> RawFd {
self.fd.as_raw_fd() self.fd.as_raw_fd()
} }

View File

@ -1,9 +1,11 @@
use std::{mem, slice}; use std::{mem, slice};
#[inline(always)]
pub unsafe fn as_bytes<T>(obj: &T) -> &[u8] { pub unsafe fn as_bytes<T>(obj: &T) -> &[u8] {
slice::from_raw_parts(mem::transmute::<&T, *const u8>(obj), mem::size_of::<T>()) slice::from_raw_parts(mem::transmute::<&T, *const u8>(obj), mem::size_of::<T>())
} }
#[inline(always)]
pub unsafe fn as_obj<T>(data: &[u8]) -> &T { pub unsafe fn as_obj<T>(data: &[u8]) -> &T {
assert!(data.len() >= mem::size_of::<T>()); assert!(data.len() >= mem::size_of::<T>());
mem::transmute(data.as_ptr()) mem::transmute(data.as_ptr())