This commit is contained in:
Dennis Schwerdel 2015-11-20 09:11:54 +01:00
parent 90bf0f1c40
commit e76e93544b
6 changed files with 206 additions and 121 deletions

1
.gitignore vendored
View File

@ -1,3 +1,4 @@
target target
Cargo.lock Cargo.lock
ethcloud-*
._* ._*

View File

@ -8,8 +8,8 @@ build = "build.rs"
time = "0.1" time = "0.1"
docopt = "0.6" docopt = "0.6"
rustc-serialize = "0.3" rustc-serialize = "0.3"
log = "*" log = "0.3"
env_logger = "*" epoll = "0.2"
[build-dependencies] [build-dependencies]
gcc = "0.3" gcc = "0.3"

118
benches/peerlist.rs Normal file
View File

@ -0,0 +1,118 @@
#![feature(test)]
extern crate test;
extern crate time;
use std::net::{SocketAddr, ToSocketAddrs, SocketAddrV4, Ipv4Addr};
use std::collections::HashMap;
use time::{Duration, SteadyTime};
use test::Bencher;
struct PeerListHashMap {
timeout: Duration,
peers: HashMap<SocketAddr, SteadyTime>
}
impl PeerListHashMap {
fn new(timeout: Duration) -> Self {
PeerListHashMap{peers: HashMap::new(), timeout: timeout}
}
fn add(&mut self, addr: &SocketAddr) {
if self.peers.insert(*addr, SteadyTime::now()+self.timeout).is_none() {
}
}
}
struct PeerListVec {
timeout: Duration,
peers: Vec<(SocketAddr, SteadyTime)>
}
impl PeerListVec {
fn new(timeout: Duration) -> Self {
PeerListVec{peers: Vec::new(), timeout: timeout}
}
fn add(&mut self, addr: &SocketAddr) {
for &(ref peer, ref timeout) in &self.peers {
if peer == addr {
return;
}
}
self.peers.push((*addr, SteadyTime::now()+self.timeout));
}
}
fn bench_hashmap_add_n(b: &mut Bencher, n: u16) {
let mut peers = PeerListHashMap::new(Duration::seconds(60));
for i in 0..n {
peers.add(&SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 1), i)));
}
let addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0));
b.iter(|| {
peers.add(&addr)
})
}
#[bench]
fn bench_hashmap_add_0(b: &mut Bencher) {
bench_hashmap_add_n(b, 0);
}
#[bench]
fn bench_hashmap_add_1(b: &mut Bencher) {
bench_hashmap_add_n(b, 1);
}
#[bench]
fn bench_hashmap_add_10(b: &mut Bencher) {
bench_hashmap_add_n(b, 10);
}
#[bench]
fn bench_hashmap_add_100(b: &mut Bencher) {
bench_hashmap_add_n(b, 100);
}
#[bench]
fn bench_hashmap_add_1000(b: &mut Bencher) {
bench_hashmap_add_n(b, 1000);
}
fn bench_vec_add_n(b: &mut Bencher, n: u16) {
let mut peers = PeerListVec::new(Duration::seconds(60));
for i in 0..n {
peers.add(&SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 1), i)));
}
let addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0));
b.iter(|| {
peers.add(&addr)
})
}
#[bench]
fn bench_vec_add_0(b: &mut Bencher) {
bench_vec_add_n(b, 0);
}
#[bench]
fn bench_vec_add_1(b: &mut Bencher) {
bench_vec_add_n(b, 1);
}
#[bench]
fn bench_vec_add_10(b: &mut Bencher) {
bench_vec_add_n(b, 10);
}
#[bench]
fn bench_vec_add_100(b: &mut Bencher) {
bench_vec_add_n(b, 100);
}
#[bench]
fn bench_vec_add_1000(b: &mut Bencher) {
bench_vec_add_n(b, 1000);
}

View File

