From 9614c9bd9724034bb23a9627b173e6036c1c6524 Mon Sep 17 00:00:00 2001 From: Dennis Schwerdel Date: Thu, 21 Feb 2019 22:41:36 +0100 Subject: [PATCH] Preparations for tests --- Cargo.lock | 1 - Cargo.toml | 1 - src/cloud.rs | 159 +++++++++++++++++++++------------------------- src/main.rs | 11 ++-- src/net.rs | 89 ++++++++++++++++++++++++++ src/poll/epoll.rs | 150 ++++++++++++++++++------------------------- src/poll/mod.rs | 13 +++- src/util.rs | 23 +++++++ 8 files changed, 265 insertions(+), 182 deletions(-) create mode 100644 src/net.rs diff --git a/Cargo.lock b/Cargo.lock index 14c4c43..8ee1531 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -737,7 +737,6 @@ name = "vpncloud" version = "0.9.1" dependencies = [ "base-62 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", - "bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", "cc 1.0.29 (registry+https://github.com/rust-lang/crates.io-index)", "daemonize 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "docopt 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index b82ac7c..7d8d21b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,6 @@ libc = "0.2" rand = "0.6" fnv = "1" net2 = "0.2" -bitflags = "^1" yaml-rust = "0.4" igd = "0.6" # Do not update, 0.7 has problems with exit by ctrl-c siphasher = "0.3" diff --git a/src/cloud.rs b/src/cloud.rs index cd9543b..965d78e 100644 --- a/src/cloud.rs +++ b/src/cloud.rs @@ -4,32 +4,28 @@ use std::net::{SocketAddr, ToSocketAddrs}; use std::collections::HashMap; -use std::net::UdpSocket; use std::io::{self, Write}; use std::fmt; -use std::os::unix::io::AsRawFd; use std::marker::PhantomData; use std::hash::BuildHasherDefault; -use std::time::Instant; use std::cmp::min; use std::fs::{self, File, Permissions}; use std::os::unix::fs::PermissionsExt; use fnv::FnvHasher; -use signal::{trap::Trap, Signal}; use rand::{prelude::*, random, thread_rng}; -use net2::UdpBuilder; use super::config::Config; use super::types::{Table, Protocol, Range, Error, HeaderMagic, NodeId}; -use super::device::{Device, Type}; +use super::device::Device; use super::udpmessage::{encode, decode, Message}; use super::crypto::Crypto; use super::port_forwarding::PortForwarding; -use super::util::{now, Time, Duration, resolve}; -use super::poll::{Poll, Flags}; +use super::util::{now, Time, Duration, resolve, CtrlC}; +use super::poll::{WaitImpl, WaitResult}; use super::traffic::TrafficStats; use super::beacon::BeaconSerializer; +use super::net::Socket; pub type Hash = BuildHasherDefault; @@ -202,7 +198,7 @@ pub struct ReconnectEntry { } -pub struct GenericCloud { +pub struct GenericCloud { config: Config, magic: HeaderMagic, node_id: NodeId, @@ -213,8 +209,8 @@ pub struct GenericCloud { reconnect_peers: Vec, own_addresses: Vec, table: T, - socket4: UdpSocket, - socket6: UdpSocket, + socket4: S, + socket6: S, device: Device, crypto: Crypto, next_peerlist: Time, @@ -229,23 +225,20 @@ pub struct GenericCloud { _dummy_p: PhantomData

, } -impl GenericCloud { +impl GenericCloud { pub fn new(config: &Config, device: Device, table: T, learning: bool, broadcast: bool, addresses: Vec, crypto: Crypto, port_forwarding: Option ) -> Self { - let socket4 = match UdpBuilder::new_v4().expect("Failed to obtain ipv4 socket builder") - .reuse_address(true).expect("Failed to set so_reuseaddr").bind(("0.0.0.0", config.port)) { + let socket4 = match S::listen_v4("0.0.0.0", config.port) { Ok(socket) => socket, Err(err) => fail!("Failed to open ipv4 address 0.0.0.0:{}: {}", config.port, err) }; - let socket6 = match UdpBuilder::new_v6().expect("Failed to obtain ipv6 socket builder") - .only_v6(true).expect("Failed to set only_v6") - .reuse_address(true).expect("Failed to set so_reuseaddr").bind(("::", config.port)) { + let socket6 = match S::listen_v6("::", config.port) { Ok(socket) => socket, Err(err) => fail!("Failed to open ipv6 address ::{}: {}", config.port, err) }; - GenericCloud{ + let mut res = GenericCloud{ magic: config.get_magic(), node_id: random(), peers: PeerList::new(config.peer_timeout), @@ -270,7 +263,9 @@ impl GenericCloud { crypto, config: config.clone(), _dummy_p: PhantomData, - } + }; + res.initialize(); + return res } #[inline] @@ -291,11 +286,11 @@ impl GenericCloud { let msg_data = encode(msg, &mut self.buffer_out, self.magic, &mut self.crypto); for addr in self.peers.peers.keys() { self.traffic.count_out_traffic(*addr, msg_data.len()); - let socket = match *addr { - SocketAddr::V4(_) => &self.socket4, - SocketAddr::V6(_) => &self.socket6 + let mut socket = match *addr { + SocketAddr::V4(_) => &mut self.socket4, + SocketAddr::V6(_) => &mut self.socket6 }; - try!(match socket.send_to(msg_data, addr) { + try!(match socket.send(msg_data, *addr) { Ok(written) if written == msg_data.len() => Ok(()), Ok(_) => Err(Error::Socket("Sent out truncated packet", io::Error::new(io::ErrorKind::Other, "truncated"))), Err(e) => Err(Error::Socket("IOError when sending", e)) @@ -316,10 +311,10 @@ impl GenericCloud { let msg_data = encode(msg, &mut self.buffer_out, self.magic, &mut self.crypto); self.traffic.count_out_traffic(addr, msg_data.len()); let socket = match addr { - SocketAddr::V4(_) => &self.socket4, - SocketAddr::V6(_) => &self.socket6 + SocketAddr::V4(_) => &mut self.socket4, + SocketAddr::V6(_) => &mut self.socket6 }; - match socket.send_to(msg_data, addr) { + match socket.send(msg_data, addr) { Ok(written) if written == msg_data.len() => Ok(()), Ok(_) => Err(Error::Socket("Sent out truncated packet", io::Error::new(io::ErrorKind::Other, "truncated"))), Err(e) => Err(Error::Socket("IOError when sending", e)) @@ -335,7 +330,7 @@ impl GenericCloud { /// Returns an IOError if the underlying system call fails #[allow(dead_code)] pub fn address(&self) -> io::Result<(SocketAddr, SocketAddr)> { - Ok((try!(self.socket4.local_addr()), try!(self.socket6.local_addr()))) + Ok((try!(self.socket4.address()), try!(self.socket6.address()))) } /// Returns the number of peers @@ -700,15 +695,7 @@ impl GenericCloud { Ok(()) } - /// The main method of the node - /// - /// This method will use epoll to wait in the sockets and the device at the same time. - /// It will read from the sockets, decode and decrypt the message and then call the - /// `handle_net_message` method. It will also read from the device and call - /// `handle_interface_data` for each packet read. - /// Also, this method will call `housekeep` every second. - #[allow(unknown_lints, clippy::cyclomatic_complexity)] - pub fn run(&mut self) { + fn initialize(&mut self) { match self.address() { Err(err) => error!("Failed to obtain local addresses: {}", err), Ok((v4, v6)) => { @@ -716,68 +703,68 @@ impl GenericCloud { self.own_addresses.push(v6); } } - let dummy_time = Instant::now(); - let trap = Trap::trap(&[Signal::SIGINT, Signal::SIGTERM, Signal::SIGQUIT]); - let mut poll_handle = try_fail!(Poll::new(3), "Failed to create poll handle: {}"); - let socket4_fd = self.socket4.as_raw_fd(); - let socket6_fd = self.socket6.as_raw_fd(); - let device_fd = self.device.as_raw_fd(); - try_fail!(poll_handle.register(socket4_fd, Flags::READ), "Failed to add ipv4 socket to poll handle: {}"); - try_fail!(poll_handle.register(socket6_fd, Flags::READ), "Failed to add ipv6 socket to poll handle: {}"); - if let Err(err) = poll_handle.register(device_fd, Flags::READ) { - if self.device.get_type() != Type::Dummy { - fail!("Failed to add device to poll handle: {}", err); - } else { - warn!("Failed to add device to poll handle: {}", err); - } + } + + fn handle_socket_data(&mut self, src: SocketAddr, data: &mut [u8]) { + let size = data.len(); + if let Err(e) = decode(data, self.magic, &mut self.crypto).and_then(|msg| { + self.traffic.count_in_traffic(src, size); + self.handle_net_message(src, msg) + }) { + error!("Error: {}, from: {}", e, src); } + } + + fn handle_socket_v4_event(&mut self, buffer: &mut [u8]) { + let (size, src) = try_fail!(self.socket4.receive(buffer), "Failed to read from ipv4 network socket: {}"); + self.handle_socket_data(src, &mut buffer[..size]) + } + + fn handle_socket_v6_event(&mut self, buffer: &mut [u8]) { + let (size, src) = try_fail!(self.socket6.receive(buffer), "Failed to read from ipv6 network socket: {}"); + self.handle_socket_data(src, &mut buffer[..size]) + } + + fn handle_device_event(&mut self, buffer: &mut [u8]) { + let mut start = 64; + let (offset, size) = try_fail!(self.device.read(&mut buffer[start..]), "Failed to read from tap device: {}"); + start += offset; + if let Err(e) = self.handle_interface_data(buffer, start, start+size) { + error!("Error: {}", e); + } + } + + /// The main method of the node + /// + /// This method will use epoll to wait in the sockets and the device at the same time. + /// It will read from the sockets, decode and decrypt the message and then call the + /// `handle_net_message` method. It will also read from the device and call + /// `handle_interface_data` for each packet read. + /// Also, this method will call `housekeep` every second. + pub fn run(&mut self) { + let ctrlc = CtrlC::new(); + let waiter = try_fail!(WaitImpl::new(&self.socket4, &self.socket6, &self.device, 1000), "Failed to setup poll: {}"); let mut buffer = [0; 64*1024]; let mut poll_error = false; - loop { - let evts = match poll_handle.wait(1000) { - Ok(evts) => evts, - Err(err) => { + for evt in waiter { + match evt { + WaitResult::Error(err) => { if poll_error { fail!("Poll wait failed again: {}", err); } error!("Poll wait failed: {}, retrying...", err); poll_error = true; - continue - } - }; - for evt in evts { - match evt.fd() { - fd if (fd == socket4_fd || fd == socket6_fd) => { - let (size, src) = match evt.fd() { - fd if fd == socket4_fd => try_fail!(self.socket4.recv_from(&mut buffer), "Failed to read from ipv4 network socket: {}"), - fd if fd == socket6_fd => try_fail!(self.socket6.recv_from(&mut buffer), "Failed to read from ipv6 network socket: {}"), - _ => unreachable!() - }; - if let Err(e) = decode(&mut buffer[..size], self.magic, &mut self.crypto).and_then(|msg| { - self.traffic.count_in_traffic(src, size); - self.handle_net_message(src, msg) - }) { - error!("Error: {}, from: {}", e, src); - } - }, - fd if (fd == device_fd) => { - let mut start = 64; - let (offset, size) = try_fail!(self.device.read(&mut buffer[start..]), "Failed to read from tap device: {}"); - start += offset; - if let Err(e) = self.handle_interface_data(&mut buffer, start, start+size) { - error!("Error: {}", e); - } - }, - _ => unreachable!() - } + }, + WaitResult::Timeout => {}, + WaitResult::SocketV4 => self.handle_socket_v4_event(&mut buffer), + WaitResult::SocketV6 => self.handle_socket_v6_event(&mut buffer), + WaitResult::Device => self.handle_device_event(&mut buffer) } if self.next_housekeep < now() { poll_error = false; - // Check for signals - if trap.wait(dummy_time).is_some() { - break; + if ctrlc.was_pressed() { + break } - // Do the housekeeping if let Err(e) = self.housekeep() { error!("Error: {}", e) } diff --git a/src/main.rs b/src/main.rs index 5cd31b2..96a9566 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,7 +5,6 @@ #![cfg_attr(feature = "bench", feature(test))] #[macro_use] extern crate log; -#[macro_use] extern crate bitflags; extern crate time; extern crate docopt; #[macro_use] extern crate serde_derive; @@ -38,6 +37,7 @@ pub mod config; pub mod port_forwarding; pub mod traffic; pub mod beacon; +pub mod net; #[cfg(feature = "bench")] mod benches; use docopt::Docopt; @@ -58,6 +58,7 @@ use crypto::{Crypto, CryptoMethod}; use port_forwarding::PortForwarding; use util::Duration; use config::Config; +use std::net::UdpSocket; const VERSION: u8 = 1; @@ -160,8 +161,8 @@ enum AnyTable { } enum AnyCloud { - Switch(GenericCloud), - Routing(GenericCloud) + Switch(GenericCloud), + Routing(GenericCloud) } impl AnyCloud

{ @@ -170,10 +171,10 @@ impl AnyCloud

{ learning: bool, broadcast: bool, addresses: Vec, crypto: Crypto, port_forwarding: Option) -> Self { match table { - AnyTable::Switch(t) => AnyCloud::Switch(GenericCloud::::new( + AnyTable::Switch(t) => AnyCloud::Switch(GenericCloud::::new( config, device,t, learning, broadcast, addresses, crypto, port_forwarding )), - AnyTable::Routing(t) => AnyCloud::Routing(GenericCloud::::new( + AnyTable::Routing(t) => AnyCloud::Routing(GenericCloud::::new( config, device,t, learning, broadcast, addresses, crypto, port_forwarding )) } diff --git a/src/net.rs b/src/net.rs new file mode 100644 index 0000000..4fd2d34 --- /dev/null +++ b/src/net.rs @@ -0,0 +1,89 @@ +use std::os::unix::io::{RawFd, AsRawFd}; +use std::net::{UdpSocket, SocketAddr, SocketAddrV4, SocketAddrV6}; +use std::io::{self, ErrorKind}; +use std::collections::VecDeque; + +use net2::UdpBuilder; + + +pub trait Socket: AsRawFd + Sized { + fn listen_v4(host: &str, port: u16) -> Result; + fn listen_v6(host: &str, port: u16) -> Result; + fn receive(&mut self, buffer: &mut [u8]) -> Result<(usize, SocketAddr), io::Error>; + fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result; + fn address(&self) -> Result; +} + +impl Socket for UdpSocket { + fn listen_v4(host: &str, port: u16) -> Result { + UdpBuilder::new_v4().expect("Failed to obtain ipv4 socket builder") + .reuse_address(true).expect("Failed to set so_reuseaddr").bind((host, port)) + } + fn listen_v6(host: &str, port: u16) -> Result { + UdpBuilder::new_v6().expect("Failed to obtain ipv4 socket builder") + .only_v6(true).expect("Failed to set only_v6") + .reuse_address(true).expect("Failed to set so_reuseaddr").bind((host, port)) + } + fn receive(&mut self, buffer: &mut [u8]) -> Result<(usize, SocketAddr), io::Error> { + self.recv_from(buffer) + } + fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result { + self.send_to(data, addr) + } + fn address(&self) -> Result { + self.local_addr() + } +} + + +pub struct MockSocket { + address: SocketAddr, + outbound: VecDeque<(SocketAddr, Vec)>, + inbound: VecDeque<(SocketAddr, Vec)> +} + +impl MockSocket { + pub fn new(address: SocketAddr) -> Self { + Self { address, outbound: VecDeque::new(), inbound: VecDeque::new() } + } + + pub fn put_inbound(&mut self, from: SocketAddr, data: Vec) { + self.inbound.push_back((from, data)) + } + + pub fn pop_outbound(&mut self) -> Option<(SocketAddr, Vec)> { + self.outbound.pop_front() + } +} + +impl AsRawFd for MockSocket { + fn as_raw_fd(&self) -> RawFd { + unimplemented!() + } +} + +impl Socket for MockSocket { + fn listen_v4(host: &str, port: u16) -> Result { + let ip = try_fail!(host.parse(), "Failed to parse IPv4 address: {}"); + Ok(Self::new(SocketAddr::V4(SocketAddrV4::new(ip, port)))) + } + fn listen_v6(host: &str, port: u16) -> Result { + let ip = try_fail!(host.parse(), "Failed to parse IPv6 address: {}"); + Ok(Self::new(SocketAddr::V6(SocketAddrV6::new(ip, port, 0, 0)))) + } + fn receive(&mut self, buffer: &mut [u8]) -> Result<(usize, SocketAddr), io::Error> { + if let Some((addr, data)) = self.inbound.pop_front() { + buffer[0..data.len()].copy_from_slice(&data); + Ok((data.len(), addr)) + } else { + Err(io::Error::from(ErrorKind::UnexpectedEof)) + } + } + fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result { + self.outbound.push_back((addr, data.to_owned())); + Ok(data.len()) + } + fn address(&self) -> Result { + Ok(self.address) + } +} \ No newline at end of file diff --git a/src/poll/epoll.rs b/src/poll/epoll.rs index 5ab3eba..db0acda 100644 --- a/src/poll/epoll.rs +++ b/src/poll/epoll.rs @@ -6,103 +6,77 @@ use libc; use std::os::unix::io::RawFd; use std::io; -use std::ops::{Deref, DerefMut}; +use device::Device; +use std::os::unix::io::AsRawFd; -bitflags!{ - pub struct Flags: u32 { - const READ = libc::EPOLLIN as u32; - const WRITE = libc::EPOLLOUT as u32; - const ERROR = libc::EPOLLERR as u32; - } +use super::WaitResult; +use ::device::Type; +use net::Socket; + +pub struct EpollWait { + poll_fd: RawFd, + event: libc::epoll_event, + socketv4: RawFd, + socketv6: RawFd, + device: RawFd, + timeout: u32, } -#[derive(Clone, Copy)] -pub struct Event(libc::epoll_event); - -impl Event { - #[inline] - pub fn fd(&self) -> RawFd { - self.0.u64 as RawFd - } - - #[inline] - pub fn flags(&self) -> Flags { - Flags::from_bits(self.0.events).expect("Invalid flags set") - } - - #[inline] - fn new(fd: RawFd, flags: Flags) -> Self { - Event(libc::epoll_event{u64: fd as u64, events: flags.bits}) - } -} - -impl Deref for Event { - type Target = libc::epoll_event; - - #[inline] - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl DerefMut for Event { - #[inline] - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - - -pub struct Poll { - fd: RawFd, - events: Vec -} - -impl Poll { - #[inline] - pub fn new(max_events: usize) -> io::Result { - let mut events = Vec::with_capacity(max_events); - events.resize(max_events, Event::new(0, Flags::empty())); - let fd = unsafe { libc::epoll_create(max_events as i32) }; - if fd == -1 { +impl EpollWait { + pub fn new(socketv4: &S, socketv6: &S, device: &Device, timeout: u32) -> io::Result { + let mut event = libc::epoll_event{u64: 0, events: 0}; + let poll_fd = unsafe { libc::epoll_create(3) }; + if poll_fd == -1 { return Err(io::Error::last_os_error()); } - Ok(Poll{fd, events}) - } - - #[inline] - pub fn register(&mut self, fd: RawFd, flags: Flags) -> io::Result<()> { - let mut ev = Event::new(fd, flags); - let res = unsafe { libc::epoll_ctl(self.fd, libc::EPOLL_CTL_ADD, fd, &mut ev as &mut libc::epoll_event) }; - if res == -1 { - return Err(io::Error::last_os_error()); + let raw_fds = if device.get_type() != Type::Dummy { + vec![socketv4.as_raw_fd(), socketv6.as_raw_fd(), device.as_raw_fd()] + } else { + vec![socketv4.as_raw_fd(), socketv6.as_raw_fd()] + }; + for fd in raw_fds { + event.u64 = fd as u64; + event.events = libc::EPOLLIN as u32; + let res = unsafe { libc::epoll_ctl(poll_fd, libc::EPOLL_CTL_ADD, fd, &mut event) }; + if res == -1 { + return Err(io::Error::last_os_error()); + } } - Ok(()) - } - - #[inline] - pub fn unregister(&mut self, fd: RawFd) -> io::Result<()> { - let mut ev = Event::new(fd, Flags::empty()); - let res = unsafe { libc::epoll_ctl(self.fd, libc::EPOLL_CTL_DEL, fd, &mut ev as &mut libc::epoll_event) }; - if res == -1 { - return Err(io::Error::last_os_error()); - } - Ok(()) - } - - #[inline] - pub fn wait(&mut self, timeout_millis: u32) -> io::Result<&[Event]> { - let res = unsafe { libc::epoll_wait(self.fd, &mut self.events[0] as &mut libc::epoll_event, self.events.len() as i32, timeout_millis as i32) }; - if res == -1 { - return Err(io::Error::last_os_error()); - } - Ok(&self.events[0..res as usize]) + Ok(Self { + poll_fd, + event, + socketv4: socketv4.as_raw_fd(), + socketv6: socketv6.as_raw_fd(), + device: device.as_raw_fd(), + timeout + }) } } -impl Drop for Poll { - #[inline] +impl Drop for EpollWait { fn drop(&mut self) { - unsafe { libc::close(self.fd) }; + unsafe { libc::close(self.poll_fd) }; } } + +impl Iterator for EpollWait { + type Item = WaitResult; + + fn next(&mut self) -> Option { + Some(match unsafe { libc::epoll_wait(self.poll_fd, &mut self.event, 1, self.timeout as i32) } { + -1 => WaitResult::Error(io::Error::last_os_error()), + 0 => WaitResult::Timeout, + 1 => if self.event.u64 == self.socketv4 as u64 { + WaitResult::SocketV4 + } else if self.event.u64 == self.socketv6 as u64 { + WaitResult::SocketV6 + } else if self.event.u64 == self.device as u64 { + WaitResult::Device + } else { + unreachable!() + }, + _ => unreachable!() + }) + } +} + diff --git a/src/poll/mod.rs b/src/poll/mod.rs index 3f9282a..13d797b 100644 --- a/src/poll/mod.rs +++ b/src/poll/mod.rs @@ -6,4 +6,15 @@ mod epoll; #[cfg(any(target_os = "linux", target_os = "android"))] -pub use self::epoll::*; +pub use self::epoll::EpollWait as WaitImpl; + + +use std::io; + +pub enum WaitResult { + Timeout, + SocketV4, + SocketV6, + Device, + Error(io::Error) +} \ No newline at end of file diff --git a/src/util.rs b/src/util.rs index c69df8c..5267aab 100644 --- a/src/util.rs +++ b/src/util.rs @@ -13,6 +13,10 @@ use libc; #[cfg(not(target_os = "linux"))] use time; +use signal::{trap::Trap, Signal}; +use std::time::Instant; + + pub type Duration = u32; pub type Time = i64; @@ -166,3 +170,22 @@ impl fmt::Display for Bytes { write!(formatter, "{:.1} TiB", size) } } + + + +pub struct CtrlC { + dummy_time: Instant, + trap: Trap +} + +impl CtrlC { + pub fn new() -> Self { + let dummy_time = Instant::now(); + let trap = Trap::trap(&[Signal::SIGINT, Signal::SIGTERM, Signal::SIGQUIT]); + Self { dummy_time, trap } + } + + pub fn was_pressed(&self) -> bool { + self.trap.wait(self.dummy_time).is_some() + } +} \ No newline at end of file