mirror of https://github.com/dswd/vpncloud.git
Compare commits
2 Commits
427c426fde
...
aef78c1742
Author | SHA1 | Date |
---|---|---|
Dennis Schwerdel | aef78c1742 | |
Dennis Schwerdel | 3e7f3d94f5 |
|
@ -219,6 +219,15 @@ dependencies = [
|
||||||
"xmltree",
|
"xmltree",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "instant"
|
||||||
|
version = "0.1.9"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "61124eeebbd69b8190558df225adf7e4caafce0d743919e5d6b19652314ec5ec"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if 1.0.0",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "itoa"
|
name = "itoa"
|
||||||
version = "0.4.6"
|
version = "0.4.6"
|
||||||
|
@ -252,6 +261,15 @@ version = "0.5.3"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "8dd5a6d5999d9907cda8ed67bbd137d3af8085216c2ac62de5be860bd41f304a"
|
checksum = "8dd5a6d5999d9907cda8ed67bbd137d3af8085216c2ac62de5be860bd41f304a"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "lock_api"
|
||||||
|
version = "0.4.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "dd96ffd135b2fd7b973ac026d28085defbe8983df057ced3eb4f2130b0831312"
|
||||||
|
dependencies = [
|
||||||
|
"scopeguard",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "log"
|
name = "log"
|
||||||
version = "0.4.11"
|
version = "0.4.11"
|
||||||
|
@ -298,6 +316,31 @@ version = "1.5.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "13bd41f508810a131401606d54ac32a467c97172d74ba7662562ebba5ad07fa0"
|
checksum = "13bd41f508810a131401606d54ac32a467c97172d74ba7662562ebba5ad07fa0"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "parking_lot"
|
||||||
|
version = "0.11.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "6d7744ac029df22dca6284efe4e898991d28e3085c706c972bcd7da4a27a15eb"
|
||||||
|
dependencies = [
|
||||||
|
"instant",
|
||||||
|
"lock_api",
|
||||||
|
"parking_lot_core",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "parking_lot_core"
|
||||||
|
version = "0.8.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "d7c6d9b8427445284a09c55be860a15855ab580a417ccad9da88f5a06787ced0"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if 1.0.0",
|
||||||
|
"instant",
|
||||||
|
"libc",
|
||||||
|
"redox_syscall",
|
||||||
|
"smallvec",
|
||||||
|
"winapi",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "percent-encoding"
|
name = "percent-encoding"
|
||||||
version = "2.1.0"
|
version = "2.1.0"
|
||||||
|
@ -494,6 +537,12 @@ version = "1.0.5"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e"
|
checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "scopeguard"
|
||||||
|
version = "1.1.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "semver"
|
name = "semver"
|
||||||
version = "0.9.0"
|
version = "0.9.0"
|
||||||
|
@ -854,6 +903,7 @@ dependencies = [
|
||||||
"igd",
|
"igd",
|
||||||
"libc",
|
"libc",
|
||||||
"log",
|
"log",
|
||||||
|
"parking_lot",
|
||||||
"privdrop",
|
"privdrop",
|
||||||
"rand 0.8.0",
|
"rand 0.8.0",
|
||||||
"ring",
|
"ring",
|
||||||
|
|
|
@ -30,6 +30,7 @@ privdrop = "0.5"
|
||||||
byteorder = "1.3"
|
byteorder = "1.3"
|
||||||
thiserror = "1.0"
|
thiserror = "1.0"
|
||||||
smallvec = "1.5"
|
smallvec = "1.5"
|
||||||
|
parking_lot = "*"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
tempfile = "3"
|
tempfile = "3"
|
||||||
|
|
|
@ -42,10 +42,12 @@ use ring::{
|
||||||
aead::{self, LessSafeKey, UnboundKey},
|
aead::{self, LessSafeKey, UnboundKey},
|
||||||
rand::{SecureRandom, SystemRandom}
|
rand::{SecureRandom, SystemRandom}
|
||||||
};
|
};
|
||||||
|
use std::cell::UnsafeCell;
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
io::{Cursor, Read, Write},
|
io::{Cursor, Read, Write},
|
||||||
mem,
|
mem,
|
||||||
|
sync::atomic::{AtomicUsize, Ordering},
|
||||||
time::{Duration, Instant}
|
time::{Duration, Instant}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -126,35 +128,42 @@ impl CryptoKey {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Why this is safe:
|
||||||
|
// Only 2 of the 4 keys are accessed.
|
||||||
|
// Only the other two keys will ever be replaced and then become current.
|
||||||
|
// Between two replacements is enough time so that all calls using those keys are long done.
|
||||||
|
|
||||||
pub struct CryptoCore {
|
pub struct CryptoCore {
|
||||||
rand: SystemRandom,
|
rand: SystemRandom,
|
||||||
keys: [CryptoKey; 4],
|
keys: [UnsafeCell<CryptoKey>; 4],
|
||||||
current_key: usize,
|
current_key: AtomicUsize,
|
||||||
nonce_half: bool
|
nonce_half: bool,
|
||||||
|
algorithm: &'static aead::Algorithm
|
||||||
}
|
}
|
||||||
|
|
||||||
impl CryptoCore {
|
impl CryptoCore {
|
||||||
pub fn new(key: LessSafeKey, nonce_half: bool) -> Self {
|
pub fn new(key: LessSafeKey, nonce_half: bool) -> Self {
|
||||||
let rand = SystemRandom::new();
|
let rand = SystemRandom::new();
|
||||||
|
let algorithm = key.algorithm();
|
||||||
let dummy_key_data = random_data(key.algorithm().key_len());
|
let dummy_key_data = random_data(key.algorithm().key_len());
|
||||||
let dummy_key1 = LessSafeKey::new(UnboundKey::new(key.algorithm(), &dummy_key_data).unwrap());
|
let dummy_key1 = LessSafeKey::new(UnboundKey::new(key.algorithm(), &dummy_key_data).unwrap());
|
||||||
let dummy_key2 = LessSafeKey::new(UnboundKey::new(key.algorithm(), &dummy_key_data).unwrap());
|
let dummy_key2 = LessSafeKey::new(UnboundKey::new(key.algorithm(), &dummy_key_data).unwrap());
|
||||||
let dummy_key3 = LessSafeKey::new(UnboundKey::new(key.algorithm(), &dummy_key_data).unwrap());
|
let dummy_key3 = LessSafeKey::new(UnboundKey::new(key.algorithm(), &dummy_key_data).unwrap());
|
||||||
Self {
|
Self {
|
||||||
keys: [
|
keys: [
|
||||||
CryptoKey::new(&rand, key, nonce_half),
|
UnsafeCell::new(CryptoKey::new(&rand, key, nonce_half)),
|
||||||
CryptoKey::new(&rand, dummy_key1, nonce_half),
|
UnsafeCell::new(CryptoKey::new(&rand, dummy_key1, nonce_half)),
|
||||||
CryptoKey::new(&rand, dummy_key2, nonce_half),
|
UnsafeCell::new(CryptoKey::new(&rand, dummy_key2, nonce_half)),
|
||||||
CryptoKey::new(&rand, dummy_key3, nonce_half)
|
UnsafeCell::new(CryptoKey::new(&rand, dummy_key3, nonce_half))
|
||||||
],
|
],
|
||||||
current_key: 0,
|
current_key: AtomicUsize::new(0),
|
||||||
nonce_half,
|
nonce_half,
|
||||||
rand
|
rand,
|
||||||
|
algorithm
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn encrypt(&mut self, buffer: &mut MsgBuffer) {
|
pub fn encrypt(&self, buffer: &mut MsgBuffer) {
|
||||||
let data_start = buffer.get_start();
|
let data_start = buffer.get_start();
|
||||||
let data_length = buffer.len();
|
let data_length = buffer.len();
|
||||||
assert!(buffer.get_start() >= EXTRA_LEN);
|
assert!(buffer.get_start() >= EXTRA_LEN);
|
||||||
|
@ -162,11 +171,12 @@ impl CryptoCore {
|
||||||
buffer.set_length(data_length + EXTRA_LEN + TAG_LEN);
|
buffer.set_length(data_length + EXTRA_LEN + TAG_LEN);
|
||||||
let (extra, data_and_tag) = buffer.message_mut().split_at_mut(EXTRA_LEN);
|
let (extra, data_and_tag) = buffer.message_mut().split_at_mut(EXTRA_LEN);
|
||||||
let (data, tag_space) = data_and_tag.split_at_mut(data_length);
|
let (data, tag_space) = data_and_tag.split_at_mut(data_length);
|
||||||
let key = &mut self.keys[self.current_key];
|
let current_key = self.current_key.load(Ordering::SeqCst);
|
||||||
|
let key = unsafe { self.keys[current_key].get().as_mut().unwrap() };
|
||||||
key.send_nonce.increment();
|
key.send_nonce.increment();
|
||||||
{
|
{
|
||||||
let mut extra = Cursor::new(extra);
|
let mut extra = Cursor::new(extra);
|
||||||
extra.write_u8(self.current_key as u8).unwrap();
|
extra.write_u8(current_key as u8).unwrap();
|
||||||
extra.write_all(&key.send_nonce.as_bytes()[5..]).unwrap();
|
extra.write_all(&key.send_nonce.as_bytes()[5..]).unwrap();
|
||||||
}
|
}
|
||||||
let nonce = aead::Nonce::assume_unique_for_key(*key.send_nonce.as_bytes());
|
let nonce = aead::Nonce::assume_unique_for_key(*key.send_nonce.as_bytes());
|
||||||
|
@ -190,7 +200,7 @@ impl CryptoCore {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn decrypt(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error> {
|
pub fn decrypt(&self, buffer: &mut MsgBuffer) -> Result<(), Error> {
|
||||||
assert!(buffer.len() >= EXTRA_LEN + TAG_LEN);
|
assert!(buffer.len() >= EXTRA_LEN + TAG_LEN);
|
||||||
let (extra, data_and_tag) = buffer.message_mut().split_at_mut(EXTRA_LEN);
|
let (extra, data_and_tag) = buffer.message_mut().split_at_mut(EXTRA_LEN);
|
||||||
let key_id;
|
let key_id;
|
||||||
|
@ -202,34 +212,37 @@ impl CryptoCore {
|
||||||
extra.read_exact(&mut nonce.0[5..]).map_err(|_| Error::Crypto("Input data too short"))?;
|
extra.read_exact(&mut nonce.0[5..]).map_err(|_| Error::Crypto("Input data too short"))?;
|
||||||
nonce.set_msb(if self.nonce_half { 0x00 } else { 0x80 });
|
nonce.set_msb(if self.nonce_half { 0x00 } else { 0x80 });
|
||||||
}
|
}
|
||||||
let key = &mut self.keys[key_id as usize];
|
let key = unsafe { self.keys[key_id as usize].get().as_mut().unwrap() };
|
||||||
let result = Self::decrypt_with_key(key, nonce, data_and_tag);
|
let result = Self::decrypt_with_key(key, nonce, data_and_tag);
|
||||||
buffer.set_start(buffer.get_start() + EXTRA_LEN);
|
buffer.set_start(buffer.get_start() + EXTRA_LEN);
|
||||||
buffer.set_length(buffer.len() - TAG_LEN);
|
buffer.set_length(buffer.len() - TAG_LEN);
|
||||||
result
|
result
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn rotate_key(&mut self, key: LessSafeKey, id: u64, use_for_sending: bool) {
|
pub fn rotate_key(&self, key: LessSafeKey, id: u64, use_for_sending: bool) {
|
||||||
debug!("Rotated key {} (use for sending: {})", id, use_for_sending);
|
debug!("Rotated key {} (use for sending: {})", id, use_for_sending);
|
||||||
let id = (id % 4) as usize;
|
let id = (id % 4) as usize;
|
||||||
self.keys[id] = CryptoKey::new(&self.rand, key, self.nonce_half);
|
let mut new_key = CryptoKey::new(&self.rand, key, self.nonce_half);
|
||||||
|
let stored_key = unsafe { self.keys[id].get().as_mut().unwrap() };
|
||||||
|
mem::swap(&mut new_key, stored_key);
|
||||||
if use_for_sending {
|
if use_for_sending {
|
||||||
self.current_key = id
|
self.current_key.store(id, Ordering::SeqCst);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn algorithm(&self) -> &'static aead::Algorithm {
|
pub fn algorithm(&self) -> &'static aead::Algorithm {
|
||||||
self.keys[self.current_key].key.algorithm()
|
self.algorithm
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn every_second(&mut self) {
|
pub fn every_second(&self) {
|
||||||
// Set min nonce on all keys
|
// Set min nonce on all keys
|
||||||
for k in &mut self.keys {
|
for k in &self.keys {
|
||||||
k.update_min_nonce();
|
unsafe { k.get().as_mut().unwrap().update_min_nonce() };
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
unsafe impl Sync for CryptoCore {}
|
||||||
|
|
||||||
pub fn create_dummy_pair(algo: &'static aead::Algorithm) -> (CryptoCore, CryptoCore) {
|
pub fn create_dummy_pair(algo: &'static aead::Algorithm) -> (CryptoCore, CryptoCore) {
|
||||||
let key_data = random_data(algo.key_len());
|
let key_data = random_data(algo.key_len());
|
||||||
|
|
|
@ -2,7 +2,7 @@ mod core;
|
||||||
mod init;
|
mod init;
|
||||||
mod rotate;
|
mod rotate;
|
||||||
|
|
||||||
pub use self::core::{EXTRA_LEN, TAG_LEN};
|
pub use self::core::{EXTRA_LEN, TAG_LEN, CryptoCore};
|
||||||
use self::{
|
use self::{
|
||||||
core::{test_speed, CryptoCore},
|
core::{test_speed, CryptoCore},
|
||||||
init::{InitResult, InitState, CLOSING},
|
init::{InitResult, InitState, CLOSING},
|
||||||
|
|
|
@ -1,50 +1,115 @@
|
||||||
use std::{marker::PhantomData, sync::Arc};
|
use super::{
|
||||||
|
shared::{SharedPeerCrypto, SharedTable, SharedTraffic},
|
||||||
use super::SPACE_BEFORE;
|
SPACE_BEFORE
|
||||||
use super::shared::SharedData;
|
};
|
||||||
use crate::{
|
use crate::{
|
||||||
device::Device,
|
device::Device,
|
||||||
error::Error,
|
error::Error,
|
||||||
util::{MsgBuffer, Time, TimeSource}
|
messages::MESSAGE_TYPE_DATA,
|
||||||
|
net::Socket,
|
||||||
|
util::{MsgBuffer, Time, TimeSource},
|
||||||
|
Protocol
|
||||||
};
|
};
|
||||||
|
use std::{marker::PhantomData, net::SocketAddr};
|
||||||
|
|
||||||
|
pub struct DeviceThread<S: Socket, D: Device, P: Protocol, TS: TimeSource> {
|
||||||
const SYNC_INTERVAL: Time = 1;
|
// Read-only fields
|
||||||
|
_dummy_ts: PhantomData<TS>,
|
||||||
pub struct DeviceThread<D: Device, T: TimeSource> {
|
_dummy_p: PhantomData<P>,
|
||||||
shared: Arc<SharedData>,
|
broadcast: bool,
|
||||||
|
// Device-only fields
|
||||||
|
socket: S,
|
||||||
device: D,
|
device: D,
|
||||||
next_sync: Time,
|
next_housekeep: Time,
|
||||||
_dummy: PhantomData<T>
|
// Shared fields
|
||||||
|
traffic: SharedTraffic,
|
||||||
|
peer_crypto: SharedPeerCrypto,
|
||||||
|
table: SharedTable<TS>
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<D: Device, T: TimeSource> DeviceThread<D, T> {
|
impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS> {
|
||||||
fn sync(&mut self) {
|
#[inline]
|
||||||
// TODO sync
|
fn send_to(&mut self, addr: SocketAddr, msg: &mut MsgBuffer) -> Result<(), Error> {
|
||||||
|
debug!("Sending msg with {} bytes to {}", msg.len(), addr);
|
||||||
|
self.traffic.count_out_traffic(addr, msg.len());
|
||||||
|
match self.socket.send(msg.message(), addr) {
|
||||||
|
Ok(written) if written == msg.len() => Ok(()),
|
||||||
|
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
|
||||||
|
Err(e) => Err(Error::SocketIo("IOError when sending", e))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn read_device_packet(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error> {
|
#[inline]
|
||||||
// TODO: read data
|
fn send_msg(&mut self, addr: SocketAddr, type_: u8, msg: &mut MsgBuffer) -> Result<(), Error> {
|
||||||
// use 5sec timeout
|
debug!("Sending msg with {} bytes to {}", msg.len(), addr);
|
||||||
unimplemented!();
|
if self.peer_crypto.send_message(addr, type_, msg)? {
|
||||||
|
self.send_to(addr, msg)
|
||||||
|
} else {
|
||||||
|
Err(Error::Message("Sending to node that is not a peer"))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn forward_packet(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error> {
|
#[inline]
|
||||||
// TODO: handle data
|
fn broadcast_msg(&mut self, type_: u8, msg: &mut MsgBuffer) -> Result<(), Error> {
|
||||||
|
debug!("Broadcasting message type {}, {:?} bytes to {} peers", type_, msg.len(), self.peer_crypto.count());
|
||||||
|
let mut msg_data = MsgBuffer::new(100);
|
||||||
|
self.peer_crypto.for_each(|addr, crypto| {
|
||||||
|
msg_data.set_start(msg.get_start());
|
||||||
|
msg_data.set_length(msg.len());
|
||||||
|
msg_data.message_mut().clone_from_slice(msg.message());
|
||||||
|
crypto.send_message(type_, &mut msg_data)?;
|
||||||
|
self.traffic.count_out_traffic(addr, msg_data.len());
|
||||||
|
match self.socket.send(msg_data.message(), addr) {
|
||||||
|
Ok(written) if written == msg_data.len() => Ok(()),
|
||||||
|
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
|
||||||
|
Err(e) => Err(Error::SocketIo("IOError when sending", e))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn forward_packet(&mut self, data: &mut MsgBuffer) -> Result<(), Error> {
|
||||||
|
let (src, dst) = P::parse(data.message())?;
|
||||||
|
debug!("Read data from interface: src: {}, dst: {}, {} bytes", src, dst, data.len());
|
||||||
|
self.traffic.count_out_payload(dst, src, data.len());
|
||||||
|
match self.table.lookup(dst) {
|
||||||
|
Some(addr) => {
|
||||||
|
// Peer found for destination
|
||||||
|
debug!("Found destination for {} => {}", dst, addr);
|
||||||
|
self.send_msg(addr, MESSAGE_TYPE_DATA, data)?;
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
if self.broadcast {
|
||||||
|
debug!("No destination for {} found, broadcasting", dst);
|
||||||
|
self.broadcast_msg(MESSAGE_TYPE_DATA, data)?;
|
||||||
|
} else {
|
||||||
|
debug!("No destination for {} found, dropping", dst);
|
||||||
|
self.traffic.count_dropped_payload(data.len());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn housekeep(&mut self) -> Result<(), Error> {
|
||||||
|
self.peer_crypto.sync();
|
||||||
|
self.table.sync();
|
||||||
|
self.traffic.sync();
|
||||||
unimplemented!();
|
unimplemented!();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn run(mut self) {
|
pub fn run(mut self) {
|
||||||
let mut buffer = MsgBuffer::new(SPACE_BEFORE);
|
let mut buffer = MsgBuffer::new(SPACE_BEFORE);
|
||||||
loop {
|
loop {
|
||||||
try_fail!(self.read_device_packet(&mut buffer), "Failed to read from device: {}");
|
try_fail!(self.device.read(&mut buffer), "Failed to read from device: {}");
|
||||||
if let Err(e) = self.forward_packet(&mut buffer) {
|
if let Err(e) = self.forward_packet(&mut buffer) {
|
||||||
error!("{}", e);
|
error!("{}", e);
|
||||||
}
|
}
|
||||||
let now = T::now();
|
let now = TS::now();
|
||||||
if self.next_sync < now {
|
if self.next_housekeep < TS::now() {
|
||||||
self.sync();
|
if let Err(e) = self.housekeep() {
|
||||||
self.next_sync = now + SYNC_INTERVAL
|
error!("{}", e)
|
||||||
|
}
|
||||||
|
self.next_housekeep = TS::now() + 1
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1 +1,106 @@
|
||||||
pub struct SharedData {}
|
use crate::error::Error;
|
||||||
|
use crate::{
|
||||||
|
crypto::CryptoCore,
|
||||||
|
engine::{Hash, PeerData, TimeSource},
|
||||||
|
messages::NodeInfo,
|
||||||
|
table::ClaimTable,
|
||||||
|
traffic::TrafficStats,
|
||||||
|
types::{Address, NodeId, RangeList},
|
||||||
|
util::MsgBuffer
|
||||||
|
};
|
||||||
|
use parking_lot::Mutex;
|
||||||
|
use std::{collections::HashMap, net::SocketAddr, sync::Arc};
|
||||||
|
|
||||||
|
pub struct SharedPeerCrypto {
|
||||||
|
peers: Arc<Mutex<HashMap<SocketAddr, CryptoCore, Hash>>>
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SharedPeerCrypto {
|
||||||
|
pub fn sync(&mut self) {
|
||||||
|
// TODO sync if needed
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn send_message(&mut self, peer: SocketAddr, type_: u8, data: &mut MsgBuffer) -> Result<bool, Error> {
|
||||||
|
let mut peers = self.peers.lock();
|
||||||
|
if let Some(peer) = peers.get_mut(&peer) {
|
||||||
|
peer.send_message(type_, data);
|
||||||
|
Ok(true)
|
||||||
|
} else {
|
||||||
|
Ok(false)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn for_each(&mut self, mut callback: impl FnMut(SocketAddr, &mut PeerCrypto<NodeInfo>) -> Result<(), Error>) -> Result<(), Error> {
|
||||||
|
let mut peers = self.peers.lock();
|
||||||
|
for (k, v) in peers.iter_mut() {
|
||||||
|
callback(*k, v)?
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn count(&self) -> usize {
|
||||||
|
self.peers.lock().len()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
pub struct SharedTraffic {
|
||||||
|
traffic: Arc<Mutex<TrafficStats>>
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SharedTraffic {
|
||||||
|
pub fn sync(&mut self) {
|
||||||
|
// TODO sync if needed
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn count_out_traffic(&self, peer: SocketAddr, bytes: usize) {
|
||||||
|
self.traffic.lock().count_out_traffic(peer, bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn count_in_traffic(&self, peer: SocketAddr, bytes: usize) {
|
||||||
|
self.traffic.lock().count_in_traffic(peer, bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn count_out_payload(&self, remote: Address, local: Address, bytes: usize) {
|
||||||
|
self.traffic.lock().count_out_payload(remote, local, bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn count_in_payload(&self, remote: Address, local: Address, bytes: usize) {
|
||||||
|
self.traffic.lock().count_in_payload(remote, local, bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn count_dropped_payload(&self, bytes: usize) {
|
||||||
|
self.traffic.lock().count_dropped_payload(bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn count_invalid_protocol(&self, bytes: usize) {
|
||||||
|
self.traffic.lock().count_invalid_protocol(bytes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
pub struct SharedTable<TS: TimeSource> {
|
||||||
|
table: Arc<Mutex<ClaimTable<TS>>>
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<TS: TimeSource> SharedTable<TS> {
|
||||||
|
pub fn sync(&mut self) {
|
||||||
|
// TODO sync if needed
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn lookup(&self, addr: Address) -> Option<SocketAddr> {
|
||||||
|
self.table.lock().lookup(addr)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set_claims(&self, peer: SocketAddr, claims: RangeList) {
|
||||||
|
self.table.lock().set_claims(peer, claims)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn remove_claims(&self, peer: SocketAddr) {
|
||||||
|
self.table.lock().remove_claims(peer)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn cache(&self, addr: Address, peer: SocketAddr) {
|
||||||
|
self.table.lock().cache(addr, peer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -1,50 +1,308 @@
|
||||||
use crate::error::Error;
|
use super::{
|
||||||
use std::{marker::PhantomData, net::SocketAddr, sync::Arc};
|
shared::{SharedPeerCrypto, SharedTable, SharedTraffic},
|
||||||
|
SPACE_BEFORE
|
||||||
use super::{shared::SharedData, SPACE_BEFORE};
|
|
||||||
use crate::{
|
|
||||||
net::Socket,
|
|
||||||
util::{MsgBuffer, Time, TimeSource}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
config::DEFAULT_PEER_TIMEOUT,
|
||||||
|
crypto::{is_init_message, MessageResult, PeerCrypto},
|
||||||
|
engine::{addr_nice, resolve, Hash, PeerData},
|
||||||
|
error::Error,
|
||||||
|
messages::{AddrList, NodeInfo, PeerInfo},
|
||||||
|
net::{mapped_addr, Socket},
|
||||||
|
types::{NodeId, RangeList},
|
||||||
|
util::{MsgBuffer, Time, TimeSource},
|
||||||
|
Config, Crypto, Device, Protocol
|
||||||
|
};
|
||||||
|
use rand::{seq::SliceRandom};
|
||||||
|
use smallvec::{smallvec, SmallVec};
|
||||||
|
use std::{
|
||||||
|
collections::HashMap,
|
||||||
|
fmt,
|
||||||
|
io::Cursor,
|
||||||
|
marker::PhantomData,
|
||||||
|
net::{SocketAddr, ToSocketAddrs},
|
||||||
|
};
|
||||||
|
|
||||||
const SYNC_INTERVAL: Time = 1;
|
pub struct SocketThread<S: Socket, D: Device, P: Protocol, TS: TimeSource> {
|
||||||
|
// Read-only fields
|
||||||
pub struct SocketThread<S: Socket, T: TimeSource> {
|
node_id: NodeId,
|
||||||
shared: Arc<SharedData>,
|
claims: RangeList,
|
||||||
|
config: Config,
|
||||||
|
peer_timeout_publish: u16,
|
||||||
|
learning: bool,
|
||||||
|
_dummy_ts: PhantomData<TS>,
|
||||||
|
_dummy_p: PhantomData<P>,
|
||||||
|
// Socket-only fields
|
||||||
socket: S,
|
socket: S,
|
||||||
next_sync: Time,
|
device: D,
|
||||||
_dummy: PhantomData<T>
|
next_housekeep: Time,
|
||||||
|
own_addresses: AddrList,
|
||||||
|
pending_inits: HashMap<SocketAddr, PeerCrypto<NodeInfo>, Hash>,
|
||||||
|
crypto: Crypto,
|
||||||
|
peers: HashMap<SocketAddr, PeerData, Hash>,
|
||||||
|
// Shared fields
|
||||||
|
traffic: SharedTraffic,
|
||||||
|
table: SharedTable<TS>
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S: Socket, T: TimeSource> SocketThread<S, T> {
|
impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS> {
|
||||||
fn sync(&mut self) {
|
#[inline]
|
||||||
|
fn send_to(&mut self, addr: SocketAddr, msg: &mut MsgBuffer) -> Result<(), Error> {
|
||||||
|
debug!("Sending msg with {} bytes to {}", msg.len(), addr);
|
||||||
|
self.traffic.count_out_traffic(addr, msg.len());
|
||||||
|
match self.socket.send(msg.message(), addr) {
|
||||||
|
Ok(written) if written == msg.len() => Ok(()),
|
||||||
|
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
|
||||||
|
Err(e) => Err(Error::SocketIo("IOError when sending", e))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn connect_sock(&mut self, addr: SocketAddr) -> Result<(), Error> {
|
||||||
|
let addr = mapped_addr(addr);
|
||||||
|
if self.peers.contains_key(&addr)
|
||||||
|
|| self.own_addresses.contains(&addr)
|
||||||
|
|| self.pending_inits.contains_key(&addr)
|
||||||
|
{
|
||||||
|
return Ok(())
|
||||||
|
}
|
||||||
|
debug!("Connecting to {:?}", addr);
|
||||||
|
let payload = self.create_node_info();
|
||||||
|
let mut peer_crypto = self.crypto.peer_instance(payload);
|
||||||
|
let mut msg = MsgBuffer::new(SPACE_BEFORE);
|
||||||
|
peer_crypto.initialize(&mut msg)?;
|
||||||
|
self.pending_inits.insert(addr, peer_crypto);
|
||||||
|
self.send_to(addr, &mut msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn connect<Addr: ToSocketAddrs + fmt::Debug + Clone>(&mut self, addr: Addr) -> Result<(), Error> {
|
||||||
|
let addrs = resolve(&addr)?.into_iter().map(mapped_addr).collect::<SmallVec<[SocketAddr; 3]>>();
|
||||||
|
for addr in &addrs {
|
||||||
|
if self.own_addresses.contains(addr)
|
||||||
|
|| self.peers.contains_key(addr)
|
||||||
|
|| self.pending_inits.contains_key(addr)
|
||||||
|
{
|
||||||
|
return Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Send a message to each resolved address
|
||||||
|
for a in addrs {
|
||||||
|
// Ignore error this time
|
||||||
|
self.connect_sock(a).ok();
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_node_info(&self) -> NodeInfo {
|
||||||
|
let mut peers = smallvec![];
|
||||||
|
for peer in self.peers.values() {
|
||||||
|
peers.push(PeerInfo { node_id: Some(peer.node_id), addrs: peer.addrs.clone() })
|
||||||
|
}
|
||||||
|
if peers.len() > 20 {
|
||||||
|
let mut rng = rand::thread_rng();
|
||||||
|
peers.partial_shuffle(&mut rng, 20);
|
||||||
|
peers.truncate(20);
|
||||||
|
}
|
||||||
|
NodeInfo {
|
||||||
|
node_id: self.node_id,
|
||||||
|
peers,
|
||||||
|
claims: self.claims.clone(),
|
||||||
|
peer_timeout: Some(self.peer_timeout_publish),
|
||||||
|
addrs: self.own_addresses.clone()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update_peer_info(&mut self, addr: SocketAddr, info: Option<NodeInfo>) -> Result<(), Error> {
|
||||||
|
if let Some(peer) = self.peers.get_mut(&addr) {
|
||||||
|
peer.last_seen = TS::now();
|
||||||
|
peer.timeout = TS::now() + self.config.peer_timeout as Time
|
||||||
|
} else {
|
||||||
|
error!("Received peer update from non peer {}", addr_nice(addr));
|
||||||
|
return Ok(())
|
||||||
|
}
|
||||||
|
if let Some(info) = info {
|
||||||
|
debug!("Adding claims of peer {}: {:?}", addr_nice(addr), info.claims);
|
||||||
|
self.table.set_claims(addr, info.claims);
|
||||||
|
debug!("Received {} peers from {}: {:?}", info.peers.len(), addr_nice(addr), info.peers);
|
||||||
|
self.connect_to_peers(&info.peers)?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn add_new_peer(&mut self, addr: SocketAddr, info: NodeInfo) -> Result<(), Error> {
|
||||||
|
info!("Added peer {}", addr_nice(addr));
|
||||||
|
if let Some(init) = self.pending_inits.remove(&addr) {
|
||||||
|
self.peers.insert(addr, PeerData {
|
||||||
|
addrs: info.addrs.clone(),
|
||||||
|
crypto: init,
|
||||||
|
node_id: info.node_id,
|
||||||
|
peer_timeout: info.peer_timeout.unwrap_or(DEFAULT_PEER_TIMEOUT),
|
||||||
|
last_seen: TS::now(),
|
||||||
|
timeout: TS::now() + self.config.peer_timeout as Time
|
||||||
|
});
|
||||||
|
self.update_peer_info(addr, Some(info))?;
|
||||||
|
} else {
|
||||||
|
error!("No init for new peer {}", addr_nice(addr));
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn connect_to_peers(&mut self, peers: &[PeerInfo]) -> Result<(), Error> {
|
||||||
|
'outer: for peer in peers {
|
||||||
|
for addr in &peer.addrs {
|
||||||
|
if self.peers.contains_key(addr) {
|
||||||
|
continue 'outer
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if let Some(node_id) = peer.node_id {
|
||||||
|
if self.node_id == node_id {
|
||||||
|
continue 'outer
|
||||||
|
}
|
||||||
|
for p in self.peers.values() {
|
||||||
|
if p.node_id == node_id {
|
||||||
|
continue 'outer
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.connect(&peer.addrs as &[SocketAddr])?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn remove_peer(&mut self, addr: SocketAddr) {
|
||||||
|
if let Some(_peer) = self.peers.remove(&addr) {
|
||||||
|
info!("Closing connection to {}", addr_nice(addr));
|
||||||
|
self.table.remove_claims(addr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_payload_from(&mut self, peer: SocketAddr, data: &mut MsgBuffer) -> Result<(), Error> {
|
||||||
|
let (src, dst) = P::parse(data.message())?;
|
||||||
|
let len = data.len();
|
||||||
|
debug!("Writing data to device: {} bytes", len);
|
||||||
|
self.traffic.count_in_payload(src, dst, len);
|
||||||
|
if let Err(e) = self.device.write(data) {
|
||||||
|
error!("Failed to send via device: {}", e);
|
||||||
|
return Err(e)
|
||||||
|
}
|
||||||
|
if self.learning {
|
||||||
|
// Learn single address
|
||||||
|
self.table.cache(src, peer);
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn process_message(
|
||||||
|
&mut self, src: SocketAddr, msg_result: MessageResult<NodeInfo>, data: &mut MsgBuffer
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
match msg_result {
|
||||||
|
MessageResult::Message(type_) => {
|
||||||
|
match type_ {
|
||||||
|
MESSAGE_TYPE_DATA => self.handle_payload_from(src, data)?,
|
||||||
|
MESSAGE_TYPE_NODE_INFO => {
|
||||||
|
let info = match NodeInfo::decode(Cursor::new(data.message())) {
|
||||||
|
Ok(val) => val,
|
||||||
|
Err(err) => {
|
||||||
|
self.traffic.count_invalid_protocol(data.len());
|
||||||
|
return Err(err)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
self.update_peer_info(src, Some(info))?
|
||||||
|
}
|
||||||
|
MESSAGE_TYPE_KEEPALIVE => self.update_peer_info(src, None)?,
|
||||||
|
MESSAGE_TYPE_CLOSE => self.remove_peer(src),
|
||||||
|
_ => {
|
||||||
|
self.traffic.count_invalid_protocol(data.len());
|
||||||
|
return Err(Error::Message("Unknown message type"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
MessageResult::Initialized(info) => self.add_new_peer(src, info)?,
|
||||||
|
MessageResult::InitializedWithReply(info) => {
|
||||||
|
self.add_new_peer(src, info)?;
|
||||||
|
self.send_to(src, data)?
|
||||||
|
}
|
||||||
|
MessageResult::Reply => self.send_to(src, data)?,
|
||||||
|
MessageResult::None => ()
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_message(&mut self, src: SocketAddr, data: &mut MsgBuffer) -> Result<(), Error> {
|
||||||
|
let src = mapped_addr(src);
|
||||||
|
debug!("Received {} bytes from {}", data.len(), src);
|
||||||
|
let msg_result = if let Some(init) = self.pending_inits.get_mut(&src) {
|
||||||
|
init.handle_message(data)
|
||||||
|
} else if is_init_message(data.message()) {
|
||||||
|
let mut result = None;
|
||||||
|
if let Some(peer) = self.peers.get_mut(&src) {
|
||||||
|
if peer.crypto.has_init() {
|
||||||
|
result = Some(peer.crypto.handle_message(data))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if let Some(result) = result {
|
||||||
|
result
|
||||||
|
} else {
|
||||||
|
let mut init = self.crypto.peer_instance(self.create_node_info());
|
||||||
|
let msg_result = init.handle_message(data);
|
||||||
|
match msg_result {
|
||||||
|
Ok(res) => {
|
||||||
|
self.pending_inits.insert(src, init);
|
||||||
|
Ok(res)
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
self.traffic.count_invalid_protocol(data.len());
|
||||||
|
return Err(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if let Some(peer) = self.peers.get_mut(&src) {
|
||||||
|
peer.crypto.handle_message(data)
|
||||||
|
} else {
|
||||||
|
info!("Ignoring non-init message from unknown peer {}", addr_nice(src));
|
||||||
|
self.traffic.count_invalid_protocol(data.len());
|
||||||
|
return Ok(())
|
||||||
|
};
|
||||||
|
match msg_result {
|
||||||
|
Ok(val) => self.process_message(src, val, data),
|
||||||
|
Err(err) => {
|
||||||
|
self.traffic.count_invalid_protocol(data.len());
|
||||||
|
Err(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn housekeep(&mut self) -> Result<(), Error> {
|
||||||
|
// self.shared.sync();
|
||||||
// TODO: sync
|
// TODO: sync
|
||||||
unimplemented!();
|
unimplemented!();
|
||||||
}
|
}
|
||||||
|
|
||||||
fn read_socket_data(&mut self, buffer: &mut MsgBuffer) -> Result<SocketAddr, Error> {
|
|
||||||
// TODO: read data
|
|
||||||
// use 5sec timeout
|
|
||||||
unimplemented!();
|
|
||||||
}
|
|
||||||
|
|
||||||
fn handle_message(&mut self, src: SocketAddr, buffer: &mut MsgBuffer) -> Result<(), Error> {
|
|
||||||
// TODO: handle data
|
|
||||||
unimplemented!();
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn run(mut self) {
|
pub fn run(mut self) {
|
||||||
let mut buffer = MsgBuffer::new(SPACE_BEFORE);
|
let mut buffer = MsgBuffer::new(SPACE_BEFORE);
|
||||||
loop {
|
loop {
|
||||||
let addr = try_fail!(self.read_socket_data(&mut buffer), "Failed to read from socket: {}");
|
let src = try_fail!(self.socket.receive(&mut buffer), "Failed to read from network socket: {}");
|
||||||
if let Err(e) = self.handle_message(addr, &mut buffer) {
|
match self.handle_message(src, &mut buffer) {
|
||||||
|
Err(e @ Error::CryptoInitFatal(_)) => {
|
||||||
|
debug!("Fatal crypto init error from {}: {}", src, e);
|
||||||
|
info!("Closing pending connection to {} due to error in crypto init", addr_nice(src));
|
||||||
|
self.pending_inits.remove(&src);
|
||||||
|
}
|
||||||
|
Err(e @ Error::CryptoInit(_)) => {
|
||||||
|
debug!("Recoverable init error from {}: {}", src, e);
|
||||||
|
info!("Ignoring invalid init message from peer {}", addr_nice(src));
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
error!("{}", e);
|
error!("{}", e);
|
||||||
}
|
}
|
||||||
let now = T::now();
|
Ok(_) => {}
|
||||||
if self.next_sync < now {
|
}
|
||||||
self.sync();
|
let now = TS::now();
|
||||||
self.next_sync = now + SYNC_INTERVAL
|
if self.next_housekeep < TS::now() {
|
||||||
|
if let Err(e) = self.housekeep() {
|
||||||
|
error!("{}", e)
|
||||||
|
}
|
||||||
|
self.next_housekeep = TS::now() + 1
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue