Compare commits

...

2 Commits

Author SHA1 Message Date
Dennis Schwerdel aa949cdcec Update code 2023-10-07 00:02:13 +02:00
Dennis Schwerdel 2fcb7646c7 Coms 2022-12-01 18:42:57 +01:00
12 changed files with 705 additions and 361 deletions

View File

@ -11,7 +11,7 @@ RUN chown vscode: -R /usr/local/rustup /usr/local/cargo
USER vscode USER vscode
RUN rustup default 1.64.0 \ RUN rustup default 1.70.0 \
&& rustup component add clippy rust-src rustfmt && rustup component add clippy rust-src rustfmt
RUN cargo install cargo-outdated cargo-cache cargo-criterion \ RUN cargo install cargo-outdated cargo-cache cargo-criterion \

209
Cargo.lock generated
View File

@ -26,6 +26,12 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "anstyle"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41ed9a86bf92ae6580e0a31281f65a1b1d867c0cc68d5346e2ae128dddfa6a7d"
[[package]] [[package]]
name = "attohttpc" name = "attohttpc"
version = "0.16.3" version = "0.16.3"
@ -44,7 +50,7 @@ version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
dependencies = [ dependencies = [
"hermit-abi", "hermit-abi 0.1.19",
"libc", "libc",
"winapi", "winapi",
] ]
@ -173,31 +179,36 @@ dependencies = [
"atty", "atty",
"bitflags", "bitflags",
"strsim", "strsim",
"textwrap 0.11.0", "textwrap",
"unicode-width", "unicode-width",
"vec_map", "vec_map",
] ]
[[package]] [[package]]
name = "clap" name = "clap"
version = "3.2.22" version = "4.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86447ad904c7fb335a790c9d7fe3d0d971dc523b8ccd1561a520de9a85302750" checksum = "80672091db20273a15cf9fdd4e47ed43b5091ec9841bf4c6145c9dfbbcae09ed"
dependencies = [ dependencies = [
"clap_builder",
]
[[package]]
name = "clap_builder"
version = "4.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1458a1df40e1e2afebb7ab60ce55c1fa8f431146205aa5f4887e0b111c27636"
dependencies = [
"anstyle",
"bitflags", "bitflags",
"clap_lex", "clap_lex",
"indexmap",
"textwrap 0.15.1",
] ]
[[package]] [[package]]
name = "clap_lex" name = "clap_lex"
version = "0.2.4" version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2850f2f5a82cbf437dd5af4d49848fbdfc27c157c3d010345776f952765261c5" checksum = "2da6da31387c7e4ef160ffab6d5e7f00c42626fe39aea70a7b0f1773f7dd6c1b"
dependencies = [
"os_str_bytes",
]
[[package]] [[package]]
name = "console" name = "console"
@ -230,19 +241,19 @@ dependencies = [
[[package]] [[package]]
name = "criterion" name = "criterion"
version = "0.4.0" version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7c76e09c1aae2bc52b3d2f29e13c6572553b30c4aa1b8a49fd70de6412654cb" checksum = "f2b12d017a929603d80db1831cd3a24082f8137ce19c69e6447f54f5fc8d692f"
dependencies = [ dependencies = [
"anes", "anes",
"atty",
"cast", "cast",
"ciborium", "ciborium",
"clap 3.2.22", "clap 4.3.4",
"criterion-plot", "criterion-plot",
"is-terminal",
"itertools", "itertools",
"lazy_static",
"num-traits", "num-traits",
"once_cell",
"oorandom", "oorandom",
"plotters", "plotters",
"rayon", "rayon",
@ -362,6 +373,27 @@ version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f"
[[package]]
name = "errno"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bcfec3a70f97c962c307b2d2c56e358cf1d00b558d74262b5f929ee8cc7e73a"
dependencies = [
"errno-dragonfly",
"libc",
"windows-sys 0.48.0",
]
[[package]]
name = "errno-dragonfly"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf"
dependencies = [
"cc",
"libc",
]
[[package]] [[package]]
name = "fastrand" name = "fastrand"
version = "1.8.0" version = "1.8.0"
@ -437,6 +469,12 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "hermit-abi"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fed44880c466736ef9a5c5b5facefb5ed0785676d0c02d612db14e54f0d84286"
[[package]] [[package]]
name = "http" name = "http"
version = "0.2.8" version = "0.2.8"
@ -515,6 +553,29 @@ dependencies = [
"cfg-if 1.0.0", "cfg-if 1.0.0",
] ]
[[package]]
name = "io-lifetimes"
version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2"
dependencies = [
"hermit-abi 0.3.1",
"libc",
"windows-sys 0.48.0",
]
[[package]]
name = "is-terminal"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adcf93614601c8129ddf72e2d5633df827ba6551541c6d8c59520a371475be1f"
dependencies = [
"hermit-abi 0.3.1",
"io-lifetimes",
"rustix",
"windows-sys 0.48.0",
]
[[package]] [[package]]
name = "itertools" name = "itertools"
version = "0.10.5" version = "0.10.5"
@ -547,9 +608,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.133" version = "0.2.146"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0f80d65747a3e43d1596c7c5492d95d5edddaabd45a7fcdb02b95f644164966" checksum = "f92be4933c13fd498862a9e02a3055f8a8d9c039ce33db97306fd5a6caa7f29b"
[[package]] [[package]]
name = "linked-hash-map" name = "linked-hash-map"
@ -557,6 +618,12 @@ version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f"
[[package]]
name = "linux-raw-sys"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519"
[[package]] [[package]]
name = "lock_api" name = "lock_api"
version = "0.4.9" version = "0.4.9"
@ -635,7 +702,7 @@ version = "1.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1" checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1"
dependencies = [ dependencies = [
"hermit-abi", "hermit-abi 0.1.19",
"libc", "libc",
] ]
@ -651,12 +718,6 @@ version = "11.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
[[package]]
name = "os_str_bytes"
version = "6.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ff7415e9ae3fff1225851df9e0d9e4e5479f947619774677a63572e55e80eff"
[[package]] [[package]]
name = "parking_lot" name = "parking_lot"
version = "0.12.1" version = "0.12.1"
@ -677,7 +738,7 @@ dependencies = [
"libc", "libc",
"redox_syscall", "redox_syscall",
"smallvec", "smallvec",
"windows-sys", "windows-sys 0.36.1",
] ]
[[package]] [[package]]
@ -874,6 +935,20 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "rustix"
version = "0.37.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b96e891d04aa506a6d1f318d2771bcb1c7dfda84e126660ace067c9b474bb2c0"
dependencies = [
"bitflags",
"errno",
"io-lifetimes",
"libc",
"linux-raw-sys",
"windows-sys 0.48.0",
]
[[package]] [[package]]
name = "ryu" name = "ryu"
version = "1.0.11" version = "1.0.11"
@ -1046,12 +1121,6 @@ dependencies = [
"unicode-width", "unicode-width",
] ]
[[package]]
name = "textwrap"
version = "0.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "949517c0cf1bf4ee812e2e07e08ab448e3ae0d23472aee8a06c985f0c8815b16"
[[package]] [[package]]
name = "thiserror" name = "thiserror"
version = "1.0.35" version = "1.0.35"
@ -1366,43 +1435,109 @@ version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2" checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2"
dependencies = [ dependencies = [
"windows_aarch64_msvc", "windows_aarch64_msvc 0.36.1",
"windows_i686_gnu", "windows_i686_gnu 0.36.1",
"windows_i686_msvc", "windows_i686_msvc 0.36.1",
"windows_x86_64_gnu", "windows_x86_64_gnu 0.36.1",
"windows_x86_64_msvc", "windows_x86_64_msvc 0.36.1",
] ]
[[package]]
name = "windows-sys"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9"
dependencies = [
"windows-targets",
]
[[package]]
name = "windows-targets"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b1eb6f0cd7c80c79759c929114ef071b87354ce476d9d94271031c0497adfd5"
dependencies = [
"windows_aarch64_gnullvm",
"windows_aarch64_msvc 0.48.0",
"windows_i686_gnu 0.48.0",
"windows_i686_msvc 0.48.0",
"windows_x86_64_gnu 0.48.0",
"windows_x86_64_gnullvm",
"windows_x86_64_msvc 0.48.0",
]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc"
[[package]] [[package]]
name = "windows_aarch64_msvc" name = "windows_aarch64_msvc"
version = "0.36.1" version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47"
[[package]]
name = "windows_aarch64_msvc"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3"
[[package]] [[package]]
name = "windows_i686_gnu" name = "windows_i686_gnu"
version = "0.36.1" version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6"
[[package]]
name = "windows_i686_gnu"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241"
[[package]] [[package]]
name = "windows_i686_msvc" name = "windows_i686_msvc"
version = "0.36.1" version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024"
[[package]]
name = "windows_i686_msvc"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00"
[[package]] [[package]]
name = "windows_x86_64_gnu" name = "windows_x86_64_gnu"
version = "0.36.1" version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1"
[[package]]
name = "windows_x86_64_gnu"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953"
[[package]] [[package]]
name = "windows_x86_64_msvc" name = "windows_x86_64_msvc"
version = "0.36.1" version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680"
[[package]]
name = "windows_x86_64_msvc"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a"
[[package]] [[package]]
name = "xml-rs" name = "xml-rs"
version = "0.8.4" version = "0.8.4"

View File

@ -12,7 +12,7 @@ readme = "README.md"
edition = "2018" edition = "2018"
[package.metadata] [package.metadata]
toolchain = "1.64.0" toolchain = "1.70.0"
upx_version = "3.96" upx_version = "3.96"
[dependencies] [dependencies]
@ -41,7 +41,7 @@ signal = "0.7"
[dev-dependencies] [dev-dependencies]
tempfile = "3" tempfile = "3"
criterion = { version = "0.4", features = ["html_reports"] } criterion = { version = "0.5", features = ["html_reports"] }
iai = "0.1" iai = "0.1"
[features] [features]

View File

@ -18,6 +18,9 @@ mod engine {
pub mod common { pub mod common {
include!("../src/engine/common.rs"); include!("../src/engine/common.rs");
} }
pub mod coms {
include!("../src/engine/coms.rs");
}
mod shared { mod shared {
include!("../src/engine/shared.rs"); include!("../src/engine/shared.rs");
} }
@ -27,6 +30,12 @@ mod engine {
mod socket_thread { mod socket_thread {
include!("../src/engine/socket_thread.rs"); include!("../src/engine/socket_thread.rs");
} }
mod housekeep_thread {
include!("../src/engine/housekeep_thread.rs");
}
mod extras_thread {
include!("../src/engine/extras_thread.rs");
}
} }
mod config { mod config {
include!("../src/config.rs"); include!("../src/config.rs");

View File

@ -1,5 +1,3 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread; use std::thread;
use std::{fs::File, hash::BuildHasherDefault}; use std::{fs::File, hash::BuildHasherDefault};
@ -8,11 +6,13 @@ use fnv::FnvHasher;
use crate::util::CtrlC; use crate::util::CtrlC;
use crate::{ use crate::{
config::Config, config::Config,
crypto::PeerCrypto,
device::Device, device::Device,
engine::{ engine::{
coms::Coms,
device_thread::DeviceThread, device_thread::DeviceThread,
shared::{SharedPeerCrypto, SharedTable, SharedTraffic}, extras_thread::ExtrasThread,
housekeep_thread::HousekeepThread,
shared::SharedConfig,
socket_thread::{ReconnectEntry, SocketThread}, socket_thread::{ReconnectEntry, SocketThread},
}, },
error::Error, error::Error,
@ -36,13 +36,15 @@ pub struct PeerData {
pub timeout: Time, pub timeout: Time,
pub peer_timeout: u16, pub peer_timeout: u16,
pub node_id: NodeId, pub node_id: NodeId,
pub crypto: PeerCrypto, //pub crypto: PeerCrypto,
} }
pub struct GenericCloud<D: Device, P: Protocol, S: Socket, TS: TimeSource> { pub struct GenericCloud<D: Device, P: Protocol, S: Socket, TS: TimeSource> {
config: SharedConfig,
socket_thread: SocketThread<S, D, P, TS>, socket_thread: SocketThread<S, D, P, TS>,
device_thread: DeviceThread<S, D, P, TS>, device_thread: DeviceThread<S, D, P, TS>,
running: Arc<AtomicBool>, housekeep_thread: HousekeepThread<S, P, TS>,
extras_thread: ExtrasThread<S, P, TS>,
} }
impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS> { impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS> {
@ -50,32 +52,18 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
pub fn new( pub fn new(
config: &Config, socket: S, device: D, port_forwarding: Option<PortForwarding>, stats_file: Option<File>, config: &Config, socket: S, device: D, port_forwarding: Option<PortForwarding>, stats_file: Option<File>,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
let table = SharedTable::<TS>::new(config); let config = SharedConfig::new(config.clone());
let traffic = SharedTraffic::new(); let coms = Coms::<S, TS, P>::new(
let peer_crypto = SharedPeerCrypto::new(); config.get_config(),
let running = Arc::new(AtomicBool::new(true));
let device_thread = DeviceThread::<S, D, P, TS>::new(
config.clone(),
device.duplicate()?,
socket.try_clone().map_err(|e| Error::SocketIo("Failed to clone socket", e))?, socket.try_clone().map_err(|e| Error::SocketIo("Failed to clone socket", e))?,
traffic.clone(),
peer_crypto.clone(),
table.clone(),
running.clone(),
);
let mut socket_thread = SocketThread::<S, D, P, TS>::new(
config.clone(),
device,
socket,
traffic,
peer_crypto,
table,
port_forwarding,
stats_file,
running.clone(),
); );
let device_thread = DeviceThread::<S, D, P, TS>::new(config.clone(), device.duplicate()?, coms.try_clone()?);
let housekeep_thread = HousekeepThread::<S, P, TS>::new(config.clone(), coms.try_clone()?);
let extras_thread = ExtrasThread::<S, P, TS>::new(config.clone(), coms.try_clone()?);
let mut socket_thread =
SocketThread::<S, D, P, TS>::new(config.clone(), coms, device, port_forwarding, stats_file);
socket_thread.housekeep()?; socket_thread.housekeep()?;
Ok(Self { socket_thread, device_thread, running }) Ok(Self { socket_thread, device_thread, config, housekeep_thread, extras_thread })
} }
pub fn add_peer(&mut self, addr: String) -> Result<(), Error> { pub fn add_peer(&mut self, addr: String) -> Result<(), Error> {
@ -93,17 +81,23 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
pub fn run(self) { pub fn run(self) {
debug!("Starting threads"); debug!("Starting threads");
let running = self.running.clone(); let config = self.config;
let device = self.device_thread; let device = self.device_thread;
let device_thread_handle = thread::spawn(move || device.run()); let device_thread_handle = thread::spawn(move || device.run());
let socket = self.socket_thread; let socket = self.socket_thread;
let socket_thread_handle = thread::spawn(move || socket.run()); let socket_thread_handle = thread::spawn(move || socket.run());
let housekeep = self.housekeep_thread;
let housekeep_thread_handle = thread::spawn(move || housekeep.run());
let extras = self.extras_thread;
let extras_thread_handle = thread::spawn(move || extras.run());
let ctrlc = CtrlC::new(); let ctrlc = CtrlC::new();
ctrlc.wait(); ctrlc.wait();
running.store(false, Ordering::SeqCst); config.stop();
debug!("Waiting for threads to end"); debug!("Waiting for threads to end");
device_thread_handle.join().unwrap(); device_thread_handle.join().unwrap();
socket_thread_handle.join().unwrap(); socket_thread_handle.join().unwrap();
housekeep_thread_handle.join().unwrap();
extras_thread_handle.join().unwrap();
debug!("Threads stopped"); debug!("Threads stopped");
} }
} }
@ -120,7 +114,7 @@ use std::net::SocketAddr;
#[cfg(test)] #[cfg(test)]
impl<P: Protocol> GenericCloud<MockDevice, P, MockSocket, MockTimeSource> { impl<P: Protocol> GenericCloud<MockDevice, P, MockSocket, MockTimeSource> {
pub fn socket(&mut self) -> &mut MockSocket { pub fn socket(&mut self) -> &mut MockSocket {
&mut self.socket_thread.socket &mut self.socket_thread.coms.socket
} }
pub fn device(&mut self) -> &mut MockDevice { pub fn device(&mut self) -> &mut MockDevice {
@ -145,7 +139,7 @@ impl<P: Protocol> GenericCloud<MockDevice, P, MockSocket, MockTimeSource> {
} }
pub fn is_connected(&self, addr: &SocketAddr) -> bool { pub fn is_connected(&self, addr: &SocketAddr) -> bool {
self.socket_thread.peers.contains_key(addr) self.socket_thread.coms.has_peer(addr)
} }
pub fn own_addresses(&self) -> &[SocketAddr] { pub fn own_addresses(&self) -> &[SocketAddr] {
@ -153,6 +147,6 @@ impl<P: Protocol> GenericCloud<MockDevice, P, MockSocket, MockTimeSource> {
} }
pub fn get_num(&self) -> usize { pub fn get_num(&self) -> usize {
self.socket_thread.socket.address().unwrap().port() as usize self.socket_thread.coms.socket.address().unwrap().port() as usize
} }
} }

172
src/engine/coms.rs Normal file
View File

@ -0,0 +1,172 @@
use std::{collections::HashMap, io, marker::PhantomData, net::SocketAddr, ops::DerefMut};
use crate::{
config::Config,
crypto::PeerCrypto,
error::Error,
messages::MESSAGE_TYPE_DATA,
net::{mapped_addr, Socket},
payload::Protocol,
util::{MsgBuffer, TimeSource, addr_nice},
};
use super::{
common::{Hash, PeerData, SPACE_BEFORE},
shared::{SharedCrypto, SharedPeers, SharedTable, SharedTraffic},
};
pub struct Coms<S: Socket, TS: TimeSource, P: Protocol> {
_dummy_p: PhantomData<P>,
broadcast: bool,
broadcast_buffer: MsgBuffer,
crypto: SharedCrypto,
peers: SharedPeers,
pub table: SharedTable<TS>,
pub traffic: SharedTraffic,
pub socket: S,
}
impl<S: Socket, TS: TimeSource, P: Protocol> Coms<S, TS, P> {
pub fn new(config: &Config, socket: S) -> Self {
Self {
_dummy_p: PhantomData,
broadcast: config.is_broadcasting(),
broadcast_buffer: MsgBuffer::new(SPACE_BEFORE),
traffic: SharedTraffic::new(),
crypto: SharedCrypto::new(),
peers: SharedPeers::new(),
table: SharedTable::<TS>::new(config),
socket,
}
}
pub fn try_clone(&self) -> Result<Self, Error> {
Ok(Self {
_dummy_p: PhantomData,
broadcast: self.broadcast,
broadcast_buffer: MsgBuffer::new(SPACE_BEFORE),
traffic: self.traffic.clone(),
crypto: self.crypto.clone(),
table: self.table.clone(),
peers: self.peers.clone(),
socket: self.socket.try_clone().map_err(|e| Error::SocketIo("Failed to clone socket", e))?,
})
}
pub fn sync(&mut self) -> Result<(), Error> {
self.crypto.load();
self.table.sync();
self.traffic.sync();
Ok(())
}
pub fn get_address(&self) -> Result<SocketAddr, io::Error> {
self.socket.address().map(mapped_addr)
}
pub fn send_raw(&mut self, data: &[u8], addr: SocketAddr) -> Result<(), Error> {
match self.socket.send(data, addr) {
Ok(written) if written == data.len() => Ok(()),
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
}
}
//TODO: move back to socket thread
pub fn receive(&mut self, buffer: &mut MsgBuffer) -> Result<SocketAddr, io::Error> {
self.socket.receive(buffer)
}
#[inline]
pub fn send_to(&mut self, addr: SocketAddr, data: &mut MsgBuffer) -> Result<(), Error> {
let size = data.len();
debug!("Sending msg with {} bytes to {}", size, addr);
self.traffic.count_out_traffic(addr, size);
match self.socket.send(data.message(), addr) {
Ok(written) if written == size => Ok(()),
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
}
}
#[inline]
pub fn send_msg(&mut self, addr: SocketAddr, type_: u8, data: &mut MsgBuffer) -> Result<(), Error> {
debug!("Sending msg with {} bytes to {}", data.len(), addr);
data.prepend_byte(type_);
self.crypto.encrypt_for(addr, data)?;
self.send_to(addr, data)
}
#[inline]
pub fn broadcast_msg(&mut self, type_: u8, data: &mut MsgBuffer) -> Result<(), Error> {
let size = data.len();
debug!("Broadcasting message type {}, {:?} bytes to {} peers", type_, size, self.crypto.count());
let traffic = &mut self.traffic;
let socket = &mut self.socket;
let peers = self.crypto.get_snapshot();
for (addr, crypto) in peers {
self.broadcast_buffer.set_start(data.get_start());
self.broadcast_buffer.set_length(data.len());
self.broadcast_buffer.message_mut().clone_from_slice(data.message());
self.broadcast_buffer.prepend_byte(type_);
if let Some(crypto) = crypto {
crypto.encrypt(&mut self.broadcast_buffer);
}
traffic.count_out_traffic(*addr, self.broadcast_buffer.len());
match socket.send(self.broadcast_buffer.message(), *addr) {
Ok(written) if written == self.broadcast_buffer.len() => Ok(()),
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
}?
}
Ok(())
}
pub fn forward_packet(&mut self, data: &mut MsgBuffer) -> Result<(), Error> {
let (src, dst) = P::parse(data.message())?;
debug!("Read data from interface: src: {}, dst: {}, {} bytes", src, dst, data.len());
self.traffic.count_out_payload(dst.clone(), src, data.len());
match self.table.lookup(&dst) {
Some(addr) => {
// Peer found for destination
debug!("Found destination for {} => {}", dst, addr);
self.send_msg(addr, MESSAGE_TYPE_DATA, data)?;
}
//TODO: VIA: find relay peer and relay message
None => {
if self.broadcast {
debug!("No destination for {} found, broadcasting", dst);
self.broadcast_msg(MESSAGE_TYPE_DATA, data)?;
} else {
debug!("No destination for {} found, dropping", dst);
self.traffic.count_dropped_payload(data.len());
}
}
}
Ok(())
}
pub fn get_peers<'a>(&'a self) -> impl DerefMut<Target = HashMap<SocketAddr, PeerData, Hash>> + 'a {
self.peers.get_peers()
}
pub fn add_peer(&mut self, addr: SocketAddr, peer_data: PeerData, peer_crypto: &PeerCrypto) {
self.crypto.add(addr, peer_crypto.get_core());
self.peers.get_peers().insert(addr, peer_data);
}
pub fn remove_peer(&mut self, addr: &SocketAddr) -> bool {
if let Some(_peer) = self.peers.get_peers().remove(addr) {
info!("Closing connection to {}", addr_nice(*addr));
self.table.remove_claims(*addr);
self.crypto.remove(addr);
true
} else {
false
}
}
pub fn has_peer(&self, addr: &SocketAddr) -> bool {
self.crypto.contains(addr)
}
}

View File

@ -1,139 +1,40 @@
use super::{ use super::{common::SPACE_BEFORE, coms::Coms, shared::SharedConfig};
common::SPACE_BEFORE,
shared::{SharedPeerCrypto, SharedTable, SharedTraffic},
};
use crate::{ use crate::{
config::Config,
device::Device, device::Device,
error::Error, error::Error,
messages::MESSAGE_TYPE_DATA,
net::Socket, net::Socket,
util::{MsgBuffer, Time, TimeSource}, util::{MsgBuffer, Time, TimeSource},
Protocol, Protocol,
}; };
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{marker::PhantomData, net::SocketAddr};
pub struct DeviceThread<S: Socket, D: Device, P: Protocol, TS: TimeSource> { pub struct DeviceThread<S: Socket, D: Device, P: Protocol, TS: TimeSource> {
// Read-only fields
_dummy_ts: PhantomData<TS>,
_dummy_p: PhantomData<P>,
broadcast: bool,
// Device-only fields // Device-only fields
socket: S, config: SharedConfig,
coms: Coms<S, TS, P>,
pub device: D, pub device: D,
next_housekeep: Time, next_housekeep: Time,
buffer: MsgBuffer, buffer: MsgBuffer,
broadcast_buffer: MsgBuffer,
// Shared fields
traffic: SharedTraffic,
peer_crypto: SharedPeerCrypto,
table: SharedTable<TS>,
running: Arc<AtomicBool>,
} }
impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS> { impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS> {
pub fn new( pub fn new(config: SharedConfig, device: D, coms: Coms<S, TS, P>) -> Self {
config: Config, device: D, socket: S, traffic: SharedTraffic, peer_crypto: SharedPeerCrypto,
table: SharedTable<TS>, running: Arc<AtomicBool>,
) -> Self {
Self { Self {
_dummy_ts: PhantomData, config,
_dummy_p: PhantomData,
broadcast: config.is_broadcasting(),
socket,
device, device,
next_housekeep: TS::now(), next_housekeep: TS::now(),
traffic,
peer_crypto,
table,
buffer: MsgBuffer::new(SPACE_BEFORE), buffer: MsgBuffer::new(SPACE_BEFORE),
broadcast_buffer: MsgBuffer::new(SPACE_BEFORE), coms,
running,
} }
} }
#[inline]
fn send_to(&mut self, addr: SocketAddr) -> Result<(), Error> {
let size = self.buffer.len();
debug!("Sending msg with {} bytes to {}", size, addr);
self.traffic.count_out_traffic(addr, size);
match self.socket.send(self.buffer.message(), addr) {
Ok(written) if written == size => Ok(()),
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
}
}
#[inline]
fn send_msg(&mut self, addr: SocketAddr, type_: u8) -> Result<(), Error> {
debug!("Sending msg with {} bytes to {}", self.buffer.len(), addr);
self.buffer.prepend_byte(type_);
self.peer_crypto.encrypt_for(addr, &mut self.buffer)?;
self.send_to(addr)
}
#[inline]
fn broadcast_msg(&mut self, type_: u8) -> Result<(), Error> {
let size = self.buffer.len();
debug!("Broadcasting message type {}, {:?} bytes to {} peers", type_, size, self.peer_crypto.count());
let traffic = &mut self.traffic;
let socket = &mut self.socket;
let peers = self.peer_crypto.get_snapshot();
for (addr, crypto) in peers {
self.broadcast_buffer.set_start(self.buffer.get_start());
self.broadcast_buffer.set_length(self.buffer.len());
self.broadcast_buffer.message_mut().clone_from_slice(self.buffer.message());
self.broadcast_buffer.prepend_byte(type_);
if let Some(crypto) = crypto {
crypto.encrypt(&mut self.broadcast_buffer);
}
traffic.count_out_traffic(*addr, self.broadcast_buffer.len());
match socket.send(self.broadcast_buffer.message(), *addr) {
Ok(written) if written == self.broadcast_buffer.len() => Ok(()),
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
}?
}
Ok(())
}
fn forward_packet(&mut self) -> Result<(), Error> {
let (src, dst) = P::parse(self.buffer.message())?;
debug!("Read data from interface: src: {}, dst: {}, {} bytes", src, dst, self.buffer.len());
self.traffic.count_out_payload(dst.clone(), src, self.buffer.len());
match self.table.lookup(&dst) {
Some(addr) => {
// Peer found for destination
debug!("Found destination for {} => {}", dst, addr);
self.send_msg(addr, MESSAGE_TYPE_DATA)?;
}
//TODO: VIA: find relay peer and relay message
None => {
if self.broadcast {
debug!("No destination for {} found, broadcasting", dst);
self.broadcast_msg(MESSAGE_TYPE_DATA)?;
} else {
debug!("No destination for {} found, dropping", dst);
self.traffic.count_dropped_payload(self.buffer.len());
}
}
}
Ok(())
}
pub fn housekeep(&mut self) -> Result<(), Error> { pub fn housekeep(&mut self) -> Result<(), Error> {
self.peer_crypto.load(); self.coms.sync()
self.table.sync();
self.traffic.sync();
Ok(())
} }
pub fn iteration(&mut self) -> bool { pub fn iteration(&mut self) -> bool {
if self.device.read(&mut self.buffer).is_ok() { if self.device.read(&mut self.buffer).is_ok() {
//try_fail!(result, "Failed to read from device: {}"); //try_fail!(result, "Failed to read from device: {}");
if let Err(e) = self.forward_packet() { if let Err(e) = self.coms.forward_packet(&mut self.buffer) {
error!("{}", e); error!("{}", e);
} }
} }
@ -143,7 +44,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS
error!("{}", e) error!("{}", e)
} }
self.next_housekeep = now + 1; self.next_housekeep = now + 1;
if !self.running.load(Ordering::SeqCst) { if !self.config.is_running() {
debug!("Device: end"); debug!("Device: end");
return false; return false;
} }

View File

@ -0,0 +1,59 @@
use std::time::Duration;
use super::{common::SPACE_BEFORE, coms::Coms, shared::SharedConfig};
use crate::{
error::Error,
net::Socket,
util::{MsgBuffer, TimeSource, Time},
Protocol,
};
const MAX_RECONNECT_INTERVAL: u16 = 3600;
const RESOLVE_INTERVAL: Time = 300;
const OWN_ADDRESS_RESET_INTERVAL: Time = 300;
pub const STATS_INTERVAL: Time = 60;
pub struct ExtrasThread<S: Socket, P: Protocol, TS: TimeSource> {
config: SharedConfig,
coms: Coms<S, TS, P>,
next_housekeep: Time,
buffer: MsgBuffer,
}
impl<S: Socket, P: Protocol, TS: TimeSource> ExtrasThread<S, P, TS> {
pub fn new(config: SharedConfig, coms: Coms<S, TS, P>) -> Self {
Self {
config,
coms,
next_housekeep: TS::now(),
buffer: MsgBuffer::new(SPACE_BEFORE),
}
}
pub fn housekeep(&mut self) -> Result<(), Error> {
let now = TS::now();
assert!(self.buffer.is_empty());
Ok(())
}
pub fn iteration(&mut self) -> bool {
std::thread::sleep(Duration::from_millis(100));
let now = TS::now();
if self.next_housekeep < now {
if let Err(e) = self.housekeep() {
error!("{}", e)
}
self.next_housekeep = now + 1;
if !self.config.is_running() {
debug!("Extras: end");
return false;
}
}
true
}
pub fn run(mut self) {
while self.iteration() {}
}
}

View File

@ -0,0 +1,67 @@
use std::{time::Duration, net::SocketAddr, cmp::{min, max}};
use smallvec::SmallVec;
use super::{common::SPACE_BEFORE, coms::Coms, shared::SharedConfig};
use crate::{
error::Error,
net::Socket,
util::{MsgBuffer, TimeSource, Time, addr_nice},
Protocol, messages::MESSAGE_TYPE_NODE_INFO, config::DEFAULT_PEER_TIMEOUT,
};
const MAX_RECONNECT_INTERVAL: u16 = 3600;
const RESOLVE_INTERVAL: Time = 300;
const OWN_ADDRESS_RESET_INTERVAL: Time = 300;
pub const STATS_INTERVAL: Time = 60;
pub struct HousekeepThread<S: Socket, P: Protocol, TS: TimeSource> {
config: SharedConfig,
coms: Coms<S, TS, P>,
next_housekeep: Time,
buffer: MsgBuffer,
update_freq: u16,
next_peers: Time,
next_own_address_reset: Time,
}
impl<S: Socket, P: Protocol, TS: TimeSource> HousekeepThread<S, P, TS> {
pub fn new(config: SharedConfig, coms: Coms<S, TS, P>) -> Self {
let update_freq = config.get_config().get_keepalive() as u16;
Self {
config,
coms,
next_housekeep: TS::now(),
buffer: MsgBuffer::new(SPACE_BEFORE),
update_freq,
next_peers: TS::now(),
next_own_address_reset: TS::now(),
}
}
pub fn housekeep(&mut self) -> Result<(), Error> {
let now = TS::now();
assert!(self.buffer.is_empty());
Ok(())
}
pub fn iteration(&mut self) -> bool {
std::thread::sleep(Duration::from_millis(100));
let now = TS::now();
if self.next_housekeep < now {
if let Err(e) = self.housekeep() {
error!("{}", e)
}
self.next_housekeep = now + 1;
if !self.config.is_running() {
debug!("Housekeep: end");
return false;
}
}
true
}
pub fn run(mut self) {
while self.iteration() {}
}
}

View File

@ -2,7 +2,10 @@
// Copyright (C) 2015-2021 Dennis Schwerdel // Copyright (C) 2015-2021 Dennis Schwerdel
// This software is licensed under GPL-3 or newer (see LICENSE.md) // This software is licensed under GPL-3 or newer (see LICENSE.md)
mod device_thread;
mod shared; mod shared;
mod device_thread;
mod socket_thread; mod socket_thread;
mod housekeep_thread;
mod extras_thread;
pub mod coms;
pub mod common; pub mod common;

View File

@ -1,6 +1,6 @@
use crate::{ use crate::{
config::Config, config::Config,
crypto::CryptoCore, crypto::{CryptoCore, PeerCrypto},
engine::common::Hash, engine::common::Hash,
error::Error, error::Error,
table::ClaimTable, table::ClaimTable,
@ -13,34 +13,40 @@ use std::{
collections::HashMap, collections::HashMap,
io::{self, Write}, io::{self, Write},
net::SocketAddr, net::SocketAddr,
sync::Arc, ops::DerefMut,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
}; };
use super::common::PeerData; use super::common::PeerData;
#[derive(Clone)] #[derive(Clone)]
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
pub struct SharedPeerCrypto { pub struct SharedCrypto {
peers: Arc<Mutex<HashMap<SocketAddr, Option<Arc<CryptoCore>>, Hash>>>, peer_crypto: Arc<Mutex<HashMap<SocketAddr, Option<Arc<CryptoCore>>, Hash>>>,
cache: HashMap<SocketAddr, Option<Arc<CryptoCore>>, Hash>, //TODO: local hashmap as cache cache: HashMap<SocketAddr, Option<Arc<CryptoCore>>, Hash>, //TODO: local hashmap as cache
} }
impl SharedPeerCrypto { impl SharedCrypto {
pub fn new() -> Self { pub fn new() -> Self {
SharedPeerCrypto { peers: Arc::new(Mutex::new(HashMap::default())), cache: HashMap::default() } SharedCrypto { peer_crypto: Arc::new(Mutex::new(HashMap::default())), cache: HashMap::default() }
} }
pub fn encrypt_for(&mut self, peer: SocketAddr, data: &mut MsgBuffer) -> Result<(), Error> { pub fn encrypt_for(&mut self, peer: SocketAddr, data: &mut MsgBuffer) -> Result<(), Error> {
let crypto = match self.cache.get(&peer) { let cache = &mut self.cache;
Some(crypto) => crypto, let owned_crypto;
None => { let crypto = if let Some(crypto) = cache.get(&peer) {
let peers = self.peers.lock(); crypto
if let Some(crypto) = peers.get(&peer) { } else {
self.cache.insert(peer, crypto.clone()); let peers = self.peer_crypto.lock();
self.cache.get(&peer).unwrap() if let Some(crypto) = peers.get(&peer) {
} else { cache.insert(peer, crypto.clone());
return Err(Error::InvalidCryptoState("No crypto found for peer")); owned_crypto = crypto.clone();
} &owned_crypto
} else {
return Err(Error::InvalidCryptoState("No crypto found for peer"));
} }
}; };
if let Some(crypto) = crypto { if let Some(crypto) = crypto {
@ -49,16 +55,28 @@ impl SharedPeerCrypto {
Ok(()) Ok(())
} }
pub fn store(&mut self, data: &HashMap<SocketAddr, PeerData, Hash>) { pub fn add(&mut self, addr: SocketAddr, crypto: Option<Arc<CryptoCore>>) {
self.cache.insert(addr, crypto.clone());
let mut peers = self.peer_crypto.lock();
peers.insert(addr, crypto);
}
pub fn remove(&mut self, addr: &SocketAddr) {
self.cache.remove(addr);
let mut peers = self.peer_crypto.lock();
peers.remove(addr);
}
pub fn store(&mut self, data: &HashMap<SocketAddr, PeerCrypto, Hash>) {
self.cache.clear(); self.cache.clear();
self.cache.extend(data.iter().map(|(k, v)| (*k, v.crypto.get_core()))); self.cache.extend(data.iter().map(|(k, v)| (*k, v.get_core())));
let mut peers = self.peers.lock(); let mut peers = self.peer_crypto.lock();
peers.clear(); peers.clear();
peers.extend(self.cache.iter().map(|(k, v)| (*k, v.clone()))); peers.extend(self.cache.iter().map(|(k, v)| (*k, v.clone())));
} }
pub fn load(&mut self) { pub fn load(&mut self) {
let peers = self.peers.lock(); let peers = self.peer_crypto.lock();
self.cache.clear(); self.cache.clear();
self.cache.extend(peers.iter().map(|(k, v)| (*k, v.clone()))); self.cache.extend(peers.iter().map(|(k, v)| (*k, v.clone())));
} }
@ -67,6 +85,10 @@ impl SharedPeerCrypto {
&self.cache &self.cache
} }
pub fn contains(&self, addr: &SocketAddr) -> bool {
self.cache.contains_key(addr)
}
pub fn count(&self) -> usize { pub fn count(&self) -> usize {
self.cache.len() self.cache.len()
} }
@ -138,6 +160,12 @@ impl SharedTraffic {
} }
} }
impl Default for SharedTraffic {
fn default() -> Self {
Self::new()
}
}
#[derive(Clone)] #[derive(Clone)]
pub struct SharedTable<TS: TimeSource> { pub struct SharedTable<TS: TimeSource> {
table: Arc<Mutex<ClaimTable<TS>>>, table: Arc<Mutex<ClaimTable<TS>>>,
@ -198,3 +226,42 @@ impl<TS: TimeSource> SharedTable<TS> {
self.table.lock().claim_len() self.table.lock().claim_len()
} }
} }
#[derive(Clone)]
pub struct SharedConfig {
config: Config,
running: Arc<AtomicBool>,
}
impl SharedConfig {
pub fn new(config: Config) -> Self {
Self { config, running: Arc::new(AtomicBool::new(true)) }
}
pub fn get_config(&self) -> &Config {
&self.config
}
pub fn is_running(&self) -> bool {
self.running.load(Ordering::Relaxed)
}
pub fn stop(&self) {
self.running.store(false, Ordering::Relaxed)
}
}
#[derive(Clone)]
pub struct SharedPeers {
peers: Arc<Mutex<HashMap<SocketAddr, PeerData, Hash>>>,
}
impl SharedPeers {
pub fn new() -> Self {
Self { peers: Default::default() }
}
pub fn get_peers<'a>(&'a self) -> impl DerefMut<Target = HashMap<SocketAddr, PeerData, Hash>> + 'a {
self.peers.lock()
}
}

View File

@ -1,12 +1,9 @@
use super::{ use super::{common::SPACE_BEFORE, coms::Coms, shared::SharedConfig};
common::SPACE_BEFORE,
shared::{SharedPeerCrypto, SharedTable, SharedTraffic},
};
use crate::{ use crate::{
beacon::BeaconSerializer, beacon::BeaconSerializer,
config::{DEFAULT_PEER_TIMEOUT, DEFAULT_PORT}, config::{DEFAULT_PEER_TIMEOUT, DEFAULT_PORT},
crypto::{is_init_message, Crypto, InitResult, InitState, MessageResult}, crypto::{is_init_message, Crypto, InitResult, InitState, MessageResult, PeerCrypto},
device::{Device, Type}, device::{Device, Type},
engine::common::{Hash, PeerData}, engine::common::{Hash, PeerData},
error::Error, error::Error,
@ -18,12 +15,10 @@ use crate::{
port_forwarding::PortForwarding, port_forwarding::PortForwarding,
types::{Address, NodeId, Range, RangeList}, types::{Address, NodeId, Range, RangeList},
util::{addr_nice, resolve, MsgBuffer, StatsdMsg, Time, TimeSource}, util::{addr_nice, resolve, MsgBuffer, StatsdMsg, Time, TimeSource},
Config, Protocol, Protocol,
}; };
use rand::{random, seq::SliceRandom, thread_rng}; use rand::{random, seq::SliceRandom, thread_rng};
use smallvec::{smallvec, SmallVec}; use smallvec::{smallvec, SmallVec};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{ use std::{
cmp::{max, min}, cmp::{max, min},
collections::HashMap, collections::HashMap,
@ -33,7 +28,7 @@ use std::{
io::{Cursor, Seek, SeekFrom, Write}, io::{Cursor, Seek, SeekFrom, Write},
marker::PhantomData, marker::PhantomData,
net::{SocketAddr, ToSocketAddrs}, net::{SocketAddr, ToSocketAddrs},
str::FromStr, str::FromStr, ops::Deref,
}; };
const MAX_RECONNECT_INTERVAL: u16 = 3600; const MAX_RECONNECT_INTERVAL: u16 = 3600;
@ -55,35 +50,32 @@ pub struct SocketThread<S: Socket, D: Device, P: Protocol, TS: TimeSource> {
// Read-only fields // Read-only fields
node_id: NodeId, node_id: NodeId,
claims: RangeList, claims: RangeList,
config: Config,
peer_timeout_publish: u16, peer_timeout_publish: u16,
learning: bool, learning: bool,
update_freq: u16, update_freq: u16,
_dummy_ts: PhantomData<TS>, _dummy_ts: PhantomData<TS>,
_dummy_p: PhantomData<P>, _dummy_p: PhantomData<P>,
// Socket-only fields // Socket-only fields
pub socket: S, config: SharedConfig,
pub coms: Coms<S, TS, P>,
device: D, device: D,
next_housekeep: Time, next_housekeep: Time,
pub own_addresses: AddrList, pub own_addresses: AddrList,
next_own_address_reset: Time, next_own_address_reset: Time,
pending_inits: HashMap<SocketAddr, InitState<NodeInfo>, Hash>, pending_inits: HashMap<SocketAddr, InitState<NodeInfo>, Hash>,
crypto: Crypto, crypto: Crypto, // TODO 2nd: move to config
pub peers: HashMap<SocketAddr, PeerData, Hash>, //pub peers: HashMap<SocketAddr, PeerData, Hash>, // TODO 1st: move to shared peers
next_peers: Time, peer_crypto: HashMap<SocketAddr, PeerCrypto, Hash>,
next_stats_out: Time, next_peers: Time, // TODO: split off
next_beacon: Time, next_stats_out: Time, // TODO: split off
beacon_serializer: BeaconSerializer<TS>, next_beacon: Time, // TODO: split off
stats_file: Option<File>, beacon_serializer: BeaconSerializer<TS>, // TODO: split off
statsd_server: Option<String>, stats_file: Option<File>, // TODO: split off
pub reconnect_peers: SmallVec<[ReconnectEntry; 3]>, statsd_server: Option<String>, // TODO: split off
pub reconnect_peers: SmallVec<[ReconnectEntry; 3]>, // TODO: move to shared config
buffer: MsgBuffer, buffer: MsgBuffer,
broadcast_buffer: MsgBuffer,
// Shared fields // Shared fields
peer_crypto: SharedPeerCrypto, //table: SharedTable<TS>,
traffic: SharedTraffic,
table: SharedTable<TS>,
running: Arc<AtomicBool>,
// Should not be here // Should not be here
port_forwarding: Option<PortForwarding>, // TODO: 3rd thread port_forwarding: Option<PortForwarding>, // TODO: 3rd thread
} }
@ -91,15 +83,14 @@ pub struct SocketThread<S: Socket, D: Device, P: Protocol, TS: TimeSource> {
impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS> { impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS> {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub fn new( pub fn new(
config: Config, device: D, socket: S, traffic: SharedTraffic, peer_crypto: SharedPeerCrypto, config: SharedConfig, coms: Coms<S, TS, P>, device: D, port_forwarding: Option<PortForwarding>,
table: SharedTable<TS>, port_forwarding: Option<PortForwarding>, stats_file: Option<File>, stats_file: Option<File>,
running: Arc<AtomicBool>,
) -> Self { ) -> Self {
let mut claims = SmallVec::with_capacity(config.claims.len()); let mut claims = SmallVec::with_capacity(config.get_config().claims.len());
for s in &config.claims { for s in &config.get_config().claims {
claims.push(try_fail!(Range::from_str(s), "Invalid subnet format: {} ({})", s)); claims.push(try_fail!(Range::from_str(s), "Invalid subnet format: {} ({})", s));
} }
if device.get_type() == Type::Tun && config.auto_claim { if device.get_type() == Type::Tun && config.get_config().auto_claim {
match device.get_ip() { match device.get_ip() {
Ok(ip) => { Ok(ip) => {
let range = Range { base: Address::from_ipv4(ip), prefix_len: 32 }; let range = Range { base: Address::from_ipv4(ip), prefix_len: 32 };
@ -113,20 +104,17 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
} }
} }
let now = TS::now(); let now = TS::now();
let update_freq = config.get_keepalive() as u16; let update_freq = config.get_config().get_keepalive() as u16;
let node_id = random(); let node_id = random();
let beacon_key = config.beacon_password.as_ref().map(|s| s.as_bytes()).unwrap_or(&[]); let beacon_key = config.get_config().beacon_password.as_ref().map(|s| s.as_bytes()).unwrap_or(&[]);
Self { Self {
_dummy_p: PhantomData, _dummy_p: PhantomData,
_dummy_ts: PhantomData, _dummy_ts: PhantomData,
node_id, node_id,
claims, claims,
device, device,
socket, coms,
peer_crypto, learning: config.get_config().is_learning(),
traffic,
table,
learning: config.is_learning(),
next_housekeep: now, next_housekeep: now,
next_beacon: now, next_beacon: now,
next_peers: now, next_peers: now,
@ -135,59 +123,22 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
pending_inits: HashMap::default(), pending_inits: HashMap::default(),
reconnect_peers: SmallVec::new(), reconnect_peers: SmallVec::new(),
own_addresses: SmallVec::new(), own_addresses: SmallVec::new(),
peers: HashMap::default(), peer_timeout_publish: config.get_config().peer_timeout as u16,
peer_timeout_publish: config.peer_timeout as u16,
beacon_serializer: BeaconSerializer::new(beacon_key), beacon_serializer: BeaconSerializer::new(beacon_key),
port_forwarding, port_forwarding,
stats_file, stats_file,
update_freq, update_freq,
statsd_server: config.statsd_server.clone(), statsd_server: config.get_config().statsd_server.clone(),
crypto: Crypto::new(node_id, &config.crypto).unwrap(), crypto: Crypto::new(node_id, &config.get_config().crypto).unwrap(),
peer_crypto: Default::default(),
config, config,
buffer: MsgBuffer::new(SPACE_BEFORE), buffer: MsgBuffer::new(SPACE_BEFORE),
broadcast_buffer: MsgBuffer::new(SPACE_BEFORE),
running,
} }
} }
#[inline]
fn send_to(&mut self, addr: SocketAddr) -> Result<(), Error> {
let size = self.buffer.len();
debug!("Sending msg with {} bytes to {}", size, addr);
self.traffic.count_out_traffic(addr, size);
match self.socket.send(self.buffer.message(), addr) {
Ok(written) if written == size => {
self.buffer.clear();
Ok(())
}
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
}
}
#[inline]
fn broadcast_msg(&mut self, type_: u8) -> Result<(), Error> {
debug!("Broadcasting message type {}, {:?} bytes to {} peers", type_, self.buffer.len(), self.peers.len());
for (addr, peer) in &mut self.peers {
self.broadcast_buffer.set_start(self.buffer.get_start());
self.broadcast_buffer.set_length(self.buffer.len());
self.broadcast_buffer.message_mut().clone_from_slice(self.buffer.message());
self.broadcast_buffer.prepend_byte(type_);
peer.crypto.encrypt_message(&mut self.broadcast_buffer);
self.traffic.count_out_traffic(*addr, self.broadcast_buffer.len());
match self.socket.send(self.broadcast_buffer.message(), *addr) {
Ok(written) if written == self.broadcast_buffer.len() => Ok(()),
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
}?
}
self.buffer.clear();
Ok(())
}
fn connect_sock(&mut self, addr: SocketAddr) -> Result<(), Error> { fn connect_sock(&mut self, addr: SocketAddr) -> Result<(), Error> {
let addr = mapped_addr(addr); let addr = mapped_addr(addr);
if self.peers.contains_key(&addr) if self.coms.has_peer(&addr)
|| self.own_addresses.contains(&addr) || self.own_addresses.contains(&addr)
|| self.pending_inits.contains_key(&addr) || self.pending_inits.contains_key(&addr)
{ {
@ -198,14 +149,14 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
let mut init = self.crypto.peer_instance(payload); let mut init = self.crypto.peer_instance(payload);
init.send_ping(&mut self.buffer); init.send_ping(&mut self.buffer);
self.pending_inits.insert(addr, init); self.pending_inits.insert(addr, init);
self.send_to(addr) self.coms.send_to(addr, &mut self.buffer)
} }
pub fn connect<Addr: ToSocketAddrs + fmt::Debug + Clone>(&mut self, addr: Addr) -> Result<(), Error> { pub fn connect<Addr: ToSocketAddrs + fmt::Debug + Clone>(&mut self, addr: Addr) -> Result<(), Error> {
let addrs = resolve(&addr)?.into_iter().map(mapped_addr).collect::<SmallVec<[SocketAddr; 3]>>(); let addrs = resolve(&addr)?.into_iter().map(mapped_addr).collect::<SmallVec<[SocketAddr; 3]>>();
for addr in &addrs { for addr in &addrs {
if self.own_addresses.contains(addr) if self.own_addresses.contains(addr)
|| self.peers.contains_key(addr) || self.coms.has_peer(addr)
|| self.pending_inits.contains_key(addr) || self.pending_inits.contains_key(addr)
{ {
return Ok(()); return Ok(());
@ -221,7 +172,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
fn create_node_info(&self) -> NodeInfo { fn create_node_info(&self) -> NodeInfo {
let mut peers = smallvec![]; let mut peers = smallvec![];
for peer in self.peers.values() { for peer in self.coms.get_peers().values() {
peers.push(PeerInfo { node_id: Some(peer.node_id), addrs: peer.addrs.clone() }) peers.push(PeerInfo { node_id: Some(peer.node_id), addrs: peer.addrs.clone() })
} }
if peers.len() > 20 { if peers.len() > 20 {
@ -239,9 +190,9 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
} }
fn update_peer_info(&mut self, addr: SocketAddr, info: Option<NodeInfo>) -> Result<(), Error> { fn update_peer_info(&mut self, addr: SocketAddr, info: Option<NodeInfo>) -> Result<(), Error> {
if let Some(peer) = self.peers.get_mut(&addr) { if let Some(peer) = self.coms.get_peers().get_mut(&addr) {
peer.last_seen = TS::now(); peer.last_seen = TS::now();
peer.timeout = TS::now() + self.config.peer_timeout as Time; peer.timeout = TS::now() + self.config.get_config().peer_timeout as Time;
if let Some(info) = &info { if let Some(info) = &info {
// Update peer addresses, always add seen address // Update peer addresses, always add seen address
peer.addrs.clear(); peer.addrs.clear();
@ -258,7 +209,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
} }
if let Some(info) = info { if let Some(info) = info {
debug!("Adding claims of peer {}: {:?}", addr_nice(addr), info.claims); debug!("Adding claims of peer {}: {:?}", addr_nice(addr), info.claims);
self.table.set_claims(addr, info.claims); self.coms.table.set_claims(addr, info.claims);
debug!("Received {} peers from {}: {:?}", info.peers.len(), addr_nice(addr), info.peers); debug!("Received {} peers from {}: {:?}", info.peers.len(), addr_nice(addr), info.peers);
self.connect_to_peers(&info.peers)?; self.connect_to_peers(&info.peers)?;
} }
@ -270,22 +221,19 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
if let Some(init) = self.pending_inits.remove(&addr) { if let Some(init) = self.pending_inits.remove(&addr) {
self.buffer.clear(); self.buffer.clear();
let crypto = init.finish(&mut self.buffer); let crypto = init.finish(&mut self.buffer);
self.peers.insert( let peer_data = PeerData {
addr, addrs: info.addrs.clone(),
PeerData { node_id: info.node_id,
addrs: info.addrs.clone(), peer_timeout: info.peer_timeout.unwrap_or(DEFAULT_PEER_TIMEOUT),
crypto, last_seen: TS::now(),
node_id: info.node_id, timeout: TS::now() + self.config.get_config().peer_timeout as Time,
peer_timeout: info.peer_timeout.unwrap_or(DEFAULT_PEER_TIMEOUT), };
last_seen: TS::now(), self.coms.add_peer(addr, peer_data,&crypto);
timeout: TS::now() + self.config.peer_timeout as Time, self.peer_crypto.insert(addr, crypto);
},
);
self.update_peer_info(addr, Some(info))?; self.update_peer_info(addr, Some(info))?;
if !self.buffer.is_empty() { if !self.buffer.is_empty() {
self.send_to(addr)?; self.coms.send_to(addr, &mut self.buffer)?;
} }
self.peer_crypto.store(&self.peers);
} else { } else {
error!("No init for new peer {}", addr_nice(addr)); error!("No init for new peer {}", addr_nice(addr));
} }
@ -295,7 +243,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
fn connect_to_peers(&mut self, peers: &[PeerInfo]) -> Result<(), Error> { fn connect_to_peers(&mut self, peers: &[PeerInfo]) -> Result<(), Error> {
'outer: for peer in peers { 'outer: for peer in peers {
for addr in &peer.addrs { for addr in &peer.addrs {
if self.peers.contains_key(addr) { if self.own_addresses.contains(addr) {
// Check addresses and add addresses that we don't know to own addresses // Check addresses and add addresses that we don't know to own addresses
for addr in &peer.addrs { for addr in &peer.addrs {
if !self.own_addresses.contains(addr) { if !self.own_addresses.contains(addr) {
@ -309,7 +257,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
if self.node_id == node_id { if self.node_id == node_id {
continue 'outer; continue 'outer;
} }
for p in self.peers.values() { for p in self.coms.get_peers().values() {
if p.node_id == node_id { if p.node_id == node_id {
continue 'outer; continue 'outer;
} }
@ -321,18 +269,15 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
} }
fn remove_peer(&mut self, addr: SocketAddr) { fn remove_peer(&mut self, addr: SocketAddr) {
if let Some(_peer) = self.peers.remove(&addr) { self.coms.remove_peer(&addr);
info!("Closing connection to {}", addr_nice(addr)); self.peer_crypto.remove(&addr);
self.table.remove_claims(addr);
self.peer_crypto.store(&self.peers);
}
} }
fn handle_payload_from(&mut self, peer: SocketAddr) -> Result<(), Error> { fn handle_payload_from(&mut self, peer: SocketAddr) -> Result<(), Error> {
let (src, dst) = P::parse(self.buffer.message())?; let (src, dst) = P::parse(self.buffer.message())?;
let len = self.buffer.len(); let len = self.buffer.len();
debug!("Writing data to device: {} bytes", len); debug!("Writing data to device: {} bytes", len);
self.traffic.count_in_payload(src.clone(), dst, len); self.coms.traffic.count_in_payload(src.clone(), dst, len);
if let Err(e) = self.device.write(&mut self.buffer) { if let Err(e) = self.device.write(&mut self.buffer) {
error!("Failed to send via device: {}", e); error!("Failed to send via device: {}", e);
return Err(e); return Err(e);
@ -340,7 +285,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
self.buffer.clear(); self.buffer.clear();
if self.learning { if self.learning {
// Learn single address // Learn single address
self.table.cache(&src, peer); self.coms.table.cache(&src, peer);
} }
Ok(()) Ok(())
} }
@ -354,7 +299,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
let info = match NodeInfo::decode(Cursor::new(self.buffer.message())) { let info = match NodeInfo::decode(Cursor::new(self.buffer.message())) {
Ok(val) => val, Ok(val) => val,
Err(err) => { Err(err) => {
self.traffic.count_invalid_protocol(self.buffer.len()); self.coms.traffic.count_invalid_protocol(self.buffer.len());
return Err(err); return Err(err);
} }
}; };
@ -371,11 +316,11 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
} }
_ => { _ => {
self.buffer.clear(); self.buffer.clear();
self.traffic.count_invalid_protocol(self.buffer.len()); self.coms.traffic.count_invalid_protocol(self.buffer.len());
return Err(Error::Message("Unknown message type")); return Err(Error::Message("Unknown message type"));
} }
}, },
MessageResult::Reply => self.send_to(src)?, MessageResult::Reply => self.coms.send_to(src, &mut self.buffer)?,
MessageResult::None => { MessageResult::None => {
self.buffer.clear(); self.buffer.clear();
} }
@ -387,8 +332,8 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
let src = mapped_addr(src); let src = mapped_addr(src);
debug!("Received {} bytes from {}", self.buffer.len(), src); debug!("Received {} bytes from {}", self.buffer.len(), src);
let buffer = &mut self.buffer; let buffer = &mut self.buffer;
self.traffic.count_in_traffic(src, buffer.len()); self.coms.traffic.count_in_traffic(src, buffer.len());
if let Some(result) = self.peers.get_mut(&src).map(|peer| peer.crypto.handle_message(buffer)) { if let Some(result) = self.peer_crypto.get_mut(&src).map(|crypto| crypto.handle_message(buffer)) {
return self.process_message(src, result?); return self.process_message(src, result?);
} }
let is_init = is_init_message(buffer.message()); let is_init = is_init_message(buffer.message());
@ -402,7 +347,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
} }
}) { }) {
if !buffer.is_empty() { if !buffer.is_empty() {
self.send_to(src)? self.coms.send_to(src, &mut self.buffer)?
} }
if let InitResult::Success { peer_payload, .. } = result? { if let InitResult::Success { peer_payload, .. } = result? {
self.add_new_peer(src, peer_payload)? self.add_new_peer(src, peer_payload)?
@ -411,7 +356,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
} }
if !is_init_message(self.buffer.message()) { if !is_init_message(self.buffer.message()) {
info!("Ignoring non-init message from unknown peer {}", addr_nice(src)); info!("Ignoring non-init message from unknown peer {}", addr_nice(src));
self.traffic.count_invalid_protocol(self.buffer.len()); self.coms.traffic.count_invalid_protocol(self.buffer.len());
self.buffer.clear(); self.buffer.clear();
return Ok(()); return Ok(());
} }
@ -420,10 +365,10 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
match msg_result { match msg_result {
Ok(_) => { Ok(_) => {
self.pending_inits.insert(src, init); self.pending_inits.insert(src, init);
self.send_to(src) self.coms.send_to(src, &mut self.buffer)
} }
Err(err) => { Err(err) => {
self.traffic.count_invalid_protocol(self.buffer.len()); self.coms.traffic.count_invalid_protocol(self.buffer.len());
Err(err) Err(err)
} }
} }
@ -432,18 +377,17 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
pub fn housekeep(&mut self) -> Result<(), Error> { pub fn housekeep(&mut self) -> Result<(), Error> {
let now = TS::now(); let now = TS::now();
let mut del: SmallVec<[SocketAddr; 3]> = SmallVec::new(); let mut del: SmallVec<[SocketAddr; 3]> = SmallVec::new();
for (&addr, data) in &self.peers { for (&addr, data) in self.coms.get_peers().deref() {
if data.timeout < now { if data.timeout < now {
del.push(addr); del.push(addr);
} }
} }
for addr in del { for addr in del {
info!("Forgot peer {} due to timeout", addr_nice(addr)); info!("Forgot peer {} due to timeout", addr_nice(addr));
self.peers.remove(&addr); self.coms.remove_peer(&addr);
self.table.remove_claims(addr);
self.connect_sock(addr)?; // Try to reconnect self.connect_sock(addr)?; // Try to reconnect
} }
self.table.housekeep(); self.coms.table.housekeep();
self.crypto_housekeep()?; self.crypto_housekeep()?;
// Periodically extend the port-forwarding // Periodically extend the port-forwarding
//TODO: extra thread //TODO: extra thread
@ -456,9 +400,9 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
debug!("Send peer list to all peers"); debug!("Send peer list to all peers");
let info = self.create_node_info(); let info = self.create_node_info();
info.encode(&mut self.buffer); info.encode(&mut self.buffer);
self.broadcast_msg(MESSAGE_TYPE_NODE_INFO)?; self.coms.broadcast_msg(MESSAGE_TYPE_NODE_INFO, &mut self.buffer)?;
// Reschedule for next update // Reschedule for next update
let min_peer_timeout = self.peers.iter().map(|p| p.1.peer_timeout).min().unwrap_or(DEFAULT_PEER_TIMEOUT); let min_peer_timeout = self.coms.get_peers().iter().map(|p| p.1.peer_timeout).min().unwrap_or(DEFAULT_PEER_TIMEOUT);
let interval = min(self.update_freq as u16, max(min_peer_timeout / 2 - 60, 1)); let interval = min(self.update_freq as u16, max(min_peer_timeout / 2 - 60, 1));
self.next_peers = now + Time::from(interval); self.next_peers = now + Time::from(interval);
} }
@ -470,7 +414,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
//TODO: extra thread //TODO: extra thread
self.send_stats_to_statsd()?; self.send_stats_to_statsd()?;
self.next_stats_out = now + STATS_INTERVAL; self.next_stats_out = now + STATS_INTERVAL;
self.traffic.period(Some(5)); self.coms.traffic.period(Some(5));
} }
if let Some(peers) = self.beacon_serializer.get_cmd_results() { if let Some(peers) = self.beacon_serializer.get_cmd_results() {
debug!("Loaded beacon with peers: {:?}", peers); debug!("Loaded beacon with peers: {:?}", peers);
@ -481,11 +425,9 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
if self.next_beacon < now { if self.next_beacon < now {
self.store_beacon()?; self.store_beacon()?;
self.load_beacon()?; self.load_beacon()?;
self.next_beacon = now + Time::from(self.config.beacon_interval); self.next_beacon = now + Time::from(self.config.get_config().beacon_interval);
} }
self.table.sync(); self.coms.sync()?;
self.traffic.sync();
self.peer_crypto.store(&self.peers);
// Periodically reset own peers // Periodically reset own peers
if self.next_own_address_reset <= now { if self.next_own_address_reset <= now {
self.reset_own_addresses().map_err(|err| Error::SocketIo("Failed to get own addresses", err))?; self.reset_own_addresses().map_err(|err| Error::SocketIo("Failed to get own addresses", err))?;
@ -502,19 +444,20 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
if self.pending_inits.get_mut(&addr).unwrap().every_second(&mut self.buffer).is_err() { if self.pending_inits.get_mut(&addr).unwrap().every_second(&mut self.buffer).is_err() {
del.push(addr) del.push(addr)
} else if !self.buffer.is_empty() { } else if !self.buffer.is_empty() {
self.send_to(addr)? self.coms.send_to(addr, &mut self.buffer)?
} }
} }
for addr in self.peers.keys().copied().collect::<SmallVec<[SocketAddr; 16]>>() { for (addr, crypto) in self.peer_crypto.iter_mut() {
self.buffer.clear(); self.buffer.clear();
self.peers.get_mut(&addr).unwrap().crypto.every_second(&mut self.buffer); crypto.every_second(&mut self.buffer);
if !self.buffer.is_empty() { if !self.buffer.is_empty() {
self.send_to(addr)? self.coms.send_to(*addr, &mut self.buffer)?
} }
} }
for addr in del { for addr in del {
self.pending_inits.remove(&addr); self.pending_inits.remove(&addr);
if self.peers.remove(&addr).is_some() { self.peer_crypto.remove(&addr);
if self.coms.remove_peer(&addr) {
self.connect_sock(addr)?; self.connect_sock(addr)?;
} }
} }
@ -523,9 +466,9 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
fn reset_own_addresses(&mut self) -> io::Result<()> { fn reset_own_addresses(&mut self) -> io::Result<()> {
self.own_addresses.clear(); self.own_addresses.clear();
let socket_addr = self.socket.address().map(mapped_addr)?; let socket_addr = self.coms.get_address()?;
// 1) Specified advertise addresses // 1) Specified advertise addresses
for addr in &self.config.advertise_addresses { for addr in &self.config.get_config().advertise_addresses {
self.own_addresses.push(parse_listen(addr, socket_addr.port())); self.own_addresses.push(parse_listen(addr, socket_addr.port()));
} }
// 2) Address of UDP socket // 2) Address of UDP socket
@ -542,7 +485,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
/// Stores the beacon /// Stores the beacon
fn store_beacon(&mut self) -> Result<(), Error> { fn store_beacon(&mut self) -> Result<(), Error> {
if let Some(ref path) = self.config.beacon_store { if let Some(ref path) = self.config.get_config().beacon_store {
let peers: SmallVec<[SocketAddr; 3]> = let peers: SmallVec<[SocketAddr; 3]> =
self.own_addresses.choose_multiple(&mut thread_rng(), 3).cloned().collect(); self.own_addresses.choose_multiple(&mut thread_rng(), 3).cloned().collect();
if let Some(path) = path.strip_prefix('|') { if let Some(path) = path.strip_prefix('|') {
@ -561,7 +504,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
/// Loads the beacon /// Loads the beacon
fn load_beacon(&mut self) -> Result<(), Error> { fn load_beacon(&mut self) -> Result<(), Error> {
let peers; let peers;
if let Some(ref path) = self.config.beacon_load { if let Some(ref path) = self.config.get_config().beacon_load {
if let Some(path) = path.strip_prefix('|') { if let Some(path) = path.strip_prefix('|') {
self.beacon_serializer self.beacon_serializer
.read_from_cmd(path, Some(50)) .read_from_cmd(path, Some(50))
@ -591,19 +534,19 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
f.set_len(0)?; f.set_len(0)?;
writeln!(f, "peers:")?; writeln!(f, "peers:")?;
let now = TS::now(); let now = TS::now();
for (addr, data) in &self.peers { for (addr, data) in self.coms.get_peers().iter() {
writeln!( writeln!(
f, f,
" - \"{}\": {{ ttl_secs: {}, crypto: {} }}", " - \"{}\": {{ ttl_secs: {}, crypto: {} }}",
addr_nice(*addr), addr_nice(*addr),
data.timeout - now, data.timeout - now,
data.crypto.algorithm_name() self.peer_crypto.get(addr).unwrap().algorithm_name()
)?; )?;
} }
writeln!(f)?; writeln!(f)?;
self.table.write_out(f)?; self.coms.table.write_out(f)?;
writeln!(f)?; writeln!(f)?;
self.traffic.write_out(f)?; self.coms.traffic.write_out(f)?;
writeln!(f)?; writeln!(f)?;
} }
Ok(()) Ok(())
@ -612,15 +555,15 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
/// Sends the statistics to a statsd endpoint /// Sends the statistics to a statsd endpoint
fn send_stats_to_statsd(&mut self) -> Result<(), Error> { fn send_stats_to_statsd(&mut self) -> Result<(), Error> {
if let Some(ref endpoint) = self.statsd_server { if let Some(ref endpoint) = self.statsd_server {
let peer_traffic = self.traffic.total_peer_traffic(); let peer_traffic = self.coms.traffic.total_peer_traffic();
let payload_traffic = self.traffic.total_payload_traffic(); let payload_traffic = self.coms.traffic.total_payload_traffic();
let dropped = &self.traffic.dropped(); let dropped = &self.coms.traffic.dropped();
let prefix = self.config.statsd_prefix.as_ref().map(|s| s as &str).unwrap_or("vpncloud"); let prefix = self.config.get_config().statsd_prefix.as_ref().map(|s| s as &str).unwrap_or("vpncloud");
let msg = StatsdMsg::new() let msg = StatsdMsg::new()
.with_ns(prefix, |msg| { .with_ns(prefix, |msg| {
msg.add("peer_count", self.peers.len(), "g"); msg.add("peer_count", self.coms.get_peers().len(), "g");
msg.add("table_cache_entries", self.table.cache_len(), "g"); msg.add("table_cache_entries", self.coms.table.cache_len(), "g");
msg.add("table_claims", self.table.claim_len(), "g"); msg.add("table_claims", self.coms.table.claim_len(), "g");
msg.with_ns("traffic", |msg| { msg.with_ns("traffic", |msg| {
msg.with_ns("protocol", |msg| { msg.with_ns("protocol", |msg| {
msg.with_ns("inbound", |msg| { msg.with_ns("inbound", |msg| {
@ -653,14 +596,9 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
}); });
}) })
.build(); .build();
let msg_data = msg.as_bytes();
let addrs = resolve(endpoint)?; let addrs = resolve(endpoint)?;
if let Some(addr) = addrs.first() { if let Some(addr) = addrs.first() {
match self.socket.send(msg_data, *addr) { self.coms.send_raw(msg.as_bytes(), *addr)?;
Ok(written) if written == msg_data.len() => Ok(()),
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
}?
} else { } else {
error!("Failed to resolve statsd server {}", endpoint); error!("Failed to resolve statsd server {}", endpoint);
} }
@ -680,7 +618,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
for entry in &mut self.reconnect_peers { for entry in &mut self.reconnect_peers {
// Schedule for next second if node is connected // Schedule for next second if node is connected
for addr in &entry.resolved { for addr in &entry.resolved {
if self.peers.contains_key(addr) { if self.coms.get_peers().contains_key(addr) {
entry.tries = 0; entry.tries = 0;
entry.timeout = 1; entry.timeout = 1;
entry.next = now + 1; entry.next = now + 1;
@ -722,8 +660,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
} }
pub fn iteration(&mut self) -> bool { pub fn iteration(&mut self) -> bool {
if let Ok(src) = self.socket.receive(&mut self.buffer) if let Ok(src) = self.coms.receive(&mut self.buffer) {
{
match self.handle_message(src) { match self.handle_message(src) {
Err(e @ Error::CryptoInitFatal(_)) => { Err(e @ Error::CryptoInitFatal(_)) => {
debug!("Fatal crypto init error from {}: {}", src, e); debug!("Fatal crypto init error from {}: {}", src, e);
@ -750,7 +687,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
error!("{}", e) error!("{}", e)
} }
self.next_housekeep = now + 1; self.next_housekeep = now + 1;
if !self.running.load(Ordering::SeqCst) { if !self.config.is_running() {
debug!("Socket: end"); debug!("Socket: end");
return false; return false;
} }