vpncloud/src/ethcloud.rs

358 lines
12 KiB
Rust
Raw Normal View History

2015-11-19 15:34:20 +00:00
use std::net::{SocketAddr, ToSocketAddrs};
2015-11-20 09:59:01 +00:00
use std::collections::HashMap;
2015-11-19 15:34:20 +00:00
use std::hash::Hasher;
use std::net::UdpSocket;
2015-11-19 16:11:59 +00:00
use std::io::Read;
2015-11-19 15:34:20 +00:00
use std::fmt;
2015-11-20 08:11:54 +00:00
use std::os::unix::io::AsRawFd;
2015-11-19 15:34:20 +00:00
2015-11-20 12:34:54 +00:00
use time::{Duration, SteadyTime, precise_time_ns};
2015-11-20 08:11:54 +00:00
use epoll;
2015-11-19 15:34:20 +00:00
2015-11-19 18:29:42 +00:00
use super::{ethernet, udpmessage};
use super::tapdev::TapDevice;
2015-11-19 15:34:20 +00:00
/// Hardware (MAC) address stored as its six raw bytes.
#[derive(Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct Mac(pub [u8; 6]);

impl fmt::Debug for Mac {
    /// Formats the address as six colon-separated, lower-case hex bytes.
    ///
    /// Every byte is zero-padded to two digits (`00:01:0a:...`); without the
    /// padding a byte such as `0x0a` would be printed as a single `a`.
    fn fmt(&self, formatter: &mut fmt::Formatter) -> Result<(), fmt::Error> {
        let b = &self.0;
        write!(formatter, "{:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}",
            b[0], b[1], b[2], b[3], b[4], b[5])
    }
}
/// 64-bit value attached to every outgoing message; incoming messages whose
/// token differs from the node's own are rejected (see `handle_net_message`).
pub type Token = u64;
/// Errors reported by the message and frame handling code in this module.
#[derive(Debug)]
pub enum Error {
/// Input could not be parsed. Not constructed in this file; presumably
/// produced by the decoders in `ethernet` / `udpmessage` (TODO confirm).
ParseError(&'static str),
/// An incoming message carried the contained token instead of ours.
WrongToken(Token),
/// Sending over the UDP socket failed or transmitted a truncated datagram.
SocketError(&'static str),
/// Writing a frame to the tap device failed.
TapdevError(&'static str),
}
/// Known peer addresses, each with the point in time at which it expires.
struct PeerList {
/// Lifetime granted to a peer each time it is added.
timeout: Duration,
/// Peer address => expiry deadline (time of the last `add` plus `timeout`).
peers: HashMap<SocketAddr, SteadyTime>
}
impl PeerList {
fn new(timeout: Duration) -> PeerList {
PeerList{peers: HashMap::new(), timeout: timeout}
}
2015-11-20 11:09:07 +00:00
fn timeout(&mut self) -> Vec<SocketAddr> {
2015-11-19 15:34:20 +00:00
let now = SteadyTime::now();
let mut del: Vec<SocketAddr> = Vec::new();
for (&addr, &timeout) in &self.peers {
if timeout < now {
del.push(addr);
}
}
2015-11-20 11:09:07 +00:00
for addr in &del {
2015-11-19 15:34:20 +00:00
debug!("Forgot peer: {:?}", addr);
2015-11-20 11:09:07 +00:00
self.peers.remove(addr);
2015-11-19 15:34:20 +00:00
}
2015-11-20 11:09:07 +00:00
del
2015-11-19 15:34:20 +00:00
}
2015-11-20 12:34:54 +00:00
#[inline(always)]
2015-11-19 15:34:20 +00:00
fn contains(&mut self, addr: &SocketAddr) -> bool {
self.peers.contains_key(addr)
}
2015-11-20 12:34:54 +00:00
#[inline]
2015-11-19 15:34:20 +00:00
fn add(&mut self, addr: &SocketAddr) {
if self.peers.insert(*addr, SteadyTime::now()+self.timeout).is_none() {
info!("New peer: {:?}", addr);
}
}
2015-11-20 12:34:54 +00:00
#[inline]
2015-11-19 15:34:20 +00:00
fn as_vec(&self) -> Vec<SocketAddr> {
self.peers.keys().map(|addr| *addr).collect()
}
2015-11-20 12:34:54 +00:00
#[inline(always)]
fn len(&self) -> usize {
self.peers.len()
}
#[inline]
fn subset(&self, size: usize, seed: u32) -> Vec<SocketAddr> {
let mut peers = self.as_vec();
let mut psrng = seed;
let len = peers.len();
for i in size..len {
peers.swap_remove(psrng as usize % (len - i));
psrng = ((1664525 as u64) * (psrng as u64) + (1013904223 as u64)) as u32;
}
peers
}
#[inline]
2015-11-19 15:34:20 +00:00
fn remove(&mut self, addr: &SocketAddr) {
if self.peers.remove(&addr).is_some() {
info!("Removed peer: {:?}", addr);
}
}
}
/// Lookup key of the MAC table: a hardware address together with its VLAN number.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct MacTableKey {
/// Hardware address of the station.
mac: Mac,
/// VLAN number taken from the frame the address was seen in.
vlan: u16
}
/// MAC table entry: where to send frames for a key and until when the entry is valid.
struct MacTableValue {
/// Peer address the key was last learned from.
address: SocketAddr,
/// Deadline after which `MacTable::timeout` removes the entry.
timeout: SteadyTime
}
/// Table of learned (MAC, VLAN) => peer address mappings with per-entry expiry.
struct MacTable {
/// The learned mappings.
table: HashMap<MacTableKey, MacTableValue>,
/// Lifetime granted to an entry each time it is learned.
timeout: Duration
}
impl MacTable {
fn new(timeout: Duration) -> MacTable {
MacTable{table: HashMap::new(), timeout: timeout}
}
fn timeout(&mut self) {
let now = SteadyTime::now();
let mut del: Vec<MacTableKey> = Vec::new();
for (&key, val) in &self.table {
if val.timeout < now {
del.push(key);
}
}
for key in del {
info!("Forgot mac: {:?} (vlan {})", key.mac, key.vlan);
self.table.remove(&key);
}
}
2015-11-20 12:34:54 +00:00
#[inline]
2015-11-19 15:34:20 +00:00
fn learn(&mut self, mac: &Mac, vlan: u16, addr: &SocketAddr) {
let key = MacTableKey{mac: *mac, vlan: vlan};
let value = MacTableValue{address: *addr, timeout: SteadyTime::now()+self.timeout};
if self.table.insert(key, value).is_none() {
info!("Learned mac: {:?} (vlan {}) => {}", mac, vlan, addr);
}
}
2015-11-20 12:34:54 +00:00
#[inline]
2015-11-19 15:34:20 +00:00
fn lookup(&self, mac: &Mac, vlan: u16) -> Option<SocketAddr> {
let key = MacTableKey{mac: *mac, vlan: vlan};
match self.table.get(&key) {
Some(value) => Some(value.address),
None => None
}
}
}
2015-11-20 08:11:54 +00:00
pub struct EthCloud {
peers: PeerList,
2015-11-20 11:09:07 +00:00
reconnect_peers: Vec<SocketAddr>,
2015-11-20 08:11:54 +00:00
mactable: MacTable,
socket: UdpSocket,
tapdev: TapDevice,
2015-11-19 15:34:20 +00:00
token: Token,
2015-11-20 08:11:54 +00:00
next_peerlist: SteadyTime,
2015-11-19 19:51:53 +00:00
update_freq: Duration,
2015-11-20 09:59:01 +00:00
buffer_out: [u8; 64*1024],
2015-11-20 11:09:07 +00:00
next_housekeep: SteadyTime,
2015-11-19 16:11:59 +00:00
}
2015-11-19 15:34:20 +00:00
impl EthCloud {
pub fn new(device: &str, listen: String, token: Token, mac_timeout: Duration, peer_timeout: Duration) -> Self {
let socket = match UdpSocket::bind(&listen as &str) {
Ok(socket) => socket,
_ => panic!("Failed to open socket")
};
let tapdev = match TapDevice::new(device) {
Ok(tapdev) => tapdev,
_ => panic!("Failed to open tap device")
};
info!("Opened tap device {}", tapdev.ifname());
2015-11-20 08:11:54 +00:00
EthCloud{
peers: PeerList::new(peer_timeout),
2015-11-20 11:09:07 +00:00
reconnect_peers: Vec::new(),
2015-11-20 08:11:54 +00:00
mactable: MacTable::new(mac_timeout),
socket: socket,
tapdev: tapdev,
2015-11-19 15:34:20 +00:00
token: token,
2015-11-20 08:11:54 +00:00
next_peerlist: SteadyTime::now(),
2015-11-19 19:51:53 +00:00
update_freq: peer_timeout/2,
2015-11-20 09:59:01 +00:00
buffer_out: [0; 64*1024],
2015-11-20 11:09:07 +00:00
next_housekeep: SteadyTime::now()
2015-11-20 08:11:54 +00:00
}
2015-11-19 15:34:20 +00:00
}
2015-11-20 08:11:54 +00:00
fn send_msg<A: ToSocketAddrs + fmt::Display>(&mut self, addr: A, msg: &udpmessage::Message) -> Result<(), Error> {
2015-11-19 15:34:20 +00:00
debug!("Sending {:?} to {}", msg, addr);
2015-11-20 08:11:54 +00:00
let size = udpmessage::encode(self.token, msg, &mut self.buffer_out);
match self.socket.send_to(&self.buffer_out[..size], addr) {
2015-11-19 15:34:20 +00:00
Ok(written) if written == size => Ok(()),
Ok(_) => Err(Error::SocketError("Sent out truncated packet")),
Err(e) => {
error!("Failed to send via network {:?}", e);
Err(Error::SocketError("IOError when sending"))
}
}
}
2015-11-20 11:09:07 +00:00
pub fn connect<A: ToSocketAddrs + fmt::Display>(&mut self, addr: A, reconnect: bool) -> Result<(), Error> {
2015-11-19 15:34:20 +00:00
info!("Connecting to {}", addr);
2015-11-20 11:09:07 +00:00
if let Ok(mut addrs) = addr.to_socket_addrs() {
while let Some(addr) = addrs.next() {
if self.peers.contains(&addr) {
return Ok(());
}
}
}
if reconnect {
let addr = addr.to_socket_addrs().unwrap().next().unwrap();
self.reconnect_peers.push(addr);
}
2015-11-19 18:29:42 +00:00
self.send_msg(addr, &udpmessage::Message::GetPeers)
2015-11-19 15:34:20 +00:00
}
2015-11-20 08:11:54 +00:00
fn housekeep(&mut self) -> Result<(), Error> {
2015-11-20 09:59:01 +00:00
debug!("Running housekeeping...");
2015-11-20 08:11:54 +00:00
self.peers.timeout();
self.mactable.timeout();
if self.next_peerlist <= SteadyTime::now() {
2015-11-19 15:34:20 +00:00
debug!("Send peer list to all peers");
2015-11-20 12:34:54 +00:00
let mut peer_num = self.peers.len();
if peer_num > 10 {
peer_num = (peer_num as f32).sqrt().ceil() as usize;
if peer_num < 10 {
peer_num = 10;
}
}
let peers = self.peers.subset(peer_num, precise_time_ns() as u32);
2015-11-20 12:34:54 +00:00
let msg = udpmessage::Message::Peers(peers);
for addr in &self.peers.as_vec() {
2015-11-19 15:34:20 +00:00
try!(self.send_msg(addr, &msg));
}
2015-11-20 08:11:54 +00:00
self.next_peerlist = SteadyTime::now() + self.update_freq;
2015-11-19 15:34:20 +00:00
}
2015-11-20 11:09:07 +00:00
for addr in self.reconnect_peers.clone() {
try!(self.connect(addr, false));
}
2015-11-19 15:34:20 +00:00
Ok(())
}
2015-11-20 08:11:54 +00:00
fn handle_ethernet_frame(&mut self, frame: ethernet::Frame) -> Result<(), Error> {
2015-11-19 15:34:20 +00:00
debug!("Read ethernet frame from tap {:?}", frame);
2015-11-20 09:59:01 +00:00
match self.mactable.lookup(frame.dst, frame.vlan) {
2015-11-19 15:34:20 +00:00
Some(addr) => {
debug!("Found destination for {:?} (vlan {}) => {}", frame.dst, frame.vlan, addr);
2015-11-19 18:29:42 +00:00
try!(self.send_msg(addr, &udpmessage::Message::Frame(frame)))
2015-11-19 15:34:20 +00:00
},
None => {
debug!("No destination for {:?} (vlan {}) found, broadcasting", frame.dst, frame.vlan);
2015-11-19 18:29:42 +00:00
let msg = udpmessage::Message::Frame(frame);
2015-11-20 08:11:54 +00:00
for addr in &self.peers.as_vec() {
2015-11-19 15:34:20 +00:00
try!(self.send_msg(addr, &msg));
}
}
}
Ok(())
}
2015-11-20 08:11:54 +00:00
fn handle_net_message(&mut self, peer: SocketAddr, token: Token, msg: udpmessage::Message) -> Result<(), Error> {
2015-11-19 15:34:20 +00:00
if token != self.token {
info!("Ignoring message from {} with wrong token {}", peer, token);
return Err(Error::WrongToken(token));
}
debug!("Recieved {:?} from {}", msg, peer);
match msg {
2015-11-19 18:29:42 +00:00
udpmessage::Message::Frame(frame) => {
2015-11-20 08:11:54 +00:00
let size = ethernet::encode(&frame, &mut self.buffer_out);
2015-11-19 15:34:20 +00:00
debug!("Writing ethernet frame to tap: {:?}", frame);
2015-11-20 08:11:54 +00:00
match self.tapdev.write(&self.buffer_out[..size]) {
2015-11-19 15:34:20 +00:00
Ok(()) => (),
Err(e) => {
error!("Failed to send via tap device {:?}", e);
return Err(Error::TapdevError("Failed to write to tap device"));
}
}
2015-11-20 09:59:01 +00:00
self.peers.add(&peer);
self.mactable.learn(frame.src, frame.vlan, &peer);
2015-11-19 15:34:20 +00:00
},
2015-11-19 18:29:42 +00:00
udpmessage::Message::Peers(peers) => {
2015-11-20 08:11:54 +00:00
self.peers.add(&peer);
2015-11-19 15:34:20 +00:00
for p in &peers {
2015-11-20 08:11:54 +00:00
if ! self.peers.contains(p) {
2015-11-20 11:09:07 +00:00
try!(self.connect(p, false));
2015-11-19 15:34:20 +00:00
}
}
},
2015-11-19 18:29:42 +00:00
udpmessage::Message::GetPeers => {
2015-11-20 08:11:54 +00:00
self.peers.add(&peer);
let peers = self.peers.as_vec();
2015-11-19 18:29:42 +00:00
try!(self.send_msg(peer, &udpmessage::Message::Peers(peers)));
2015-11-19 15:34:20 +00:00
},
2015-11-20 09:59:01 +00:00
udpmessage::Message::Close => {
self.peers.remove(&peer);
}
2015-11-19 15:34:20 +00:00
}
Ok(())
}
2015-11-20 08:11:54 +00:00
pub fn run(&mut self) {
let epoll_handle = epoll::create1(0).expect("Failed to create epoll handle");
let socket_fd = self.socket.as_raw_fd();
let tapdev_fd = self.tapdev.as_raw_fd();
let mut socket_event = epoll::EpollEvent{events: epoll::util::event_type::EPOLLIN, data: 0};
let mut tapdev_event = epoll::EpollEvent{events: epoll::util::event_type::EPOLLIN, data: 1};
epoll::ctl(epoll_handle, epoll::util::ctl_op::ADD, socket_fd, &mut socket_event).expect("Failed to add socket to epoll handle");
epoll::ctl(epoll_handle, epoll::util::ctl_op::ADD, tapdev_fd, &mut tapdev_event).expect("Failed to add tapdev to epoll handle");
let mut events = [epoll::EpollEvent{events: 0, data: 0}; 2];
let mut buffer = [0; 64*1024];
2015-11-19 15:34:20 +00:00
loop {
2015-11-20 08:11:54 +00:00
let count = epoll::wait(epoll_handle, &mut events, 1000).expect("Epoll wait failed");
2015-11-20 09:59:01 +00:00
// Process events
2015-11-20 08:11:54 +00:00
for i in 0..count {
match &events[i as usize].data {
&0 => match self.socket.recv_from(&mut buffer) {
Ok((size, src)) => {
match udpmessage::decode(&buffer[..size]).and_then(|(token, msg)| self.handle_net_message(src, token, msg)) {
Ok(_) => (),
Err(e) => error!("Error: {:?}", e)
}
},
Err(_error) => panic!("Failed to read from network socket")
},
&1 => match self.tapdev.read(&mut buffer) {
Ok(size) => {
match ethernet::decode(&mut buffer[..size]).and_then(|frame| self.handle_ethernet_frame(frame)) {
Ok(_) => (),
Err(e) => error!("Error: {:?}", e)
}
},
Err(_error) => panic!("Failed to read from tap device")
},
_ => unreachable!()
}
2015-11-19 15:34:20 +00:00
}
2015-11-20 09:59:01 +00:00
// Do the housekeeping
2015-11-20 11:09:07 +00:00
if self.next_housekeep < SteadyTime::now() {
2015-11-20 09:59:01 +00:00
match self.housekeep() {
Ok(_) => (),
Err(e) => error!("Error: {:?}", e)
}
2015-11-20 11:09:07 +00:00
self.next_housekeep = SteadyTime::now() + Duration::seconds(1)
2015-11-20 09:59:01 +00:00
}
2015-11-19 19:51:53 +00:00
}
}
2015-11-19 15:34:20 +00:00
}