mirror of https://github.com/dswd/vpncloud.git
Compare commits
2 Commits
a9b960b04e
...
aa949cdcec
Author | SHA1 | Date |
---|---|---|
Dennis Schwerdel | aa949cdcec | |
Dennis Schwerdel | 2fcb7646c7 |
|
@ -11,7 +11,7 @@ RUN chown vscode: -R /usr/local/rustup /usr/local/cargo
|
|||
|
||||
USER vscode
|
||||
|
||||
RUN rustup default 1.64.0 \
|
||||
RUN rustup default 1.70.0 \
|
||||
&& rustup component add clippy rust-src rustfmt
|
||||
|
||||
RUN cargo install cargo-outdated cargo-cache cargo-criterion \
|
||||
|
|
|
@ -26,6 +26,12 @@ dependencies = [
|
|||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "anstyle"
|
||||
version = "1.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "41ed9a86bf92ae6580e0a31281f65a1b1d867c0cc68d5346e2ae128dddfa6a7d"
|
||||
|
||||
[[package]]
|
||||
name = "attohttpc"
|
||||
version = "0.16.3"
|
||||
|
@ -44,7 +50,7 @@ version = "0.2.14"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
|
||||
dependencies = [
|
||||
"hermit-abi",
|
||||
"hermit-abi 0.1.19",
|
||||
"libc",
|
||||
"winapi",
|
||||
]
|
||||
|
@ -173,31 +179,36 @@ dependencies = [
|
|||
"atty",
|
||||
"bitflags",
|
||||
"strsim",
|
||||
"textwrap 0.11.0",
|
||||
"textwrap",
|
||||
"unicode-width",
|
||||
"vec_map",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "clap"
|
||||
version = "3.2.22"
|
||||
version = "4.3.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "86447ad904c7fb335a790c9d7fe3d0d971dc523b8ccd1561a520de9a85302750"
|
||||
checksum = "80672091db20273a15cf9fdd4e47ed43b5091ec9841bf4c6145c9dfbbcae09ed"
|
||||
dependencies = [
|
||||
"clap_builder",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "clap_builder"
|
||||
version = "4.3.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c1458a1df40e1e2afebb7ab60ce55c1fa8f431146205aa5f4887e0b111c27636"
|
||||
dependencies = [
|
||||
"anstyle",
|
||||
"bitflags",
|
||||
"clap_lex",
|
||||
"indexmap",
|
||||
"textwrap 0.15.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "clap_lex"
|
||||
version = "0.2.4"
|
||||
version = "0.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2850f2f5a82cbf437dd5af4d49848fbdfc27c157c3d010345776f952765261c5"
|
||||
dependencies = [
|
||||
"os_str_bytes",
|
||||
]
|
||||
checksum = "2da6da31387c7e4ef160ffab6d5e7f00c42626fe39aea70a7b0f1773f7dd6c1b"
|
||||
|
||||
[[package]]
|
||||
name = "console"
|
||||
|
@ -230,19 +241,19 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "criterion"
|
||||
version = "0.4.0"
|
||||
version = "0.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e7c76e09c1aae2bc52b3d2f29e13c6572553b30c4aa1b8a49fd70de6412654cb"
|
||||
checksum = "f2b12d017a929603d80db1831cd3a24082f8137ce19c69e6447f54f5fc8d692f"
|
||||
dependencies = [
|
||||
"anes",
|
||||
"atty",
|
||||
"cast",
|
||||
"ciborium",
|
||||
"clap 3.2.22",
|
||||
"clap 4.3.4",
|
||||
"criterion-plot",
|
||||
"is-terminal",
|
||||
"itertools",
|
||||
"lazy_static",
|
||||
"num-traits",
|
||||
"once_cell",
|
||||
"oorandom",
|
||||
"plotters",
|
||||
"rayon",
|
||||
|
@ -362,6 +373,27 @@ version = "0.3.6"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f"
|
||||
|
||||
[[package]]
|
||||
name = "errno"
|
||||
version = "0.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4bcfec3a70f97c962c307b2d2c56e358cf1d00b558d74262b5f929ee8cc7e73a"
|
||||
dependencies = [
|
||||
"errno-dragonfly",
|
||||
"libc",
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "errno-dragonfly"
|
||||
version = "0.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fastrand"
|
||||
version = "1.8.0"
|
||||
|
@ -437,6 +469,12 @@ dependencies = [
|
|||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hermit-abi"
|
||||
version = "0.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fed44880c466736ef9a5c5b5facefb5ed0785676d0c02d612db14e54f0d84286"
|
||||
|
||||
[[package]]
|
||||
name = "http"
|
||||
version = "0.2.8"
|
||||
|
@ -515,6 +553,29 @@ dependencies = [
|
|||
"cfg-if 1.0.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "io-lifetimes"
|
||||
version = "1.0.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2"
|
||||
dependencies = [
|
||||
"hermit-abi 0.3.1",
|
||||
"libc",
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "is-terminal"
|
||||
version = "0.4.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "adcf93614601c8129ddf72e2d5633df827ba6551541c6d8c59520a371475be1f"
|
||||
dependencies = [
|
||||
"hermit-abi 0.3.1",
|
||||
"io-lifetimes",
|
||||
"rustix",
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "itertools"
|
||||
version = "0.10.5"
|
||||
|
@ -547,9 +608,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
|
|||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.133"
|
||||
version = "0.2.146"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c0f80d65747a3e43d1596c7c5492d95d5edddaabd45a7fcdb02b95f644164966"
|
||||
checksum = "f92be4933c13fd498862a9e02a3055f8a8d9c039ce33db97306fd5a6caa7f29b"
|
||||
|
||||
[[package]]
|
||||
name = "linked-hash-map"
|
||||
|
@ -557,6 +618,12 @@ version = "0.5.6"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f"
|
||||
|
||||
[[package]]
|
||||
name = "linux-raw-sys"
|
||||
version = "0.3.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519"
|
||||
|
||||
[[package]]
|
||||
name = "lock_api"
|
||||
version = "0.4.9"
|
||||
|
@ -635,7 +702,7 @@ version = "1.13.1"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1"
|
||||
dependencies = [
|
||||
"hermit-abi",
|
||||
"hermit-abi 0.1.19",
|
||||
"libc",
|
||||
]
|
||||
|
||||
|
@ -651,12 +718,6 @@ version = "11.1.3"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
|
||||
|
||||
[[package]]
|
||||
name = "os_str_bytes"
|
||||
version = "6.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9ff7415e9ae3fff1225851df9e0d9e4e5479f947619774677a63572e55e80eff"
|
||||
|
||||
[[package]]
|
||||
name = "parking_lot"
|
||||
version = "0.12.1"
|
||||
|
@ -677,7 +738,7 @@ dependencies = [
|
|||
"libc",
|
||||
"redox_syscall",
|
||||
"smallvec",
|
||||
"windows-sys",
|
||||
"windows-sys 0.36.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -874,6 +935,20 @@ dependencies = [
|
|||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustix"
|
||||
version = "0.37.20"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b96e891d04aa506a6d1f318d2771bcb1c7dfda84e126660ace067c9b474bb2c0"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"errno",
|
||||
"io-lifetimes",
|
||||
"libc",
|
||||
"linux-raw-sys",
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ryu"
|
||||
version = "1.0.11"
|
||||
|
@ -1046,12 +1121,6 @@ dependencies = [
|
|||
"unicode-width",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "textwrap"
|
||||
version = "0.15.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "949517c0cf1bf4ee812e2e07e08ab448e3ae0d23472aee8a06c985f0c8815b16"
|
||||
|
||||
[[package]]
|
||||
name = "thiserror"
|
||||
version = "1.0.35"
|
||||
|
@ -1366,43 +1435,109 @@ version = "0.36.1"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2"
|
||||
dependencies = [
|
||||
"windows_aarch64_msvc",
|
||||
"windows_i686_gnu",
|
||||
"windows_i686_msvc",
|
||||
"windows_x86_64_gnu",
|
||||
"windows_x86_64_msvc",
|
||||
"windows_aarch64_msvc 0.36.1",
|
||||
"windows_i686_gnu 0.36.1",
|
||||
"windows_i686_msvc 0.36.1",
|
||||
"windows_x86_64_gnu 0.36.1",
|
||||
"windows_x86_64_msvc 0.36.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-sys"
|
||||
version = "0.48.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9"
|
||||
dependencies = [
|
||||
"windows-targets",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-targets"
|
||||
version = "0.48.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7b1eb6f0cd7c80c79759c929114ef071b87354ce476d9d94271031c0497adfd5"
|
||||
dependencies = [
|
||||
"windows_aarch64_gnullvm",
|
||||
"windows_aarch64_msvc 0.48.0",
|
||||
"windows_i686_gnu 0.48.0",
|
||||
"windows_i686_msvc 0.48.0",
|
||||
"windows_x86_64_gnu 0.48.0",
|
||||
"windows_x86_64_gnullvm",
|
||||
"windows_x86_64_msvc 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows_aarch64_gnullvm"
|
||||
version = "0.48.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc"
|
||||
|
||||
[[package]]
|
||||
name = "windows_aarch64_msvc"
|
||||
version = "0.36.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47"
|
||||
|
||||
[[package]]
|
||||
name = "windows_aarch64_msvc"
|
||||
version = "0.48.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3"
|
||||
|
||||
[[package]]
|
||||
name = "windows_i686_gnu"
|
||||
version = "0.36.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6"
|
||||
|
||||
[[package]]
|
||||
name = "windows_i686_gnu"
|
||||
version = "0.48.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241"
|
||||
|
||||
[[package]]
|
||||
name = "windows_i686_msvc"
|
||||
version = "0.36.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024"
|
||||
|
||||
[[package]]
|
||||
name = "windows_i686_msvc"
|
||||
version = "0.48.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00"
|
||||
|
||||
[[package]]
|
||||
name = "windows_x86_64_gnu"
|
||||
version = "0.36.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1"
|
||||
|
||||
[[package]]
|
||||
name = "windows_x86_64_gnu"
|
||||
version = "0.48.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1"
|
||||
|
||||
[[package]]
|
||||
name = "windows_x86_64_gnullvm"
|
||||
version = "0.48.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953"
|
||||
|
||||
[[package]]
|
||||
name = "windows_x86_64_msvc"
|
||||
version = "0.36.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680"
|
||||
|
||||
[[package]]
|
||||
name = "windows_x86_64_msvc"
|
||||
version = "0.48.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a"
|
||||
|
||||
[[package]]
|
||||
name = "xml-rs"
|
||||
version = "0.8.4"
|
||||
|
|
|
@ -12,7 +12,7 @@ readme = "README.md"
|
|||
edition = "2018"
|
||||
|
||||
[package.metadata]
|
||||
toolchain = "1.64.0"
|
||||
toolchain = "1.70.0"
|
||||
upx_version = "3.96"
|
||||
|
||||
[dependencies]
|
||||
|
@ -41,7 +41,7 @@ signal = "0.7"
|
|||
|
||||
[dev-dependencies]
|
||||
tempfile = "3"
|
||||
criterion = { version = "0.4", features = ["html_reports"] }
|
||||
criterion = { version = "0.5", features = ["html_reports"] }
|
||||
iai = "0.1"
|
||||
|
||||
[features]
|
||||
|
|
|
@ -18,6 +18,9 @@ mod engine {
|
|||
pub mod common {
|
||||
include!("../src/engine/common.rs");
|
||||
}
|
||||
pub mod coms {
|
||||
include!("../src/engine/coms.rs");
|
||||
}
|
||||
mod shared {
|
||||
include!("../src/engine/shared.rs");
|
||||
}
|
||||
|
@ -27,6 +30,12 @@ mod engine {
|
|||
mod socket_thread {
|
||||
include!("../src/engine/socket_thread.rs");
|
||||
}
|
||||
mod housekeep_thread {
|
||||
include!("../src/engine/housekeep_thread.rs");
|
||||
}
|
||||
mod extras_thread {
|
||||
include!("../src/engine/extras_thread.rs");
|
||||
}
|
||||
}
|
||||
mod config {
|
||||
include!("../src/config.rs");
|
||||
|
|
|
@ -1,5 +1,3 @@
|
|||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
use std::{fs::File, hash::BuildHasherDefault};
|
||||
|
||||
|
@ -8,11 +6,13 @@ use fnv::FnvHasher;
|
|||
use crate::util::CtrlC;
|
||||
use crate::{
|
||||
config::Config,
|
||||
crypto::PeerCrypto,
|
||||
device::Device,
|
||||
engine::{
|
||||
coms::Coms,
|
||||
device_thread::DeviceThread,
|
||||
shared::{SharedPeerCrypto, SharedTable, SharedTraffic},
|
||||
extras_thread::ExtrasThread,
|
||||
housekeep_thread::HousekeepThread,
|
||||
shared::SharedConfig,
|
||||
socket_thread::{ReconnectEntry, SocketThread},
|
||||
},
|
||||
error::Error,
|
||||
|
@ -36,13 +36,15 @@ pub struct PeerData {
|
|||
pub timeout: Time,
|
||||
pub peer_timeout: u16,
|
||||
pub node_id: NodeId,
|
||||
pub crypto: PeerCrypto,
|
||||
//pub crypto: PeerCrypto,
|
||||
}
|
||||
|
||||
pub struct GenericCloud<D: Device, P: Protocol, S: Socket, TS: TimeSource> {
|
||||
config: SharedConfig,
|
||||
socket_thread: SocketThread<S, D, P, TS>,
|
||||
device_thread: DeviceThread<S, D, P, TS>,
|
||||
running: Arc<AtomicBool>,
|
||||
housekeep_thread: HousekeepThread<S, P, TS>,
|
||||
extras_thread: ExtrasThread<S, P, TS>,
|
||||
}
|
||||
|
||||
impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS> {
|
||||
|
@ -50,32 +52,18 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
|
|||
pub fn new(
|
||||
config: &Config, socket: S, device: D, port_forwarding: Option<PortForwarding>, stats_file: Option<File>,
|
||||
) -> Result<Self, Error> {
|
||||
let table = SharedTable::<TS>::new(config);
|
||||
let traffic = SharedTraffic::new();
|
||||
let peer_crypto = SharedPeerCrypto::new();
|
||||
let running = Arc::new(AtomicBool::new(true));
|
||||
let device_thread = DeviceThread::<S, D, P, TS>::new(
|
||||
config.clone(),
|
||||
device.duplicate()?,
|
||||
let config = SharedConfig::new(config.clone());
|
||||
let coms = Coms::<S, TS, P>::new(
|
||||
config.get_config(),
|
||||
socket.try_clone().map_err(|e| Error::SocketIo("Failed to clone socket", e))?,
|
||||
traffic.clone(),
|
||||
peer_crypto.clone(),
|
||||
table.clone(),
|
||||
running.clone(),
|
||||
);
|
||||
let mut socket_thread = SocketThread::<S, D, P, TS>::new(
|
||||
config.clone(),
|
||||
device,
|
||||
socket,
|
||||
traffic,
|
||||
peer_crypto,
|
||||
table,
|
||||
port_forwarding,
|
||||
stats_file,
|
||||
running.clone(),
|
||||
);
|
||||
let device_thread = DeviceThread::<S, D, P, TS>::new(config.clone(), device.duplicate()?, coms.try_clone()?);
|
||||
let housekeep_thread = HousekeepThread::<S, P, TS>::new(config.clone(), coms.try_clone()?);
|
||||
let extras_thread = ExtrasThread::<S, P, TS>::new(config.clone(), coms.try_clone()?);
|
||||
let mut socket_thread =
|
||||
SocketThread::<S, D, P, TS>::new(config.clone(), coms, device, port_forwarding, stats_file);
|
||||
socket_thread.housekeep()?;
|
||||
Ok(Self { socket_thread, device_thread, running })
|
||||
Ok(Self { socket_thread, device_thread, config, housekeep_thread, extras_thread })
|
||||
}
|
||||
|
||||
pub fn add_peer(&mut self, addr: String) -> Result<(), Error> {
|
||||
|
@ -93,17 +81,23 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
|
|||
|
||||
pub fn run(self) {
|
||||
debug!("Starting threads");
|
||||
let running = self.running.clone();
|
||||
let config = self.config;
|
||||
let device = self.device_thread;
|
||||
let device_thread_handle = thread::spawn(move || device.run());
|
||||
let socket = self.socket_thread;
|
||||
let socket_thread_handle = thread::spawn(move || socket.run());
|
||||
let housekeep = self.housekeep_thread;
|
||||
let housekeep_thread_handle = thread::spawn(move || housekeep.run());
|
||||
let extras = self.extras_thread;
|
||||
let extras_thread_handle = thread::spawn(move || extras.run());
|
||||
let ctrlc = CtrlC::new();
|
||||
ctrlc.wait();
|
||||
running.store(false, Ordering::SeqCst);
|
||||
config.stop();
|
||||
debug!("Waiting for threads to end");
|
||||
device_thread_handle.join().unwrap();
|
||||
socket_thread_handle.join().unwrap();
|
||||
housekeep_thread_handle.join().unwrap();
|
||||
extras_thread_handle.join().unwrap();
|
||||
debug!("Threads stopped");
|
||||
}
|
||||
}
|
||||
|
@ -120,7 +114,7 @@ use std::net::SocketAddr;
|
|||
#[cfg(test)]
|
||||
impl<P: Protocol> GenericCloud<MockDevice, P, MockSocket, MockTimeSource> {
|
||||
pub fn socket(&mut self) -> &mut MockSocket {
|
||||
&mut self.socket_thread.socket
|
||||
&mut self.socket_thread.coms.socket
|
||||
}
|
||||
|
||||
pub fn device(&mut self) -> &mut MockDevice {
|
||||
|
@ -145,7 +139,7 @@ impl<P: Protocol> GenericCloud<MockDevice, P, MockSocket, MockTimeSource> {
|
|||
}
|
||||
|
||||
pub fn is_connected(&self, addr: &SocketAddr) -> bool {
|
||||
self.socket_thread.peers.contains_key(addr)
|
||||
self.socket_thread.coms.has_peer(addr)
|
||||
}
|
||||
|
||||
pub fn own_addresses(&self) -> &[SocketAddr] {
|
||||
|
@ -153,6 +147,6 @@ impl<P: Protocol> GenericCloud<MockDevice, P, MockSocket, MockTimeSource> {
|
|||
}
|
||||
|
||||
pub fn get_num(&self) -> usize {
|
||||
self.socket_thread.socket.address().unwrap().port() as usize
|
||||
self.socket_thread.coms.socket.address().unwrap().port() as usize
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,172 @@
|
|||
use std::{collections::HashMap, io, marker::PhantomData, net::SocketAddr, ops::DerefMut};
|
||||
|
||||
use crate::{
|
||||
config::Config,
|
||||
crypto::PeerCrypto,
|
||||
error::Error,
|
||||
messages::MESSAGE_TYPE_DATA,
|
||||
net::{mapped_addr, Socket},
|
||||
payload::Protocol,
|
||||
util::{MsgBuffer, TimeSource, addr_nice},
|
||||
};
|
||||
|
||||
use super::{
|
||||
common::{Hash, PeerData, SPACE_BEFORE},
|
||||
shared::{SharedCrypto, SharedPeers, SharedTable, SharedTraffic},
|
||||
};
|
||||
|
||||
pub struct Coms<S: Socket, TS: TimeSource, P: Protocol> {
|
||||
_dummy_p: PhantomData<P>,
|
||||
broadcast: bool,
|
||||
broadcast_buffer: MsgBuffer,
|
||||
crypto: SharedCrypto,
|
||||
peers: SharedPeers,
|
||||
pub table: SharedTable<TS>,
|
||||
pub traffic: SharedTraffic,
|
||||
pub socket: S,
|
||||
}
|
||||
|
||||
impl<S: Socket, TS: TimeSource, P: Protocol> Coms<S, TS, P> {
|
||||
pub fn new(config: &Config, socket: S) -> Self {
|
||||
Self {
|
||||
_dummy_p: PhantomData,
|
||||
broadcast: config.is_broadcasting(),
|
||||
broadcast_buffer: MsgBuffer::new(SPACE_BEFORE),
|
||||
traffic: SharedTraffic::new(),
|
||||
crypto: SharedCrypto::new(),
|
||||
peers: SharedPeers::new(),
|
||||
table: SharedTable::<TS>::new(config),
|
||||
socket,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn try_clone(&self) -> Result<Self, Error> {
|
||||
Ok(Self {
|
||||
_dummy_p: PhantomData,
|
||||
broadcast: self.broadcast,
|
||||
broadcast_buffer: MsgBuffer::new(SPACE_BEFORE),
|
||||
traffic: self.traffic.clone(),
|
||||
crypto: self.crypto.clone(),
|
||||
table: self.table.clone(),
|
||||
peers: self.peers.clone(),
|
||||
socket: self.socket.try_clone().map_err(|e| Error::SocketIo("Failed to clone socket", e))?,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn sync(&mut self) -> Result<(), Error> {
|
||||
self.crypto.load();
|
||||
self.table.sync();
|
||||
self.traffic.sync();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn get_address(&self) -> Result<SocketAddr, io::Error> {
|
||||
self.socket.address().map(mapped_addr)
|
||||
}
|
||||
|
||||
pub fn send_raw(&mut self, data: &[u8], addr: SocketAddr) -> Result<(), Error> {
|
||||
match self.socket.send(data, addr) {
|
||||
Ok(written) if written == data.len() => Ok(()),
|
||||
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
|
||||
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
|
||||
}
|
||||
}
|
||||
|
||||
//TODO: move back to socket thread
|
||||
pub fn receive(&mut self, buffer: &mut MsgBuffer) -> Result<SocketAddr, io::Error> {
|
||||
self.socket.receive(buffer)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn send_to(&mut self, addr: SocketAddr, data: &mut MsgBuffer) -> Result<(), Error> {
|
||||
let size = data.len();
|
||||
debug!("Sending msg with {} bytes to {}", size, addr);
|
||||
self.traffic.count_out_traffic(addr, size);
|
||||
match self.socket.send(data.message(), addr) {
|
||||
Ok(written) if written == size => Ok(()),
|
||||
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
|
||||
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn send_msg(&mut self, addr: SocketAddr, type_: u8, data: &mut MsgBuffer) -> Result<(), Error> {
|
||||
debug!("Sending msg with {} bytes to {}", data.len(), addr);
|
||||
data.prepend_byte(type_);
|
||||
self.crypto.encrypt_for(addr, data)?;
|
||||
self.send_to(addr, data)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn broadcast_msg(&mut self, type_: u8, data: &mut MsgBuffer) -> Result<(), Error> {
|
||||
let size = data.len();
|
||||
debug!("Broadcasting message type {}, {:?} bytes to {} peers", type_, size, self.crypto.count());
|
||||
let traffic = &mut self.traffic;
|
||||
let socket = &mut self.socket;
|
||||
let peers = self.crypto.get_snapshot();
|
||||
for (addr, crypto) in peers {
|
||||
self.broadcast_buffer.set_start(data.get_start());
|
||||
self.broadcast_buffer.set_length(data.len());
|
||||
self.broadcast_buffer.message_mut().clone_from_slice(data.message());
|
||||
self.broadcast_buffer.prepend_byte(type_);
|
||||
if let Some(crypto) = crypto {
|
||||
crypto.encrypt(&mut self.broadcast_buffer);
|
||||
}
|
||||
traffic.count_out_traffic(*addr, self.broadcast_buffer.len());
|
||||
match socket.send(self.broadcast_buffer.message(), *addr) {
|
||||
Ok(written) if written == self.broadcast_buffer.len() => Ok(()),
|
||||
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
|
||||
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
|
||||
}?
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn forward_packet(&mut self, data: &mut MsgBuffer) -> Result<(), Error> {
|
||||
let (src, dst) = P::parse(data.message())?;
|
||||
debug!("Read data from interface: src: {}, dst: {}, {} bytes", src, dst, data.len());
|
||||
self.traffic.count_out_payload(dst.clone(), src, data.len());
|
||||
match self.table.lookup(&dst) {
|
||||
Some(addr) => {
|
||||
// Peer found for destination
|
||||
debug!("Found destination for {} => {}", dst, addr);
|
||||
self.send_msg(addr, MESSAGE_TYPE_DATA, data)?;
|
||||
}
|
||||
//TODO: VIA: find relay peer and relay message
|
||||
None => {
|
||||
if self.broadcast {
|
||||
debug!("No destination for {} found, broadcasting", dst);
|
||||
self.broadcast_msg(MESSAGE_TYPE_DATA, data)?;
|
||||
} else {
|
||||
debug!("No destination for {} found, dropping", dst);
|
||||
self.traffic.count_dropped_payload(data.len());
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn get_peers<'a>(&'a self) -> impl DerefMut<Target = HashMap<SocketAddr, PeerData, Hash>> + 'a {
|
||||
self.peers.get_peers()
|
||||
}
|
||||
|
||||
pub fn add_peer(&mut self, addr: SocketAddr, peer_data: PeerData, peer_crypto: &PeerCrypto) {
|
||||
self.crypto.add(addr, peer_crypto.get_core());
|
||||
self.peers.get_peers().insert(addr, peer_data);
|
||||
}
|
||||
|
||||
pub fn remove_peer(&mut self, addr: &SocketAddr) -> bool {
|
||||
if let Some(_peer) = self.peers.get_peers().remove(addr) {
|
||||
info!("Closing connection to {}", addr_nice(*addr));
|
||||
self.table.remove_claims(*addr);
|
||||
self.crypto.remove(addr);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
pub fn has_peer(&self, addr: &SocketAddr) -> bool {
|
||||
self.crypto.contains(addr)
|
||||
}
|
||||
}
|
|
@ -1,139 +1,40 @@
|
|||
use super::{
|
||||
common::SPACE_BEFORE,
|
||||
shared::{SharedPeerCrypto, SharedTable, SharedTraffic},
|
||||
};
|
||||
use super::{common::SPACE_BEFORE, coms::Coms, shared::SharedConfig};
|
||||
use crate::{
|
||||
config::Config,
|
||||
device::Device,
|
||||
error::Error,
|
||||
messages::MESSAGE_TYPE_DATA,
|
||||
net::Socket,
|
||||
util::{MsgBuffer, Time, TimeSource},
|
||||
Protocol,
|
||||
};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::{marker::PhantomData, net::SocketAddr};
|
||||
|
||||
pub struct DeviceThread<S: Socket, D: Device, P: Protocol, TS: TimeSource> {
|
||||
// Read-only fields
|
||||
_dummy_ts: PhantomData<TS>,
|
||||
_dummy_p: PhantomData<P>,
|
||||
broadcast: bool,
|
||||
// Device-only fields
|
||||
socket: S,
|
||||
config: SharedConfig,
|
||||
coms: Coms<S, TS, P>,
|
||||
pub device: D,
|
||||
next_housekeep: Time,
|
||||
buffer: MsgBuffer,
|
||||
broadcast_buffer: MsgBuffer,
|
||||
// Shared fields
|
||||
traffic: SharedTraffic,
|
||||
peer_crypto: SharedPeerCrypto,
|
||||
table: SharedTable<TS>,
|
||||
running: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS> {
|
||||
pub fn new(
|
||||
config: Config, device: D, socket: S, traffic: SharedTraffic, peer_crypto: SharedPeerCrypto,
|
||||
table: SharedTable<TS>, running: Arc<AtomicBool>,
|
||||
) -> Self {
|
||||
pub fn new(config: SharedConfig, device: D, coms: Coms<S, TS, P>) -> Self {
|
||||
Self {
|
||||
_dummy_ts: PhantomData,
|
||||
_dummy_p: PhantomData,
|
||||
broadcast: config.is_broadcasting(),
|
||||
socket,
|
||||
config,
|
||||
device,
|
||||
next_housekeep: TS::now(),
|
||||
traffic,
|
||||
peer_crypto,
|
||||
table,
|
||||
buffer: MsgBuffer::new(SPACE_BEFORE),
|
||||
broadcast_buffer: MsgBuffer::new(SPACE_BEFORE),
|
||||
running,
|
||||
coms,
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn send_to(&mut self, addr: SocketAddr) -> Result<(), Error> {
|
||||
let size = self.buffer.len();
|
||||
debug!("Sending msg with {} bytes to {}", size, addr);
|
||||
self.traffic.count_out_traffic(addr, size);
|
||||
match self.socket.send(self.buffer.message(), addr) {
|
||||
Ok(written) if written == size => Ok(()),
|
||||
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
|
||||
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn send_msg(&mut self, addr: SocketAddr, type_: u8) -> Result<(), Error> {
|
||||
debug!("Sending msg with {} bytes to {}", self.buffer.len(), addr);
|
||||
self.buffer.prepend_byte(type_);
|
||||
self.peer_crypto.encrypt_for(addr, &mut self.buffer)?;
|
||||
self.send_to(addr)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn broadcast_msg(&mut self, type_: u8) -> Result<(), Error> {
|
||||
let size = self.buffer.len();
|
||||
debug!("Broadcasting message type {}, {:?} bytes to {} peers", type_, size, self.peer_crypto.count());
|
||||
let traffic = &mut self.traffic;
|
||||
let socket = &mut self.socket;
|
||||
let peers = self.peer_crypto.get_snapshot();
|
||||
for (addr, crypto) in peers {
|
||||
self.broadcast_buffer.set_start(self.buffer.get_start());
|
||||
self.broadcast_buffer.set_length(self.buffer.len());
|
||||
self.broadcast_buffer.message_mut().clone_from_slice(self.buffer.message());
|
||||
self.broadcast_buffer.prepend_byte(type_);
|
||||
if let Some(crypto) = crypto {
|
||||
crypto.encrypt(&mut self.broadcast_buffer);
|
||||
}
|
||||
traffic.count_out_traffic(*addr, self.broadcast_buffer.len());
|
||||
match socket.send(self.broadcast_buffer.message(), *addr) {
|
||||
Ok(written) if written == self.broadcast_buffer.len() => Ok(()),
|
||||
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
|
||||
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
|
||||
}?
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn forward_packet(&mut self) -> Result<(), Error> {
|
||||
let (src, dst) = P::parse(self.buffer.message())?;
|
||||
debug!("Read data from interface: src: {}, dst: {}, {} bytes", src, dst, self.buffer.len());
|
||||
self.traffic.count_out_payload(dst.clone(), src, self.buffer.len());
|
||||
match self.table.lookup(&dst) {
|
||||
Some(addr) => {
|
||||
// Peer found for destination
|
||||
debug!("Found destination for {} => {}", dst, addr);
|
||||
self.send_msg(addr, MESSAGE_TYPE_DATA)?;
|
||||
}
|
||||
//TODO: VIA: find relay peer and relay message
|
||||
None => {
|
||||
if self.broadcast {
|
||||
debug!("No destination for {} found, broadcasting", dst);
|
||||
self.broadcast_msg(MESSAGE_TYPE_DATA)?;
|
||||
} else {
|
||||
debug!("No destination for {} found, dropping", dst);
|
||||
self.traffic.count_dropped_payload(self.buffer.len());
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn housekeep(&mut self) -> Result<(), Error> {
|
||||
self.peer_crypto.load();
|
||||
self.table.sync();
|
||||
self.traffic.sync();
|
||||
Ok(())
|
||||
self.coms.sync()
|
||||
}
|
||||
|
||||
pub fn iteration(&mut self) -> bool {
|
||||
if self.device.read(&mut self.buffer).is_ok() {
|
||||
//try_fail!(result, "Failed to read from device: {}");
|
||||
if let Err(e) = self.forward_packet() {
|
||||
if let Err(e) = self.coms.forward_packet(&mut self.buffer) {
|
||||
error!("{}", e);
|
||||
}
|
||||
}
|
||||
|
@ -143,7 +44,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS
|
|||
error!("{}", e)
|
||||
}
|
||||
self.next_housekeep = now + 1;
|
||||
if !self.running.load(Ordering::SeqCst) {
|
||||
if !self.config.is_running() {
|
||||
debug!("Device: end");
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,59 @@
|
|||
use std::time::Duration;
|
||||
|
||||
use super::{common::SPACE_BEFORE, coms::Coms, shared::SharedConfig};
|
||||
use crate::{
|
||||
error::Error,
|
||||
net::Socket,
|
||||
util::{MsgBuffer, TimeSource, Time},
|
||||
Protocol,
|
||||
};
|
||||
|
||||
const MAX_RECONNECT_INTERVAL: u16 = 3600;
|
||||
const RESOLVE_INTERVAL: Time = 300;
|
||||
const OWN_ADDRESS_RESET_INTERVAL: Time = 300;
|
||||
pub const STATS_INTERVAL: Time = 60;
|
||||
|
||||
pub struct ExtrasThread<S: Socket, P: Protocol, TS: TimeSource> {
|
||||
config: SharedConfig,
|
||||
coms: Coms<S, TS, P>,
|
||||
next_housekeep: Time,
|
||||
buffer: MsgBuffer,
|
||||
}
|
||||
|
||||
impl<S: Socket, P: Protocol, TS: TimeSource> ExtrasThread<S, P, TS> {
|
||||
pub fn new(config: SharedConfig, coms: Coms<S, TS, P>) -> Self {
|
||||
Self {
|
||||
config,
|
||||
coms,
|
||||
next_housekeep: TS::now(),
|
||||
buffer: MsgBuffer::new(SPACE_BEFORE),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn housekeep(&mut self) -> Result<(), Error> {
|
||||
let now = TS::now();
|
||||
assert!(self.buffer.is_empty());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
pub fn iteration(&mut self) -> bool {
|
||||
std::thread::sleep(Duration::from_millis(100));
|
||||
let now = TS::now();
|
||||
if self.next_housekeep < now {
|
||||
if let Err(e) = self.housekeep() {
|
||||
error!("{}", e)
|
||||
}
|
||||
self.next_housekeep = now + 1;
|
||||
if !self.config.is_running() {
|
||||
debug!("Extras: end");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
true
|
||||
}
|
||||
|
||||
pub fn run(mut self) {
|
||||
while self.iteration() {}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,67 @@
|
|||
use std::{time::Duration, net::SocketAddr, cmp::{min, max}};
|
||||
|
||||
use smallvec::SmallVec;
|
||||
|
||||
use super::{common::SPACE_BEFORE, coms::Coms, shared::SharedConfig};
|
||||
use crate::{
|
||||
error::Error,
|
||||
net::Socket,
|
||||
util::{MsgBuffer, TimeSource, Time, addr_nice},
|
||||
Protocol, messages::MESSAGE_TYPE_NODE_INFO, config::DEFAULT_PEER_TIMEOUT,
|
||||
};
|
||||
|
||||
const MAX_RECONNECT_INTERVAL: u16 = 3600;
|
||||
const RESOLVE_INTERVAL: Time = 300;
|
||||
const OWN_ADDRESS_RESET_INTERVAL: Time = 300;
|
||||
pub const STATS_INTERVAL: Time = 60;
|
||||
|
||||
pub struct HousekeepThread<S: Socket, P: Protocol, TS: TimeSource> {
|
||||
config: SharedConfig,
|
||||
coms: Coms<S, TS, P>,
|
||||
next_housekeep: Time,
|
||||
buffer: MsgBuffer,
|
||||
update_freq: u16,
|
||||
next_peers: Time,
|
||||
next_own_address_reset: Time,
|
||||
}
|
||||
|
||||
impl<S: Socket, P: Protocol, TS: TimeSource> HousekeepThread<S, P, TS> {
|
||||
pub fn new(config: SharedConfig, coms: Coms<S, TS, P>) -> Self {
|
||||
let update_freq = config.get_config().get_keepalive() as u16;
|
||||
Self {
|
||||
config,
|
||||
coms,
|
||||
next_housekeep: TS::now(),
|
||||
buffer: MsgBuffer::new(SPACE_BEFORE),
|
||||
update_freq,
|
||||
next_peers: TS::now(),
|
||||
next_own_address_reset: TS::now(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn housekeep(&mut self) -> Result<(), Error> {
|
||||
let now = TS::now();
|
||||
assert!(self.buffer.is_empty());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn iteration(&mut self) -> bool {
|
||||
std::thread::sleep(Duration::from_millis(100));
|
||||
let now = TS::now();
|
||||
if self.next_housekeep < now {
|
||||
if let Err(e) = self.housekeep() {
|
||||
error!("{}", e)
|
||||
}
|
||||
self.next_housekeep = now + 1;
|
||||
if !self.config.is_running() {
|
||||
debug!("Housekeep: end");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
true
|
||||
}
|
||||
|
||||
pub fn run(mut self) {
|
||||
while self.iteration() {}
|
||||
}
|
||||
}
|
|
@ -2,7 +2,10 @@
|
|||
// Copyright (C) 2015-2021 Dennis Schwerdel
|
||||
// This software is licensed under GPL-3 or newer (see LICENSE.md)
|
||||
|
||||
mod device_thread;
|
||||
mod shared;
|
||||
mod device_thread;
|
||||
mod socket_thread;
|
||||
mod housekeep_thread;
|
||||
mod extras_thread;
|
||||
pub mod coms;
|
||||
pub mod common;
|
|
@ -1,6 +1,6 @@
|
|||
use crate::{
|
||||
config::Config,
|
||||
crypto::CryptoCore,
|
||||
crypto::{CryptoCore, PeerCrypto},
|
||||
engine::common::Hash,
|
||||
error::Error,
|
||||
table::ClaimTable,
|
||||
|
@ -13,35 +13,41 @@ use std::{
|
|||
collections::HashMap,
|
||||
io::{self, Write},
|
||||
net::SocketAddr,
|
||||
sync::Arc,
|
||||
ops::DerefMut,
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
},
|
||||
};
|
||||
|
||||
use super::common::PeerData;
|
||||
|
||||
#[derive(Clone)]
|
||||
#[allow(clippy::type_complexity)]
|
||||
pub struct SharedPeerCrypto {
|
||||
peers: Arc<Mutex<HashMap<SocketAddr, Option<Arc<CryptoCore>>, Hash>>>,
|
||||
pub struct SharedCrypto {
|
||||
peer_crypto: Arc<Mutex<HashMap<SocketAddr, Option<Arc<CryptoCore>>, Hash>>>,
|
||||
cache: HashMap<SocketAddr, Option<Arc<CryptoCore>>, Hash>, //TODO: local hashmap as cache
|
||||
}
|
||||
|
||||
impl SharedPeerCrypto {
|
||||
impl SharedCrypto {
|
||||
pub fn new() -> Self {
|
||||
SharedPeerCrypto { peers: Arc::new(Mutex::new(HashMap::default())), cache: HashMap::default() }
|
||||
SharedCrypto { peer_crypto: Arc::new(Mutex::new(HashMap::default())), cache: HashMap::default() }
|
||||
}
|
||||
|
||||
pub fn encrypt_for(&mut self, peer: SocketAddr, data: &mut MsgBuffer) -> Result<(), Error> {
|
||||
let crypto = match self.cache.get(&peer) {
|
||||
Some(crypto) => crypto,
|
||||
None => {
|
||||
let peers = self.peers.lock();
|
||||
let cache = &mut self.cache;
|
||||
let owned_crypto;
|
||||
let crypto = if let Some(crypto) = cache.get(&peer) {
|
||||
crypto
|
||||
} else {
|
||||
let peers = self.peer_crypto.lock();
|
||||
if let Some(crypto) = peers.get(&peer) {
|
||||
self.cache.insert(peer, crypto.clone());
|
||||
self.cache.get(&peer).unwrap()
|
||||
cache.insert(peer, crypto.clone());
|
||||
owned_crypto = crypto.clone();
|
||||
&owned_crypto
|
||||
} else {
|
||||
return Err(Error::InvalidCryptoState("No crypto found for peer"));
|
||||
}
|
||||
}
|
||||
};
|
||||
if let Some(crypto) = crypto {
|
||||
crypto.encrypt(data);
|
||||
|
@ -49,16 +55,28 @@ impl SharedPeerCrypto {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub fn store(&mut self, data: &HashMap<SocketAddr, PeerData, Hash>) {
|
||||
pub fn add(&mut self, addr: SocketAddr, crypto: Option<Arc<CryptoCore>>) {
|
||||
self.cache.insert(addr, crypto.clone());
|
||||
let mut peers = self.peer_crypto.lock();
|
||||
peers.insert(addr, crypto);
|
||||
}
|
||||
|
||||
pub fn remove(&mut self, addr: &SocketAddr) {
|
||||
self.cache.remove(addr);
|
||||
let mut peers = self.peer_crypto.lock();
|
||||
peers.remove(addr);
|
||||
}
|
||||
|
||||
pub fn store(&mut self, data: &HashMap<SocketAddr, PeerCrypto, Hash>) {
|
||||
self.cache.clear();
|
||||
self.cache.extend(data.iter().map(|(k, v)| (*k, v.crypto.get_core())));
|
||||
let mut peers = self.peers.lock();
|
||||
self.cache.extend(data.iter().map(|(k, v)| (*k, v.get_core())));
|
||||
let mut peers = self.peer_crypto.lock();
|
||||
peers.clear();
|
||||
peers.extend(self.cache.iter().map(|(k, v)| (*k, v.clone())));
|
||||
}
|
||||
|
||||
pub fn load(&mut self) {
|
||||
let peers = self.peers.lock();
|
||||
let peers = self.peer_crypto.lock();
|
||||
self.cache.clear();
|
||||
self.cache.extend(peers.iter().map(|(k, v)| (*k, v.clone())));
|
||||
}
|
||||
|
@ -67,6 +85,10 @@ impl SharedPeerCrypto {
|
|||
&self.cache
|
||||
}
|
||||
|
||||
pub fn contains(&self, addr: &SocketAddr) -> bool {
|
||||
self.cache.contains_key(addr)
|
||||
}
|
||||
|
||||
pub fn count(&self) -> usize {
|
||||
self.cache.len()
|
||||
}
|
||||
|
@ -138,6 +160,12 @@ impl SharedTraffic {
|
|||
}
|
||||
}
|
||||
|
||||
impl Default for SharedTraffic {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct SharedTable<TS: TimeSource> {
|
||||
table: Arc<Mutex<ClaimTable<TS>>>,
|
||||
|
@ -198,3 +226,42 @@ impl<TS: TimeSource> SharedTable<TS> {
|
|||
self.table.lock().claim_len()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct SharedConfig {
|
||||
config: Config,
|
||||
running: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl SharedConfig {
|
||||
pub fn new(config: Config) -> Self {
|
||||
Self { config, running: Arc::new(AtomicBool::new(true)) }
|
||||
}
|
||||
|
||||
pub fn get_config(&self) -> &Config {
|
||||
&self.config
|
||||
}
|
||||
|
||||
pub fn is_running(&self) -> bool {
|
||||
self.running.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
pub fn stop(&self) {
|
||||
self.running.store(false, Ordering::Relaxed)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct SharedPeers {
|
||||
peers: Arc<Mutex<HashMap<SocketAddr, PeerData, Hash>>>,
|
||||
}
|
||||
|
||||
impl SharedPeers {
|
||||
pub fn new() -> Self {
|
||||
Self { peers: Default::default() }
|
||||
}
|
||||
|
||||
pub fn get_peers<'a>(&'a self) -> impl DerefMut<Target = HashMap<SocketAddr, PeerData, Hash>> + 'a {
|
||||
self.peers.lock()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,12 +1,9 @@
|
|||
use super::{
|
||||
common::SPACE_BEFORE,
|
||||
shared::{SharedPeerCrypto, SharedTable, SharedTraffic},
|
||||
};
|
||||
use super::{common::SPACE_BEFORE, coms::Coms, shared::SharedConfig};
|
||||
|
||||
use crate::{
|
||||
beacon::BeaconSerializer,
|
||||
config::{DEFAULT_PEER_TIMEOUT, DEFAULT_PORT},
|
||||
crypto::{is_init_message, Crypto, InitResult, InitState, MessageResult},
|
||||
crypto::{is_init_message, Crypto, InitResult, InitState, MessageResult, PeerCrypto},
|
||||
device::{Device, Type},
|
||||
engine::common::{Hash, PeerData},
|
||||
error::Error,
|
||||
|
@ -18,12 +15,10 @@ use crate::{
|
|||
port_forwarding::PortForwarding,
|
||||
types::{Address, NodeId, Range, RangeList},
|
||||
util::{addr_nice, resolve, MsgBuffer, StatsdMsg, Time, TimeSource},
|
||||
Config, Protocol,
|
||||
Protocol,
|
||||
};
|
||||
use rand::{random, seq::SliceRandom, thread_rng};
|
||||
use smallvec::{smallvec, SmallVec};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::{
|
||||
cmp::{max, min},
|
||||
collections::HashMap,
|
||||
|
@ -33,7 +28,7 @@ use std::{
|
|||
io::{Cursor, Seek, SeekFrom, Write},
|
||||
marker::PhantomData,
|
||||
net::{SocketAddr, ToSocketAddrs},
|
||||
str::FromStr,
|
||||
str::FromStr, ops::Deref,
|
||||
};
|
||||
|
||||
const MAX_RECONNECT_INTERVAL: u16 = 3600;
|
||||
|
@ -55,35 +50,32 @@ pub struct SocketThread<S: Socket, D: Device, P: Protocol, TS: TimeSource> {
|
|||
// Read-only fields
|
||||
node_id: NodeId,
|
||||
claims: RangeList,
|
||||
config: Config,
|
||||
peer_timeout_publish: u16,
|
||||
learning: bool,
|
||||
update_freq: u16,
|
||||
_dummy_ts: PhantomData<TS>,
|
||||
_dummy_p: PhantomData<P>,
|
||||
// Socket-only fields
|
||||
pub socket: S,
|
||||
config: SharedConfig,
|
||||
pub coms: Coms<S, TS, P>,
|
||||
device: D,
|
||||
next_housekeep: Time,
|
||||
pub own_addresses: AddrList,
|
||||
next_own_address_reset: Time,
|
||||
pending_inits: HashMap<SocketAddr, InitState<NodeInfo>, Hash>,
|
||||
crypto: Crypto,
|
||||
pub peers: HashMap<SocketAddr, PeerData, Hash>,
|
||||
next_peers: Time,
|
||||
next_stats_out: Time,
|
||||
next_beacon: Time,
|
||||
beacon_serializer: BeaconSerializer<TS>,
|
||||
stats_file: Option<File>,
|
||||
statsd_server: Option<String>,
|
||||
pub reconnect_peers: SmallVec<[ReconnectEntry; 3]>,
|
||||
crypto: Crypto, // TODO 2nd: move to config
|
||||
//pub peers: HashMap<SocketAddr, PeerData, Hash>, // TODO 1st: move to shared peers
|
||||
peer_crypto: HashMap<SocketAddr, PeerCrypto, Hash>,
|
||||
next_peers: Time, // TODO: split off
|
||||
next_stats_out: Time, // TODO: split off
|
||||
next_beacon: Time, // TODO: split off
|
||||
beacon_serializer: BeaconSerializer<TS>, // TODO: split off
|
||||
stats_file: Option<File>, // TODO: split off
|
||||
statsd_server: Option<String>, // TODO: split off
|
||||
pub reconnect_peers: SmallVec<[ReconnectEntry; 3]>, // TODO: move to shared config
|
||||
buffer: MsgBuffer,
|
||||
broadcast_buffer: MsgBuffer,
|
||||
// Shared fields
|
||||
peer_crypto: SharedPeerCrypto,
|
||||
traffic: SharedTraffic,
|
||||
table: SharedTable<TS>,
|
||||
running: Arc<AtomicBool>,
|
||||
//table: SharedTable<TS>,
|
||||
// Should not be here
|
||||
port_forwarding: Option<PortForwarding>, // TODO: 3rd thread
|
||||
}
|
||||
|
@ -91,15 +83,14 @@ pub struct SocketThread<S: Socket, D: Device, P: Protocol, TS: TimeSource> {
|
|||
impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS> {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
config: Config, device: D, socket: S, traffic: SharedTraffic, peer_crypto: SharedPeerCrypto,
|
||||
table: SharedTable<TS>, port_forwarding: Option<PortForwarding>, stats_file: Option<File>,
|
||||
running: Arc<AtomicBool>,
|
||||
config: SharedConfig, coms: Coms<S, TS, P>, device: D, port_forwarding: Option<PortForwarding>,
|
||||
stats_file: Option<File>,
|
||||
) -> Self {
|
||||
let mut claims = SmallVec::with_capacity(config.claims.len());
|
||||
for s in &config.claims {
|
||||
let mut claims = SmallVec::with_capacity(config.get_config().claims.len());
|
||||
for s in &config.get_config().claims {
|
||||
claims.push(try_fail!(Range::from_str(s), "Invalid subnet format: {} ({})", s));
|
||||
}
|
||||
if device.get_type() == Type::Tun && config.auto_claim {
|
||||
if device.get_type() == Type::Tun && config.get_config().auto_claim {
|
||||
match device.get_ip() {
|
||||
Ok(ip) => {
|
||||
let range = Range { base: Address::from_ipv4(ip), prefix_len: 32 };
|
||||
|
@ -113,20 +104,17 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
|||
}
|
||||
}
|
||||
let now = TS::now();
|
||||
let update_freq = config.get_keepalive() as u16;
|
||||
let update_freq = config.get_config().get_keepalive() as u16;
|
||||
let node_id = random();
|
||||
let beacon_key = config.beacon_password.as_ref().map(|s| s.as_bytes()).unwrap_or(&[]);
|
||||
let beacon_key = config.get_config().beacon_password.as_ref().map(|s| s.as_bytes()).unwrap_or(&[]);
|
||||
Self {
|
||||
_dummy_p: PhantomData,
|
||||
_dummy_ts: PhantomData,
|
||||
node_id,
|
||||
claims,
|
||||
device,
|
||||
socket,
|
||||
peer_crypto,
|
||||
traffic,
|
||||
table,
|
||||
learning: config.is_learning(),
|
||||
coms,
|
||||
learning: config.get_config().is_learning(),
|
||||
next_housekeep: now,
|
||||
next_beacon: now,
|
||||
next_peers: now,
|
||||
|
@ -135,59 +123,22 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
|||
pending_inits: HashMap::default(),
|
||||
reconnect_peers: SmallVec::new(),
|
||||
own_addresses: SmallVec::new(),
|
||||
peers: HashMap::default(),
|
||||
peer_timeout_publish: config.peer_timeout as u16,
|
||||
peer_timeout_publish: config.get_config().peer_timeout as u16,
|
||||
beacon_serializer: BeaconSerializer::new(beacon_key),
|
||||
port_forwarding,
|
||||
stats_file,
|
||||
update_freq,
|
||||
statsd_server: config.statsd_server.clone(),
|
||||
crypto: Crypto::new(node_id, &config.crypto).unwrap(),
|
||||
statsd_server: config.get_config().statsd_server.clone(),
|
||||
crypto: Crypto::new(node_id, &config.get_config().crypto).unwrap(),
|
||||
peer_crypto: Default::default(),
|
||||
config,
|
||||
buffer: MsgBuffer::new(SPACE_BEFORE),
|
||||
broadcast_buffer: MsgBuffer::new(SPACE_BEFORE),
|
||||
running,
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn send_to(&mut self, addr: SocketAddr) -> Result<(), Error> {
|
||||
let size = self.buffer.len();
|
||||
debug!("Sending msg with {} bytes to {}", size, addr);
|
||||
self.traffic.count_out_traffic(addr, size);
|
||||
match self.socket.send(self.buffer.message(), addr) {
|
||||
Ok(written) if written == size => {
|
||||
self.buffer.clear();
|
||||
Ok(())
|
||||
}
|
||||
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
|
||||
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn broadcast_msg(&mut self, type_: u8) -> Result<(), Error> {
|
||||
debug!("Broadcasting message type {}, {:?} bytes to {} peers", type_, self.buffer.len(), self.peers.len());
|
||||
for (addr, peer) in &mut self.peers {
|
||||
self.broadcast_buffer.set_start(self.buffer.get_start());
|
||||
self.broadcast_buffer.set_length(self.buffer.len());
|
||||
self.broadcast_buffer.message_mut().clone_from_slice(self.buffer.message());
|
||||
self.broadcast_buffer.prepend_byte(type_);
|
||||
peer.crypto.encrypt_message(&mut self.broadcast_buffer);
|
||||
self.traffic.count_out_traffic(*addr, self.broadcast_buffer.len());
|
||||
match self.socket.send(self.broadcast_buffer.message(), *addr) {
|
||||
Ok(written) if written == self.broadcast_buffer.len() => Ok(()),
|
||||
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
|
||||
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
|
||||
}?
|
||||
}
|
||||
self.buffer.clear();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn connect_sock(&mut self, addr: SocketAddr) -> Result<(), Error> {
|
||||
let addr = mapped_addr(addr);
|
||||
if self.peers.contains_key(&addr)
|
||||
if self.coms.has_peer(&addr)
|
||||
|| self.own_addresses.contains(&addr)
|
||||
|| self.pending_inits.contains_key(&addr)
|
||||
{
|
||||
|
@ -198,14 +149,14 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
|||
let mut init = self.crypto.peer_instance(payload);
|
||||
init.send_ping(&mut self.buffer);
|
||||
self.pending_inits.insert(addr, init);
|
||||
self.send_to(addr)
|
||||
self.coms.send_to(addr, &mut self.buffer)
|
||||
}
|
||||
|
||||
pub fn connect<Addr: ToSocketAddrs + fmt::Debug + Clone>(&mut self, addr: Addr) -> Result<(), Error> {
|
||||
let addrs = resolve(&addr)?.into_iter().map(mapped_addr).collect::<SmallVec<[SocketAddr; 3]>>();
|
||||
for addr in &addrs {
|
||||
if self.own_addresses.contains(addr)
|
||||
|| self.peers.contains_key(addr)
|
||||
|| self.coms.has_peer(addr)
|
||||
|| self.pending_inits.contains_key(addr)
|
||||
{
|
||||
return Ok(());
|
||||
|
@ -221,7 +172,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
|||
|
||||
fn create_node_info(&self) -> NodeInfo {
|
||||
let mut peers = smallvec![];
|
||||
for peer in self.peers.values() {
|
||||
for peer in self.coms.get_peers().values() {
|
||||
peers.push(PeerInfo { node_id: Some(peer.node_id), addrs: peer.addrs.clone() })
|
||||
}
|
||||
if peers.len() > 20 {
|
||||
|
@ -239,9 +190,9 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
|||
}
|
||||
|
||||
fn update_peer_info(&mut self, addr: SocketAddr, info: Option<NodeInfo>) -> Result<(), Error> {
|
||||
if let Some(peer) = self.peers.get_mut(&addr) {
|
||||
if let Some(peer) = self.coms.get_peers().get_mut(&addr) {
|
||||
peer.last_seen = TS::now();
|
||||
peer.timeout = TS::now() + self.config.peer_timeout as Time;
|
||||
peer.timeout = TS::now() + self.config.get_config().peer_timeout as Time;
|
||||
if let Some(info) = &info {
|
||||
// Update peer addresses, always add seen address
|
||||
peer.addrs.clear();
|
||||
|
@ -258,7 +209,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
|||
}
|
||||
if let Some(info) = info {
|
||||
debug!("Adding claims of peer {}: {:?}", addr_nice(addr), info.claims);
|
||||
self.table.set_claims(addr, info.claims);
|
||||
self.coms.table.set_claims(addr, info.claims);
|
||||
debug!("Received {} peers from {}: {:?}", info.peers.len(), addr_nice(addr), info.peers);
|
||||
self.connect_to_peers(&info.peers)?;
|
||||
}
|
||||
|
@ -270,22 +221,19 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
|||
if let Some(init) = self.pending_inits.remove(&addr) {
|
||||
self.buffer.clear();
|
||||
let crypto = init.finish(&mut self.buffer);
|
||||
self.peers.insert(
|
||||
addr,
|
||||
PeerData {
|
||||
let peer_data = PeerData {
|
||||
addrs: info.addrs.clone(),
|
||||
crypto,
|
||||
node_id: info.node_id,
|
||||
peer_timeout: info.peer_timeout.unwrap_or(DEFAULT_PEER_TIMEOUT),
|
||||
last_seen: TS::now(),
|
||||
timeout: TS::now() + self.config.peer_timeout as Time,
|
||||
},
|
||||
);
|
||||
timeout: TS::now() + self.config.get_config().peer_timeout as Time,
|
||||
};
|
||||
self.coms.add_peer(addr, peer_data,&crypto);
|
||||
self.peer_crypto.insert(addr, crypto);
|
||||
self.update_peer_info(addr, Some(info))?;
|
||||
if !self.buffer.is_empty() {
|
||||
self.send_to(addr)?;
|
||||
self.coms.send_to(addr, &mut self.buffer)?;
|
||||
}
|
||||
self.peer_crypto.store(&self.peers);
|
||||
} else {
|
||||
error!("No init for new peer {}", addr_nice(addr));
|
||||
}
|
||||
|
@ -295,7 +243,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
|||
fn connect_to_peers(&mut self, peers: &[PeerInfo]) -> Result<(), Error> {
|
||||
'outer: for peer in peers {
|
||||
for addr in &peer.addrs {
|
||||
if self.peers.contains_key(addr) {
|
||||
if self.own_addresses.contains(addr) {
|
||||
// Check addresses and add addresses that we don't know to own addresses
|
||||
for addr in &peer.addrs {
|
||||
if !self.own_addresses.contains(addr) {
|
||||
|
@ -309,7 +257,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
|||
if self.node_id == node_id {
|
||||
continue 'outer;
|
||||
}
|
||||
for p in self.peers.values() {
|
||||
for p in self.coms.get_peers().values() {
|
||||
if p.node_id == node_id {
|
||||
continue 'outer;
|
||||
}
|
||||
|
@ -321,18 +269,15 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
|||
}
|
||||
|
||||
fn remove_peer(&mut self, addr: SocketAddr) {
|
||||
if let Some(_peer) = self.peers.remove(&addr) {
|
||||
info!("Closing connection to {}", addr_nice(addr));
|
||||
self.table.remove_claims(addr);
|
||||
self.peer_crypto.store(&self.peers);
|
||||
}
|
||||
self.coms.remove_peer(&addr);
|
||||
self.peer_crypto.remove(&addr);
|
||||
}
|
||||
|
||||
fn handle_payload_from(&mut self, peer: SocketAddr) -> Result<(), Error> {
|
||||
let (src, dst) = P::parse(self.buffer.message())?;
|
||||
let len = self.buffer.len();
|
||||
debug!("Writing data to device: {} bytes", len);
|
||||
self.traffic.count_in_payload(src.clone(), dst, len);
|
||||
self.coms.traffic.count_in_payload(src.clone(), dst, len);
|
||||
if let Err(e) = self.device.write(&mut self.buffer) {
|
||||
error!("Failed to send via device: {}", e);
|
||||
return Err(e);
|
||||
|
@ -340,7 +285,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
|||
self.buffer.clear();
|
||||
if self.learning {
|
||||
// Learn single address
|
||||
self.table.cache(&src, peer);
|
||||
self.coms.table.cache(&src, peer);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
@ -354,7 +299,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
|||
let info = match NodeInfo::decode(Cursor::new(self.buffer.message())) {
|
||||
Ok(val) => val,
|
||||
Err(err) => {
|
||||
self.traffic.count_invalid_protocol(self.buffer.len());
|
||||
self.coms.traffic.count_invalid_protocol(self.buffer.len());
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
|
@ -371,11 +316,11 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
|||
}
|
||||
_ => {
|
||||
self.buffer.clear();
|
||||
self.traffic.count_invalid_protocol(self.buffer.len());
|
||||
self.coms.traffic.count_invalid_protocol(self.buffer.len());
|
||||
return Err(Error::Message("Unknown message type"));
|
||||
}
|
||||
},
|
||||
MessageResult::Reply => self.send_to(src)?,
|
||||
MessageResult::Reply => self.coms.send_to(src, &mut self.buffer)?,
|
||||
MessageResult::None => {
|
||||
self.buffer.clear();
|
||||
}
|
||||
|
@ -387,8 +332,8 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
|||
let src = mapped_addr(src);
|
||||
debug!("Received {} bytes from {}", self.buffer.len(), src);
|
||||
let buffer = &mut self.buffer;
|
||||
self.traffic.count_in_traffic(src, buffer.len());
|
||||
if let Some(result) = self.peers.get_mut(&src).map(|peer| peer.crypto.handle_message(buffer)) {
|
||||
self.coms.traffic.count_in_traffic(src, buffer.len());
|
||||
if let Some(result) = self.peer_crypto.get_mut(&src).map(|crypto| crypto.handle_message(buffer)) {
|
||||
return self.process_message(src, result?);
|
||||
}
|
||||
let is_init = is_init_message(buffer.message());
|
||||
|
@ -402,7 +347,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
|||
}
|
||||
}) {
|
||||
if !buffer.is_empty() {
|
||||
self.send_to(src)?
|
||||
self.coms.send_to(src, &mut self.buffer)?
|
||||
}
|
||||
if let InitResult::Success { peer_payload, .. } = result? {
|
||||
self.add_new_peer(src, peer_payload)?
|
||||
|
@ -411,7 +356,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
|||
}
|
||||
if !is_init_message(self.buffer.message()) {
|
||||
info!("Ignoring non-init message from unknown peer {}", addr_nice(src));
|
||||
self.traffic.count_invalid_protocol(self.buffer.len());
|
||||
self.coms.traffic.count_invalid_protocol(self.buffer.len());
|
||||
self.buffer.clear();
|
||||
return Ok(());
|
||||
}
|
||||
|
@ -420,10 +365,10 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
|||
match msg_result {
|
||||
Ok(_) => {
|
||||
self.pending_inits.insert(src, init);
|
||||
self.send_to(src)
|
||||
self.coms.send_to(src, &mut self.buffer)
|
||||
}
|
||||
Err(err) => {
|
||||
self.traffic.count_invalid_protocol(self.buffer.len());
|
||||
self.coms.traffic.count_invalid_protocol(self.buffer.len());
|
||||
Err(err)
|
||||
}
|
||||
}
|
||||
|
@ -432,18 +377,17 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
|||
pub fn housekeep(&mut self) -> Result<(), Error> {
|
||||
let now = TS::now();
|
||||
let mut del: SmallVec<[SocketAddr; 3]> = SmallVec::new();
|
||||
for (&addr, data) in &self.peers {
|
||||
for (&addr, data) in self.coms.get_peers().deref() {
|
||||
if data.timeout < now {
|
||||
del.push(addr);
|
||||
}
|
||||
}
|
||||
for addr in del {
|
||||
info!("Forgot peer {} due to timeout", addr_nice(addr));
|
||||
self.peers.remove(&addr);
|
||||
self.table.remove_claims(addr);
|
||||
self.coms.remove_peer(&addr);
|
||||
self.connect_sock(addr)?; // Try to reconnect
|
||||
}
|
||||
self.table.housekeep();
|
||||
self.coms.table.housekeep();
|
||||
self.crypto_housekeep()?;
|
||||
// Periodically extend the port-forwarding
|
||||
//TODO: extra thread
|
||||
|
@ -456,9 +400,9 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
|||
debug!("Send peer list to all peers");
|
||||
let info = self.create_node_info();
|
||||
info.encode(&mut self.buffer);
|
||||
self.broadcast_msg(MESSAGE_TYPE_NODE_INFO)?;
|
||||
self.coms.broadcast_msg(MESSAGE_TYPE_NODE_INFO, &mut self.buffer)?;
|
||||
// Reschedule for next update
|
||||
let min_peer_timeout = self.peers.iter().map(|p| p.1.peer_timeout).min().unwrap_or(DEFAULT_PEER_TIMEOUT);
|
||||
let min_peer_timeout = self.coms.get_peers().iter().map(|p| p.1.peer_timeout).min().unwrap_or(DEFAULT_PEER_TIMEOUT);
|
||||
let interval = min(self.update_freq as u16, max(min_peer_timeout / 2 - 60, 1));
|
||||
self.next_peers = now + Time::from(interval);
|
||||
}
|
||||
|
@ -470,7 +414,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
|||
//TODO: extra thread
|
||||
self.send_stats_to_statsd()?;
|
||||
self.next_stats_out = now + STATS_INTERVAL;
|
||||
self.traffic.period(Some(5));
|
||||
self.coms.traffic.period(Some(5));
|
||||
}
|
||||
if let Some(peers) = self.beacon_serializer.get_cmd_results() {
|
||||
debug!("Loaded beacon with peers: {:?}", peers);
|
||||
|
@ -481,11 +425,9 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
|||
if self.next_beacon < now {
|
||||
self.store_beacon()?;
|
||||
self.load_beacon()?;
|
||||
self.next_beacon = now + Time::from(self.config.beacon_interval);
|
||||
self.next_beacon = now + Time::from(self.config.get_config().beacon_interval);
|
||||
}
|
||||
self.table.sync();
|
||||
self.traffic.sync();
|
||||
self.peer_crypto.store(&self.peers);
|
||||
self.coms.sync()?;
|
||||
// Periodically reset own peers
|
||||
if self.next_own_address_reset <= now {
|
||||
self.reset_own_addresses().map_err(|err| Error::SocketIo("Failed to get own addresses", err))?;
|
||||
|
@ -502,19 +444,20 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
|||
if self.pending_inits.get_mut(&addr).unwrap().every_second(&mut self.buffer).is_err() {
|
||||
del.push(addr)
|
||||
} else if !self.buffer.is_empty() {
|
||||
self.send_to(addr)?
|
||||
self.coms.send_to(addr, &mut self.buffer)?
|
||||
}
|
||||
}
|
||||
for addr in self.peers.keys().copied().collect::<SmallVec<[SocketAddr; 16]>>() {
|
||||
for (addr, crypto) in self.peer_crypto.iter_mut() {
|
||||
self.buffer.clear();
|
||||
self.peers.get_mut(&addr).unwrap().crypto.every_second(&mut self.buffer);
|
||||
crypto.every_second(&mut self.buffer);
|
||||
if !self.buffer.is_empty() {
|
||||
self.send_to(addr)?
|
||||
self.coms.send_to(*addr, &mut self.buffer)?
|
||||
}
|
||||
}
|
||||
for addr in del {
|
||||
self.pending_inits.remove(&addr);
|
||||
if self.peers.remove(&addr).is_some() {
|
||||
self.peer_crypto.remove(&addr);
|
||||
if self.coms.remove_peer(&addr) {
|
||||
self.connect_sock(addr)?;
|
||||
}
|
||||
}
|
||||
|
@ -523,9 +466,9 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
|||
|
||||
fn reset_own_addresses(&mut self) -> io::Result<()> {
|
||||
self.own_addresses.clear();
|
||||
let socket_addr = self.socket.address().map(mapped_addr)?;
|
||||
let socket_addr = self.coms.get_address()?;
|
||||
// 1) Specified advertise addresses
|
||||
for addr in &self.config.advertise_addresses {
|
||||
for addr in &self.config.get_config().advertise_addresses {
|
||||
self.own_addresses.push(parse_listen(addr, socket_addr.port()));
|
||||
}
|
||||
// 2) Address of UDP socket
|
||||
|
@ -542,7 +485,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
|||
|
||||
/// Stores the beacon
|
||||
fn store_beacon(&mut self) -> Result<(), Error> {
|
||||
if let Some(ref path) = self.config.beacon_store {
|
||||
if let Some(ref path) = self.config.get_config().beacon_store {
|
||||
let peers: SmallVec<[SocketAddr; 3]> =
|
||||
self.own_addresses.choose_multiple(&mut thread_rng(), 3).cloned().collect();
|
||||
if let Some(path) = path.strip_prefix('|') {
|
||||
|
@ -561,7 +504,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
|||
/// Loads the beacon
|
||||
fn load_beacon(&mut self) -> Result<(), Error> {
|
||||
let peers;
|
||||
if let Some(ref path) = self.config.beacon_load {
|
||||
if let Some(ref path) = self.config.get_config().beacon_load {
|
||||
if let Some(path) = path.strip_prefix('|') {
|
||||
self.beacon_serializer
|
||||
.read_from_cmd(path, Some(50))
|
||||
|
@ -591,19 +534,19 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
|||
f.set_len(0)?;
|
||||
writeln!(f, "peers:")?;
|
||||
let now = TS::now();
|
||||
for (addr, data) in &self.peers {
|
||||
for (addr, data) in self.coms.get_peers().iter() {
|
||||
writeln!(
|
||||
f,
|
||||
" - \"{}\": {{ ttl_secs: {}, crypto: {} }}",
|
||||
addr_nice(*addr),
|
||||
data.timeout - now,
|
||||
data.crypto.algorithm_name()
|
||||
self.peer_crypto.get(addr).unwrap().algorithm_name()
|
||||
)?;
|
||||
}
|
||||
writeln!(f)?;
|
||||
self.table.write_out(f)?;
|
||||
self.coms.table.write_out(f)?;
|
||||
writeln!(f)?;
|
||||
self.traffic.write_out(f)?;
|
||||
self.coms.traffic.write_out(f)?;
|
||||
writeln!(f)?;
|
||||
}
|
||||
Ok(())
|
||||
|
@ -612,15 +555,15 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
|||
/// Sends the statistics to a statsd endpoint
|
||||
fn send_stats_to_statsd(&mut self) -> Result<(), Error> {
|
||||
if let Some(ref endpoint) = self.statsd_server {
|
||||
let peer_traffic = self.traffic.total_peer_traffic();
|
||||
let payload_traffic = self.traffic.total_payload_traffic();
|
||||
let dropped = &self.traffic.dropped();
|
||||
let prefix = self.config.statsd_prefix.as_ref().map(|s| s as &str).unwrap_or("vpncloud");
|
||||
let peer_traffic = self.coms.traffic.total_peer_traffic();
|
||||
let payload_traffic = self.coms.traffic.total_payload_traffic();
|
||||
let dropped = &self.coms.traffic.dropped();
|
||||
let prefix = self.config.get_config().statsd_prefix.as_ref().map(|s| s as &str).unwrap_or("vpncloud");
|
||||
let msg = StatsdMsg::new()
|
||||
.with_ns(prefix, |msg| {
|
||||
msg.add("peer_count", self.peers.len(), "g");
|
||||
msg.add("table_cache_entries", self.table.cache_len(), "g");
|
||||
msg.add("table_claims", self.table.claim_len(), "g");
|
||||
msg.add("peer_count", self.coms.get_peers().len(), "g");
|
||||
msg.add("table_cache_entries", self.coms.table.cache_len(), "g");
|
||||
msg.add("table_claims", self.coms.table.claim_len(), "g");
|
||||
msg.with_ns("traffic", |msg| {
|
||||
msg.with_ns("protocol", |msg| {
|
||||
msg.with_ns("inbound", |msg| {
|
||||
|
@ -653,14 +596,9 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
|||
});
|
||||
})
|
||||
.build();
|
||||
let msg_data = msg.as_bytes();
|
||||
let addrs = resolve(endpoint)?;
|
||||
if let Some(addr) = addrs.first() {
|
||||
match self.socket.send(msg_data, *addr) {
|
||||
Ok(written) if written == msg_data.len() => Ok(()),
|
||||
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
|
||||
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
|
||||
}?
|
||||
self.coms.send_raw(msg.as_bytes(), *addr)?;
|
||||
} else {
|
||||
error!("Failed to resolve statsd server {}", endpoint);
|
||||
}
|
||||
|
@ -680,7 +618,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
|||
for entry in &mut self.reconnect_peers {
|
||||
// Schedule for next second if node is connected
|
||||
for addr in &entry.resolved {
|
||||
if self.peers.contains_key(addr) {
|
||||
if self.coms.get_peers().contains_key(addr) {
|
||||
entry.tries = 0;
|
||||
entry.timeout = 1;
|
||||
entry.next = now + 1;
|
||||
|
@ -722,8 +660,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
|||
}
|
||||
|
||||
pub fn iteration(&mut self) -> bool {
|
||||
if let Ok(src) = self.socket.receive(&mut self.buffer)
|
||||
{
|
||||
if let Ok(src) = self.coms.receive(&mut self.buffer) {
|
||||
match self.handle_message(src) {
|
||||
Err(e @ Error::CryptoInitFatal(_)) => {
|
||||
debug!("Fatal crypto init error from {}: {}", src, e);
|
||||
|
@ -750,7 +687,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
|||
error!("{}", e)
|
||||
}
|
||||
self.next_housekeep = now + 1;
|
||||
if !self.running.load(Ordering::SeqCst) {
|
||||
if !self.config.is_running() {
|
||||
debug!("Socket: end");
|
||||
return false;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue