Compare commits

..

2 Commits

Author SHA1 Message Date
Dennis Schwerdel 744a151110 Remove async 2021-06-24 11:29:06 +02:00
Dennis Schwerdel 76bb4aa4b6 More sync 2021-05-27 08:09:09 +02:00
29 changed files with 675 additions and 725 deletions

150
Cargo.lock generated
View File

@ -9,17 +9,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "async-trait"
version = "0.1.42"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d3a45e77e34375a7923b1e8febb049bb011f064714a8e17a1a616fef01da13d"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "attohttpc"
version = "0.16.3"
@ -121,6 +110,12 @@ version = "1.0.67"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3c69b077ad434294d3ce9f1f6143a2a4b89a8a2d54ef813d85003a4fd1137fd"
[[package]]
name = "cfg-if"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
[[package]]
name = "cfg-if"
version = "1.0.0"
@ -217,7 +212,7 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4"
dependencies = [
"cfg-if",
"cfg-if 1.0.0",
"crossbeam-utils",
]
@ -227,7 +222,7 @@ version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94af6efb46fef72616855b036a624cf27ba656ffc9be1b9a3c931cfc7749a9a9"
dependencies = [
"cfg-if",
"cfg-if 1.0.0",
"crossbeam-epoch",
"crossbeam-utils",
]
@ -238,7 +233,7 @@ version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2584f639eb95fea8c798496315b297cf81b9b58b6d30ab066a75455333cf4b12"
dependencies = [
"cfg-if",
"cfg-if 1.0.0",
"crossbeam-utils",
"lazy_static",
"memoffset",
@ -252,7 +247,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7e9d99fa91428effe99c5c6d4634cdeba32b8cf784fc428a2a687f61a952c49"
dependencies = [
"autocfg",
"cfg-if",
"cfg-if 1.0.0",
"lazy_static",
]
@ -359,7 +354,7 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c9495705279e7140bf035dde1f6e750c162df8b625267cd52cc44e0b156732c8"
dependencies = [
"cfg-if",
"cfg-if 1.0.0",
"libc",
"wasi",
]
@ -450,7 +445,7 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61124eeebbd69b8190558df225adf7e4caafce0d743919e5d6b19652314ec5ec"
dependencies = [
"cfg-if",
"cfg-if 1.0.0",
]
[[package]]
@ -519,7 +514,7 @@ version = "0.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710"
dependencies = [
"cfg-if",
"cfg-if 1.0.0",
]
[[package]]
@ -544,26 +539,16 @@ dependencies = [
]
[[package]]
name = "mio"
version = "0.7.9"
name = "nix"
version = "0.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a5dede4e2065b3842b8b0af444119f3aa331cc7cc2dd20388bfb0f5d5a38823a"
checksum = "6c722bee1037d430d0f8e687bbdbf222f27cc6e4e68d5caf630857bb2b6dbdce"
dependencies = [
"bitflags",
"cc",
"cfg-if 0.1.10",
"libc",
"log",
"miow",
"ntapi",
"winapi",
]
[[package]]
name = "miow"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a33c1b55807fbed163481b5ba66db4b2fa6cde694a5027be10fb724206c5897"
dependencies = [
"socket2",
"winapi",
"void",
]
[[package]]
@ -574,19 +559,10 @@ checksum = "b2ccba0cfe4fdf15982d1674c69b1fd80bad427d293849982668dfe454bd61f2"
dependencies = [
"bitflags",
"cc",
"cfg-if",
"cfg-if 1.0.0",
"libc",
]
[[package]]
name = "ntapi"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44"
dependencies = [
"winapi",
]
[[package]]
name = "num-integer"
version = "0.1.44"
@ -651,7 +627,7 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa7a782938e745763fe6907fc6ba86946d72f49fe7e21de074e08128a99fb018"
dependencies = [
"cfg-if",
"cfg-if 1.0.0",
"instant",
"libc",
"redox_syscall",
@ -665,12 +641,6 @@ version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
[[package]]
name = "pin-project-lite"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "439697af366c49a6d0a010c56a0d97685bc140ce0d377b13a2ea2aa42d64a827"
[[package]]
name = "plotters"
version = "0.3.0"
@ -712,7 +682,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebd4c2739642e70439d1c0d9545beec45c1e54128739b3cda29bf2c366028c87"
dependencies = [
"libc",
"nix",
"nix 0.19.1",
]
[[package]]
@ -984,19 +954,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfebf75d25bd900fd1e7d11501efab59bc846dbc76196839663e6637bba9f25f"
dependencies = [
"block-buffer",
"cfg-if",
"cfg-if 1.0.0",
"cpuid-bool",
"digest",
"opaque-debug",
]
[[package]]
name = "signal-hook-registry"
version = "1.3.0"
name = "signal"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16f1d0fef1604ba8f7a073c7e701f213e056707210e9020af4528e0101ce11a6"
checksum = "2f6ce83b159ab6984d2419f495134972b48754d13ff2e3f8c998339942b56ed9"
dependencies = [
"libc",
"nix 0.14.1",
]
[[package]]
@ -1005,17 +976,6 @@ version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e"
[[package]]
name = "socket2"
version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "122e570113d28d773067fab24266b66753f6ea915758651696b6e35e49f88d6e"
dependencies = [
"cfg-if",
"libc",
"winapi",
]
[[package]]
name = "spin"
version = "0.5.2"
@ -1069,7 +1029,7 @@ version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dac1c663cfc93810f88aed9b8941d48cabf856a1b111c29a40439018d870eb22"
dependencies = [
"cfg-if",
"cfg-if 1.0.0",
"libc",
"rand",
"redox_syscall",
@ -1116,6 +1076,15 @@ dependencies = [
"syn",
]
[[package]]
name = "timeout_io"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d89ed29f92507ee770ef31f747ec1ad5ba7d8989a9f3489783da7d19a155c60"
dependencies = [
"cc",
]
[[package]]
name = "tinytemplate"
version = "1.2.1"
@ -1141,37 +1110,6 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]]
name = "tokio"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83f0c8e7c0addab50b663055baf787d0af7f413a46e6e7fb9559a4e4db7137a5"
dependencies = [
"autocfg",
"bytes",
"libc",
"memchr",
"mio",
"num_cpus",
"once_cell",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"tokio-macros",
"winapi",
]
[[package]]
name = "tokio-macros"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "caf7b11a536f46a809a8a9f0bb4237020f70ecbf115b842360afb127ea2fda57"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tungstenite"
version = "0.13.0"
@ -1270,11 +1208,16 @@ version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe"
[[package]]
name = "void"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d"
[[package]]
name = "vpncloud"
version = "2.2.0"
dependencies = [
"async-trait",
"byteorder",
"chrono",
"criterion",
@ -1291,11 +1234,12 @@ dependencies = [
"ring",
"serde",
"serde_yaml",
"signal",
"smallvec",
"structopt",
"tempfile",
"thiserror",
"tokio",
"timeout_io",
"tungstenite",
"url",
"yaml-rust",
@ -1324,7 +1268,7 @@ version = "0.2.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83240549659d187488f91f33c0f8547cbfef0b2088bc470c116d1d260ef623d9"
dependencies = [
"cfg-if",
"cfg-if 1.0.0",
"wasm-bindgen-macro",
]

View File

@ -36,9 +36,8 @@ dialoguer = { version = "0.8", optional = true }
tungstenite = { version = "0.13", optional = true, default-features = false }
url = { version = "2.2", optional = true }
igd = { version = "0.12", optional = true }
tokio = { version = "^1.5", features = ["full"] }
async-trait = "0.1"
timeout_io = "0.6"
signal = "0.7"
[dev-dependencies]
tempfile = "3"

13
ROADMAP.md Normal file
View File

@ -0,0 +1,13 @@
# Roadmap
## Multithreading functionality
- [x] Timeout for device read
- [x] Timeout for net read
- [x] Check how async affects performance
- [x] Sync traffic stats
- [x] Sync forwarding table
- [ ] Fix WS Proxy code
## REST API

View File

@ -136,7 +136,6 @@ fn crypto_aes256(c: &mut Criterion) {
}
fn full_communication_tun_router(c: &mut Criterion) {
let runtime = tokio::runtime::Runtime::new().unwrap();
log::set_max_level(log::LevelFilter::Error);
let config1 = Config {
device_type: Type::Tun,
@ -152,60 +151,53 @@ fn full_communication_tun_router(c: &mut Criterion) {
};
let mut sim = TunSimulator::new();
let (node1, node2) = runtime.block_on(async {
log::set_max_level(log::LevelFilter::Error);
let node1 = sim.add_node(false, &config1).await;
let node2 = sim.add_node(false, &config2).await;
let node1 = sim.add_node(false, &config1);
let node2 = sim.add_node(false, &config2);
sim.connect(node1, node2).await;
sim.simulate_all_messages().await;
sim.connect(node1, node2);
sim.simulate_all_messages();
assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1));
(node1, node2)
});
sim.trigger_housekeep();
let mut payload = vec![0x40, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2];
payload.append(&mut vec![0; 1400]);
let mut g = c.benchmark_group("full_communication");
g.throughput(Throughput::Bytes(2 * 1400));
g.bench_function("tun_router", |b| {
b.iter(|| runtime.block_on(async {
sim.put_payload(node1, payload.clone()).await;
sim.simulate_all_messages().await;
b.iter(|| {
sim.put_payload(node1, payload.clone());
sim.simulate_all_messages();
assert_eq!(Some(&payload), sim.pop_payload(node2).as_ref());
}));
});
});
g.finish()
}
fn full_communication_tap_switch(c: &mut Criterion) {
let runtime = tokio::runtime::Runtime::new().unwrap();
log::set_max_level(log::LevelFilter::Error);
let config = Config { device_type: Type::Tap, ..Config::default() };
let mut sim = TapSimulator::new();
let (node1, node2) = runtime.block_on(async {
log::set_max_level(log::LevelFilter::Error);
let node1 = sim.add_node(false, &config).await;
let node2 = sim.add_node(false, &config).await;
let node1 = sim.add_node(false, &config);
let node2 = sim.add_node(false, &config);
sim.connect(node1, node2).await;
sim.simulate_all_messages().await;
sim.connect(node1, node2);
sim.simulate_all_messages();
assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1));
(node1, node2)
});
sim.trigger_housekeep();
let mut payload = vec![2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4, 5];
payload.append(&mut vec![0; 1400]);
let mut g = c.benchmark_group("full_communication");
g.throughput(Throughput::Bytes(2 * 1400));
g.bench_function("tap_switch", |b| {
b.iter(|| runtime.block_on(async {
sim.put_payload(node1, payload.clone()).await;
sim.simulate_all_messages().await;
b.iter(|| {
sim.put_payload(node1, payload.clone());
sim.simulate_all_messages();
assert_eq!(Some(&payload), sim.pop_payload(node2).as_ref());
}));
});
});
g.finish()
}

View File

@ -98,7 +98,6 @@ fn crypto_aes256() {
}
fn full_communication_tun_router() {
let runtime = tokio::runtime::Runtime::new().unwrap();
log::set_max_level(log::LevelFilter::Error);
let config1 = Config {
device_type: Type::Tun,
@ -113,55 +112,44 @@ fn full_communication_tun_router() {
..Config::default()
};
let mut sim = TunSimulator::new();
let (node1, node2) = runtime.block_on(async {
log::set_max_level(log::LevelFilter::Error);
let node1 = sim.add_node(false, &config1).await;
let node2 = sim.add_node(false, &config2).await;
let node1 = sim.add_node(false, &config1);
let node2 = sim.add_node(false, &config2);
sim.connect(node1, node2).await;
sim.simulate_all_messages().await;
sim.connect(node1, node2);
sim.simulate_all_messages();
assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1));
(node1, node2)
});
sim.trigger_housekeep();
let mut payload = vec![0x40, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2];
payload.append(&mut vec![0; 1400]);
for _ in 0..1000 {
runtime.block_on(async {
sim.put_payload(node1, payload.clone()).await;
sim.simulate_all_messages().await;
sim.put_payload(node1, payload.clone());
sim.simulate_all_messages();
assert_eq!(Some(&payload), black_box(sim.pop_payload(node2).as_ref()));
});
}
}
fn full_communication_tap_switch() {
let runtime = tokio::runtime::Runtime::new().unwrap();
log::set_max_level(log::LevelFilter::Error);
let config = Config { device_type: Type::Tap, ..Config::default() };
let mut sim = TapSimulator::new();
let (node1, node2) = runtime.block_on(async {
log::set_max_level(log::LevelFilter::Error);
let node1 = sim.add_node(false, &config).await;
let node2 = sim.add_node(false, &config).await;
let node1 = sim.add_node(false, &config);
let node2 = sim.add_node(false, &config);
sim.connect(node1, node2).await;
sim.simulate_all_messages().await;
sim.connect(node1, node2);
sim.simulate_all_messages();
assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1));
(node1, node2)
});
sim.trigger_housekeep();
let mut payload = vec![2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4, 5];
payload.append(&mut vec![0; 1400]);
for _ in 0..1000 {
runtime.block_on(async {
sim.put_payload(node1, payload.clone()).await;
sim.simulate_all_messages().await;
sim.put_payload(node1, payload.clone());
sim.simulate_all_messages();
assert_eq!(Some(&payload), black_box(sim.pop_payload(node2).as_ref()));
});
}
}

View File

@ -4,9 +4,6 @@ from common import EC2Environment, CREATE, eprint
import time, json, os, atexit
from datetime import date
# Note: this script will run for ~8 minutes and incur costs of about $ 0.02
FILE = "../../target/release/vpncloud"
VERSION = "2.2.0"
REGION = "eu-central-1"
@ -25,9 +22,6 @@ env = EC2Environment(
)
CRYPTO = ["plain", "aes256", "aes128", "chacha20"]
class PerfTest:
def __init__(self, sender, receiver):
self.sender = sender

View File

@ -323,7 +323,7 @@ use std::str::FromStr;
use std::time::Duration;
#[test]
async fn encode() {
fn encode() {
MockTimeSource::set_time(2000 * 3600);
let ser = BeaconSerializer::<MockTimeSource>::new(b"mysecretkey");
let mut peers = vec![SocketAddr::from_str("1.2.3.4:5678").unwrap(), SocketAddr::from_str("6.6.6.6:53").unwrap()];
@ -335,7 +335,7 @@ async fn encode() {
}
#[test]
async fn decode() {
fn decode() {
MockTimeSource::set_time(2000 * 3600);
let ser = BeaconSerializer::<MockTimeSource>::new(b"mysecretkey");
let mut peers = vec![SocketAddr::from_str("1.2.3.4:5678").unwrap(), SocketAddr::from_str("6.6.6.6:53").unwrap()];
@ -348,7 +348,7 @@ async fn decode() {
}
#[test]
async fn decode_split() {
fn decode_split() {
MockTimeSource::set_time(2000 * 3600);
let ser = BeaconSerializer::<MockTimeSource>::new(b"mysecretkey");
let peers = vec![SocketAddr::from_str("1.2.3.4:5678").unwrap(), SocketAddr::from_str("6.6.6.6:53").unwrap()];
@ -363,7 +363,7 @@ async fn decode_split() {
}
#[test]
async fn decode_offset() {
fn decode_offset() {
MockTimeSource::set_time(2000 * 3600);
let ser = BeaconSerializer::<MockTimeSource>::new(b"mysecretkey");
let peers = vec![SocketAddr::from_str("1.2.3.4:5678").unwrap(), SocketAddr::from_str("6.6.6.6:53").unwrap()];
@ -374,7 +374,7 @@ async fn decode_offset() {
}
#[test]
async fn decode_multiple() {
fn decode_multiple() {
MockTimeSource::set_time(2000 * 3600);
let ser = BeaconSerializer::<MockTimeSource>::new(b"mysecretkey");
let peers = vec![SocketAddr::from_str("1.2.3.4:5678").unwrap(), SocketAddr::from_str("6.6.6.6:53").unwrap()];
@ -385,7 +385,7 @@ async fn decode_multiple() {
}
#[test]
async fn decode_ttl() {
fn decode_ttl() {
MockTimeSource::set_time(2000 * 3600);
let ser = BeaconSerializer::<MockTimeSource>::new(b"mysecretkey");
MockTimeSource::set_time(2000 * 3600);
@ -409,7 +409,7 @@ async fn decode_ttl() {
}
#[test]
async fn decode_invalid() {
fn decode_invalid() {
MockTimeSource::set_time(2000 * 3600);
let ser = BeaconSerializer::<MockTimeSource>::new(b"mysecretkey");
assert_eq!(0, ser.decode("", None).len());
@ -422,7 +422,7 @@ async fn decode_invalid() {
}
#[test]
async fn encode_decode() {
fn encode_decode() {
MockTimeSource::set_time(2000 * 3600);
let ser = BeaconSerializer::<MockTimeSource>::new(b"mysecretkey");
let peers = vec![SocketAddr::from_str("1.2.3.4:5678").unwrap(), SocketAddr::from_str("6.6.6.6:53").unwrap()];
@ -432,7 +432,7 @@ async fn encode_decode() {
}
#[test]
async fn encode_decode_file() {
fn encode_decode_file() {
MockTimeSource::set_time(2000 * 3600);
let ser = BeaconSerializer::<MockTimeSource>::new(b"mysecretkey");
let peers = vec![SocketAddr::from_str("1.2.3.4:5678").unwrap(), SocketAddr::from_str("6.6.6.6:53").unwrap()];
@ -444,7 +444,7 @@ async fn encode_decode_file() {
}
#[test]
async fn encode_decode_cmd() {
fn encode_decode_cmd() {
MockTimeSource::set_time(2000 * 3600);
let ser = BeaconSerializer::<MockTimeSource>::new(b"mysecretkey");
let peers = vec![SocketAddr::from_str("1.2.3.4:5678").unwrap(), SocketAddr::from_str("6.6.6.6:53").unwrap()];

View File

@ -358,29 +358,25 @@ impl Config {
pub fn is_learning(&self) -> bool {
match self.mode {
Mode::Normal => {
match self.device_type {
Mode::Normal => match self.device_type {
Type::Tap => true,
Type::Tun => false
}
}
Type::Tun => false,
},
Mode::Router => false,
Mode::Switch => true,
Mode::Hub => false
Mode::Hub => false,
}
}
pub fn is_broadcasting(&self) -> bool {
match self.mode {
Mode::Normal => {
match self.device_type {
Mode::Normal => match self.device_type {
Type::Tap => true,
Type::Tun => false
}
}
Type::Tun => false,
},
Mode::Router => false,
Mode::Switch => true,
Mode::Hub => true
Mode::Hub => true,
}
}
@ -685,7 +681,7 @@ pub struct ConfigFile {
}
#[test]
async fn config_file() {
fn config_file() {
let config_file = "
device:
type: tun
@ -766,12 +762,12 @@ statsd:
}
#[test]
async fn parse_example_config() {
fn parse_example_config() {
serde_yaml::from_str::<ConfigFile>(include_str!("../assets/example.net.disabled")).unwrap();
}
#[test]
async fn config_merge() {
fn config_merge() {
let mut config = Config::default();
config.merge_file(ConfigFile {
device: Some(ConfigFileDevice {
@ -871,7 +867,9 @@ async fn config_merge() {
group: Some("root".to_string()),
..Default::default()
});
assert_eq!(config, Config {
assert_eq!(
config,
Config {
device_type: Type::Tap,
device_name: "vpncloud0".to_string(),
device_path: Some("/dev/null".to_string()),
@ -908,5 +906,6 @@ async fn config_merge() {
daemonize: true,
hook: None,
hooks: HashMap::new()
});
}
);
}

View File

@ -349,7 +349,7 @@ mod tests {
}
#[test]
async fn normal() {
fn normal() {
let config = Config { password: Some("test".to_string()), ..Default::default() };
let mut node1 = create_node(&config);
let mut node2 = create_node(&config);

View File

@ -277,7 +277,7 @@ mod tests {
use ring::aead::{self, LessSafeKey, UnboundKey};
#[test]
async fn test_nonce() {
fn test_nonce() {
let mut nonce = Nonce::zero();
assert_eq!(nonce.as_bytes(), &[0; 12]);
nonce.increment();
@ -299,17 +299,17 @@ mod tests {
}
#[test]
async fn test_encrypt_decrypt_aes128() {
fn test_encrypt_decrypt_aes128() {
test_encrypt_decrypt(&aead::AES_128_GCM)
}
#[test]
async fn test_encrypt_decrypt_aes256() {
fn test_encrypt_decrypt_aes256() {
test_encrypt_decrypt(&aead::AES_256_GCM)
}
#[test]
async fn test_encrypt_decrypt_chacha() {
fn test_encrypt_decrypt_chacha() {
test_encrypt_decrypt(&aead::CHACHA20_POLY1305)
}
@ -339,17 +339,17 @@ mod tests {
}
#[test]
async fn test_tampering_aes128() {
fn test_tampering_aes128() {
test_tampering(&aead::AES_128_GCM)
}
#[test]
async fn test_tampering_aes256() {
fn test_tampering_aes256() {
test_tampering(&aead::AES_256_GCM)
}
#[test]
async fn test_tampering_chacha() {
fn test_tampering_chacha() {
test_tampering(&aead::CHACHA20_POLY1305)
}
@ -380,17 +380,17 @@ mod tests {
}
#[test]
async fn test_nonce_pinning_aes128() {
fn test_nonce_pinning_aes128() {
test_nonce_pinning(&aead::AES_128_GCM)
}
#[test]
async fn test_nonce_pinning_aes256() {
fn test_nonce_pinning_aes256() {
test_nonce_pinning(&aead::AES_256_GCM)
}
#[test]
async fn test_nonce_pinning_chacha() {
fn test_nonce_pinning_chacha() {
test_nonce_pinning(&aead::CHACHA20_POLY1305)
}
@ -429,39 +429,39 @@ mod tests {
}
#[test]
async fn test_key_rotation_aes128() {
fn test_key_rotation_aes128() {
test_key_rotation(&aead::AES_128_GCM);
}
#[test]
async fn test_key_rotation_aes256() {
fn test_key_rotation_aes256() {
test_key_rotation(&aead::AES_256_GCM);
}
#[test]
async fn test_key_rotation_chacha() {
fn test_key_rotation_chacha() {
test_key_rotation(&aead::CHACHA20_POLY1305);
}
#[test]
async fn test_core_size() {
fn test_core_size() {
assert_eq!(2400, mem::size_of::<CryptoCore>());
}
#[test]
async fn test_speed_aes128() {
fn test_speed_aes128() {
let speed = test_speed(&aead::AES_128_GCM, &Duration::from_secs_f32(0.2));
assert!(speed > 10.0);
}
#[test]
async fn test_speed_aes256() {
fn test_speed_aes256() {
let speed = test_speed(&aead::AES_256_GCM, &Duration::from_secs_f32(0.2));
assert!(speed > 10.0);
}
#[test]
async fn test_speed_chacha() {
fn test_speed_chacha() {
let speed = test_speed(&aead::CHACHA20_POLY1305, &Duration::from_secs_f32(0.2));
assert!(speed > 10.0);
}

View File

@ -731,7 +731,7 @@ mod tests {
}
#[test]
async fn normal_init() {
fn normal_init() {
let (mut sender, mut receiver) = create_pair();
let mut out = MsgBuffer::new(8);
sender.send_ping(&mut out);
@ -753,7 +753,7 @@ mod tests {
}
#[test]
async fn lost_init_sender_recovers() {
fn lost_init_sender_recovers() {
let (mut sender, mut receiver) = create_pair();
let mut out = MsgBuffer::new(8);
sender.send_ping(&mut out);
@ -786,7 +786,7 @@ mod tests {
}
#[test]
async fn lost_init_receiver_recovers() {
fn lost_init_receiver_recovers() {
let (mut sender, mut receiver) = create_pair();
let mut out = MsgBuffer::new(8);
sender.send_ping(&mut out);
@ -818,7 +818,7 @@ mod tests {
}
#[test]
async fn timeout() {
fn timeout() {
let (mut sender, _receiver) = create_pair();
let mut out = MsgBuffer::new(8);
sender.send_ping(&mut out);
@ -833,7 +833,7 @@ mod tests {
}
#[test]
async fn untrusted_peer() {
fn untrusted_peer() {
let (mut sender, _) = create_pair();
let (_, mut receiver) = create_pair();
let mut out = MsgBuffer::new(8);
@ -843,7 +843,7 @@ mod tests {
}
#[test]
async fn manipulated_message() {
fn manipulated_message() {
let (mut sender, mut receiver) = create_pair();
let mut out = MsgBuffer::new(8);
sender.send_ping(&mut out);
@ -853,7 +853,7 @@ mod tests {
}
#[test]
async fn connect_to_self() {
fn connect_to_self() {
let (mut sender, _) = create_pair();
let mut out = MsgBuffer::new(8);
sender.send_ping(&mut out);
@ -881,7 +881,7 @@ mod tests {
}
#[test]
async fn algorithm_negotiation() {
fn algorithm_negotiation() {
// Equal algorithms
test_algorithm_negotiation(
Algorithms {

View File

@ -228,7 +228,7 @@ mod tests {
}
#[test]
async fn test_encode_decode_message() {
fn test_encode_decode_message() {
let mut data = Vec::with_capacity(100);
let (_, key) = RotationState::create_key();
let msg = RotationMessage { message_id: 1, propose: key, confirm: None };
@ -249,7 +249,7 @@ mod tests {
}
#[test]
async fn test_normal_rotation() {
fn test_normal_rotation() {
let mut out1 = MsgBuffer::new(8);
let mut out2 = MsgBuffer::new(8);
@ -323,7 +323,7 @@ mod tests {
}
#[test]
async fn test_duplication() {
fn test_duplication() {
let mut out1 = MsgBuffer::new(8);
let mut out2 = MsgBuffer::new(8);
@ -359,7 +359,7 @@ mod tests {
}
#[test]
async fn test_lost_message() {
fn test_lost_message() {
let mut out1 = MsgBuffer::new(8);
let mut out2 = MsgBuffer::new(8);
@ -385,7 +385,7 @@ mod tests {
}
#[test]
async fn test_reflect_back() {
fn test_reflect_back() {
let mut out1 = MsgBuffer::new(8);
let mut out2 = MsgBuffer::new(8);

View File

@ -2,23 +2,9 @@
// Copyright (C) 2015-2021 Dennis Schwerdel
// This software is licensed under GPL-3 or newer (see LICENSE.md)
use async_trait::async_trait;
use parking_lot::Mutex;
use std::{
cmp,
collections::VecDeque,
convert::TryInto,
fmt,
io::{self, Cursor, Read, Write, Error as IoError, BufReader, BufRead},
net::{Ipv4Addr, UdpSocket},
fs::{self, File},
os::unix::io::AsRawFd,
str,
str::FromStr,
sync::Arc,
};
use tokio::fs::{File as AsyncFile};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use std::{cmp, collections::VecDeque, convert::TryInto, fmt, fs::{self, File}, io::{self, Cursor, Read, Write, Error as IoError, BufReader, BufRead}, net::{Ipv4Addr, UdpSocket}, os::unix::io::AsRawFd, str, str::FromStr, sync::Arc, time::Duration};
use timeout_io::Reader;
use crate::{crypto, error::Error, util::MsgBuffer};
@ -79,7 +65,6 @@ impl FromStr for Type {
}
}
#[async_trait]
pub trait Device: Send + 'static + Sized {
/// Returns the type of this device
fn get_type(&self) -> Type;
@ -95,7 +80,7 @@ pub trait Device: Send + 'static + Sized {
///
/// # Errors
/// This method will return an error if the underlying read call fails.
async fn read(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error>;
fn read(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error>;
/// Writes a packet/frame to the device
///
@ -106,9 +91,9 @@ pub trait Device: Send + 'static + Sized {
///
/// # Errors
/// This method will return an error if the underlying read call fails.
async fn write(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error>;
fn write(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error>;
async fn duplicate(&self) -> Result<Self, Error>;
fn duplicate(&self) -> Result<Self, Error>;
fn get_ip(&self) -> Result<Ipv4Addr, Error>;
}
@ -218,23 +203,6 @@ impl TunTapDevice {
}
Ok(())
}
}
/// Represents a tun/tap device
pub struct AsyncTunTapDevice {
fd: AsyncFile,
ifname: String,
type_: Type,
}
impl AsyncTunTapDevice {
pub fn from_sync(dev: TunTapDevice) -> Self {
Self {
fd: AsyncFile::from_std(dev.fd),
ifname: dev.ifname,
type_: dev.type_
}
}
#[cfg(any(target_os = "linux", target_os = "android"))]
#[inline]
@ -286,32 +254,32 @@ impl AsyncTunTapDevice {
}
}
#[async_trait]
impl Device for AsyncTunTapDevice {
impl Device for TunTapDevice {
fn get_type(&self) -> Type {
self.type_
}
async fn duplicate(&self) -> Result<Self, Error> {
fn duplicate(&self) -> Result<Self, Error> {
Ok(Self {
fd: self.fd.try_clone().await.map_err(|e| Error::DeviceIo("Failed to clone device", e))?,
fd: self.fd.try_clone().map_err(|e| Error::DeviceIo("Failed to clone device", e))?,
ifname: self.ifname.clone(),
type_: self.type_,
})
}
async fn read(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error> {
fn read(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error> {
buffer.clear();
let read = self.fd.read(buffer.buffer()).await.map_err(|e| Error::DeviceIo("Read error", e))?;
let mut read = 0;
self.fd.try_read(buffer.buffer(), &mut read, Duration::from_secs(1)).map_err(|e| Error::DeviceRead(e))?;
buffer.set_length(read);
self.correct_data_after_read(buffer);
Ok(())
}
async fn write(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error> {
fn write(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error> {
self.correct_data_before_write(buffer);
match self.fd.write_all(buffer.message()).await {
Ok(_) => self.fd.flush().await.map_err(|e| Error::DeviceIo("Flush error", e)),
match self.fd.write_all(buffer.message()) {
Ok(_) => self.fd.flush().map_err(|e| Error::DeviceIo("Flush error", e)),
Err(e) => Err(Error::DeviceIo("Write error", e)),
}
}
@ -345,9 +313,8 @@ impl MockDevice {
}
}
#[async_trait]
impl Device for MockDevice {
async fn duplicate(&self) -> Result<Self, Error> {
fn duplicate(&self) -> Result<Self, Error> {
Ok(self.clone())
}
@ -355,7 +322,7 @@ impl Device for MockDevice {
Type::Tun
}
async fn read(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error> {
fn read(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error> {
if let Some(data) = self.inbound.lock().pop_front() {
buffer.clear();
buffer.set_length(data.len());
@ -366,7 +333,7 @@ impl Device for MockDevice {
}
}
async fn write(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error> {
fn write(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error> {
self.outbound.lock().push_back(buffer.message().into());
Ok(())
}

View File

@ -1,10 +1,11 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::{fs::File, hash::BuildHasherDefault};
use tokio;
use fnv::FnvHasher;
use crate::util::CtrlC;
use crate::{
config::Config,
crypto::PeerCrypto,
@ -46,7 +47,7 @@ pub struct GenericCloud<D: Device, P: Protocol, S: Socket, TS: TimeSource> {
impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS> {
#[allow(clippy::too_many_arguments)]
pub async fn new(
pub fn new(
config: &Config, socket: S, device: D, port_forwarding: Option<PortForwarding>, stats_file: Option<File>,
) -> Result<Self, Error> {
let table = SharedTable::<TS>::new(&config);
@ -55,7 +56,7 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
let running = Arc::new(AtomicBool::new(true));
let device_thread = DeviceThread::<S, D, P, TS>::new(
config.clone(),
device.duplicate().await?,
device.duplicate()?,
socket.clone(),
traffic.clone(),
peer_crypto.clone(),
@ -73,7 +74,7 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
stats_file,
running.clone(),
);
socket_thread.housekeep().await?;
socket_thread.housekeep()?;
Ok(Self { socket_thread, device_thread, running })
}
@ -90,17 +91,19 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
Ok(())
}
pub async fn run(self) {
pub fn run(self) {
debug!("Starting threads");
let running = self.running.clone();
let device_thread_handle = tokio::spawn(self.device_thread.run());
let socket_thread_handle = tokio::spawn(self.socket_thread.run());
try_fail!(tokio::signal::ctrl_c().await, "Failed to set ctrl-c handler: {}");
let device = self.device_thread;
let device_thread_handle = thread::spawn(move || device.run());
let socket = self.socket_thread;
let socket_thread_handle = thread::spawn(move || socket.run());
let ctrlc = CtrlC::new();
ctrlc.wait();
running.store(false, Ordering::SeqCst);
debug!("Waiting for threads to end");
let (dev_ret, sock_ret) = join!(device_thread_handle, socket_thread_handle);
dev_ret.unwrap();
sock_ret.unwrap();
device_thread_handle.join().unwrap();
socket_thread_handle.join().unwrap();
debug!("Threads stopped");
}
}
@ -124,21 +127,21 @@ impl<P: Protocol> GenericCloud<MockDevice, P, MockSocket, MockTimeSource> {
&mut self.device_thread.device
}
pub async fn connect(&mut self, addr: SocketAddr) -> Result<(), Error> {
self.socket_thread.connect(addr).await
pub fn connect(&mut self, addr: SocketAddr) -> Result<(), Error> {
self.socket_thread.connect(addr)
}
pub async fn trigger_socket_event(&mut self) {
self.socket_thread.iteration().await
pub fn trigger_socket_event(&mut self) {
self.socket_thread.iteration()
}
pub async fn trigger_device_event(&mut self) {
self.device_thread.iteration().await
pub fn trigger_device_event(&mut self) {
self.device_thread.iteration()
}
pub async fn trigger_housekeep(&mut self) {
try_fail!(self.socket_thread.housekeep().await, "Housekeep failed: {}");
try_fail!(self.device_thread.housekeep().await, "Housekeep failed: {}");
pub fn trigger_housekeep(&mut self) {
try_fail!(self.socket_thread.housekeep(), "Housekeep failed: {}");
try_fail!(self.device_thread.housekeep(), "Housekeep failed: {}");
}
pub fn is_connected(&self, addr: &SocketAddr) -> bool {
@ -149,7 +152,7 @@ impl<P: Protocol> GenericCloud<MockDevice, P, MockSocket, MockTimeSource> {
&self.socket_thread.own_addresses
}
pub async fn get_num(&self) -> usize {
self.socket_thread.socket.address().await.unwrap().port() as usize
pub fn get_num(&self) -> usize {
self.socket_thread.socket.address().unwrap().port() as usize
}
}

View File

@ -14,7 +14,6 @@ use crate::{
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{marker::PhantomData, net::SocketAddr};
use tokio::time::timeout;
pub struct DeviceThread<S: Socket, D: Device, P: Protocol, TS: TimeSource> {
// Read-only fields
@ -56,11 +55,11 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS
}
#[inline]
async fn send_to(&mut self, addr: SocketAddr) -> Result<(), Error> {
fn send_to(&mut self, addr: SocketAddr) -> Result<(), Error> {
let size = self.buffer.len();
debug!("Sending msg with {} bytes to {}", size, addr);
self.traffic.count_out_traffic(addr, size);
match self.socket.send(self.buffer.message(), addr).await {
match self.socket.send(self.buffer.message(), addr) {
Ok(written) if written == size => Ok(()),
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
@ -68,15 +67,15 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS
}
#[inline]
async fn send_msg(&mut self, addr: SocketAddr, type_: u8) -> Result<(), Error> {
fn send_msg(&mut self, addr: SocketAddr, type_: u8) -> Result<(), Error> {
debug!("Sending msg with {} bytes to {}", self.buffer.len(), addr);
self.buffer.prepend_byte(type_);
self.peer_crypto.encrypt_for(addr, &mut self.buffer)?;
self.send_to(addr).await
self.send_to(addr)
}
#[inline]
async fn broadcast_msg(&mut self, type_: u8) -> Result<(), Error> {
fn broadcast_msg(&mut self, type_: u8) -> Result<(), Error> {
let size = self.buffer.len();
debug!("Broadcasting message type {}, {:?} bytes to {} peers", type_, size, self.peer_crypto.count());
let traffic = &mut self.traffic;
@ -90,8 +89,8 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS
if let Some(crypto) = crypto {
crypto.encrypt(&mut self.broadcast_buffer);
}
traffic.count_out_traffic(addr, self.broadcast_buffer.len());
match socket.send(self.broadcast_buffer.message(), addr).await {
traffic.count_out_traffic(*addr, self.broadcast_buffer.len());
match socket.send(self.broadcast_buffer.message(), *addr) {
Ok(written) if written == self.broadcast_buffer.len() => Ok(()),
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
@ -100,7 +99,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS
Ok(())
}
async fn forward_packet(&mut self) -> Result<(), Error> {
fn forward_packet(&mut self) -> Result<(), Error> {
let (src, dst) = P::parse(self.buffer.message())?;
debug!("Read data from interface: src: {}, dst: {}, {} bytes", src, dst, self.buffer.len());
self.traffic.count_out_payload(dst, src, self.buffer.len());
@ -108,12 +107,12 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS
Some(addr) => {
// Peer found for destination
debug!("Found destination for {} => {}", dst, addr);
self.send_msg(addr, MESSAGE_TYPE_DATA).await?;
self.send_msg(addr, MESSAGE_TYPE_DATA)?;
}
None => {
if self.broadcast {
debug!("No destination for {} found, broadcasting", dst);
self.broadcast_msg(MESSAGE_TYPE_DATA).await?;
self.broadcast_msg(MESSAGE_TYPE_DATA)?;
} else {
debug!("No destination for {} found, dropping", dst);
self.traffic.count_dropped_payload(self.buffer.len());
@ -123,35 +122,35 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS
Ok(())
}
pub async fn housekeep(&mut self) -> Result<(), Error> {
pub fn housekeep(&mut self) -> Result<(), Error> {
self.peer_crypto.load();
self.table.sync();
self.traffic.sync();
Ok(())
}
pub async fn iteration(&mut self) {
if let Ok(result) = timeout(std::time::Duration::from_millis(1000), self.device.read(&mut self.buffer)).await {
try_fail!(result, "Failed to read from device: {}");
if let Err(e) = self.forward_packet().await {
pub fn iteration(&mut self) {
if self.device.read(&mut self.buffer).is_ok() {
//try_fail!(result, "Failed to read from device: {}");
if let Err(e) = self.forward_packet() {
error!("{}", e);
}
}
let now = TS::now();
if self.next_housekeep < now {
if let Err(e) = self.housekeep().await {
if let Err(e) = self.housekeep() {
error!("{}", e)
}
self.next_housekeep = now + 1
}
}
pub async fn run(mut self) {
pub fn run(mut self) {
loop {
self.iteration().await;
self.iteration();
if !self.running.load(Ordering::SeqCst) {
debug!("Device: end");
return
return;
}
}
}

View File

@ -21,84 +21,99 @@ use super::common::PeerData;
#[derive(Clone)]
pub struct SharedPeerCrypto {
peers: Arc<Mutex<HashMap<SocketAddr, Option<Arc<CryptoCore>>, Hash>>>,
//TODO: local hashmap as cache
cache: HashMap<SocketAddr, Option<Arc<CryptoCore>>, Hash>, //TODO: local hashmap as cache
}
impl SharedPeerCrypto {
pub fn new() -> Self {
SharedPeerCrypto { peers: Arc::new(Mutex::new(HashMap::default())) }
SharedPeerCrypto { peers: Arc::new(Mutex::new(HashMap::default())), cache: HashMap::default() }
}
pub fn encrypt_for(&self, peer: SocketAddr, data: &mut MsgBuffer) -> Result<(), Error> {
//TODO: use cache first
let mut peers = self.peers.lock();
match peers.get_mut(&peer) {
None => Err(Error::InvalidCryptoState("No crypto found for peer")),
Some(None) => Ok(()),
Some(Some(crypto)) => {
pub fn encrypt_for(&mut self, peer: SocketAddr, data: &mut MsgBuffer) -> Result<(), Error> {
let crypto = match self.cache.get(&peer) {
Some(crypto) => crypto,
None => {
let peers = self.peers.lock();
if let Some(crypto) = peers.get(&peer) {
self.cache.insert(peer, crypto.clone());
self.cache.get(&peer).unwrap()
} else {
return Err(Error::InvalidCryptoState("No crypto found for peer"));
}
}
};
if let Some(crypto) = crypto {
crypto.encrypt(data);
}
Ok(())
}
}
}
pub fn store(&self, data: &HashMap<SocketAddr, PeerData, Hash>) {
//TODO: store in shared and in cache
pub fn store(&mut self, data: &HashMap<SocketAddr, PeerData, Hash>) {
self.cache.clear();
self.cache.extend(data.iter().map(|(k, v)| (*k, v.crypto.get_core())));
let mut peers = self.peers.lock();
peers.clear();
peers.extend(data.iter().map(|(k, v)| (*k, v.crypto.get_core())));
peers.extend(self.cache.iter().map(|(k, v)| (*k, v.clone())));
}
pub fn load(&mut self) {
// TODO sync if needed
let peers = self.peers.lock();
self.cache.clear();
self.cache.extend(peers.iter().map(|(k, v)| (*k, v.clone())));
}
pub fn get_snapshot(&self) -> HashMap<SocketAddr, Option<Arc<CryptoCore>>, Hash> {
//TODO: return local cache
self.peers.lock().clone()
pub fn get_snapshot(&mut self) -> &HashMap<SocketAddr, Option<Arc<CryptoCore>>, Hash> {
&self.cache
}
pub fn count(&self) -> usize {
self.peers.lock().len()
self.cache.len()
}
}
#[derive(Clone)]
pub struct SharedTraffic {
cache: TrafficStats,
traffic: Arc<Mutex<TrafficStats>>,
}
impl Clone for SharedTraffic {
fn clone(&self) -> Self {
Self { cache: TrafficStats::default(), traffic: self.traffic.clone() }
}
}
impl SharedTraffic {
pub fn new() -> Self {
Self { traffic: Arc::new(Mutex::new(Default::default())) }
Self { cache: TrafficStats::default(), traffic: Arc::new(Mutex::new(Default::default())) }
}
pub fn sync(&mut self) {
// TODO sync if needed
self.traffic.lock().add(&self.cache);
self.cache.clear();
}
pub fn count_out_traffic(&self, peer: SocketAddr, bytes: usize) {
self.traffic.lock().count_out_traffic(peer, bytes);
pub fn count_out_traffic(&mut self, peer: SocketAddr, bytes: usize) {
self.cache.count_out_traffic(peer, bytes);
}
pub fn count_in_traffic(&self, peer: SocketAddr, bytes: usize) {
self.traffic.lock().count_in_traffic(peer, bytes);
pub fn count_in_traffic(&mut self, peer: SocketAddr, bytes: usize) {
self.cache.count_in_traffic(peer, bytes);
}
pub fn count_out_payload(&self, remote: Address, local: Address, bytes: usize) {
self.traffic.lock().count_out_payload(remote, local, bytes);
pub fn count_out_payload(&mut self, remote: Address, local: Address, bytes: usize) {
self.cache.count_out_payload(remote, local, bytes);
}
pub fn count_in_payload(&self, remote: Address, local: Address, bytes: usize) {
self.traffic.lock().count_in_payload(remote, local, bytes);
pub fn count_in_payload(&mut self, remote: Address, local: Address, bytes: usize) {
self.cache.count_in_payload(remote, local, bytes);
}
pub fn count_dropped_payload(&self, bytes: usize) {
self.traffic.lock().count_dropped_payload(bytes);
pub fn count_dropped_payload(&mut self, bytes: usize) {
self.cache.count_dropped_payload(bytes);
}
pub fn count_invalid_protocol(&self, bytes: usize) {
self.traffic.lock().count_invalid_protocol(bytes);
pub fn count_invalid_protocol(&mut self, bytes: usize) {
self.cache.count_invalid_protocol(bytes);
}
pub fn period(&mut self, cleanup_idle: Option<usize>) {
@ -125,60 +140,60 @@ impl SharedTraffic {
#[derive(Clone)]
pub struct SharedTable<TS: TimeSource> {
table: Arc<Mutex<ClaimTable<TS>>>,
//TODO: local reader lookup table Addr => Option<SocketAddr>
//TODO: local writer cache Addr => SocketAddr
cache: HashMap<Address, Option<SocketAddr>, Hash>,
}
impl<TS: TimeSource> SharedTable<TS> {
pub fn new(config: &Config) -> Self {
let table = ClaimTable::new(config.switch_timeout as Duration, config.peer_timeout as Duration);
SharedTable { table: Arc::new(Mutex::new(table)) }
SharedTable { table: Arc::new(Mutex::new(table)), cache: Default::default() }
}
pub fn sync(&mut self) {
// TODO sync if needed
// once every x seconds
// fetch reader cache
// clear writer cache
self.cache.clear();
}
pub fn lookup(&mut self, addr: Address) -> Option<SocketAddr> {
// TODO: use local reader cache
if let Some(val) = self.cache.get(&addr) {
return *val;
}
// if not found, use shared table and put into cache
self.table.lock().lookup(addr)
let val = self.table.lock().lookup(addr);
self.cache.insert(addr, val);
val
}
pub fn set_claims(&mut self, peer: SocketAddr, claims: RangeList) {
// clear writer cache
self.table.lock().set_claims(peer, claims)
self.table.lock().set_claims(peer, claims);
self.cache.clear();
}
pub fn remove_claims(&mut self, peer: SocketAddr) {
// clear writer cache
self.table.lock().remove_claims(peer)
self.table.lock().remove_claims(peer);
self.cache.clear();
}
pub fn cache(&mut self, addr: Address, peer: SocketAddr) {
// check writer cache and only write real updates to shared table
self.table.lock().cache(addr, peer)
if self.cache.get(&addr) != Some(&Some(peer)) {
self.table.lock().cache(addr, peer);
self.cache.insert(addr, Some(peer));
}
}
pub fn housekeep(&mut self) {
self.table.lock().housekeep()
self.table.lock().housekeep();
self.cache.clear();
}
pub fn write_out<W: Write>(&self, out: &mut W) -> Result<(), io::Error> {
//TODO: stats call
self.table.lock().write_out(out)
}
pub fn cache_len(&self) -> usize {
//TODO: stats call
self.table.lock().cache_len()
}
pub fn claim_len(&self) -> usize {
//TODO: stats call
self.table.lock().claim_len()
}
}

View File

@ -35,7 +35,6 @@ use std::{
net::{SocketAddr, ToSocketAddrs},
str::FromStr,
};
use tokio::time::timeout;
const MAX_RECONNECT_INTERVAL: u16 = 3600;
const RESOLVE_INTERVAL: Time = 300;
@ -151,11 +150,11 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
}
#[inline]
async fn send_to(&mut self, addr: SocketAddr) -> Result<(), Error> {
fn send_to(&mut self, addr: SocketAddr) -> Result<(), Error> {
let size = self.buffer.len();
debug!("Sending msg with {} bytes to {}", size, addr);
self.traffic.count_out_traffic(addr, size);
match self.socket.send(self.buffer.message(), addr).await {
match self.socket.send(self.buffer.message(), addr) {
Ok(written) if written == size => {
self.buffer.clear();
Ok(())
@ -166,7 +165,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
}
#[inline]
async fn broadcast_msg(&mut self, type_: u8) -> Result<(), Error> {
fn broadcast_msg(&mut self, type_: u8) -> Result<(), Error> {
debug!("Broadcasting message type {}, {:?} bytes to {} peers", type_, self.buffer.len(), self.peers.len());
for (addr, peer) in &mut self.peers {
self.broadcast_buffer.set_start(self.buffer.get_start());
@ -175,7 +174,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
self.broadcast_buffer.prepend_byte(type_);
peer.crypto.encrypt_message(&mut self.broadcast_buffer);
self.traffic.count_out_traffic(*addr, self.broadcast_buffer.len());
match self.socket.send(self.broadcast_buffer.message(), *addr).await {
match self.socket.send(self.broadcast_buffer.message(), *addr) {
Ok(written) if written == self.broadcast_buffer.len() => Ok(()),
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
@ -185,7 +184,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
Ok(())
}
async fn connect_sock(&mut self, addr: SocketAddr) -> Result<(), Error> {
fn connect_sock(&mut self, addr: SocketAddr) -> Result<(), Error> {
let addr = mapped_addr(addr);
if self.peers.contains_key(&addr)
|| self.own_addresses.contains(&addr)
@ -198,10 +197,10 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
let mut init = self.crypto.peer_instance(payload);
init.send_ping(&mut self.buffer);
self.pending_inits.insert(addr, init);
self.send_to(addr).await
self.send_to(addr)
}
pub async fn connect<Addr: ToSocketAddrs + fmt::Debug + Clone>(&mut self, addr: Addr) -> Result<(), Error> {
pub fn connect<Addr: ToSocketAddrs + fmt::Debug + Clone>(&mut self, addr: Addr) -> Result<(), Error> {
let addrs = resolve(&addr)?.into_iter().map(mapped_addr).collect::<SmallVec<[SocketAddr; 3]>>();
for addr in &addrs {
if self.own_addresses.contains(addr)
@ -214,7 +213,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
// Send a message to each resolved address
for a in addrs {
// Ignore error this time
self.connect_sock(a).await.ok();
self.connect_sock(a).ok();
}
Ok(())
}
@ -238,7 +237,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
}
}
async fn update_peer_info(&mut self, addr: SocketAddr, info: Option<NodeInfo>) -> Result<(), Error> {
fn update_peer_info(&mut self, addr: SocketAddr, info: Option<NodeInfo>) -> Result<(), Error> {
if let Some(peer) = self.peers.get_mut(&addr) {
peer.last_seen = TS::now();
peer.timeout = TS::now() + self.config.peer_timeout as Time;
@ -260,12 +259,12 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
debug!("Adding claims of peer {}: {:?}", addr_nice(addr), info.claims);
self.table.set_claims(addr, info.claims);
debug!("Received {} peers from {}: {:?}", info.peers.len(), addr_nice(addr), info.peers);
self.connect_to_peers(&info.peers).await?;
self.connect_to_peers(&info.peers)?;
}
Ok(())
}
async fn add_new_peer(&mut self, addr: SocketAddr, info: NodeInfo) -> Result<(), Error> {
fn add_new_peer(&mut self, addr: SocketAddr, info: NodeInfo) -> Result<(), Error> {
info!("Added peer {}", addr_nice(addr));
if let Some(init) = self.pending_inits.remove(&addr) {
self.buffer.clear();
@ -281,9 +280,9 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
timeout: TS::now() + self.config.peer_timeout as Time,
},
);
self.update_peer_info(addr, Some(info)).await?;
self.update_peer_info(addr, Some(info))?;
if !self.buffer.is_empty() {
self.send_to(addr).await?;
self.send_to(addr)?;
}
self.peer_crypto.store(&self.peers);
} else {
@ -292,7 +291,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
Ok(())
}
async fn connect_to_peers(&mut self, peers: &[PeerInfo]) -> Result<(), Error> {
fn connect_to_peers(&mut self, peers: &[PeerInfo]) -> Result<(), Error> {
'outer: for peer in peers {
for addr in &peer.addrs {
if self.peers.contains_key(addr) {
@ -315,7 +314,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
}
}
}
self.connect(&peer.addrs as &[SocketAddr]).await?;
self.connect(&peer.addrs as &[SocketAddr])?;
}
Ok(())
}
@ -328,12 +327,12 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
}
}
async fn handle_payload_from(&mut self, peer: SocketAddr) -> Result<(), Error> {
fn handle_payload_from(&mut self, peer: SocketAddr) -> Result<(), Error> {
let (src, dst) = P::parse(self.buffer.message())?;
let len = self.buffer.len();
debug!("Writing data to device: {} bytes", len);
self.traffic.count_in_payload(src, dst, len);
if let Err(e) = self.device.write(&mut self.buffer).await {
if let Err(e) = self.device.write(&mut self.buffer) {
error!("Failed to send via device: {}", e);
return Err(e);
}
@ -345,10 +344,10 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
Ok(())
}
async fn process_message(&mut self, src: SocketAddr, msg_result: MessageResult) -> Result<(), Error> {
fn process_message(&mut self, src: SocketAddr, msg_result: MessageResult) -> Result<(), Error> {
match msg_result {
MessageResult::Message(type_) => match type_ {
MESSAGE_TYPE_DATA => self.handle_payload_from(src).await?,
MESSAGE_TYPE_DATA => self.handle_payload_from(src)?,
MESSAGE_TYPE_NODE_INFO => {
let info = match NodeInfo::decode(Cursor::new(self.buffer.message())) {
Ok(val) => val,
@ -358,11 +357,11 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
}
};
self.buffer.clear();
self.update_peer_info(src, Some(info)).await?;
self.update_peer_info(src, Some(info))?;
}
MESSAGE_TYPE_KEEPALIVE => {
self.buffer.clear();
self.update_peer_info(src, None).await?;
self.update_peer_info(src, None)?;
}
MESSAGE_TYPE_CLOSE => {
self.buffer.clear();
@ -374,7 +373,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
return Err(Error::Message("Unknown message type"));
}
},
MessageResult::Reply => self.send_to(src).await?,
MessageResult::Reply => self.send_to(src)?,
MessageResult::None => {
self.buffer.clear();
}
@ -382,13 +381,13 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
Ok(())
}
async fn handle_message(&mut self, src: SocketAddr) -> Result<(), Error> {
fn handle_message(&mut self, src: SocketAddr) -> Result<(), Error> {
let src = mapped_addr(src);
debug!("Received {} bytes from {}", self.buffer.len(), src);
let buffer = &mut self.buffer;
self.traffic.count_in_traffic(src, buffer.len());
if let Some(result) = self.peers.get_mut(&src).map(|peer| peer.crypto.handle_message(buffer)) {
return self.process_message(src, result?).await;
return self.process_message(src, result?);
}
let is_init = is_init_message(buffer.message());
if let Some(result) = self.pending_inits.get_mut(&src).map(|init| {
@ -401,10 +400,10 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
}
}) {
if !buffer.is_empty() {
self.send_to(src).await?
self.send_to(src)?
}
if let InitResult::Success { peer_payload, .. } = result? {
self.add_new_peer(src, peer_payload).await?
self.add_new_peer(src, peer_payload)?
}
return Ok(());
}
@ -419,7 +418,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
match msg_result {
Ok(_) => {
self.pending_inits.insert(src, init);
self.send_to(src).await
self.send_to(src)
}
Err(err) => {
self.traffic.count_invalid_protocol(self.buffer.len());
@ -428,7 +427,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
}
}
pub async fn housekeep(&mut self) -> Result<(), Error> {
pub fn housekeep(&mut self) -> Result<(), Error> {
let now = TS::now();
let mut del: SmallVec<[SocketAddr; 3]> = SmallVec::new();
for (&addr, ref data) in &self.peers {
@ -440,10 +439,10 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
info!("Forgot peer {} due to timeout", addr_nice(addr));
self.peers.remove(&addr);
self.table.remove_claims(addr);
self.connect_sock(addr).await?; // Try to reconnect
self.connect_sock(addr)?; // Try to reconnect
}
self.table.housekeep();
self.crypto_housekeep().await?;
self.crypto_housekeep()?;
// Periodically extend the port-forwarding
if let Some(ref mut pfw) = self.port_forwarding {
pfw.check_extend();
@ -454,29 +453,29 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
debug!("Send peer list to all peers");
let info = self.create_node_info();
info.encode(&mut self.buffer);
self.broadcast_msg(MESSAGE_TYPE_NODE_INFO).await?;
self.broadcast_msg(MESSAGE_TYPE_NODE_INFO)?;
// Reschedule for next update
let min_peer_timeout = self.peers.iter().map(|p| p.1.peer_timeout).min().unwrap_or(DEFAULT_PEER_TIMEOUT);
let interval = min(self.update_freq as u16, max(min_peer_timeout / 2 - 60, 1));
self.next_peers = now + Time::from(interval);
}
self.reconnect_to_peers().await?;
self.reconnect_to_peers()?;
if self.next_stats_out < now {
// Write out the statistics
self.write_out_stats().map_err(|err| Error::FileIo("Failed to write stats file", err))?;
self.send_stats_to_statsd().await?;
self.send_stats_to_statsd()?;
self.next_stats_out = now + STATS_INTERVAL;
self.traffic.period(Some(5));
}
if let Some(peers) = self.beacon_serializer.get_cmd_results() {
debug!("Loaded beacon with peers: {:?}", peers);
for peer in peers {
self.connect_sock(peer).await?;
self.connect_sock(peer)?;
}
}
if self.next_beacon < now {
self.store_beacon()?;
self.load_beacon().await?;
self.load_beacon()?;
self.next_beacon = now + Time::from(self.config.beacon_interval);
}
self.table.sync();
@ -484,42 +483,42 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
self.peer_crypto.store(&self.peers);
// Periodically reset own peers
if self.next_own_address_reset <= now {
self.reset_own_addresses().await.map_err(|err| Error::SocketIo("Failed to get own addresses", err))?;
self.reset_own_addresses().map_err(|err| Error::SocketIo("Failed to get own addresses", err))?;
self.next_own_address_reset = now + OWN_ADDRESS_RESET_INTERVAL;
}
assert!(self.buffer.is_empty());
Ok(())
}
async fn crypto_housekeep(&mut self) -> Result<(), Error> {
fn crypto_housekeep(&mut self) -> Result<(), Error> {
let mut del: SmallVec<[SocketAddr; 4]> = smallvec![];
for addr in self.pending_inits.keys().copied().collect::<SmallVec<[SocketAddr; 4]>>() {
self.buffer.clear();
if self.pending_inits.get_mut(&addr).unwrap().every_second(&mut self.buffer).is_err() {
del.push(addr)
} else if !self.buffer.is_empty() {
self.send_to(addr).await?
self.send_to(addr)?
}
}
for addr in self.peers.keys().copied().collect::<SmallVec<[SocketAddr; 16]>>() {
self.buffer.clear();
self.peers.get_mut(&addr).unwrap().crypto.every_second(&mut self.buffer);
if !self.buffer.is_empty() {
self.send_to(addr).await?
self.send_to(addr)?
}
}
for addr in del {
self.pending_inits.remove(&addr);
if self.peers.remove(&addr).is_some() {
self.connect_sock(addr).await?;
self.connect_sock(addr)?;
}
}
Ok(())
}
async fn reset_own_addresses(&mut self) -> io::Result<()> {
fn reset_own_addresses(&mut self) -> io::Result<()> {
self.own_addresses.clear();
let socket_addr = self.socket.address().await.map(mapped_addr)?;
let socket_addr = self.socket.address().map(mapped_addr)?;
// 1) Specified advertise addresses
for addr in &self.config.advertise_addresses {
self.own_addresses.push(parse_listen(addr, socket_addr.port()));
@ -555,7 +554,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
}
/// Loads the beacon
async fn load_beacon(&mut self) -> Result<(), Error> {
fn load_beacon(&mut self) -> Result<(), Error> {
let peers;
if let Some(ref path) = self.config.beacon_load {
if let Some(path) = path.strip_prefix('|') {
@ -574,7 +573,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
}
debug!("Loaded beacon with peers: {:?}", peers);
for peer in peers {
self.connect_sock(peer).await?;
self.connect_sock(peer)?;
}
Ok(())
}
@ -606,7 +605,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
}
/// Sends the statistics to a statsd endpoint
async fn send_stats_to_statsd(&mut self) -> Result<(), Error> {
fn send_stats_to_statsd(&mut self) -> Result<(), Error> {
if let Some(ref endpoint) = self.statsd_server {
let peer_traffic = self.traffic.total_peer_traffic();
let payload_traffic = self.traffic.total_payload_traffic();
@ -652,7 +651,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
let msg_data = msg.as_bytes();
let addrs = resolve(endpoint)?;
if let Some(addr) = addrs.first() {
match self.socket.send(msg_data, *addr).await {
match self.socket.send(msg_data, *addr) {
Ok(written) if written == msg_data.len() => Ok(()),
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
@ -664,14 +663,14 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
Ok(())
}
async fn reconnect_to_peers(&mut self) -> Result<(), Error> {
fn reconnect_to_peers(&mut self) -> Result<(), Error> {
let now = TS::now();
// Connect to those reconnect_peers that are due
for entry in self.reconnect_peers.clone() {
if entry.next > now {
continue;
}
self.connect(&entry.resolved as &[SocketAddr]).await?;
self.connect(&entry.resolved as &[SocketAddr])?;
}
for entry in &mut self.reconnect_peers {
// Schedule for next second if node is connected
@ -717,11 +716,10 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
Ok(())
}
pub async fn iteration(&mut self) {
if let Ok(result) = timeout(std::time::Duration::from_millis(1000), self.socket.receive(&mut self.buffer)).await
pub fn iteration(&mut self) {
if let Ok(src) = self.socket.receive(&mut self.buffer)
{
let src = try_fail!(result, "Failed to read from network socket: {}");
match self.handle_message(src).await {
match self.handle_message(src) {
Err(e @ Error::CryptoInitFatal(_)) => {
debug!("Fatal crypto init error from {}: {}", src, e);
info!("Closing pending connection to {} due to error in crypto init", addr_nice(src));
@ -743,7 +741,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
}
let now = TS::now();
if self.next_housekeep < now {
if let Err(e) = self.housekeep().await {
if let Err(e) = self.housekeep() {
error!("{}", e)
}
self.next_housekeep = now + 1
@ -751,9 +749,9 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
debug_assert!(self.buffer.is_empty());
}
pub async fn run(mut self) {
pub fn run(mut self) {
loop {
self.iteration().await;
self.iteration();
if !self.running.load(Ordering::SeqCst) {
debug!("Socket: end");
return;

View File

@ -38,6 +38,9 @@ pub enum Error {
#[error("Device error: {0} ({1})")]
DeviceIo(&'static str, #[source] io::Error),
#[error("Device read error: {0}")]
DeviceRead(#[source] timeout_io::TimeoutIoError),
#[error("File error: {0}")]
FileIo(&'static str, #[source] io::Error),

View File

@ -6,8 +6,6 @@
extern crate log;
#[macro_use]
extern crate serde;
#[macro_use]
extern crate tokio;
#[cfg(test)]
extern crate tempfile;
@ -39,9 +37,8 @@ pub mod wizard;
#[cfg(feature = "websocket")]
pub mod wsproxy;
use net::SocketBuilder;
use net::Socket;
use structopt::StructOpt;
use tokio::runtime::Runtime;
use std::{
fs::{self, File, Permissions},
@ -58,7 +55,7 @@ use std::{
use crate::{
config::{Args, Command, Config, DEFAULT_PORT},
crypto::Crypto,
device::{AsyncTunTapDevice, TunTapDevice, Type},
device::{TunTapDevice, Type},
engine::common::GenericCloud,
net::NetSocket,
oldconfig::OldConfigFile,
@ -177,7 +174,7 @@ fn setup_device(config: &Config) -> TunTapDevice {
}
#[allow(clippy::cognitive_complexity)]
fn run<P: Protocol, S: SocketBuilder>(config: Config, socket: S) {
fn run<P: Protocol, S: Socket>(config: Config, socket: S) {
let device = setup_device(&config);
let port_forwarding = if config.port_forwarding { socket.create_port_forwarding() } else { None };
let stats_file = match config.stats_file {
@ -222,21 +219,15 @@ fn run<P: Protocol, S: SocketBuilder>(config: Config, socket: S) {
}
try_fail!(pd.apply(), "Failed to drop privileges: {}");
}
let rt = Runtime::new().unwrap();
let ifdown = config.ifdown.clone();
rt.block_on(async move {
// Warning: no async code outside this block, or it will break on daemonize
let device = AsyncTunTapDevice::from_sync(device);
let socket = try_fail!(socket.build(), "Failed to create async socket: {}");
let mut cloud = try_fail!(
GenericCloud::<AsyncTunTapDevice, P, S::SocketType, SystemTimeSource>::new(
GenericCloud::<TunTapDevice, P, S, SystemTimeSource>::new(
&config,
socket,
device,
port_forwarding,
stats_file
)
.await,
),
"Failed to create engine: {}"
);
for mut addr in config.peers {
@ -246,8 +237,7 @@ fn run<P: Protocol, S: SocketBuilder>(config: Config, socket: S) {
}
try_fail!(cloud.add_peer(addr.clone()), "Failed to send message to {}: {}", &addr);
}
cloud.run().await
});
cloud.run();
if let Some(script) = ifdown {
run_script(&script, &ifname);
}

View File

@ -5,9 +5,8 @@
use crate::config::DEFAULT_PORT;
use crate::port_forwarding::PortForwarding;
use crate::util::{MockTimeSource, MsgBuffer, Time, TimeSource};
use async_trait::async_trait;
use parking_lot::Mutex;
use tokio::net::UdpSocket as AsyncUdpSocket;
use std::time::Duration;
use std::{
collections::{HashMap, VecDeque},
io::{self, ErrorKind},
@ -32,17 +31,11 @@ pub fn get_ip() -> IpAddr {
s.local_addr().unwrap().ip()
}
pub trait SocketBuilder {
type SocketType: Socket;
fn build(self) -> Result<Self::SocketType, io::Error>;
fn create_port_forwarding(&self) -> Option<PortForwarding>;
}
#[async_trait]
pub trait Socket: Sized + Clone + Send + Sync + 'static {
async fn receive(&mut self, buffer: &mut MsgBuffer) -> Result<SocketAddr, io::Error>;
async fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize, io::Error>;
async fn address(&self) -> Result<SocketAddr, io::Error>;
fn receive(&mut self, buffer: &mut MsgBuffer) -> Result<SocketAddr, io::Error>;
fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize, io::Error>;
fn address(&self) -> Result<SocketAddr, io::Error>;
fn create_port_forwarding(&self) -> Option<PortForwarding>;
}
pub fn parse_listen(addr: &str, default_port: u16) -> SocketAddr {
@ -59,49 +52,40 @@ pub fn parse_listen(addr: &str, default_port: u16) -> SocketAddr {
}
}
pub struct NetSocket(UdpSocket);
pub struct NetSocket(Arc<UdpSocket>);
impl NetSocket {
pub fn listen(addr: &str) -> Result<Self, io::Error> {
let addr = parse_listen(addr, DEFAULT_PORT);
Ok(Self(UdpSocket::bind(addr)?))
let sock = UdpSocket::bind(addr)?;
sock.set_read_timeout(Some(Duration::from_secs(1)))?;
Ok(Self(Arc::new(sock)))
}
}
impl SocketBuilder for NetSocket {
type SocketType = AsyncNetSocket;
fn create_port_forwarding(&self) -> Option<PortForwarding> {
PortForwarding::new(self.0.local_addr().unwrap().port())
}
fn build(self) -> Result<Self::SocketType, io::Error> {
Ok(AsyncNetSocket(Arc::new(AsyncUdpSocket::from_std(self.0)?)))
}
}
pub struct AsyncNetSocket(Arc<AsyncUdpSocket>);
impl Clone for AsyncNetSocket {
impl Clone for NetSocket {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
#[async_trait]
impl Socket for AsyncNetSocket {
async fn receive(&mut self, buffer: &mut MsgBuffer) -> Result<SocketAddr, io::Error> {
impl Socket for NetSocket {
fn create_port_forwarding(&self) -> Option<PortForwarding> {
PortForwarding::new(self.0.local_addr().unwrap().port())
}
fn receive(&mut self, buffer: &mut MsgBuffer) -> Result<SocketAddr, io::Error> {
buffer.clear();
let (size, addr) = self.0.recv_from(buffer.buffer()).await?;
let (size, addr) = self.0.recv_from(buffer.buffer())?;
buffer.set_length(size);
Ok(addr)
}
async fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize, io::Error> {
self.0.send_to(data, addr).await
fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize, io::Error> {
self.0.send_to(data, addr)
}
async fn address(&self) -> Result<SocketAddr, io::Error> {
fn address(&self) -> Result<SocketAddr, io::Error> {
let mut addr = self.0.local_addr()?;
addr.set_ip(get_ip());
Ok(addr)
@ -160,9 +144,8 @@ impl MockSocket {
}
}
#[async_trait]
impl Socket for MockSocket {
async fn receive(&mut self, buffer: &mut MsgBuffer) -> Result<SocketAddr, io::Error> {
fn receive(&mut self, buffer: &mut MsgBuffer) -> Result<SocketAddr, io::Error> {
if let Some((addr, data)) = self.inbound.lock().pop_front() {
buffer.clear();
buffer.set_length(data.len());
@ -173,7 +156,7 @@ impl Socket for MockSocket {
}
}
async fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize, io::Error> {
fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize, io::Error> {
self.outbound.lock().push_back((addr, data.into()));
if self.nat {
self.nat_peers.lock().insert(addr, MockTimeSource::now() + 300);
@ -181,9 +164,13 @@ impl Socket for MockSocket {
Ok(data.len())
}
async fn address(&self) -> Result<SocketAddr, io::Error> {
fn address(&self) -> Result<SocketAddr, io::Error> {
Ok(self.address)
}
fn create_port_forwarding(&self) -> Option<PortForwarding> {
None
}
}
#[cfg(feature = "bench")]

View File

@ -53,7 +53,7 @@ impl Protocol for Frame {
}
#[test]
async fn decode_frame_without_vlan() {
fn decode_frame_without_vlan() {
let data = [6, 5, 4, 3, 2, 1, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 8];
let (src, dst) = Frame::parse(&data).unwrap();
assert_eq!(src, Address { data: [1, 2, 3, 4, 5, 6, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], len: 6 });
@ -61,7 +61,7 @@ async fn decode_frame_without_vlan() {
}
#[test]
async fn decode_frame_with_vlan() {
fn decode_frame_with_vlan() {
let data = [6, 5, 4, 3, 2, 1, 1, 2, 3, 4, 5, 6, 0x81, 0, 4, 210, 1, 2, 3, 4, 5, 6, 7, 8];
let (src, dst) = Frame::parse(&data).unwrap();
assert_eq!(src, Address { data: [4, 210, 1, 2, 3, 4, 5, 6, 0, 0, 0, 0, 0, 0, 0, 0], len: 8 });
@ -69,7 +69,7 @@ async fn decode_frame_with_vlan() {
}
#[test]
async fn decode_invalid_frame() {
fn decode_invalid_frame() {
assert!(Frame::parse(&[6, 5, 4, 3, 2, 1, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 8]).is_ok());
// truncated frame
assert!(Frame::parse(&[]).is_err());
@ -118,7 +118,7 @@ impl Protocol for Packet {
}
#[test]
async fn decode_ipv4_packet() {
fn decode_ipv4_packet() {
let data = [0x40, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 192, 168, 1, 1, 192, 168, 1, 2];
let (src, dst) = Packet::parse(&data).unwrap();
assert_eq!(src, Address { data: [192, 168, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], len: 4 });
@ -126,7 +126,7 @@ async fn decode_ipv4_packet() {
}
#[test]
async fn decode_ipv6_packet() {
fn decode_ipv6_packet() {
let data = [
0x60, 0, 0, 0, 0, 0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 6, 5,
4, 3, 2, 1,
@ -137,7 +137,7 @@ async fn decode_ipv6_packet() {
}
#[test]
async fn decode_invalid_packet() {
fn decode_invalid_packet() {
assert!(Packet::parse(&[0x40, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 192, 168, 1, 1, 192, 168, 1, 2]).is_ok());
assert!(Packet::parse(&[
0x60, 0, 0, 0, 0, 0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 6, 5,

View File

@ -79,7 +79,7 @@ impl<P: Protocol> Simulator<P> {
Self { next_port: 1, nodes: HashMap::default(), messages: VecDeque::default() }
}
pub async fn add_node(&mut self, nat: bool, config: &Config) -> SocketAddr {
pub fn add_node(&mut self, nat: bool, config: &Config) -> SocketAddr {
let mut config = config.clone();
MockSocket::set_nat(nat);
config.listen = format!("[::]:{}", self.next_port);
@ -89,7 +89,7 @@ impl<P: Protocol> Simulator<P> {
}
DebugLogger::set_node(self.next_port as usize);
self.next_port += 1;
let node = TestNode::new(&config, MockSocket::new(addr), MockDevice::new(), None, None).await.unwrap();
let node = TestNode::new(&config, MockSocket::new(addr), MockDevice::new(), None, None).unwrap();
DebugLogger::set_node(0);
self.nodes.insert(addr, node);
@ -97,18 +97,18 @@ impl<P: Protocol> Simulator<P> {
}
#[allow(dead_code)]
pub async fn get_node(&mut self, addr: SocketAddr) -> &mut TestNode<P> {
pub fn get_node(&mut self, addr: SocketAddr) -> &mut TestNode<P> {
let node = self.nodes.get_mut(&addr).unwrap();
DebugLogger::set_node(node.get_num().await);
DebugLogger::set_node(node.get_num());
node
}
pub async fn simulate_next_message(&mut self) {
pub fn simulate_next_message(&mut self) {
if let Some((src, dst, data)) = self.messages.pop_front() {
if let Some(node) = self.nodes.get_mut(&dst) {
if node.socket().put_inbound(src, data) {
DebugLogger::set_node(node.get_num().await);
node.trigger_socket_event().await;
DebugLogger::set_node(node.get_num());
node.trigger_socket_event();
DebugLogger::set_node(0);
let sock = node.socket();
let src = dst;
@ -122,16 +122,16 @@ impl<P: Protocol> Simulator<P> {
}
}
pub async fn simulate_all_messages(&mut self) {
pub fn simulate_all_messages(&mut self) {
while !self.messages.is_empty() {
self.simulate_next_message().await
self.simulate_next_message()
}
}
pub async fn trigger_node_housekeep(&mut self, addr: SocketAddr) {
pub fn trigger_node_housekeep(&mut self, addr: SocketAddr) {
let node = self.nodes.get_mut(&addr).unwrap();
DebugLogger::set_node(node.get_num().await);
node.trigger_housekeep().await;
DebugLogger::set_node(node.get_num());
node.trigger_housekeep();
DebugLogger::set_node(0);
let sock = node.socket();
while let Some((dst, data)) = sock.pop_outbound() {
@ -139,10 +139,10 @@ impl<P: Protocol> Simulator<P> {
}
}
pub async fn trigger_housekeep(&mut self) {
pub fn trigger_housekeep(&mut self) {
for (src, node) in &mut self.nodes {
DebugLogger::set_node(node.get_num().await);
node.trigger_housekeep().await;
DebugLogger::set_node(node.get_num());
node.trigger_housekeep();
DebugLogger::set_node(0);
let sock = node.socket();
while let Some((dst, data)) = sock.pop_outbound() {
@ -155,20 +155,20 @@ impl<P: Protocol> Simulator<P> {
MockTimeSource::set_time(time);
}
pub async fn simulate_time(&mut self, time: Time) {
pub fn simulate_time(&mut self, time: Time) {
let mut t = MockTimeSource::now();
while t < time {
t += 1;
self.set_time(t);
self.trigger_housekeep().await;
self.simulate_all_messages().await;
self.trigger_housekeep();
self.simulate_all_messages();
}
}
pub async fn connect(&mut self, src: SocketAddr, dst: SocketAddr) {
pub fn connect(&mut self, src: SocketAddr, dst: SocketAddr) {
let node = self.nodes.get_mut(&src).unwrap();
DebugLogger::set_node(node.get_num().await);
node.connect(dst).await.unwrap();
DebugLogger::set_node(node.get_num());
node.connect(dst).unwrap();
DebugLogger::set_node(0);
let sock = node.socket();
while let Some((dst, data)) = sock.pop_outbound() {
@ -190,11 +190,11 @@ impl<P: Protocol> Simulator<P> {
self.messages.len()
}
pub async fn put_payload(&mut self, addr: SocketAddr, data: Vec<u8>) {
pub fn put_payload(&mut self, addr: SocketAddr, data: Vec<u8>) {
let node = self.nodes.get_mut(&addr).unwrap();
node.device().put_inbound(data);
DebugLogger::set_node(node.get_num().await);
node.trigger_device_event().await;
DebugLogger::set_node(node.get_num());
node.trigger_device_event();
DebugLogger::set_node(0);
let sock = node.socket();
while let Some((dst, data)) = sock.pop_outbound() {

View File

@ -5,35 +5,35 @@
use super::common::*;
#[test]
async fn connect_nat_2_peers() {
fn connect_nat_2_peers() {
let config = Config { port_forwarding: false, ..Default::default() };
let mut sim = TapSimulator::new();
let node1 = sim.add_node(true, &config).await;
let node2 = sim.add_node(true, &config).await;
let node1 = sim.add_node(true, &config);
let node2 = sim.add_node(true, &config);
sim.connect(node1, node2).await;
sim.connect(node2, node1).await;
sim.connect(node1, node2);
sim.connect(node2, node1);
sim.simulate_time(60).await;
sim.simulate_time(60);
assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1));
}
#[test]
async fn connect_nat_3_peers() {
fn connect_nat_3_peers() {
let config = Config::default();
let mut sim = TapSimulator::new();
let node1 = sim.add_node(true, &config).await;
let node2 = sim.add_node(true, &config).await;
let node3 = sim.add_node(true, &config).await;
let node1 = sim.add_node(true, &config);
let node2 = sim.add_node(true, &config);
let node3 = sim.add_node(true, &config);
sim.connect(node1, node2).await;
sim.connect(node2, node1).await;
sim.connect(node1, node3).await;
sim.connect(node3, node1).await;
sim.connect(node1, node2);
sim.connect(node2, node1);
sim.connect(node1, node3);
sim.connect(node3, node1);
sim.simulate_time(300).await;
sim.simulate_time(300);
assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1));
assert!(sim.is_connected(node1, node3));
@ -43,19 +43,19 @@ async fn connect_nat_3_peers() {
}
#[test]
async fn nat_keepalive() {
fn nat_keepalive() {
let config = Config::default();
let mut sim = TapSimulator::new();
let node1 = sim.add_node(true, &config).await;
let node2 = sim.add_node(true, &config).await;
let node3 = sim.add_node(true, &config).await;
let node1 = sim.add_node(true, &config);
let node2 = sim.add_node(true, &config);
let node3 = sim.add_node(true, &config);
sim.connect(node1, node2).await;
sim.connect(node2, node1).await;
sim.connect(node1, node3).await;
sim.connect(node3, node1).await;
sim.connect(node1, node2);
sim.connect(node2, node1);
sim.connect(node1, node3);
sim.connect(node3, node1);
sim.simulate_time(1000).await;
sim.simulate_time(1000);
assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1));
assert!(sim.is_connected(node1, node3));
@ -63,7 +63,7 @@ async fn nat_keepalive() {
assert!(sim.is_connected(node2, node3));
assert!(sim.is_connected(node3, node2));
sim.simulate_time(10000).await;
sim.simulate_time(10000);
assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1));
assert!(sim.is_connected(node1, node3));

View File

@ -5,50 +5,52 @@
use super::common::*;
#[test]
async fn switch_delivers() {
fn switch_delivers() {
let config = Config { device_type: Type::Tap, ..Config::default() };
let mut sim = TapSimulator::new();
let node1 = sim.add_node(false, &config).await;
let node2 = sim.add_node(false, &config).await;
let node1 = sim.add_node(false, &config);
let node2 = sim.add_node(false, &config);
sim.connect(node1, node2).await;
sim.simulate_all_messages().await;
sim.connect(node1, node2);
sim.simulate_all_messages();
assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1));
sim.trigger_housekeep();
let payload = vec![2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4, 5];
sim.put_payload(node1, payload.clone()).await;
sim.simulate_all_messages().await;
sim.put_payload(node1, payload.clone());
sim.simulate_all_messages();
assert_eq!(Some(payload), sim.pop_payload(node2));
}
#[test]
async fn switch_learns() {
fn switch_learns() {
let config = Config { device_type: Type::Tap, ..Config::default() };
let mut sim = TapSimulator::new();
let node1 = sim.add_node(false, &config).await;
let node2 = sim.add_node(false, &config).await;
let node3 = sim.add_node(false, &config).await;
let node1 = sim.add_node(false, &config);
let node2 = sim.add_node(false, &config);
let node3 = sim.add_node(false, &config);
sim.connect(node1, node2).await;
sim.connect(node1, node3).await;
sim.connect(node2, node3).await;
sim.simulate_all_messages().await;
sim.connect(node1, node2);
sim.connect(node1, node3);
sim.connect(node2, node3);
sim.simulate_all_messages();
assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1));
assert!(sim.is_connected(node1, node3));
assert!(sim.is_connected(node3, node1));
assert!(sim.is_connected(node2, node3));
assert!(sim.is_connected(node3, node2));
sim.trigger_housekeep();
let payload = vec![2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4, 5];
// Nothing learnt so far, node1 broadcasts
sim.put_payload(node1, payload.clone()).await;
sim.simulate_all_messages().await;
sim.put_payload(node1, payload.clone());
sim.simulate_all_messages();
assert_eq!(Some(payload.clone()), sim.pop_payload(node2));
assert_eq!(Some(payload), sim.pop_payload(node3));
@ -57,38 +59,39 @@ async fn switch_learns() {
// Node 2 learned the address by receiving it, does not broadcast
sim.put_payload(node2, payload.clone()).await;
sim.simulate_all_messages().await;
sim.put_payload(node2, payload.clone());
sim.simulate_all_messages();
assert_eq!(Some(payload), sim.pop_payload(node1));
assert_eq!(None, sim.pop_payload(node3));
}
#[test]
async fn switch_honours_vlans() {
fn switch_honours_vlans() {
let config = Config { device_type: Type::Tap, ..Config::default() };
let mut sim = TapSimulator::new();
let node1 = sim.add_node(false, &config).await;
let node2 = sim.add_node(false, &config).await;
let node3 = sim.add_node(false, &config).await;
let node1 = sim.add_node(false, &config);
let node2 = sim.add_node(false, &config);
let node3 = sim.add_node(false, &config);
sim.connect(node1, node2).await;
sim.connect(node1, node3).await;
sim.connect(node2, node3).await;
sim.simulate_all_messages().await;
sim.connect(node1, node2);
sim.connect(node1, node3);
sim.connect(node2, node3);
sim.simulate_all_messages();
assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1));
assert!(sim.is_connected(node1, node3));
assert!(sim.is_connected(node3, node1));
assert!(sim.is_connected(node2, node3));
assert!(sim.is_connected(node3, node2));
sim.trigger_housekeep();
let payload = vec![2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 0x81, 0, 0, 0x67, 1, 2, 3, 4, 5];
// Nothing learnt so far, node1 broadcasts
sim.put_payload(node1, payload.clone()).await;
sim.simulate_all_messages().await;
sim.put_payload(node1, payload.clone());
sim.simulate_all_messages();
assert_eq!(Some(payload.clone()), sim.pop_payload(node2));
assert_eq!(Some(payload), sim.pop_payload(node3));
@ -97,8 +100,8 @@ async fn switch_honours_vlans() {
// Node 2 learned the address by receiving it, does not broadcast
sim.put_payload(node2, payload.clone()).await;
sim.simulate_all_messages().await;
sim.put_payload(node2, payload.clone());
sim.simulate_all_messages();
assert_eq!(Some(payload), sim.pop_payload(node1));
assert_eq!(None, sim.pop_payload(node3));
@ -107,8 +110,8 @@ async fn switch_honours_vlans() {
// Different VLANs, node 2 does not learn, still broadcasts
sim.put_payload(node2, payload.clone()).await;
sim.simulate_all_messages().await;
sim.put_payload(node2, payload.clone());
sim.simulate_all_messages();
assert_eq!(Some(payload.clone()), sim.pop_payload(node1));
assert_eq!(Some(payload), sim.pop_payload(node3));
@ -116,13 +119,13 @@ async fn switch_honours_vlans() {
#[test]
#[ignore]
async fn switch_forgets() {
fn switch_forgets() {
// TODO Test
unimplemented!()
}
#[test]
async fn router_delivers() {
fn router_delivers() {
let config1 = Config {
device_type: Type::Tun,
auto_claim: false,
@ -136,24 +139,24 @@ async fn router_delivers() {
..Config::default()
};
let mut sim = TunSimulator::new();
let node1 = sim.add_node(false, &config1).await;
let node2 = sim.add_node(false, &config2).await;
let node1 = sim.add_node(false, &config1);
let node2 = sim.add_node(false, &config2);
sim.connect(node1, node2).await;
sim.simulate_all_messages().await;
sim.connect(node1, node2);
sim.simulate_all_messages();
assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1));
let payload = vec![0x40, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2];
sim.put_payload(node1, payload.clone()).await;
sim.simulate_all_messages().await;
sim.put_payload(node1, payload.clone());
sim.simulate_all_messages();
assert_eq!(Some(payload), sim.pop_payload(node2));
}
#[test]
async fn router_drops_unknown_dest() {
fn router_drops_unknown_dest() {
let config1 = Config {
device_type: Type::Tun,
auto_claim: false,
@ -167,18 +170,18 @@ async fn router_drops_unknown_dest() {
..Config::default()
};
let mut sim = TunSimulator::new();
let node1 = sim.add_node(false, &config1).await;
let node2 = sim.add_node(false, &config2).await;
let node1 = sim.add_node(false, &config1);
let node2 = sim.add_node(false, &config2);
sim.connect(node1, node2).await;
sim.simulate_all_messages().await;
sim.connect(node1, node2);
sim.simulate_all_messages();
assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1));
let payload = vec![0x40, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 3, 3, 3, 3];
sim.put_payload(node1, payload).await;
sim.simulate_all_messages().await;
sim.put_payload(node1, payload);
sim.simulate_all_messages();
assert_eq!(None, sim.pop_payload(node2));
}

View File

@ -5,47 +5,47 @@
use super::common::*;
#[test]
async fn direct_connect() {
fn direct_connect() {
let config = Config::default();
let mut sim = TapSimulator::new();
let node1 = sim.add_node(false, &config).await;
let node2 = sim.add_node(false, &config).await;
let node1 = sim.add_node(false, &config);
let node2 = sim.add_node(false, &config);
sim.connect(node1, node2).await;
sim.simulate_all_messages().await;
sim.connect(node1, node2);
sim.simulate_all_messages();
assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1));
}
#[test]
async fn direct_connect_unencrypted() {
fn direct_connect_unencrypted() {
let config = Config {
crypto: CryptoConfig { algorithms: vec!["plain".to_string()], ..CryptoConfig::default() },
..Config::default()
};
let mut sim = TapSimulator::new();
let node1 = sim.add_node(false, &config).await;
let node2 = sim.add_node(false, &config).await;
let node1 = sim.add_node(false, &config);
let node2 = sim.add_node(false, &config);
sim.connect(node1, node2).await;
sim.simulate_all_messages().await;
sim.connect(node1, node2);
sim.simulate_all_messages();
assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1));
}
#[test]
async fn cross_connect() {
fn cross_connect() {
let config = Config::default();
let mut sim = TapSimulator::new();
let node1 = sim.add_node(false, &config).await;
let node2 = sim.add_node(false, &config).await;
let node3 = sim.add_node(false, &config).await;
let node1 = sim.add_node(false, &config);
let node2 = sim.add_node(false, &config);
let node3 = sim.add_node(false, &config);
sim.connect(node1, node2).await;
sim.connect(node1, node3).await;
sim.simulate_all_messages().await;
sim.connect(node1, node2);
sim.connect(node1, node3);
sim.simulate_all_messages();
sim.simulate_time(120).await;
sim.simulate_time(120);
assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1));
@ -56,124 +56,124 @@ async fn cross_connect() {
}
#[test]
async fn connect_via_beacons() {
fn connect_via_beacons() {
let mut sim = TapSimulator::new();
let beacon_path = "target/.vpncloud_test";
let config1 = Config { beacon_store: Some(beacon_path.to_string()), ..Default::default() };
let node1 = sim.add_node(false, &config1).await;
let node1 = sim.add_node(false, &config1);
let config2 = Config { beacon_load: Some(beacon_path.to_string()), ..Default::default() };
let node2 = sim.add_node(false, &config2).await;
let node2 = sim.add_node(false, &config2);
sim.set_time(100);
sim.trigger_node_housekeep(node1).await;
sim.trigger_node_housekeep(node2).await;
sim.simulate_all_messages().await;
sim.trigger_node_housekeep(node1);
sim.trigger_node_housekeep(node2);
sim.simulate_all_messages();
assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1));
}
#[test]
async fn reconnect_after_timeout() {
fn reconnect_after_timeout() {
let config = Config::default();
let mut sim = TapSimulator::new();
let node1 = sim.add_node(false, &config).await;
let node2 = sim.add_node(false, &config).await;
let node1 = sim.add_node(false, &config);
let node2 = sim.add_node(false, &config);
sim.connect(node1, node2).await;
sim.simulate_all_messages().await;
sim.connect(node1, node2);
sim.simulate_all_messages();
assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1));
sim.set_time(5000);
sim.trigger_housekeep().await;
sim.trigger_housekeep();
assert!(!sim.is_connected(node1, node2));
assert!(!sim.is_connected(node2, node1));
sim.simulate_all_messages().await;
sim.simulate_all_messages();
assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1));
}
#[test]
async fn lost_init_ping() {
fn lost_init_ping() {
let config = Config::default();
let mut sim = TapSimulator::new();
let node1 = sim.add_node(false, &config).await;
let node2 = sim.add_node(false, &config).await;
let node1 = sim.add_node(false, &config);
let node2 = sim.add_node(false, &config);
sim.connect(node1, node2).await;
sim.connect(node1, node2);
sim.drop_message(); // drop init ping
sim.simulate_time(120).await;
sim.simulate_time(120);
assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1));
}
#[test]
async fn lost_init_pong() {
fn lost_init_pong() {
let config = Config::default();
let mut sim = TapSimulator::new();
let node1 = sim.add_node(false, &config).await;
let node2 = sim.add_node(false, &config).await;
let node1 = sim.add_node(false, &config);
let node2 = sim.add_node(false, &config);
sim.connect(node1, node2).await;
sim.simulate_next_message().await; // init ping
sim.connect(node1, node2);
sim.simulate_next_message(); // init ping
sim.drop_message(); // drop init pong
sim.simulate_time(120).await;
sim.simulate_time(120);
assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1));
}
#[test]
async fn lost_init_peng() {
fn lost_init_peng() {
let config = Config::default();
let mut sim = TapSimulator::new();
let node1 = sim.add_node(false, &config).await;
let node2 = sim.add_node(false, &config).await;
let node1 = sim.add_node(false, &config);
let node2 = sim.add_node(false, &config);
sim.connect(node1, node2).await;
sim.simulate_next_message().await; // init ping
sim.simulate_next_message().await; // init pong
sim.connect(node1, node2);
sim.simulate_next_message(); // init ping
sim.simulate_next_message(); // init pong
sim.drop_message(); // drop init peng
sim.simulate_time(120).await;
sim.simulate_time(120);
assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1));
}
#[test]
#[ignore]
async fn peer_exchange() {
fn peer_exchange() {
// TODO Test
unimplemented!()
}
#[test]
#[ignore]
async fn lost_peer_exchange() {
fn lost_peer_exchange() {
// TODO Test
unimplemented!()
}
#[test]
#[ignore]
async fn remove_dead_peers() {
fn remove_dead_peers() {
// TODO Test
unimplemented!()
}
#[test]
#[ignore]
async fn update_primary_address() {
fn update_primary_address() {
// TODO Test
unimplemented!()
}
#[test]
#[ignore]
async fn automatic_peer_timeout() {
fn automatic_peer_timeout() {
// TODO Test
unimplemented!()
}

View File

@ -70,6 +70,18 @@ impl TrafficEntry {
self.out_bytes = 0;
self.in_bytes = 0;
}
fn clear(&mut self) {
self.in_bytes = 0;
self.out_bytes = 0;
self.in_packets = 0;
self.out_packets = 0;
self.idle_periods = 0;
self.in_bytes_total = 0;
self.in_packets_total = 0;
self.out_bytes_total = 0;
self.out_packets_total = 0;
}
}
#[derive(Default)]
@ -203,4 +215,28 @@ impl TrafficStats {
)?;
Ok(())
}
pub fn add(&mut self, other: &Self) {
for (addr, data) in &other.peers {
if let Some(entry) = self.peers.get_mut(addr) {
*entry += data
} else {
self.peers.insert(*addr, data.clone());
}
}
for (key, data) in &other.payload {
if let Some(entry) = self.payload.get_mut(key) {
*entry += data
} else {
self.payload.insert(*key, data.clone());
}
}
self.dropped += &other.dropped
}
pub fn clear(&mut self) {
self.peers.clear();
self.payload.clear();
self.dropped.clear();
}
}

View File

@ -233,7 +233,7 @@ mod tests {
use std::io::Cursor;
#[test]
async fn address_parse_fmt() {
fn address_parse_fmt() {
assert_eq!(format!("{}", Address::from_str("120.45.22.5").unwrap()), "120.45.22.5");
assert_eq!(format!("{}", Address::from_str("78:2d:16:05:01:02").unwrap()), "78:2d:16:05:01:02");
assert_eq!(
@ -249,7 +249,7 @@ mod tests {
}
#[test]
async fn address_decode_encode() {
fn address_decode_encode() {
let mut buf = vec![];
let addr = Address::from_str("120.45.22.5").unwrap();
addr.write_to(Cursor::new(&mut buf));
@ -270,7 +270,7 @@ mod tests {
}
#[test]
async fn address_eq() {
fn address_eq() {
assert_eq!(
Address::read_from_fixed(Cursor::new(&[1, 2, 3, 4]), 4).unwrap(),
Address::read_from_fixed(Cursor::new(&[1, 2, 3, 4]), 4).unwrap()
@ -290,7 +290,7 @@ mod tests {
}
#[test]
async fn address_range_decode_encode() {
fn address_range_decode_encode() {
let mut buf = vec![];
let range =
Range { base: Address { data: [0, 1, 2, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], len: 4 }, prefix_len: 24 };

View File

@ -3,6 +3,7 @@
// This software is licensed under GPL-3 or newer (see LICENSE.md)
use std::process::Command;
use std::time::Instant;
use std::{
fmt,
net::{Ipv4Addr, SocketAddr, ToSocketAddrs, UdpSocket},
@ -11,6 +12,8 @@ use std::{
use crate::error::Error;
use signal::trap::Trap;
use signal::Signal;
#[cfg(not(target_os = "linux"))]
use time;
@ -260,6 +263,38 @@ impl fmt::Display for Bytes {
}
}
pub struct CtrlC {
dummy_time: Instant,
trap: Trap,
}
impl CtrlC {
pub fn new() -> Self {
Default::default()
}
pub fn was_pressed(&self) -> bool {
self.trap.wait(self.dummy_time).is_some()
}
pub fn wait(&self) {
loop {
let deadline = Instant::now() + std::time::Duration::from_secs(10);
if self.trap.wait(deadline).is_some() {
return;
}
}
}
}
impl Default for CtrlC {
fn default() -> Self {
let dummy_time = Instant::now();
let trap = Trap::trap(&[Signal::SIGINT, Signal::SIGTERM, Signal::SIGQUIT]);
Self { dummy_time, trap }
}
}
pub trait TimeSource: Sync + Copy + Send + 'static {
fn now() -> Time;
}
@ -402,7 +437,7 @@ pub fn run_cmd(mut cmd: Command) {
}
#[test]
async fn base62() {
fn base62() {
assert_eq!("", to_base62(&[0]));
assert_eq!("z", to_base62(&[61]));
assert_eq!("10", to_base62(&[62]));

View File

@ -3,19 +3,13 @@
// This software is licensed under GPL-3 or newer (see LICENSE.md)
use super::{
net::{get_ip, mapped_addr, parse_listen, Socket, SocketBuilder},
net::{get_ip, mapped_addr, parse_listen, Socket},
poll::{WaitImpl, WaitResult},
port_forwarding::PortForwarding,
util::MsgBuffer,
};
use async_trait::async_trait;
use byteorder::{NetworkEndian, ReadBytesExt, WriteBytesExt};
use std::{
io::{self, Cursor, Read, Write},
net::{Ipv6Addr, SocketAddr, SocketAddrV6, TcpListener, TcpStream, UdpSocket},
os::unix::io::AsRawFd,
sync::Arc,
};
use std::{io::{self, Cursor, Read, Write}, net::{Ipv6Addr, SocketAddr, SocketAddrV6, TcpListener, TcpStream, UdpSocket}, os::unix::io::AsRawFd, sync::Arc, thread};
use tungstenite::{client::AutoStream, connect, protocol::WebSocket, server::accept, Message};
use url::Url;
@ -99,7 +93,7 @@ pub fn run_proxy(listen: &str) -> Result<(), io::Error> {
for stream in server.incoming() {
let stream = stream?;
let peer = stream.peer_addr()?;
tokio::spawn(async move {
thread::spawn(move || {
if let Err(err) = serve_proxy_connection(stream) {
error!("Error on connection {}: {}", peer, err);
}
@ -138,21 +132,12 @@ impl ProxyConnection {
}
}
impl SocketBuilder for ProxyConnection {
type SocketType = ProxyConnection;
fn build(self) -> Result<Self::SocketType, io::Error> {
Ok(self)
}
impl Socket for ProxyConnection {
fn create_port_forwarding(&self) -> Option<PortForwarding> {
None
}
}
#[async_trait]
impl Socket for ProxyConnection {
async fn receive(&mut self, buffer: &mut MsgBuffer) -> Result<SocketAddr, io::Error> {
fn receive(&mut self, buffer: &mut MsgBuffer) -> Result<SocketAddr, io::Error> {
buffer.clear();
let data = self.read_message()?;
let addr = read_addr(Cursor::new(&data))?;
@ -160,7 +145,7 @@ impl Socket for ProxyConnection {
Ok(addr)
}
async fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize, io::Error> {
fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize, io::Error> {
let mut msg = Vec::with_capacity(data.len() + 18);
write_addr(addr, &mut msg)?;
msg.write_all(data)?;
@ -171,7 +156,7 @@ impl Socket for ProxyConnection {
*/
}
async fn address(&self) -> Result<SocketAddr, io::Error> {
fn address(&self) -> Result<SocketAddr, io::Error> {
Ok(self.addr)
}
}