vpncloud/src/cloud.rs

336 lines
12 KiB
Rust
Raw Normal View History

2015-11-23 00:04:30 +00:00
use std::net::{SocketAddr, ToSocketAddrs};
2015-11-20 09:59:01 +00:00
use std::collections::HashMap;
2015-11-19 15:34:20 +00:00
use std::hash::Hasher;
use std::net::UdpSocket;
2015-11-19 16:11:59 +00:00
use std::io::Read;
2015-11-23 00:04:30 +00:00
use std::fmt;
2015-11-20 08:11:54 +00:00
use std::os::unix::io::AsRawFd;
2015-11-21 17:09:13 +00:00
use std::marker::PhantomData;
2015-11-19 15:34:20 +00:00
2015-11-20 08:11:54 +00:00
use epoll;
2015-11-25 13:31:05 +00:00
use nix::sys::signal::{SIGTERM, SIGQUIT, SIGINT};
use signal::trap::Trap;
2015-11-25 20:55:30 +00:00
use time::SteadyTime;
use rand::random;
2015-11-19 15:34:20 +00:00
use super::types::{Table, Protocol, Range, Error, NetworkId, NodeId};
2015-11-23 00:40:47 +00:00
use super::device::Device;
2015-11-22 16:28:04 +00:00
use super::udpmessage::{encode, decode, Options, Message};
2015-11-24 19:55:14 +00:00
use super::crypto::Crypto;
2015-11-25 20:55:30 +00:00
use super::util::{now, Time, Duration, time_rand};
2015-11-19 15:34:20 +00:00
struct PeerList {
timeout: Duration,
2015-11-25 20:55:30 +00:00
peers: HashMap<SocketAddr, Time>
2015-11-19 15:34:20 +00:00
}
impl PeerList {
fn new(timeout: Duration) -> PeerList {
PeerList{peers: HashMap::new(), timeout: timeout}
}
2015-11-20 11:09:07 +00:00
fn timeout(&mut self) -> Vec<SocketAddr> {
2015-11-25 20:55:30 +00:00
let now = now();
2015-11-19 15:34:20 +00:00
let mut del: Vec<SocketAddr> = Vec::new();
for (&addr, &timeout) in &self.peers {
if timeout < now {
del.push(addr);
}
}
2015-11-20 11:09:07 +00:00
for addr in &del {
2015-11-19 15:34:20 +00:00
debug!("Forgot peer: {:?}", addr);
2015-11-20 11:09:07 +00:00
self.peers.remove(addr);
2015-11-19 15:34:20 +00:00
}
2015-11-20 11:09:07 +00:00
del
2015-11-19 15:34:20 +00:00
}
2015-11-20 12:34:54 +00:00
#[inline(always)]
2015-11-19 15:34:20 +00:00
fn contains(&mut self, addr: &SocketAddr) -> bool {
self.peers.contains_key(addr)
}
2015-11-20 12:34:54 +00:00
#[inline]
2015-11-19 15:34:20 +00:00
fn add(&mut self, addr: &SocketAddr) {
2015-11-25 20:55:30 +00:00
if self.peers.insert(*addr, now()+self.timeout as Time).is_none() {
2015-11-19 15:34:20 +00:00
info!("New peer: {:?}", addr);
}
}
2015-11-20 12:34:54 +00:00
#[inline]
2015-11-19 15:34:20 +00:00
fn as_vec(&self) -> Vec<SocketAddr> {
self.peers.keys().map(|addr| *addr).collect()
}
2015-11-20 12:34:54 +00:00
#[inline(always)]
fn len(&self) -> usize {
self.peers.len()
}
#[inline]
fn subset(&self, size: usize, seed: u32) -> Vec<SocketAddr> {
let mut peers = self.as_vec();
let mut psrng = seed;
let len = peers.len();
for i in size..len {
peers.swap_remove(psrng as usize % (len - i));
psrng = ((1664525 as u64) * (psrng as u64) + (1013904223 as u64)) as u32;
}
peers
}
#[inline]
2015-11-19 15:34:20 +00:00
fn remove(&mut self, addr: &SocketAddr) {
if self.peers.remove(&addr).is_some() {
info!("Removed peer: {:?}", addr);
}
}
}
2015-11-22 16:28:04 +00:00
2015-11-23 00:40:47 +00:00
pub struct GenericCloud<P: Protocol> {
node_id: NodeId,
2015-11-20 08:11:54 +00:00
peers: PeerList,
2015-11-22 23:49:58 +00:00
addresses: Vec<Range>,
2015-11-22 21:00:34 +00:00
learning: bool,
2015-11-22 21:45:04 +00:00
broadcast: bool,
2015-11-20 11:09:07 +00:00
reconnect_peers: Vec<SocketAddr>,
blacklist_peers: Vec<SocketAddr>,
2015-11-23 00:40:47 +00:00
table: Box<Table>,
2015-11-20 08:11:54 +00:00
socket: UdpSocket,
2015-11-23 00:40:47 +00:00
device: Device,
2015-11-23 14:40:04 +00:00
options: Options,
crypto: Crypto,
2015-11-25 20:55:30 +00:00
next_peerlist: Time,
2015-11-19 19:51:53 +00:00
update_freq: Duration,
2015-11-20 09:59:01 +00:00
buffer_out: [u8; 64*1024],
2015-11-25 20:55:30 +00:00
next_housekeep: Time,
2015-11-23 00:40:47 +00:00
_dummy_p: PhantomData<P>,
2015-11-19 16:11:59 +00:00
}
2015-11-23 00:40:47 +00:00
impl<P: Protocol> GenericCloud<P> {
pub fn new(device: Device, listen: String, network_id: Option<NetworkId>, table: Box<Table>,
2015-11-23 14:40:04 +00:00
peer_timeout: Duration, learning: bool, broadcast: bool, addresses: Vec<Range>,
crypto: Crypto) -> Self {
2015-11-19 15:34:20 +00:00
let socket = match UdpSocket::bind(&listen as &str) {
Ok(socket) => socket,
_ => fail!("Failed to open socket {}", listen)
2015-11-19 15:34:20 +00:00
};
2015-11-23 14:40:04 +00:00
let mut options = Options::default();
options.network_id = network_id;
2015-11-22 16:28:04 +00:00
GenericCloud{
node_id: random(),
2015-11-20 08:11:54 +00:00
peers: PeerList::new(peer_timeout),
2015-11-22 21:00:34 +00:00
addresses: addresses,
learning: learning,
2015-11-22 21:45:04 +00:00
broadcast: broadcast,
2015-11-20 11:09:07 +00:00
reconnect_peers: Vec::new(),
blacklist_peers: Vec::new(),
2015-11-22 15:48:01 +00:00
table: table,
2015-11-20 08:11:54 +00:00
socket: socket,
2015-11-22 15:48:01 +00:00
device: device,
2015-11-23 14:40:04 +00:00
options: options,
crypto: crypto,
2015-11-25 20:55:30 +00:00
next_peerlist: now(),
2015-11-19 19:51:53 +00:00
update_freq: peer_timeout/2,
2015-11-20 09:59:01 +00:00
buffer_out: [0; 64*1024],
2015-11-25 20:55:30 +00:00
next_housekeep: now(),
2015-11-23 00:40:47 +00:00
_dummy_p: PhantomData,
2015-11-20 08:11:54 +00:00
}
2015-11-19 15:34:20 +00:00
}
2015-11-23 18:06:25 +00:00
/// Returns the interface name reported by the underlying device.
pub fn ifname(&self) -> &str {
    let device = &self.device;
    device.ifname()
}
2015-11-22 23:49:58 +00:00
fn send_msg<Addr: ToSocketAddrs+fmt::Display>(&mut self, addr: Addr, msg: &Message) -> Result<(), Error> {
2015-11-19 15:34:20 +00:00
debug!("Sending {:?} to {}", msg, addr);
2015-11-23 14:40:04 +00:00
let size = encode(&mut self.options, msg, &mut self.buffer_out, &mut self.crypto);
2015-11-20 08:11:54 +00:00
match self.socket.send_to(&self.buffer_out[..size], addr) {
2015-11-19 15:34:20 +00:00
Ok(written) if written == size => Ok(()),
Ok(_) => Err(Error::SocketError("Sent out truncated packet")),
Err(e) => {
error!("Failed to send via network {:?}", e);
Err(Error::SocketError("IOError when sending"))
}
}
}
2015-11-21 17:09:13 +00:00
pub fn connect<Addr: ToSocketAddrs+fmt::Display>(&mut self, addr: Addr, reconnect: bool) -> Result<(), Error> {
2015-11-20 11:09:07 +00:00
if let Ok(mut addrs) = addr.to_socket_addrs() {
while let Some(addr) = addrs.next() {
if self.peers.contains(&addr) || self.blacklist_peers.contains(&addr) {
2015-11-20 11:09:07 +00:00
return Ok(());
}
}
}
2015-11-23 10:55:37 +00:00
debug!("Connecting to {}", addr);
2015-11-20 11:09:07 +00:00
if reconnect {
let addr = addr.to_socket_addrs().unwrap().next().unwrap();
self.reconnect_peers.push(addr);
}
2015-11-22 21:00:34 +00:00
let addrs = self.addresses.clone();
let node_id = self.node_id.clone();
self.send_msg(addr, &Message::Init(0, node_id, addrs))
2015-11-19 15:34:20 +00:00
}
2015-11-20 08:11:54 +00:00
fn housekeep(&mut self) -> Result<(), Error> {
self.peers.timeout();
2015-11-21 17:09:13 +00:00
self.table.housekeep();
2015-11-25 20:55:30 +00:00
if self.next_peerlist <= now() {
2015-11-19 15:34:20 +00:00
debug!("Send peer list to all peers");
2015-11-20 12:34:54 +00:00
let mut peer_num = self.peers.len();
if peer_num > 10 {
peer_num = (peer_num as f32).sqrt().ceil() as usize;
if peer_num < 10 {
peer_num = 10;
}
}
2015-11-25 20:55:30 +00:00
let peers = self.peers.subset(peer_num, time_rand() as u32);
2015-11-20 17:09:51 +00:00
let msg = Message::Peers(peers);
for addr in &self.peers.as_vec() {
2015-11-19 15:34:20 +00:00
try!(self.send_msg(addr, &msg));
}
2015-11-25 20:55:30 +00:00
self.next_peerlist = now() + self.update_freq as Time;
2015-11-19 15:34:20 +00:00
}
2015-11-20 11:09:07 +00:00
for addr in self.reconnect_peers.clone() {
try!(self.connect(addr, false));
}
2015-11-19 15:34:20 +00:00
Ok(())
}
2015-11-22 17:05:15 +00:00
fn handle_interface_data(&mut self, payload: &[u8]) -> Result<(), Error> {
2015-11-22 23:49:58 +00:00
let (src, dst) = try!(P::parse(payload));
2015-11-26 21:16:51 +00:00
debug!("Read data from interface: src: {}, dst: {}, {} bytes", src, dst, payload.len());
2015-11-22 17:05:15 +00:00
match self.table.lookup(&dst) {
2015-11-19 15:34:20 +00:00
Some(addr) => {
2015-11-26 21:16:51 +00:00
debug!("Found destination for {} => {}", dst, addr);
2015-11-22 21:00:34 +00:00
if self.peers.contains(&addr) {
try!(self.send_msg(addr, &Message::Data(payload)))
} else {
2015-11-26 21:16:51 +00:00
warn!("Destination for {} not found in peers: {}", dst, addr);
2015-11-22 21:00:34 +00:00
}
2015-11-19 15:34:20 +00:00
},
None => {
2015-11-22 21:45:04 +00:00
if !self.broadcast {
2015-11-26 21:16:51 +00:00
debug!("No destination for {} found, dropping", dst);
2015-11-22 21:45:04 +00:00
return Ok(());
}
2015-11-26 21:16:51 +00:00
debug!("No destination for {} found, broadcasting", dst);
2015-11-22 21:00:34 +00:00
let msg = Message::Data(payload);
2015-11-20 08:11:54 +00:00
for addr in &self.peers.as_vec() {
2015-11-19 15:34:20 +00:00
try!(self.send_msg(addr, &msg));
}
}
}
Ok(())
}
2015-11-22 23:49:58 +00:00
fn handle_net_message(&mut self, peer: SocketAddr, options: Options, msg: Message) -> Result<(), Error> {
2015-12-02 13:16:13 +00:00
if self.options.network_id != options.network_id {
info!("Ignoring message from {} with wrong token {:?}", peer, options.network_id);
return Err(Error::WrongNetwork(options.network_id));
2015-11-19 15:34:20 +00:00
}
debug!("Received {:?} from {}", msg, peer);
2015-11-19 15:34:20 +00:00
match msg {
2015-11-22 21:00:34 +00:00
Message::Data(payload) => {
2015-11-22 23:49:58 +00:00
let (src, _dst) = try!(P::parse(payload));
2015-11-22 17:05:15 +00:00
debug!("Writing data to device: {} bytes", payload.len());
match self.device.write(&payload) {
2015-11-19 15:34:20 +00:00
Ok(()) => (),
Err(e) => {
2015-11-26 21:16:51 +00:00
error!("Failed to send via device: {}", e);
2015-11-22 21:00:34 +00:00
return Err(Error::TunTapDevError("Failed to write to device"));
2015-11-19 15:34:20 +00:00
}
}
// not adding peer to increase performance
2015-11-22 21:00:34 +00:00
if self.learning {
2015-11-22 23:49:58 +00:00
//learn single address
self.table.learn(src, None, peer);
2015-11-22 21:00:34 +00:00
}
2015-11-19 15:34:20 +00:00
},
2015-11-20 17:09:51 +00:00
Message::Peers(peers) => {
2015-11-20 08:11:54 +00:00
self.peers.add(&peer);
2015-11-19 15:34:20 +00:00
for p in &peers {
if ! self.peers.contains(p) && ! self.blacklist_peers.contains(p) {
2015-11-20 11:09:07 +00:00
try!(self.connect(p, false));
2015-11-19 15:34:20 +00:00
}
}
},
Message::Init(stage, node_id, ranges) => {
if node_id == self.node_id {
self.blacklist_peers.push(peer);
return Ok(())
}
2015-11-20 08:11:54 +00:00
self.peers.add(&peer);
2015-11-22 23:49:58 +00:00
for range in ranges {
self.table.learn(range.base, Some(range.prefix_len), peer.clone());
2015-11-22 21:00:34 +00:00
}
2015-11-26 09:52:58 +00:00
if stage == 0 {
let peers = self.peers.as_vec();
let own_addrs = self.addresses.clone();
let own_node_id = self.node_id.clone();
try!(self.send_msg(peer, &Message::Init(stage+1, own_node_id, own_addrs)));
2015-11-26 09:52:58 +00:00
try!(self.send_msg(peer, &Message::Peers(peers)));
}
2015-11-19 15:34:20 +00:00
},
2015-11-20 17:09:51 +00:00
Message::Close => {
2015-11-20 09:59:01 +00:00
self.peers.remove(&peer);
}
2015-11-19 15:34:20 +00:00
}
Ok(())
}
2015-11-20 08:11:54 +00:00
pub fn run(&mut self) {
2015-11-25 20:55:30 +00:00
let dummy_time = SteadyTime::now();
2015-11-25 13:31:05 +00:00
let trap = Trap::trap(&[SIGINT, SIGTERM, SIGQUIT]);
let epoll_handle = try_fail!(epoll::create1(0), "Failed to create epoll handle: {}");
2015-11-20 08:11:54 +00:00
let socket_fd = self.socket.as_raw_fd();
2015-11-22 15:48:01 +00:00
let device_fd = self.device.as_raw_fd();
2015-11-20 08:11:54 +00:00
let mut socket_event = epoll::EpollEvent{events: epoll::util::event_type::EPOLLIN, data: 0};
2015-11-22 15:48:01 +00:00
let mut device_event = epoll::EpollEvent{events: epoll::util::event_type::EPOLLIN, data: 1};
try_fail!(epoll::ctl(epoll_handle, epoll::util::ctl_op::ADD, socket_fd, &mut socket_event), "Failed to add socket to epoll handle: {}");
try_fail!(epoll::ctl(epoll_handle, epoll::util::ctl_op::ADD, device_fd, &mut device_event), "Failed to add device to epoll handle: {}");
2015-11-20 08:11:54 +00:00
let mut events = [epoll::EpollEvent{events: 0, data: 0}; 2];
let mut buffer = [0; 64*1024];
2015-11-19 15:34:20 +00:00
loop {
let count = try_fail!(epoll::wait(epoll_handle, &mut events, 1000), "Epoll wait failed: {}");
2015-11-20 09:59:01 +00:00
// Process events
2015-11-20 08:11:54 +00:00
for i in 0..count {
match &events[i as usize].data {
&0 => {
let (size, src) = try_fail!(self.socket.recv_from(&mut buffer), "Failed to read from network socket: {}");
match decode(&mut buffer[..size], &mut self.crypto).and_then(|(options, msg)| self.handle_net_message(src, options, msg)) {
Ok(_) => (),
2015-11-30 22:04:24 +00:00
Err(e) => error!("Error: {}, from: {}", e, src)
}
2015-11-20 08:11:54 +00:00
},
&1 => {
let size = try_fail!(self.device.read(&mut buffer), "Failed to read from tap device: {}");
match self.handle_interface_data(&buffer[..size]) {
2015-11-22 15:48:01 +00:00
Ok(_) => (),
2015-11-26 21:16:51 +00:00
Err(e) => error!("Error: {}", e)
}
2015-11-20 08:11:54 +00:00
},
_ => unreachable!()
}
2015-11-19 15:34:20 +00:00
}
2015-11-25 20:55:30 +00:00
if self.next_housekeep < now() {
// Check for signals
if trap.wait(dummy_time).is_some() {
break;
}
// Do the housekeeping
2015-11-20 09:59:01 +00:00
match self.housekeep() {
Ok(_) => (),
2015-11-26 21:16:51 +00:00
Err(e) => error!("Error: {}", e)
2015-11-20 09:59:01 +00:00
}
2015-11-25 20:55:30 +00:00
self.next_housekeep = now() + 1
2015-11-20 09:59:01 +00:00
}
2015-11-19 19:51:53 +00:00
}
2015-11-25 13:31:05 +00:00
info!("Shutting down...");
for p in &self.peers.as_vec() {
let _ = self.send_msg(p, &Message::Close);
}
2015-11-19 19:51:53 +00:00
}
2015-11-19 15:34:20 +00:00
}