Compare commits

..

No commits in common. "68f4cb778723d3684f93731be4cd8f292ba899a8" and "da9befe23504fcab3839f374258873b60a274475" have entirely different histories.

13 changed files with 390 additions and 447 deletions

View File

@ -14,19 +14,8 @@ mod types {
mod table { mod table {
include!("../src/table.rs"); include!("../src/table.rs");
} }
mod engine { mod cloud {
pub mod common { include!("../src/cloud.rs");
include!("../src/engine/common.rs");
}
mod shared {
include!("../src/engine/shared.rs");
}
mod device_thread {
include!("../src/engine/device_thread.rs");
}
mod socket_thread {
include!("../src/engine/socket_thread.rs");
}
} }
mod config { mod config {
include!("../src/config.rs"); include!("../src/config.rs");

View File

@ -1,30 +1,26 @@
#![allow(dead_code, unused_macros, unused_imports)] #![allow(dead_code, unused_macros, unused_imports)]
#[macro_use] #[macro_use] extern crate serde;
extern crate serde; #[macro_use] extern crate log;
#[macro_use]
extern crate log;
#[macro_use]
extern crate tokio;
use criterion::{criterion_group, criterion_main, Criterion, Throughput}; use criterion::{criterion_group, criterion_main, Criterion, Throughput};
use ring::aead;
use smallvec::smallvec; use smallvec::smallvec;
use ring::aead;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4, UdpSocket};
use std::str::FromStr; use std::str::FromStr;
use std::net::{SocketAddr, Ipv4Addr, SocketAddrV4, UdpSocket};
include!(".code.rs"); include!(".code.rs");
use config::Config;
use crypto::core::{create_dummy_pair, EXTRA_LEN};
use device::Type;
pub use error::Error; pub use error::Error;
use payload::{Frame, Packet, Protocol};
use table::ClaimTable;
use tests::common::{TapSimulator, TunSimulator};
use types::{Address, Range};
use util::{MockTimeSource, MsgBuffer}; use util::{MockTimeSource, MsgBuffer};
use types::{Address, Range};
use table::{ClaimTable};
use device::Type;
use config::Config;
use payload::{Packet, Frame, Protocol};
use crypto::core::{create_dummy_pair, EXTRA_LEN};
use tests::common::{TunSimulator, TapSimulator};
fn udp_send(c: &mut Criterion) { fn udp_send(c: &mut Criterion) {
let sock = UdpSocket::bind("127.0.0.1:0").unwrap(); let sock = UdpSocket::bind("127.0.0.1:0").unwrap();
@ -51,7 +47,7 @@ fn decode_ipv4(c: &mut Criterion) {
fn decode_ipv6(c: &mut Criterion) { fn decode_ipv6(c: &mut Criterion) {
let data = [ let data = [
0x60, 0, 0, 0, 0, 0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 6, 5, 0x60, 0, 0, 0, 0, 0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 6, 5,
4, 3, 2, 1, 4, 3, 2, 1
]; ];
let mut g = c.benchmark_group("payload"); let mut g = c.benchmark_group("payload");
g.throughput(Throughput::Bytes(1400)); g.throughput(Throughput::Bytes(1400));
@ -111,9 +107,9 @@ fn lookup_cold(c: &mut Criterion) {
fn crypto_bench(c: &mut Criterion, algo: &'static aead::Algorithm) { fn crypto_bench(c: &mut Criterion, algo: &'static aead::Algorithm) {
let mut buffer = MsgBuffer::new(EXTRA_LEN); let mut buffer = MsgBuffer::new(EXTRA_LEN);
buffer.set_length(1400); buffer.set_length(1400);
let (sender, receiver) = create_dummy_pair(algo); let (mut sender, mut receiver) = create_dummy_pair(algo);
let mut g = c.benchmark_group("crypto"); let mut g = c.benchmark_group("crypto");
g.throughput(Throughput::Bytes(2 * 1400)); g.throughput(Throughput::Bytes(2*1400));
g.bench_function(format!("{:?}", algo), |b| { g.bench_function(format!("{:?}", algo), |b| {
b.iter(|| { b.iter(|| {
sender.encrypt(&mut buffer); sender.encrypt(&mut buffer);
@ -136,7 +132,6 @@ fn crypto_aes256(c: &mut Criterion) {
} }
fn full_communication_tun_router(c: &mut Criterion) { fn full_communication_tun_router(c: &mut Criterion) {
let runtime = tokio::runtime::Runtime::new().unwrap();
log::set_max_level(log::LevelFilter::Error); log::set_max_level(log::LevelFilter::Error);
let config1 = Config { let config1 = Config {
device_type: Type::Tun, device_type: Type::Tun,
@ -151,78 +146,59 @@ fn full_communication_tun_router(c: &mut Criterion) {
..Config::default() ..Config::default()
}; };
let mut sim = TunSimulator::new(); let mut sim = TunSimulator::new();
let node1 = sim.add_node(false, &config1);
let node2 = sim.add_node(false, &config2);
let (node1, node2) = runtime.block_on(async { sim.connect(node1, node2);
log::set_max_level(log::LevelFilter::Error); sim.simulate_all_messages();
let node1 = sim.add_node(false, &config1).await; assert!(sim.is_connected(node1, node2));
let node2 = sim.add_node(false, &config2).await; assert!(sim.is_connected(node2, node1));
sim.connect(node1, node2).await;
sim.simulate_all_messages().await;
assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1));
(node1, node2)
});
let mut payload = vec![0x40, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2]; let mut payload = vec![0x40, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2];
payload.append(&mut vec![0; 1400]); payload.append(&mut vec![0; 1400]);
let mut g = c.benchmark_group("full_communication"); let mut g = c.benchmark_group("full_communication");
g.throughput(Throughput::Bytes(2 * 1400)); g.throughput(Throughput::Bytes(2*1400));
g.bench_function("tun_router", |b| { g.bench_function("tun_router", |b| {
b.iter(|| runtime.block_on(async { b.iter(|| {
sim.put_payload(node1, payload.clone()).await; sim.put_payload(node1, payload.clone());
sim.simulate_all_messages().await; sim.simulate_all_messages();
assert_eq!(Some(&payload), sim.pop_payload(node2).as_ref()); assert_eq!(Some(&payload), sim.pop_payload(node2).as_ref());
})); });
}); });
g.finish() g.finish()
} }
fn full_communication_tap_switch(c: &mut Criterion) { fn full_communication_tap_switch(c: &mut Criterion) {
let runtime = tokio::runtime::Runtime::new().unwrap();
log::set_max_level(log::LevelFilter::Error); log::set_max_level(log::LevelFilter::Error);
let config = Config { device_type: Type::Tap, ..Config::default() }; let config = Config { device_type: Type::Tap, ..Config::default() };
let mut sim = TapSimulator::new(); let mut sim = TapSimulator::new();
let node1 = sim.add_node(false, &config);
let node2 = sim.add_node(false, &config);
let (node1, node2) = runtime.block_on(async { sim.connect(node1, node2);
log::set_max_level(log::LevelFilter::Error); sim.simulate_all_messages();
let node1 = sim.add_node(false, &config).await; assert!(sim.is_connected(node1, node2));
let node2 = sim.add_node(false, &config).await; assert!(sim.is_connected(node2, node1));
sim.connect(node1, node2).await;
sim.simulate_all_messages().await;
assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1));
(node1, node2)
});
let mut payload = vec![2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4, 5]; let mut payload = vec![2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4, 5];
payload.append(&mut vec![0; 1400]); payload.append(&mut vec![0; 1400]);
let mut g = c.benchmark_group("full_communication"); let mut g = c.benchmark_group("full_communication");
g.throughput(Throughput::Bytes(2 * 1400)); g.throughput(Throughput::Bytes(2*1400));
g.bench_function("tap_switch", |b| { g.bench_function("tap_switch", |b| {
b.iter(|| runtime.block_on(async { b.iter(|| {
sim.put_payload(node1, payload.clone()).await; sim.put_payload(node1, payload.clone());
sim.simulate_all_messages().await; sim.simulate_all_messages();
assert_eq!(Some(&payload), sim.pop_payload(node2).as_ref()); assert_eq!(Some(&payload), sim.pop_payload(node2).as_ref());
})); });
}); });
g.finish() g.finish()
} }
criterion_group!( criterion_group!(benches,
benches,
udp_send, udp_send,
decode_ipv4, decode_ipv4, decode_ipv6, decode_ethernet, decode_ethernet_with_vlan,
decode_ipv6, lookup_cold, lookup_warm,
decode_ethernet, crypto_chacha20, crypto_aes128, crypto_aes256,
decode_ethernet_with_vlan, full_communication_tun_router, full_communication_tap_switch
lookup_cold,
lookup_warm,
crypto_chacha20,
crypto_aes128,
crypto_aes256,
full_communication_tun_router,
full_communication_tap_switch
); );
criterion_main!(benches); criterion_main!(benches);

View File

@ -1,30 +1,26 @@
#![allow(dead_code, unused_macros, unused_imports)] #![allow(dead_code, unused_macros, unused_imports)]
#[macro_use] #[macro_use] extern crate serde;
extern crate serde; #[macro_use] extern crate log;
#[macro_use]
extern crate log;
#[macro_use]
extern crate tokio;
use iai::{black_box, main}; use iai::{black_box, main};
use ring::aead;
use smallvec::smallvec; use smallvec::smallvec;
use ring::aead;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4, UdpSocket};
use std::str::FromStr; use std::str::FromStr;
use std::net::{SocketAddr, Ipv4Addr, SocketAddrV4, UdpSocket};
include!(".code.rs"); include!(".code.rs");
use config::Config;
use crypto::core::{create_dummy_pair, EXTRA_LEN};
use device::Type;
pub use error::Error; pub use error::Error;
use payload::{Frame, Packet, Protocol};
use table::ClaimTable;
use tests::common::{TapSimulator, TunSimulator};
use types::{Address, Range};
use util::{MockTimeSource, MsgBuffer}; use util::{MockTimeSource, MsgBuffer};
use config::Config;
use types::{Address, Range};
use device::Type;
use table::{ClaimTable};
use payload::{Packet, Frame, Protocol};
use crypto::core::{create_dummy_pair, EXTRA_LEN};
use tests::common::{TunSimulator, TapSimulator};
fn udp_send() { fn udp_send() {
let sock = UdpSocket::bind("127.0.0.1:0").unwrap(); let sock = UdpSocket::bind("127.0.0.1:0").unwrap();
@ -41,7 +37,7 @@ fn decode_ipv4() {
fn decode_ipv6() { fn decode_ipv6() {
let data = [ let data = [
0x60, 0, 0, 0, 0, 0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 6, 5, 0x60, 0, 0, 0, 0, 0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 6, 5,
4, 3, 2, 1, 4, 3, 2, 1
]; ];
Packet::parse(&black_box(data)).unwrap(); Packet::parse(&black_box(data)).unwrap();
} }
@ -78,7 +74,7 @@ fn lookup_cold() {
fn crypto_bench(algo: &'static aead::Algorithm) { fn crypto_bench(algo: &'static aead::Algorithm) {
let mut buffer = MsgBuffer::new(EXTRA_LEN); let mut buffer = MsgBuffer::new(EXTRA_LEN);
buffer.set_length(1400); buffer.set_length(1400);
let (sender, receiver) = create_dummy_pair(algo); let (mut sender, mut receiver) = create_dummy_pair(algo);
for _ in 0..1000 { for _ in 0..1000 {
sender.encrypt(black_box(&mut buffer)); sender.encrypt(black_box(&mut buffer));
receiver.decrypt(&mut buffer).unwrap(); receiver.decrypt(&mut buffer).unwrap();
@ -98,7 +94,6 @@ fn crypto_aes256() {
} }
fn full_communication_tun_router() { fn full_communication_tun_router() {
let runtime = tokio::runtime::Runtime::new().unwrap();
log::set_max_level(log::LevelFilter::Error); log::set_max_level(log::LevelFilter::Error);
let config1 = Config { let config1 = Config {
device_type: Type::Tun, device_type: Type::Tun,
@ -113,69 +108,48 @@ fn full_communication_tun_router() {
..Config::default() ..Config::default()
}; };
let mut sim = TunSimulator::new(); let mut sim = TunSimulator::new();
let (node1, node2) = runtime.block_on(async { let node1 = sim.add_node(false, &config1);
log::set_max_level(log::LevelFilter::Error); let node2 = sim.add_node(false, &config2);
let node1 = sim.add_node(false, &config1).await;
let node2 = sim.add_node(false, &config2).await;
sim.connect(node1, node2).await; sim.connect(node1, node2);
sim.simulate_all_messages().await; sim.simulate_all_messages();
assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1)); assert!(sim.is_connected(node2, node1));
(node1, node2)
});
let mut payload = vec![0x40, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2]; let mut payload = vec![0x40, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2];
payload.append(&mut vec![0; 1400]); payload.append(&mut vec![0; 1400]);
for _ in 0..1000 { for _ in 0..1000 {
runtime.block_on(async { sim.put_payload(node1, payload.clone());
sim.put_payload(node1, payload.clone()).await; sim.simulate_all_messages();
sim.simulate_all_messages().await; assert_eq!(Some(&payload), black_box(sim.pop_payload(node2).as_ref()));
assert_eq!(Some(&payload), black_box(sim.pop_payload(node2).as_ref()));
});
} }
} }
fn full_communication_tap_switch() { fn full_communication_tap_switch() {
let runtime = tokio::runtime::Runtime::new().unwrap();
log::set_max_level(log::LevelFilter::Error); log::set_max_level(log::LevelFilter::Error);
let config = Config { device_type: Type::Tap, ..Config::default() }; let config = Config { device_type: Type::Tap, ..Config::default() };
let mut sim = TapSimulator::new(); let mut sim = TapSimulator::new();
let node1 = sim.add_node(false, &config);
let node2 = sim.add_node(false, &config);
let (node1, node2) = runtime.block_on(async { sim.connect(node1, node2);
log::set_max_level(log::LevelFilter::Error); sim.simulate_all_messages();
let node1 = sim.add_node(false, &config).await; assert!(sim.is_connected(node1, node2));
let node2 = sim.add_node(false, &config).await; assert!(sim.is_connected(node2, node1));
sim.connect(node1, node2).await;
sim.simulate_all_messages().await;
assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1));
(node1, node2)
});
let mut payload = vec![2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4, 5]; let mut payload = vec![2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4, 5];
payload.append(&mut vec![0; 1400]); payload.append(&mut vec![0; 1400]);
for _ in 0..1000 { for _ in 0..1000 {
runtime.block_on(async { sim.put_payload(node1, payload.clone());
sim.put_payload(node1, payload.clone()).await; sim.simulate_all_messages();
sim.simulate_all_messages().await; assert_eq!(Some(&payload), black_box(sim.pop_payload(node2).as_ref()));
assert_eq!(Some(&payload), black_box(sim.pop_payload(node2).as_ref()));
});
} }
} }
iai::main!( iai::main!(
udp_send, udp_send,
decode_ipv4, decode_ipv4, decode_ipv6, decode_ethernet, decode_ethernet_with_vlan,
decode_ipv6, lookup_cold, lookup_warm,
decode_ethernet, crypto_chacha20, crypto_aes128, crypto_aes256,
decode_ethernet_with_vlan, full_communication_tun_router, full_communication_tap_switch
lookup_cold,
lookup_warm,
crypto_chacha20,
crypto_aes128,
crypto_aes256,
full_communication_tun_router,
full_communication_tap_switch
); );

View File

@ -1,24 +1,24 @@
use super::{core::test_speed, rotate::RotationState}; use super::{core::test_speed, rotate::RotationState};
pub use super::{ pub use super::{
core::{CryptoCore, EXTRA_LEN, TAG_LEN}, core::{CryptoCore, EXTRA_LEN, TAG_LEN},
init::{is_init_message, InitMsg, InitResult, InitState, INIT_MESSAGE_FIRST_BYTE, STAGE_PONG}, init::{is_init_message, InitResult, InitState, INIT_MESSAGE_FIRST_BYTE}
}; };
use crate::{ use crate::{
error::Error, error::Error,
types::NodeId, types::NodeId,
util::{from_base62, to_base62, MsgBuffer}, util::{from_base62, to_base62, MsgBuffer}
}; };
use libc::BPF_FS_MAGIC;
use ring::{ use ring::{
aead::{self, Algorithm, LessSafeKey, UnboundKey}, aead::{self, Algorithm, LessSafeKey, UnboundKey},
agreement::{EphemeralPrivateKey, UnparsedPublicKey}, agreement::{EphemeralPrivateKey, UnparsedPublicKey},
pbkdf2, pbkdf2,
rand::{SecureRandom, SystemRandom}, rand::{SecureRandom, SystemRandom},
signature::{Ed25519KeyPair, KeyPair, ED25519_PUBLIC_KEY_LEN}, signature::{Ed25519KeyPair, KeyPair, ED25519_PUBLIC_KEY_LEN}
}; };
use smallvec::{smallvec, SmallVec}; use smallvec::{smallvec, SmallVec};
use std::{fmt::Debug, io::Read, num::NonZeroU32, sync::Arc, time::Duration}; use std::{fmt::Debug, io::Read, num::NonZeroU32, sync::Arc, time::Duration};
const SALT: &[u8; 32] = b"vpncloudVPNCLOUDvpncl0udVpnCloud"; const SALT: &[u8; 32] = b"vpncloudVPNCLOUDvpncl0udVpnCloud";
pub const MESSAGE_TYPE_ROTATION: u8 = 0x10; pub const MESSAGE_TYPE_ROTATION: u8 = 0x10;
@ -28,6 +28,7 @@ pub type EcdhPublicKey = UnparsedPublicKey<SmallVec<[u8; 96]>>;
pub type EcdhPrivateKey = EphemeralPrivateKey; pub type EcdhPrivateKey = EphemeralPrivateKey;
pub type Key = SmallVec<[u8; 32]>; pub type Key = SmallVec<[u8; 32]>;
const DEFAULT_ALGORITHMS: [&str; 3] = ["AES128", "AES256", "CHACHA20"]; const DEFAULT_ALGORITHMS: [&str; 3] = ["AES128", "AES256", "CHACHA20"];
#[cfg(test)] #[cfg(test)]
@ -37,15 +38,17 @@ const SPEED_TEST_TIME: f32 = 0.1;
const ROTATE_INTERVAL: usize = 120; const ROTATE_INTERVAL: usize = 120;
pub trait Payload: Debug + PartialEq + Sized { pub trait Payload: Debug + PartialEq + Sized {
fn write_to(&self, buffer: &mut MsgBuffer); fn write_to(&self, buffer: &mut MsgBuffer);
fn read_from<R: Read>(r: R) -> Result<Self, Error>; fn read_from<R: Read>(r: R) -> Result<Self, Error>;
} }
#[derive(Clone)] #[derive(Clone)]
pub struct Algorithms { pub struct Algorithms {
pub algorithm_speeds: SmallVec<[(&'static Algorithm, f32); 3]>, pub algorithm_speeds: SmallVec<[(&'static Algorithm, f32); 3]>,
pub allow_unencrypted: bool, pub allow_unencrypted: bool
} }
#[derive(Debug, Default, Deserialize, Serialize, Clone, PartialEq)] #[derive(Debug, Default, Deserialize, Serialize, Clone, PartialEq)]
@ -55,14 +58,14 @@ pub struct Config {
pub private_key: Option<String>, pub private_key: Option<String>,
pub public_key: Option<String>, pub public_key: Option<String>,
pub trusted_keys: Vec<String>, pub trusted_keys: Vec<String>,
pub algorithms: Vec<String>, pub algorithms: Vec<String>
} }
pub struct Crypto { pub struct Crypto {
node_id: NodeId, node_id: NodeId,
key_pair: Arc<Ed25519KeyPair>, key_pair: Arc<Ed25519KeyPair>,
trusted_keys: Arc<[Ed25519PublicKey]>, trusted_keys: Arc<[Ed25519PublicKey]>,
algorithms: Algorithms, algorithms: Algorithms
} }
impl Crypto { impl Crypto {
@ -75,12 +78,12 @@ impl Crypto {
let algo = match &name.to_uppercase() as &str { let algo = match &name.to_uppercase() as &str {
"UNENCRYPTED" | "NONE" | "PLAIN" => { "UNENCRYPTED" | "NONE" | "PLAIN" => {
unencrypted = true; unencrypted = true;
continue; continue
} }
"AES128" | "AES128_GCM" | "AES_128" | "AES_128_GCM" => &aead::AES_128_GCM, "AES128" | "AES128_GCM" | "AES_128" | "AES_128_GCM" => &aead::AES_128_GCM,
"AES256" | "AES256_GCM" | "AES_256" | "AES_256_GCM" => &aead::AES_256_GCM, "AES256" | "AES256_GCM" | "AES_256" | "AES_256_GCM" => &aead::AES_256_GCM,
"CHACHA" | "CHACHA20" | "CHACHA20_POLY1305" => &aead::CHACHA20_POLY1305, "CHACHA" | "CHACHA20" | "CHACHA20_POLY1305" => &aead::CHACHA20_POLY1305,
_ => return Err(Error::InvalidConfig("Unknown crypto method")), _ => return Err(Error::InvalidConfig("Unknown crypto method"))
}; };
algos.push(algo) algos.push(algo)
} }
@ -97,7 +100,7 @@ impl Crypto {
} else if let Some(password) = &config.password { } else if let Some(password) = &config.password {
Self::keypair_from_password(password) Self::keypair_from_password(password)
} else { } else {
return Err(Error::InvalidConfig("Either private_key or password must be set")); return Err(Error::InvalidConfig("Either private_key or password must be set"))
}; };
let mut trusted_keys = vec![]; let mut trusted_keys = vec![];
for tn in &config.trusted_keys { for tn in &config.trusted_keys {
@ -131,7 +134,7 @@ impl Crypto {
node_id, node_id,
key_pair: Arc::new(key_pair), key_pair: Arc::new(key_pair),
trusted_keys: trusted_keys.into_boxed_slice().into(), trusted_keys: trusted_keys.into_boxed_slice().into(),
algorithms: algos, algorithms: algos
}) })
} }
@ -148,7 +151,7 @@ impl Crypto {
NonZeroU32::new(4096).unwrap(), NonZeroU32::new(4096).unwrap(),
SALT, SALT,
password.as_bytes(), password.as_bytes(),
&mut bytes, &mut bytes
); );
} }
} }
@ -190,7 +193,7 @@ impl Crypto {
fn parse_public_key(pubkey: &str) -> Result<Ed25519PublicKey, Error> { fn parse_public_key(pubkey: &str) -> Result<Ed25519PublicKey, Error> {
let pubkey = from_base62(pubkey).map_err(|_| Error::InvalidConfig("Failed to parse public key"))?; let pubkey = from_base62(pubkey).map_err(|_| Error::InvalidConfig("Failed to parse public key"))?;
if pubkey.len() != ED25519_PUBLIC_KEY_LEN { if pubkey.len() != ED25519_PUBLIC_KEY_LEN {
return Err(Error::InvalidConfig("Failed to parse public key")); return Err(Error::InvalidConfig("Failed to parse public key"))
} }
let mut result = [0; ED25519_PUBLIC_KEY_LEN]; let mut result = [0; ED25519_PUBLIC_KEY_LEN];
result.clone_from_slice(&pubkey); result.clone_from_slice(&pubkey);
@ -202,61 +205,53 @@ impl Crypto {
} }
} }
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub enum MessageResult { pub enum MessageResult {
Message(u8), Message(u8),
Reply, Reply,
None, None
} }
pub enum PeerCrypto { pub enum PeerCrypto {
Encrypted { Encrypted {
last_init_message: Vec<u8>, last_init_message: Vec<u8>,
trusted_keys: Arc<[Ed25519PublicKey]>,
algorithm: &'static Algorithm, algorithm: &'static Algorithm,
rotation: RotationState, rotation: RotationState,
core: Arc<CryptoCore>, core: Arc<CryptoCore>,
rotate_counter: usize, rotate_counter: usize
}, },
Unencrypted { Unencrypted {
last_init_message: Vec<u8>, last_init_message: Vec<u8>
trusted_keys: Arc<[Ed25519PublicKey]>, }
},
} }
impl PeerCrypto { impl PeerCrypto {
pub fn algorithm_name(&self) -> &'static str { pub fn algorithm_name(&self) -> &'static str {
match self { match self {
PeerCrypto::Encrypted { algorithm, .. } => match *algorithm { PeerCrypto::Encrypted { algorithm, .. } => {
x if x == &aead::CHACHA20_POLY1305 => "CHACHA20", match *algorithm {
x if x == &aead::AES_128_GCM => "AES128", x if x == &aead::CHACHA20_POLY1305 => "CHACHA20",
x if x == &aead::AES_256_GCM => "AES256", x if x == &aead::AES_128_GCM => "AES128",
_ => unreachable!(), x if x == &aead::AES_256_GCM => "AES256",
}, _ => unreachable!()
PeerCrypto::Unencrypted { .. } => "PLAIN", }
}
PeerCrypto::Unencrypted { .. } => "PLAIN"
} }
} }
pub fn get_core(&self) -> Option<Arc<CryptoCore>> { pub fn get_core(&self) -> Option<Arc<CryptoCore>> {
match self { match self {
PeerCrypto::Encrypted { core, .. } => Some(core.clone()), PeerCrypto::Encrypted { core, .. } => Some(core.clone()),
PeerCrypto::Unencrypted { .. } => None, PeerCrypto::Unencrypted { .. } => None
} }
} }
fn handle_init_message(&mut self, buffer: &mut MsgBuffer) -> Result<MessageResult, Error> { fn handle_init_message(&mut self, buffer: &mut MsgBuffer) -> Result<MessageResult, Error> {
match self { // TODO: parse message stage
PeerCrypto::Encrypted { trusted_keys, last_init_message, .. } // TODO: depending on stage resend last message
| PeerCrypto::Unencrypted { trusted_keys, last_init_message, .. } => {
let (msg, _) = InitMsg::read_from(buffer.buffer(), &trusted_keys)?;
buffer.clear();
if msg.stage() == STAGE_PONG {
buffer.set_length(last_init_message.len());
buffer.message_mut().copy_from_slice(last_init_message);
}
return Ok(MessageResult::Reply)
}
}
Ok(MessageResult::None) Ok(MessageResult::None)
} }
@ -269,7 +264,7 @@ impl PeerCrypto {
} }
Ok(()) Ok(())
} }
PeerCrypto::Unencrypted { .. } => Err(Error::Crypto("Rotation when unencrypted")), PeerCrypto::Unencrypted { .. } => Err(Error::Crypto("Rotation when unencrypted"))
} }
} }
@ -292,7 +287,7 @@ impl PeerCrypto {
pub fn handle_message(&mut self, buffer: &mut MsgBuffer) -> Result<MessageResult, Error> { pub fn handle_message(&mut self, buffer: &mut MsgBuffer) -> Result<MessageResult, Error> {
// HOT PATH // HOT PATH
if buffer.is_empty() { if buffer.is_empty() {
return Err(Error::InvalidCryptoState("No message in buffer")); return Err(Error::InvalidCryptoState("No message in buffer"))
} }
if is_init_message(buffer.buffer()) { if is_init_message(buffer.buffer()) {
// COLD PATH // COLD PATH
@ -335,6 +330,7 @@ impl PeerCrypto {
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
@ -366,18 +362,18 @@ mod tests {
debug!("Node1 <- Node2"); debug!("Node1 <- Node2");
let res = node1.handle_init(&mut msg).unwrap(); let res = node1.handle_init(&mut msg).unwrap();
assert_eq!(res, InitResult::Success { peer_payload: vec![], is_initiator: true }); assert_eq!(res, InitResult::Success { peer_payload: vec![], is_initiator: false });
assert!(!msg.is_empty()); assert!(!msg.is_empty());
debug!("Node1 -> Node2"); debug!("Node1 -> Node2");
let res = node2.handle_init(&mut msg).unwrap(); let res = node2.handle_init(&mut msg).unwrap();
assert_eq!(res, InitResult::Success { peer_payload: vec![], is_initiator: false }); assert_eq!(res, InitResult::Success { peer_payload: vec![], is_initiator: true });
assert!(msg.is_empty()); assert!(msg.is_empty());
let mut node1 = node1.finish(&mut msg); let mut node1 = node1.finish(&mut msg);
assert!(msg.is_empty()); assert!(msg.is_empty());
let mut node2 = node2.finish(&mut msg); let mut node2 = node2.finish(&mut msg);
assert!(!msg.is_empty()); assert!(msg.is_empty());
debug!("Node1 <- Node2"); debug!("Node1 <- Node2");
let res = node1.handle_message(&mut msg).unwrap(); let res = node1.handle_message(&mut msg).unwrap();

View File

@ -54,11 +54,12 @@
// Once every second, both nodes check whether they have already finished the initialization. If not, they repeat their // Once every second, both nodes check whether they have already finished the initialization. If not, they repeat their
// last message. After 5 seconds, the initialization is aborted as failed. // last message. After 5 seconds, the initialization is aborted as failed.
use super::{ use super::{
common::MESSAGE_TYPE_ROTATION,
core::{CryptoCore, EXTRA_LEN}, core::{CryptoCore, EXTRA_LEN},
rotate::RotationState, rotate::RotationState,
Algorithms, EcdhPrivateKey, EcdhPublicKey, Ed25519PublicKey, Payload, PeerCrypto, Algorithms, EcdhPrivateKey, EcdhPublicKey, Ed25519PublicKey, Payload, PeerCrypto,
common::MESSAGE_TYPE_ROTATION
}; };
use crate::{error::Error, types::NodeId, util::MsgBuffer}; use crate::{error::Error, types::NodeId, util::MsgBuffer};
use byteorder::{NetworkEndian, ReadBytesExt, WriteBytesExt}; use byteorder::{NetworkEndian, ReadBytesExt, WriteBytesExt};
@ -67,14 +68,14 @@ use ring::{
agreement::{agree_ephemeral, X25519}, agreement::{agree_ephemeral, X25519},
digest, digest,
rand::{SecureRandom, SystemRandom}, rand::{SecureRandom, SystemRandom},
signature::{self, Ed25519KeyPair, KeyPair, ED25519, ED25519_PUBLIC_KEY_LEN}, signature::{self, Ed25519KeyPair, KeyPair, ED25519, ED25519_PUBLIC_KEY_LEN}
}; };
use smallvec::{smallvec, SmallVec}; use smallvec::{smallvec, SmallVec};
use std::{ use std::{
cmp, f32, cmp, f32,
fmt::Debug, fmt::Debug,
io::{self, Cursor, Read, Write}, io::{self, Cursor, Read, Write},
sync::Arc, sync::Arc
}; };
pub const INIT_MESSAGE_FIRST_BYTE: u8 = 0xff; pub const INIT_MESSAGE_FIRST_BYTE: u8 = 0xff;
@ -90,27 +91,29 @@ pub const MAX_FAILED_RETRIES: usize = 120;
pub const SALTED_NODE_ID_HASH_LEN: usize = 20; pub const SALTED_NODE_ID_HASH_LEN: usize = 20;
pub type SaltedNodeIdHash = [u8; SALTED_NODE_ID_HASH_LEN]; pub type SaltedNodeIdHash = [u8; SALTED_NODE_ID_HASH_LEN];
pub fn is_init_message(msg: &[u8]) -> bool { pub fn is_init_message(msg: &[u8]) -> bool {
!msg.is_empty() && msg[0] == INIT_MESSAGE_FIRST_BYTE !msg.is_empty() && msg[0] == INIT_MESSAGE_FIRST_BYTE
} }
#[allow(clippy::large_enum_variant)] #[allow(clippy::large_enum_variant)]
pub enum InitMsg { pub enum InitMsg {
Ping { Ping {
salted_node_id_hash: SaltedNodeIdHash, salted_node_id_hash: SaltedNodeIdHash,
ecdh_public_key: EcdhPublicKey, ecdh_public_key: EcdhPublicKey,
algorithms: Algorithms, algorithms: Algorithms
}, },
Pong { Pong {
salted_node_id_hash: SaltedNodeIdHash, salted_node_id_hash: SaltedNodeIdHash,
ecdh_public_key: EcdhPublicKey, ecdh_public_key: EcdhPublicKey,
algorithms: Algorithms, algorithms: Algorithms,
encrypted_payload: MsgBuffer, encrypted_payload: MsgBuffer
}, },
Peng { Peng {
salted_node_id_hash: SaltedNodeIdHash, salted_node_id_hash: SaltedNodeIdHash,
encrypted_payload: MsgBuffer, encrypted_payload: MsgBuffer
}, }
} }
impl InitMsg { impl InitMsg {
@ -121,11 +124,11 @@ impl InitMsg {
const PART_SALTED_NODE_ID_HASH: u8 = 2; const PART_SALTED_NODE_ID_HASH: u8 = 2;
const PART_STAGE: u8 = 1; const PART_STAGE: u8 = 1;
pub fn stage(&self) -> u8 { fn stage(&self) -> u8 {
match self { match self {
InitMsg::Ping { .. } => STAGE_PING, InitMsg::Ping { .. } => STAGE_PING,
InitMsg::Pong { .. } => STAGE_PONG, InitMsg::Pong { .. } => STAGE_PONG,
InitMsg::Peng { .. } => STAGE_PENG, InitMsg::Peng { .. } => STAGE_PENG
} }
} }
@ -133,7 +136,7 @@ impl InitMsg {
match self { match self {
InitMsg::Ping { salted_node_id_hash, .. } InitMsg::Ping { salted_node_id_hash, .. }
| InitMsg::Pong { salted_node_id_hash, .. } | InitMsg::Pong { salted_node_id_hash, .. }
| InitMsg::Peng { salted_node_id_hash, .. } => salted_node_id_hash, | InitMsg::Peng { salted_node_id_hash, .. } => salted_node_id_hash
} }
} }
@ -147,11 +150,11 @@ impl InitMsg {
short_hash short_hash
} }
pub fn read_from(buffer: &[u8], trusted_keys: &[Ed25519PublicKey]) -> Result<(Self, Ed25519PublicKey), Error> { fn read_from(buffer: &[u8], trusted_keys: &[Ed25519PublicKey]) -> Result<(Self, Ed25519PublicKey), Error> {
let mut r = Cursor::new(buffer); let mut r = Cursor::new(buffer);
if r.read_u8().map_err(|_| Error::Parse("Init message too short"))? != INIT_MESSAGE_FIRST_BYTE { if r.read_u8().map_err(|_| Error::Parse("Init message too short"))? != INIT_MESSAGE_FIRST_BYTE {
return Err(Error::Parse("Init message has invalid first byte")); return Err(Error::Parse("Init message has invalid first byte"))
} }
let mut public_key_salt = [0; 4]; let mut public_key_salt = [0; 4];
r.read_exact(&mut public_key_salt).map_err(|_| Error::Parse("Init message too short"))?; r.read_exact(&mut public_key_salt).map_err(|_| Error::Parse("Init message too short"))?;
@ -163,11 +166,11 @@ impl InitMsg {
if Self::calculate_hash(tk, &public_key_salt) == public_key_hash { if Self::calculate_hash(tk, &public_key_salt) == public_key_hash {
public_key_data.clone_from_slice(tk); public_key_data.clone_from_slice(tk);
found_key = true; found_key = true;
break; break
} }
} }
if !found_key { if !found_key {
return Err(Error::Crypto("untrusted peer")); return Err(Error::Crypto("untrusted peer"))
} }
let mut stage = None; let mut stage = None;
@ -179,19 +182,19 @@ impl InitMsg {
loop { loop {
let field = r.read_u8().map_err(|_| Error::Parse("Init message too short"))?; let field = r.read_u8().map_err(|_| Error::Parse("Init message too short"))?;
if field == Self::PART_END { if field == Self::PART_END {
break; break
} }
let field_len = r.read_u16::<NetworkEndian>().map_err(|_| Error::Parse("Init message too short"))? as usize; let field_len = r.read_u16::<NetworkEndian>().map_err(|_| Error::Parse("Init message too short"))? as usize;
match field { match field {
Self::PART_STAGE => { Self::PART_STAGE => {
if field_len != 1 { if field_len != 1 {
return Err(Error::CryptoInit("Invalid size for stage field")); return Err(Error::CryptoInit("Invalid size for stage field"))
} }
stage = Some(r.read_u8().map_err(|_| Error::Parse("Init message too short"))?) stage = Some(r.read_u8().map_err(|_| Error::Parse("Init message too short"))?)
} }
Self::PART_SALTED_NODE_ID_HASH => { Self::PART_SALTED_NODE_ID_HASH => {
if field_len != SALTED_NODE_ID_HASH_LEN { if field_len != SALTED_NODE_ID_HASH_LEN {
return Err(Error::CryptoInit("Invalid size for salted node id hash field")); return Err(Error::CryptoInit("Invalid size for salted node id hash field"))
} }
let mut id = [0; SALTED_NODE_ID_HASH_LEN]; let mut id = [0; SALTED_NODE_ID_HASH_LEN];
r.read_exact(&mut id).map_err(|_| Error::Parse("Init message too short"))?; r.read_exact(&mut id).map_err(|_| Error::Parse("Init message too short"))?;
@ -221,7 +224,7 @@ impl InitMsg {
1 => Some(&AES_128_GCM), 1 => Some(&AES_128_GCM),
2 => Some(&AES_256_GCM), 2 => Some(&AES_256_GCM),
3 => Some(&CHACHA20_POLY1305), 3 => Some(&CHACHA20_POLY1305),
_ => None, _ => None
}; };
let speed = let speed =
r.read_f32::<NetworkEndian>().map_err(|_| Error::Parse("Init message too short"))?; r.read_f32::<NetworkEndian>().map_err(|_| Error::Parse("Init message too short"))?;
@ -247,53 +250,53 @@ impl InitMsg {
let signed_data = &r.into_inner()[0..pos]; let signed_data = &r.into_inner()[0..pos];
let public_key = signature::UnparsedPublicKey::new(&ED25519, &public_key_data); let public_key = signature::UnparsedPublicKey::new(&ED25519, &public_key_data);
if public_key.verify(&signed_data, &signature).is_err() { if public_key.verify(&signed_data, &signature).is_err() {
return Err(Error::Crypto("invalid signature")); return Err(Error::Crypto("invalid signature"))
} }
let stage = match stage { let stage = match stage {
Some(val) => val, Some(val) => val,
None => return Err(Error::CryptoInit("Init message without stage")), None => return Err(Error::CryptoInit("Init message without stage"))
}; };
let salted_node_id_hash = match salted_node_id_hash { let salted_node_id_hash = match salted_node_id_hash {
Some(val) => val, Some(val) => val,
None => return Err(Error::CryptoInit("Init message without node id")), None => return Err(Error::CryptoInit("Init message without node id"))
}; };
let msg = match stage { let msg = match stage {
STAGE_PING => { STAGE_PING => {
let ecdh_public_key = match ecdh_public_key { let ecdh_public_key = match ecdh_public_key {
Some(val) => val, Some(val) => val,
None => return Err(Error::CryptoInit("Init message without ecdh public key")), None => return Err(Error::CryptoInit("Init message without ecdh public key"))
}; };
let algorithms = match algorithms { let algorithms = match algorithms {
Some(val) => val, Some(val) => val,
None => return Err(Error::CryptoInit("Init message without algorithms")), None => return Err(Error::CryptoInit("Init message without algorithms"))
}; };
Self::Ping { salted_node_id_hash, ecdh_public_key, algorithms } Self::Ping { salted_node_id_hash, ecdh_public_key, algorithms }
} }
STAGE_PONG => { STAGE_PONG => {
let ecdh_public_key = match ecdh_public_key { let ecdh_public_key = match ecdh_public_key {
Some(val) => val, Some(val) => val,
None => return Err(Error::CryptoInit("Init message without ecdh public key")), None => return Err(Error::CryptoInit("Init message without ecdh public key"))
}; };
let algorithms = match algorithms { let algorithms = match algorithms {
Some(val) => val, Some(val) => val,
None => return Err(Error::CryptoInit("Init message without algorithms")), None => return Err(Error::CryptoInit("Init message without algorithms"))
}; };
let encrypted_payload = match encrypted_payload { let encrypted_payload = match encrypted_payload {
Some(val) => val, Some(val) => val,
None => return Err(Error::CryptoInit("Init message without payload")), None => return Err(Error::CryptoInit("Init message without payload"))
}; };
Self::Pong { salted_node_id_hash, ecdh_public_key, algorithms, encrypted_payload } Self::Pong { salted_node_id_hash, ecdh_public_key, algorithms, encrypted_payload }
} }
STAGE_PENG => { STAGE_PENG => {
let encrypted_payload = match encrypted_payload { let encrypted_payload = match encrypted_payload {
Some(val) => val, Some(val) => val,
None => return Err(Error::CryptoInit("Init message without payload")), None => return Err(Error::CryptoInit("Init message without payload"))
}; };
Self::Peng { salted_node_id_hash, encrypted_payload } Self::Peng { salted_node_id_hash, encrypted_payload }
} }
_ => return Err(Error::CryptoInit("Invalid stage")), _ => return Err(Error::CryptoInit("Invalid stage"))
}; };
Ok((msg, public_key_data)) Ok((msg, public_key_data))
@ -333,7 +336,7 @@ impl InitMsg {
w.write_u16::<NetworkEndian>(key_bytes.len() as u16)?; w.write_u16::<NetworkEndian>(key_bytes.len() as u16)?;
w.write_all(&key_bytes)?; w.write_all(&key_bytes)?;
} }
_ => (), _ => ()
} }
match &self { match &self {
@ -361,7 +364,7 @@ impl InitMsg {
w.write_f32::<NetworkEndian>(*speed)?; w.write_f32::<NetworkEndian>(*speed)?;
} }
} }
_ => (), _ => ()
} }
match &self { match &self {
@ -370,7 +373,7 @@ impl InitMsg {
w.write_u16::<NetworkEndian>(encrypted_payload.len() as u16)?; w.write_u16::<NetworkEndian>(encrypted_payload.len() as u16)?;
w.write_all(encrypted_payload.message())?; w.write_all(encrypted_payload.message())?;
} }
_ => (), _ => ()
} }
w.write_u8(Self::PART_END)?; w.write_u8(Self::PART_END)?;
@ -384,10 +387,11 @@ impl InitMsg {
} }
} }
#[derive(PartialEq, Debug)] #[derive(PartialEq, Debug)]
pub enum InitResult<P: Payload> { pub enum InitResult<P: Payload> {
Continue, Continue,
Success { peer_payload: P, is_initiator: bool }, Success { peer_payload: P, is_initiator: bool }
} }
pub struct InitState<P: Payload> { pub struct InitState<P: Payload> {
@ -405,13 +409,13 @@ pub struct InitState<P: Payload> {
algorithms: Algorithms, algorithms: Algorithms,
#[allow(dead_code)] // Used in tests #[allow(dead_code)] // Used in tests
selected_algorithm: Option<&'static Algorithm>, selected_algorithm: Option<&'static Algorithm>,
failed_retries: usize, failed_retries: usize
} }
impl<P: Payload> InitState<P> { impl<P: Payload> InitState<P> {
pub fn new( pub fn new(
node_id: NodeId, payload: P, key_pair: Arc<Ed25519KeyPair>, trusted_keys: Arc<[Ed25519PublicKey]>, node_id: NodeId, payload: P, key_pair: Arc<Ed25519KeyPair>, trusted_keys: Arc<[Ed25519PublicKey]>,
algorithms: Algorithms, algorithms: Algorithms
) -> Self { ) -> Self {
let mut hash = [0; SALTED_NODE_ID_HASH_LEN]; let mut hash = [0; SALTED_NODE_ID_HASH_LEN];
let rng = SystemRandom::new(); let rng = SystemRandom::new();
@ -433,7 +437,7 @@ impl<P: Payload> InitState<P> {
selected_algorithm: None, selected_algorithm: None,
algorithms, algorithms,
failed_retries: 0, failed_retries: 0,
close_time: 60, close_time: 60
} }
} }
@ -519,22 +523,28 @@ impl<P: Payload> InitState<P> {
let mut public_key = [0; ED25519_PUBLIC_KEY_LEN]; let mut public_key = [0; ED25519_PUBLIC_KEY_LEN];
public_key.clone_from_slice(self.key_pair.as_ref().public_key().as_ref()); public_key.clone_from_slice(self.key_pair.as_ref().public_key().as_ref());
let msg = match stage { let msg = match stage {
STAGE_PING => InitMsg::Ping { STAGE_PING => {
salted_node_id_hash: self.salted_node_id_hash, InitMsg::Ping {
ecdh_public_key: ecdh_public_key.unwrap(), salted_node_id_hash: self.salted_node_id_hash,
algorithms: self.algorithms.clone(), ecdh_public_key: ecdh_public_key.unwrap(),
}, algorithms: self.algorithms.clone()
STAGE_PONG => InitMsg::Pong { }
salted_node_id_hash: self.salted_node_id_hash, }
ecdh_public_key: ecdh_public_key.unwrap(), STAGE_PONG => {
algorithms: self.algorithms.clone(), InitMsg::Pong {
encrypted_payload: self.encrypt_payload(), salted_node_id_hash: self.salted_node_id_hash,
}, ecdh_public_key: ecdh_public_key.unwrap(),
STAGE_PENG => InitMsg::Peng { algorithms: self.algorithms.clone(),
salted_node_id_hash: self.salted_node_id_hash, encrypted_payload: self.encrypt_payload()
encrypted_payload: self.encrypt_payload(), }
}, }
_ => unreachable!(), STAGE_PENG => {
InitMsg::Peng {
salted_node_id_hash: self.salted_node_id_hash,
encrypted_payload: self.encrypt_payload()
}
}
_ => unreachable!()
}; };
let mut bytes = out.buffer(); let mut bytes = out.buffer();
let len = msg.write_to(&mut bytes, &self.key_pair).expect("Buffer too small"); let len = msg.write_to(&mut bytes, &self.key_pair).expect("Buffer too small");
@ -553,7 +563,7 @@ impl<P: Payload> InitState<P> {
fn select_algorithm(&self, peer_algos: &Algorithms) -> Result<Option<(&'static Algorithm, f32)>, Error> { fn select_algorithm(&self, peer_algos: &Algorithms) -> Result<Option<(&'static Algorithm, f32)>, Error> {
if self.algorithms.allow_unencrypted && peer_algos.allow_unencrypted { if self.algorithms.allow_unencrypted && peer_algos.allow_unencrypted {
return Ok(None); return Ok(None)
} }
// For each supported algorithm, find the algorithm in the list of the peer (ignore algorithm if not found). // For each supported algorithm, find the algorithm in the list of the peer (ignore algorithm if not found).
// Take the minimal speed reported by either us or the peer. // Take the minimal speed reported by either us or the peer.
@ -587,7 +597,7 @@ impl<P: Payload> InitState<P> {
if self.salted_node_id_hash == salted_node_id_hash if self.salted_node_id_hash == salted_node_id_hash
|| self.check_salted_node_id_hash(&salted_node_id_hash, self.node_id) || self.check_salted_node_id_hash(&salted_node_id_hash, self.node_id)
{ {
return Err(Error::CryptoInitFatal("Connected to self")); return Err(Error::CryptoInitFatal("Connected to self"))
} }
if stage != self.next_stage { if stage != self.next_stage {
if self.next_stage == STAGE_PONG && stage == STAGE_PING { if self.next_stage == STAGE_PONG && stage == STAGE_PING {
@ -600,15 +610,15 @@ impl<P: Payload> InitState<P> {
self.last_message = None; self.last_message = None;
self.ecdh_private_key = None; self.ecdh_private_key = None;
} else { } else {
return Ok(InitResult::Continue); return Ok(InitResult::Continue)
} }
} else if self.next_stage == CLOSING { } else if self.next_stage == CLOSING {
return Ok(InitResult::Continue); return Ok(InitResult::Continue)
} else if self.last_message.is_some() { } else if self.last_message.is_some() {
self.repeat_last_message(out); self.repeat_last_message(out);
return Ok(InitResult::Continue); return Ok(InitResult::Continue)
} else { } else {
return Err(Error::CryptoInitFatal("Received invalid stage as first message")); return Err(Error::CryptoInitFatal("Received invalid stage as first message"))
} }
} }
self.failed_retries = 0; self.failed_retries = 0;
@ -673,25 +683,23 @@ impl<P: Payload> InitState<P> {
let rotation = RotationState::new(!self.is_initiator, buffer); let rotation = RotationState::new(!self.is_initiator, buffer);
if !buffer.is_empty() { if !buffer.is_empty() {
buffer.prepend_byte(MESSAGE_TYPE_ROTATION); buffer.prepend_byte(MESSAGE_TYPE_ROTATION);
crypto.encrypt(buffer);
} }
PeerCrypto::Encrypted { PeerCrypto::Encrypted {
algorithm: crypto.algorithm(), algorithm: crypto.algorithm(),
trusted_keys: self.trusted_keys.clone(),
core: crypto, core: crypto,
rotation, rotation,
rotate_counter: 0, rotate_counter: 0,
last_init_message: self.last_message.unwrap(), last_init_message: self.last_message.unwrap()
} }
} else { } else {
PeerCrypto::Unencrypted { PeerCrypto::Unencrypted {
last_init_message: self.last_message.unwrap(), last_init_message: self.last_message.unwrap()
trusted_keys: self.trusted_keys.clone(),
} }
} }
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
@ -723,7 +731,7 @@ mod tests {
rng.fill(&mut node2).unwrap(); rng.fill(&mut node2).unwrap();
let algorithms = Algorithms { let algorithms = Algorithms {
algorithm_speeds: smallvec![(&AES_128_GCM, 600.0), (&AES_256_GCM, 500.0), (&CHACHA20_POLY1305, 400.0)], algorithm_speeds: smallvec![(&AES_128_GCM, 600.0), (&AES_256_GCM, 500.0), (&CHACHA20_POLY1305, 400.0)],
allow_unencrypted: false, allow_unencrypted: false
}; };
let sender = InitState::new(node1, vec![1], key_pair.clone(), trusted_nodes.clone(), algorithms.clone()); let sender = InitState::new(node1, vec![1], key_pair.clone(), trusted_nodes.clone(), algorithms.clone());
let receiver = InitState::new(node2, vec![2], key_pair, trusted_nodes, algorithms); let receiver = InitState::new(node2, vec![2], key_pair, trusted_nodes, algorithms);
@ -743,12 +751,12 @@ mod tests {
assert_eq!(sender.stage(), WAITING_TO_CLOSE); assert_eq!(sender.stage(), WAITING_TO_CLOSE);
let result = match result { let result = match result {
InitResult::Success { .. } => receiver.handle_init(&mut out).unwrap(), InitResult::Success { .. } => receiver.handle_init(&mut out).unwrap(),
InitResult::Continue => unreachable!(), InitResult::Continue => unreachable!()
}; };
assert_eq!(receiver.stage(), CLOSING); assert_eq!(receiver.stage(), CLOSING);
match result { match result {
InitResult::Success { .. } => assert!(out.is_empty()), InitResult::Success { .. } => assert!(out.is_empty()),
InitResult::Continue => unreachable!(), InitResult::Continue => unreachable!()
} }
} }
@ -774,14 +782,14 @@ mod tests {
// lost peng, sender recovers // lost peng, sender recovers
out.clear(); out.clear();
} }
InitResult::Continue => unreachable!(), InitResult::Continue => unreachable!()
}; };
sender.every_second(&mut out).unwrap(); sender.every_second(&mut out).unwrap();
let result = receiver.handle_init(&mut out).unwrap(); let result = receiver.handle_init(&mut out).unwrap();
assert_eq!(receiver.stage(), CLOSING); assert_eq!(receiver.stage(), CLOSING);
match result { match result {
InitResult::Success { .. } => assert!(out.is_empty()), InitResult::Success { .. } => assert!(out.is_empty()),
InitResult::Continue => unreachable!(), InitResult::Continue => unreachable!()
} }
} }
@ -805,7 +813,7 @@ mod tests {
// lost peng, sender recovers // lost peng, sender recovers
out.clear(); out.clear();
} }
InitResult::Continue => unreachable!(), InitResult::Continue => unreachable!()
}; };
receiver.every_second(&mut out).unwrap(); receiver.every_second(&mut out).unwrap();
sender.handle_init(&mut out).unwrap(); sender.handle_init(&mut out).unwrap();
@ -813,7 +821,7 @@ mod tests {
assert_eq!(receiver.stage(), CLOSING); assert_eq!(receiver.stage(), CLOSING);
match result { match result {
InitResult::Success { .. } => assert!(out.is_empty()), InitResult::Success { .. } => assert!(out.is_empty()),
InitResult::Continue => unreachable!(), InitResult::Continue => unreachable!()
} }
} }
@ -862,7 +870,7 @@ mod tests {
} }
fn test_algorithm_negotiation( fn test_algorithm_negotiation(
algos1: Algorithms, algos2: Algorithms, success: bool, selected: Option<&'static Algorithm>, algos1: Algorithms, algos2: Algorithms, success: bool, selected: Option<&'static Algorithm>
) { ) {
let (mut sender, mut receiver) = create_pair(); let (mut sender, mut receiver) = create_pair();
sender.algorithms = algos1; sender.algorithms = algos1;
@ -872,7 +880,7 @@ mod tests {
let res = receiver.handle_init(&mut out); let res = receiver.handle_init(&mut out);
assert_eq!(res.is_ok(), success); assert_eq!(res.is_ok(), success);
if !success { if !success {
return; return
} }
sender.handle_init(&mut out).unwrap(); sender.handle_init(&mut out).unwrap();
receiver.handle_init(&mut out).unwrap(); receiver.handle_init(&mut out).unwrap();
@ -886,70 +894,70 @@ mod tests {
test_algorithm_negotiation( test_algorithm_negotiation(
Algorithms { Algorithms {
algorithm_speeds: smallvec![(&AES_128_GCM, 600.0), (&AES_256_GCM, 500.0), (&CHACHA20_POLY1305, 400.0)], algorithm_speeds: smallvec![(&AES_128_GCM, 600.0), (&AES_256_GCM, 500.0), (&CHACHA20_POLY1305, 400.0)],
allow_unencrypted: false, allow_unencrypted: false
}, },
Algorithms { Algorithms {
algorithm_speeds: smallvec![(&AES_128_GCM, 600.0), (&AES_256_GCM, 500.0), (&CHACHA20_POLY1305, 400.0)], algorithm_speeds: smallvec![(&AES_128_GCM, 600.0), (&AES_256_GCM, 500.0), (&CHACHA20_POLY1305, 400.0)],
allow_unencrypted: false, allow_unencrypted: false
}, },
true, true,
Some(&AES_128_GCM), Some(&AES_128_GCM)
); );
// Overlapping but different // Overlapping but different
test_algorithm_negotiation( test_algorithm_negotiation(
Algorithms { Algorithms {
algorithm_speeds: smallvec![(&AES_256_GCM, 500.0), (&CHACHA20_POLY1305, 400.0)], algorithm_speeds: smallvec![(&AES_256_GCM, 500.0), (&CHACHA20_POLY1305, 400.0)],
allow_unencrypted: false, allow_unencrypted: false
}, },
Algorithms { Algorithms {
algorithm_speeds: smallvec![(&AES_128_GCM, 600.0), (&AES_256_GCM, 500.0)], algorithm_speeds: smallvec![(&AES_128_GCM, 600.0), (&AES_256_GCM, 500.0)],
allow_unencrypted: false, allow_unencrypted: false
}, },
true, true,
Some(&AES_256_GCM), Some(&AES_256_GCM)
); );
// Select fastest pair // Select fastest pair
test_algorithm_negotiation( test_algorithm_negotiation(
Algorithms { Algorithms {
algorithm_speeds: smallvec![(&AES_128_GCM, 600.0), (&AES_256_GCM, 500.0), (&CHACHA20_POLY1305, 400.0)], algorithm_speeds: smallvec![(&AES_128_GCM, 600.0), (&AES_256_GCM, 500.0), (&CHACHA20_POLY1305, 400.0)],
allow_unencrypted: false, allow_unencrypted: false
}, },
Algorithms { Algorithms {
algorithm_speeds: smallvec![(&AES_128_GCM, 40.0), (&AES_256_GCM, 50.0), (&CHACHA20_POLY1305, 60.0)], algorithm_speeds: smallvec![(&AES_128_GCM, 40.0), (&AES_256_GCM, 50.0), (&CHACHA20_POLY1305, 60.0)],
allow_unencrypted: false, allow_unencrypted: false
}, },
true, true,
Some(&CHACHA20_POLY1305), Some(&CHACHA20_POLY1305)
); );
// Select unencrypted if supported by both // Select unencrypted if supported by both
test_algorithm_negotiation( test_algorithm_negotiation(
Algorithms { Algorithms {
algorithm_speeds: smallvec![(&AES_128_GCM, 600.0), (&AES_256_GCM, 500.0), (&CHACHA20_POLY1305, 400.0)], algorithm_speeds: smallvec![(&AES_128_GCM, 600.0), (&AES_256_GCM, 500.0), (&CHACHA20_POLY1305, 400.0)],
allow_unencrypted: true, allow_unencrypted: true
}, },
Algorithms { Algorithms {
algorithm_speeds: smallvec![(&AES_128_GCM, 600.0), (&AES_256_GCM, 500.0), (&CHACHA20_POLY1305, 400.0)], algorithm_speeds: smallvec![(&AES_128_GCM, 600.0), (&AES_256_GCM, 500.0), (&CHACHA20_POLY1305, 400.0)],
allow_unencrypted: true, allow_unencrypted: true
}, },
true, true,
None, None
); );
// Do not select unencrypted if only supported by one // Do not select unencrypted if only supported by one
test_algorithm_negotiation( test_algorithm_negotiation(
Algorithms { Algorithms {
algorithm_speeds: smallvec![(&AES_128_GCM, 600.0), (&AES_256_GCM, 500.0), (&CHACHA20_POLY1305, 400.0)], algorithm_speeds: smallvec![(&AES_128_GCM, 600.0), (&AES_256_GCM, 500.0), (&CHACHA20_POLY1305, 400.0)],
allow_unencrypted: true, allow_unencrypted: true
}, },
Algorithms { Algorithms {
algorithm_speeds: smallvec![(&AES_128_GCM, 600.0), (&AES_256_GCM, 500.0), (&CHACHA20_POLY1305, 400.0)], algorithm_speeds: smallvec![(&AES_128_GCM, 600.0), (&AES_256_GCM, 500.0), (&CHACHA20_POLY1305, 400.0)],
allow_unencrypted: false, allow_unencrypted: false
}, },
true, true,
Some(&AES_128_GCM), Some(&AES_128_GCM)
); );
// Fail if no match // Fail if no match
@ -957,10 +965,10 @@ mod tests {
Algorithms { algorithm_speeds: smallvec![(&AES_128_GCM, 600.0)], allow_unencrypted: true }, Algorithms { algorithm_speeds: smallvec![(&AES_128_GCM, 600.0)], allow_unencrypted: true },
Algorithms { Algorithms {
algorithm_speeds: smallvec![(&AES_256_GCM, 500.0), (&CHACHA20_POLY1305, 400.0)], algorithm_speeds: smallvec![(&AES_256_GCM, 500.0), (&CHACHA20_POLY1305, 400.0)],
allow_unencrypted: false, allow_unencrypted: false
}, },
false, false,
Some(&AES_128_GCM), Some(&AES_128_GCM)
); );
} }
} }

