Update code

This commit is contained in:
Dennis Schwerdel 2023-10-07 00:02:13 +02:00
parent 2fcb7646c7
commit aa949cdcec
12 changed files with 510 additions and 175 deletions

View File

@ -11,7 +11,7 @@ RUN chown vscode: -R /usr/local/rustup /usr/local/cargo
USER vscode
RUN rustup default 1.64.0 \
RUN rustup default 1.70.0 \
&& rustup component add clippy rust-src rustfmt
RUN cargo install cargo-outdated cargo-cache cargo-criterion \

209
Cargo.lock generated
View File

@ -26,6 +26,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "anstyle"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41ed9a86bf92ae6580e0a31281f65a1b1d867c0cc68d5346e2ae128dddfa6a7d"
[[package]]
name = "attohttpc"
version = "0.16.3"
@ -44,7 +50,7 @@ version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
dependencies = [
"hermit-abi",
"hermit-abi 0.1.19",
"libc",
"winapi",
]
@ -173,31 +179,36 @@ dependencies = [
"atty",
"bitflags",
"strsim",
"textwrap 0.11.0",
"textwrap",
"unicode-width",
"vec_map",
]
[[package]]
name = "clap"
version = "3.2.22"
version = "4.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86447ad904c7fb335a790c9d7fe3d0d971dc523b8ccd1561a520de9a85302750"
checksum = "80672091db20273a15cf9fdd4e47ed43b5091ec9841bf4c6145c9dfbbcae09ed"
dependencies = [
"clap_builder",
]
[[package]]
name = "clap_builder"
version = "4.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1458a1df40e1e2afebb7ab60ce55c1fa8f431146205aa5f4887e0b111c27636"
dependencies = [
"anstyle",
"bitflags",
"clap_lex",
"indexmap",
"textwrap 0.15.1",
]
[[package]]
name = "clap_lex"
version = "0.2.4"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2850f2f5a82cbf437dd5af4d49848fbdfc27c157c3d010345776f952765261c5"
dependencies = [
"os_str_bytes",
]
checksum = "2da6da31387c7e4ef160ffab6d5e7f00c42626fe39aea70a7b0f1773f7dd6c1b"
[[package]]
name = "console"
@ -230,19 +241,19 @@ dependencies = [
[[package]]
name = "criterion"
version = "0.4.0"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7c76e09c1aae2bc52b3d2f29e13c6572553b30c4aa1b8a49fd70de6412654cb"
checksum = "f2b12d017a929603d80db1831cd3a24082f8137ce19c69e6447f54f5fc8d692f"
dependencies = [
"anes",
"atty",
"cast",
"ciborium",
"clap 3.2.22",
"clap 4.3.4",
"criterion-plot",
"is-terminal",
"itertools",
"lazy_static",
"num-traits",
"once_cell",
"oorandom",
"plotters",
"rayon",
@ -362,6 +373,27 @@ version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f"
[[package]]
name = "errno"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bcfec3a70f97c962c307b2d2c56e358cf1d00b558d74262b5f929ee8cc7e73a"
dependencies = [
"errno-dragonfly",
"libc",
"windows-sys 0.48.0",
]
[[package]]
name = "errno-dragonfly"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf"
dependencies = [
"cc",
"libc",
]
[[package]]
name = "fastrand"
version = "1.8.0"
@ -437,6 +469,12 @@ dependencies = [
"libc",
]
[[package]]
name = "hermit-abi"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fed44880c466736ef9a5c5b5facefb5ed0785676d0c02d612db14e54f0d84286"
[[package]]
name = "http"
version = "0.2.8"
@ -515,6 +553,29 @@ dependencies = [
"cfg-if 1.0.0",
]
[[package]]
name = "io-lifetimes"
version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2"
dependencies = [
"hermit-abi 0.3.1",
"libc",
"windows-sys 0.48.0",
]
[[package]]
name = "is-terminal"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adcf93614601c8129ddf72e2d5633df827ba6551541c6d8c59520a371475be1f"
dependencies = [
"hermit-abi 0.3.1",
"io-lifetimes",
"rustix",
"windows-sys 0.48.0",
]
[[package]]
name = "itertools"
version = "0.10.5"
@ -547,9 +608,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.133"
version = "0.2.146"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0f80d65747a3e43d1596c7c5492d95d5edddaabd45a7fcdb02b95f644164966"
checksum = "f92be4933c13fd498862a9e02a3055f8a8d9c039ce33db97306fd5a6caa7f29b"
[[package]]
name = "linked-hash-map"
@ -557,6 +618,12 @@ version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f"
[[package]]
name = "linux-raw-sys"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519"
[[package]]
name = "lock_api"
version = "0.4.9"
@ -635,7 +702,7 @@ version = "1.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1"
dependencies = [
"hermit-abi",
"hermit-abi 0.1.19",
"libc",
]
@ -651,12 +718,6 @@ version = "11.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
[[package]]
name = "os_str_bytes"
version = "6.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ff7415e9ae3fff1225851df9e0d9e4e5479f947619774677a63572e55e80eff"
[[package]]
name = "parking_lot"
version = "0.12.1"
@ -677,7 +738,7 @@ dependencies = [
"libc",
"redox_syscall",
"smallvec",
"windows-sys",
"windows-sys 0.36.1",
]
[[package]]
@ -874,6 +935,20 @@ dependencies = [
"winapi",
]
[[package]]
name = "rustix"
version = "0.37.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b96e891d04aa506a6d1f318d2771bcb1c7dfda84e126660ace067c9b474bb2c0"
dependencies = [
"bitflags",
"errno",
"io-lifetimes",
"libc",
"linux-raw-sys",
"windows-sys 0.48.0",
]
[[package]]
name = "ryu"
version = "1.0.11"
@ -1046,12 +1121,6 @@ dependencies = [
"unicode-width",
]
[[package]]
name = "textwrap"
version = "0.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "949517c0cf1bf4ee812e2e07e08ab448e3ae0d23472aee8a06c985f0c8815b16"
[[package]]
name = "thiserror"
version = "1.0.35"
@ -1366,43 +1435,109 @@ version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2"
dependencies = [
"windows_aarch64_msvc",
"windows_i686_gnu",
"windows_i686_msvc",
"windows_x86_64_gnu",
"windows_x86_64_msvc",
"windows_aarch64_msvc 0.36.1",
"windows_i686_gnu 0.36.1",
"windows_i686_msvc 0.36.1",
"windows_x86_64_gnu 0.36.1",
"windows_x86_64_msvc 0.36.1",
]
[[package]]
name = "windows-sys"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9"
dependencies = [
"windows-targets",
]
[[package]]
name = "windows-targets"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b1eb6f0cd7c80c79759c929114ef071b87354ce476d9d94271031c0497adfd5"
dependencies = [
"windows_aarch64_gnullvm",
"windows_aarch64_msvc 0.48.0",
"windows_i686_gnu 0.48.0",
"windows_i686_msvc 0.48.0",
"windows_x86_64_gnu 0.48.0",
"windows_x86_64_gnullvm",
"windows_x86_64_msvc 0.48.0",
]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc"
[[package]]
name = "windows_aarch64_msvc"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47"
[[package]]
name = "windows_aarch64_msvc"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3"
[[package]]
name = "windows_i686_gnu"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6"
[[package]]
name = "windows_i686_gnu"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241"
[[package]]
name = "windows_i686_msvc"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024"
[[package]]
name = "windows_i686_msvc"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00"
[[package]]
name = "windows_x86_64_gnu"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1"
[[package]]
name = "windows_x86_64_gnu"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953"
[[package]]
name = "windows_x86_64_msvc"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680"
[[package]]
name = "windows_x86_64_msvc"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a"
[[package]]
name = "xml-rs"
version = "0.8.4"

View File

@ -12,7 +12,7 @@ readme = "README.md"
edition = "2018"
[package.metadata]
toolchain = "1.64.0"
toolchain = "1.70.0"
upx_version = "3.96"
[dependencies]
@ -41,7 +41,7 @@ signal = "0.7"
[dev-dependencies]
tempfile = "3"
criterion = { version = "0.4", features = ["html_reports"] }
criterion = { version = "0.5", features = ["html_reports"] }
iai = "0.1"
[features]

View File

@ -18,6 +18,9 @@ mod engine {
pub mod common {
include!("../src/engine/common.rs");
}
pub mod coms {
include!("../src/engine/coms.rs");
}
mod shared {
include!("../src/engine/shared.rs");
}
@ -27,6 +30,12 @@ mod engine {
mod socket_thread {
include!("../src/engine/socket_thread.rs");
}
mod housekeep_thread {
include!("../src/engine/housekeep_thread.rs");
}
mod extras_thread {
include!("../src/engine/extras_thread.rs");
}
}
mod config {
include!("../src/config.rs");

View File

@ -1,5 +1,3 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::{fs::File, hash::BuildHasherDefault};
@ -8,12 +6,14 @@ use fnv::FnvHasher;
use crate::util::CtrlC;
use crate::{
config::Config,
crypto::PeerCrypto,
device::Device,
engine::{
coms::Coms,
device_thread::DeviceThread,
extras_thread::ExtrasThread,
housekeep_thread::HousekeepThread,
shared::SharedConfig,
socket_thread::{ReconnectEntry, SocketThread},
coms::Coms
},
error::Error,
messages::AddrList,
@ -36,13 +36,15 @@ pub struct PeerData {
pub timeout: Time,
pub peer_timeout: u16,
pub node_id: NodeId,
pub crypto: PeerCrypto,
//pub crypto: PeerCrypto,
}
pub struct GenericCloud<D: Device, P: Protocol, S: Socket, TS: TimeSource> {
config: SharedConfig,
socket_thread: SocketThread<S, D, P, TS>,
device_thread: DeviceThread<S, D, P, TS>,
running: Arc<AtomicBool>,
housekeep_thread: HousekeepThread<S, P, TS>,
extras_thread: ExtrasThread<S, P, TS>,
}
impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS> {
@ -50,23 +52,18 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
pub fn new(
config: &Config, socket: S, device: D, port_forwarding: Option<PortForwarding>, stats_file: Option<File>,
) -> Result<Self, Error> {
let running = Arc::new(AtomicBool::new(true));
let coms = Coms::<S, TS, P>::new(config, socket.try_clone().map_err(|e| Error::SocketIo("Failed to clone socket", e))?);
let device_thread = DeviceThread::<S, D, P, TS>::new(
device.duplicate()?,
coms.try_clone()?,
running.clone(),
);
let mut socket_thread = SocketThread::<S, D, P, TS>::new(
config.clone(),
coms,
device,
port_forwarding,
stats_file,
running.clone(),
let config = SharedConfig::new(config.clone());
let coms = Coms::<S, TS, P>::new(
config.get_config(),
socket.try_clone().map_err(|e| Error::SocketIo("Failed to clone socket", e))?,
);
let device_thread = DeviceThread::<S, D, P, TS>::new(config.clone(), device.duplicate()?, coms.try_clone()?);
let housekeep_thread = HousekeepThread::<S, P, TS>::new(config.clone(), coms.try_clone()?);
let extras_thread = ExtrasThread::<S, P, TS>::new(config.clone(), coms.try_clone()?);
let mut socket_thread =
SocketThread::<S, D, P, TS>::new(config.clone(), coms, device, port_forwarding, stats_file);
socket_thread.housekeep()?;
Ok(Self { socket_thread, device_thread, running })
Ok(Self { socket_thread, device_thread, config, housekeep_thread, extras_thread })
}
pub fn add_peer(&mut self, addr: String) -> Result<(), Error> {
@ -84,17 +81,23 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
pub fn run(self) {
debug!("Starting threads");
let running = self.running.clone();
let config = self.config;
let device = self.device_thread;
let device_thread_handle = thread::spawn(move || device.run());
let socket = self.socket_thread;
let socket_thread_handle = thread::spawn(move || socket.run());
let housekeep = self.housekeep_thread;
let housekeep_thread_handle = thread::spawn(move || housekeep.run());
let extras = self.extras_thread;
let extras_thread_handle = thread::spawn(move || extras.run());
let ctrlc = CtrlC::new();
ctrlc.wait();
running.store(false, Ordering::SeqCst);
config.stop();
debug!("Waiting for threads to end");
device_thread_handle.join().unwrap();
socket_thread_handle.join().unwrap();
housekeep_thread_handle.join().unwrap();
extras_thread_handle.join().unwrap();
debug!("Threads stopped");
}
}
@ -136,7 +139,7 @@ impl<P: Protocol> GenericCloud<MockDevice, P, MockSocket, MockTimeSource> {
}
pub fn is_connected(&self, addr: &SocketAddr) -> bool {
self.socket_thread.peers.contains_key(addr)
self.socket_thread.coms.has_peer(addr)
}
pub fn own_addresses(&self) -> &[SocketAddr] {

View File

@ -1,23 +1,26 @@
use std::{net::SocketAddr, marker::PhantomData, io};
use std::{collections::HashMap, io, marker::PhantomData, net::SocketAddr, ops::DerefMut};
use crate::{
config::Config,
crypto::PeerCrypto,
error::Error,
messages::MESSAGE_TYPE_DATA,
net::{Socket, mapped_addr},
util::{MsgBuffer, TimeSource}, payload::Protocol,
net::{mapped_addr, Socket},
payload::Protocol,
util::{MsgBuffer, TimeSource, addr_nice},
};
use super::{
common::{SPACE_BEFORE, PeerData},
shared::{SharedPeerCrypto, SharedTable, SharedTraffic},
common::{Hash, PeerData, SPACE_BEFORE},
shared::{SharedCrypto, SharedPeers, SharedTable, SharedTraffic},
};
pub struct Coms<S: Socket, TS: TimeSource, P: Protocol> {
_dummy_p: PhantomData<P>,
broadcast: bool,
broadcast_buffer: MsgBuffer,
peer_crypto: SharedPeerCrypto,
crypto: SharedCrypto,
peers: SharedPeers,
pub table: SharedTable<TS>,
pub traffic: SharedTraffic,
pub socket: S,
@ -30,7 +33,8 @@ impl<S: Socket, TS: TimeSource, P: Protocol> Coms<S, TS, P> {
broadcast: config.is_broadcasting(),
broadcast_buffer: MsgBuffer::new(SPACE_BEFORE),
traffic: SharedTraffic::new(),
peer_crypto: SharedPeerCrypto::new(),
crypto: SharedCrypto::new(),
peers: SharedPeers::new(),
table: SharedTable::<TS>::new(config),
socket,
}
@ -42,14 +46,15 @@ impl<S: Socket, TS: TimeSource, P: Protocol> Coms<S, TS, P> {
broadcast: self.broadcast,
broadcast_buffer: MsgBuffer::new(SPACE_BEFORE),
traffic: self.traffic.clone(),
peer_crypto: self.peer_crypto.clone(),
crypto: self.crypto.clone(),
table: self.table.clone(),
peers: self.peers.clone(),
socket: self.socket.try_clone().map_err(|e| Error::SocketIo("Failed to clone socket", e))?,
})
}
pub fn sync(&mut self) -> Result<(), Error> {
self.peer_crypto.load();
self.crypto.load();
self.table.sync();
self.traffic.sync();
Ok(())
@ -67,6 +72,7 @@ impl<S: Socket, TS: TimeSource, P: Protocol> Coms<S, TS, P> {
}
}
//TODO: move back to socket thread
pub fn receive(&mut self, buffer: &mut MsgBuffer) -> Result<SocketAddr, io::Error> {
self.socket.receive(buffer)
}
@ -87,17 +93,17 @@ impl<S: Socket, TS: TimeSource, P: Protocol> Coms<S, TS, P> {
pub fn send_msg(&mut self, addr: SocketAddr, type_: u8, data: &mut MsgBuffer) -> Result<(), Error> {
debug!("Sending msg with {} bytes to {}", data.len(), addr);
data.prepend_byte(type_);
self.peer_crypto.encrypt_for(addr, data)?;
self.crypto.encrypt_for(addr, data)?;
self.send_to(addr, data)
}
#[inline]
pub fn broadcast_msg(&mut self, type_: u8, data: &mut MsgBuffer) -> Result<(), Error> {
let size = data.len();
debug!("Broadcasting message type {}, {:?} bytes to {} peers", type_, size, self.peer_crypto.count());
debug!("Broadcasting message type {}, {:?} bytes to {} peers", type_, size, self.crypto.count());
let traffic = &mut self.traffic;
let socket = &mut self.socket;
let peers = self.peer_crypto.get_snapshot();
let peers = self.crypto.get_snapshot();
for (addr, crypto) in peers {
self.broadcast_buffer.set_start(data.get_start());
self.broadcast_buffer.set_length(data.len());
@ -140,13 +146,27 @@ impl<S: Socket, TS: TimeSource, P: Protocol> Coms<S, TS, P> {
Ok(())
}
pub fn add_peer(&mut self, addr: SocketAddr, peer: &PeerData) {
self.peer_crypto.add(addr, peer.crypto.get_core());
pub fn get_peers<'a>(&'a self) -> impl DerefMut<Target = HashMap<SocketAddr, PeerData, Hash>> + 'a {
self.peers.get_peers()
}
pub fn remove_peer(&mut self, addr: &SocketAddr) {
self.table.remove_claims(*addr);
self.peer_crypto.remove(addr);
pub fn add_peer(&mut self, addr: SocketAddr, peer_data: PeerData, peer_crypto: &PeerCrypto) {
self.crypto.add(addr, peer_crypto.get_core());
self.peers.get_peers().insert(addr, peer_data);
}
pub fn remove_peer(&mut self, addr: &SocketAddr) -> bool {
if let Some(_peer) = self.peers.get_peers().remove(addr) {
info!("Closing connection to {}", addr_nice(*addr));
self.table.remove_claims(*addr);
self.crypto.remove(addr);
true
} else {
false
}
}
pub fn has_peer(&self, addr: &SocketAddr) -> bool {
self.crypto.contains(addr)
}
}

View File

@ -1,4 +1,4 @@
use super::{common::SPACE_BEFORE, coms::Coms};
use super::{common::SPACE_BEFORE, coms::Coms, shared::SharedConfig};
use crate::{
device::Device,
error::Error,
@ -6,26 +6,23 @@ use crate::{
util::{MsgBuffer, Time, TimeSource},
Protocol,
};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
pub struct DeviceThread<S: Socket, D: Device, P: Protocol, TS: TimeSource> {
// Device-only fields
config: SharedConfig,
coms: Coms<S, TS, P>,
pub device: D,
next_housekeep: Time,
buffer: MsgBuffer,
// Shared fields
running: Arc<AtomicBool>,
}
impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS> {
pub fn new(device: D, coms: Coms<S, TS, P>, running: Arc<AtomicBool>) -> Self {
pub fn new(config: SharedConfig, device: D, coms: Coms<S, TS, P>) -> Self {
Self {
config,
device,
next_housekeep: TS::now(),
buffer: MsgBuffer::new(SPACE_BEFORE),
running,
coms,
}
}
@ -47,7 +44,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS
error!("{}", e)
}
self.next_housekeep = now + 1;
if !self.running.load(Ordering::SeqCst) {
if !self.config.is_running() {
debug!("Device: end");
return false;
}

View File

@ -0,0 +1,59 @@
use std::time::Duration;
use super::{common::SPACE_BEFORE, coms::Coms, shared::SharedConfig};
use crate::{
error::Error,
net::Socket,
util::{MsgBuffer, TimeSource, Time},
Protocol,
};
const MAX_RECONNECT_INTERVAL: u16 = 3600;
const RESOLVE_INTERVAL: Time = 300;
const OWN_ADDRESS_RESET_INTERVAL: Time = 300;
pub const STATS_INTERVAL: Time = 60;
pub struct ExtrasThread<S: Socket, P: Protocol, TS: TimeSource> {
config: SharedConfig,
coms: Coms<S, TS, P>,
next_housekeep: Time,
buffer: MsgBuffer,
}
impl<S: Socket, P: Protocol, TS: TimeSource> ExtrasThread<S, P, TS> {
pub fn new(config: SharedConfig, coms: Coms<S, TS, P>) -> Self {
Self {
config,
coms,
next_housekeep: TS::now(),
buffer: MsgBuffer::new(SPACE_BEFORE),
}
}
pub fn housekeep(&mut self) -> Result<(), Error> {
let now = TS::now();
assert!(self.buffer.is_empty());
Ok(())
}
pub fn iteration(&mut self) -> bool {
std::thread::sleep(Duration::from_millis(100));
let now = TS::now();
if self.next_housekeep < now {
if let Err(e) = self.housekeep() {
error!("{}", e)
}
self.next_housekeep = now + 1;
if !self.config.is_running() {
debug!("Extras: end");
return false;
}
}
true
}
pub fn run(mut self) {
while self.iteration() {}
}
}

View File

@ -0,0 +1,67 @@
use std::{time::Duration, net::SocketAddr, cmp::{min, max}};
use smallvec::SmallVec;
use super::{common::SPACE_BEFORE, coms::Coms, shared::SharedConfig};
use crate::{
error::Error,
net::Socket,
util::{MsgBuffer, TimeSource, Time, addr_nice},
Protocol, messages::MESSAGE_TYPE_NODE_INFO, config::DEFAULT_PEER_TIMEOUT,
};
const MAX_RECONNECT_INTERVAL: u16 = 3600;
const RESOLVE_INTERVAL: Time = 300;
const OWN_ADDRESS_RESET_INTERVAL: Time = 300;
pub const STATS_INTERVAL: Time = 60;
pub struct HousekeepThread<S: Socket, P: Protocol, TS: TimeSource> {
config: SharedConfig,
coms: Coms<S, TS, P>,
next_housekeep: Time,
buffer: MsgBuffer,
update_freq: u16,
next_peers: Time,
next_own_address_reset: Time,
}
impl<S: Socket, P: Protocol, TS: TimeSource> HousekeepThread<S, P, TS> {
pub fn new(config: SharedConfig, coms: Coms<S, TS, P>) -> Self {
let update_freq = config.get_config().get_keepalive() as u16;
Self {
config,
coms,
next_housekeep: TS::now(),
buffer: MsgBuffer::new(SPACE_BEFORE),
update_freq,
next_peers: TS::now(),
next_own_address_reset: TS::now(),
}
}
pub fn housekeep(&mut self) -> Result<(), Error> {
let now = TS::now();
assert!(self.buffer.is_empty());
Ok(())
}
pub fn iteration(&mut self) -> bool {
std::thread::sleep(Duration::from_millis(100));
let now = TS::now();
if self.next_housekeep < now {
if let Err(e) = self.housekeep() {
error!("{}", e)
}
self.next_housekeep = now + 1;
if !self.config.is_running() {
debug!("Housekeep: end");
return false;
}
}
true
}
pub fn run(mut self) {
while self.iteration() {}
}
}

View File

@ -2,8 +2,10 @@
// Copyright (C) 2015-2021 Dennis Schwerdel
// This software is licensed under GPL-3 or newer (see LICENSE.md)
mod device_thread;
mod shared;
mod device_thread;
mod socket_thread;
mod housekeep_thread;
mod extras_thread;
pub mod coms;
pub mod common;

View File

@ -1,6 +1,6 @@
use crate::{
config::Config,
crypto::CryptoCore,
crypto::{CryptoCore, PeerCrypto},
engine::common::Hash,
error::Error,
table::ClaimTable,
@ -13,34 +13,40 @@ use std::{
collections::HashMap,
io::{self, Write},
net::SocketAddr,
sync::Arc,
ops::DerefMut,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use super::common::PeerData;
#[derive(Clone)]
#[allow(clippy::type_complexity)]
pub struct SharedPeerCrypto {
peers: Arc<Mutex<HashMap<SocketAddr, Option<Arc<CryptoCore>>, Hash>>>,
pub struct SharedCrypto {
peer_crypto: Arc<Mutex<HashMap<SocketAddr, Option<Arc<CryptoCore>>, Hash>>>,
cache: HashMap<SocketAddr, Option<Arc<CryptoCore>>, Hash>, //TODO: local hashmap as cache
}
impl SharedPeerCrypto {
impl SharedCrypto {
pub fn new() -> Self {
SharedPeerCrypto { peers: Arc::new(Mutex::new(HashMap::default())), cache: HashMap::default() }
SharedCrypto { peer_crypto: Arc::new(Mutex::new(HashMap::default())), cache: HashMap::default() }
}
pub fn encrypt_for(&mut self, peer: SocketAddr, data: &mut MsgBuffer) -> Result<(), Error> {
let crypto = match self.cache.get(&peer) {
Some(crypto) => crypto,
None => {
let peers = self.peers.lock();
if let Some(crypto) = peers.get(&peer) {
self.cache.insert(peer, crypto.clone());
self.cache.get(&peer).unwrap()
} else {
return Err(Error::InvalidCryptoState("No crypto found for peer"));
}
let cache = &mut self.cache;
let owned_crypto;
let crypto = if let Some(crypto) = cache.get(&peer) {
crypto
} else {
let peers = self.peer_crypto.lock();
if let Some(crypto) = peers.get(&peer) {
cache.insert(peer, crypto.clone());
owned_crypto = crypto.clone();
&owned_crypto
} else {
return Err(Error::InvalidCryptoState("No crypto found for peer"));
}
};
if let Some(crypto) = crypto {
@ -51,26 +57,26 @@ impl SharedPeerCrypto {
pub fn add(&mut self, addr: SocketAddr, crypto: Option<Arc<CryptoCore>>) {
self.cache.insert(addr, crypto.clone());
let mut peers = self.peers.lock();
let mut peers = self.peer_crypto.lock();
peers.insert(addr, crypto);
}
pub fn remove(&mut self, addr: &SocketAddr) {
self.cache.remove(addr);
let mut peers = self.peers.lock();
let mut peers = self.peer_crypto.lock();
peers.remove(addr);
}
pub fn store(&mut self, data: &HashMap<SocketAddr, PeerData, Hash>) {
pub fn store(&mut self, data: &HashMap<SocketAddr, PeerCrypto, Hash>) {
self.cache.clear();
self.cache.extend(data.iter().map(|(k, v)| (*k, v.crypto.get_core())));
let mut peers = self.peers.lock();
self.cache.extend(data.iter().map(|(k, v)| (*k, v.get_core())));
let mut peers = self.peer_crypto.lock();
peers.clear();
peers.extend(self.cache.iter().map(|(k, v)| (*k, v.clone())));
}
pub fn load(&mut self) {
let peers = self.peers.lock();
let peers = self.peer_crypto.lock();
self.cache.clear();
self.cache.extend(peers.iter().map(|(k, v)| (*k, v.clone())));
}
@ -79,6 +85,10 @@ impl SharedPeerCrypto {
&self.cache
}
pub fn contains(&self, addr: &SocketAddr) -> bool {
self.cache.contains_key(addr)
}
pub fn count(&self) -> usize {
self.cache.len()
}
@ -216,3 +226,42 @@ impl<TS: TimeSource> SharedTable<TS> {
self.table.lock().claim_len()
}
}
#[derive(Clone)]
pub struct SharedConfig {
config: Config,
running: Arc<AtomicBool>,
}
impl SharedConfig {
pub fn new(config: Config) -> Self {
Self { config, running: Arc::new(AtomicBool::new(true)) }
}
pub fn get_config(&self) -> &Config {
&self.config
}
pub fn is_running(&self) -> bool {
self.running.load(Ordering::Relaxed)
}
pub fn stop(&self) {
self.running.store(false, Ordering::Relaxed)
}
}
#[derive(Clone)]
pub struct SharedPeers {
peers: Arc<Mutex<HashMap<SocketAddr, PeerData, Hash>>>,
}
impl SharedPeers {
pub fn new() -> Self {
Self { peers: Default::default() }
}
pub fn get_peers<'a>(&'a self) -> impl DerefMut<Target = HashMap<SocketAddr, PeerData, Hash>> + 'a {
self.peers.lock()
}
}

View File

@ -1,9 +1,9 @@
use super::{common::SPACE_BEFORE, coms::Coms};
use super::{common::SPACE_BEFORE, coms::Coms, shared::SharedConfig};
use crate::{
beacon::BeaconSerializer,
config::{DEFAULT_PEER_TIMEOUT, DEFAULT_PORT},
crypto::{is_init_message, Crypto, InitResult, InitState, MessageResult},
crypto::{is_init_message, Crypto, InitResult, InitState, MessageResult, PeerCrypto},
device::{Device, Type},
engine::common::{Hash, PeerData},
error::Error,
@ -15,12 +15,10 @@ use crate::{
port_forwarding::PortForwarding,
types::{Address, NodeId, Range, RangeList},
util::{addr_nice, resolve, MsgBuffer, StatsdMsg, Time, TimeSource},
Config, Protocol,
Protocol,
};
use rand::{random, seq::SliceRandom, thread_rng};
use smallvec::{smallvec, SmallVec};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{
cmp::{max, min},
collections::HashMap,
@ -30,7 +28,7 @@ use std::{
io::{Cursor, Seek, SeekFrom, Write},
marker::PhantomData,
net::{SocketAddr, ToSocketAddrs},
str::FromStr,
str::FromStr, ops::Deref,
};
const MAX_RECONNECT_INTERVAL: u16 = 3600;
@ -52,32 +50,32 @@ pub struct SocketThread<S: Socket, D: Device, P: Protocol, TS: TimeSource> {
// Read-only fields
node_id: NodeId,
claims: RangeList,
config: Config,
peer_timeout_publish: u16,
learning: bool,
update_freq: u16,
_dummy_ts: PhantomData<TS>,
_dummy_p: PhantomData<P>,
// Socket-only fields
config: SharedConfig,
pub coms: Coms<S, TS, P>,
device: D,
next_housekeep: Time,
pub own_addresses: AddrList,
next_own_address_reset: Time,
pending_inits: HashMap<SocketAddr, InitState<NodeInfo>, Hash>,
crypto: Crypto,
pub peers: HashMap<SocketAddr, PeerData, Hash>,
next_peers: Time,
next_stats_out: Time,
next_beacon: Time,
beacon_serializer: BeaconSerializer<TS>,
stats_file: Option<File>,
statsd_server: Option<String>,
pub reconnect_peers: SmallVec<[ReconnectEntry; 3]>,
crypto: Crypto, // TODO 2nd: move to config
//pub peers: HashMap<SocketAddr, PeerData, Hash>, // TODO 1st: move to shared peers
peer_crypto: HashMap<SocketAddr, PeerCrypto, Hash>,
next_peers: Time, // TODO: split off
next_stats_out: Time, // TODO: split off
next_beacon: Time, // TODO: split off
beacon_serializer: BeaconSerializer<TS>, // TODO: split off
stats_file: Option<File>, // TODO: split off
statsd_server: Option<String>, // TODO: split off
pub reconnect_peers: SmallVec<[ReconnectEntry; 3]>, // TODO: move to shared config
buffer: MsgBuffer,
// Shared fields
//table: SharedTable<TS>,
running: Arc<AtomicBool>,
// Should not be here
port_forwarding: Option<PortForwarding>, // TODO: 3rd thread
}
@ -85,14 +83,14 @@ pub struct SocketThread<S: Socket, D: Device, P: Protocol, TS: TimeSource> {
impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS> {
#[allow(clippy::too_many_arguments)]
pub fn new(
config: Config, coms: Coms<S, TS, P>, device: D,
port_forwarding: Option<PortForwarding>, stats_file: Option<File>, running: Arc<AtomicBool>,
config: SharedConfig, coms: Coms<S, TS, P>, device: D, port_forwarding: Option<PortForwarding>,
stats_file: Option<File>,
) -> Self {
let mut claims = SmallVec::with_capacity(config.claims.len());
for s in &config.claims {
let mut claims = SmallVec::with_capacity(config.get_config().claims.len());
for s in &config.get_config().claims {
claims.push(try_fail!(Range::from_str(s), "Invalid subnet format: {} ({})", s));
}
if device.get_type() == Type::Tun && config.auto_claim {
if device.get_type() == Type::Tun && config.get_config().auto_claim {
match device.get_ip() {
Ok(ip) => {
let range = Range { base: Address::from_ipv4(ip), prefix_len: 32 };
@ -106,9 +104,9 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
}
}
let now = TS::now();
let update_freq = config.get_keepalive() as u16;
let update_freq = config.get_config().get_keepalive() as u16;
let node_id = random();
let beacon_key = config.beacon_password.as_ref().map(|s| s.as_bytes()).unwrap_or(&[]);
let beacon_key = config.get_config().beacon_password.as_ref().map(|s| s.as_bytes()).unwrap_or(&[]);
Self {
_dummy_p: PhantomData,
_dummy_ts: PhantomData,
@ -116,7 +114,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
claims,
device,
coms,
learning: config.is_learning(),
learning: config.get_config().is_learning(),
next_housekeep: now,
next_beacon: now,
next_peers: now,
@ -125,23 +123,22 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
pending_inits: HashMap::default(),
reconnect_peers: SmallVec::new(),
own_addresses: SmallVec::new(),
peers: HashMap::default(),
peer_timeout_publish: config.peer_timeout as u16,
peer_timeout_publish: config.get_config().peer_timeout as u16,
beacon_serializer: BeaconSerializer::new(beacon_key),
port_forwarding,
stats_file,
update_freq,
statsd_server: config.statsd_server.clone(),
crypto: Crypto::new(node_id, &config.crypto).unwrap(),
statsd_server: config.get_config().statsd_server.clone(),
crypto: Crypto::new(node_id, &config.get_config().crypto).unwrap(),
peer_crypto: Default::default(),
config,
buffer: MsgBuffer::new(SPACE_BEFORE),
running,
}
}
fn connect_sock(&mut self, addr: SocketAddr) -> Result<(), Error> {
let addr = mapped_addr(addr);
if self.peers.contains_key(&addr)
if self.coms.has_peer(&addr)
|| self.own_addresses.contains(&addr)
|| self.pending_inits.contains_key(&addr)
{
@ -159,7 +156,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
let addrs = resolve(&addr)?.into_iter().map(mapped_addr).collect::<SmallVec<[SocketAddr; 3]>>();
for addr in &addrs {
if self.own_addresses.contains(addr)
|| self.peers.contains_key(addr)
|| self.coms.has_peer(addr)
|| self.pending_inits.contains_key(addr)
{
return Ok(());
@ -175,7 +172,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
fn create_node_info(&self) -> NodeInfo {
let mut peers = smallvec![];
for peer in self.peers.values() {
for peer in self.coms.get_peers().values() {
peers.push(PeerInfo { node_id: Some(peer.node_id), addrs: peer.addrs.clone() })
}
if peers.len() > 20 {
@ -193,9 +190,9 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
}
fn update_peer_info(&mut self, addr: SocketAddr, info: Option<NodeInfo>) -> Result<(), Error> {
if let Some(peer) = self.peers.get_mut(&addr) {
if let Some(peer) = self.coms.get_peers().get_mut(&addr) {
peer.last_seen = TS::now();
peer.timeout = TS::now() + self.config.peer_timeout as Time;
peer.timeout = TS::now() + self.config.get_config().peer_timeout as Time;
if let Some(info) = &info {
// Update peer addresses, always add seen address
peer.addrs.clear();
@ -226,14 +223,13 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
let crypto = init.finish(&mut self.buffer);
let peer_data = PeerData {
addrs: info.addrs.clone(),
crypto,
node_id: info.node_id,
peer_timeout: info.peer_timeout.unwrap_or(DEFAULT_PEER_TIMEOUT),
last_seen: TS::now(),
timeout: TS::now() + self.config.peer_timeout as Time,
timeout: TS::now() + self.config.get_config().peer_timeout as Time,
};
self.coms.add_peer(addr, &peer_data);
self.peers.insert(addr, peer_data);
self.coms.add_peer(addr, peer_data,&crypto);
self.peer_crypto.insert(addr, crypto);
self.update_peer_info(addr, Some(info))?;
if !self.buffer.is_empty() {
self.coms.send_to(addr, &mut self.buffer)?;
@ -247,7 +243,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
fn connect_to_peers(&mut self, peers: &[PeerInfo]) -> Result<(), Error> {
'outer: for peer in peers {
for addr in &peer.addrs {
if self.peers.contains_key(addr) {
if self.own_addresses.contains(addr) {
// Check addresses and add addresses that we don't know to own addresses
for addr in &peer.addrs {
if !self.own_addresses.contains(addr) {
@ -261,7 +257,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
if self.node_id == node_id {
continue 'outer;
}
for p in self.peers.values() {
for p in self.coms.get_peers().values() {
if p.node_id == node_id {
continue 'outer;
}
@ -273,10 +269,8 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
}
fn remove_peer(&mut self, addr: SocketAddr) {
if let Some(_peer) = self.peers.remove(&addr) {
info!("Closing connection to {}", addr_nice(addr));
self.coms.remove_peer(&addr);
}
self.coms.remove_peer(&addr);
self.peer_crypto.remove(&addr);
}
fn handle_payload_from(&mut self, peer: SocketAddr) -> Result<(), Error> {
@ -339,7 +333,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
debug!("Received {} bytes from {}", self.buffer.len(), src);
let buffer = &mut self.buffer;
self.coms.traffic.count_in_traffic(src, buffer.len());
if let Some(result) = self.peers.get_mut(&src).map(|peer| peer.crypto.handle_message(buffer)) {
if let Some(result) = self.peer_crypto.get_mut(&src).map(|crypto| crypto.handle_message(buffer)) {
return self.process_message(src, result?);
}
let is_init = is_init_message(buffer.message());
@ -383,14 +377,13 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
pub fn housekeep(&mut self) -> Result<(), Error> {
let now = TS::now();
let mut del: SmallVec<[SocketAddr; 3]> = SmallVec::new();
for (&addr, data) in &self.peers {
for (&addr, data) in self.coms.get_peers().deref() {
if data.timeout < now {
del.push(addr);
}
}
for addr in del {
info!("Forgot peer {} due to timeout", addr_nice(addr));
self.peers.remove(&addr);
self.coms.remove_peer(&addr);
self.connect_sock(addr)?; // Try to reconnect
}
@ -409,7 +402,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
info.encode(&mut self.buffer);
self.coms.broadcast_msg(MESSAGE_TYPE_NODE_INFO, &mut self.buffer)?;
// Reschedule for next update
let min_peer_timeout = self.peers.iter().map(|p| p.1.peer_timeout).min().unwrap_or(DEFAULT_PEER_TIMEOUT);
let min_peer_timeout = self.coms.get_peers().iter().map(|p| p.1.peer_timeout).min().unwrap_or(DEFAULT_PEER_TIMEOUT);
let interval = min(self.update_freq as u16, max(min_peer_timeout / 2 - 60, 1));
self.next_peers = now + Time::from(interval);
}
@ -432,7 +425,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
if self.next_beacon < now {
self.store_beacon()?;
self.load_beacon()?;
self.next_beacon = now + Time::from(self.config.beacon_interval);
self.next_beacon = now + Time::from(self.config.get_config().beacon_interval);
}
self.coms.sync()?;
// Periodically reset own peers
@ -454,16 +447,17 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
self.coms.send_to(addr, &mut self.buffer)?
}
}
for addr in self.peers.keys().copied().collect::<SmallVec<[SocketAddr; 16]>>() {
for (addr, crypto) in self.peer_crypto.iter_mut() {
self.buffer.clear();
self.peers.get_mut(&addr).unwrap().crypto.every_second(&mut self.buffer);
crypto.every_second(&mut self.buffer);
if !self.buffer.is_empty() {
self.coms.send_to(addr, &mut self.buffer)?
self.coms.send_to(*addr, &mut self.buffer)?
}
}
for addr in del {
self.pending_inits.remove(&addr);
if self.peers.remove(&addr).is_some() {
self.peer_crypto