Preparations for tests

pull/29/head
Dennis Schwerdel 2019-02-21 22:41:36 +01:00
parent be27f79b8c
commit 9614c9bd97
8 changed files with 265 additions and 182 deletions

1
Cargo.lock generated
View File

@ -737,7 +737,6 @@ name = "vpncloud"
version = "0.9.1"
dependencies = [
"base-62 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)",
"cc 1.0.29 (registry+https://github.com/rust-lang/crates.io-index)",
"daemonize 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"docopt 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -22,7 +22,6 @@ libc = "0.2"
rand = "0.6"
fnv = "1"
net2 = "0.2"
bitflags = "^1"
yaml-rust = "0.4"
igd = "0.6" # Do not update, 0.7 has problems with exit by ctrl-c
siphasher = "0.3"

View File

@ -4,32 +4,28 @@
use std::net::{SocketAddr, ToSocketAddrs};
use std::collections::HashMap;
use std::net::UdpSocket;
use std::io::{self, Write};
use std::fmt;
use std::os::unix::io::AsRawFd;
use std::marker::PhantomData;
use std::hash::BuildHasherDefault;
use std::time::Instant;
use std::cmp::min;
use std::fs::{self, File, Permissions};
use std::os::unix::fs::PermissionsExt;
use fnv::FnvHasher;
use signal::{trap::Trap, Signal};
use rand::{prelude::*, random, thread_rng};
use net2::UdpBuilder;
use super::config::Config;
use super::types::{Table, Protocol, Range, Error, HeaderMagic, NodeId};
use super::device::{Device, Type};
use super::device::Device;
use super::udpmessage::{encode, decode, Message};
use super::crypto::Crypto;
use super::port_forwarding::PortForwarding;
use super::util::{now, Time, Duration, resolve};
use super::poll::{Poll, Flags};
use super::util::{now, Time, Duration, resolve, CtrlC};
use super::poll::{WaitImpl, WaitResult};
use super::traffic::TrafficStats;
use super::beacon::BeaconSerializer;
use super::net::Socket;
pub type Hash = BuildHasherDefault<FnvHasher>;
@ -202,7 +198,7 @@ pub struct ReconnectEntry {
}
pub struct GenericCloud<P: Protocol, T: Table> {
pub struct GenericCloud<P: Protocol, T: Table, S: Socket> {
config: Config,
magic: HeaderMagic,
node_id: NodeId,
@ -213,8 +209,8 @@ pub struct GenericCloud<P: Protocol, T: Table> {
reconnect_peers: Vec<ReconnectEntry>,
own_addresses: Vec<SocketAddr>,
table: T,
socket4: UdpSocket,
socket6: UdpSocket,
socket4: S,
socket6: S,
device: Device,
crypto: Crypto,
next_peerlist: Time,
@ -229,23 +225,20 @@ pub struct GenericCloud<P: Protocol, T: Table> {
_dummy_p: PhantomData<P>,
}
impl<P: Protocol, T: Table> GenericCloud<P, T> {
impl<P: Protocol, T: Table, S: Socket> GenericCloud<P, T, S> {
pub fn new(config: &Config, device: Device, table: T,
learning: bool, broadcast: bool, addresses: Vec<Range>,
crypto: Crypto, port_forwarding: Option<PortForwarding>
) -> Self {
let socket4 = match UdpBuilder::new_v4().expect("Failed to obtain ipv4 socket builder")
.reuse_address(true).expect("Failed to set so_reuseaddr").bind(("0.0.0.0", config.port)) {
let socket4 = match S::listen_v4("0.0.0.0", config.port) {
Ok(socket) => socket,
Err(err) => fail!("Failed to open ipv4 address 0.0.0.0:{}: {}", config.port, err)
};
let socket6 = match UdpBuilder::new_v6().expect("Failed to obtain ipv6 socket builder")
.only_v6(true).expect("Failed to set only_v6")
.reuse_address(true).expect("Failed to set so_reuseaddr").bind(("::", config.port)) {
let socket6 = match S::listen_v6("::", config.port) {
Ok(socket) => socket,
Err(err) => fail!("Failed to open ipv6 address ::{}: {}", config.port, err)
};
GenericCloud{
let mut res = GenericCloud{
magic: config.get_magic(),
node_id: random(),
peers: PeerList::new(config.peer_timeout),
@ -270,7 +263,9 @@ impl<P: Protocol, T: Table> GenericCloud<P, T> {
crypto,
config: config.clone(),
_dummy_p: PhantomData,
}
};
res.initialize();
return res
}
#[inline]
@ -291,11 +286,11 @@ impl<P: Protocol, T: Table> GenericCloud<P, T> {
let msg_data = encode(msg, &mut self.buffer_out, self.magic, &mut self.crypto);
for addr in self.peers.peers.keys() {
self.traffic.count_out_traffic(*addr, msg_data.len());
let socket = match *addr {
SocketAddr::V4(_) => &self.socket4,
SocketAddr::V6(_) => &self.socket6
let mut socket = match *addr {
SocketAddr::V4(_) => &mut self.socket4,
SocketAddr::V6(_) => &mut self.socket6
};
try!(match socket.send_to(msg_data, addr) {
try!(match socket.send(msg_data, *addr) {
Ok(written) if written == msg_data.len() => Ok(()),
Ok(_) => Err(Error::Socket("Sent out truncated packet", io::Error::new(io::ErrorKind::Other, "truncated"))),
Err(e) => Err(Error::Socket("IOError when sending", e))
@ -316,10 +311,10 @@ impl<P: Protocol, T: Table> GenericCloud<P, T> {
let msg_data = encode(msg, &mut self.buffer_out, self.magic, &mut self.crypto);
self.traffic.count_out_traffic(addr, msg_data.len());
let socket = match addr {
SocketAddr::V4(_) => &self.socket4,
SocketAddr::V6(_) => &self.socket6
SocketAddr::V4(_) => &mut self.socket4,
SocketAddr::V6(_) => &mut self.socket6
};
match socket.send_to(msg_data, addr) {
match socket.send(msg_data, addr) {
Ok(written) if written == msg_data.len() => Ok(()),
Ok(_) => Err(Error::Socket("Sent out truncated packet", io::Error::new(io::ErrorKind::Other, "truncated"))),
Err(e) => Err(Error::Socket("IOError when sending", e))
@ -335,7 +330,7 @@ impl<P: Protocol, T: Table> GenericCloud<P, T> {
/// Returns an IOError if the underlying system call fails
#[allow(dead_code)]
pub fn address(&self) -> io::Result<(SocketAddr, SocketAddr)> {
Ok((try!(self.socket4.local_addr()), try!(self.socket6.local_addr())))
Ok((try!(self.socket4.address()), try!(self.socket6.address())))
}
/// Returns the number of peers
@ -700,15 +695,7 @@ impl<P: Protocol, T: Table> GenericCloud<P, T> {
Ok(())
}
/// The main method of the node
///
/// This method will use epoll to wait in the sockets and the device at the same time.
/// It will read from the sockets, decode and decrypt the message and then call the
/// `handle_net_message` method. It will also read from the device and call
/// `handle_interface_data` for each packet read.
/// Also, this method will call `housekeep` every second.
#[allow(unknown_lints, clippy::cyclomatic_complexity)]
pub fn run(&mut self) {
fn initialize(&mut self) {
match self.address() {
Err(err) => error!("Failed to obtain local addresses: {}", err),
Ok((v4, v6)) => {
@ -716,68 +703,68 @@ impl<P: Protocol, T: Table> GenericCloud<P, T> {
self.own_addresses.push(v6);
}
}
let dummy_time = Instant::now();
let trap = Trap::trap(&[Signal::SIGINT, Signal::SIGTERM, Signal::SIGQUIT]);
let mut poll_handle = try_fail!(Poll::new(3), "Failed to create poll handle: {}");
let socket4_fd = self.socket4.as_raw_fd();
let socket6_fd = self.socket6.as_raw_fd();
let device_fd = self.device.as_raw_fd();
try_fail!(poll_handle.register(socket4_fd, Flags::READ), "Failed to add ipv4 socket to poll handle: {}");
try_fail!(poll_handle.register(socket6_fd, Flags::READ), "Failed to add ipv6 socket to poll handle: {}");
if let Err(err) = poll_handle.register(device_fd, Flags::READ) {
if self.device.get_type() != Type::Dummy {
fail!("Failed to add device to poll handle: {}", err);
} else {
warn!("Failed to add device to poll handle: {}", err);
}
}
fn handle_socket_data(&mut self, src: SocketAddr, data: &mut [u8]) {
let size = data.len();
if let Err(e) = decode(data, self.magic, &mut self.crypto).and_then(|msg| {
self.traffic.count_in_traffic(src, size);
self.handle_net_message(src, msg)
}) {
error!("Error: {}, from: {}", e, src);
}
}
fn handle_socket_v4_event(&mut self, buffer: &mut [u8]) {
let (size, src) = try_fail!(self.socket4.receive(buffer), "Failed to read from ipv4 network socket: {}");
self.handle_socket_data(src, &mut buffer[..size])
}
fn handle_socket_v6_event(&mut self, buffer: &mut [u8]) {
let (size, src) = try_fail!(self.socket6.receive(buffer), "Failed to read from ipv6 network socket: {}");
self.handle_socket_data(src, &mut buffer[..size])
}
fn handle_device_event(&mut self, buffer: &mut [u8]) {
let mut start = 64;
let (offset, size) = try_fail!(self.device.read(&mut buffer[start..]), "Failed to read from tap device: {}");
start += offset;
if let Err(e) = self.handle_interface_data(buffer, start, start+size) {
error!("Error: {}", e);
}
}
/// The main method of the node
///
/// This method will use epoll to wait in the sockets and the device at the same time.
/// It will read from the sockets, decode and decrypt the message and then call the
/// `handle_net_message` method. It will also read from the device and call
/// `handle_interface_data` for each packet read.
/// Also, this method will call `housekeep` every second.
pub fn run(&mut self) {
let ctrlc = CtrlC::new();
let waiter = try_fail!(WaitImpl::new(&self.socket4, &self.socket6, &self.device, 1000), "Failed to setup poll: {}");
let mut buffer = [0; 64*1024];
let mut poll_error = false;
loop {
let evts = match poll_handle.wait(1000) {
Ok(evts) => evts,
Err(err) => {
for evt in waiter {
match evt {
WaitResult::Error(err) => {
if poll_error {
fail!("Poll wait failed again: {}", err);
}
error!("Poll wait failed: {}, retrying...", err);
poll_error = true;
continue
}
};
for evt in evts {
match evt.fd() {
fd if (fd == socket4_fd || fd == socket6_fd) => {
let (size, src) = match evt.fd() {
fd if fd == socket4_fd => try_fail!(self.socket4.recv_from(&mut buffer), "Failed to read from ipv4 network socket: {}"),
fd if fd == socket6_fd => try_fail!(self.socket6.recv_from(&mut buffer), "Failed to read from ipv6 network socket: {}"),
_ => unreachable!()
};
if let Err(e) = decode(&mut buffer[..size], self.magic, &mut self.crypto).and_then(|msg| {
self.traffic.count_in_traffic(src, size);
self.handle_net_message(src, msg)
}) {
error!("Error: {}, from: {}", e, src);
}
},
fd if (fd == device_fd) => {
let mut start = 64;
let (offset, size) = try_fail!(self.device.read(&mut buffer[start..]), "Failed to read from tap device: {}");
start += offset;
if let Err(e) = self.handle_interface_data(&mut buffer, start, start+size) {
error!("Error: {}", e);
}
},
_ => unreachable!()
}
},
WaitResult::Timeout => {},
WaitResult::SocketV4 => self.handle_socket_v4_event(&mut buffer),
WaitResult::SocketV6 => self.handle_socket_v6_event(&mut buffer),
WaitResult::Device => self.handle_device_event(&mut buffer)
}
if self.next_housekeep < now() {
poll_error = false;
// Check for signals
if trap.wait(dummy_time).is_some() {
break;
if ctrlc.was_pressed() {
break
}
// Do the housekeeping
if let Err(e) = self.housekeep() {
error!("Error: {}", e)
}

View File

@ -5,7 +5,6 @@
#![cfg_attr(feature = "bench", feature(test))]
#[macro_use] extern crate log;
#[macro_use] extern crate bitflags;
extern crate time;
extern crate docopt;
#[macro_use] extern crate serde_derive;
@ -38,6 +37,7 @@ pub mod config;
pub mod port_forwarding;
pub mod traffic;
pub mod beacon;
pub mod net;
#[cfg(feature = "bench")] mod benches;
use docopt::Docopt;
@ -58,6 +58,7 @@ use crypto::{Crypto, CryptoMethod};
use port_forwarding::PortForwarding;
use util::Duration;
use config::Config;
use std::net::UdpSocket;
const VERSION: u8 = 1;
@ -160,8 +161,8 @@ enum AnyTable {
}
enum AnyCloud<P: Protocol> {
Switch(GenericCloud<P, SwitchTable>),
Routing(GenericCloud<P, RoutingTable>)
Switch(GenericCloud<P, SwitchTable, UdpSocket>),
Routing(GenericCloud<P, RoutingTable, UdpSocket>)
}
impl<P: Protocol> AnyCloud<P> {
@ -170,10 +171,10 @@ impl<P: Protocol> AnyCloud<P> {
learning: bool, broadcast: bool, addresses: Vec<Range>,
crypto: Crypto, port_forwarding: Option<PortForwarding>) -> Self {
match table {
AnyTable::Switch(t) => AnyCloud::Switch(GenericCloud::<P, SwitchTable>::new(
AnyTable::Switch(t) => AnyCloud::Switch(GenericCloud::<P, SwitchTable, UdpSocket>::new(
config, device,t, learning, broadcast, addresses, crypto, port_forwarding
)),
AnyTable::Routing(t) => AnyCloud::Routing(GenericCloud::<P, RoutingTable>::new(
AnyTable::Routing(t) => AnyCloud::Routing(GenericCloud::<P, RoutingTable, UdpSocket>::new(
config, device,t, learning, broadcast, addresses, crypto, port_forwarding
))
}

89
src/net.rs Normal file
View File

@ -0,0 +1,89 @@
use std::os::unix::io::{RawFd, AsRawFd};
use std::net::{UdpSocket, SocketAddr, SocketAddrV4, SocketAddrV6};
use std::io::{self, ErrorKind};
use std::collections::VecDeque;
use net2::UdpBuilder;
pub trait Socket: AsRawFd + Sized {
fn listen_v4(host: &str, port: u16) -> Result<Self, io::Error>;
fn listen_v6(host: &str, port: u16) -> Result<Self, io::Error>;
fn receive(&mut self, buffer: &mut [u8]) -> Result<(usize, SocketAddr), io::Error>;
fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize, io::Error>;
fn address(&self) -> Result<SocketAddr, io::Error>;
}
impl Socket for UdpSocket {
fn listen_v4(host: &str, port: u16) -> Result<Self, io::Error> {
UdpBuilder::new_v4().expect("Failed to obtain ipv4 socket builder")
.reuse_address(true).expect("Failed to set so_reuseaddr").bind((host, port))
}
fn listen_v6(host: &str, port: u16) -> Result<Self, io::Error> {
UdpBuilder::new_v6().expect("Failed to obtain ipv4 socket builder")
.only_v6(true).expect("Failed to set only_v6")
.reuse_address(true).expect("Failed to set so_reuseaddr").bind((host, port))
}
fn receive(&mut self, buffer: &mut [u8]) -> Result<(usize, SocketAddr), io::Error> {
self.recv_from(buffer)
}
fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize, io::Error> {
self.send_to(data, addr)
}
fn address(&self) -> Result<SocketAddr, io::Error> {
self.local_addr()
}
}
pub struct MockSocket {
address: SocketAddr,
outbound: VecDeque<(SocketAddr, Vec<u8>)>,
inbound: VecDeque<(SocketAddr, Vec<u8>)>
}
impl MockSocket {
pub fn new(address: SocketAddr) -> Self {
Self { address, outbound: VecDeque::new(), inbound: VecDeque::new() }
}
pub fn put_inbound(&mut self, from: SocketAddr, data: Vec<u8>) {
self.inbound.push_back((from, data))
}
pub fn pop_outbound(&mut self) -> Option<(SocketAddr, Vec<u8>)> {
self.outbound.pop_front()
}
}
impl AsRawFd for MockSocket {
fn as_raw_fd(&self) -> RawFd {
unimplemented!()
}
}
impl Socket for MockSocket {
fn listen_v4(host: &str, port: u16) -> Result<Self, io::Error> {
let ip = try_fail!(host.parse(), "Failed to parse IPv4 address: {}");
Ok(Self::new(SocketAddr::V4(SocketAddrV4::new(ip, port))))
}
fn listen_v6(host: &str, port: u16) -> Result<Self, io::Error> {
let ip = try_fail!(host.parse(), "Failed to parse IPv6 address: {}");
Ok(Self::new(SocketAddr::V6(SocketAddrV6::new(ip, port, 0, 0))))
}
fn receive(&mut self, buffer: &mut [u8]) -> Result<(usize, SocketAddr), io::Error> {
if let Some((addr, data)) = self.inbound.pop_front() {
buffer[0..data.len()].copy_from_slice(&data);
Ok((data.len(), addr))
} else {
Err(io::Error::from(ErrorKind::UnexpectedEof))
}
}
fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize, io::Error> {
self.outbound.push_back((addr, data.to_owned()));
Ok(data.len())
}
fn address(&self) -> Result<SocketAddr, io::Error> {
Ok(self.address)
}
}

View File

@ -6,103 +6,77 @@ use libc;
use std::os::unix::io::RawFd;
use std::io;
use std::ops::{Deref, DerefMut};
use device::Device;
use std::os::unix::io::AsRawFd;
bitflags!{
pub struct Flags: u32 {
const READ = libc::EPOLLIN as u32;
const WRITE = libc::EPOLLOUT as u32;
const ERROR = libc::EPOLLERR as u32;
}
use super::WaitResult;
use ::device::Type;
use net::Socket;
pub struct EpollWait {
poll_fd: RawFd,
event: libc::epoll_event,
socketv4: RawFd,
socketv6: RawFd,
device: RawFd,
timeout: u32,
}
#[derive(Clone, Copy)]
pub struct Event(libc::epoll_event);
impl Event {
#[inline]
pub fn fd(&self) -> RawFd {
self.0.u64 as RawFd
}
#[inline]
pub fn flags(&self) -> Flags {
Flags::from_bits(self.0.events).expect("Invalid flags set")
}
#[inline]
fn new(fd: RawFd, flags: Flags) -> Self {
Event(libc::epoll_event{u64: fd as u64, events: flags.bits})
}
}
impl Deref for Event {
type Target = libc::epoll_event;
#[inline]
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for Event {
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
pub struct Poll {
fd: RawFd,
events: Vec<Event>
}
impl Poll {
#[inline]
pub fn new(max_events: usize) -> io::Result<Self> {
let mut events = Vec::with_capacity(max_events);
events.resize(max_events, Event::new(0, Flags::empty()));
let fd = unsafe { libc::epoll_create(max_events as i32) };
if fd == -1 {
impl EpollWait {
pub fn new<S: Socket>(socketv4: &S, socketv6: &S, device: &Device, timeout: u32) -> io::Result<Self> {
let mut event = libc::epoll_event{u64: 0, events: 0};
let poll_fd = unsafe { libc::epoll_create(3) };
if poll_fd == -1 {
return Err(io::Error::last_os_error());
}
Ok(Poll{fd, events})
}
#[inline]
pub fn register(&mut self, fd: RawFd, flags: Flags) -> io::Result<()> {
let mut ev = Event::new(fd, flags);
let res = unsafe { libc::epoll_ctl(self.fd, libc::EPOLL_CTL_ADD, fd, &mut ev as &mut libc::epoll_event) };
if res == -1 {
return Err(io::Error::last_os_error());
let raw_fds = if device.get_type() != Type::Dummy {
vec![socketv4.as_raw_fd(), socketv6.as_raw_fd(), device.as_raw_fd()]
} else {
vec![socketv4.as_raw_fd(), socketv6.as_raw_fd()]
};
for fd in raw_fds {
event.u64 = fd as u64;
event.events = libc::EPOLLIN as u32;
let res = unsafe { libc::epoll_ctl(poll_fd, libc::EPOLL_CTL_ADD, fd, &mut event) };
if res == -1 {
return Err(io::Error::last_os_error());
}
}
Ok(())
}
#[inline]
pub fn unregister(&mut self, fd: RawFd) -> io::Result<()> {
let mut ev = Event::new(fd, Flags::empty());
let res = unsafe { libc::epoll_ctl(self.fd, libc::EPOLL_CTL_DEL, fd, &mut ev as &mut libc::epoll_event) };
if res == -1 {
return Err(io::Error::last_os_error());
}
Ok(())
}
#[inline]
pub fn wait(&mut self, timeout_millis: u32) -> io::Result<&[Event]> {
let res = unsafe { libc::epoll_wait(self.fd, &mut self.events[0] as &mut libc::epoll_event, self.events.len() as i32, timeout_millis as i32) };
if res == -1 {
return Err(io::Error::last_os_error());
}
Ok(&self.events[0..res as usize])
Ok(Self {
poll_fd,
event,
socketv4: socketv4.as_raw_fd(),
socketv6: socketv6.as_raw_fd(),
device: device.as_raw_fd(),
timeout
})
}
}
impl Drop for Poll {
#[inline]
impl Drop for EpollWait {
fn drop(&mut self) {
unsafe { libc::close(self.fd) };
unsafe { libc::close(self.poll_fd) };
}
}
impl Iterator for EpollWait {
type Item = WaitResult;
fn next(&mut self) -> Option<Self::Item> {
Some(match unsafe { libc::epoll_wait(self.poll_fd, &mut self.event, 1, self.timeout as i32) } {
-1 => WaitResult::Error(io::Error::last_os_error()),
0 => WaitResult::Timeout,
1 => if self.event.u64 == self.socketv4 as u64 {
WaitResult::SocketV4
} else if self.event.u64 == self.socketv6 as u64 {
WaitResult::SocketV6
} else if self.event.u64 == self.device as u64 {
WaitResult::Device
} else {
unreachable!()
},
_ => unreachable!()
})
}
}

View File

@ -6,4 +6,15 @@
mod epoll;
#[cfg(any(target_os = "linux", target_os = "android"))]
pub use self::epoll::*;
pub use self::epoll::EpollWait as WaitImpl;
use std::io;
pub enum WaitResult {
Timeout,
SocketV4,
SocketV6,
Device,
Error(io::Error)
}

View File

@ -13,6 +13,10 @@ use libc;
#[cfg(not(target_os = "linux"))]
use time;
use signal::{trap::Trap, Signal};
use std::time::Instant;
pub type Duration = u32;
pub type Time = i64;
@ -166,3 +170,22 @@ impl fmt::Display for Bytes {
write!(formatter, "{:.1} TiB", size)
}
}
pub struct CtrlC {
dummy_time: Instant,
trap: Trap
}
impl CtrlC {
pub fn new() -> Self {
let dummy_time = Instant::now();
let trap = Trap::trap(&[Signal::SIGINT, Signal::SIGTERM, Signal::SIGQUIT]);
Self { dummy_time, trap }
}
pub fn was_pressed(&self) -> bool {
self.trap.wait(self.dummy_time).is_some()
}
}