View File

@ -1,146 +0,0 @@
use std::{fs::File, hash::BuildHasherDefault};
use tokio;
use fnv::FnvHasher;
use crate::{
config::Config,
crypto::PeerCrypto,
device::Device,
engine::{
device_thread::DeviceThread,
shared::{SharedPeerCrypto, SharedTable, SharedTraffic},
socket_thread::SocketThread,
},
error::Error,
messages::AddrList,
net::Socket,
payload::Protocol,
port_forwarding::PortForwarding,
types::NodeId,
util::{CtrlC, Time, TimeSource},
};
pub type Hash = BuildHasherDefault<FnvHasher>;
pub const STATS_INTERVAL: Time = 60;
pub const SPACE_BEFORE: usize = 100;
pub struct PeerData {
pub addrs: AddrList,
#[allow(dead_code)] // TODO: export in status
pub last_seen: Time,
pub timeout: Time,
pub peer_timeout: u16,
pub node_id: NodeId,
pub crypto: PeerCrypto,
}
#[derive(Clone)]
pub struct ReconnectEntry {
address: Option<(String, Time)>,
resolved: AddrList,
tries: u16,
timeout: u16,
next: Time,
final_timeout: Option<Time>,
}
pub struct GenericCloud<D: Device, P: Protocol, S: Socket, TS: TimeSource> {
socket_thread: SocketThread<S, D, P, TS>,
device_thread: DeviceThread<S, D, P, TS>,
}
impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS> {
#[allow(clippy::too_many_arguments)]
pub async fn new(
config: &Config, socket: S, device: D, port_forwarding: Option<PortForwarding>, stats_file: Option<File>,
) -> Result<Self, Error> {
let table = SharedTable::<TS>::new(&config);
let traffic = SharedTraffic::new();
let peer_crypto = SharedPeerCrypto::new();
let device_thread = DeviceThread::<S, D, P, TS>::new(
config.clone(),
device.duplicate().await?,
socket.clone(),
traffic.clone(),
peer_crypto.clone(),
table.clone(),
);
let mut socket_thread = SocketThread::<S, D, P, TS>::new(
config.clone(),
device,
socket,
traffic,
peer_crypto,
table,
port_forwarding,
stats_file,
);
socket_thread.housekeep().await?;
Ok(Self { socket_thread, device_thread })
}
pub fn add_peer(&mut self, addr: String) -> Result<(), Error> {
unimplemented!()
}
pub async fn run(self) {
let ctrlc = CtrlC::new();
let device_thread_handle = tokio::spawn(self.device_thread.run());
let socket_thread_handle = tokio::spawn(self.socket_thread.run());
// TODO: wait for ctrl-c
let (dev_ret, sock_ret) = join!(device_thread_handle, socket_thread_handle);
dev_ret.unwrap();
sock_ret.unwrap();
}
}
#[cfg(test)]
use crate::device::MockDevice;
#[cfg(test)]
use crate::net::MockSocket;
#[cfg(test)]
use crate::util::MockTimeSource;
#[cfg(test)]
use std::net::SocketAddr;
#[cfg(test)]
impl<P: Protocol> GenericCloud<MockDevice, P, MockSocket, MockTimeSource> {
pub fn socket(&mut self) -> &mut MockSocket {
&mut self.socket_thread.socket
}
pub fn device(&mut self) -> &mut MockDevice {
&mut self.device_thread.device
}
pub async fn connect(&mut self, addr: SocketAddr) -> Result<(), Error> {
self.socket_thread.connect(addr).await
}
pub async fn trigger_socket_event(&mut self) {
self.socket_thread.iteration().await
}
pub async fn trigger_device_event(&mut self) {
self.device_thread.iteration().await
}
pub async fn trigger_housekeep(&mut self) {
try_fail!(self.socket_thread.housekeep().await, "Housekeep failed: {}");
try_fail!(self.device_thread.housekeep().await, "Housekeep failed: {}");
}
pub fn is_connected(&self, addr: &SocketAddr) -> bool {
self.socket_thread.peers.contains_key(addr)
}
pub fn own_addresses(&self) -> &[SocketAddr] {
&self.socket_thread.own_addresses
}
pub async fn get_num(&self) -> usize {
self.socket_thread.socket.address().await.unwrap().port() as usize
}
}

