Fixes for bench

This commit is contained in:
Dennis Schwerdel 2021-03-26 23:42:38 +00:00
parent fb0676ba6b
commit 1101fe65e0
11 changed files with 180 additions and 169 deletions

View File

@ -14,8 +14,19 @@ mod types {
mod table {
include!("../src/table.rs");
}
mod cloud {
include!("../src/cloud.rs");
mod engine {
pub mod common {
include!("../src/engine/common.rs");
}
mod shared {
include!("../src/engine/shared.rs");
}
mod device_thread {
include!("../src/engine/device_thread.rs");
}
mod socket_thread {
include!("../src/engine/socket_thread.rs");
}
}
mod config {
include!("../src/config.rs");

View File

@ -1,6 +1,7 @@
#![allow(dead_code, unused_macros, unused_imports)]
#[macro_use] extern crate serde;
#[macro_use] extern crate log;
#[macro_use] extern crate tokio;
use criterion::{criterion_group, criterion_main, Criterion, Throughput};

View File

@ -1,6 +1,7 @@
#![allow(dead_code, unused_macros, unused_imports)]
#[macro_use] extern crate serde;
#[macro_use] extern crate log;
#[macro_use] extern crate tokio;
use iai::{black_box, main};

146
src/engine/common.rs Normal file
View File

@ -0,0 +1,146 @@
use std::{fs::File, hash::BuildHasherDefault};
use tokio;
use fnv::FnvHasher;
use crate::{
config::Config,
crypto::PeerCrypto,
device::Device,
engine::{
device_thread::DeviceThread,
shared::{SharedPeerCrypto, SharedTable, SharedTraffic},
socket_thread::SocketThread,
},
error::Error,
messages::AddrList,
net::Socket,
payload::Protocol,
port_forwarding::PortForwarding,
types::NodeId,
util::{CtrlC, Time, TimeSource},
};
pub type Hash = BuildHasherDefault<FnvHasher>;
pub const STATS_INTERVAL: Time = 60;
pub const SPACE_BEFORE: usize = 100;
pub struct PeerData {
pub addrs: AddrList,
#[allow(dead_code)] // TODO: export in status
pub last_seen: Time,
pub timeout: Time,
pub peer_timeout: u16,
pub node_id: NodeId,
pub crypto: PeerCrypto,
}
#[derive(Clone)]
pub struct ReconnectEntry {
address: Option<(String, Time)>,
resolved: AddrList,
tries: u16,
timeout: u16,
next: Time,
final_timeout: Option<Time>,
}
pub struct GenericCloud<D: Device, P: Protocol, S: Socket, TS: TimeSource> {
socket_thread: SocketThread<S, D, P, TS>,
device_thread: DeviceThread<S, D, P, TS>,
}
impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS> {
#[allow(clippy::too_many_arguments)]
pub async fn new(
config: &Config, socket: S, device: D, port_forwarding: Option<PortForwarding>, stats_file: Option<File>,
) -> Result<Self, Error> {
let table = SharedTable::<TS>::new(&config);
let traffic = SharedTraffic::new();
let peer_crypto = SharedPeerCrypto::new();
let device_thread = DeviceThread::<S, D, P, TS>::new(
config.clone(),
device.duplicate().await?,
socket.clone(),
traffic.clone(),
peer_crypto.clone(),
table.clone(),
);
let mut socket_thread = SocketThread::<S, D, P, TS>::new(
config.clone(),
device,
socket,
traffic,
peer_crypto,
table,
port_forwarding,
stats_file,
);
socket_thread.housekeep().await?;
Ok(Self { socket_thread, device_thread })
}
pub fn add_peer(&mut self, addr: String) -> Result<(), Error> {
unimplemented!()
}
pub async fn run(self) {
let ctrlc = CtrlC::new();
let device_thread_handle = tokio::spawn(self.device_thread.run());
let socket_thread_handle = tokio::spawn(self.socket_thread.run());
// TODO: wait for ctrl-c
let (dev_ret, sock_ret) = join!(device_thread_handle, socket_thread_handle);
dev_ret.unwrap();
sock_ret.unwrap();
}
}
#[cfg(test)]
use crate::device::MockDevice;
#[cfg(test)]
use crate::net::MockSocket;
#[cfg(test)]
use crate::util::MockTimeSource;
#[cfg(test)]
use std::net::SocketAddr;
#[cfg(test)]
impl<P: Protocol> GenericCloud<MockDevice, P, MockSocket, MockTimeSource> {
pub fn socket(&mut self) -> &mut MockSocket {
&mut self.socket_thread.socket
}
pub fn device(&mut self) -> &mut MockDevice {
&mut self.device_thread.device
}
pub async fn connect(&mut self, addr: SocketAddr) -> Result<(), Error> {
self.socket_thread.connect(addr).await
}
pub async fn trigger_socket_event(&mut self) {
self.socket_thread.iteration().await
}
pub async fn trigger_device_event(&mut self) {
self.device_thread.iteration().await
}
pub async fn trigger_housekeep(&mut self) {
try_fail!(self.socket_thread.housekeep().await, "Housekeep failed: {}");
try_fail!(self.device_thread.housekeep().await, "Housekeep failed: {}");
}
pub fn is_connected(&self, addr: &SocketAddr) -> bool {
self.socket_thread.peers.contains_key(addr)
}
pub fn own_addresses(&self) -> &[SocketAddr] {
&self.socket_thread.own_addresses
}
pub async fn get_num(&self) -> usize {
self.socket_thread.socket.address().await.unwrap().port() as usize
}
}

View File

@ -1,6 +1,6 @@
use super::{
shared::{SharedPeerCrypto, SharedTable, SharedTraffic},
SPACE_BEFORE,
common::SPACE_BEFORE,
};
use crate::{
config::Config,

View File

@ -5,150 +5,4 @@
mod device_thread;
mod shared;
mod socket_thread;
use std::{fs::File, hash::BuildHasherDefault};
use tokio;
use fnv::FnvHasher;
use crate::{
config::Config,
crypto::PeerCrypto,
device::Device,
engine::{
device_thread::DeviceThread,
shared::{SharedPeerCrypto, SharedTable, SharedTraffic},
socket_thread::SocketThread,
},
error::Error,
messages::AddrList,
net::Socket,
payload::Protocol,
port_forwarding::PortForwarding,
types::NodeId,
util::{addr_nice, resolve, CtrlC, Time, TimeSource},
};
pub type Hash = BuildHasherDefault<FnvHasher>;
pub const STATS_INTERVAL: Time = 60;
const SPACE_BEFORE: usize = 100;
pub struct PeerData {
addrs: AddrList,
#[allow(dead_code)] // TODO: export in status
last_seen: Time,
timeout: Time,
peer_timeout: u16,
node_id: NodeId,
crypto: PeerCrypto,
}
#[derive(Clone)]
pub struct ReconnectEntry {
address: Option<(String, Time)>,
resolved: AddrList,
tries: u16,
timeout: u16,
next: Time,
final_timeout: Option<Time>,
}
pub struct GenericCloud<D: Device, P: Protocol, S: Socket, TS: TimeSource> {
socket_thread: SocketThread<S, D, P, TS>,
device_thread: DeviceThread<S, D, P, TS>,
}
impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS> {
#[allow(clippy::too_many_arguments)]
pub async fn new(
config: &Config, socket: S, device: D, port_forwarding: Option<PortForwarding>, stats_file: Option<File>,
) -> Result<Self, Error> {
let table = SharedTable::<TS>::new(&config);
let traffic = SharedTraffic::new();
let peer_crypto = SharedPeerCrypto::new();
let device_thread = DeviceThread::<S, D, P, TS>::new(
config.clone(),
device.duplicate().await?,
socket.clone(),
traffic.clone(),
peer_crypto.clone(),
table.clone(),
);
let mut socket_thread = SocketThread::<S, D, P, TS>::new(
config.clone(),
device,
socket,
traffic,
peer_crypto,
table,
port_forwarding,
stats_file,
);
socket_thread.housekeep().await?;
Ok(Self { socket_thread, device_thread })
}
pub fn add_peer(&mut self, addr: String) -> Result<(), Error> {
unimplemented!()
}
pub async fn run(self) {
let ctrlc = CtrlC::new();
let device_thread_handle = tokio::spawn(self.device_thread.run());
let socket_thread_handle = tokio::spawn(self.socket_thread.run());
// TODO: wait for ctrl-c
let (dev_ret, sock_ret) = join!(device_thread_handle, socket_thread_handle);
dev_ret.unwrap();
sock_ret.unwrap();
}
}
#[cfg(test)]
use super::device::MockDevice;
#[cfg(test)]
use super::net::MockSocket;
#[cfg(test)]
use super::util::MockTimeSource;
#[cfg(test)]
use std::net::SocketAddr;
#[cfg(test)]
impl<P: Protocol> GenericCloud<MockDevice, P, MockSocket, MockTimeSource> {
pub fn socket(&mut self) -> &mut MockSocket {
&mut self.socket_thread.socket
}
pub fn device(&mut self) -> &mut MockDevice {
&mut self.device_thread.device
}
pub async fn connect(&mut self, addr: SocketAddr) -> Result<(), Error> {
self.socket_thread.connect(addr).await
}
pub async fn trigger_socket_event(&mut self) {
self.socket_thread.iteration().await
}
pub async fn trigger_device_event(&mut self) {
self.device_thread.iteration().await
}
pub async fn trigger_housekeep(&mut self) {
try_fail!(self.socket_thread.housekeep().await, "Housekeep failed: {}");
try_fail!(self.device_thread.housekeep().await, "Housekeep failed: {}");
}
pub fn is_connected(&self, addr: &SocketAddr) -> bool {
self.socket_thread.peers.contains_key(addr)
}
pub fn own_addresses(&self) -> &[SocketAddr] {
&self.socket_thread.own_addresses
}
pub async fn get_num(&self) -> usize {
self.socket_thread.socket.address().await.unwrap().port() as usize
}
}
pub mod common;

View File

@ -1,26 +1,26 @@
use crate::{
config::Config,
crypto::CryptoCore,
engine::{Hash, TimeSource},
engine::common::Hash,
error::Error,
table::ClaimTable,
traffic::{TrafficEntry, TrafficStats},
types::{Address, RangeList},
util::{Duration, MsgBuffer}
util::{Duration, MsgBuffer, TimeSource},
};
use parking_lot::Mutex;
use std::{
collections::HashMap,
io::{self, Write},
net::SocketAddr,
sync::Arc
sync::Arc,
};
use super::PeerData;
use super::common::PeerData;
#[derive(Clone)]
pub struct SharedPeerCrypto {
peers: Arc<Mutex<HashMap<SocketAddr, Option<Arc<CryptoCore>>, Hash>>>
peers: Arc<Mutex<HashMap<SocketAddr, Option<Arc<CryptoCore>>, Hash>>>,
}
impl SharedPeerCrypto {
@ -49,7 +49,7 @@ impl SharedPeerCrypto {
pub fn load(&mut self) {
// TODO sync if needed
}
pub fn get_snapshot(&self) -> HashMap<SocketAddr, Option<Arc<CryptoCore>>, Hash> {
self.peers.lock().clone()
}
@ -59,10 +59,9 @@ impl SharedPeerCrypto {
}
}
#[derive(Clone)]
pub struct SharedTraffic {
traffic: Arc<Mutex<TrafficStats>>
traffic: Arc<Mutex<TrafficStats>>,
}
impl SharedTraffic {
@ -119,10 +118,9 @@ impl SharedTraffic {
}
}
#[derive(Clone)]
pub struct SharedTable<TS: TimeSource> {
table: Arc<Mutex<ClaimTable<TS>>>
table: Arc<Mutex<ClaimTable<TS>>>,
}
impl<TS: TimeSource> SharedTable<TS> {

View File

@ -1,14 +1,14 @@
use super::{
shared::{SharedPeerCrypto, SharedTable, SharedTraffic},
SPACE_BEFORE,
common::SPACE_BEFORE,
};
use crate::{
beacon::BeaconSerializer,
config::{DEFAULT_PEER_TIMEOUT, DEFAULT_PORT},
crypto::{is_init_message, InitResult, InitState, MessageResult},
device::Type,
engine::{addr_nice, resolve, Hash, PeerData},
crypto::{is_init_message, InitResult, InitState, MessageResult, Crypto},
device::{Type, Device},
engine::common::{Hash, PeerData},
error::Error,
messages::{
AddrList, NodeInfo, PeerInfo, MESSAGE_TYPE_CLOSE, MESSAGE_TYPE_DATA, MESSAGE_TYPE_KEEPALIVE,
@ -17,8 +17,8 @@ use crate::{
net::{mapped_addr, Socket},
port_forwarding::PortForwarding,
types::{Address, NodeId, Range, RangeList},
util::{MsgBuffer, StatsdMsg, Time, TimeSource},
Config, Crypto, Device, Protocol,
util::{addr_nice, resolve, MsgBuffer, StatsdMsg, Time, TimeSource},
Config, Protocol,
};
use rand::{random, seq::SliceRandom, thread_rng};
use smallvec::{smallvec, SmallVec};

View File

@ -47,7 +47,7 @@ use std::{
};
use crate::{
engine::GenericCloud,
engine::common::GenericCloud,
config::{Args, Command, Config, DEFAULT_PORT},
crypto::Crypto,
device::{Device, TunTapDevice, Type},

View File

@ -15,7 +15,7 @@ use std::{
pub use crate::{
config::{Config, CryptoConfig},
device::{MockDevice, Type},
engine::GenericCloud,
engine::common::GenericCloud,
net::MockSocket,
payload::{Frame, Packet, Protocol},
types::Range,

View File

@ -10,7 +10,7 @@ use std::{
};
use super::{
engine::{Hash, STATS_INTERVAL},
engine::common::{Hash, STATS_INTERVAL},
types::Address,
util::{addr_nice, Bytes}
};