Compare commits

...

2 Commits

Author SHA1 Message Date
Dennis Schwerdel aa949cdcec Update code 2023-10-07 00:02:13 +02:00
Dennis Schwerdel 2fcb7646c7 Coms 2022-12-01 18:42:57 +01:00
12 changed files with 705 additions and 361 deletions

View File

@ -11,7 +11,7 @@ RUN chown vscode: -R /usr/local/rustup /usr/local/cargo
USER vscode
RUN rustup default 1.64.0 \
RUN rustup default 1.70.0 \
&& rustup component add clippy rust-src rustfmt
RUN cargo install cargo-outdated cargo-cache cargo-criterion \

209
Cargo.lock generated
View File

@ -26,6 +26,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "anstyle"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41ed9a86bf92ae6580e0a31281f65a1b1d867c0cc68d5346e2ae128dddfa6a7d"
[[package]]
name = "attohttpc"
version = "0.16.3"
@ -44,7 +50,7 @@ version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
dependencies = [
"hermit-abi",
"hermit-abi 0.1.19",
"libc",
"winapi",
]
@ -173,31 +179,36 @@ dependencies = [
"atty",
"bitflags",
"strsim",
"textwrap 0.11.0",
"textwrap",
"unicode-width",
"vec_map",
]
[[package]]
name = "clap"
version = "3.2.22"
version = "4.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86447ad904c7fb335a790c9d7fe3d0d971dc523b8ccd1561a520de9a85302750"
checksum = "80672091db20273a15cf9fdd4e47ed43b5091ec9841bf4c6145c9dfbbcae09ed"
dependencies = [
"clap_builder",
]
[[package]]
name = "clap_builder"
version = "4.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1458a1df40e1e2afebb7ab60ce55c1fa8f431146205aa5f4887e0b111c27636"
dependencies = [
"anstyle",
"bitflags",
"clap_lex",
"indexmap",
"textwrap 0.15.1",
]
[[package]]
name = "clap_lex"
version = "0.2.4"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2850f2f5a82cbf437dd5af4d49848fbdfc27c157c3d010345776f952765261c5"
dependencies = [
"os_str_bytes",
]
checksum = "2da6da31387c7e4ef160ffab6d5e7f00c42626fe39aea70a7b0f1773f7dd6c1b"
[[package]]
name = "console"
@ -230,19 +241,19 @@ dependencies = [
[[package]]
name = "criterion"
version = "0.4.0"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7c76e09c1aae2bc52b3d2f29e13c6572553b30c4aa1b8a49fd70de6412654cb"
checksum = "f2b12d017a929603d80db1831cd3a24082f8137ce19c69e6447f54f5fc8d692f"
dependencies = [
"anes",
"atty",
"cast",
"ciborium",
"clap 3.2.22",
"clap 4.3.4",
"criterion-plot",
"is-terminal",
"itertools",
"lazy_static",
"num-traits",
"once_cell",
"oorandom",
"plotters",
"rayon",
@ -362,6 +373,27 @@ version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f"
[[package]]
name = "errno"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bcfec3a70f97c962c307b2d2c56e358cf1d00b558d74262b5f929ee8cc7e73a"
dependencies = [
"errno-dragonfly",
"libc",
"windows-sys 0.48.0",
]
[[package]]
name = "errno-dragonfly"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf"
dependencies = [
"cc",
"libc",
]
[[package]]
name = "fastrand"
version = "1.8.0"
@ -437,6 +469,12 @@ dependencies = [
"libc",
]
[[package]]
name = "hermit-abi"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fed44880c466736ef9a5c5b5facefb5ed0785676d0c02d612db14e54f0d84286"
[[package]]
name = "http"
version = "0.2.8"
@ -515,6 +553,29 @@ dependencies = [
"cfg-if 1.0.0",
]
[[package]]
name = "io-lifetimes"
version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2"
dependencies = [
"hermit-abi 0.3.1",
"libc",
"windows-sys 0.48.0",
]
[[package]]
name = "is-terminal"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adcf93614601c8129ddf72e2d5633df827ba6551541c6d8c59520a371475be1f"
dependencies = [
"hermit-abi 0.3.1",
"io-lifetimes",
"rustix",
"windows-sys 0.48.0",
]
[[package]]
name = "itertools"
version = "0.10.5"
@ -547,9 +608,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.133"
version = "0.2.146"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0f80d65747a3e43d1596c7c5492d95d5edddaabd45a7fcdb02b95f644164966"
checksum = "f92be4933c13fd498862a9e02a3055f8a8d9c039ce33db97306fd5a6caa7f29b"
[[package]]
name = "linked-hash-map"
@ -557,6 +618,12 @@ version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f"
[[package]]
name = "linux-raw-sys"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519"
[[package]]
name = "lock_api"
version = "0.4.9"
@ -635,7 +702,7 @@ version = "1.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1"
dependencies = [
"hermit-abi",
"hermit-abi 0.1.19",
"libc",
]
@ -651,12 +718,6 @@ version = "11.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
[[package]]
name = "os_str_bytes"
version = "6.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ff7415e9ae3fff1225851df9e0d9e4e5479f947619774677a63572e55e80eff"
[[package]]
name = "parking_lot"
version = "0.12.1"
@ -677,7 +738,7 @@ dependencies = [
"libc",
"redox_syscall",
"smallvec",
"windows-sys",
"windows-sys 0.36.1",
]
[[package]]
@ -874,6 +935,20 @@ dependencies = [
"winapi",
]
[[package]]
name = "rustix"
version = "0.37.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b96e891d04aa506a6d1f318d2771bcb1c7dfda84e126660ace067c9b474bb2c0"
dependencies = [
"bitflags",
"errno",
"io-lifetimes",
"libc",
"linux-raw-sys",
"windows-sys 0.48.0",
]
[[package]]
name = "ryu"
version = "1.0.11"
@ -1046,12 +1121,6 @@ dependencies = [
"unicode-width",
]
[[package]]
name = "textwrap"
version = "0.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "949517c0cf1bf4ee812e2e07e08ab448e3ae0d23472aee8a06c985f0c8815b16"
[[package]]
name = "thiserror"
version = "1.0.35"
@ -1366,43 +1435,109 @@ version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2"
dependencies = [
"windows_aarch64_msvc",
"windows_i686_gnu",
"windows_i686_msvc",
"windows_x86_64_gnu",
"windows_x86_64_msvc",
"windows_aarch64_msvc 0.36.1",
"windows_i686_gnu 0.36.1",
"windows_i686_msvc 0.36.1",
"windows_x86_64_gnu 0.36.1",
"windows_x86_64_msvc 0.36.1",
]
[[package]]
name = "windows-sys"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9"
dependencies = [
"windows-targets",
]
[[package]]
name = "windows-targets"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b1eb6f0cd7c80c79759c929114ef071b87354ce476d9d94271031c0497adfd5"
dependencies = [
"windows_aarch64_gnullvm",
"windows_aarch64_msvc 0.48.0",
"windows_i686_gnu 0.48.0",
"windows_i686_msvc 0.48.0",
"windows_x86_64_gnu 0.48.0",
"windows_x86_64_gnullvm",
"windows_x86_64_msvc 0.48.0",
]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc"
[[package]]
name = "windows_aarch64_msvc"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47"
[[package]]
name = "windows_aarch64_msvc"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3"
[[package]]
name = "windows_i686_gnu"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6"
[[package]]
name = "windows_i686_gnu"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241"
[[package]]
name = "windows_i686_msvc"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024"
[[package]]
name = "windows_i686_msvc"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00"
[[package]]
name = "windows_x86_64_gnu"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1"
[[package]]
name = "windows_x86_64_gnu"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953"
[[package]]
name = "windows_x86_64_msvc"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680"
[[package]]
name = "windows_x86_64_msvc"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a"
[[package]]
name = "xml-rs"
version = "0.8.4"

View File

@ -12,7 +12,7 @@ readme = "README.md"
edition = "2018"
[package.metadata]
toolchain = "1.64.0"
toolchain = "1.70.0"
upx_version = "3.96"
[dependencies]
@ -41,7 +41,7 @@ signal = "0.7"
[dev-dependencies]
tempfile = "3"
criterion = { version = "0.4", features = ["html_reports"] }
criterion = { version = "0.5", features = ["html_reports"] }
iai = "0.1"
[features]

View File

@ -18,6 +18,9 @@ mod engine {
pub mod common {
include!("../src/engine/common.rs");
}
pub mod coms {
include!("../src/engine/coms.rs");
}
mod shared {
include!("../src/engine/shared.rs");
}
@ -27,6 +30,12 @@ mod engine {
mod socket_thread {
include!("../src/engine/socket_thread.rs");
}
mod housekeep_thread {
include!("../src/engine/housekeep_thread.rs");
}
mod extras_thread {
include!("../src/engine/extras_thread.rs");
}
}
mod config {
include!("../src/config.rs");

View File

@ -1,5 +1,3 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::{fs::File, hash::BuildHasherDefault};
@ -8,11 +6,13 @@ use fnv::FnvHasher;
use crate::util::CtrlC;
use crate::{
config::Config,
crypto::PeerCrypto,
device::Device,
engine::{
coms::Coms,
device_thread::DeviceThread,
shared::{SharedPeerCrypto, SharedTable, SharedTraffic},
extras_thread::ExtrasThread,
housekeep_thread::HousekeepThread,
shared::SharedConfig,
socket_thread::{ReconnectEntry, SocketThread},
},
error::Error,
@ -36,13 +36,15 @@ pub struct PeerData {
pub timeout: Time,
pub peer_timeout: u16,
pub node_id: NodeId,
pub crypto: PeerCrypto,
//pub crypto: PeerCrypto,
}
pub struct GenericCloud<D: Device, P: Protocol, S: Socket, TS: TimeSource> {
config: SharedConfig,
socket_thread: SocketThread<S, D, P, TS>,
device_thread: DeviceThread<S, D, P, TS>,
running: Arc<AtomicBool>,
housekeep_thread: HousekeepThread<S, P, TS>,
extras_thread: ExtrasThread<S, P, TS>,
}
impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS> {
@ -50,32 +52,18 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
pub fn new(
config: &Config, socket: S, device: D, port_forwarding: Option<PortForwarding>, stats_file: Option<File>,
) -> Result<Self, Error> {
let table = SharedTable::<TS>::new(config);
let traffic = SharedTraffic::new();
let peer_crypto = SharedPeerCrypto::new();
let running = Arc::new(AtomicBool::new(true));
let device_thread = DeviceThread::<S, D, P, TS>::new(
config.clone(),
device.duplicate()?,
let config = SharedConfig::new(config.clone());
let coms = Coms::<S, TS, P>::new(
config.get_config(),
socket.try_clone().map_err(|e| Error::SocketIo("Failed to clone socket", e))?,
traffic.clone(),
peer_crypto.clone(),
table.clone(),
running.clone(),
);
let mut socket_thread = SocketThread::<S, D, P, TS>::new(
config.clone(),
device,
socket,
traffic,
peer_crypto,
table,
port_forwarding,
stats_file,
running.clone(),
);
let device_thread = DeviceThread::<S, D, P, TS>::new(config.clone(), device.duplicate()?, coms.try_clone()?);
let housekeep_thread = HousekeepThread::<S, P, TS>::new(config.clone(), coms.try_clone()?);
let extras_thread = ExtrasThread::<S, P, TS>::new(config.clone(), coms.try_clone()?);
let mut socket_thread =
SocketThread::<S, D, P, TS>::new(config.clone(), coms, device, port_forwarding, stats_file);
socket_thread.housekeep()?;
Ok(Self { socket_thread, device_thread, running })
Ok(Self { socket_thread, device_thread, config, housekeep_thread, extras_thread })
}
pub fn add_peer(&mut self, addr: String) -> Result<(), Error> {
@ -93,17 +81,23 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
pub fn run(self) {
debug!("Starting threads");
let running = self.running.clone();
let config = self.config;
let device = self.device_thread;
let device_thread_handle = thread::spawn(move || device.run());
let socket = self.socket_thread;
let socket_thread_handle = thread::spawn(move || socket.run());
let housekeep = self.housekeep_thread;
let housekeep_thread_handle = thread::spawn(move || housekeep.run());
let extras = self.extras_thread;
let extras_thread_handle = thread::spawn(move || extras.run());
let ctrlc = CtrlC::new();
ctrlc.wait();
running.store(false, Ordering::SeqCst);
config.stop();
debug!("Waiting for threads to end");
device_thread_handle.join().unwrap();
socket_thread_handle.join().unwrap();
housekeep_thread_handle.join().unwrap();
extras_thread_handle.join().unwrap();
debug!("Threads stopped");
}
}
@ -120,7 +114,7 @@ use std::net::SocketAddr;
#[cfg(test)]
impl<P: Protocol> GenericCloud<MockDevice, P, MockSocket, MockTimeSource> {
pub fn socket(&mut self) -> &mut MockSocket {
&mut self.socket_thread.socket
&mut self.socket_thread.coms.socket
}
pub fn device(&mut self) -> &mut MockDevice {
@ -145,7 +139,7 @@ impl<P: Protocol> GenericCloud<MockDevice, P, MockSocket, MockTimeSource> {
}
pub fn is_connected(&self, addr: &SocketAddr) -> bool {
self.socket_thread.peers.contains_key(addr)
self.socket_thread.coms.has_peer(addr)
}
pub fn own_addresses(&self) -> &[SocketAddr] {
@ -153,6 +147,6 @@ impl<P: Protocol> GenericCloud<MockDevice, P, MockSocket, MockTimeSource> {
}
pub fn get_num(&self) -> usize {
self.socket_thread.socket.address().unwrap().port() as usize
self.socket_thread.coms.socket.address().unwrap().port() as usize
}
}

172
src/engine/coms.rs Normal file
View File

@ -0,0 +1,172 @@
use std::{collections::HashMap, io, marker::PhantomData, net::SocketAddr, ops::DerefMut};
use crate::{
config::Config,
crypto::PeerCrypto,
error::Error,
messages::MESSAGE_TYPE_DATA,
net::{mapped_addr, Socket},
payload::Protocol,
util::{MsgBuffer, TimeSource, addr_nice},
};
use super::{
common::{Hash, PeerData, SPACE_BEFORE},
shared::{SharedCrypto, SharedPeers, SharedTable, SharedTraffic},
};
pub struct Coms<S: Socket, TS: TimeSource, P: Protocol> {
_dummy_p: PhantomData<P>,
broadcast: bool,
broadcast_buffer: MsgBuffer,
crypto: SharedCrypto,
peers: SharedPeers,
pub table: SharedTable<TS>,
pub traffic: SharedTraffic,
pub socket: S,
}
impl<S: Socket, TS: TimeSource, P: Protocol> Coms<S, TS, P> {
pub fn new(config: &Config, socket: S) -> Self {
Self {
_dummy_p: PhantomData,
broadcast: config.is_broadcasting(),
broadcast_buffer: MsgBuffer::new(SPACE_BEFORE),
traffic: SharedTraffic::new(),
crypto: SharedCrypto::new(),
peers: SharedPeers::new(),
table: SharedTable::<TS>::new(config),
socket,
}
}
pub fn try_clone(&self) -> Result<Self, Error> {
Ok(Self {
_dummy_p: PhantomData,
broadcast: self.broadcast,
broadcast_buffer: MsgBuffer::new(SPACE_BEFORE),
traffic: self.traffic.clone(),
crypto: self.crypto.clone(),
table: self.table.clone(),
peers: self.peers.clone(),
socket: self.socket.try_clone().map_err(|e| Error::SocketIo("Failed to clone socket", e))?,
})
}
pub fn sync(&mut self) -> Result<(), Error> {
self.crypto.load();
self.table.sync();
self.traffic.sync();
Ok(())
}
pub fn get_address(&self) -> Result<SocketAddr, io::Error> {
self.socket.address().map(mapped_addr)
}
pub fn send_raw(&mut self, data: &[u8], addr: SocketAddr) -> Result<(), Error> {
match self.socket.send(data, addr) {
Ok(written) if written == data.len() => Ok(()),
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
}
}
//TODO: move back to socket thread
pub fn receive(&mut self, buffer: &mut MsgBuffer) -> Result<SocketAddr, io::Error> {
self.socket.receive(buffer)
}
#[inline]
pub fn send_to(&mut self, addr: SocketAddr, data: &mut MsgBuffer) -> Result<(), Error> {
let size = data.len();
debug!("Sending msg with {} bytes to {}", size, addr);
self.traffic.count_out_traffic(addr, size);
match self.socket.send(data.message(), addr) {
Ok(written) if written == size => Ok(()),
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
}
}
#[inline]
pub fn send_msg(&mut self, addr: SocketAddr, type_: u8, data: &mut MsgBuffer) -> Result<(), Error> {
debug!("Sending msg with {} bytes to {}", data.len(), addr);
data.prepend_byte(type_);
self.crypto.encrypt_for(addr, data)?;
self.send_to(addr, data)
}
#[inline]
pub fn broadcast_msg(&mut self, type_: u8, data: &mut MsgBuffer) -> Result<(), Error> {
let size = data.len();
debug!("Broadcasting message type {}, {:?} bytes to {} peers", type_, size, self.crypto.count());
let traffic = &mut self.traffic;
let socket = &mut self.socket;
let peers = self.crypto.get_snapshot();
for (addr, crypto) in peers {
self.broadcast_buffer.set_start(data.get_start());
self.broadcast_buffer.set_length(data.len());
self.broadcast_buffer.message_mut().clone_from_slice(data.message());
self.broadcast_buffer.prepend_byte(type_);
if let Some(crypto) = crypto {
crypto.encrypt(&mut self.broadcast_buffer);
}
traffic.count_out_traffic(*addr, self.broadcast_buffer.len());
match socket.send(self.broadcast_buffer.message(), *addr) {
Ok(written) if written == self.broadcast_buffer.len() => Ok(()),
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
}?
}
Ok(())
}
pub fn forward_packet(&mut self, data: &mut MsgBuffer) -> Result<(), Error> {
let (src, dst) = P::parse(data.message())?;
debug!("Read data from interface: src: {}, dst: {}, {} bytes", src, dst, data.len());
self.traffic.count_out_payload(dst.clone(), src, data.len());
match self.table.lookup(&dst) {
Some(addr) => {
// Peer found for destination
debug!("Found destination for {} => {}", dst, addr);
self.send_msg(addr, MESSAGE_TYPE_DATA, data)?;
}
//TODO: VIA: find relay peer and relay message
None => {
if self.broadcast {
debug!("No destination for {} found, broadcasting", dst);
self.broadcast_msg(MESSAGE_TYPE_DATA, data)?;
} else {
debug!("No destination for {} found, dropping", dst);
self.traffic.count_dropped_payload(data.len());
}
}
}
Ok(())
}
pub fn get_peers<'a>(&'a self) -> impl DerefMut<Target = HashMap<SocketAddr, PeerData, Hash>> + 'a {
self.peers.get_peers()
}
pub fn add_peer(&mut self, addr: SocketAddr, peer_data: PeerData, peer_crypto: &PeerCrypto) {
self.crypto.add(addr, peer_crypto.get_core());
self.peers.get_peers().insert(addr, peer_data);
}
pub fn remove_peer(&mut self, addr: &SocketAddr) -> bool {
if let Some(_peer) = self.peers.get_peers().remove(addr) {
info!("Closing connection to {}", addr_nice(*addr));
self.table.remove_claims(*addr);
self.crypto.remove(addr);
true
} else {
false
}
}
pub fn has_peer(&self, addr: &SocketAddr) -> bool {
self.crypto.contains(addr)
}
}

View File

@ -1,139 +1,40 @@
use super::{
common::SPACE_BEFORE,
shared::{SharedPeerCrypto, SharedTable, SharedTraffic},
};
use super::{common::SPACE_BEFORE, coms::Coms, shared::SharedConfig};
use crate::{
config::Config,
device::Device,
error::Error,
messages::MESSAGE_TYPE_DATA,
net::Socket,
util::{MsgBuffer, Time, TimeSource},
Protocol,
};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{marker::PhantomData, net::SocketAddr};
pub struct DeviceThread<S: Socket, D: Device, P: Protocol, TS: TimeSource> {
// Read-only fields
_dummy_ts: PhantomData<TS>,
_dummy_p: PhantomData<P>,
broadcast: bool,
// Device-only fields
socket: S,
config: SharedConfig,
coms: Coms<S, TS, P>,
pub device: D,
next_housekeep: Time,
buffer: MsgBuffer,
broadcast_buffer: MsgBuffer,
// Shared fields
traffic: SharedTraffic,
peer_crypto: SharedPeerCrypto,
table: SharedTable<TS>,
running: Arc<AtomicBool>,
}
impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS> {
pub fn new(
config: Config, device: D, socket: S, traffic: SharedTraffic, peer_crypto: SharedPeerCrypto,
table: SharedTable<TS>, running: Arc<AtomicBool>,
) -> Self {
pub fn new(config: SharedConfig, device: D, coms: Coms<S, TS, P>) -> Self {
Self {
_dummy_ts: PhantomData,
_dummy_p: PhantomData,
broadcast: config.is_broadcasting(),
socket,
config,
device,
next_housekeep: TS::now(),
traffic,
peer_crypto,
table,
buffer: MsgBuffer::new(SPACE_BEFORE),
broadcast_buffer: MsgBuffer::new(SPACE_BEFORE),
running,
coms,
}
}
#[inline]
fn send_to(&mut self, addr: SocketAddr) -> Result<(), Error> {
let size = self.buffer.len();
debug!("Sending msg with {} bytes to {}", size, addr);
self.traffic.count_out_traffic(addr, size);
match self.socket.send(self.buffer.message(), addr) {
Ok(written) if written == size => Ok(()),
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
}
}
#[inline]
fn send_msg(&mut self, addr: SocketAddr, type_: u8) -> Result<(), Error> {
debug!("Sending msg with {} bytes to {}", self.buffer.len(), addr);
self.buffer.prepend_byte(type_);
self.peer_crypto.encrypt_for(addr, &mut self.buffer)?;
self.send_to(addr)
}
#[inline]
fn broadcast_msg(&mut self, type_: u8) -> Result<(), Error> {
let size = self.buffer.len();
debug!("Broadcasting message type {}, {:?} bytes to {} peers", type_, size, self.peer_crypto.count());
let traffic = &mut self.traffic;
let socket = &mut self.socket;
let peers = self.peer_crypto.get_snapshot();
for (addr, crypto) in peers {
self.broadcast_buffer.set_start(self.buffer.get_start());
self.broadcast_buffer.set_length(self.buffer.len());
self.broadcast_buffer.message_mut().clone_from_slice(self.buffer.message());
self.broadcast_buffer.prepend_byte(type_);
if let Some(crypto) = crypto {
crypto.encrypt(&mut self.broadcast_buffer);
}
traffic.count_out_traffic(*addr, self.broadcast_buffer.len());
match socket.send(self.broadcast_buffer.message(), *addr) {
Ok(written) if written == self.broadcast_buffer.len() => Ok(()),
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
}?
}
Ok(())
}
fn forward_packet(&mut self) -> Result<(), Error> {
let (src, dst) = P::parse(self.buffer.message())?;
debug!("Read data from interface: src: {}, dst: {}, {} bytes", src, dst, self.buffer.len());
self.traffic.count_out_payload(dst.clone(), src, self.buffer.len());
match self.table.lookup(&dst) {
Some(addr) => {
// Peer found for destination
debug!("Found destination for {} => {}", dst, addr);
self.send_msg(addr, MESSAGE_TYPE_DATA)?;
}
//TODO: VIA: find relay peer and relay message
None => {
if self.broadcast {
debug!("No destination for {} found, broadcasting", dst);
self.broadcast_msg(MESSAGE_TYPE_DATA)?;
} else {
debug!("No destination for {} found, dropping", dst);
self.traffic.count_dropped_payload(self.buffer.len());
}
}
}
Ok(())
}
pub fn housekeep(&mut self) -> Result<(), Error> {
self.peer_crypto.load();
self.table.sync();
self.traffic.sync();
Ok(())
self.coms.sync()
}
pub fn iteration(&mut self) -> bool {
if self.device.read(&mut self.buffer).is_ok() {
//try_fail!(result, "Failed to read from device: {}");
if let Err(e) = self.forward_packet() {
if let Err(e) = self.coms.forward_packet(&mut self.buffer) {
error!("{}", e);
}
}
@ -143,7 +44,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS
error!("{}", e)
}
self.next_housekeep = now + 1;
if !self.running.load(Ordering::SeqCst) {
if !self.config.is_running() {
debug!("Device: end");
return false;
}

View File

@ -0,0 +1,59 @@
use std::time::Duration;
use super::{common::SPACE_BEFORE, coms::Coms, shared::SharedConfig};
use crate::{
error::Error,
net::Socket,
util::{MsgBuffer, TimeSource, Time},
Protocol,
};
const MAX_RECONNECT_INTERVAL: u16 = 3600;
const RESOLVE_INTERVAL: Time = 300;
const OWN_ADDRESS_RESET_INTERVAL: Time = 300;
pub const STATS_INTERVAL: Time = 60;
pub struct ExtrasThread<S: Socket, P: Protocol, TS: TimeSource> {
config: SharedConfig,
coms: Coms<S, TS, P>,
next_housekeep: Time,
buffer: MsgBuffer,
}
impl<S: Socket, P: Protocol, TS: TimeSource> ExtrasThread<S, P, TS> {
pub fn new(config: SharedConfig, coms: Coms<S, TS, P>) -> Self {
Self {
config,
coms,
next_housekeep: TS::now(),
buffer: MsgBuffer::new(SPACE_BEFORE),
}
}
pub fn housekeep(&mut self) -> Result<(), Error> {
let now = TS::now();
assert!(self.buffer.is_empty());
Ok(())
}
pub fn iteration(&mut self) -> bool {
std::thread::sleep(Duration::from_millis(100));
let now = TS::now();
if self.next_housekeep < now {
if let Err(e) = self.housekeep() {
error!("{}", e)
}
self.next_housekeep = now + 1;
if !self.config.is_running() {
debug!("Extras: end");
return false;
}
}
true
}
pub fn run(mut self) {
while self.iteration() {}
}
}

View File

@ -0,0 +1,67 @@
use std::{time::Duration, net::SocketAddr, cmp::{min, max}};
use smallvec::SmallVec;
use super::{common::SPACE_BEFORE, coms::Coms, shared::SharedConfig};
use crate::{
error::Error,
net::Socket,
util::{MsgBuffer, TimeSource, Time, addr_nice},
Protocol, messages::MESSAGE_TYPE_NODE_INFO, config::DEFAULT_PEER_TIMEOUT,
};
const MAX_RECONNECT_INTERVAL: u16 = 3600;
const RESOLVE_INTERVAL: Time = 300;
const OWN_ADDRESS_RESET_INTERVAL: Time = 300;
pub const STATS_INTERVAL: Time = 60;
pub struct HousekeepThread<S: Socket, P: Protocol, TS: TimeSource> {
config: SharedConfig,
coms: Coms<S, TS, P>,
next_housekeep: Time,
buffer: MsgBuffer,
update_freq: u16,
next_peers: Time,
next_own_address_reset: Time,
}
impl<S: Socket, P: Protocol, TS: TimeSource> HousekeepThread<S, P, TS> {
pub fn new(config: SharedConfig, coms: Coms<S, TS, P>) -> Self {
let update_freq = config.get_config().get_keepalive() as u16;
Self {
config,
coms,
next_housekeep: TS::now(),
buffer: MsgBuffer::new(SPACE_BEFORE),
update_freq,
next_peers: TS::now(),
next_own_address_reset: TS::now(),
}
}
pub fn housekeep(&mut self) -> Result<(), Error> {
let now = TS::now();
assert!(self.buffer.is_empty());
Ok(())
}
pub fn iteration(&mut self) -> bool {
std::thread::sleep(Duration::from_millis(100));
let now = TS::now();
if self.next_housekeep < now {
if let Err(e) = self.housekeep() {
error!("{}", e)
}
self.next_housekeep = now + 1;
if !self.config.is_running() {
debug!("Housekeep: end");
return false;
}
}
true
}
pub fn run(mut self) {
while self.iteration() {}
}
}

View File

@ -2,7 +2,10 @@
// Copyright (C) 2015-2021 Dennis Schwerdel
// This software is licensed under GPL-3 or newer (see LICENSE.md)
mod device_thread;
mod shared;
mod device_thread;
mod socket_thread;
mod housekeep_thread;
mod extras_thread;
pub mod coms;
pub mod common;

View File

@ -1,6 +1,6 @@
use crate::{
config::Config,
crypto::CryptoCore,
crypto::{CryptoCore, PeerCrypto},
engine::common::Hash,
error::Error,
table::ClaimTable,
@ -13,34 +13,40 @@ use std::{
collections::HashMap,
io::{self, Write},
net::SocketAddr,
sync::Arc,
ops::DerefMut,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use super::common::PeerData;
#[derive(Clone)]
#[allow(clippy::type_complexity)]
pub struct SharedPeerCrypto {
peers: Arc<Mutex<HashMap<SocketAddr, Option<Arc<CryptoCore>>, Hash>>>,
pub struct SharedCrypto {
peer_crypto: Arc<Mutex<HashMap<SocketAddr, Option<Arc<CryptoCore>>, Hash>>>,
cache: HashMap<SocketAddr, Option<Arc<CryptoCore>>, Hash>, //TODO: local hashmap as cache
}
impl SharedPeerCrypto {
impl SharedCrypto {
pub fn new() -> Self {
SharedPeerCrypto { peers: Arc::new(Mutex::new(HashMap::default())), cache: HashMap::default() }
SharedCrypto { peer_crypto: Arc::new(Mutex::new(HashMap::default())), cache: HashMap::default() }
}
pub fn encrypt_for(&mut self, peer: SocketAddr, data: &mut MsgBuffer) -> Result<(), Error> {
let crypto = match self.cache.get(&peer) {
Some(crypto) => crypto,
None => {
let peers = self.peers.lock();
if let Some(crypto) = peers.get(&peer) {
self.cache.insert(peer, crypto.clone());
self.cache.get(&peer).unwrap()
} else {
return Err(Error::InvalidCryptoState("No crypto found for peer"));
}
let cache = &mut self.cache;
let owned_crypto;
let crypto = if let Some(crypto) = cache.get(&peer) {
crypto
} else {
let peers = self.peer_crypto.lock();
if let Some(crypto) = peers.get(&peer) {
cache.insert(peer, crypto.clone());
owned_crypto = crypto.clone();
&owned_crypto
} else {
return Err(Error::InvalidCryptoState("No crypto found for peer"));
}
};
if let Some(crypto) = crypto {
@ -49,16 +55,28 @@ impl SharedPeerCrypto {
Ok(())
}
pub fn store(&mut self, data: &HashMap<SocketAddr, PeerData, Hash>) {
pub fn add(&mut self, addr: SocketAddr, crypto: Option<Arc<CryptoCore>>) {
self.cache.insert(addr, crypto.clone());
let mut peers = self.peer_crypto.lock();
peers.insert(addr, crypto);
}
pub fn remove(&mut self, addr: &SocketAddr) {
self.cache.remove(addr);
let mut peers = self.peer_crypto.lock();
peers.remove(addr);
}
pub fn store(&mut self, data: &HashMap<SocketAddr, PeerCrypto, Hash>) {
self.cache.clear();
self.cache.extend(data.iter().map(|(k, v)| (*k, v.crypto.get_core())));
let mut peers = self.peers.lock();
self.cache.extend(data.iter().map(|(k, v)| (*k, v.get_core())));
let mut peers = self.peer_crypto.lock();
peers.clear();
peers.extend(self.cache.iter().map(|(k, v)| (*k, v.clone())));
}
pub fn load(&mut self) {
let peers = self.peers.lock();
let peers = self.peer_crypto.lock();
self.cache.clear();
self.cache.extend(peers.iter().map(|(k, v)| (*k, v.clone())));
}
@ -67,6 +85,10 @@ impl SharedPeerCrypto {
&self.cache
}
pub fn contains(&self, addr: &SocketAddr) -> bool {
self.cache.contains_key(addr)
}
pub fn count(&self) -> usize {
self.cache.len()
}
@ -138,6 +160,12 @@ impl SharedTraffic {
}
}
impl Default for SharedTraffic {
fn default() -> Self {
Self::new()
}
}
#[derive(Clone)]
pub struct SharedTable<TS: TimeSource> {
table: Arc<Mutex<ClaimTable<TS>>>,
@ -198,3 +226,42 @@ impl<TS: TimeSource> SharedTable<TS> {
self.table.lock().claim_len()
}
}
#[derive(Clone)]
pub struct SharedConfig {
config: Config,
running: Arc<AtomicBool>,
}
impl SharedConfig {
pub fn new(config: Config) -> Self {
Self { config, running: Arc::new(AtomicBool::new(true)) }
}
pub fn get_config(&self) -> &Config {
&self.config
}
pub fn is_running(&self) -> bool {
self.running.load(Ordering::Relaxed)
}
pub fn stop(&self) {
self.running.store(false, Ordering::Relaxed)
}
}
#[derive(Clone)]
pub struct SharedPeers {
peers: Arc<Mutex<HashMap<SocketAddr, PeerData, Hash>>>,
}
impl SharedPeers {
pub fn new() -> Self {
Self { peers: Default::default() }
}
pub fn get_peers<'a>(&'a self) -> impl DerefMut<Target = HashMap<SocketAddr, PeerData, Hash>> + 'a {
self.peers.lock()
}
}

View File

@ -1,12 +1,9 @@
use super::{
common::SPACE_BEFORE,
shared::{SharedPeerCrypto, SharedTable, SharedTraffic},
};
use super::{common::SPACE_BEFORE, coms::Coms, shared::SharedConfig};
use crate::{
beacon::BeaconSerializer,
config::{DEFAULT_PEER_TIMEOUT, DEFAULT_PORT},
crypto::{is_init_message, Crypto, InitResult, InitState, MessageResult},
crypto::{is_init_message, Crypto, InitResult, InitState, MessageResult, PeerCrypto},
device::{Device, Type},
engine::common::{Hash, PeerData},
error::Error,
@ -18,12 +15,10 @@ use crate::{
port_forwarding::PortForwarding,
types::{Address, NodeId, Range, RangeList},
util::{addr_nice, resolve, MsgBuffer, StatsdMsg, Time, TimeSource},
Config, Protocol,
Protocol,
};
use rand::{random, seq::SliceRandom, thread_rng};
use smallvec::{smallvec, SmallVec};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{
cmp::{max, min},
collections::HashMap,
@ -33,7 +28,7 @@ use std::{
io::{Cursor, Seek, SeekFrom, Write},
marker::PhantomData,
net::{SocketAddr, ToSocketAddrs},
str::FromStr,
str::FromStr, ops::Deref,
};
const MAX_RECONNECT_INTERVAL: u16 = 3600;
@ -55,35 +50,32 @@ pub struct SocketThread<S: Socket, D: Device, P: Protocol, TS: TimeSource> {
// Read-only fields
node_id: NodeId,
claims: RangeList,
config: Config,
peer_timeout_publish: u16,
learning: bool,
update_freq: u16,
_dummy_ts: PhantomData<TS>,
_dummy_p: PhantomData<P>,
// Socket-only fields
pub socket: S,
config: SharedConfig,
pub coms: Coms<S, TS, P>,
device: D,
next_housekeep: Time,
pub own_addresses: AddrList,
next_own_address_reset: Time,
pending_inits: HashMap<SocketAddr, InitState<NodeInfo>, Hash>,
crypto: Crypto,
pub peers: HashMap<SocketAddr, PeerData, Hash>,
next_peers: Time,
next_stats_out: Time,
next_beacon: Time,
beacon_serializer: BeaconSerializer<TS>,
stats_file: Option<File>,
statsd_server: Option<String>,
pub reconnect_peers: SmallVec<[ReconnectEntry; 3]>,
crypto: Crypto, // TODO 2nd: move to config
//pub peers: HashMap<SocketAddr, PeerData, Hash>, // TODO 1st: move to shared peers
peer_crypto: HashMap<SocketAddr, PeerCrypto, Hash>,
next_peers: Time, // TODO: split off
next_stats_out: Time, // TODO: split off
next_beacon: Time, // TODO: split off
beacon_serializer: BeaconSerializer<TS>, // TODO: split off
stats_file: Option<File>, // TODO: split off
statsd_server: Option<String>, // TODO: split off
pub reconnect_peers: SmallVec<[ReconnectEntry; 3]>, // TODO: move to shared config
buffer: MsgBuffer,
broadcast_buffer: MsgBuffer,
// Shared fields
peer_crypto: SharedPeerCrypto,
traffic: SharedTraffic,
table: SharedTable<TS>,
running: Arc<AtomicBool>,
//table: SharedTable<TS>,
// Should not be here
port_forwarding: Option<PortForwarding>, // TODO: 3rd thread
}
@ -91,15 +83,14 @@ pub struct SocketThread<S: Socket, D: Device, P: Protocol, TS: TimeSource> {
impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS> {
#[allow(clippy::too_many_arguments)]
pub fn new(
config: Config, device: D, socket: S, traffic: SharedTraffic, peer_crypto: SharedPeerCrypto,
table: SharedTable<TS>, port_forwarding: Option<PortForwarding>, stats_file: Option<File>,
running: Arc<AtomicBool>,
config: SharedConfig, coms: Coms<S, TS, P>, device: D, port_forwarding: Option<PortForwarding>,
stats_file: Option<File>,
) -> Self {
let mut claims = SmallVec::with_capacity(config.claims.len());
for s in &config.claims {
let mut claims = SmallVec::with_capacity(config.get_config().claims.len());
for s in &config.get_config().claims {
claims.push(try_fail!(Range::from_str(s), "Invalid subnet format: {} ({})", s));
}
if device.get_type() == Type::Tun && config.auto_claim {
if device.get_type() == Type::Tun && config.get_config().auto_claim {
match device.get_ip() {
Ok(ip) => {
let range = Range { base: Address::from_ipv4(ip), prefix_len: 32 };
@ -113,20 +104,17 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
}
}
let now = TS::now();
let update_freq = config.get_keepalive() as u16;
let update_freq = config.get_config().get_keepalive() as u16;
let node_id = random();
let beacon_key = config.beacon_password.as_ref().map(|s| s.as_bytes()).unwrap_or(&[]);
let beacon_key = config.get_config().beacon_password.as_ref().map(|s| s.as_bytes()).unwrap_or(&[]);
Self {
_dummy_p: PhantomData,
_dummy_ts: PhantomData,
node_id,
claims,
device,
socket,
peer_crypto,
traffic,
table,
learning: config.is_learning(),
coms,
learning: config.get_config().is_learning(),
next_housekeep: now,
next_beacon: now,
next_peers: now,
@ -135,59 +123,22 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
pending_inits: HashMap::default(),
reconnect_peers: SmallVec::new(),
own_addresses: SmallVec::new(),
peers: HashMap::default(),
peer_timeout_publish: config.peer_timeout as u16,
peer_timeout_publish: config.get_config().peer_timeout as u16,
beacon_serializer: BeaconSerializer::new(beacon_key),
port_forwarding,
stats_file,
update_freq,
statsd_server: config.statsd_server.clone(),
crypto: Crypto::new(node_id, &config.crypto).unwrap(),
statsd_server: config.get_config().statsd_server.clone(),
crypto: Crypto::new(node_id, &config.get_config().crypto).unwrap(),
peer_crypto: Default::default(),
config,
buffer: MsgBuffer::new(SPACE_BEFORE),
broadcast_buffer: MsgBuffer::new(SPACE_BEFORE),
running,
}
}
#[inline]
fn send_to(&mut self, addr: SocketAddr) -> Result<(), Error> {
let size = self.buffer.len();
debug!("Sending msg with {} bytes to {}", size, addr);
self.traffic.count_out_traffic(addr, size);
match self.socket.send(self.buffer.message(), addr) {
Ok(written) if written == size => {
self.buffer.clear();
Ok(())
}
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
}
}
#[inline]
fn broadcast_msg(&mut self, type_: u8) -> Result<(), Error> {
debug!("Broadcasting message type {}, {:?} bytes to {} peers", type_, self.buffer.len(), self.peers.len());
for (addr, peer) in &mut self.peers {
self.broadcast_buffer.set_start(self.buffer.get_start());
self.broadcast_buffer.set_length(self.buffer.len());
self.broadcast_buffer.message_mut().clone_from_slice(self.buffer.message());
self.broadcast_buffer.prepend_byte(type_);
peer.crypto.encrypt_message(&mut self.broadcast_buffer);
self.traffic.count_out_traffic(*addr, self.broadcast_buffer.len());
match self.socket.send(self.broadcast_buffer.message(), *addr) {
Ok(written) if written == self.broadcast_buffer.len() => Ok(()),
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
}?
}
self.buffer.clear();
Ok(())
}
fn connect_sock(&mut self, addr: SocketAddr) -> Result<(), Error> {
let addr = mapped_addr(addr);
if self.peers.contains_key(&addr)
if self.coms.has_peer(&addr)
|| self.own_addresses.contains(&addr)
|| self.pending_inits.contains_key(&addr)
{
@ -198,14 +149,14 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
let mut init = self.crypto.peer_instance(payload);
init.send_ping(&mut self.buffer);
self.pending_inits.insert(addr, init);
self.send_to(addr)
self.coms.send_to(addr, &mut self.buffer)
}
pub fn connect<Addr: ToSocketAddrs + fmt::Debug + Clone>(&mut self, addr: Addr) -> Result<(), Error> {
let addrs = resolve(&addr)?.into_iter().map(mapped_addr).collect::<SmallVec<[SocketAddr; 3]>>();
for addr in &addrs {
if self.own_addresses.contains(addr)
|| self.peers.contains_key(addr)
|| self.coms.has_peer(addr)
|| self.pending_inits.contains_key(addr)
{
return Ok(());
@ -221,7 +172,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
fn create_node_info(&self) -> NodeInfo {
let mut peers = smallvec![];
for peer in self.peers.values() {
for peer in self.coms.get_peers().values() {
peers.push(PeerInfo { node_id: Some(peer.node_id), addrs: peer.addrs.clone() })
}
if peers.len() > 20 {
@ -239,9 +190,9 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
}
fn update_peer_info(&mut self, addr: SocketAddr, info: Option<NodeInfo>) -> Result<(), Error> {
if let Some(peer) = self.peers.get_mut(&addr) {
if let Some(peer) = self.coms.get_peers().get_mut(&addr) {
peer.last_seen = TS::now();
peer.timeout = TS::now() + self.config.peer_timeout as Time;
peer.timeout = TS::now() + self.config.get_config().peer_timeout as Time;
if let Some(info) = &info {
// Update peer addresses, always add seen address
peer.addrs.clear();
@ -258,7 +209,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
}
if let Some(info) = info {
debug!("Adding claims of peer {}: {:?}", addr_nice(addr), info.claims);
self.table.set_claims(addr, info.claims);
self.coms.table.set_claims(addr, info.claims);
debug!("Received {} peers from {}: {:?}", info.peers.len(), addr_nice(addr), info.peers);
self.connect_to_peers(&info.peers)?;
}
@ -270,22 +221,19 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
if let Some(init) = self.pending_inits.remove(&addr) {
self.buffer.clear();
let crypto = init.finish(&mut self.buffer);
self.peers.insert(
addr,
PeerData {
addrs: info.addrs.clone(),
crypto,
node_id: info.node_id,
peer_timeout: info.peer_timeout.unwrap_or(DEFAULT_PEER_TIMEOUT),
last_seen: TS::now(),
timeout: TS::now() + self.config.peer_timeout as Time,
},
);
let peer_data = PeerData {
addrs: info.addrs.clone(),
node_id: info.node_id,
peer_timeout: info.peer_timeout.unwrap_or(DEFAULT_PEER_TIMEOUT),
last_seen: TS::now(),
timeout: TS::now() + self.config.get_config().peer_timeout as Time,
};
self.coms.add_peer(addr, peer_data,&crypto);
self.peer_crypto.insert(addr, crypto);
self.update_peer_info(addr, Some(info))?;
if !self.buffer.is_empty() {
self.send_to(addr)?;
self.coms.send_to(addr, &mut self.buffer)?;
}
self.peer_crypto.store(&self.peers);
} else {
error!("No init for new peer {}", addr_nice(addr));
}
@ -295,7 +243,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
fn connect_to_peers(&mut self, peers: &[PeerInfo]) -> Result<(), Error> {
'outer: for peer in peers {
for addr in &peer.addrs {
if self.peers.contains_key(addr) {
if self.own_addresses.contains(addr) {
// Check addresses and add addresses that we don't know to own addresses
for addr in &peer.addrs {
if !self.own_addresses.contains(addr) {
@ -309,7 +257,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
if self.node_id == node_id {
continue 'outer;
}
for p in self.peers.values() {
for p in self.coms.get_peers().values() {
if p.node_id == node_id {
continue 'outer;
}
@ -321,18 +269,15 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
}
fn remove_peer(&mut self, addr: SocketAddr) {
if let Some(_peer) = self.peers.remove(&addr) {
info!("Closing connection to {}", addr_nice(addr));
self.table.remove_claims(addr);
self.peer_crypto.store(&self.peers);
}
self.coms.remove_peer(&addr);
self.peer_crypto.remove(&addr);
}
fn handle_payload_from(&mut self, peer: SocketAddr) -> Result<(), Error> {
let (src, dst) = P::parse(self.buffer.message())?;
let len = self.buffer.len();
debug!("Writing data to device: {} bytes", len);
self.traffic.count_in_payload(src.clone(), dst, len);
self.coms.traffic.count_in_payload(src.clone(), dst, len);
if let Err(e) = self.device.write(&mut self.buffer) {
error!("Failed to send via device: {}", e);
return Err(e);
@ -340,7 +285,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
self.buffer.clear();
if self.learning {
// Learn single address
self.table.cache(&src, peer);
self.coms.table.cache(&src, peer);
}
Ok(())
}
@ -354,7 +299,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
let info = match NodeInfo::decode(Cursor::new(self.buffer.message())) {
Ok(val) => val,
Err(err) => {
self.traffic.count_invalid_protocol(self.buffer.len());
self.coms.traffic.count_invalid_protocol(self.buffer.len());
return Err(err);
}
};
@ -371,11 +316,11 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
}
_ => {
self.buffer.clear();
self.traffic.count_invalid_protocol(self.buffer.len());
self.coms.traffic.count_invalid_protocol(self.buffer.len());
return Err(Error::Message("Unknown message type"));
}
},
MessageResult::Reply => self.send_to(src)?,
MessageResult::Reply => self.coms.send_to(src, &mut self.buffer)?,
MessageResult::None => {
self.buffer.clear();
}
@ -387,8 +332,8 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
let src = mapped_addr(src);
debug!("Received {} bytes from {}", self.buffer.len(), src);
let buffer = &mut self.buffer;
self.traffic.count_in_traffic(src, buffer.len());
if let Some(result) = self.peers.get_mut(&src).map(|peer| peer.crypto.handle_message(buffer)) {
self.coms.traffic.count_in_traffic(src, buffer.len());
if let Some(result) = self.peer_crypto.get_mut(&src).map(|crypto| crypto.handle_message(buffer)) {
return self.process_message(src, result?);
}
let is_init = is_init_message(buffer.message());
@ -402,7 +347,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
}
}) {
if !buffer.is_empty() {
self.send_to(src)?
self.coms.send_to(src, &mut self.buffer)?
}
if let InitResult::Success { peer_payload, .. } = result? {
self.add_new_peer(src, peer_payload)?
@ -411,7 +356,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
}
if !is_init_message(self.buffer.message()) {
info!("Ignoring non-init message from unknown peer {}", addr_nice(src));
self.traffic.count_invalid_protocol(self.buffer.len());
self.coms.traffic.count_invalid_protocol(self.buffer.len());
self.buffer.clear();
return Ok(());
}
@ -420,10 +365,10 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
match msg_result {
Ok(_) => {
self.pending_inits.insert(src, init);
self.send_to(src)
self.coms.send_to(src, &mut self.buffer)
}
Err(err) => {
self.traffic.count_invalid_protocol(self.buffer.len());
self.coms.traffic.count_invalid_protocol(self.buffer.len());
Err(err)
}
}
@ -432,18 +377,17 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
pub fn housekeep(&mut self) -> Result<(), Error> {
let now = TS::now();
let mut del: SmallVec<[SocketAddr; 3]> = SmallVec::new();
for (&addr, data) in &self.peers {
for (&addr, data) in self.coms.get_peers().deref() {
if data.timeout < now {
del.push(addr);
}
}
for addr in del {
info!("Forgot peer {} due to timeout", addr_nice(addr));
self.peers.remove(&addr);
self.table.remove_claims(addr);
self.coms.remove_peer(&addr);
self.connect_sock(addr)?; // Try to reconnect
}
self.table.housekeep();
self.coms.table.housekeep();
self.crypto_housekeep()?;
// Periodically extend the port-forwarding
//TODO: extra thread
@ -456,9 +400,9 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
debug!("Send peer list to all peers");
let info = self.create_node_info();
info.encode(&mut self.buffer);
self.broadcast_msg(MESSAGE_TYPE_NODE_INFO)?;
self.coms.broadcast_msg(MESSAGE_TYPE_NODE_INFO, &mut self.buffer)?;
// Reschedule for next update
let min_peer_timeout = self.peers.iter().map(|p| p.1.peer_timeout).min().unwrap_or(DEFAULT_PEER_TIMEOUT);
let min_peer_timeout = self.coms.get_peers().iter().map(|p| p.1.peer_timeout).min().unwrap_or(DEFAULT_PEER_TIMEOUT);
let interval = min(self.update_freq as u16, max(min_peer_timeout / 2 - 60, 1));
self.next_peers = now + Time::from(interval);
}
@ -470,7 +414,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
//TODO: extra thread
self.send_stats_to_statsd()?;
self.next_stats_out = now + STATS_INTERVAL;
self.traffic.period(Some(5));
self.coms.traffic.period(Some(5));
}
if let Some(peers) = self.beacon_serializer.get_cmd_results() {
debug!("Loaded beacon with peers: {:?}", peers);
@ -481,11 +425,9 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
if self.next_beacon < now {
self.store_beacon()?;
self.load_beacon()?;
self.next_beacon = now + Time::from(self.config.beacon_interval);
self.next_beacon = now + Time::from(self.config.get_config().beacon_interval);
}
self.table.sync();
self.traffic.sync();
self.peer_crypto.store(&self.peers);
self.coms.sync()?;
// Periodically reset own peers
if self.next_own_address_reset <= now {
self.reset_own_addresses().map_err(|err| Error::SocketIo("Failed to get own addresses", err))?;
@ -502,19 +444,20 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
if self.pending_inits.get_mut(&addr).unwrap().every_second(&mut self.buffer).is_err() {
del.push(addr)
} else if !self.buffer.is_empty() {
self.send_to(addr)?
self.coms.send_to(addr, &mut self.buffer)?
}
}
for addr in self.peers.keys().copied().collect::<SmallVec<[SocketAddr; 16]>>() {
for (addr, crypto) in self.peer_crypto.iter_mut() {
self.buffer.clear();
self.peers.get_mut(&addr).unwrap().crypto.every_second(&mut self.buffer);
crypto.every_second(&mut self.buffer);
if !self.buffer.is_empty() {
self.send_to(addr)?
self.coms.send_to(*addr, &mut self.buffer)?
}
}
for addr in del {
self.pending_inits.remove(&addr);
if self.peers.remove(&addr).is_some() {
self.peer_crypto.remove(&addr);
if self.coms.remove_peer(&addr) {
self.connect_sock(addr)?;
}
}
@ -523,9 +466,9 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
fn reset_own_addresses(&mut self) -> io::Result<()> {
self.own_addresses.clear();
let socket_addr = self.socket.address().map(mapped_addr)?;
let socket_addr = self.coms.get_address()?;
// 1) Specified advertise addresses
for addr in &self.config.advertise_addresses {
for addr in &self.config.get_config().advertise_addresses {
self.own_addresses.push(parse_listen(addr, socket_addr.port()));
}
// 2) Address of UDP socket
@ -542,7 +485,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
/// Stores the beacon
fn store_beacon(&mut self) -> Result<(), Error> {
if let Some(ref path) = self.config.beacon_store {
if let Some(ref path) = self.config.get_config().beacon_store {
let peers: SmallVec<[SocketAddr; 3]> =
self.own_addresses.choose_multiple(&mut thread_rng(), 3).cloned().collect();
if let Some(path) = path.strip_prefix('|') {
@ -561,7 +504,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
/// Loads the beacon
fn load_beacon(&mut self) -> Result<(), Error> {
let peers;
if let Some(ref path) = self.config.beacon_load {
if let Some(ref path) = self.config.get_config().beacon_load {
if let Some(path) = path.strip_prefix('|') {
self.beacon_serializer
.read_from_cmd(path, Some(50))
@ -591,19 +534,19 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
f.set_len(0)?;
writeln!(f, "peers:")?;
let now = TS::now();
for (addr, data) in &self.peers {
for (addr, data) in self.coms.get_peers().iter() {
writeln!(
f,
" - \"{}\": {{ ttl_secs: {}, crypto: {} }}",
addr_nice(*addr),
data.timeout - now,
data.crypto.algorithm_name()
self.peer_crypto.get(addr).unwrap().algorithm_name()
)?;
}
writeln!(f)?;
self.table.write_out(f)?;
self.coms.table.write_out(f)?;
writeln!(f)?;
self.traffic.write_out(f)?;
self.coms.traffic.write_out(f)?;
writeln!(f)?;
}
Ok(())
@ -612,15 +555,15 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
/// Sends the statistics to a statsd endpoint
fn send_stats_to_statsd(&mut self) -> Result<(), Error> {
if let Some(ref endpoint) = self.statsd_server {
let peer_traffic = self.traffic.total_peer_traffic();
let payload_traffic = self.traffic.total_payload_traffic();
let dropped = &self.traffic.dropped();
let prefix = self.config.statsd_prefix.as_ref().map(|s| s as &str).unwrap_or("vpncloud");
let peer_traffic = self.coms.traffic.total_peer_traffic();
let payload_traffic = self.coms.traffic.total_payload_traffic();
let dropped = &self.coms.traffic.dropped();
let prefix = self.config.get_config().statsd_prefix.as_ref().map(|s| s as &str).unwrap_or("vpncloud");
let msg = StatsdMsg::new()
.with_ns(prefix, |msg| {
msg.add("peer_count", self.peers.len(), "g");
msg.add("table_cache_entries", self.table.cache_len(), "g");
msg.add("table_claims", self.table.claim_len(), "g");
msg.add("peer_count", self.coms.get_peers().len(), "g");
msg.add("table_cache_entries", self.coms.table.cache_len(), "g");
msg.add("table_claims", self.coms.table.claim_len(), "g");
msg.with_ns("traffic", |msg| {
msg.with_ns("protocol", |msg| {
msg.with_ns("inbound", |msg| {
@ -653,14 +596,9 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
});
})
.build();
let msg_data = msg.as_bytes();
let addrs = resolve(endpoint)?;
if let Some(addr) = addrs.first() {
match self.socket.send(msg_data, *addr) {
Ok(written) if written == msg_data.len() => Ok(()),
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
}?
self.coms.send_raw(msg.as_bytes(), *addr)?;
} else {
error!("Failed to resolve statsd server {}", endpoint);
}
@ -680,7 +618,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
for entry in &mut self.reconnect_peers {
// Schedule for next second if node is connected
for addr in &entry.resolved {
if self.peers.contains_key(addr) {
if self.coms.get_peers().contains_key(addr) {
entry.tries = 0;
entry.timeout = 1;
entry.next = now + 1;
@ -722,8 +660,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
}
pub fn iteration(&mut self) -> bool {
if let Ok(src) = self.socket.receive(&mut self.buffer)
{
if let Ok(src) = self.coms.receive(&mut self.buffer) {
match self.handle_message(src) {
Err(e @ Error::CryptoInitFatal(_)) => {
debug!("Fatal crypto init error from {}: {}", src, e);
@ -750,7 +687,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
error!("{}", e)
}
self.next_housekeep = now + 1;
if !self.running.load(Ordering::SeqCst) {
if !self.config.is_running() {
debug!("Socket: end");
return false;
}