View File

@ -1,6 +1,6 @@
use super::{ use super::{
shared::{SharedPeerCrypto, SharedTable, SharedTraffic}, shared::{SharedPeerCrypto, SharedTable, SharedTraffic},
common::SPACE_BEFORE, SPACE_BEFORE,
}; };
use crate::{ use crate::{
config::Config, config::Config,

View File

@ -5,4 +5,149 @@
mod device_thread; mod device_thread;
mod shared; mod shared;
mod socket_thread; mod socket_thread;
pub mod common;
use std::{fs::File, hash::BuildHasherDefault};
use tokio;
use fnv::FnvHasher;
use crate::{
config::Config,
crypto::PeerCrypto,
device::Device,
engine::{
device_thread::DeviceThread,
shared::{SharedPeerCrypto, SharedTable, SharedTraffic},
socket_thread::SocketThread,
},
error::Error,
messages::AddrList,
net::Socket,
payload::Protocol,
port_forwarding::PortForwarding,
types::NodeId,
util::{addr_nice, resolve, CtrlC, Time, TimeSource},
};
pub type Hash = BuildHasherDefault<FnvHasher>;
pub const STATS_INTERVAL: Time = 60;
const SPACE_BEFORE: usize = 100;
pub struct PeerData {
addrs: AddrList,
#[allow(dead_code)] // TODO: export in status
last_seen: Time,
timeout: Time,
peer_timeout: u16,
node_id: NodeId,
crypto: PeerCrypto,
}
#[derive(Clone)]
pub struct ReconnectEntry {
address: Option<(String, Time)>,
resolved: AddrList,
tries: u16,
timeout: u16,
next: Time,
final_timeout: Option<Time>,
}
pub struct GenericCloud<D: Device, P: Protocol, S: Socket, TS: TimeSource> {
socket_thread: SocketThread<S, D, P, TS>,
device_thread: DeviceThread<S, D, P, TS>,
}
impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS> {
#[allow(clippy::too_many_arguments)]
pub async fn new(
config: &Config, socket: S, device: D, port_forwarding: Option<PortForwarding>, stats_file: Option<File>,
) -> Result<Self, Error> {
let table = SharedTable::<TS>::new(&config);
let traffic = SharedTraffic::new();
let peer_crypto = SharedPeerCrypto::new();
let device_thread = DeviceThread::<S, D, P, TS>::new(
config.clone(),
device.duplicate().await?,
socket.clone(),
traffic.clone(),
peer_crypto.clone(),
table.clone(),
);
let socket_thread = SocketThread::<S, D, P, TS>::new(
config.clone(),
device,
socket,
traffic,
peer_crypto,
table,
port_forwarding,
stats_file,
);
Ok(Self { socket_thread, device_thread })
}
pub fn add_peer(&mut self, addr: String) -> Result<(), Error> {
unimplemented!()
}
pub async fn run(self) {
let ctrlc = CtrlC::new();
let device_thread_handle = tokio::spawn(self.device_thread.run());
let socket_thread_handle = tokio::spawn(self.socket_thread.run());
// TODO: wait for ctrl-c
let (dev_ret, sock_ret) = join!(device_thread_handle, socket_thread_handle);
dev_ret.unwrap();
sock_ret.unwrap();
}
}
#[cfg(test)]
use super::device::MockDevice;
#[cfg(test)]
use super::net::MockSocket;
#[cfg(test)]
use super::util::MockTimeSource;
#[cfg(test)]
use std::net::SocketAddr;
#[cfg(test)]
impl<P: Protocol> GenericCloud<MockDevice, P, MockSocket, MockTimeSource> {
pub fn socket(&mut self) -> &mut MockSocket {
&mut self.socket_thread.socket
}
pub fn device(&mut self) -> &mut MockDevice {
&mut self.device_thread.device
}
pub async fn connect(&mut self, addr: SocketAddr) -> Result<(), Error> {
self.socket_thread.connect(addr).await
}
pub async fn trigger_socket_event(&mut self) {
self.socket_thread.iteration().await
}
pub async fn trigger_device_event(&mut self) {
self.device_thread.iteration().await
}
pub async fn trigger_housekeep(&mut self) {
try_fail!(self.socket_thread.housekeep().await, "Housekeep failed: {}");
try_fail!(self.device_thread.housekeep().await, "Housekeep failed: {}");
}
pub fn is_connected(&self, addr: &SocketAddr) -> bool {
self.socket_thread.peers.contains_key(addr)
}
pub fn own_addresses(&self) -> &[SocketAddr] {
&self.socket_thread.own_addresses
}
pub async fn get_num(&self) -> usize {
self.socket_thread.socket.address().await.unwrap().port() as usize
}
}

View File

@ -1,26 +1,26 @@
use crate::{ use crate::{
config::Config, config::Config,
crypto::CryptoCore, crypto::CryptoCore,
engine::common::Hash, engine::{Hash, TimeSource},
error::Error, error::Error,
table::ClaimTable, table::ClaimTable,
traffic::{TrafficEntry, TrafficStats}, traffic::{TrafficEntry, TrafficStats},
types::{Address, RangeList}, types::{Address, RangeList},
util::{Duration, MsgBuffer, TimeSource}, util::{Duration, MsgBuffer}
}; };
use parking_lot::Mutex; use parking_lot::Mutex;
use std::{ use std::{
collections::HashMap, collections::HashMap,
io::{self, Write}, io::{self, Write},
net::SocketAddr, net::SocketAddr,
sync::Arc, sync::Arc
}; };
use super::common::PeerData; use super::PeerData;
#[derive(Clone)] #[derive(Clone)]
pub struct SharedPeerCrypto { pub struct SharedPeerCrypto {
peers: Arc<Mutex<HashMap<SocketAddr, Option<Arc<CryptoCore>>, Hash>>>, peers: Arc<Mutex<HashMap<SocketAddr, Option<Arc<CryptoCore>>, Hash>>>
} }
impl SharedPeerCrypto { impl SharedPeerCrypto {
@ -59,9 +59,10 @@ impl SharedPeerCrypto {
} }
} }
#[derive(Clone)] #[derive(Clone)]
pub struct SharedTraffic { pub struct SharedTraffic {
traffic: Arc<Mutex<TrafficStats>>, traffic: Arc<Mutex<TrafficStats>>
} }
impl SharedTraffic { impl SharedTraffic {
@ -118,9 +119,10 @@ impl SharedTraffic {
} }
} }
#[derive(Clone)] #[derive(Clone)]
pub struct SharedTable<TS: TimeSource> { pub struct SharedTable<TS: TimeSource> {
table: Arc<Mutex<ClaimTable<TS>>>, table: Arc<Mutex<ClaimTable<TS>>>
} }
impl<TS: TimeSource> SharedTable<TS> { impl<TS: TimeSource> SharedTable<TS> {

View File

@ -1,14 +1,14 @@
use super::{ use super::{
shared::{SharedPeerCrypto, SharedTable, SharedTraffic}, shared::{SharedPeerCrypto, SharedTable, SharedTraffic},
common::SPACE_BEFORE, SPACE_BEFORE,
}; };
use crate::{ use crate::{
beacon::BeaconSerializer, beacon::BeaconSerializer,
config::{DEFAULT_PEER_TIMEOUT, DEFAULT_PORT}, config::{DEFAULT_PEER_TIMEOUT, DEFAULT_PORT},
crypto::{is_init_message, InitResult, InitState, MessageResult, Crypto}, crypto::{is_init_message, InitResult, InitState, MessageResult},
device::{Type, Device}, device::Type,
engine::common::{Hash, PeerData}, engine::{addr_nice, resolve, Hash, PeerData},
error::Error, error::Error,
messages::{ messages::{
AddrList, NodeInfo, PeerInfo, MESSAGE_TYPE_CLOSE, MESSAGE_TYPE_DATA, MESSAGE_TYPE_KEEPALIVE, AddrList, NodeInfo, PeerInfo, MESSAGE_TYPE_CLOSE, MESSAGE_TYPE_DATA, MESSAGE_TYPE_KEEPALIVE,
@ -17,8 +17,8 @@ use crate::{
net::{mapped_addr, Socket}, net::{mapped_addr, Socket},
port_forwarding::PortForwarding, port_forwarding::PortForwarding,
types::{Address, NodeId, Range, RangeList}, types::{Address, NodeId, Range, RangeList},
util::{addr_nice, resolve, MsgBuffer, StatsdMsg, Time, TimeSource}, util::{MsgBuffer, StatsdMsg, Time, TimeSource},
Config, Protocol, Config, Crypto, Device, Protocol,
}; };
use rand::{random, seq::SliceRandom, thread_rng}; use rand::{random, seq::SliceRandom, thread_rng};
use smallvec::{smallvec, SmallVec}; use smallvec::{smallvec, SmallVec};
@ -127,7 +127,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
next_beacon: now, next_beacon: now,
next_peers: now, next_peers: now,
next_stats_out: now + STATS_INTERVAL, next_stats_out: now + STATS_INTERVAL,
next_own_address_reset: now, next_own_address_reset: now + OWN_ADDRESS_RESET_INTERVAL,
pending_inits: HashMap::default(), pending_inits: HashMap::default(),
reconnect_peers: SmallVec::new(), reconnect_peers: SmallVec::new(),
own_addresses: SmallVec::new(), own_addresses: SmallVec::new(),
@ -336,20 +336,20 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
return Err(err); return Err(err);
} }
}; };
self.buffer.clear();
self.update_peer_info(src, Some(info)).await?; self.update_peer_info(src, Some(info)).await?;
self.buffer.clear();
} }
MESSAGE_TYPE_KEEPALIVE => { MESSAGE_TYPE_KEEPALIVE => {
self.buffer.clear();
self.update_peer_info(src, None).await?; self.update_peer_info(src, None).await?;
self.buffer.clear();
} }
MESSAGE_TYPE_CLOSE => { MESSAGE_TYPE_CLOSE => {
self.buffer.clear();
self.remove_peer(src); self.remove_peer(src);
self.buffer.clear();
} }
_ => { _ => {
self.buffer.clear();
self.traffic.count_invalid_protocol(self.buffer.len()); self.traffic.count_invalid_protocol(self.buffer.len());
self.buffer.clear();
return Err(Error::Message("Unknown message type")); return Err(Error::Message("Unknown message type"));
} }
}, },

View File

@ -47,7 +47,7 @@ use std::{
}; };
use crate::{ use crate::{
engine::common::GenericCloud, engine::GenericCloud,
config::{Args, Command, Config, DEFAULT_PORT}, config::{Args, Command, Config, DEFAULT_PORT},
crypto::Crypto, crypto::Crypto,
device::{Device, TunTapDevice, Type}, device::{Device, TunTapDevice, Type},

View File

@ -15,7 +15,7 @@ use std::{
pub use crate::{ pub use crate::{
config::{Config, CryptoConfig}, config::{Config, CryptoConfig},
device::{MockDevice, Type}, device::{MockDevice, Type},
engine::common::GenericCloud, engine::GenericCloud,
net::MockSocket, net::MockSocket,
payload::{Frame, Packet, Protocol}, payload::{Frame, Packet, Protocol},
types::Range, types::Range,
@ -93,7 +93,6 @@ impl<P: Protocol> Simulator<P> {
DebugLogger::set_node(self.next_port as usize); DebugLogger::set_node(self.next_port as usize);
self.next_port += 1; self.next_port += 1;
let node = TestNode::new(&config, MockSocket::new(addr), MockDevice::new(), None, None).await.unwrap(); let node = TestNode::new(&config, MockSocket::new(addr), MockDevice::new(), None, None).await.unwrap();
DebugLogger::set_node(0); DebugLogger::set_node(0);
self.nodes.insert(addr, node); self.nodes.insert(addr, node);
addr addr

View File

@ -10,7 +10,7 @@ use std::{
}; };
use super::{ use super::{
engine::common::{Hash, STATS_INTERVAL}, engine::{Hash, STATS_INTERVAL},
types::Address, types::Address,
util::{addr_nice, Bytes} util::{addr_nice, Bytes}
}; };