@ -4,12 +4,10 @@ use std::hash::Hasher;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::io::Read; use std::io::Read;
use std::fmt; use std::fmt;
use std::sync::{Arc, Mutex}; use std::os::unix::io::AsRawFd;
use std::thread;
use std::ops::Deref;
use std::time::Duration as StdDuration;
use time::{Duration, SteadyTime}; use time::{Duration, SteadyTime};
use epoll;
use super::{ethernet, udpmessage}; use super::{ethernet, udpmessage};
use super::tapdev::TapDevice; use super::tapdev::TapDevice;
@ -135,27 +133,15 @@ impl MacTable {
} }
} }
pub struct EthCloudInner { pub struct EthCloud {
peers: Mutex<PeerList>, peers: PeerList,
mactable: Mutex<MacTable>, mactable: MacTable,
socket: Mutex<UdpSocket>, socket: UdpSocket,
tapdev: Mutex<TapDevice>, tapdev: TapDevice,
token: Token, token: Token,
next_peerlist: Mutex<SteadyTime>, next_peerlist: SteadyTime,
update_freq: Duration, update_freq: Duration,
running: Mutex<bool> buffer_out: [u8; 64*1024]
}
#[derive(Clone)]
pub struct EthCloud(Arc<EthCloudInner>);
impl Deref for EthCloud {
type Target = EthCloudInner;
#[inline(always)]
fn deref(&self) -> &Self::Target {
&self.0
}
} }
impl EthCloud { impl EthCloud {
@ -169,23 +155,22 @@ impl EthCloud {
_ => panic!("Failed to open tap device") _ => panic!("Failed to open tap device")
}; };
info!("Opened tap device {}", tapdev.ifname()); info!("Opened tap device {}", tapdev.ifname());
EthCloud(Arc::new(EthCloudInner{ EthCloud{
peers: Mutex::new(PeerList::new(peer_timeout)), peers: PeerList::new(peer_timeout),
mactable: Mutex::new(MacTable::new(mac_timeout)), mactable: MacTable::new(mac_timeout),
socket: Mutex::new(socket), socket: socket,
tapdev: Mutex::new(tapdev), tapdev: tapdev,
token: token, token: token,
next_peerlist: Mutex::new(SteadyTime::now()), next_peerlist: SteadyTime::now(),
update_freq: peer_timeout/2, update_freq: peer_timeout/2,
running: Mutex::new(true) buffer_out: [0; 64*1024]
})) }
} }
fn send_msg<A: ToSocketAddrs + fmt::Display>(&self, addr: A, msg: &udpmessage::Message) -> Result<(), Error> { fn send_msg<A: ToSocketAddrs + fmt::Display>(&mut self, addr: A, msg: &udpmessage::Message) -> Result<(), Error> {
debug!("Sending {:?} to {}", msg, addr); debug!("Sending {:?} to {}", msg, addr);
let mut buffer = [0u8; 64*1024]; let size = udpmessage::encode(self.token, msg, &mut self.buffer_out);
let size = udpmessage::encode(self.token, msg, &mut buffer); match self.socket.send_to(&self.buffer_out[..size], addr) {
match self.socket.lock().expect("Lock poisoned").send_to(&buffer[..size], addr) {
Ok(written) if written == size => Ok(()), Ok(written) if written == size => Ok(()),
Ok(_) => Err(Error::SocketError("Sent out truncated packet")), Ok(_) => Err(Error::SocketError("Sent out truncated packet")),
Err(e) => { Err(e) => {
@ -195,30 +180,29 @@ impl EthCloud {
} }
} }
pub fn connect<A: ToSocketAddrs + fmt::Display>(&self, addr: A) -> Result<(), Error> { pub fn connect<A: ToSocketAddrs + fmt::Display>(&mut self, addr: A) -> Result<(), Error> {
info!("Connecting to {}", addr); info!("Connecting to {}", addr);
self.send_msg(addr, &udpmessage::Message::GetPeers) self.send_msg(addr, &udpmessage::Message::GetPeers)
} }
fn housekeep(&self) -> Result<(), Error> { fn housekeep(&mut self) -> Result<(), Error> {
self.peers.lock().expect("Lock poisoned").timeout(); self.peers.timeout();
self.mactable.lock().expect("Lock poisoned").timeout(); self.mactable.timeout();
let mut next_peerlist = self.next_peerlist.lock().expect("Lock poisoned"); if self.next_peerlist <= SteadyTime::now() {
if *next_peerlist <= SteadyTime::now() {
debug!("Send peer list to all peers"); debug!("Send peer list to all peers");
let peers = self.peers.lock().expect("Lock poisoned").as_vec(); let peers = self.peers.as_vec();
let msg = udpmessage::Message::Peers(peers.clone()); let msg = udpmessage::Message::Peers(peers.clone());
for addr in &peers { for addr in &peers {
try!(self.send_msg(addr, &msg)); try!(self.send_msg(addr, &msg));
} }
*next_peerlist = SteadyTime::now() + self.update_freq; self.next_peerlist = SteadyTime::now() + self.update_freq;
} }
Ok(()) Ok(())
} }
fn handle_ethernet_frame(&self, frame: ethernet::Frame) -> Result<(), Error> { fn handle_ethernet_frame(&mut self, frame: ethernet::Frame) -> Result<(), Error> {
debug!("Read ethernet frame from tap {:?}", frame); debug!("Read ethernet frame from tap {:?}", frame);
match self.mactable.lock().expect("Lock poisoned").lookup(frame.dst, frame.vlan) { match self.mactable.lookup(frame.dst, frame.vlan) {
Some(addr) => { Some(addr) => {
debug!("Found destination for {:?} (vlan {}) => {}", frame.dst, frame.vlan, addr); debug!("Found destination for {:?} (vlan {}) => {}", frame.dst, frame.vlan, addr);
try!(self.send_msg(addr, &udpmessage::Message::Frame(frame))) try!(self.send_msg(addr, &udpmessage::Message::Frame(frame)))
@ -226,7 +210,7 @@ impl EthCloud {
None => { None => {
debug!("No destination for {:?} (vlan {}) found, broadcasting", frame.dst, frame.vlan); debug!("No destination for {:?} (vlan {}) found, broadcasting", frame.dst, frame.vlan);
let msg = udpmessage::Message::Frame(frame); let msg = udpmessage::Message::Frame(frame);
for addr in &self.peers.lock().expect("Lock poisoned").as_vec() { for addr in &self.peers.as_vec() {
try!(self.send_msg(addr, &msg)); try!(self.send_msg(addr, &msg));
} }
} }
@ -234,7 +218,7 @@ impl EthCloud {
Ok(()) Ok(())
} }
fn handle_net_message(&self, peer: SocketAddr, token: Token, msg: udpmessage::Message) -> Result<(), Error> { fn handle_net_message(&mut self, peer: SocketAddr, token: Token, msg: udpmessage::Message) -> Result<(), Error> {
if token != self.token { if token != self.token {
info!("Ignoring message from {} with wrong token {}", peer, token); info!("Ignoring message from {} with wrong token {}", peer, token);
return Err(Error::WrongToken(token)); return Err(Error::WrongToken(token));
@ -242,58 +226,51 @@ impl EthCloud {
debug!("Recieved {:?} from {}", msg, peer); debug!("Recieved {:?} from {}", msg, peer);
match msg { match msg {
udpmessage::Message::Frame(frame) => { udpmessage::Message::Frame(frame) => {
let mut buffer = [0u8; 64*1024]; let size = ethernet::encode(&frame, &mut self.buffer_out);
let size = ethernet::encode(&frame, &mut buffer);
debug!("Writing ethernet frame to tap: {:?}", frame); debug!("Writing ethernet frame to tap: {:?}", frame);
match self.tapdev.lock().expect("Lock poisoned").write(&buffer[..size]) { match self.tapdev.write(&self.buffer_out[..size]) {
Ok(()) => (), Ok(()) => (),
Err(e) => { Err(e) => {
error!("Failed to send via tap device {:?}", e); error!("Failed to send via tap device {:?}", e);
return Err(Error::TapdevError("Failed to write to tap device")); return Err(Error::TapdevError("Failed to write to tap device"));
} }
} }
self.peers.lock().expect("Lock poisoned").add(&peer); self.peers.add(&peer);
self.mactable.lock().expect("Lock poisoned").learn(frame.src, frame.vlan, &peer); self.mactable.learn(frame.src, frame.vlan, &peer);
}, },
udpmessage::Message::Peers(peers) => { udpmessage::Message::Peers(peers) => {
self.peers.lock().expect("Lock poisoned").add(&peer); self.peers.add(&peer);
for p in &peers { for p in &peers {
if ! self.peers.lock().expect("Lock poisoned").contains(p) { if ! self.peers.contains(p) {
try!(self.connect(p)); try!(self.connect(p));
} }
} }
}, },
udpmessage::Message::GetPeers => { udpmessage::Message::GetPeers => {
self.peers.lock().expect("Lock poisoned").add(&peer); self.peers.add(&peer);
let peers = self.peers.lock().expect("Lock poisoned").as_vec(); let peers = self.peers.as_vec();
try!(self.send_msg(peer, &udpmessage::Message::Peers(peers))); try!(self.send_msg(peer, &udpmessage::Message::Peers(peers)));
}, },
udpmessage::Message::Close => self.peers.lock().expect("Lock poisoned").remove(&peer) udpmessage::Message::Close => self.peers.remove(&peer)
} }
Ok(()) Ok(())
} }
fn run_tapdev(&self) { pub fn run(&mut self) {
let mut buffer = [0u8; 64*1024]; let epoll_handle = epoll::create1(0).expect("Failed to create epoll handle");
let mut tapdev = self.tapdev.lock().expect("Lock poisoned").clone(); let socket_fd = self.socket.as_raw_fd();
let tapdev_fd = self.tapdev.as_raw_fd();
let mut socket_event = epoll::EpollEvent{events: epoll::util::event_type::EPOLLIN, data: 0};
let mut tapdev_event = epoll::EpollEvent{events: epoll::util::event_type::EPOLLIN, data: 1};
epoll::ctl(epoll_handle, epoll::util::ctl_op::ADD, socket_fd, &mut socket_event).expect("Failed to add socket to epoll handle");
epoll::ctl(epoll_handle, epoll::util::ctl_op::ADD, tapdev_fd, &mut tapdev_event).expect("Failed to add tapdev to epoll handle");
let mut events = [epoll::EpollEvent{events: 0, data: 0}; 2];
let mut buffer = [0; 64*1024];
loop { loop {
match tapdev.read(&mut buffer) { let count = epoll::wait(epoll_handle, &mut events, 1000).expect("Epoll wait failed");
Ok(size) => { for i in 0..count {
match ethernet::decode(&mut buffer[..size]).and_then(|frame| self.handle_ethernet_frame(frame)) { match &events[i as usize].data {
Ok(_) => (), &0 => match self.socket.recv_from(&mut buffer) {
Err(e) => error!("Error: {:?}", e)
}
},
Err(_error) => panic!("Failed to read from tap device")
}
}
}
fn run_socket(&self) {
let mut buffer = [0u8; 64*1024];
let socket = self.socket.lock().expect("Lock poisoned").try_clone().expect("Failed to clone socket");
loop {
match socket.recv_from(&mut buffer) {
Ok((size, src)) => { Ok((size, src)) => {
match udpmessage::decode(&buffer[..size]).and_then(|(token, msg)| self.handle_net_message(src, token, msg)) { match udpmessage::decode(&buffer[..size]).and_then(|(token, msg)| self.handle_net_message(src, token, msg)) {
Ok(_) => (), Ok(_) => (),
@ -301,36 +278,23 @@ impl EthCloud {
} }
}, },
Err(_error) => panic!("Failed to read from network socket") Err(_error) => panic!("Failed to read from network socket")
},
&1 => match self.tapdev.read(&mut buffer) {
Ok(size) => {
match ethernet::decode(&mut buffer[..size]).and_then(|frame| self.handle_ethernet_frame(frame)) {
Ok(_) => (),
Err(e) => error!("Error: {:?}", e)
}
},
Err(_error) => panic!("Failed to read from tap device")
},
_ => unreachable!()
} }
} }
} }
pub fn run(&self) {
let clone = self.clone();
thread::spawn(move || {
clone.run_socket()
});
let clone = self.clone();
thread::spawn(move || {
clone.run_tapdev()
});
while *self.running.lock().expect("Lock poisoned") {
match self.housekeep() { match self.housekeep() {
Ok(_) => (), Ok(_) => (),
Err(e) => error!("Error: {:?}", e) Err(e) => error!("Error: {:?}", e)
} }
thread::sleep(StdDuration::new(10, 0));
}
}
pub fn close(&self) {
info!("Shutting down...");
for p in self.peers.lock().expect("Lock poisoned").as_vec() {
match self.send_msg(p, &udpmessage::Message::Close) {
Ok(()) => (),
Err(e) => error!("Failed to send close message to {}: {:?}", p, e)
}
}
*self.running.lock().expect("Lock poisoned") = false;
} }
} }

View File

@ -2,6 +2,7 @@
extern crate time; extern crate time;
extern crate docopt; extern crate docopt;
extern crate rustc_serialize; extern crate rustc_serialize;
extern crate epoll;
mod util; mod util;
mod udpmessage; mod udpmessage;
@ -79,7 +80,7 @@ fn main() {
Box::new(SimpleLogger) Box::new(SimpleLogger)
}).unwrap(); }).unwrap();
debug!("Args: {:?}", args); debug!("Args: {:?}", args);
let tapcloud = EthCloud::new( let mut tapcloud = EthCloud::new(
&args.flag_device, &args.flag_device,
args.flag_listen, args.flag_listen,
args.flag_token, args.flag_token,

View File

@ -1,6 +1,6 @@
use std::fs; use std::fs;
use std::io::{Read, Write, Result as IoResult, Error as IoError}; use std::io::{Read, Write, Result as IoResult, Error as IoError};
use std::os::unix::io::{AsRawFd, FromRawFd}; use std::os::unix::io::{AsRawFd, RawFd};
extern { extern {
fn setup_tap_device(fd: i32, ifname: *mut u8) -> i32; fn setup_tap_device(fd: i32, ifname: *mut u8) -> i32;
@ -24,11 +24,6 @@ impl TapDevice {
} }
} }
pub fn clone(&self) -> TapDevice {
let fd = unsafe { fs::File::from_raw_fd(self.fd.as_raw_fd()) };
TapDevice{fd: fd, ifname: self.ifname.clone()}
}
pub fn ifname(&self) -> &str { pub fn ifname(&self) -> &str {
&self.ifname &self.ifname
} }
@ -43,3 +38,9 @@ impl TapDevice {
self.fd.write_all(buffer) self.fd.write_all(buffer)
} }
} }
impl AsRawFd for TapDevice {
fn as_raw_fd(&self) -> RawFd {
self.fd.as_raw_fd()
}
}