mirror of https://github.com/dswd/vpncloud.git
Threaded
This commit is contained in:
parent
84813fa011
commit
eaed378bb7
147
src/ethcloud.rs
147
src/ethcloud.rs
|
@ -2,12 +2,14 @@ use std::net::{SocketAddr, ToSocketAddrs};
|
|||
use std::collections::HashMap;
|
||||
use std::hash::Hasher;
|
||||
use std::net::UdpSocket;
|
||||
use std::io::{Read, ErrorKind};
|
||||
use std::os::unix::io::AsRawFd;
|
||||
use std::io::Read;
|
||||
use std::fmt;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::thread;
|
||||
use std::ops::Deref;
|
||||
use std::time::Duration as StdDuration;
|
||||
|
||||
use time::{Duration, SteadyTime};
|
||||
use libc;
|
||||
|
||||
pub use ethernet::{encode as eth_encode, decode as eth_decode, Frame as EthernetFrame};
|
||||
pub use tapdev::TapDevice;
|
||||
|
@ -135,50 +137,62 @@ impl MacTable {
|
|||
}
|
||||
|
||||
|
||||
pub struct EthCloud {
|
||||
peers: PeerList,
|
||||
mactable: MacTable,
|
||||
socket: UdpSocket,
|
||||
tapdev: TapDevice,
|
||||
pub struct EthCloudInner {
|
||||
peers: Mutex<PeerList>,
|
||||
mactable: Mutex<MacTable>,
|
||||
socket: Mutex<UdpSocket>,
|
||||
tapdev: Mutex<TapDevice>,
|
||||
token: Token,
|
||||
next_peerlist: SteadyTime,
|
||||
next_peerlist: Mutex<SteadyTime>,
|
||||
update_freq: Duration
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct EthCloud(Arc<EthCloudInner>);
|
||||
|
||||
impl Deref for EthCloud {
|
||||
type Target = EthCloudInner;
|
||||
|
||||
#[inline(always)]
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl EthCloud {
|
||||
pub fn new(device: &str, listen: String, token: Token, mac_timeout: Duration, peer_timeout: Duration) -> Self {
|
||||
let socket = match UdpSocket::bind(&listen as &str) {
|
||||
Ok(socket) => socket,
|
||||
_ => panic!("Failed to open socket")
|
||||
};
|
||||
let res: i32;
|
||||
/*let res: i32;
|
||||
unsafe {
|
||||
res = libc::fcntl(socket.as_raw_fd(), libc::consts::os::posix01::F_SETFL, libc::consts::os::extra::O_NONBLOCK);
|
||||
}
|
||||
if res != 0 {
|
||||
panic!("Failed to set socket to non-blocking");
|
||||
}
|
||||
}*/
|
||||
let tapdev = match TapDevice::new(device) {
|
||||
Ok(tapdev) => tapdev,
|
||||
_ => panic!("Failed to open tap device")
|
||||
};
|
||||
info!("Opened tap device {}", tapdev.ifname());
|
||||
EthCloud{
|
||||
peers: PeerList::new(peer_timeout),
|
||||
mactable: MacTable::new(mac_timeout),
|
||||
socket: socket,
|
||||
tapdev: tapdev,
|
||||
EthCloud(Arc::new(EthCloudInner{
|
||||
peers: Mutex::new(PeerList::new(peer_timeout)),
|
||||
mactable: Mutex::new(MacTable::new(mac_timeout)),
|
||||
socket: Mutex::new(socket),
|
||||
tapdev: Mutex::new(tapdev),
|
||||
token: token,
|
||||
next_peerlist: SteadyTime::now(),
|
||||
next_peerlist: Mutex::new(SteadyTime::now()),
|
||||
update_freq: peer_timeout/2
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
fn send_msg<A: ToSocketAddrs + fmt::Display>(&mut self, addr: A, msg: &UdpMessage) -> Result<(), Error> {
|
||||
fn send_msg<A: ToSocketAddrs + fmt::Display>(&self, addr: A, msg: &UdpMessage) -> Result<(), Error> {
|
||||
debug!("Sending {:?} to {}", msg, addr);
|
||||
let mut buffer = [0u8; 64*1024];
|
||||
let size = udp_encode(self.token, msg, &mut buffer);
|
||||
match self.socket.send_to(&buffer[..size], addr) {
|
||||
match self.socket.lock().expect("Lock poisoned").send_to(&buffer[..size], addr) {
|
||||
Ok(written) if written == size => Ok(()),
|
||||
Ok(_) => Err(Error::SocketError("Sent out truncated packet")),
|
||||
Err(e) => {
|
||||
|
@ -188,29 +202,30 @@ impl EthCloud {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn connect<A: ToSocketAddrs + fmt::Display>(&mut self, addr: A) -> Result<(), Error> {
|
||||
pub fn connect<A: ToSocketAddrs + fmt::Display>(&self, addr: A) -> Result<(), Error> {
|
||||
info!("Connecting to {}", addr);
|
||||
self.send_msg(addr, &UdpMessage::GetPeers)
|
||||
}
|
||||
|
||||
fn housekeep(&mut self) -> Result<(), Error> {
|
||||
self.peers.timeout();
|
||||
self.mactable.timeout();
|
||||
if self.next_peerlist <= SteadyTime::now() {
|
||||
fn housekeep(&self) -> Result<(), Error> {
|
||||
self.peers.lock().expect("Lock poisoned").timeout();
|
||||
self.mactable.lock().expect("Lock poisoned").timeout();
|
||||
let mut next_peerlist = self.next_peerlist.lock().expect("Lock poisoned");
|
||||
if *next_peerlist <= SteadyTime::now() {
|
||||
debug!("Send peer list to all peers");
|
||||
let peers = self.peers.as_vec();
|
||||
let msg = UdpMessage::Peers(peers);
|
||||
for addr in &self.peers.as_vec() {
|
||||
let peers = self.peers.lock().expect("Lock poisoned").as_vec();
|
||||
let msg = UdpMessage::Peers(peers.clone());
|
||||
for addr in &peers {
|
||||
try!(self.send_msg(addr, &msg));
|
||||
}
|
||||
self.next_peerlist = SteadyTime::now() + self.update_freq;
|
||||
*next_peerlist = SteadyTime::now() + self.update_freq;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_ethernet_frame(&mut self, frame: EthernetFrame) -> Result<(), Error> {
|
||||
fn handle_ethernet_frame(&self, frame: EthernetFrame) -> Result<(), Error> {
|
||||
debug!("Read ethernet frame from tap {:?}", frame);
|
||||
match self.mactable.lookup(frame.dst, frame.vlan) {
|
||||
match self.mactable.lock().expect("Lock poisoned").lookup(frame.dst, frame.vlan) {
|
||||
Some(addr) => {
|
||||
debug!("Found destination for {:?} (vlan {}) => {}", frame.dst, frame.vlan, addr);
|
||||
try!(self.send_msg(addr, &UdpMessage::Frame(frame)))
|
||||
|
@ -218,7 +233,7 @@ impl EthCloud {
|
|||
None => {
|
||||
debug!("No destination for {:?} (vlan {}) found, broadcasting", frame.dst, frame.vlan);
|
||||
let msg = UdpMessage::Frame(frame);
|
||||
for addr in &self.peers.as_vec() {
|
||||
for addr in &self.peers.lock().expect("Lock poisoned").as_vec() {
|
||||
try!(self.send_msg(addr, &msg));
|
||||
}
|
||||
}
|
||||
|
@ -226,7 +241,7 @@ impl EthCloud {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_net_message(&mut self, peer: SocketAddr, token: Token, msg: UdpMessage) -> Result<(), Error> {
|
||||
fn handle_net_message(&self, peer: SocketAddr, token: Token, msg: UdpMessage) -> Result<(), Error> {
|
||||
if token != self.token {
|
||||
info!("Ignoring message from {} with wrong token {}", peer, token);
|
||||
return Err(Error::WrongToken(token));
|
||||
|
@ -234,12 +249,12 @@ impl EthCloud {
|
|||
debug!("Recieved {:?} from {}", msg, peer);
|
||||
match msg {
|
||||
UdpMessage::Frame(frame) => {
|
||||
self.peers.add(&peer);
|
||||
self.mactable.learn(frame.src, frame.vlan, &peer);
|
||||
self.peers.lock().expect("Lock poisoned").add(&peer);
|
||||
self.mactable.lock().expect("Lock poisoned").learn(frame.src, frame.vlan, &peer);
|
||||
let mut buffer = [0u8; 64*1024];
|
||||
let size = eth_encode(&frame, &mut buffer);
|
||||
debug!("Writing ethernet frame to tap: {:?}", frame);
|
||||
match self.tapdev.write(&buffer[..size]) {
|
||||
match self.tapdev.lock().expect("Lock poisoned").write(&buffer[..size]) {
|
||||
Ok(()) => (),
|
||||
Err(e) => {
|
||||
error!("Failed to send via tap device {:?}", e);
|
||||
|
@ -248,54 +263,70 @@ impl EthCloud {
|
|||
}
|
||||
},
|
||||
UdpMessage::Peers(peers) => {
|
||||
self.peers.add(&peer);
|
||||
self.peers.lock().expect("Lock poisoned").add(&peer);
|
||||
for p in &peers {
|
||||
if ! self.peers.contains(p) {
|
||||
if ! self.peers.lock().expect("Lock poisoned").contains(p) {
|
||||
try!(self.connect(p));
|
||||
}
|
||||
}
|
||||
},
|
||||
UdpMessage::GetPeers => {
|
||||
self.peers.add(&peer);
|
||||
let peers = self.peers.as_vec();
|
||||
self.peers.lock().expect("Lock poisoned").add(&peer);
|
||||
let peers = self.peers.lock().expect("Lock poisoned").as_vec();
|
||||
try!(self.send_msg(peer, &UdpMessage::Peers(peers)));
|
||||
},
|
||||
UdpMessage::Close => self.peers.remove(&peer)
|
||||
UdpMessage::Close => self.peers.lock().expect("Lock poisoned").remove(&peer)
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn run(&mut self) {
|
||||
fn run_tapdev(&self) {
|
||||
let mut buffer = [0u8; 64*1024];
|
||||
let mut tapdev = self.tapdev.lock().expect("Lock poisoned").clone();
|
||||
loop {
|
||||
match self.socket.recv_from(&mut buffer) {
|
||||
Ok((size, src)) => {
|
||||
match udp_decode(&buffer[..size]).and_then(|(token, msg)| self.handle_net_message(src, token, msg)) {
|
||||
Ok(_) => (),
|
||||
Err(e) => error!("Error: {:?}", e)
|
||||
}
|
||||
},
|
||||
Err(error) => match error.kind() {
|
||||
ErrorKind::WouldBlock => (),
|
||||
_ => panic!("Failed to read from network socket")
|
||||
}
|
||||
}
|
||||
match self.tapdev.read(&mut buffer) {
|
||||
match tapdev.read(&mut buffer) {
|
||||
Ok(size) => {
|
||||
match eth_decode(&buffer[..size]).and_then(|frame| self.handle_ethernet_frame(frame)) {
|
||||
Ok(_) => (),
|
||||
Err(e) => error!("Error: {:?}", e)
|
||||
}
|
||||
},
|
||||
Err(error) => match error.kind() {
|
||||
ErrorKind::WouldBlock => (),
|
||||
_ => panic!("Failed to read from tap device")
|
||||
Err(_error) => panic!("Failed to read from tap device")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn run_socket(&self) {
|
||||
let mut buffer = [0u8; 64*1024];
|
||||
let socket = self.socket.lock().expect("Lock poisoned").try_clone().expect("Failed to clone socket");
|
||||
loop {
|
||||
match socket.recv_from(&mut buffer) {
|
||||
Ok((size, src)) => {
|
||||
match udp_decode(&buffer[..size]).and_then(|(token, msg)| self.handle_net_message(src, token, msg)) {
|
||||
Ok(_) => (),
|
||||
Err(e) => error!("Error: {:?}", e)
|
||||
}
|
||||
},
|
||||
Err(_error) => panic!("Failed to read from network socket")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn run(&self) {
|
||||
let clone = self.clone();
|
||||
thread::spawn(move || {
|
||||
clone.run_socket()
|
||||
});
|
||||
let clone = self.clone();
|
||||
thread::spawn(move || {
|
||||
clone.run_tapdev()
|
||||
});
|
||||
loop {
|
||||
match self.housekeep() {
|
||||
Ok(_) => (),
|
||||
Err(e) => error!("Error: {:?}", e)
|
||||
}
|
||||
thread::sleep(StdDuration::new(1, 0));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,8 +1,5 @@
|
|||
#![feature(libc)]
|
||||
|
||||
#[macro_use] extern crate log;
|
||||
extern crate time;
|
||||
extern crate libc;
|
||||
extern crate docopt;
|
||||
extern crate rustc_serialize;
|
||||
|
||||
|
@ -72,7 +69,7 @@ fn main() {
|
|||
Box::new(SimpleLogger)
|
||||
}).unwrap();
|
||||
debug!("Args: {:?}", args);
|
||||
let mut tapcloud = EthCloud::new(
|
||||
let tapcloud = EthCloud::new(
|
||||
&args.flag_device,
|
||||
args.flag_listen,
|
||||
args.flag_token,
|
||||
|
|
|
@ -1,8 +1,6 @@
|
|||
use std::fs;
|
||||
use std::io::{Read, Write, Result as IoResult, Error as IoError};
|
||||
use std::os::unix::io::AsRawFd;
|
||||
|
||||
use libc;
|
||||
use std::os::unix::io::{AsRawFd, FromRawFd};
|
||||
|
||||
extern {
|
||||
fn setup_tap_device(fd: i32, ifname: *mut u8) -> i32;
|
||||
|
@ -20,19 +18,17 @@ impl TapDevice {
|
|||
ifname_string.push_str(ifname);
|
||||
ifname_string.push('\0');
|
||||
let mut ifname_c = ifname_string.into_bytes();
|
||||
let mut res: i32;
|
||||
unsafe {
|
||||
res = setup_tap_device(fd.as_raw_fd(), ifname_c.as_mut_ptr());
|
||||
if res == 0 {
|
||||
res = libc::fcntl(fd.as_raw_fd(), libc::consts::os::posix01::F_SETFL, libc::consts::os::extra::O_NONBLOCK);
|
||||
}
|
||||
}
|
||||
match res {
|
||||
match unsafe { setup_tap_device(fd.as_raw_fd(), ifname_c.as_mut_ptr()) } {
|
||||
0 => Ok(TapDevice{fd: fd, ifname: String::from_utf8(ifname_c).unwrap()}),
|
||||
_ => Err(IoError::last_os_error())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn clone(&self) -> TapDevice {
|
||||
let fd = unsafe { fs::File::from_raw_fd(self.fd.as_raw_fd()) };
|
||||
TapDevice{fd: fd, ifname: self.ifname.clone()}
|
||||
}
|
||||
|
||||
pub fn ifname(&self) -> &str {
|
||||
&self.ifname
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue