diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index aa1c79c..9d77121 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -11,7 +11,7 @@ RUN chown vscode: -R /usr/local/rustup /usr/local/cargo USER vscode -RUN rustup default 1.64.0 \ +RUN rustup default 1.70.0 \ && rustup component add clippy rust-src rustfmt RUN cargo install cargo-outdated cargo-cache cargo-criterion \ diff --git a/Cargo.lock b/Cargo.lock index f74ddcb..b1898a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,6 +26,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "anstyle" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41ed9a86bf92ae6580e0a31281f65a1b1d867c0cc68d5346e2ae128dddfa6a7d" + [[package]] name = "attohttpc" version = "0.16.3" @@ -44,7 +50,7 @@ version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" dependencies = [ - "hermit-abi", + "hermit-abi 0.1.19", "libc", "winapi", ] @@ -173,31 +179,36 @@ dependencies = [ "atty", "bitflags", "strsim", - "textwrap 0.11.0", + "textwrap", "unicode-width", "vec_map", ] [[package]] name = "clap" -version = "3.2.22" +version = "4.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86447ad904c7fb335a790c9d7fe3d0d971dc523b8ccd1561a520de9a85302750" +checksum = "80672091db20273a15cf9fdd4e47ed43b5091ec9841bf4c6145c9dfbbcae09ed" dependencies = [ + "clap_builder", +] + +[[package]] +name = "clap_builder" +version = "4.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1458a1df40e1e2afebb7ab60ce55c1fa8f431146205aa5f4887e0b111c27636" +dependencies = [ + "anstyle", "bitflags", "clap_lex", - "indexmap", - "textwrap 0.15.1", ] [[package]] name = "clap_lex" -version = "0.2.4" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2850f2f5a82cbf437dd5af4d49848fbdfc27c157c3d010345776f952765261c5" -dependencies = [ - "os_str_bytes", -] +checksum = "2da6da31387c7e4ef160ffab6d5e7f00c42626fe39aea70a7b0f1773f7dd6c1b" [[package]] name = "console" @@ -230,19 +241,19 @@ dependencies = [ [[package]] name = "criterion" -version = "0.4.0" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7c76e09c1aae2bc52b3d2f29e13c6572553b30c4aa1b8a49fd70de6412654cb" +checksum = "f2b12d017a929603d80db1831cd3a24082f8137ce19c69e6447f54f5fc8d692f" dependencies = [ "anes", - "atty", "cast", "ciborium", - "clap 3.2.22", + "clap 4.3.4", "criterion-plot", + "is-terminal", "itertools", - "lazy_static", "num-traits", + "once_cell", "oorandom", "plotters", "rayon", @@ -362,6 +373,27 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" +[[package]] +name = "errno" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bcfec3a70f97c962c307b2d2c56e358cf1d00b558d74262b5f929ee8cc7e73a" +dependencies = [ + "errno-dragonfly", + "libc", + "windows-sys 0.48.0", +] + +[[package]] +name = "errno-dragonfly" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "fastrand" version = "1.8.0" @@ -437,6 +469,12 @@ dependencies = [ "libc", ] +[[package]] +name = "hermit-abi" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fed44880c466736ef9a5c5b5facefb5ed0785676d0c02d612db14e54f0d84286" + [[package]] name = "http" version = "0.2.8" @@ -515,6 +553,29 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "io-lifetimes" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" +dependencies = [ + "hermit-abi 0.3.1", + "libc", + "windows-sys 0.48.0", +] + +[[package]] +name = "is-terminal" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adcf93614601c8129ddf72e2d5633df827ba6551541c6d8c59520a371475be1f" +dependencies = [ + "hermit-abi 0.3.1", + "io-lifetimes", + "rustix", + "windows-sys 0.48.0", +] + [[package]] name = "itertools" version = "0.10.5" @@ -547,9 +608,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.133" +version = "0.2.146" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0f80d65747a3e43d1596c7c5492d95d5edddaabd45a7fcdb02b95f644164966" +checksum = "f92be4933c13fd498862a9e02a3055f8a8d9c039ce33db97306fd5a6caa7f29b" [[package]] name = "linked-hash-map" @@ -557,6 +618,12 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" +[[package]] +name = "linux-raw-sys" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" + [[package]] name = "lock_api" version = "0.4.9" @@ -635,7 +702,7 @@ version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1" dependencies = [ - "hermit-abi", + "hermit-abi 0.1.19", "libc", ] @@ -651,12 +718,6 @@ version = "11.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" -[[package]] -name = "os_str_bytes" -version = "6.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ff7415e9ae3fff1225851df9e0d9e4e5479f947619774677a63572e55e80eff" - [[package]] name = "parking_lot" version = "0.12.1" @@ -677,7 +738,7 @@ dependencies = [ "libc", "redox_syscall", "smallvec", - "windows-sys", + "windows-sys 0.36.1", ] [[package]] @@ -874,6 +935,20 @@ dependencies = [ "winapi", ] +[[package]] +name = "rustix" +version = "0.37.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b96e891d04aa506a6d1f318d2771bcb1c7dfda84e126660ace067c9b474bb2c0" +dependencies = [ + "bitflags", + "errno", + "io-lifetimes", + "libc", + "linux-raw-sys", + "windows-sys 0.48.0", +] + [[package]] name = "ryu" version = "1.0.11" @@ -1046,12 +1121,6 @@ dependencies = [ "unicode-width", ] -[[package]] -name = "textwrap" -version = "0.15.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "949517c0cf1bf4ee812e2e07e08ab448e3ae0d23472aee8a06c985f0c8815b16" - [[package]] name = "thiserror" version = "1.0.35" @@ -1366,43 +1435,109 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2" dependencies = [ - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_msvc", + "windows_aarch64_msvc 0.36.1", + "windows_i686_gnu 0.36.1", + "windows_i686_msvc 0.36.1", + "windows_x86_64_gnu 0.36.1", + "windows_x86_64_msvc 0.36.1", ] +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b1eb6f0cd7c80c79759c929114ef071b87354ce476d9d94271031c0497adfd5" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc 0.48.0", + "windows_i686_gnu 0.48.0", + "windows_i686_msvc 0.48.0", + "windows_x86_64_gnu 0.48.0", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc 0.48.0", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc" + [[package]] name = "windows_aarch64_msvc" version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3" + [[package]] name = "windows_i686_gnu" version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" +[[package]] +name = "windows_i686_gnu" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241" + [[package]] name = "windows_i686_msvc" version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" +[[package]] +name = "windows_i686_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00" + [[package]] name = "windows_x86_64_gnu" version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953" + [[package]] name = "windows_x86_64_msvc" version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" + [[package]] name = "xml-rs" version = "0.8.4" diff --git a/Cargo.toml b/Cargo.toml index e7c0fed..29df5b4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ readme = "README.md" edition = "2018" [package.metadata] -toolchain = "1.64.0" +toolchain = "1.70.0" upx_version = "3.96" [dependencies] @@ -41,7 +41,7 @@ signal = "0.7" [dev-dependencies] tempfile = "3" -criterion = { version = "0.4", features = ["html_reports"] } +criterion = { version = "0.5", features = ["html_reports"] } iai = "0.1" [features] diff --git a/benches/.code.rs b/benches/.code.rs index 07b7683..e290e49 100644 --- a/benches/.code.rs +++ b/benches/.code.rs @@ -18,6 +18,9 @@ mod engine { pub mod common { include!("../src/engine/common.rs"); } + pub mod coms { + include!("../src/engine/coms.rs"); + } mod shared { include!("../src/engine/shared.rs"); } @@ -27,6 +30,12 @@ mod engine { mod socket_thread { include!("../src/engine/socket_thread.rs"); } + mod housekeep_thread { + include!("../src/engine/housekeep_thread.rs"); + } + mod extras_thread { + include!("../src/engine/extras_thread.rs"); + } } mod config { include!("../src/config.rs"); diff --git a/src/engine/common.rs b/src/engine/common.rs index 81c0cae..910ad2d 100644 --- a/src/engine/common.rs +++ b/src/engine/common.rs @@ -1,5 +1,3 @@ -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; use std::thread; use std::{fs::File, hash::BuildHasherDefault}; @@ -8,12 +6,14 @@ use fnv::FnvHasher; use crate::util::CtrlC; use crate::{ config::Config, - crypto::PeerCrypto, device::Device, engine::{ + coms::Coms, device_thread::DeviceThread, + extras_thread::ExtrasThread, + housekeep_thread::HousekeepThread, + shared::SharedConfig, socket_thread::{ReconnectEntry, SocketThread}, - coms::Coms }, error::Error, messages::AddrList, @@ -36,13 +36,15 @@ pub struct PeerData { pub timeout: Time, pub peer_timeout: u16, pub node_id: NodeId, - pub crypto: PeerCrypto, + //pub crypto: PeerCrypto, } pub struct GenericCloud { + config: SharedConfig, socket_thread: SocketThread, device_thread: DeviceThread, - running: Arc, + housekeep_thread: HousekeepThread, + extras_thread: ExtrasThread, } impl GenericCloud { @@ -50,23 +52,18 @@ impl GenericCloud, stats_file: Option, ) -> Result { - let running = Arc::new(AtomicBool::new(true)); - let coms = Coms::::new(config, socket.try_clone().map_err(|e| Error::SocketIo("Failed to clone socket", e))?); - let device_thread = DeviceThread::::new( - device.duplicate()?, - coms.try_clone()?, - running.clone(), - ); - let mut socket_thread = SocketThread::::new( - config.clone(), - coms, - device, - port_forwarding, - stats_file, - running.clone(), + let config = SharedConfig::new(config.clone()); + let coms = Coms::::new( + config.get_config(), + socket.try_clone().map_err(|e| Error::SocketIo("Failed to clone socket", e))?, ); + let device_thread = DeviceThread::::new(config.clone(), device.duplicate()?, coms.try_clone()?); + let housekeep_thread = HousekeepThread::::new(config.clone(), coms.try_clone()?); + let extras_thread = ExtrasThread::::new(config.clone(), coms.try_clone()?); + let mut socket_thread = + SocketThread::::new(config.clone(), coms, device, port_forwarding, stats_file); socket_thread.housekeep()?; - Ok(Self { socket_thread, device_thread, running }) + Ok(Self { socket_thread, device_thread, config, housekeep_thread, extras_thread }) } pub fn add_peer(&mut self, addr: String) -> Result<(), Error> { @@ -84,17 +81,23 @@ impl GenericCloud GenericCloud { } pub fn is_connected(&self, addr: &SocketAddr) -> bool { - self.socket_thread.peers.contains_key(addr) + self.socket_thread.coms.has_peer(addr) } pub fn own_addresses(&self) -> &[SocketAddr] { diff --git a/src/engine/coms.rs b/src/engine/coms.rs index 2ca7506..5fa5bec 100644 --- a/src/engine/coms.rs +++ b/src/engine/coms.rs @@ -1,23 +1,26 @@ -use std::{net::SocketAddr, marker::PhantomData, io}; +use std::{collections::HashMap, io, marker::PhantomData, net::SocketAddr, ops::DerefMut}; use crate::{ config::Config, + crypto::PeerCrypto, error::Error, messages::MESSAGE_TYPE_DATA, - net::{Socket, mapped_addr}, - util::{MsgBuffer, TimeSource}, payload::Protocol, + net::{mapped_addr, Socket}, + payload::Protocol, + util::{MsgBuffer, TimeSource, addr_nice}, }; use super::{ - common::{SPACE_BEFORE, PeerData}, - shared::{SharedPeerCrypto, SharedTable, SharedTraffic}, + common::{Hash, PeerData, SPACE_BEFORE}, + shared::{SharedCrypto, SharedPeers, SharedTable, SharedTraffic}, }; pub struct Coms { _dummy_p: PhantomData

, broadcast: bool, broadcast_buffer: MsgBuffer, - peer_crypto: SharedPeerCrypto, + crypto: SharedCrypto, + peers: SharedPeers, pub table: SharedTable, pub traffic: SharedTraffic, pub socket: S, @@ -30,7 +33,8 @@ impl Coms { broadcast: config.is_broadcasting(), broadcast_buffer: MsgBuffer::new(SPACE_BEFORE), traffic: SharedTraffic::new(), - peer_crypto: SharedPeerCrypto::new(), + crypto: SharedCrypto::new(), + peers: SharedPeers::new(), table: SharedTable::::new(config), socket, } @@ -42,14 +46,15 @@ impl Coms { broadcast: self.broadcast, broadcast_buffer: MsgBuffer::new(SPACE_BEFORE), traffic: self.traffic.clone(), - peer_crypto: self.peer_crypto.clone(), + crypto: self.crypto.clone(), table: self.table.clone(), + peers: self.peers.clone(), socket: self.socket.try_clone().map_err(|e| Error::SocketIo("Failed to clone socket", e))?, }) } pub fn sync(&mut self) -> Result<(), Error> { - self.peer_crypto.load(); + self.crypto.load(); self.table.sync(); self.traffic.sync(); Ok(()) @@ -67,6 +72,7 @@ impl Coms { } } + //TODO: move back to socket thread pub fn receive(&mut self, buffer: &mut MsgBuffer) -> Result { self.socket.receive(buffer) } @@ -87,17 +93,17 @@ impl Coms { pub fn send_msg(&mut self, addr: SocketAddr, type_: u8, data: &mut MsgBuffer) -> Result<(), Error> { debug!("Sending msg with {} bytes to {}", data.len(), addr); data.prepend_byte(type_); - self.peer_crypto.encrypt_for(addr, data)?; + self.crypto.encrypt_for(addr, data)?; self.send_to(addr, data) } #[inline] pub fn broadcast_msg(&mut self, type_: u8, data: &mut MsgBuffer) -> Result<(), Error> { let size = data.len(); - debug!("Broadcasting message type {}, {:?} bytes to {} peers", type_, size, self.peer_crypto.count()); + debug!("Broadcasting message type {}, {:?} bytes to {} peers", type_, size, self.crypto.count()); let traffic = &mut self.traffic; let socket = &mut self.socket; - let peers = self.peer_crypto.get_snapshot(); + let peers = self.crypto.get_snapshot(); for (addr, crypto) in peers { self.broadcast_buffer.set_start(data.get_start()); self.broadcast_buffer.set_length(data.len()); @@ -140,13 +146,27 @@ impl Coms { Ok(()) } - pub fn add_peer(&mut self, addr: SocketAddr, peer: &PeerData) { - self.peer_crypto.add(addr, peer.crypto.get_core()); + pub fn get_peers<'a>(&'a self) -> impl DerefMut> + 'a { + self.peers.get_peers() } - pub fn remove_peer(&mut self, addr: &SocketAddr) { - self.table.remove_claims(*addr); - self.peer_crypto.remove(addr); + pub fn add_peer(&mut self, addr: SocketAddr, peer_data: PeerData, peer_crypto: &PeerCrypto) { + self.crypto.add(addr, peer_crypto.get_core()); + self.peers.get_peers().insert(addr, peer_data); } + pub fn remove_peer(&mut self, addr: &SocketAddr) -> bool { + if let Some(_peer) = self.peers.get_peers().remove(addr) { + info!("Closing connection to {}", addr_nice(*addr)); + self.table.remove_claims(*addr); + self.crypto.remove(addr); + true + } else { + false + } + } + + pub fn has_peer(&self, addr: &SocketAddr) -> bool { + self.crypto.contains(addr) + } } diff --git a/src/engine/device_thread.rs b/src/engine/device_thread.rs index bb85fd3..c2a09f3 100644 --- a/src/engine/device_thread.rs +++ b/src/engine/device_thread.rs @@ -1,4 +1,4 @@ -use super::{common::SPACE_BEFORE, coms::Coms}; +use super::{common::SPACE_BEFORE, coms::Coms, shared::SharedConfig}; use crate::{ device::Device, error::Error, @@ -6,26 +6,23 @@ use crate::{ util::{MsgBuffer, Time, TimeSource}, Protocol, }; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; pub struct DeviceThread { // Device-only fields + config: SharedConfig, coms: Coms, pub device: D, next_housekeep: Time, buffer: MsgBuffer, - // Shared fields - running: Arc, } impl DeviceThread { - pub fn new(device: D, coms: Coms, running: Arc) -> Self { + pub fn new(config: SharedConfig, device: D, coms: Coms) -> Self { Self { + config, device, next_housekeep: TS::now(), buffer: MsgBuffer::new(SPACE_BEFORE), - running, coms, } } @@ -47,7 +44,7 @@ impl DeviceThread { + config: SharedConfig, + coms: Coms, + next_housekeep: Time, + buffer: MsgBuffer, +} + +impl ExtrasThread { + pub fn new(config: SharedConfig, coms: Coms) -> Self { + Self { + config, + coms, + next_housekeep: TS::now(), + buffer: MsgBuffer::new(SPACE_BEFORE), + } + } + + pub fn housekeep(&mut self) -> Result<(), Error> { + let now = TS::now(); + assert!(self.buffer.is_empty()); + Ok(()) + } + + + pub fn iteration(&mut self) -> bool { + std::thread::sleep(Duration::from_millis(100)); + let now = TS::now(); + if self.next_housekeep < now { + if let Err(e) = self.housekeep() { + error!("{}", e) + } + self.next_housekeep = now + 1; + if !self.config.is_running() { + debug!("Extras: end"); + return false; + } + } + true + } + + pub fn run(mut self) { + while self.iteration() {} + } +} diff --git a/src/engine/housekeep_thread.rs b/src/engine/housekeep_thread.rs new file mode 100644 index 0000000..3572122 --- /dev/null +++ b/src/engine/housekeep_thread.rs @@ -0,0 +1,67 @@ +use std::{time::Duration, net::SocketAddr, cmp::{min, max}}; + +use smallvec::SmallVec; + +use super::{common::SPACE_BEFORE, coms::Coms, shared::SharedConfig}; +use crate::{ + error::Error, + net::Socket, + util::{MsgBuffer, TimeSource, Time, addr_nice}, + Protocol, messages::MESSAGE_TYPE_NODE_INFO, config::DEFAULT_PEER_TIMEOUT, +}; + +const MAX_RECONNECT_INTERVAL: u16 = 3600; +const RESOLVE_INTERVAL: Time = 300; +const OWN_ADDRESS_RESET_INTERVAL: Time = 300; +pub const STATS_INTERVAL: Time = 60; + +pub struct HousekeepThread { + config: SharedConfig, + coms: Coms, + next_housekeep: Time, + buffer: MsgBuffer, + update_freq: u16, + next_peers: Time, + next_own_address_reset: Time, +} + +impl HousekeepThread { + pub fn new(config: SharedConfig, coms: Coms) -> Self { + let update_freq = config.get_config().get_keepalive() as u16; + Self { + config, + coms, + next_housekeep: TS::now(), + buffer: MsgBuffer::new(SPACE_BEFORE), + update_freq, + next_peers: TS::now(), + next_own_address_reset: TS::now(), + } + } + + pub fn housekeep(&mut self) -> Result<(), Error> { + let now = TS::now(); + assert!(self.buffer.is_empty()); + Ok(()) + } + + pub fn iteration(&mut self) -> bool { + std::thread::sleep(Duration::from_millis(100)); + let now = TS::now(); + if self.next_housekeep < now { + if let Err(e) = self.housekeep() { + error!("{}", e) + } + self.next_housekeep = now + 1; + if !self.config.is_running() { + debug!("Housekeep: end"); + return false; + } + } + true + } + + pub fn run(mut self) { + while self.iteration() {} + } +} diff --git a/src/engine/mod.rs b/src/engine/mod.rs index 64c7d57..2250ec3 100644 --- a/src/engine/mod.rs +++ b/src/engine/mod.rs @@ -2,8 +2,10 @@ // Copyright (C) 2015-2021 Dennis Schwerdel // This software is licensed under GPL-3 or newer (see LICENSE.md) -mod device_thread; mod shared; +mod device_thread; mod socket_thread; +mod housekeep_thread; +mod extras_thread; pub mod coms; pub mod common; \ No newline at end of file diff --git a/src/engine/shared.rs b/src/engine/shared.rs index 1b05d0c..b51de0c 100644 --- a/src/engine/shared.rs +++ b/src/engine/shared.rs @@ -1,6 +1,6 @@ use crate::{ config::Config, - crypto::CryptoCore, + crypto::{CryptoCore, PeerCrypto}, engine::common::Hash, error::Error, table::ClaimTable, @@ -13,34 +13,40 @@ use std::{ collections::HashMap, io::{self, Write}, net::SocketAddr, - sync::Arc, + ops::DerefMut, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, }; use super::common::PeerData; #[derive(Clone)] #[allow(clippy::type_complexity)] -pub struct SharedPeerCrypto { - peers: Arc>, Hash>>>, +pub struct SharedCrypto { + peer_crypto: Arc>, Hash>>>, cache: HashMap>, Hash>, //TODO: local hashmap as cache } -impl SharedPeerCrypto { +impl SharedCrypto { pub fn new() -> Self { - SharedPeerCrypto { peers: Arc::new(Mutex::new(HashMap::default())), cache: HashMap::default() } + SharedCrypto { peer_crypto: Arc::new(Mutex::new(HashMap::default())), cache: HashMap::default() } } pub fn encrypt_for(&mut self, peer: SocketAddr, data: &mut MsgBuffer) -> Result<(), Error> { - let crypto = match self.cache.get(&peer) { - Some(crypto) => crypto, - None => { - let peers = self.peers.lock(); - if let Some(crypto) = peers.get(&peer) { - self.cache.insert(peer, crypto.clone()); - self.cache.get(&peer).unwrap() - } else { - return Err(Error::InvalidCryptoState("No crypto found for peer")); - } + let cache = &mut self.cache; + let owned_crypto; + let crypto = if let Some(crypto) = cache.get(&peer) { + crypto + } else { + let peers = self.peer_crypto.lock(); + if let Some(crypto) = peers.get(&peer) { + cache.insert(peer, crypto.clone()); + owned_crypto = crypto.clone(); + &owned_crypto + } else { + return Err(Error::InvalidCryptoState("No crypto found for peer")); } }; if let Some(crypto) = crypto { @@ -51,26 +57,26 @@ impl SharedPeerCrypto { pub fn add(&mut self, addr: SocketAddr, crypto: Option>) { self.cache.insert(addr, crypto.clone()); - let mut peers = self.peers.lock(); + let mut peers = self.peer_crypto.lock(); peers.insert(addr, crypto); } pub fn remove(&mut self, addr: &SocketAddr) { self.cache.remove(addr); - let mut peers = self.peers.lock(); + let mut peers = self.peer_crypto.lock(); peers.remove(addr); } - pub fn store(&mut self, data: &HashMap) { + pub fn store(&mut self, data: &HashMap) { self.cache.clear(); - self.cache.extend(data.iter().map(|(k, v)| (*k, v.crypto.get_core()))); - let mut peers = self.peers.lock(); + self.cache.extend(data.iter().map(|(k, v)| (*k, v.get_core()))); + let mut peers = self.peer_crypto.lock(); peers.clear(); peers.extend(self.cache.iter().map(|(k, v)| (*k, v.clone()))); } pub fn load(&mut self) { - let peers = self.peers.lock(); + let peers = self.peer_crypto.lock(); self.cache.clear(); self.cache.extend(peers.iter().map(|(k, v)| (*k, v.clone()))); } @@ -79,6 +85,10 @@ impl SharedPeerCrypto { &self.cache } + pub fn contains(&self, addr: &SocketAddr) -> bool { + self.cache.contains_key(addr) + } + pub fn count(&self) -> usize { self.cache.len() } @@ -216,3 +226,42 @@ impl SharedTable { self.table.lock().claim_len() } } + +#[derive(Clone)] +pub struct SharedConfig { + config: Config, + running: Arc, +} + +impl SharedConfig { + pub fn new(config: Config) -> Self { + Self { config, running: Arc::new(AtomicBool::new(true)) } + } + + pub fn get_config(&self) -> &Config { + &self.config + } + + pub fn is_running(&self) -> bool { + self.running.load(Ordering::Relaxed) + } + + pub fn stop(&self) { + self.running.store(false, Ordering::Relaxed) + } +} + +#[derive(Clone)] +pub struct SharedPeers { + peers: Arc>>, +} + +impl SharedPeers { + pub fn new() -> Self { + Self { peers: Default::default() } + } + + pub fn get_peers<'a>(&'a self) -> impl DerefMut> + 'a { + self.peers.lock() + } +} diff --git a/src/engine/socket_thread.rs b/src/engine/socket_thread.rs index 90b280b..904b13a 100644 --- a/src/engine/socket_thread.rs +++ b/src/engine/socket_thread.rs @@ -1,9 +1,9 @@ -use super::{common::SPACE_BEFORE, coms::Coms}; +use super::{common::SPACE_BEFORE, coms::Coms, shared::SharedConfig}; use crate::{ beacon::BeaconSerializer, config::{DEFAULT_PEER_TIMEOUT, DEFAULT_PORT}, - crypto::{is_init_message, Crypto, InitResult, InitState, MessageResult}, + crypto::{is_init_message, Crypto, InitResult, InitState, MessageResult, PeerCrypto}, device::{Device, Type}, engine::common::{Hash, PeerData}, error::Error, @@ -15,12 +15,10 @@ use crate::{ port_forwarding::PortForwarding, types::{Address, NodeId, Range, RangeList}, util::{addr_nice, resolve, MsgBuffer, StatsdMsg, Time, TimeSource}, - Config, Protocol, + Protocol, }; use rand::{random, seq::SliceRandom, thread_rng}; use smallvec::{smallvec, SmallVec}; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; use std::{ cmp::{max, min}, collections::HashMap, @@ -30,7 +28,7 @@ use std::{ io::{Cursor, Seek, SeekFrom, Write}, marker::PhantomData, net::{SocketAddr, ToSocketAddrs}, - str::FromStr, + str::FromStr, ops::Deref, }; const MAX_RECONNECT_INTERVAL: u16 = 3600; @@ -52,32 +50,32 @@ pub struct SocketThread { // Read-only fields node_id: NodeId, claims: RangeList, - config: Config, peer_timeout_publish: u16, learning: bool, update_freq: u16, _dummy_ts: PhantomData, _dummy_p: PhantomData

, // Socket-only fields + config: SharedConfig, pub coms: Coms, device: D, next_housekeep: Time, pub own_addresses: AddrList, next_own_address_reset: Time, pending_inits: HashMap, Hash>, - crypto: Crypto, - pub peers: HashMap, - next_peers: Time, - next_stats_out: Time, - next_beacon: Time, - beacon_serializer: BeaconSerializer, - stats_file: Option, - statsd_server: Option, - pub reconnect_peers: SmallVec<[ReconnectEntry; 3]>, + crypto: Crypto, // TODO 2nd: move to config + //pub peers: HashMap, // TODO 1st: move to shared peers + peer_crypto: HashMap, + next_peers: Time, // TODO: split off + next_stats_out: Time, // TODO: split off + next_beacon: Time, // TODO: split off + beacon_serializer: BeaconSerializer, // TODO: split off + stats_file: Option, // TODO: split off + statsd_server: Option, // TODO: split off + pub reconnect_peers: SmallVec<[ReconnectEntry; 3]>, // TODO: move to shared config buffer: MsgBuffer, // Shared fields //table: SharedTable, - running: Arc, // Should not be here port_forwarding: Option, // TODO: 3rd thread } @@ -85,14 +83,14 @@ pub struct SocketThread { impl SocketThread { #[allow(clippy::too_many_arguments)] pub fn new( - config: Config, coms: Coms, device: D, - port_forwarding: Option, stats_file: Option, running: Arc, + config: SharedConfig, coms: Coms, device: D, port_forwarding: Option, + stats_file: Option, ) -> Self { - let mut claims = SmallVec::with_capacity(config.claims.len()); - for s in &config.claims { + let mut claims = SmallVec::with_capacity(config.get_config().claims.len()); + for s in &config.get_config().claims { claims.push(try_fail!(Range::from_str(s), "Invalid subnet format: {} ({})", s)); } - if device.get_type() == Type::Tun && config.auto_claim { + if device.get_type() == Type::Tun && config.get_config().auto_claim { match device.get_ip() { Ok(ip) => { let range = Range { base: Address::from_ipv4(ip), prefix_len: 32 }; @@ -106,9 +104,9 @@ impl SocketThread SocketThread SocketThread Result<(), Error> { let addr = mapped_addr(addr); - if self.peers.contains_key(&addr) + if self.coms.has_peer(&addr) || self.own_addresses.contains(&addr) || self.pending_inits.contains_key(&addr) { @@ -159,7 +156,7 @@ impl SocketThread>(); for addr in &addrs { if self.own_addresses.contains(addr) - || self.peers.contains_key(addr) + || self.coms.has_peer(addr) || self.pending_inits.contains_key(addr) { return Ok(()); @@ -175,7 +172,7 @@ impl SocketThread NodeInfo { let mut peers = smallvec![]; - for peer in self.peers.values() { + for peer in self.coms.get_peers().values() { peers.push(PeerInfo { node_id: Some(peer.node_id), addrs: peer.addrs.clone() }) } if peers.len() > 20 { @@ -193,9 +190,9 @@ impl SocketThread) -> Result<(), Error> { - if let Some(peer) = self.peers.get_mut(&addr) { + if let Some(peer) = self.coms.get_peers().get_mut(&addr) { peer.last_seen = TS::now(); - peer.timeout = TS::now() + self.config.peer_timeout as Time; + peer.timeout = TS::now() + self.config.get_config().peer_timeout as Time; if let Some(info) = &info { // Update peer addresses, always add seen address peer.addrs.clear(); @@ -226,14 +223,13 @@ impl SocketThread SocketThread Result<(), Error> { 'outer: for peer in peers { for addr in &peer.addrs { - if self.peers.contains_key(addr) { + if self.own_addresses.contains(addr) { // Check addresses and add addresses that we don't know to own addresses for addr in &peer.addrs { if !self.own_addresses.contains(addr) { @@ -261,7 +257,7 @@ impl SocketThread SocketThread Result<(), Error> { @@ -339,7 +333,7 @@ impl SocketThread SocketThread Result<(), Error> { let now = TS::now(); let mut del: SmallVec<[SocketAddr; 3]> = SmallVec::new(); - for (&addr, data) in &self.peers { + for (&addr, data) in self.coms.get_peers().deref() { if data.timeout < now { del.push(addr); } } for addr in del { info!("Forgot peer {} due to timeout", addr_nice(addr)); - self.peers.remove(&addr); self.coms.remove_peer(&addr); self.connect_sock(addr)?; // Try to reconnect } @@ -409,7 +402,7 @@ impl SocketThread SocketThread SocketThread>() { + for (addr, crypto) in self.peer_crypto.iter_mut() { self.buffer.clear(); - self.peers.get_mut(&addr).unwrap().crypto.every_second(&mut self.buffer); + crypto.every_second(&mut self.buffer); if !self.buffer.is_empty() { - self.coms.send_to(addr, &mut self.buffer)? + self.coms.send_to(*addr, &mut self.buffer)? } } for addr in del { self.pending_inits.remove(&addr); - if self.peers.remove(&addr).is_some() { + self.peer_crypto.remove(&addr); + if self.coms.remove_peer(&addr) { self.connect_sock(addr)?; } } @@ -474,7 +468,7 @@ impl SocketThread SocketThread Result<(), Error> { - if let Some(ref path) = self.config.beacon_store { + if let Some(ref path) = self.config.get_config().beacon_store { let peers: SmallVec<[SocketAddr; 3]> = self.own_addresses.choose_multiple(&mut thread_rng(), 3).cloned().collect(); if let Some(path) = path.strip_prefix('|') { @@ -510,7 +504,7 @@ impl SocketThread Result<(), Error> { let peers; - if let Some(ref path) = self.config.beacon_load { + if let Some(ref path) = self.config.get_config().beacon_load { if let Some(path) = path.strip_prefix('|') { self.beacon_serializer .read_from_cmd(path, Some(50)) @@ -540,13 +534,13 @@ impl SocketThread SocketThread SocketThread SocketThread