diff --git a/.gitignore b/.gitignore index 2bb746f..f7fbbcb 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ target Cargo.lock +ethcloud-* ._* diff --git a/Cargo.toml b/Cargo.toml index 1548432..d57e6e2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,8 +8,8 @@ build = "build.rs" time = "0.1" docopt = "0.6" rustc-serialize = "0.3" -log = "*" -env_logger = "*" +log = "0.3" +epoll = "0.2" [build-dependencies] gcc = "0.3" diff --git a/benches/peerlist.rs b/benches/peerlist.rs new file mode 100644 index 0000000..29322ad --- /dev/null +++ b/benches/peerlist.rs @@ -0,0 +1,118 @@ +#![feature(test)] +extern crate test; +extern crate time; + +use std::net::{SocketAddr, ToSocketAddrs, SocketAddrV4, Ipv4Addr}; +use std::collections::HashMap; + +use time::{Duration, SteadyTime}; + +use test::Bencher; + +struct PeerListHashMap { + timeout: Duration, + peers: HashMap +} + +impl PeerListHashMap { + fn new(timeout: Duration) -> Self { + PeerListHashMap{peers: HashMap::new(), timeout: timeout} + } + + fn add(&mut self, addr: &SocketAddr) { + if self.peers.insert(*addr, SteadyTime::now()+self.timeout).is_none() { + } + } +} + +struct PeerListVec { + timeout: Duration, + peers: Vec<(SocketAddr, SteadyTime)> +} + +impl PeerListVec { + fn new(timeout: Duration) -> Self { + PeerListVec{peers: Vec::new(), timeout: timeout} + } + + fn add(&mut self, addr: &SocketAddr) { + for &(ref peer, ref timeout) in &self.peers { + if peer == addr { + return; + } + } + self.peers.push((*addr, SteadyTime::now()+self.timeout)); + } +} + +fn bench_hashmap_add_n(b: &mut Bencher, n: u16) { + let mut peers = PeerListHashMap::new(Duration::seconds(60)); + for i in 0..n { + peers.add(&SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 1), i))); + } + let addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0)); + b.iter(|| { + peers.add(&addr) + }) +} + +#[bench] +fn bench_hashmap_add_0(b: &mut Bencher) { + bench_hashmap_add_n(b, 0); +} + +#[bench] +fn bench_hashmap_add_1(b: &mut Bencher) { + bench_hashmap_add_n(b, 1); +} + +#[bench] +fn bench_hashmap_add_10(b: &mut Bencher) { + bench_hashmap_add_n(b, 10); +} + +#[bench] +fn bench_hashmap_add_100(b: &mut Bencher) { + bench_hashmap_add_n(b, 100); +} + +#[bench] +fn bench_hashmap_add_1000(b: &mut Bencher) { + bench_hashmap_add_n(b, 1000); +} + +fn bench_vec_add_n(b: &mut Bencher, n: u16) { + let mut peers = PeerListVec::new(Duration::seconds(60)); + for i in 0..n { + peers.add(&SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 1), i))); + } + let addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0)); + b.iter(|| { + peers.add(&addr) + }) +} + +#[bench] +fn bench_vec_add_0(b: &mut Bencher) { + bench_vec_add_n(b, 0); +} + +#[bench] +fn bench_vec_add_1(b: &mut Bencher) { + bench_vec_add_n(b, 1); +} + +#[bench] +fn bench_vec_add_10(b: &mut Bencher) { + bench_vec_add_n(b, 10); +} + +#[bench] +fn bench_vec_add_100(b: &mut Bencher) { + bench_vec_add_n(b, 100); +} + +#[bench] +fn bench_vec_add_1000(b: &mut Bencher) { + bench_vec_add_n(b, 1000); +} diff --git a/src/ethcloud.rs b/src/ethcloud.rs index 4d7ea27..4abdc08 100644 --- a/src/ethcloud.rs +++ b/src/ethcloud.rs @@ -4,12 +4,10 @@ use std::hash::Hasher; use std::net::UdpSocket; use std::io::Read; use std::fmt; -use std::sync::{Arc, Mutex}; -use std::thread; -use std::ops::Deref; -use std::time::Duration as StdDuration; +use std::os::unix::io::AsRawFd; use time::{Duration, SteadyTime}; +use epoll; use super::{ethernet, udpmessage}; use super::tapdev::TapDevice; @@ -135,27 +133,15 @@ impl MacTable { } } -pub struct EthCloudInner { - peers: Mutex, - mactable: Mutex, - socket: Mutex, - tapdev: Mutex, +pub struct EthCloud { + peers: PeerList, + mactable: MacTable, + socket: UdpSocket, + tapdev: TapDevice, token: Token, - next_peerlist: Mutex, + next_peerlist: SteadyTime, update_freq: Duration, - running: Mutex -} - -#[derive(Clone)] -pub struct EthCloud(Arc); - -impl Deref for EthCloud { - type Target = EthCloudInner; - - #[inline(always)] - fn deref(&self) -> &Self::Target { - &self.0 - } + buffer_out: [u8; 64*1024] } impl EthCloud { @@ -169,23 +155,22 @@ impl EthCloud { _ => panic!("Failed to open tap device") }; info!("Opened tap device {}", tapdev.ifname()); - EthCloud(Arc::new(EthCloudInner{ - peers: Mutex::new(PeerList::new(peer_timeout)), - mactable: Mutex::new(MacTable::new(mac_timeout)), - socket: Mutex::new(socket), - tapdev: Mutex::new(tapdev), + EthCloud{ + peers: PeerList::new(peer_timeout), + mactable: MacTable::new(mac_timeout), + socket: socket, + tapdev: tapdev, token: token, - next_peerlist: Mutex::new(SteadyTime::now()), + next_peerlist: SteadyTime::now(), update_freq: peer_timeout/2, - running: Mutex::new(true) - })) + buffer_out: [0; 64*1024] + } } - fn send_msg(&self, addr: A, msg: &udpmessage::Message) -> Result<(), Error> { + fn send_msg(&mut self, addr: A, msg: &udpmessage::Message) -> Result<(), Error> { debug!("Sending {:?} to {}", msg, addr); - let mut buffer = [0u8; 64*1024]; - let size = udpmessage::encode(self.token, msg, &mut buffer); - match self.socket.lock().expect("Lock poisoned").send_to(&buffer[..size], addr) { + let size = udpmessage::encode(self.token, msg, &mut self.buffer_out); + match self.socket.send_to(&self.buffer_out[..size], addr) { Ok(written) if written == size => Ok(()), Ok(_) => Err(Error::SocketError("Sent out truncated packet")), Err(e) => { @@ -195,30 +180,29 @@ impl EthCloud { } } - pub fn connect(&self, addr: A) -> Result<(), Error> { + pub fn connect(&mut self, addr: A) -> Result<(), Error> { info!("Connecting to {}", addr); self.send_msg(addr, &udpmessage::Message::GetPeers) } - fn housekeep(&self) -> Result<(), Error> { - self.peers.lock().expect("Lock poisoned").timeout(); - self.mactable.lock().expect("Lock poisoned").timeout(); - let mut next_peerlist = self.next_peerlist.lock().expect("Lock poisoned"); - if *next_peerlist <= SteadyTime::now() { + fn housekeep(&mut self) -> Result<(), Error> { + self.peers.timeout(); + self.mactable.timeout(); + if self.next_peerlist <= SteadyTime::now() { debug!("Send peer list to all peers"); - let peers = self.peers.lock().expect("Lock poisoned").as_vec(); + let peers = self.peers.as_vec(); let msg = udpmessage::Message::Peers(peers.clone()); for addr in &peers { try!(self.send_msg(addr, &msg)); } - *next_peerlist = SteadyTime::now() + self.update_freq; + self.next_peerlist = SteadyTime::now() + self.update_freq; } Ok(()) } - fn handle_ethernet_frame(&self, frame: ethernet::Frame) -> Result<(), Error> { + fn handle_ethernet_frame(&mut self, frame: ethernet::Frame) -> Result<(), Error> { debug!("Read ethernet frame from tap {:?}", frame); - match self.mactable.lock().expect("Lock poisoned").lookup(frame.dst, frame.vlan) { + match self.mactable.lookup(frame.dst, frame.vlan) { Some(addr) => { debug!("Found destination for {:?} (vlan {}) => {}", frame.dst, frame.vlan, addr); try!(self.send_msg(addr, &udpmessage::Message::Frame(frame))) @@ -226,7 +210,7 @@ impl EthCloud { None => { debug!("No destination for {:?} (vlan {}) found, broadcasting", frame.dst, frame.vlan); let msg = udpmessage::Message::Frame(frame); - for addr in &self.peers.lock().expect("Lock poisoned").as_vec() { + for addr in &self.peers.as_vec() { try!(self.send_msg(addr, &msg)); } } @@ -234,7 +218,7 @@ impl EthCloud { Ok(()) } - fn handle_net_message(&self, peer: SocketAddr, token: Token, msg: udpmessage::Message) -> Result<(), Error> { + fn handle_net_message(&mut self, peer: SocketAddr, token: Token, msg: udpmessage::Message) -> Result<(), Error> { if token != self.token { info!("Ignoring message from {} with wrong token {}", peer, token); return Err(Error::WrongToken(token)); @@ -242,95 +226,75 @@ impl EthCloud { debug!("Recieved {:?} from {}", msg, peer); match msg { udpmessage::Message::Frame(frame) => { - let mut buffer = [0u8; 64*1024]; - let size = ethernet::encode(&frame, &mut buffer); + let size = ethernet::encode(&frame, &mut self.buffer_out); debug!("Writing ethernet frame to tap: {:?}", frame); - match self.tapdev.lock().expect("Lock poisoned").write(&buffer[..size]) { + match self.tapdev.write(&self.buffer_out[..size]) { Ok(()) => (), Err(e) => { error!("Failed to send via tap device {:?}", e); return Err(Error::TapdevError("Failed to write to tap device")); } } - self.peers.lock().expect("Lock poisoned").add(&peer); - self.mactable.lock().expect("Lock poisoned").learn(frame.src, frame.vlan, &peer); + self.peers.add(&peer); + self.mactable.learn(frame.src, frame.vlan, &peer); }, udpmessage::Message::Peers(peers) => { - self.peers.lock().expect("Lock poisoned").add(&peer); + self.peers.add(&peer); for p in &peers { - if ! self.peers.lock().expect("Lock poisoned").contains(p) { + if ! self.peers.contains(p) { try!(self.connect(p)); } } }, udpmessage::Message::GetPeers => { - self.peers.lock().expect("Lock poisoned").add(&peer); - let peers = self.peers.lock().expect("Lock poisoned").as_vec(); + self.peers.add(&peer); + let peers = self.peers.as_vec(); try!(self.send_msg(peer, &udpmessage::Message::Peers(peers))); }, - udpmessage::Message::Close => self.peers.lock().expect("Lock poisoned").remove(&peer) + udpmessage::Message::Close => self.peers.remove(&peer) } Ok(()) } - fn run_tapdev(&self) { - let mut buffer = [0u8; 64*1024]; - let mut tapdev = self.tapdev.lock().expect("Lock poisoned").clone(); + pub fn run(&mut self) { + let epoll_handle = epoll::create1(0).expect("Failed to create epoll handle"); + let socket_fd = self.socket.as_raw_fd(); + let tapdev_fd = self.tapdev.as_raw_fd(); + let mut socket_event = epoll::EpollEvent{events: epoll::util::event_type::EPOLLIN, data: 0}; + let mut tapdev_event = epoll::EpollEvent{events: epoll::util::event_type::EPOLLIN, data: 1}; + epoll::ctl(epoll_handle, epoll::util::ctl_op::ADD, socket_fd, &mut socket_event).expect("Failed to add socket to epoll handle"); + epoll::ctl(epoll_handle, epoll::util::ctl_op::ADD, tapdev_fd, &mut tapdev_event).expect("Failed to add tapdev to epoll handle"); + let mut events = [epoll::EpollEvent{events: 0, data: 0}; 2]; + let mut buffer = [0; 64*1024]; loop { - match tapdev.read(&mut buffer) { - Ok(size) => { - match ethernet::decode(&mut buffer[..size]).and_then(|frame| self.handle_ethernet_frame(frame)) { - Ok(_) => (), - Err(e) => error!("Error: {:?}", e) - } - }, - Err(_error) => panic!("Failed to read from tap device") + let count = epoll::wait(epoll_handle, &mut events, 1000).expect("Epoll wait failed"); + for i in 0..count { + match &events[i as usize].data { + &0 => match self.socket.recv_from(&mut buffer) { + Ok((size, src)) => { + match udpmessage::decode(&buffer[..size]).and_then(|(token, msg)| self.handle_net_message(src, token, msg)) { + Ok(_) => (), + Err(e) => error!("Error: {:?}", e) + } + }, + Err(_error) => panic!("Failed to read from network socket") + }, + &1 => match self.tapdev.read(&mut buffer) { + Ok(size) => { + match ethernet::decode(&mut buffer[..size]).and_then(|frame| self.handle_ethernet_frame(frame)) { + Ok(_) => (), + Err(e) => error!("Error: {:?}", e) + } + }, + Err(_error) => panic!("Failed to read from tap device") + }, + _ => unreachable!() + } } } - } - - fn run_socket(&self) { - let mut buffer = [0u8; 64*1024]; - let socket = self.socket.lock().expect("Lock poisoned").try_clone().expect("Failed to clone socket"); - loop { - match socket.recv_from(&mut buffer) { - Ok((size, src)) => { - match udpmessage::decode(&buffer[..size]).and_then(|(token, msg)| self.handle_net_message(src, token, msg)) { - Ok(_) => (), - Err(e) => error!("Error: {:?}", e) - } - }, - Err(_error) => panic!("Failed to read from network socket") - } + match self.housekeep() { + Ok(_) => (), + Err(e) => error!("Error: {:?}", e) } } - - pub fn run(&self) { - let clone = self.clone(); - thread::spawn(move || { - clone.run_socket() - }); - let clone = self.clone(); - thread::spawn(move || { - clone.run_tapdev() - }); - while *self.running.lock().expect("Lock poisoned") { - match self.housekeep() { - Ok(_) => (), - Err(e) => error!("Error: {:?}", e) - } - thread::sleep(StdDuration::new(10, 0)); - } - } - - pub fn close(&self) { - info!("Shutting down..."); - for p in self.peers.lock().expect("Lock poisoned").as_vec() { - match self.send_msg(p, &udpmessage::Message::Close) { - Ok(()) => (), - Err(e) => error!("Failed to send close message to {}: {:?}", p, e) - } - } - *self.running.lock().expect("Lock poisoned") = false; - } } diff --git a/src/main.rs b/src/main.rs index 0155dd6..536e3f7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ extern crate time; extern crate docopt; extern crate rustc_serialize; +extern crate epoll; mod util; mod udpmessage; @@ -79,7 +80,7 @@ fn main() { Box::new(SimpleLogger) }).unwrap(); debug!("Args: {:?}", args); - let tapcloud = EthCloud::new( + let mut tapcloud = EthCloud::new( &args.flag_device, args.flag_listen, args.flag_token, diff --git a/src/tapdev.rs b/src/tapdev.rs index 4a776e3..407ac8c 100644 --- a/src/tapdev.rs +++ b/src/tapdev.rs @@ -1,6 +1,6 @@ use std::fs; use std::io::{Read, Write, Result as IoResult, Error as IoError}; -use std::os::unix::io::{AsRawFd, FromRawFd}; +use std::os::unix::io::{AsRawFd, RawFd}; extern { fn setup_tap_device(fd: i32, ifname: *mut u8) -> i32; @@ -24,11 +24,6 @@ impl TapDevice { } } - pub fn clone(&self) -> TapDevice { - let fd = unsafe { fs::File::from_raw_fd(self.fd.as_raw_fd()) }; - TapDevice{fd: fd, ifname: self.ifname.clone()} - } - pub fn ifname(&self) -> &str { &self.ifname } @@ -43,3 +38,9 @@ impl TapDevice { self.fd.write_all(buffer) } } + +impl AsRawFd for TapDevice { + fn as_raw_fd(&self) -> RawFd { + self.fd.as_raw_fd() + } +}