diff --git a/src/device.rs b/src/device.rs index c6a16d1..2404871 100644 --- a/src/device.rs +++ b/src/device.rs @@ -77,7 +77,7 @@ impl FromStr for Type { } } -pub trait Device: AsRawFd + Clone { +pub trait Device: AsRawFd + Clone + Send + 'static { /// Returns the type of this device fn get_type(&self) -> Type; diff --git a/src/engine/mod.rs b/src/engine/mod.rs index cf25811..7ad1464 100644 --- a/src/engine/mod.rs +++ b/src/engine/mod.rs @@ -6,54 +6,31 @@ mod device_thread; mod shared; mod socket_thread; -use std::{ - cmp::{max, min}, - collections::HashMap, - fmt, - fs::{self, File}, - hash::BuildHasherDefault, - io::{self, Cursor, Seek, SeekFrom, Write}, - marker::PhantomData, - net::{SocketAddr, ToSocketAddrs}, - path::Path, - str::FromStr, - thread -}; +use std::{fs::File, hash::BuildHasherDefault, thread}; use fnv::FnvHasher; -use rand::{random, seq::SliceRandom, thread_rng}; -use smallvec::{smallvec, SmallVec}; use crate::{ - beacon::BeaconSerializer, - config::{Config, DEFAULT_PEER_TIMEOUT, DEFAULT_PORT}, - crypto::{is_init_message, Crypto, InitResult, InitState, MessageResult, PeerCrypto}, - device::{Device, Type}, + config::Config, + crypto::PeerCrypto, + device::Device, engine::{ device_thread::DeviceThread, - shared::{SharedPeerCrypto, SharedTable, SharedTraffic} + shared::{SharedPeerCrypto, SharedTable, SharedTraffic}, + socket_thread::SocketThread }, error::Error, - messages::{ - AddrList, NodeInfo, PeerInfo, MESSAGE_TYPE_CLOSE, MESSAGE_TYPE_DATA, MESSAGE_TYPE_KEEPALIVE, - MESSAGE_TYPE_NODE_INFO - }, - net::{mapped_addr, Socket}, + messages::AddrList, + net::Socket, payload::Protocol, - poll::{WaitImpl, WaitResult}, port_forwarding::PortForwarding, - table::ClaimTable, - traffic::TrafficStats, - types::{Address, Mode, NodeId, Range, RangeList}, - util::{addr_nice, bytes_to_hex, resolve, CtrlC, Duration, MsgBuffer, StatsdMsg, Time, TimeSource} + types::NodeId, + util::{addr_nice, resolve, CtrlC, Time, TimeSource} }; pub type Hash = BuildHasherDefault; -const MAX_RECONNECT_INTERVAL: u16 = 3600; -const RESOLVE_INTERVAL: Time = 300; pub const STATS_INTERVAL: Time = 60; -const OWN_ADDRESS_RESET_INTERVAL: Time = 300; const SPACE_BEFORE: usize = 100; struct PeerData { @@ -78,33 +55,8 @@ pub struct ReconnectEntry { pub struct GenericCloud { - node_id: NodeId, - config: Config, - learning: bool, - broadcast: bool, - peers: HashMap, - reconnect_peers: SmallVec<[ReconnectEntry; 3]>, - own_addresses: AddrList, - pending_inits: HashMap, Hash>, - table: ClaimTable, - socket: S, - device: D, - claims: RangeList, - crypto: Crypto, - next_peers: Time, - peer_timeout_publish: u16, - update_freq: u16, - stats_file: Option, - statsd_server: Option, - next_housekeep: Time, - next_stats_out: Time, - next_beacon: Time, - next_own_address_reset: Time, - port_forwarding: Option, - traffic: TrafficStats, - beacon_serializer: BeaconSerializer, - _dummy_p: PhantomData

, - _dummy_ts: PhantomData + socket_thread: SocketThread, + device_thread: DeviceThread } impl GenericCloud { @@ -112,265 +64,63 @@ impl GenericCloud, stats_file: Option ) -> Self { - let (learning, broadcast) = match config.mode { - Mode::Normal => { - match config.device_type { - Type::Tap => (true, true), - Type::Tun => (false, false) - } - } - Mode::Router => (false, false), - Mode::Switch => (true, true), - Mode::Hub => (false, true) - }; - let mut claims = SmallVec::with_capacity(config.claims.len()); - for s in &config.claims { - claims.push(try_fail!(Range::from_str(s), "Invalid subnet format: {} ({})", s)); - } - if device.get_type() == Type::Tun && config.auto_claim { - match device.get_ip() { - Ok(ip) => { - let range = Range { base: Address::from_ipv4(ip), prefix_len: 32 }; - info!("Auto-claiming {} due to interface address", range); - claims.push(range); - } - Err(Error::DeviceIo(_, e)) if e.kind() == io::ErrorKind::AddrNotAvailable => { - info!("No address set on interface.") - } - Err(e) => error!("{}", e) - } - } - let now = TS::now(); - let update_freq = config.get_keepalive() as u16; - let node_id = random(); - let crypto = Crypto::new(node_id, &config.crypto).unwrap(); - let beacon_key = config.beacon_password.as_ref().map(|s| s.as_bytes()).unwrap_or(&[]); - let mut res = GenericCloud { - node_id, - peers: HashMap::default(), - claims, - learning, - broadcast, - pending_inits: HashMap::default(), - reconnect_peers: SmallVec::new(), - own_addresses: SmallVec::new(), - peer_timeout_publish: config.peer_timeout as u16, - table: ClaimTable::new(config.switch_timeout as Duration, config.peer_timeout as Duration), - socket, - device, - next_peers: now, - update_freq, - stats_file, - statsd_server: config.statsd_server.clone(), - next_housekeep: now, - next_stats_out: now + STATS_INTERVAL, - next_beacon: now, - next_own_address_reset: now + OWN_ADDRESS_RESET_INTERVAL, - port_forwarding, - traffic: TrafficStats::default(), - beacon_serializer: BeaconSerializer::new(beacon_key), - crypto, - config: config.clone(), - _dummy_p: PhantomData, - _dummy_ts: PhantomData - }; - res - } - - #[inline] - pub fn ifname(&self) -> &str { - self.device.ifname() - } - - /// Sends the message to all peers - /// - /// # Errors - /// Returns an `Error::SocketError` when the underlying system call fails or only part of the - /// message could be sent (can this even happen?). - /// Some messages could have been sent. - #[inline] - fn broadcast_msg(&mut self, type_: u8, msg: &mut MsgBuffer) -> Result<(), Error> { - debug!("Broadcasting message type {}, {:?} bytes to {} peers", type_, msg.len(), self.peers.len()); - let mut msg_data = MsgBuffer::new(100); - for (addr, peer) in &mut self.peers { - msg_data.set_start(msg.get_start()); - msg_data.set_length(msg.len()); - msg_data.message_mut().clone_from_slice(msg.message()); - msg_data.prepend_byte(type_); - peer.crypto.encrypt_message(&mut msg_data); - self.traffic.count_out_traffic(*addr, msg_data.len()); - match self.socket.send(msg_data.message(), *addr) { - Ok(written) if written == msg_data.len() => Ok(()), - Ok(_) => Err(Error::Socket("Sent out truncated packet")), - Err(e) => Err(Error::SocketIo("IOError when sending", e)) - }? - } - Ok(()) - } - - #[inline] - fn send_to(&mut self, addr: SocketAddr, msg: &mut MsgBuffer) -> Result<(), Error> { - // HOT PATH - debug!("Sending msg with {} bytes to {}", msg.len(), addr); - self.traffic.count_out_traffic(addr, msg.len()); - match self.socket.send(msg.message(), addr) { - Ok(written) if written == msg.len() => Ok(()), - Ok(_) => Err(Error::Socket("Sent out truncated packet")), - Err(e) => Err(Error::SocketIo("IOError when sending", e)) - } - } - - #[inline] - fn send_msg(&mut self, addr: SocketAddr, type_: u8, msg: &mut MsgBuffer) -> Result<(), Error> { - // HOT PATH - debug!("Sending msg with {} bytes to {}", msg.len(), addr); - let peer = match self.peers.get_mut(&addr) { - Some(peer) => peer, - None => return Err(Error::Message("Sending to node that is not a peer")) - }; - msg.prepend_byte(type_); - peer.crypto.encrypt_message(msg); - self.send_to(addr, msg) - } - - pub fn reset_own_addresses(&mut self) -> io::Result<()> { - self.own_addresses.clear(); - self.own_addresses.push(self.socket.address().map(mapped_addr)?); - if let Some(ref pfw) = self.port_forwarding { - self.own_addresses.push(pfw.get_internal_ip().into()); - self.own_addresses.push(pfw.get_external_ip().into()); - } - debug!("Own addresses: {:?}", self.own_addresses); - // TODO: detect address changes and call event - Ok(()) - } - - /// Returns the number of peers - #[allow(dead_code)] - pub fn peer_count(&self) -> usize { - self.peers.len() - } - - /// Adds a peer to the reconnect list - /// - /// This method adds a peer to the list of nodes to reconnect to. A periodic task will try to - /// connect to the peer if it is not already connected. - pub fn add_reconnect_peer(&mut self, add: String) { - let now = TS::now(); - let resolved = match resolve(&add as &str) { - Ok(addrs) => addrs, - Err(err) => { - warn!("Failed to resolve {}: {:?}", add, err); - smallvec![] - } - }; - self.reconnect_peers.push(ReconnectEntry { - address: Some((add, now)), - tries: 0, - timeout: 1, - resolved, - next: now, - final_timeout: None - }) - } - - pub fn connect(&mut self, addr: Addr) -> Result<(), Error> { - let addrs = resolve(&addr)?.into_iter().map(mapped_addr).collect::>(); - for addr in &addrs { - if self.own_addresses.contains(addr) - || self.peers.contains_key(addr) - || self.pending_inits.contains_key(addr) - { - return Ok(()) - } - } - if !addrs.is_empty() { - self.config.call_hook( - "peer_connecting", - vec![("PEER", format!("{:?}", addr_nice(addrs[0]))), ("IFNAME", self.device.ifname().to_owned())], - true - ); - } - unimplemented!() - } - - pub fn run(&mut self) { - let table = SharedTable::::new(&self.config); + let table = SharedTable::::new(&config); let traffic = SharedTraffic::new(); let peer_crypto = SharedPeerCrypto::new(); let device_thread = DeviceThread::::new( - self.config.clone(), - self.device.clone(), - self.socket.clone(), + config.clone(), + device.clone(), + socket.clone(), traffic.clone(), peer_crypto.clone(), table.clone() ); - - // TODO: create shared data structures - // TODO: create and spawn threads - let ctrlc = CtrlC::new(); - // TODO: wait for ctrl-c - let waiter = try_fail!( - WaitImpl::new(self.socket.as_raw_fd(), self.device.as_raw_fd(), 1000), - "Failed to setup poll: {}" + let socket_thread = SocketThread::::new( + config.clone(), + device, + socket, + traffic, + peer_crypto, + table, + port_forwarding, + stats_file ); - let mut buffer = MsgBuffer::new(SPACE_BEFORE); - let mut poll_error = false; - self.config.call_hook("vpn_started", vec![("IFNAME", self.device.ifname())], true); - for evt in waiter { - // HOT PATH - match evt { - WaitResult::Error(err) => { - // COLD PATH - if poll_error { - fail!("Poll wait failed again: {}", err); - } - debug!("Poll wait failed: {}, retrying...", err); - poll_error = true; - } - WaitResult::Timeout => {} - WaitResult::Socket => unimplemented!(), - WaitResult::Device => unimplemented!() - } - if self.next_housekeep < TS::now() { - // COLD PATH - poll_error = false; - if ctrlc.was_pressed() { - break - } - self.next_housekeep = TS::now() + 1 - } - } - info!("Shutting down..."); - self.config.call_hook("vpn_shutdown", vec![("IFNAME", self.device.ifname())], true); - buffer.clear(); - self.broadcast_msg(MESSAGE_TYPE_CLOSE, &mut buffer).ok(); - if let Some(ref path) = self.config.beacon_store { - let path = Path::new(path); - if path.exists() { - info!("Removing beacon file"); - if let Err(e) = fs::remove_file(path) { - error!("Failed to remove beacon file: {}", e) - } - } - } + Self { socket_thread, device_thread } + } + + pub fn add_peer(&mut self, addr: String) -> Result<(), Error> { + unimplemented!() + } + + pub fn run(self) { + // TODO: spawn threads + let ctrlc = CtrlC::new(); + let device_thread = self.device_thread; + let device_thread_handle = thread::spawn(move || device_thread.run()); + let socket_thread = self.socket_thread; + let socket_thread_handle = thread::spawn(move || socket_thread.run()); + // TODO: wait for ctrl-c + device_thread_handle.join().unwrap(); + socket_thread_handle.join().unwrap(); } } #[cfg(test)] use super::device::MockDevice; #[cfg(test)] use super::net::MockSocket; -#[cfg(test)] use super::util::MockTimeSource; +#[cfg(test)] use super::util::{MockTimeSource, MsgBuffer}; +#[cfg(test)] use std::net::SocketAddr; #[cfg(test)] impl GenericCloud { pub fn socket(&mut self) -> &mut MockSocket { - &mut self.socket + unimplemented!() + //&mut self.socket } pub fn device(&mut self) -> &mut MockDevice { - &mut self.device + unimplemented!() + //&mut self.device } pub fn trigger_socket_event(&mut self) { @@ -388,14 +138,17 @@ impl GenericCloud { } pub fn is_connected(&self, addr: &SocketAddr) -> bool { - self.peers.contains_key(addr) + unimplemented!() + // self.peers.contains_key(addr) } pub fn own_addresses(&self) -> &[SocketAddr] { - &self.own_addresses + unimplemented!() + //&self.own_addresses } pub fn get_num(&self) -> usize { - self.socket.address().unwrap().port() as usize + unimplemented!() + // self.socket.address().unwrap().port() as usize } } diff --git a/src/engine/shared.rs b/src/engine/shared.rs index 594e9d0..4f4c9aa 100644 --- a/src/engine/shared.rs +++ b/src/engine/shared.rs @@ -1,14 +1,12 @@ use crate::{ + config::Config, crypto::CryptoCore, - engine::{Hash, PeerData, TimeSource}, + engine::{Hash, TimeSource}, error::Error, - messages::NodeInfo, table::ClaimTable, - traffic::{TrafficStats, TrafficEntry}, - types::{Address, NodeId, RangeList}, - util::MsgBuffer, - util::Duration, - config::Config + traffic::{TrafficEntry, TrafficStats}, + types::{Address, RangeList}, + util::{Duration, MsgBuffer} }; use parking_lot::Mutex; use std::{ @@ -37,7 +35,10 @@ impl SharedPeerCrypto { match peers.get_mut(&peer) { None => Err(Error::InvalidCryptoState("No crypto found for peer")), Some(None) => Ok(()), - Some(Some(crypto)) => Ok(crypto.encrypt(data)) + Some(Some(crypto)) => { + crypto.encrypt(data); + Ok(()) + } } } diff --git a/src/engine/socket_thread.rs b/src/engine/socket_thread.rs index 9b65d9f..9f9a733 100644 --- a/src/engine/socket_thread.rs +++ b/src/engine/socket_thread.rs @@ -7,12 +7,16 @@ use crate::{ beacon::BeaconSerializer, config::{DEFAULT_PEER_TIMEOUT, DEFAULT_PORT}, crypto::{is_init_message, InitResult, InitState, MessageResult}, + device::Type, engine::{addr_nice, resolve, Hash, PeerData}, error::Error, - messages::{AddrList, NodeInfo, PeerInfo, MESSAGE_TYPE_NODE_INFO}, + messages::{ + AddrList, NodeInfo, PeerInfo, MESSAGE_TYPE_CLOSE, MESSAGE_TYPE_DATA, MESSAGE_TYPE_KEEPALIVE, + MESSAGE_TYPE_NODE_INFO + }, net::{mapped_addr, Socket}, port_forwarding::PortForwarding, - types::{NodeId, RangeList}, + types::{Address, NodeId, Range, RangeList}, util::{MsgBuffer, StatsdMsg, Time, TimeSource}, Config, Crypto, Device, Protocol }; @@ -26,7 +30,8 @@ use std::{ io, io::{Cursor, Seek, SeekFrom, Write}, marker::PhantomData, - net::{SocketAddr, ToSocketAddrs} + net::{SocketAddr, ToSocketAddrs}, + str::FromStr }; @@ -81,6 +86,63 @@ pub struct SocketThread { } impl SocketThread { + pub fn new( + config: Config, device: D, socket: S, traffic: SharedTraffic, peer_crypto: SharedPeerCrypto, + table: SharedTable, port_forwarding: Option, stats_file: Option + ) -> Self { + let mut claims = SmallVec::with_capacity(config.claims.len()); + for s in &config.claims { + claims.push(try_fail!(Range::from_str(s), "Invalid subnet format: {} ({})", s)); + } + if device.get_type() == Type::Tun && config.auto_claim { + match device.get_ip() { + Ok(ip) => { + let range = Range { base: Address::from_ipv4(ip), prefix_len: 32 }; + info!("Auto-claiming {} due to interface address", range); + claims.push(range); + } + Err(Error::DeviceIo(_, e)) if e.kind() == io::ErrorKind::AddrNotAvailable => { + info!("No address set on interface.") + } + Err(e) => error!("{}", e) + } + } + let now = TS::now(); + let update_freq = config.get_keepalive() as u16; + let node_id = random(); + let crypto = Crypto::new(node_id, &config.crypto).unwrap(); + let beacon_key = config.beacon_password.as_ref().map(|s| s.as_bytes()).unwrap_or(&[]); + Self { + _dummy_p: PhantomData, + _dummy_ts: PhantomData, + node_id, + claims, + device, + socket, + peer_crypto, + traffic, + table, + learning: config.is_learning(), + next_housekeep: now, + next_beacon: now, + next_peers: now, + next_stats_out: now + STATS_INTERVAL, + next_own_address_reset: now + OWN_ADDRESS_RESET_INTERVAL, + pending_inits: HashMap::default(), + reconnect_peers: SmallVec::new(), + own_addresses: SmallVec::new(), + peers: HashMap::default(), + peer_timeout_publish: config.peer_timeout as u16, + beacon_serializer: BeaconSerializer::new(beacon_key), + port_forwarding, + stats_file, + update_freq, + statsd_server: config.statsd_server.clone(), + crypto: Crypto::new(node_id, &config.crypto).unwrap(), + config + } + } + #[inline] fn send_to(&mut self, addr: SocketAddr, msg: &MsgBuffer) -> Result<(), Error> { debug!("Sending msg with {} bytes to {}", msg.len(), addr); @@ -126,7 +188,7 @@ impl SocketThread(&mut self, addr: Addr) -> Result<(), Error> { @@ -322,7 +384,7 @@ impl SocketThread { self.traffic.count_invalid_protocol(data.len()); - return Err(err) + Err(err) } } } @@ -398,14 +460,14 @@ impl SocketThread>() { msg.clear(); self.peers.get_mut(&addr).unwrap().crypto.every_second(&mut msg); if !msg.is_empty() { - self.send_to(addr, &mut msg)? + self.send_to(addr, &msg)? } } for addr in del { diff --git a/src/main.rs b/src/main.rs index 0ffbfbb..84da342 100644 --- a/src/main.rs +++ b/src/main.rs @@ -185,6 +185,7 @@ fn run(config: Config, socket: S) { Some(file) } }; + let ifname = device.ifname().to_string(); let mut cloud = GenericCloud::::new(&config, socket, device, port_forwarding, stats_file); for mut addr in config.peers { @@ -192,8 +193,7 @@ fn run(config: Config, socket: S) { // : not present or only in IPv6 address addr = format!("{}:{}", addr, DEFAULT_PORT) } - try_fail!(cloud.connect(&addr as &str), "Failed to send message to {}: {}", &addr); - cloud.add_reconnect_peer(addr); + try_fail!(cloud.add_peer(addr.clone()), "Failed to send message to {}: {}", &addr); } if config.daemonize { info!("Running process as daemon"); @@ -223,7 +223,7 @@ fn run(config: Config, socket: S) { } cloud.run(); if let Some(script) = config.ifdown { - run_script(&script, cloud.ifname()); + run_script(&script, &ifname); } } diff --git a/src/net.rs b/src/net.rs index 69e19db..d93be38 100644 --- a/src/net.rs +++ b/src/net.rs @@ -31,7 +31,7 @@ pub fn get_ip() -> IpAddr { s.local_addr().unwrap().ip() } -pub trait Socket: AsRawFd + Sized + Clone { +pub trait Socket: AsRawFd + Sized + Clone + Send + 'static { fn listen(addr: &str) -> Result; fn receive(&mut self, buffer: &mut MsgBuffer) -> Result; fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result; diff --git a/src/payload.rs b/src/payload.rs index d1fab9f..5d98943 100644 --- a/src/payload.rs +++ b/src/payload.rs @@ -5,7 +5,7 @@ use crate::{error::Error, types::Address}; use std::io::{Cursor, Read}; -pub trait Protocol: Sized { +pub trait Protocol: Sized + Send + 'static { fn parse(_: &[u8]) -> Result<(Address, Address), Error>; } diff --git a/src/wsproxy.rs b/src/wsproxy.rs index c784ae9..754d130 100644 --- a/src/wsproxy.rs +++ b/src/wsproxy.rs @@ -159,8 +159,10 @@ impl Socket for ProxyConnection { write_addr(addr, &mut msg)?; msg.write_all(data)?; unimplemented!(); - //io_error!(self.socket.write_message(Message::Binary(msg)), "Failed to write to ws proxy: {}")?; + /* + io_error!(self.socket.write_message(Message::Binary(msg)), "Failed to write to ws proxy: {}")?; Ok(data.len()) + */ } fn address(&self) -> Result {