From eaed378bb77b04a98ec95286c045674dfa73034a Mon Sep 17 00:00:00 2001 From: Dennis Schwerdel Date: Thu, 19 Nov 2015 17:11:59 +0100 Subject: [PATCH] Threaded --- src/ethcloud.rs | 149 +++++++++++++++++++++++++++++------------------- src/main.rs | 5 +- src/tapdev.rs | 18 +++--- 3 files changed, 98 insertions(+), 74 deletions(-) diff --git a/src/ethcloud.rs b/src/ethcloud.rs index 4f8ca4a..59ef28a 100644 --- a/src/ethcloud.rs +++ b/src/ethcloud.rs @@ -2,12 +2,14 @@ use std::net::{SocketAddr, ToSocketAddrs}; use std::collections::HashMap; use std::hash::Hasher; use std::net::UdpSocket; -use std::io::{Read, ErrorKind}; -use std::os::unix::io::AsRawFd; +use std::io::Read; use std::fmt; +use std::sync::{Arc, Mutex}; +use std::thread; +use std::ops::Deref; +use std::time::Duration as StdDuration; use time::{Duration, SteadyTime}; -use libc; pub use ethernet::{encode as eth_encode, decode as eth_decode, Frame as EthernetFrame}; pub use tapdev::TapDevice; @@ -135,50 +137,62 @@ impl MacTable { } -pub struct EthCloud { - peers: PeerList, - mactable: MacTable, - socket: UdpSocket, - tapdev: TapDevice, +pub struct EthCloudInner { + peers: Mutex, + mactable: Mutex, + socket: Mutex, + tapdev: Mutex, token: Token, - next_peerlist: SteadyTime, + next_peerlist: Mutex, update_freq: Duration } +#[derive(Clone)] +pub struct EthCloud(Arc); + +impl Deref for EthCloud { + type Target = EthCloudInner; + + #[inline(always)] + fn deref(&self) -> &Self::Target { + &self.0 + } +} + impl EthCloud { pub fn new(device: &str, listen: String, token: Token, mac_timeout: Duration, peer_timeout: Duration) -> Self { let socket = match UdpSocket::bind(&listen as &str) { Ok(socket) => socket, _ => panic!("Failed to open socket") }; - let res: i32; + /*let res: i32; unsafe { res = libc::fcntl(socket.as_raw_fd(), libc::consts::os::posix01::F_SETFL, libc::consts::os::extra::O_NONBLOCK); } if res != 0 { panic!("Failed to set socket to non-blocking"); - } + }*/ let tapdev = match TapDevice::new(device) { Ok(tapdev) => tapdev, _ => panic!("Failed to open tap device") }; info!("Opened tap device {}", tapdev.ifname()); - EthCloud{ - peers: PeerList::new(peer_timeout), - mactable: MacTable::new(mac_timeout), - socket: socket, - tapdev: tapdev, + EthCloud(Arc::new(EthCloudInner{ + peers: Mutex::new(PeerList::new(peer_timeout)), + mactable: Mutex::new(MacTable::new(mac_timeout)), + socket: Mutex::new(socket), + tapdev: Mutex::new(tapdev), token: token, - next_peerlist: SteadyTime::now(), + next_peerlist: Mutex::new(SteadyTime::now()), update_freq: peer_timeout/2 - } + })) } - fn send_msg(&mut self, addr: A, msg: &UdpMessage) -> Result<(), Error> { + fn send_msg(&self, addr: A, msg: &UdpMessage) -> Result<(), Error> { debug!("Sending {:?} to {}", msg, addr); let mut buffer = [0u8; 64*1024]; let size = udp_encode(self.token, msg, &mut buffer); - match self.socket.send_to(&buffer[..size], addr) { + match self.socket.lock().expect("Lock poisoned").send_to(&buffer[..size], addr) { Ok(written) if written == size => Ok(()), Ok(_) => Err(Error::SocketError("Sent out truncated packet")), Err(e) => { @@ -188,29 +202,30 @@ impl EthCloud { } } - pub fn connect(&mut self, addr: A) -> Result<(), Error> { + pub fn connect(&self, addr: A) -> Result<(), Error> { info!("Connecting to {}", addr); self.send_msg(addr, &UdpMessage::GetPeers) } - fn housekeep(&mut self) -> Result<(), Error> { - self.peers.timeout(); - self.mactable.timeout(); - if self.next_peerlist <= SteadyTime::now() { + fn housekeep(&self) -> Result<(), Error> { + self.peers.lock().expect("Lock poisoned").timeout(); + self.mactable.lock().expect("Lock poisoned").timeout(); + let mut next_peerlist = self.next_peerlist.lock().expect("Lock poisoned"); + if *next_peerlist <= SteadyTime::now() { debug!("Send peer list to all peers"); - let peers = self.peers.as_vec(); - let msg = UdpMessage::Peers(peers); - for addr in &self.peers.as_vec() { + let peers = self.peers.lock().expect("Lock poisoned").as_vec(); + let msg = UdpMessage::Peers(peers.clone()); + for addr in &peers { try!(self.send_msg(addr, &msg)); } - self.next_peerlist = SteadyTime::now() + self.update_freq; + *next_peerlist = SteadyTime::now() + self.update_freq; } Ok(()) } - fn handle_ethernet_frame(&mut self, frame: EthernetFrame) -> Result<(), Error> { + fn handle_ethernet_frame(&self, frame: EthernetFrame) -> Result<(), Error> { debug!("Read ethernet frame from tap {:?}", frame); - match self.mactable.lookup(frame.dst, frame.vlan) { + match self.mactable.lock().expect("Lock poisoned").lookup(frame.dst, frame.vlan) { Some(addr) => { debug!("Found destination for {:?} (vlan {}) => {}", frame.dst, frame.vlan, addr); try!(self.send_msg(addr, &UdpMessage::Frame(frame))) @@ -218,7 +233,7 @@ impl EthCloud { None => { debug!("No destination for {:?} (vlan {}) found, broadcasting", frame.dst, frame.vlan); let msg = UdpMessage::Frame(frame); - for addr in &self.peers.as_vec() { + for addr in &self.peers.lock().expect("Lock poisoned").as_vec() { try!(self.send_msg(addr, &msg)); } } @@ -226,7 +241,7 @@ impl EthCloud { Ok(()) } - fn handle_net_message(&mut self, peer: SocketAddr, token: Token, msg: UdpMessage) -> Result<(), Error> { + fn handle_net_message(&self, peer: SocketAddr, token: Token, msg: UdpMessage) -> Result<(), Error> { if token != self.token { info!("Ignoring message from {} with wrong token {}", peer, token); return Err(Error::WrongToken(token)); @@ -234,12 +249,12 @@ impl EthCloud { debug!("Recieved {:?} from {}", msg, peer); match msg { UdpMessage::Frame(frame) => { - self.peers.add(&peer); - self.mactable.learn(frame.src, frame.vlan, &peer); + self.peers.lock().expect("Lock poisoned").add(&peer); + self.mactable.lock().expect("Lock poisoned").learn(frame.src, frame.vlan, &peer); let mut buffer = [0u8; 64*1024]; let size = eth_encode(&frame, &mut buffer); debug!("Writing ethernet frame to tap: {:?}", frame); - match self.tapdev.write(&buffer[..size]) { + match self.tapdev.lock().expect("Lock poisoned").write(&buffer[..size]) { Ok(()) => (), Err(e) => { error!("Failed to send via tap device {:?}", e); @@ -248,54 +263,70 @@ impl EthCloud { } }, UdpMessage::Peers(peers) => { - self.peers.add(&peer); + self.peers.lock().expect("Lock poisoned").add(&peer); for p in &peers { - if ! self.peers.contains(p) { + if ! self.peers.lock().expect("Lock poisoned").contains(p) { try!(self.connect(p)); } } }, UdpMessage::GetPeers => { - self.peers.add(&peer); - let peers = self.peers.as_vec(); + self.peers.lock().expect("Lock poisoned").add(&peer); + let peers = self.peers.lock().expect("Lock poisoned").as_vec(); try!(self.send_msg(peer, &UdpMessage::Peers(peers))); }, - UdpMessage::Close => self.peers.remove(&peer) + UdpMessage::Close => self.peers.lock().expect("Lock poisoned").remove(&peer) } Ok(()) } - pub fn run(&mut self) { + fn run_tapdev(&self) { let mut buffer = [0u8; 64*1024]; + let mut tapdev = self.tapdev.lock().expect("Lock poisoned").clone(); loop { - match self.socket.recv_from(&mut buffer) { - Ok((size, src)) => { - match udp_decode(&buffer[..size]).and_then(|(token, msg)| self.handle_net_message(src, token, msg)) { - Ok(_) => (), - Err(e) => error!("Error: {:?}", e) - } - }, - Err(error) => match error.kind() { - ErrorKind::WouldBlock => (), - _ => panic!("Failed to read from network socket") - } - } - match self.tapdev.read(&mut buffer) { + match tapdev.read(&mut buffer) { Ok(size) => { match eth_decode(&buffer[..size]).and_then(|frame| self.handle_ethernet_frame(frame)) { Ok(_) => (), Err(e) => error!("Error: {:?}", e) } }, - Err(error) => match error.kind() { - ErrorKind::WouldBlock => (), - _ => panic!("Failed to read from tap device") - } + Err(_error) => panic!("Failed to read from tap device") } + } + } + + fn run_socket(&self) { + let mut buffer = [0u8; 64*1024]; + let socket = self.socket.lock().expect("Lock poisoned").try_clone().expect("Failed to clone socket"); + loop { + match socket.recv_from(&mut buffer) { + Ok((size, src)) => { + match udp_decode(&buffer[..size]).and_then(|(token, msg)| self.handle_net_message(src, token, msg)) { + Ok(_) => (), + Err(e) => error!("Error: {:?}", e) + } + }, + Err(_error) => panic!("Failed to read from network socket") + } + } + } + + pub fn run(&self) { + let clone = self.clone(); + thread::spawn(move || { + clone.run_socket() + }); + let clone = self.clone(); + thread::spawn(move || { + clone.run_tapdev() + }); + loop { match self.housekeep() { Ok(_) => (), Err(e) => error!("Error: {:?}", e) } + thread::sleep(StdDuration::new(1, 0)); } } } diff --git a/src/main.rs b/src/main.rs index 0985a7a..0210495 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,8 +1,5 @@ -#![feature(libc)] - #[macro_use] extern crate log; extern crate time; -extern crate libc; extern crate docopt; extern crate rustc_serialize; @@ -72,7 +69,7 @@ fn main() { Box::new(SimpleLogger) }).unwrap(); debug!("Args: {:?}", args); - let mut tapcloud = EthCloud::new( + let tapcloud = EthCloud::new( &args.flag_device, args.flag_listen, args.flag_token, diff --git a/src/tapdev.rs b/src/tapdev.rs index 71b2aa8..4a776e3 100644 --- a/src/tapdev.rs +++ b/src/tapdev.rs @@ -1,8 +1,6 @@ use std::fs; use std::io::{Read, Write, Result as IoResult, Error as IoError}; -use std::os::unix::io::AsRawFd; - -use libc; +use std::os::unix::io::{AsRawFd, FromRawFd}; extern { fn setup_tap_device(fd: i32, ifname: *mut u8) -> i32; @@ -20,19 +18,17 @@ impl TapDevice { ifname_string.push_str(ifname); ifname_string.push('\0'); let mut ifname_c = ifname_string.into_bytes(); - let mut res: i32; - unsafe { - res = setup_tap_device(fd.as_raw_fd(), ifname_c.as_mut_ptr()); - if res == 0 { - res = libc::fcntl(fd.as_raw_fd(), libc::consts::os::posix01::F_SETFL, libc::consts::os::extra::O_NONBLOCK); - } - } - match res { + match unsafe { setup_tap_device(fd.as_raw_fd(), ifname_c.as_mut_ptr()) } { 0 => Ok(TapDevice{fd: fd, ifname: String::from_utf8(ifname_c).unwrap()}), _ => Err(IoError::last_os_error()) } } + pub fn clone(&self) -> TapDevice { + let fd = unsafe { fs::File::from_raw_fd(self.fd.as_raw_fd()) }; + TapDevice{fd: fd, ifname: self.ifname.clone()} + } + pub fn ifname(&self) -> &str { &self.ifname }