Remove async

This commit is contained in:
Dennis Schwerdel 2021-06-24 11:29:06 +02:00
parent 76bb4aa4b6
commit 744a151110
29 changed files with 647 additions and 705 deletions

150
Cargo.lock generated
View File

@ -9,17 +9,6 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "async-trait"
version = "0.1.42"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d3a45e77e34375a7923b1e8febb049bb011f064714a8e17a1a616fef01da13d"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "attohttpc" name = "attohttpc"
version = "0.16.3" version = "0.16.3"
@ -121,6 +110,12 @@ version = "1.0.67"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3c69b077ad434294d3ce9f1f6143a2a4b89a8a2d54ef813d85003a4fd1137fd" checksum = "e3c69b077ad434294d3ce9f1f6143a2a4b89a8a2d54ef813d85003a4fd1137fd"
[[package]]
name = "cfg-if"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
[[package]] [[package]]
name = "cfg-if" name = "cfg-if"
version = "1.0.0" version = "1.0.0"
@ -217,7 +212,7 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4" checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4"
dependencies = [ dependencies = [
"cfg-if", "cfg-if 1.0.0",
"crossbeam-utils", "crossbeam-utils",
] ]
@ -227,7 +222,7 @@ version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94af6efb46fef72616855b036a624cf27ba656ffc9be1b9a3c931cfc7749a9a9" checksum = "94af6efb46fef72616855b036a624cf27ba656ffc9be1b9a3c931cfc7749a9a9"
dependencies = [ dependencies = [
"cfg-if", "cfg-if 1.0.0",
"crossbeam-epoch", "crossbeam-epoch",
"crossbeam-utils", "crossbeam-utils",
] ]
@ -238,7 +233,7 @@ version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2584f639eb95fea8c798496315b297cf81b9b58b6d30ab066a75455333cf4b12" checksum = "2584f639eb95fea8c798496315b297cf81b9b58b6d30ab066a75455333cf4b12"
dependencies = [ dependencies = [
"cfg-if", "cfg-if 1.0.0",
"crossbeam-utils", "crossbeam-utils",
"lazy_static", "lazy_static",
"memoffset", "memoffset",
@ -252,7 +247,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7e9d99fa91428effe99c5c6d4634cdeba32b8cf784fc428a2a687f61a952c49" checksum = "e7e9d99fa91428effe99c5c6d4634cdeba32b8cf784fc428a2a687f61a952c49"
dependencies = [ dependencies = [
"autocfg", "autocfg",
"cfg-if", "cfg-if 1.0.0",
"lazy_static", "lazy_static",
] ]
@ -359,7 +354,7 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c9495705279e7140bf035dde1f6e750c162df8b625267cd52cc44e0b156732c8" checksum = "c9495705279e7140bf035dde1f6e750c162df8b625267cd52cc44e0b156732c8"
dependencies = [ dependencies = [
"cfg-if", "cfg-if 1.0.0",
"libc", "libc",
"wasi", "wasi",
] ]
@ -450,7 +445,7 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61124eeebbd69b8190558df225adf7e4caafce0d743919e5d6b19652314ec5ec" checksum = "61124eeebbd69b8190558df225adf7e4caafce0d743919e5d6b19652314ec5ec"
dependencies = [ dependencies = [
"cfg-if", "cfg-if 1.0.0",
] ]
[[package]] [[package]]
@ -519,7 +514,7 @@ version = "0.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710" checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710"
dependencies = [ dependencies = [
"cfg-if", "cfg-if 1.0.0",
] ]
[[package]] [[package]]
@ -544,26 +539,16 @@ dependencies = [
] ]
[[package]] [[package]]
name = "mio" name = "nix"
version = "0.7.9" version = "0.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a5dede4e2065b3842b8b0af444119f3aa331cc7cc2dd20388bfb0f5d5a38823a" checksum = "6c722bee1037d430d0f8e687bbdbf222f27cc6e4e68d5caf630857bb2b6dbdce"
dependencies = [ dependencies = [
"bitflags",
"cc",
"cfg-if 0.1.10",
"libc", "libc",
"log", "void",
"miow",
"ntapi",
"winapi",
]
[[package]]
name = "miow"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a33c1b55807fbed163481b5ba66db4b2fa6cde694a5027be10fb724206c5897"
dependencies = [
"socket2",
"winapi",
] ]
[[package]] [[package]]
@ -574,19 +559,10 @@ checksum = "b2ccba0cfe4fdf15982d1674c69b1fd80bad427d293849982668dfe454bd61f2"
dependencies = [ dependencies = [
"bitflags", "bitflags",
"cc", "cc",
"cfg-if", "cfg-if 1.0.0",
"libc", "libc",
] ]
[[package]]
name = "ntapi"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44"
dependencies = [
"winapi",
]
[[package]] [[package]]
name = "num-integer" name = "num-integer"
version = "0.1.44" version = "0.1.44"
@ -651,7 +627,7 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa7a782938e745763fe6907fc6ba86946d72f49fe7e21de074e08128a99fb018" checksum = "fa7a782938e745763fe6907fc6ba86946d72f49fe7e21de074e08128a99fb018"
dependencies = [ dependencies = [
"cfg-if", "cfg-if 1.0.0",
"instant", "instant",
"libc", "libc",
"redox_syscall", "redox_syscall",
@ -665,12 +641,6 @@ version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
[[package]]
name = "pin-project-lite"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "439697af366c49a6d0a010c56a0d97685bc140ce0d377b13a2ea2aa42d64a827"
[[package]] [[package]]
name = "plotters" name = "plotters"
version = "0.3.0" version = "0.3.0"
@ -712,7 +682,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebd4c2739642e70439d1c0d9545beec45c1e54128739b3cda29bf2c366028c87" checksum = "ebd4c2739642e70439d1c0d9545beec45c1e54128739b3cda29bf2c366028c87"
dependencies = [ dependencies = [
"libc", "libc",
"nix", "nix 0.19.1",
] ]
[[package]] [[package]]
@ -984,19 +954,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfebf75d25bd900fd1e7d11501efab59bc846dbc76196839663e6637bba9f25f" checksum = "dfebf75d25bd900fd1e7d11501efab59bc846dbc76196839663e6637bba9f25f"
dependencies = [ dependencies = [
"block-buffer", "block-buffer",
"cfg-if", "cfg-if 1.0.0",
"cpuid-bool", "cpuid-bool",
"digest", "digest",
"opaque-debug", "opaque-debug",
] ]
[[package]] [[package]]
name = "signal-hook-registry" name = "signal"
version = "1.3.0" version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16f1d0fef1604ba8f7a073c7e701f213e056707210e9020af4528e0101ce11a6" checksum = "2f6ce83b159ab6984d2419f495134972b48754d13ff2e3f8c998339942b56ed9"
dependencies = [ dependencies = [
"libc", "libc",
"nix 0.14.1",
] ]
[[package]] [[package]]
@ -1005,17 +976,6 @@ version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e" checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e"
[[package]]
name = "socket2"
version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "122e570113d28d773067fab24266b66753f6ea915758651696b6e35e49f88d6e"
dependencies = [
"cfg-if",
"libc",
"winapi",
]
[[package]] [[package]]
name = "spin" name = "spin"
version = "0.5.2" version = "0.5.2"
@ -1069,7 +1029,7 @@ version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dac1c663cfc93810f88aed9b8941d48cabf856a1b111c29a40439018d870eb22" checksum = "dac1c663cfc93810f88aed9b8941d48cabf856a1b111c29a40439018d870eb22"
dependencies = [ dependencies = [
"cfg-if", "cfg-if 1.0.0",
"libc", "libc",
"rand", "rand",
"redox_syscall", "redox_syscall",
@ -1116,6 +1076,15 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "timeout_io"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d89ed29f92507ee770ef31f747ec1ad5ba7d8989a9f3489783da7d19a155c60"
dependencies = [
"cc",
]
[[package]] [[package]]
name = "tinytemplate" name = "tinytemplate"
version = "1.2.1" version = "1.2.1"
@ -1141,37 +1110,6 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]]
name = "tokio"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83f0c8e7c0addab50b663055baf787d0af7f413a46e6e7fb9559a4e4db7137a5"
dependencies = [
"autocfg",
"bytes",
"libc",
"memchr",
"mio",
"num_cpus",
"once_cell",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"tokio-macros",
"winapi",
]
[[package]]
name = "tokio-macros"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "caf7b11a536f46a809a8a9f0bb4237020f70ecbf115b842360afb127ea2fda57"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "tungstenite" name = "tungstenite"
version = "0.13.0" version = "0.13.0"
@ -1270,11 +1208,16 @@ version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe" checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe"
[[package]]
name = "void"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d"
[[package]] [[package]]
name = "vpncloud" name = "vpncloud"
version = "2.2.0" version = "2.2.0"
dependencies = [ dependencies = [
"async-trait",
"byteorder", "byteorder",
"chrono", "chrono",
"criterion", "criterion",
@ -1291,11 +1234,12 @@ dependencies = [
"ring", "ring",
"serde", "serde",
"serde_yaml", "serde_yaml",
"signal",
"smallvec", "smallvec",
"structopt", "structopt",
"tempfile", "tempfile",
"thiserror", "thiserror",
"tokio", "timeout_io",
"tungstenite", "tungstenite",
"url", "url",
"yaml-rust", "yaml-rust",
@ -1324,7 +1268,7 @@ version = "0.2.73"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83240549659d187488f91f33c0f8547cbfef0b2088bc470c116d1d260ef623d9" checksum = "83240549659d187488f91f33c0f8547cbfef0b2088bc470c116d1d260ef623d9"
dependencies = [ dependencies = [
"cfg-if", "cfg-if 1.0.0",
"wasm-bindgen-macro", "wasm-bindgen-macro",
] ]

View File

@ -36,9 +36,8 @@ dialoguer = { version = "0.8", optional = true }
tungstenite = { version = "0.13", optional = true, default-features = false } tungstenite = { version = "0.13", optional = true, default-features = false }
url = { version = "2.2", optional = true } url = { version = "2.2", optional = true }
igd = { version = "0.12", optional = true } igd = { version = "0.12", optional = true }
tokio = { version = "^1.5", features = ["full"] } timeout_io = "0.6"
async-trait = "0.1" signal = "0.7"
[dev-dependencies] [dev-dependencies]
tempfile = "3" tempfile = "3"

13
ROADMAP.md Normal file
View File

@ -0,0 +1,13 @@
# Roadmap
## Multithreading functionality
- [x] Timeout for device read
- [x] Timeout for net read
- [x] Check how async affects performance
- [x] Sync traffic stats
- [x] Sync forwarding table
- [ ] Fix WS Proxy code
## REST API

View File

@ -136,7 +136,6 @@ fn crypto_aes256(c: &mut Criterion) {
} }
fn full_communication_tun_router(c: &mut Criterion) { fn full_communication_tun_router(c: &mut Criterion) {
let runtime = tokio::runtime::Runtime::new().unwrap();
log::set_max_level(log::LevelFilter::Error); log::set_max_level(log::LevelFilter::Error);
let config1 = Config { let config1 = Config {
device_type: Type::Tun, device_type: Type::Tun,
@ -152,60 +151,53 @@ fn full_communication_tun_router(c: &mut Criterion) {
}; };
let mut sim = TunSimulator::new(); let mut sim = TunSimulator::new();
let (node1, node2) = runtime.block_on(async { let node1 = sim.add_node(false, &config1);
log::set_max_level(log::LevelFilter::Error); let node2 = sim.add_node(false, &config2);
let node1 = sim.add_node(false, &config1).await;
let node2 = sim.add_node(false, &config2).await;
sim.connect(node1, node2).await; sim.connect(node1, node2);
sim.simulate_all_messages().await; sim.simulate_all_messages();
assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1)); assert!(sim.is_connected(node2, node1));
(node1, node2) sim.trigger_housekeep();
});
let mut payload = vec![0x40, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2]; let mut payload = vec![0x40, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2];
payload.append(&mut vec![0; 1400]); payload.append(&mut vec![0; 1400]);
let mut g = c.benchmark_group("full_communication"); let mut g = c.benchmark_group("full_communication");
g.throughput(Throughput::Bytes(2 * 1400)); g.throughput(Throughput::Bytes(2 * 1400));
g.bench_function("tun_router", |b| { g.bench_function("tun_router", |b| {
b.iter(|| runtime.block_on(async { b.iter(|| {
sim.put_payload(node1, payload.clone()).await; sim.put_payload(node1, payload.clone());
sim.simulate_all_messages().await; sim.simulate_all_messages();
assert_eq!(Some(&payload), sim.pop_payload(node2).as_ref()); assert_eq!(Some(&payload), sim.pop_payload(node2).as_ref());
})); });
}); });
g.finish() g.finish()
} }
fn full_communication_tap_switch(c: &mut Criterion) { fn full_communication_tap_switch(c: &mut Criterion) {
let runtime = tokio::runtime::Runtime::new().unwrap();
log::set_max_level(log::LevelFilter::Error); log::set_max_level(log::LevelFilter::Error);
let config = Config { device_type: Type::Tap, ..Config::default() }; let config = Config { device_type: Type::Tap, ..Config::default() };
let mut sim = TapSimulator::new(); let mut sim = TapSimulator::new();
let (node1, node2) = runtime.block_on(async { let node1 = sim.add_node(false, &config);
log::set_max_level(log::LevelFilter::Error); let node2 = sim.add_node(false, &config);
let node1 = sim.add_node(false, &config).await;
let node2 = sim.add_node(false, &config).await;
sim.connect(node1, node2).await; sim.connect(node1, node2);
sim.simulate_all_messages().await; sim.simulate_all_messages();
assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1)); assert!(sim.is_connected(node2, node1));
(node1, node2) sim.trigger_housekeep();
});
let mut payload = vec![2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4, 5]; let mut payload = vec![2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4, 5];
payload.append(&mut vec![0; 1400]); payload.append(&mut vec![0; 1400]);
let mut g = c.benchmark_group("full_communication"); let mut g = c.benchmark_group("full_communication");
g.throughput(Throughput::Bytes(2 * 1400)); g.throughput(Throughput::Bytes(2 * 1400));
g.bench_function("tap_switch", |b| { g.bench_function("tap_switch", |b| {
b.iter(|| runtime.block_on(async { b.iter(|| {
sim.put_payload(node1, payload.clone()).await; sim.put_payload(node1, payload.clone());
sim.simulate_all_messages().await; sim.simulate_all_messages();
assert_eq!(Some(&payload), sim.pop_payload(node2).as_ref()); assert_eq!(Some(&payload), sim.pop_payload(node2).as_ref());
})); });
}); });
g.finish() g.finish()
} }

View File

@ -98,7 +98,6 @@ fn crypto_aes256() {
} }
fn full_communication_tun_router() { fn full_communication_tun_router() {
let runtime = tokio::runtime::Runtime::new().unwrap();
log::set_max_level(log::LevelFilter::Error); log::set_max_level(log::LevelFilter::Error);
let config1 = Config { let config1 = Config {
device_type: Type::Tun, device_type: Type::Tun,
@ -113,55 +112,44 @@ fn full_communication_tun_router() {
..Config::default() ..Config::default()
}; };
let mut sim = TunSimulator::new(); let mut sim = TunSimulator::new();
let (node1, node2) = runtime.block_on(async { let node1 = sim.add_node(false, &config1);
log::set_max_level(log::LevelFilter::Error); let node2 = sim.add_node(false, &config2);
let node1 = sim.add_node(false, &config1).await;
let node2 = sim.add_node(false, &config2).await;
sim.connect(node1, node2).await; sim.connect(node1, node2);
sim.simulate_all_messages().await; sim.simulate_all_messages();
assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1)); assert!(sim.is_connected(node2, node1));
(node1, node2) sim.trigger_housekeep();
});
let mut payload = vec![0x40, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2]; let mut payload = vec![0x40, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2];
payload.append(&mut vec![0; 1400]); payload.append(&mut vec![0; 1400]);
for _ in 0..1000 { for _ in 0..1000 {
runtime.block_on(async { sim.put_payload(node1, payload.clone());
sim.put_payload(node1, payload.clone()).await; sim.simulate_all_messages();
sim.simulate_all_messages().await; assert_eq!(Some(&payload), black_box(sim.pop_payload(node2).as_ref()));
assert_eq!(Some(&payload), black_box(sim.pop_payload(node2).as_ref()));
});
} }
} }
fn full_communication_tap_switch() { fn full_communication_tap_switch() {
let runtime = tokio::runtime::Runtime::new().unwrap();
log::set_max_level(log::LevelFilter::Error); log::set_max_level(log::LevelFilter::Error);
let config = Config { device_type: Type::Tap, ..Config::default() }; let config = Config { device_type: Type::Tap, ..Config::default() };
let mut sim = TapSimulator::new(); let mut sim = TapSimulator::new();
let (node1, node2) = runtime.block_on(async { let node1 = sim.add_node(false, &config);
log::set_max_level(log::LevelFilter::Error); let node2 = sim.add_node(false, &config);
let node1 = sim.add_node(false, &config).await;
let node2 = sim.add_node(false, &config).await;
sim.connect(node1, node2).await; sim.connect(node1, node2);
sim.simulate_all_messages().await; sim.simulate_all_messages();
assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1)); assert!(sim.is_connected(node2, node1));
(node1, node2) sim.trigger_housekeep();
});
let mut payload = vec![2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4, 5]; let mut payload = vec![2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4, 5];
payload.append(&mut vec![0; 1400]); payload.append(&mut vec![0; 1400]);
for _ in 0..1000 { for _ in 0..1000 {
runtime.block_on(async { sim.put_payload(node1, payload.clone());
sim.put_payload(node1, payload.clone()).await; sim.simulate_all_messages();
sim.simulate_all_messages().await; assert_eq!(Some(&payload), black_box(sim.pop_payload(node2).as_ref()));
assert_eq!(Some(&payload), black_box(sim.pop_payload(node2).as_ref()));
});
} }
} }

View File

@ -4,9 +4,6 @@ from common import EC2Environment, CREATE, eprint
import time, json, os, atexit import time, json, os, atexit
from datetime import date from datetime import date
# Note: this script will run for ~8 minutes and incur costs of about $ 0.02
FILE = "../../target/release/vpncloud" FILE = "../../target/release/vpncloud"
VERSION = "2.2.0" VERSION = "2.2.0"
REGION = "eu-central-1" REGION = "eu-central-1"
@ -25,9 +22,6 @@ env = EC2Environment(
) )
CRYPTO = ["plain", "aes256", "aes128", "chacha20"]
class PerfTest: class PerfTest:
def __init__(self, sender, receiver): def __init__(self, sender, receiver):
self.sender = sender self.sender = sender

View File

@ -323,7 +323,7 @@ use std::str::FromStr;
use std::time::Duration; use std::time::Duration;
#[test] #[test]
async fn encode() { fn encode() {
MockTimeSource::set_time(2000 * 3600); MockTimeSource::set_time(2000 * 3600);
let ser = BeaconSerializer::<MockTimeSource>::new(b"mysecretkey"); let ser = BeaconSerializer::<MockTimeSource>::new(b"mysecretkey");
let mut peers = vec![SocketAddr::from_str("1.2.3.4:5678").unwrap(), SocketAddr::from_str("6.6.6.6:53").unwrap()]; let mut peers = vec![SocketAddr::from_str("1.2.3.4:5678").unwrap(), SocketAddr::from_str("6.6.6.6:53").unwrap()];
@ -335,7 +335,7 @@ async fn encode() {
} }
#[test] #[test]
async fn decode() { fn decode() {
MockTimeSource::set_time(2000 * 3600); MockTimeSource::set_time(2000 * 3600);
let ser = BeaconSerializer::<MockTimeSource>::new(b"mysecretkey"); let ser = BeaconSerializer::<MockTimeSource>::new(b"mysecretkey");
let mut peers = vec![SocketAddr::from_str("1.2.3.4:5678").unwrap(), SocketAddr::from_str("6.6.6.6:53").unwrap()]; let mut peers = vec![SocketAddr::from_str("1.2.3.4:5678").unwrap(), SocketAddr::from_str("6.6.6.6:53").unwrap()];
@ -348,7 +348,7 @@ async fn decode() {
} }
#[test] #[test]
async fn decode_split() { fn decode_split() {
MockTimeSource::set_time(2000 * 3600); MockTimeSource::set_time(2000 * 3600);
let ser = BeaconSerializer::<MockTimeSource>::new(b"mysecretkey"); let ser = BeaconSerializer::<MockTimeSource>::new(b"mysecretkey");
let peers = vec![SocketAddr::from_str("1.2.3.4:5678").unwrap(), SocketAddr::from_str("6.6.6.6:53").unwrap()]; let peers = vec![SocketAddr::from_str("1.2.3.4:5678").unwrap(), SocketAddr::from_str("6.6.6.6:53").unwrap()];
@ -363,7 +363,7 @@ async fn decode_split() {
} }
#[test] #[test]
async fn decode_offset() { fn decode_offset() {
MockTimeSource::set_time(2000 * 3600); MockTimeSource::set_time(2000 * 3600);
let ser = BeaconSerializer::<MockTimeSource>::new(b"mysecretkey"); let ser = BeaconSerializer::<MockTimeSource>::new(b"mysecretkey");
let peers = vec![SocketAddr::from_str("1.2.3.4:5678").unwrap(), SocketAddr::from_str("6.6.6.6:53").unwrap()]; let peers = vec![SocketAddr::from_str("1.2.3.4:5678").unwrap(), SocketAddr::from_str("6.6.6.6:53").unwrap()];
@ -374,7 +374,7 @@ async fn decode_offset() {
} }
#[test] #[test]
async fn decode_multiple() { fn decode_multiple() {
MockTimeSource::set_time(2000 * 3600); MockTimeSource::set_time(2000 * 3600);
let ser = BeaconSerializer::<MockTimeSource>::new(b"mysecretkey"); let ser = BeaconSerializer::<MockTimeSource>::new(b"mysecretkey");
let peers = vec![SocketAddr::from_str("1.2.3.4:5678").unwrap(), SocketAddr::from_str("6.6.6.6:53").unwrap()]; let peers = vec![SocketAddr::from_str("1.2.3.4:5678").unwrap(), SocketAddr::from_str("6.6.6.6:53").unwrap()];
@ -385,7 +385,7 @@ async fn decode_multiple() {
} }
#[test] #[test]
async fn decode_ttl() { fn decode_ttl() {
MockTimeSource::set_time(2000 * 3600); MockTimeSource::set_time(2000 * 3600);
let ser = BeaconSerializer::<MockTimeSource>::new(b"mysecretkey"); let ser = BeaconSerializer::<MockTimeSource>::new(b"mysecretkey");
MockTimeSource::set_time(2000 * 3600); MockTimeSource::set_time(2000 * 3600);
@ -409,7 +409,7 @@ async fn decode_ttl() {
} }
#[test] #[test]
async fn decode_invalid() { fn decode_invalid() {
MockTimeSource::set_time(2000 * 3600); MockTimeSource::set_time(2000 * 3600);
let ser = BeaconSerializer::<MockTimeSource>::new(b"mysecretkey"); let ser = BeaconSerializer::<MockTimeSource>::new(b"mysecretkey");
assert_eq!(0, ser.decode("", None).len()); assert_eq!(0, ser.decode("", None).len());
@ -422,7 +422,7 @@ async fn decode_invalid() {
} }
#[test] #[test]
async fn encode_decode() { fn encode_decode() {
MockTimeSource::set_time(2000 * 3600); MockTimeSource::set_time(2000 * 3600);
let ser = BeaconSerializer::<MockTimeSource>::new(b"mysecretkey"); let ser = BeaconSerializer::<MockTimeSource>::new(b"mysecretkey");
let peers = vec![SocketAddr::from_str("1.2.3.4:5678").unwrap(), SocketAddr::from_str("6.6.6.6:53").unwrap()]; let peers = vec![SocketAddr::from_str("1.2.3.4:5678").unwrap(), SocketAddr::from_str("6.6.6.6:53").unwrap()];
@ -432,7 +432,7 @@ async fn encode_decode() {
} }
#[test] #[test]
async fn encode_decode_file() { fn encode_decode_file() {
MockTimeSource::set_time(2000 * 3600); MockTimeSource::set_time(2000 * 3600);
let ser = BeaconSerializer::<MockTimeSource>::new(b"mysecretkey"); let ser = BeaconSerializer::<MockTimeSource>::new(b"mysecretkey");
let peers = vec![SocketAddr::from_str("1.2.3.4:5678").unwrap(), SocketAddr::from_str("6.6.6.6:53").unwrap()]; let peers = vec![SocketAddr::from_str("1.2.3.4:5678").unwrap(), SocketAddr::from_str("6.6.6.6:53").unwrap()];
@ -444,7 +444,7 @@ async fn encode_decode_file() {
} }
#[test] #[test]
async fn encode_decode_cmd() { fn encode_decode_cmd() {
MockTimeSource::set_time(2000 * 3600); MockTimeSource::set_time(2000 * 3600);
let ser = BeaconSerializer::<MockTimeSource>::new(b"mysecretkey"); let ser = BeaconSerializer::<MockTimeSource>::new(b"mysecretkey");
let peers = vec![SocketAddr::from_str("1.2.3.4:5678").unwrap(), SocketAddr::from_str("6.6.6.6:53").unwrap()]; let peers = vec![SocketAddr::from_str("1.2.3.4:5678").unwrap(), SocketAddr::from_str("6.6.6.6:53").unwrap()];

View File

@ -358,29 +358,25 @@ impl Config {
pub fn is_learning(&self) -> bool { pub fn is_learning(&self) -> bool {
match self.mode { match self.mode {
Mode::Normal => { Mode::Normal => match self.device_type {
match self.device_type { Type::Tap => true,
Type::Tap => true, Type::Tun => false,
Type::Tun => false },
}
}
Mode::Router => false, Mode::Router => false,
Mode::Switch => true, Mode::Switch => true,
Mode::Hub => false Mode::Hub => false,
} }
} }
pub fn is_broadcasting(&self) -> bool { pub fn is_broadcasting(&self) -> bool {
match self.mode { match self.mode {
Mode::Normal => { Mode::Normal => match self.device_type {
match self.device_type { Type::Tap => true,
Type::Tap => true, Type::Tun => false,
Type::Tun => false },
}
}
Mode::Router => false, Mode::Router => false,
Mode::Switch => true, Mode::Switch => true,
Mode::Hub => true Mode::Hub => true,
} }
} }
@ -685,7 +681,7 @@ pub struct ConfigFile {
} }
#[test] #[test]
async fn config_file() { fn config_file() {
let config_file = " let config_file = "
device: device:
type: tun type: tun
@ -766,12 +762,12 @@ statsd:
} }
#[test] #[test]
async fn parse_example_config() { fn parse_example_config() {
serde_yaml::from_str::<ConfigFile>(include_str!("../assets/example.net.disabled")).unwrap(); serde_yaml::from_str::<ConfigFile>(include_str!("../assets/example.net.disabled")).unwrap();
} }
#[test] #[test]
async fn config_merge() { fn config_merge() {
let mut config = Config::default(); let mut config = Config::default();
config.merge_file(ConfigFile { config.merge_file(ConfigFile {
device: Some(ConfigFileDevice { device: Some(ConfigFileDevice {
@ -871,42 +867,45 @@ async fn config_merge() {
group: Some("root".to_string()), group: Some("root".to_string()),
..Default::default() ..Default::default()
}); });
assert_eq!(config, Config { assert_eq!(
device_type: Type::Tap, config,
device_name: "vpncloud0".to_string(), Config {
device_path: Some("/dev/null".to_string()), device_type: Type::Tap,
device_mtu: None, device_name: "vpncloud0".to_string(),
fix_rp_filter: false, device_path: Some("/dev/null".to_string()),
advertise_addresses: vec![], device_mtu: None,
ip: None, fix_rp_filter: false,
ifup: Some("ifconfig $IFNAME 10.0.1.2/16 mtu 1400 up".to_string()), advertise_addresses: vec![],
ifdown: Some("ifconfig $IFNAME down".to_string()), ip: None,
crypto: CryptoConfig { password: Some("anothersecret".to_string()), ..CryptoConfig::default() }, ifup: Some("ifconfig $IFNAME 10.0.1.2/16 mtu 1400 up".to_string()),
listen: "[::]:3211".to_string(), ifdown: Some("ifconfig $IFNAME down".to_string()),
peers: vec![ crypto: CryptoConfig { password: Some("anothersecret".to_string()), ..CryptoConfig::default() },
"remote.machine.foo:3210".to_string(), listen: "[::]:3211".to_string(),
"remote.machine.bar:3210".to_string(), peers: vec![
"another:3210".to_string() "remote.machine.foo:3210".to_string(),
], "remote.machine.bar:3210".to_string(),
peer_timeout: 1801, "another:3210".to_string()
keepalive: Some(850), ],
switch_timeout: 301, peer_timeout: 1801,
beacon_store: Some("/run/vpncloud.beacon.out2".to_string()), keepalive: Some(850),
beacon_load: Some("/run/vpncloud.beacon.in2".to_string()), switch_timeout: 301,
beacon_interval: 3600, beacon_store: Some("/run/vpncloud.beacon.out2".to_string()),
beacon_password: Some("test1234".to_string()), beacon_load: Some("/run/vpncloud.beacon.in2".to_string()),
mode: Mode::Switch, beacon_interval: 3600,
port_forwarding: false, beacon_password: Some("test1234".to_string()),
claims: vec!["10.0.1.0/24".to_string()], mode: Mode::Switch,
auto_claim: true, port_forwarding: false,
user: Some("root".to_string()), claims: vec!["10.0.1.0/24".to_string()],
group: Some("root".to_string()), auto_claim: true,
pid_file: Some("/run/vpncloud-mynet.run".to_string()), user: Some("root".to_string()),
stats_file: Some("/var/log/vpncloud-mynet.stats".to_string()), group: Some("root".to_string()),
statsd_server: Some("example.com:2345".to_string()), pid_file: Some("/run/vpncloud-mynet.run".to_string()),
statsd_prefix: Some("prefix2".to_string()), stats_file: Some("/var/log/vpncloud-mynet.stats".to_string()),
daemonize: true, statsd_server: Some("example.com:2345".to_string()),
hook: None, statsd_prefix: Some("prefix2".to_string()),
hooks: HashMap::new() daemonize: true,
}); hook: None,
hooks: HashMap::new()
}
);
} }

View File

@ -349,7 +349,7 @@ mod tests {
} }
#[test] #[test]
async fn normal() { fn normal() {
let config = Config { password: Some("test".to_string()), ..Default::default() }; let config = Config { password: Some("test".to_string()), ..Default::default() };
let mut node1 = create_node(&config); let mut node1 = create_node(&config);
let mut node2 = create_node(&config); let mut node2 = create_node(&config);

View File

@ -277,7 +277,7 @@ mod tests {
use ring::aead::{self, LessSafeKey, UnboundKey}; use ring::aead::{self, LessSafeKey, UnboundKey};
#[test] #[test]
async fn test_nonce() { fn test_nonce() {
let mut nonce = Nonce::zero(); let mut nonce = Nonce::zero();
assert_eq!(nonce.as_bytes(), &[0; 12]); assert_eq!(nonce.as_bytes(), &[0; 12]);
nonce.increment(); nonce.increment();
@ -299,17 +299,17 @@ mod tests {
} }
#[test] #[test]
async fn test_encrypt_decrypt_aes128() { fn test_encrypt_decrypt_aes128() {
test_encrypt_decrypt(&aead::AES_128_GCM) test_encrypt_decrypt(&aead::AES_128_GCM)
} }
#[test] #[test]
async fn test_encrypt_decrypt_aes256() { fn test_encrypt_decrypt_aes256() {
test_encrypt_decrypt(&aead::AES_256_GCM) test_encrypt_decrypt(&aead::AES_256_GCM)
} }
#[test] #[test]
async fn test_encrypt_decrypt_chacha() { fn test_encrypt_decrypt_chacha() {
test_encrypt_decrypt(&aead::CHACHA20_POLY1305) test_encrypt_decrypt(&aead::CHACHA20_POLY1305)
} }
@ -339,17 +339,17 @@ mod tests {
} }
#[test] #[test]
async fn test_tampering_aes128() { fn test_tampering_aes128() {
test_tampering(&aead::AES_128_GCM) test_tampering(&aead::AES_128_GCM)
} }
#[test] #[test]
async fn test_tampering_aes256() { fn test_tampering_aes256() {
test_tampering(&aead::AES_256_GCM) test_tampering(&aead::AES_256_GCM)
} }
#[test] #[test]
async fn test_tampering_chacha() { fn test_tampering_chacha() {
test_tampering(&aead::CHACHA20_POLY1305) test_tampering(&aead::CHACHA20_POLY1305)
} }
@ -380,17 +380,17 @@ mod tests {
} }
#[test] #[test]
async fn test_nonce_pinning_aes128() { fn test_nonce_pinning_aes128() {
test_nonce_pinning(&aead::AES_128_GCM) test_nonce_pinning(&aead::AES_128_GCM)
} }
#[test] #[test]
async fn test_nonce_pinning_aes256() { fn test_nonce_pinning_aes256() {
test_nonce_pinning(&aead::AES_256_GCM) test_nonce_pinning(&aead::AES_256_GCM)
} }
#[test] #[test]
async fn test_nonce_pinning_chacha() { fn test_nonce_pinning_chacha() {
test_nonce_pinning(&aead::CHACHA20_POLY1305) test_nonce_pinning(&aead::CHACHA20_POLY1305)
} }
@ -429,39 +429,39 @@ mod tests {
} }
#[test] #[test]
async fn test_key_rotation_aes128() { fn test_key_rotation_aes128() {
test_key_rotation(&aead::AES_128_GCM); test_key_rotation(&aead::AES_128_GCM);
} }
#[test] #[test]
async fn test_key_rotation_aes256() { fn test_key_rotation_aes256() {
test_key_rotation(&aead::AES_256_GCM); test_key_rotation(&aead::AES_256_GCM);
} }
#[test] #[test]
async fn test_key_rotation_chacha() { fn test_key_rotation_chacha() {
test_key_rotation(&aead::CHACHA20_POLY1305); test_key_rotation(&aead::CHACHA20_POLY1305);
} }
#[test] #[test]
async fn test_core_size() { fn test_core_size() {
assert_eq!(2400, mem::size_of::<CryptoCore>()); assert_eq!(2400, mem::size_of::<CryptoCore>());
} }
#[test] #[test]
async fn test_speed_aes128() { fn test_speed_aes128() {
let speed = test_speed(&aead::AES_128_GCM, &Duration::from_secs_f32(0.2)); let speed = test_speed(&aead::AES_128_GCM, &Duration::from_secs_f32(0.2));
assert!(speed > 10.0); assert!(speed > 10.0);
} }
#[test] #[test]
async fn test_speed_aes256() { fn test_speed_aes256() {
let speed = test_speed(&aead::AES_256_GCM, &Duration::from_secs_f32(0.2)); let speed = test_speed(&aead::AES_256_GCM, &Duration::from_secs_f32(0.2));
assert!(speed > 10.0); assert!(speed > 10.0);
} }
#[test] #[test]
async fn test_speed_chacha() { fn test_speed_chacha() {
let speed = test_speed(&aead::CHACHA20_POLY1305, &Duration::from_secs_f32(0.2)); let speed = test_speed(&aead::CHACHA20_POLY1305, &Duration::from_secs_f32(0.2));
assert!(speed > 10.0); assert!(speed > 10.0);
} }

View File

@ -731,7 +731,7 @@ mod tests {
} }
#[test] #[test]
async fn normal_init() { fn normal_init() {
let (mut sender, mut receiver) = create_pair(); let (mut sender, mut receiver) = create_pair();
let mut out = MsgBuffer::new(8); let mut out = MsgBuffer::new(8);
sender.send_ping(&mut out); sender.send_ping(&mut out);
@ -753,7 +753,7 @@ mod tests {
} }
#[test] #[test]
async fn lost_init_sender_recovers() { fn lost_init_sender_recovers() {
let (mut sender, mut receiver) = create_pair(); let (mut sender, mut receiver) = create_pair();
let mut out = MsgBuffer::new(8); let mut out = MsgBuffer::new(8);
sender.send_ping(&mut out); sender.send_ping(&mut out);
@ -786,7 +786,7 @@ mod tests {
} }
#[test] #[test]
async fn lost_init_receiver_recovers() { fn lost_init_receiver_recovers() {
let (mut sender, mut receiver) = create_pair(); let (mut sender, mut receiver) = create_pair();
let mut out = MsgBuffer::new(8); let mut out = MsgBuffer::new(8);
sender.send_ping(&mut out); sender.send_ping(&mut out);
@ -818,7 +818,7 @@ mod tests {
} }
#[test] #[test]
async fn timeout() { fn timeout() {
let (mut sender, _receiver) = create_pair(); let (mut sender, _receiver) = create_pair();
let mut out = MsgBuffer::new(8); let mut out = MsgBuffer::new(8);
sender.send_ping(&mut out); sender.send_ping(&mut out);
@ -833,7 +833,7 @@ mod tests {
} }
#[test] #[test]
async fn untrusted_peer() { fn untrusted_peer() {
let (mut sender, _) = create_pair(); let (mut sender, _) = create_pair();
let (_, mut receiver) = create_pair(); let (_, mut receiver) = create_pair();
let mut out = MsgBuffer::new(8); let mut out = MsgBuffer::new(8);
@ -843,7 +843,7 @@ mod tests {
} }
#[test] #[test]
async fn manipulated_message() { fn manipulated_message() {
let (mut sender, mut receiver) = create_pair(); let (mut sender, mut receiver) = create_pair();
let mut out = MsgBuffer::new(8); let mut out = MsgBuffer::new(8);
sender.send_ping(&mut out); sender.send_ping(&mut out);
@ -853,7 +853,7 @@ mod tests {
} }
#[test] #[test]
async fn connect_to_self() { fn connect_to_self() {
let (mut sender, _) = create_pair(); let (mut sender, _) = create_pair();
let mut out = MsgBuffer::new(8); let mut out = MsgBuffer::new(8);
sender.send_ping(&mut out); sender.send_ping(&mut out);
@ -881,7 +881,7 @@ mod tests {
} }
#[test] #[test]
async fn algorithm_negotiation() { fn algorithm_negotiation() {
// Equal algorithms // Equal algorithms
test_algorithm_negotiation( test_algorithm_negotiation(
Algorithms { Algorithms {

View File

@ -228,7 +228,7 @@ mod tests {
} }
#[test] #[test]
async fn test_encode_decode_message() { fn test_encode_decode_message() {
let mut data = Vec::with_capacity(100); let mut data = Vec::with_capacity(100);
let (_, key) = RotationState::create_key(); let (_, key) = RotationState::create_key();
let msg = RotationMessage { message_id: 1, propose: key, confirm: None }; let msg = RotationMessage { message_id: 1, propose: key, confirm: None };
@ -249,7 +249,7 @@ mod tests {
} }
#[test] #[test]
async fn test_normal_rotation() { fn test_normal_rotation() {
let mut out1 = MsgBuffer::new(8); let mut out1 = MsgBuffer::new(8);
let mut out2 = MsgBuffer::new(8); let mut out2 = MsgBuffer::new(8);
@ -323,7 +323,7 @@ mod tests {
} }
#[test] #[test]
async fn test_duplication() { fn test_duplication() {
let mut out1 = MsgBuffer::new(8); let mut out1 = MsgBuffer::new(8);
let mut out2 = MsgBuffer::new(8); let mut out2 = MsgBuffer::new(8);
@ -359,7 +359,7 @@ mod tests {
} }
#[test] #[test]
async fn test_lost_message() { fn test_lost_message() {
let mut out1 = MsgBuffer::new(8); let mut out1 = MsgBuffer::new(8);
let mut out2 = MsgBuffer::new(8); let mut out2 = MsgBuffer::new(8);
@ -385,7 +385,7 @@ mod tests {
} }
#[test] #[test]
async fn test_reflect_back() { fn test_reflect_back() {
let mut out1 = MsgBuffer::new(8); let mut out1 = MsgBuffer::new(8);
let mut out2 = MsgBuffer::new(8); let mut out2 = MsgBuffer::new(8);

View File

@ -2,23 +2,9 @@
// Copyright (C) 2015-2021 Dennis Schwerdel // Copyright (C) 2015-2021 Dennis Schwerdel
// This software is licensed under GPL-3 or newer (see LICENSE.md) // This software is licensed under GPL-3 or newer (see LICENSE.md)
use async_trait::async_trait;
use parking_lot::Mutex; use parking_lot::Mutex;
use std::{ use std::{cmp, collections::VecDeque, convert::TryInto, fmt, fs::{self, File}, io::{self, Cursor, Read, Write, Error as IoError, BufReader, BufRead}, net::{Ipv4Addr, UdpSocket}, os::unix::io::AsRawFd, str, str::FromStr, sync::Arc, time::Duration};
cmp, use timeout_io::Reader;
collections::VecDeque,
convert::TryInto,
fmt,
io::{self, Cursor, Read, Write, Error as IoError, BufReader, BufRead},
net::{Ipv4Addr, UdpSocket},
fs::{self, File},
os::unix::io::AsRawFd,
str,
str::FromStr,
sync::Arc,
};
use tokio::fs::{File as AsyncFile};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use crate::{crypto, error::Error, util::MsgBuffer}; use crate::{crypto, error::Error, util::MsgBuffer};
@ -79,7 +65,6 @@ impl FromStr for Type {
} }
} }
#[async_trait]
pub trait Device: Send + 'static + Sized { pub trait Device: Send + 'static + Sized {
/// Returns the type of this device /// Returns the type of this device
fn get_type(&self) -> Type; fn get_type(&self) -> Type;
@ -95,7 +80,7 @@ pub trait Device: Send + 'static + Sized {
/// ///
/// # Errors /// # Errors
/// This method will return an error if the underlying read call fails. /// This method will return an error if the underlying read call fails.
async fn read(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error>; fn read(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error>;
/// Writes a packet/frame to the device /// Writes a packet/frame to the device
/// ///
@ -106,9 +91,9 @@ pub trait Device: Send + 'static + Sized {
/// ///
/// # Errors /// # Errors
/// This method will return an error if the underlying read call fails. /// This method will return an error if the underlying read call fails.
async fn write(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error>; fn write(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error>;
async fn duplicate(&self) -> Result<Self, Error>; fn duplicate(&self) -> Result<Self, Error>;
fn get_ip(&self) -> Result<Ipv4Addr, Error>; fn get_ip(&self) -> Result<Ipv4Addr, Error>;
} }
@ -218,23 +203,6 @@ impl TunTapDevice {
} }
Ok(()) Ok(())
} }
}
/// Represents a tun/tap device
pub struct AsyncTunTapDevice {
fd: AsyncFile,
ifname: String,
type_: Type,
}
impl AsyncTunTapDevice {
pub fn from_sync(dev: TunTapDevice) -> Self {
Self {
fd: AsyncFile::from_std(dev.fd),
ifname: dev.ifname,
type_: dev.type_
}
}
#[cfg(any(target_os = "linux", target_os = "android"))] #[cfg(any(target_os = "linux", target_os = "android"))]
#[inline] #[inline]
@ -286,32 +254,32 @@ impl AsyncTunTapDevice {
} }
} }
#[async_trait] impl Device for TunTapDevice {
impl Device for AsyncTunTapDevice {
fn get_type(&self) -> Type { fn get_type(&self) -> Type {
self.type_ self.type_
} }
async fn duplicate(&self) -> Result<Self, Error> { fn duplicate(&self) -> Result<Self, Error> {
Ok(Self { Ok(Self {
fd: self.fd.try_clone().await.map_err(|e| Error::DeviceIo("Failed to clone device", e))?, fd: self.fd.try_clone().map_err(|e| Error::DeviceIo("Failed to clone device", e))?,
ifname: self.ifname.clone(), ifname: self.ifname.clone(),
type_: self.type_, type_: self.type_,
}) })
} }
async fn read(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error> { fn read(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error> {
buffer.clear(); buffer.clear();
let read = self.fd.read(buffer.buffer()).await.map_err(|e| Error::DeviceIo("Read error", e))?; let mut read = 0;
self.fd.try_read(buffer.buffer(), &mut read, Duration::from_secs(1)).map_err(|e| Error::DeviceRead(e))?;
buffer.set_length(read); buffer.set_length(read);
self.correct_data_after_read(buffer); self.correct_data_after_read(buffer);
Ok(()) Ok(())
} }
async fn write(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error> { fn write(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error> {
self.correct_data_before_write(buffer); self.correct_data_before_write(buffer);
match self.fd.write_all(buffer.message()).await { match self.fd.write_all(buffer.message()) {
Ok(_) => self.fd.flush().await.map_err(|e| Error::DeviceIo("Flush error", e)), Ok(_) => self.fd.flush().map_err(|e| Error::DeviceIo("Flush error", e)),
Err(e) => Err(Error::DeviceIo("Write error", e)), Err(e) => Err(Error::DeviceIo("Write error", e)),
} }
} }
@ -345,9 +313,8 @@ impl MockDevice {
} }
} }
#[async_trait]
impl Device for MockDevice { impl Device for MockDevice {
async fn duplicate(&self) -> Result<Self, Error> { fn duplicate(&self) -> Result<Self, Error> {
Ok(self.clone()) Ok(self.clone())
} }
@ -355,7 +322,7 @@ impl Device for MockDevice {
Type::Tun Type::Tun
} }
async fn read(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error> { fn read(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error> {
if let Some(data) = self.inbound.lock().pop_front() { if let Some(data) = self.inbound.lock().pop_front() {
buffer.clear(); buffer.clear();
buffer.set_length(data.len()); buffer.set_length(data.len());
@ -366,7 +333,7 @@ impl Device for MockDevice {
} }
} }
async fn write(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error> { fn write(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error> {
self.outbound.lock().push_back(buffer.message().into()); self.outbound.lock().push_back(buffer.message().into());
Ok(()) Ok(())
} }

View File

@ -1,10 +1,11 @@
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::thread;
use std::{fs::File, hash::BuildHasherDefault}; use std::{fs::File, hash::BuildHasherDefault};
use tokio;
use fnv::FnvHasher; use fnv::FnvHasher;
use crate::util::CtrlC;
use crate::{ use crate::{
config::Config, config::Config,
crypto::PeerCrypto, crypto::PeerCrypto,
@ -46,7 +47,7 @@ pub struct GenericCloud<D: Device, P: Protocol, S: Socket, TS: TimeSource> {
impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS> { impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS> {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub async fn new( pub fn new(
config: &Config, socket: S, device: D, port_forwarding: Option<PortForwarding>, stats_file: Option<File>, config: &Config, socket: S, device: D, port_forwarding: Option<PortForwarding>, stats_file: Option<File>,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
let table = SharedTable::<TS>::new(&config); let table = SharedTable::<TS>::new(&config);
@ -55,7 +56,7 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
let running = Arc::new(AtomicBool::new(true)); let running = Arc::new(AtomicBool::new(true));
let device_thread = DeviceThread::<S, D, P, TS>::new( let device_thread = DeviceThread::<S, D, P, TS>::new(
config.clone(), config.clone(),
device.duplicate().await?, device.duplicate()?,
socket.clone(), socket.clone(),
traffic.clone(), traffic.clone(),
peer_crypto.clone(), peer_crypto.clone(),
@ -73,7 +74,7 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
stats_file, stats_file,
running.clone(), running.clone(),
); );
socket_thread.housekeep().await?; socket_thread.housekeep()?;
Ok(Self { socket_thread, device_thread, running }) Ok(Self { socket_thread, device_thread, running })
} }
@ -90,17 +91,19 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
Ok(()) Ok(())
} }
pub async fn run(self) { pub fn run(self) {
debug!("Starting threads"); debug!("Starting threads");
let running = self.running.clone(); let running = self.running.clone();
let device_thread_handle = tokio::spawn(self.device_thread.run()); let device = self.device_thread;
let socket_thread_handle = tokio::spawn(self.socket_thread.run()); let device_thread_handle = thread::spawn(move || device.run());
try_fail!(tokio::signal::ctrl_c().await, "Failed to set ctrl-c handler: {}"); let socket = self.socket_thread;
let socket_thread_handle = thread::spawn(move || socket.run());
let ctrlc = CtrlC::new();
ctrlc.wait();
running.store(false, Ordering::SeqCst); running.store(false, Ordering::SeqCst);
debug!("Waiting for threads to end"); debug!("Waiting for threads to end");
let (dev_ret, sock_ret) = join!(device_thread_handle, socket_thread_handle); device_thread_handle.join().unwrap();
dev_ret.unwrap(); socket_thread_handle.join().unwrap();
sock_ret.unwrap();
debug!("Threads stopped"); debug!("Threads stopped");
} }
} }
@ -124,21 +127,21 @@ impl<P: Protocol> GenericCloud<MockDevice, P, MockSocket, MockTimeSource> {
&mut self.device_thread.device &mut self.device_thread.device
} }
pub async fn connect(&mut self, addr: SocketAddr) -> Result<(), Error> { pub fn connect(&mut self, addr: SocketAddr) -> Result<(), Error> {
self.socket_thread.connect(addr).await self.socket_thread.connect(addr)
} }
pub async fn trigger_socket_event(&mut self) { pub fn trigger_socket_event(&mut self) {
self.socket_thread.iteration().await self.socket_thread.iteration()
} }
pub async fn trigger_device_event(&mut self) { pub fn trigger_device_event(&mut self) {
self.device_thread.iteration().await self.device_thread.iteration()
} }
pub async fn trigger_housekeep(&mut self) { pub fn trigger_housekeep(&mut self) {
try_fail!(self.socket_thread.housekeep().await, "Housekeep failed: {}"); try_fail!(self.socket_thread.housekeep(), "Housekeep failed: {}");
try_fail!(self.device_thread.housekeep().await, "Housekeep failed: {}"); try_fail!(self.device_thread.housekeep(), "Housekeep failed: {}");
} }
pub fn is_connected(&self, addr: &SocketAddr) -> bool { pub fn is_connected(&self, addr: &SocketAddr) -> bool {
@ -149,7 +152,7 @@ impl<P: Protocol> GenericCloud<MockDevice, P, MockSocket, MockTimeSource> {
&self.socket_thread.own_addresses &self.socket_thread.own_addresses
} }
pub async fn get_num(&self) -> usize { pub fn get_num(&self) -> usize {
self.socket_thread.socket.address().await.unwrap().port() as usize self.socket_thread.socket.address().unwrap().port() as usize
} }
} }

View File

@ -14,7 +14,6 @@ use crate::{
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::{marker::PhantomData, net::SocketAddr}; use std::{marker::PhantomData, net::SocketAddr};
use tokio::time::timeout;
pub struct DeviceThread<S: Socket, D: Device, P: Protocol, TS: TimeSource> { pub struct DeviceThread<S: Socket, D: Device, P: Protocol, TS: TimeSource> {
// Read-only fields // Read-only fields
@ -56,11 +55,11 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS
} }
#[inline] #[inline]
async fn send_to(&mut self, addr: SocketAddr) -> Result<(), Error> { fn send_to(&mut self, addr: SocketAddr) -> Result<(), Error> {
let size = self.buffer.len(); let size = self.buffer.len();
debug!("Sending msg with {} bytes to {}", size, addr); debug!("Sending msg with {} bytes to {}", size, addr);
self.traffic.count_out_traffic(addr, size); self.traffic.count_out_traffic(addr, size);
match self.socket.send(self.buffer.message(), addr).await { match self.socket.send(self.buffer.message(), addr) {
Ok(written) if written == size => Ok(()), Ok(written) if written == size => Ok(()),
Ok(_) => Err(Error::Socket("Sent out truncated packet")), Ok(_) => Err(Error::Socket("Sent out truncated packet")),
Err(e) => Err(Error::SocketIo("IOError when sending", e)), Err(e) => Err(Error::SocketIo("IOError when sending", e)),
@ -68,15 +67,15 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS
} }
#[inline] #[inline]
async fn send_msg(&mut self, addr: SocketAddr, type_: u8) -> Result<(), Error> { fn send_msg(&mut self, addr: SocketAddr, type_: u8) -> Result<(), Error> {
debug!("Sending msg with {} bytes to {}", self.buffer.len(), addr); debug!("Sending msg with {} bytes to {}", self.buffer.len(), addr);
self.buffer.prepend_byte(type_); self.buffer.prepend_byte(type_);
self.peer_crypto.encrypt_for(addr, &mut self.buffer)?; self.peer_crypto.encrypt_for(addr, &mut self.buffer)?;
self.send_to(addr).await self.send_to(addr)
} }
#[inline] #[inline]
async fn broadcast_msg(&mut self, type_: u8) -> Result<(), Error> { fn broadcast_msg(&mut self, type_: u8) -> Result<(), Error> {
let size = self.buffer.len(); let size = self.buffer.len();
debug!("Broadcasting message type {}, {:?} bytes to {} peers", type_, size, self.peer_crypto.count()); debug!("Broadcasting message type {}, {:?} bytes to {} peers", type_, size, self.peer_crypto.count());
let traffic = &mut self.traffic; let traffic = &mut self.traffic;
@ -91,7 +90,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS
crypto.encrypt(&mut self.broadcast_buffer); crypto.encrypt(&mut self.broadcast_buffer);
} }
traffic.count_out_traffic(*addr, self.broadcast_buffer.len()); traffic.count_out_traffic(*addr, self.broadcast_buffer.len());
match socket.send(self.broadcast_buffer.message(), *addr).await { match socket.send(self.broadcast_buffer.message(), *addr) {
Ok(written) if written == self.broadcast_buffer.len() => Ok(()), Ok(written) if written == self.broadcast_buffer.len() => Ok(()),
Ok(_) => Err(Error::Socket("Sent out truncated packet")), Ok(_) => Err(Error::Socket("Sent out truncated packet")),
Err(e) => Err(Error::SocketIo("IOError when sending", e)), Err(e) => Err(Error::SocketIo("IOError when sending", e)),
@ -100,7 +99,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS
Ok(()) Ok(())
} }
async fn forward_packet(&mut self) -> Result<(), Error> { fn forward_packet(&mut self) -> Result<(), Error> {
let (src, dst) = P::parse(self.buffer.message())?; let (src, dst) = P::parse(self.buffer.message())?;
debug!("Read data from interface: src: {}, dst: {}, {} bytes", src, dst, self.buffer.len()); debug!("Read data from interface: src: {}, dst: {}, {} bytes", src, dst, self.buffer.len());
self.traffic.count_out_payload(dst, src, self.buffer.len()); self.traffic.count_out_payload(dst, src, self.buffer.len());
@ -108,12 +107,12 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS
Some(addr) => { Some(addr) => {
// Peer found for destination // Peer found for destination
debug!("Found destination for {} => {}", dst, addr); debug!("Found destination for {} => {}", dst, addr);
self.send_msg(addr, MESSAGE_TYPE_DATA).await?; self.send_msg(addr, MESSAGE_TYPE_DATA)?;
} }
None => { None => {
if self.broadcast { if self.broadcast {
debug!("No destination for {} found, broadcasting", dst); debug!("No destination for {} found, broadcasting", dst);
self.broadcast_msg(MESSAGE_TYPE_DATA).await?; self.broadcast_msg(MESSAGE_TYPE_DATA)?;
} else { } else {
debug!("No destination for {} found, dropping", dst); debug!("No destination for {} found, dropping", dst);
self.traffic.count_dropped_payload(self.buffer.len()); self.traffic.count_dropped_payload(self.buffer.len());
@ -123,35 +122,35 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS
Ok(()) Ok(())
} }
pub async fn housekeep(&mut self) -> Result<(), Error> { pub fn housekeep(&mut self) -> Result<(), Error> {
self.peer_crypto.load(); self.peer_crypto.load();
self.table.sync(); self.table.sync();
self.traffic.sync(); self.traffic.sync();
Ok(()) Ok(())
} }
pub async fn iteration(&mut self) { pub fn iteration(&mut self) {
if let Ok(result) = timeout(std::time::Duration::from_millis(1000), self.device.read(&mut self.buffer)).await { if self.device.read(&mut self.buffer).is_ok() {
try_fail!(result, "Failed to read from device: {}"); //try_fail!(result, "Failed to read from device: {}");
if let Err(e) = self.forward_packet().await { if let Err(e) = self.forward_packet() {
error!("{}", e); error!("{}", e);
} }
} }
let now = TS::now(); let now = TS::now();
if self.next_housekeep < now { if self.next_housekeep < now {
if let Err(e) = self.housekeep().await { if let Err(e) = self.housekeep() {
error!("{}", e) error!("{}", e)
} }
self.next_housekeep = now + 1 self.next_housekeep = now + 1
} }
} }
pub async fn run(mut self) { pub fn run(mut self) {
loop { loop {
self.iteration().await; self.iteration();
if !self.running.load(Ordering::SeqCst) { if !self.running.load(Ordering::SeqCst) {
debug!("Device: end"); debug!("Device: end");
return return;
} }
} }
} }

View File

@ -71,42 +71,49 @@ impl SharedPeerCrypto {
} }
} }
#[derive(Clone)]
pub struct SharedTraffic { pub struct SharedTraffic {
cache: TrafficStats,
traffic: Arc<Mutex<TrafficStats>>, traffic: Arc<Mutex<TrafficStats>>,
} }
impl Clone for SharedTraffic {
fn clone(&self) -> Self {
Self { cache: TrafficStats::default(), traffic: self.traffic.clone() }
}
}
impl SharedTraffic { impl SharedTraffic {
pub fn new() -> Self { pub fn new() -> Self {
Self { traffic: Arc::new(Mutex::new(Default::default())) } Self { cache: TrafficStats::default(), traffic: Arc::new(Mutex::new(Default::default())) }
} }
pub fn sync(&mut self) { pub fn sync(&mut self) {
// TODO sync if needed self.traffic.lock().add(&self.cache);
self.cache.clear();
} }
pub fn count_out_traffic(&self, peer: SocketAddr, bytes: usize) { pub fn count_out_traffic(&mut self, peer: SocketAddr, bytes: usize) {
self.traffic.lock().count_out_traffic(peer, bytes); self.cache.count_out_traffic(peer, bytes);
} }
pub fn count_in_traffic(&self, peer: SocketAddr, bytes: usize) { pub fn count_in_traffic(&mut self, peer: SocketAddr, bytes: usize) {
self.traffic.lock().count_in_traffic(peer, bytes); self.cache.count_in_traffic(peer, bytes);
} }
pub fn count_out_payload(&self, remote: Address, local: Address, bytes: usize) { pub fn count_out_payload(&mut self, remote: Address, local: Address, bytes: usize) {
self.traffic.lock().count_out_payload(remote, local, bytes); self.cache.count_out_payload(remote, local, bytes);
} }
pub fn count_in_payload(&self, remote: Address, local: Address, bytes: usize) { pub fn count_in_payload(&mut self, remote: Address, local: Address, bytes: usize) {
self.traffic.lock().count_in_payload(remote, local, bytes); self.cache.count_in_payload(remote, local, bytes);
} }
pub fn count_dropped_payload(&self, bytes: usize) { pub fn count_dropped_payload(&mut self, bytes: usize) {
self.traffic.lock().count_dropped_payload(bytes); self.cache.count_dropped_payload(bytes);
} }
pub fn count_invalid_protocol(&self, bytes: usize) { pub fn count_invalid_protocol(&mut self, bytes: usize) {
self.traffic.lock().count_invalid_protocol(bytes); self.cache.count_invalid_protocol(bytes);
} }
pub fn period(&mut self, cleanup_idle: Option<usize>) { pub fn period(&mut self, cleanup_idle: Option<usize>) {
@ -133,60 +140,60 @@ impl SharedTraffic {
#[derive(Clone)] #[derive(Clone)]
pub struct SharedTable<TS: TimeSource> { pub struct SharedTable<TS: TimeSource> {
table: Arc<Mutex<ClaimTable<TS>>>, table: Arc<Mutex<ClaimTable<TS>>>,
//TODO: local reader lookup table Addr => Option<SocketAddr> cache: HashMap<Address, Option<SocketAddr>, Hash>,
//TODO: local writer cache Addr => SocketAddr
} }
impl<TS: TimeSource> SharedTable<TS> { impl<TS: TimeSource> SharedTable<TS> {
pub fn new(config: &Config) -> Self { pub fn new(config: &Config) -> Self {
let table = ClaimTable::new(config.switch_timeout as Duration, config.peer_timeout as Duration); let table = ClaimTable::new(config.switch_timeout as Duration, config.peer_timeout as Duration);
SharedTable { table: Arc::new(Mutex::new(table)) } SharedTable { table: Arc::new(Mutex::new(table)), cache: Default::default() }
} }
pub fn sync(&mut self) { pub fn sync(&mut self) {
// TODO sync if needed self.cache.clear();
// once every x seconds
// fetch reader cache
// clear writer cache
} }
pub fn lookup(&mut self, addr: Address) -> Option<SocketAddr> { pub fn lookup(&mut self, addr: Address) -> Option<SocketAddr> {
// TODO: use local reader cache if let Some(val) = self.cache.get(&addr) {
return *val;
}
// if not found, use shared table and put into cache // if not found, use shared table and put into cache
self.table.lock().lookup(addr) let val = self.table.lock().lookup(addr);
self.cache.insert(addr, val);
val
} }
pub fn set_claims(&mut self, peer: SocketAddr, claims: RangeList) { pub fn set_claims(&mut self, peer: SocketAddr, claims: RangeList) {
// clear writer cache self.table.lock().set_claims(peer, claims);
self.table.lock().set_claims(peer, claims) self.cache.clear();
} }
pub fn remove_claims(&mut self, peer: SocketAddr) { pub fn remove_claims(&mut self, peer: SocketAddr) {
// clear writer cache self.table.lock().remove_claims(peer);
self.table.lock().remove_claims(peer) self.cache.clear();
} }
pub fn cache(&mut self, addr: Address, peer: SocketAddr) { pub fn cache(&mut self, addr: Address, peer: SocketAddr) {
// check writer cache and only write real updates to shared table if self.cache.get(&addr) != Some(&Some(peer)) {
self.table.lock().cache(addr, peer) self.table.lock().cache(addr, peer);
self.cache.insert(addr, Some(peer));
}
} }
pub fn housekeep(&mut self) { pub fn housekeep(&mut self) {
self.table.lock().housekeep() self.table.lock().housekeep();
self.cache.clear();
} }
pub fn write_out<W: Write>(&self, out: &mut W) -> Result<(), io::Error> { pub fn write_out<W: Write>(&self, out: &mut W) -> Result<(), io::Error> {
//TODO: stats call
self.table.lock().write_out(out) self.table.lock().write_out(out)
} }
pub fn cache_len(&self) -> usize { pub fn cache_len(&self) -> usize {
//TODO: stats call
self.table.lock().cache_len() self.table.lock().cache_len()
} }
pub fn claim_len(&self) -> usize { pub fn claim_len(&self) -> usize {
//TODO: stats call
self.table.lock().claim_len() self.table.lock().claim_len()
} }
} }

View File

@ -35,7 +35,6 @@ use std::{
net::{SocketAddr, ToSocketAddrs}, net::{SocketAddr, ToSocketAddrs},
str::FromStr, str::FromStr,
}; };
use tokio::time::timeout;
const MAX_RECONNECT_INTERVAL: u16 = 3600; const MAX_RECONNECT_INTERVAL: u16 = 3600;
const RESOLVE_INTERVAL: Time = 300; const RESOLVE_INTERVAL: Time = 300;
@ -151,11 +150,11 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
} }
#[inline] #[inline]
async fn send_to(&mut self, addr: SocketAddr) -> Result<(), Error> { fn send_to(&mut self, addr: SocketAddr) -> Result<(), Error> {
let size = self.buffer.len(); let size = self.buffer.len();
debug!("Sending msg with {} bytes to {}", size, addr); debug!("Sending msg with {} bytes to {}", size, addr);
self.traffic.count_out_traffic(addr, size); self.traffic.count_out_traffic(addr, size);
match self.socket.send(self.buffer.message(), addr).await { match self.socket.send(self.buffer.message(), addr) {
Ok(written) if written == size => { Ok(written) if written == size => {
self.buffer.clear(); self.buffer.clear();
Ok(()) Ok(())
@ -166,7 +165,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
} }
#[inline] #[inline]
async fn broadcast_msg(&mut self, type_: u8) -> Result<(), Error> { fn broadcast_msg(&mut self, type_: u8) -> Result<(), Error> {
debug!("Broadcasting message type {}, {:?} bytes to {} peers", type_, self.buffer.len(), self.peers.len()); debug!("Broadcasting message type {}, {:?} bytes to {} peers", type_, self.buffer.len(), self.peers.len());
for (addr, peer) in &mut self.peers { for (addr, peer) in &mut self.peers {
self.broadcast_buffer.set_start(self.buffer.get_start()); self.broadcast_buffer.set_start(self.buffer.get_start());
@ -175,7 +174,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
self.broadcast_buffer.prepend_byte(type_); self.broadcast_buffer.prepend_byte(type_);
peer.crypto.encrypt_message(&mut self.broadcast_buffer); peer.crypto.encrypt_message(&mut self.broadcast_buffer);
self.traffic.count_out_traffic(*addr, self.broadcast_buffer.len()); self.traffic.count_out_traffic(*addr, self.broadcast_buffer.len());
match self.socket.send(self.broadcast_buffer.message(), *addr).await { match self.socket.send(self.broadcast_buffer.message(), *addr) {
Ok(written) if written == self.broadcast_buffer.len() => Ok(()), Ok(written) if written == self.broadcast_buffer.len() => Ok(()),
Ok(_) => Err(Error::Socket("Sent out truncated packet")), Ok(_) => Err(Error::Socket("Sent out truncated packet")),
Err(e) => Err(Error::SocketIo("IOError when sending", e)), Err(e) => Err(Error::SocketIo("IOError when sending", e)),
@ -185,7 +184,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
Ok(()) Ok(())
} }
async fn connect_sock(&mut self, addr: SocketAddr) -> Result<(), Error> { fn connect_sock(&mut self, addr: SocketAddr) -> Result<(), Error> {
let addr = mapped_addr(addr); let addr = mapped_addr(addr);
if self.peers.contains_key(&addr) if self.peers.contains_key(&addr)
|| self.own_addresses.contains(&addr) || self.own_addresses.contains(&addr)
@ -198,10 +197,10 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
let mut init = self.crypto.peer_instance(payload); let mut init = self.crypto.peer_instance(payload);
init.send_ping(&mut self.buffer); init.send_ping(&mut self.buffer);
self.pending_inits.insert(addr, init); self.pending_inits.insert(addr, init);
self.send_to(addr).await self.send_to(addr)
} }
pub async fn connect<Addr: ToSocketAddrs + fmt::Debug + Clone>(&mut self, addr: Addr) -> Result<(), Error> { pub fn connect<Addr: ToSocketAddrs + fmt::Debug + Clone>(&mut self, addr: Addr) -> Result<(), Error> {
let addrs = resolve(&addr)?.into_iter().map(mapped_addr).collect::<SmallVec<[SocketAddr; 3]>>(); let addrs = resolve(&addr)?.into_iter().map(mapped_addr).collect::<SmallVec<[SocketAddr; 3]>>();
for addr in &addrs { for addr in &addrs {
if self.own_addresses.contains(addr) if self.own_addresses.contains(addr)
@ -214,7 +213,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
// Send a message to each resolved address // Send a message to each resolved address
for a in addrs { for a in addrs {
// Ignore error this time // Ignore error this time
self.connect_sock(a).await.ok(); self.connect_sock(a).ok();
} }
Ok(()) Ok(())
} }
@ -238,7 +237,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
} }
} }
async fn update_peer_info(&mut self, addr: SocketAddr, info: Option<NodeInfo>) -> Result<(), Error> { fn update_peer_info(&mut self, addr: SocketAddr, info: Option<NodeInfo>) -> Result<(), Error> {
if let Some(peer) = self.peers.get_mut(&addr) { if let Some(peer) = self.peers.get_mut(&addr) {
peer.last_seen = TS::now(); peer.last_seen = TS::now();
peer.timeout = TS::now() + self.config.peer_timeout as Time; peer.timeout = TS::now() + self.config.peer_timeout as Time;
@ -260,12 +259,12 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
debug!("Adding claims of peer {}: {:?}", addr_nice(addr), info.claims); debug!("Adding claims of peer {}: {:?}", addr_nice(addr), info.claims);
self.table.set_claims(addr, info.claims); self.table.set_claims(addr, info.claims);
debug!("Received {} peers from {}: {:?}", info.peers.len(), addr_nice(addr), info.peers); debug!("Received {} peers from {}: {:?}", info.peers.len(), addr_nice(addr), info.peers);
self.connect_to_peers(&info.peers).await?; self.connect_to_peers(&info.peers)?;
} }
Ok(()) Ok(())
} }
async fn add_new_peer(&mut self, addr: SocketAddr, info: NodeInfo) -> Result<(), Error> { fn add_new_peer(&mut self, addr: SocketAddr, info: NodeInfo) -> Result<(), Error> {
info!("Added peer {}", addr_nice(addr)); info!("Added peer {}", addr_nice(addr));
if let Some(init) = self.pending_inits.remove(&addr) { if let Some(init) = self.pending_inits.remove(&addr) {
self.buffer.clear(); self.buffer.clear();
@ -281,9 +280,9 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
timeout: TS::now() + self.config.peer_timeout as Time, timeout: TS::now() + self.config.peer_timeout as Time,
}, },
); );
self.update_peer_info(addr, Some(info)).await?; self.update_peer_info(addr, Some(info))?;
if !self.buffer.is_empty() { if !self.buffer.is_empty() {
self.send_to(addr).await?; self.send_to(addr)?;
} }
self.peer_crypto.store(&self.peers); self.peer_crypto.store(&self.peers);
} else { } else {
@ -292,7 +291,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
Ok(()) Ok(())
} }
async fn connect_to_peers(&mut self, peers: &[PeerInfo]) -> Result<(), Error> { fn connect_to_peers(&mut self, peers: &[PeerInfo]) -> Result<(), Error> {
'outer: for peer in peers { 'outer: for peer in peers {
for addr in &peer.addrs { for addr in &peer.addrs {
if self.peers.contains_key(addr) { if self.peers.contains_key(addr) {
@ -315,7 +314,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
} }
} }
} }
self.connect(&peer.addrs as &[SocketAddr]).await?; self.connect(&peer.addrs as &[SocketAddr])?;
} }
Ok(()) Ok(())
} }
@ -328,12 +327,12 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
} }
} }
async fn handle_payload_from(&mut self, peer: SocketAddr) -> Result<(), Error> { fn handle_payload_from(&mut self, peer: SocketAddr) -> Result<(), Error> {
let (src, dst) = P::parse(self.buffer.message())?; let (src, dst) = P::parse(self.buffer.message())?;
let len = self.buffer.len(); let len = self.buffer.len();
debug!("Writing data to device: {} bytes", len); debug!("Writing data to device: {} bytes", len);
self.traffic.count_in_payload(src, dst, len); self.traffic.count_in_payload(src, dst, len);
if let Err(e) = self.device.write(&mut self.buffer).await { if let Err(e) = self.device.write(&mut self.buffer) {
error!("Failed to send via device: {}", e); error!("Failed to send via device: {}", e);
return Err(e); return Err(e);
} }
@ -345,10 +344,10 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
Ok(()) Ok(())
} }
async fn process_message(&mut self, src: SocketAddr, msg_result: MessageResult) -> Result<(), Error> { fn process_message(&mut self, src: SocketAddr, msg_result: MessageResult) -> Result<(), Error> {
match msg_result { match msg_result {
MessageResult::Message(type_) => match type_ { MessageResult::Message(type_) => match type_ {
MESSAGE_TYPE_DATA => self.handle_payload_from(src).await?, MESSAGE_TYPE_DATA => self.handle_payload_from(src)?,
MESSAGE_TYPE_NODE_INFO => { MESSAGE_TYPE_NODE_INFO => {
let info = match NodeInfo::decode(Cursor::new(self.buffer.message())) { let info = match NodeInfo::decode(Cursor::new(self.buffer.message())) {
Ok(val) => val, Ok(val) => val,
@ -358,11 +357,11 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
} }
}; };
self.buffer.clear(); self.buffer.clear();
self.update_peer_info(src, Some(info)).await?; self.update_peer_info(src, Some(info))?;
} }
MESSAGE_TYPE_KEEPALIVE => { MESSAGE_TYPE_KEEPALIVE => {
self.buffer.clear(); self.buffer.clear();
self.update_peer_info(src, None).await?; self.update_peer_info(src, None)?;
} }
MESSAGE_TYPE_CLOSE => { MESSAGE_TYPE_CLOSE => {
self.buffer.clear(); self.buffer.clear();
@ -374,7 +373,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
return Err(Error::Message("Unknown message type")); return Err(Error::Message("Unknown message type"));
} }
}, },
MessageResult::Reply => self.send_to(src).await?, MessageResult::Reply => self.send_to(src)?,
MessageResult::None => { MessageResult::None => {
self.buffer.clear(); self.buffer.clear();
} }
@ -382,13 +381,13 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
Ok(()) Ok(())
} }
async fn handle_message(&mut self, src: SocketAddr) -> Result<(), Error> { fn handle_message(&mut self, src: SocketAddr) -> Result<(), Error> {
let src = mapped_addr(src); let src = mapped_addr(src);
debug!("Received {} bytes from {}", self.buffer.len(), src); debug!("Received {} bytes from {}", self.buffer.len(), src);
let buffer = &mut self.buffer; let buffer = &mut self.buffer;
self.traffic.count_in_traffic(src, buffer.len()); self.traffic.count_in_traffic(src, buffer.len());
if let Some(result) = self.peers.get_mut(&src).map(|peer| peer.crypto.handle_message(buffer)) { if let Some(result) = self.peers.get_mut(&src).map(|peer| peer.crypto.handle_message(buffer)) {
return self.process_message(src, result?).await; return self.process_message(src, result?);
} }
let is_init = is_init_message(buffer.message()); let is_init = is_init_message(buffer.message());
if let Some(result) = self.pending_inits.get_mut(&src).map(|init| { if let Some(result) = self.pending_inits.get_mut(&src).map(|init| {
@ -401,10 +400,10 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
} }
}) { }) {
if !buffer.is_empty() { if !buffer.is_empty() {
self.send_to(src).await? self.send_to(src)?
} }
if let InitResult::Success { peer_payload, .. } = result? { if let InitResult::Success { peer_payload, .. } = result? {
self.add_new_peer(src, peer_payload).await? self.add_new_peer(src, peer_payload)?
} }
return Ok(()); return Ok(());
} }
@ -419,7 +418,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
match msg_result { match msg_result {
Ok(_) => { Ok(_) => {
self.pending_inits.insert(src, init); self.pending_inits.insert(src, init);
self.send_to(src).await self.send_to(src)
} }
Err(err) => { Err(err) => {
self.traffic.count_invalid_protocol(self.buffer.len()); self.traffic.count_invalid_protocol(self.buffer.len());
@ -428,7 +427,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
} }
} }
pub async fn housekeep(&mut self) -> Result<(), Error> { pub fn housekeep(&mut self) -> Result<(), Error> {
let now = TS::now(); let now = TS::now();
let mut del: SmallVec<[SocketAddr; 3]> = SmallVec::new(); let mut del: SmallVec<[SocketAddr; 3]> = SmallVec::new();
for (&addr, ref data) in &self.peers { for (&addr, ref data) in &self.peers {
@ -440,10 +439,10 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
info!("Forgot peer {} due to timeout", addr_nice(addr)); info!("Forgot peer {} due to timeout", addr_nice(addr));
self.peers.remove(&addr); self.peers.remove(&addr);
self.table.remove_claims(addr); self.table.remove_claims(addr);
self.connect_sock(addr).await?; // Try to reconnect self.connect_sock(addr)?; // Try to reconnect
} }
self.table.housekeep(); self.table.housekeep();
self.crypto_housekeep().await?; self.crypto_housekeep()?;
// Periodically extend the port-forwarding // Periodically extend the port-forwarding
if let Some(ref mut pfw) = self.port_forwarding { if let Some(ref mut pfw) = self.port_forwarding {
pfw.check_extend(); pfw.check_extend();
@ -454,29 +453,29 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
debug!("Send peer list to all peers"); debug!("Send peer list to all peers");
let info = self.create_node_info(); let info = self.create_node_info();
info.encode(&mut self.buffer); info.encode(&mut self.buffer);
self.broadcast_msg(MESSAGE_TYPE_NODE_INFO).await?; self.broadcast_msg(MESSAGE_TYPE_NODE_INFO)?;
// Reschedule for next update // Reschedule for next update
let min_peer_timeout = self.peers.iter().map(|p| p.1.peer_timeout).min().unwrap_or(DEFAULT_PEER_TIMEOUT); let min_peer_timeout = self.peers.iter().map(|p| p.1.peer_timeout).min().unwrap_or(DEFAULT_PEER_TIMEOUT);
let interval = min(self.update_freq as u16, max(min_peer_timeout / 2 - 60, 1)); let interval = min(self.update_freq as u16, max(min_peer_timeout / 2 - 60, 1));
self.next_peers = now + Time::from(interval); self.next_peers = now + Time::from(interval);
} }
self.reconnect_to_peers().await?; self.reconnect_to_peers()?;
if self.next_stats_out < now { if self.next_stats_out < now {
// Write out the statistics // Write out the statistics
self.write_out_stats().map_err(|err| Error::FileIo("Failed to write stats file", err))?; self.write_out_stats().map_err(|err| Error::FileIo("Failed to write stats file", err))?;
self.send_stats_to_statsd().await?; self.send_stats_to_statsd()?;
self.next_stats_out = now + STATS_INTERVAL; self.next_stats_out = now + STATS_INTERVAL;
self.traffic.period(Some(5)); self.traffic.period(Some(5));
} }
if let Some(peers) = self.beacon_serializer.get_cmd_results() { if let Some(peers) = self.beacon_serializer.get_cmd_results() {
debug!("Loaded beacon with peers: {:?}", peers); debug!("Loaded beacon with peers: {:?}", peers);
for peer in peers { for peer in peers {
self.connect_sock(peer).await?; self.connect_sock(peer)?;
} }
} }
if self.next_beacon < now { if self.next_beacon < now {
self.store_beacon()?; self.store_beacon()?;
self.load_beacon().await?; self.load_beacon()?;
self.next_beacon = now + Time::from(self.config.beacon_interval); self.next_beacon = now + Time::from(self.config.beacon_interval);
} }
self.table.sync(); self.table.sync();
@ -484,42 +483,42 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
self.peer_crypto.store(&self.peers); self.peer_crypto.store(&self.peers);
// Periodically reset own peers // Periodically reset own peers
if self.next_own_address_reset <= now { if self.next_own_address_reset <= now {
self.reset_own_addresses().await.map_err(|err| Error::SocketIo("Failed to get own addresses", err))?; self.reset_own_addresses().map_err(|err| Error::SocketIo("Failed to get own addresses", err))?;
self.next_own_address_reset = now + OWN_ADDRESS_RESET_INTERVAL; self.next_own_address_reset = now + OWN_ADDRESS_RESET_INTERVAL;
} }
assert!(self.buffer.is_empty()); assert!(self.buffer.is_empty());
Ok(()) Ok(())
} }
async fn crypto_housekeep(&mut self) -> Result<(), Error> { fn crypto_housekeep(&mut self) -> Result<(), Error> {
let mut del: SmallVec<[SocketAddr; 4]> = smallvec![]; let mut del: SmallVec<[SocketAddr; 4]> = smallvec![];
for addr in self.pending_inits.keys().copied().collect::<SmallVec<[SocketAddr; 4]>>() { for addr in self.pending_inits.keys().copied().collect::<SmallVec<[SocketAddr; 4]>>() {
self.buffer.clear(); self.buffer.clear();
if self.pending_inits.get_mut(&addr).unwrap().every_second(&mut self.buffer).is_err() { if self.pending_inits.get_mut(&addr).unwrap().every_second(&mut self.buffer).is_err() {
del.push(addr) del.push(addr)
} else if !self.buffer.is_empty() { } else if !self.buffer.is_empty() {
self.send_to(addr).await? self.send_to(addr)?
} }
} }
for addr in self.peers.keys().copied().collect::<SmallVec<[SocketAddr; 16]>>() { for addr in self.peers.keys().copied().collect::<SmallVec<[SocketAddr; 16]>>() {
self.buffer.clear(); self.buffer.clear();
self.peers.get_mut(&addr).unwrap().crypto.every_second(&mut self.buffer); self.peers.get_mut(&addr).unwrap().crypto.every_second(&mut self.buffer);
if !self.buffer.is_empty() { if !self.buffer.is_empty() {
self.send_to(addr).await? self.send_to(addr)?
} }
} }
for addr in del { for addr in del {
self.pending_inits.remove(&addr); self.pending_inits.remove(&addr);
if self.peers.remove(&addr).is_some() { if self.peers.remove(&addr).is_some() {
self.connect_sock(addr).await?; self.connect_sock(addr)?;
} }
} }
Ok(()) Ok(())
} }
async fn reset_own_addresses(&mut self) -> io::Result<()> { fn reset_own_addresses(&mut self) -> io::Result<()> {
self.own_addresses.clear(); self.own_addresses.clear();
let socket_addr = self.socket.address().await.map(mapped_addr)?; let socket_addr = self.socket.address().map(mapped_addr)?;
// 1) Specified advertise addresses // 1) Specified advertise addresses
for addr in &self.config.advertise_addresses { for addr in &self.config.advertise_addresses {
self.own_addresses.push(parse_listen(addr, socket_addr.port())); self.own_addresses.push(parse_listen(addr, socket_addr.port()));
@ -555,7 +554,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
} }
/// Loads the beacon /// Loads the beacon
async fn load_beacon(&mut self) -> Result<(), Error> { fn load_beacon(&mut self) -> Result<(), Error> {
let peers; let peers;
if let Some(ref path) = self.config.beacon_load { if let Some(ref path) = self.config.beacon_load {
if let Some(path) = path.strip_prefix('|') { if let Some(path) = path.strip_prefix('|') {
@ -574,7 +573,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
} }
debug!("Loaded beacon with peers: {:?}", peers); debug!("Loaded beacon with peers: {:?}", peers);
for peer in peers { for peer in peers {
self.connect_sock(peer).await?; self.connect_sock(peer)?;
} }
Ok(()) Ok(())
} }
@ -606,7 +605,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
} }
/// Sends the statistics to a statsd endpoint /// Sends the statistics to a statsd endpoint
async fn send_stats_to_statsd(&mut self) -> Result<(), Error> { fn send_stats_to_statsd(&mut self) -> Result<(), Error> {
if let Some(ref endpoint) = self.statsd_server { if let Some(ref endpoint) = self.statsd_server {
let peer_traffic = self.traffic.total_peer_traffic(); let peer_traffic = self.traffic.total_peer_traffic();
let payload_traffic = self.traffic.total_payload_traffic(); let payload_traffic = self.traffic.total_payload_traffic();
@ -652,7 +651,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
let msg_data = msg.as_bytes(); let msg_data = msg.as_bytes();
let addrs = resolve(endpoint)?; let addrs = resolve(endpoint)?;
if let Some(addr) = addrs.first() { if let Some(addr) = addrs.first() {
match self.socket.send(msg_data, *addr).await { match self.socket.send(msg_data, *addr) {
Ok(written) if written == msg_data.len() => Ok(()), Ok(written) if written == msg_data.len() => Ok(()),
Ok(_) => Err(Error::Socket("Sent out truncated packet")), Ok(_) => Err(Error::Socket("Sent out truncated packet")),
Err(e) => Err(Error::SocketIo("IOError when sending", e)), Err(e) => Err(Error::SocketIo("IOError when sending", e)),
@ -664,14 +663,14 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
Ok(()) Ok(())
} }
async fn reconnect_to_peers(&mut self) -> Result<(), Error> { fn reconnect_to_peers(&mut self) -> Result<(), Error> {
let now = TS::now(); let now = TS::now();
// Connect to those reconnect_peers that are due // Connect to those reconnect_peers that are due
for entry in self.reconnect_peers.clone() { for entry in self.reconnect_peers.clone() {
if entry.next > now { if entry.next > now {
continue; continue;
} }
self.connect(&entry.resolved as &[SocketAddr]).await?; self.connect(&entry.resolved as &[SocketAddr])?;
} }
for entry in &mut self.reconnect_peers { for entry in &mut self.reconnect_peers {
// Schedule for next second if node is connected // Schedule for next second if node is connected
@ -717,11 +716,10 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
Ok(()) Ok(())
} }
pub async fn iteration(&mut self) { pub fn iteration(&mut self) {
if let Ok(result) = timeout(std::time::Duration::from_millis(1000), self.socket.receive(&mut self.buffer)).await if let Ok(src) = self.socket.receive(&mut self.buffer)
{ {
let src = try_fail!(result, "Failed to read from network socket: {}"); match self.handle_message(src) {
match self.handle_message(src).await {
Err(e @ Error::CryptoInitFatal(_)) => { Err(e @ Error::CryptoInitFatal(_)) => {
debug!("Fatal crypto init error from {}: {}", src, e); debug!("Fatal crypto init error from {}: {}", src, e);
info!("Closing pending connection to {} due to error in crypto init", addr_nice(src)); info!("Closing pending connection to {} due to error in crypto init", addr_nice(src));
@ -743,7 +741,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
} }
let now = TS::now(); let now = TS::now();
if self.next_housekeep < now { if self.next_housekeep < now {
if let Err(e) = self.housekeep().await { if let Err(e) = self.housekeep() {
error!("{}", e) error!("{}", e)
} }
self.next_housekeep = now + 1 self.next_housekeep = now + 1
@ -751,9 +749,9 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
debug_assert!(self.buffer.is_empty()); debug_assert!(self.buffer.is_empty());
} }
pub async fn run(mut self) { pub fn run(mut self) {
loop { loop {
self.iteration().await; self.iteration();
if !self.running.load(Ordering::SeqCst) { if !self.running.load(Ordering::SeqCst) {
debug!("Socket: end"); debug!("Socket: end");
return; return;

View File

@ -38,6 +38,9 @@ pub enum Error {
#[error("Device error: {0} ({1})")] #[error("Device error: {0} ({1})")]
DeviceIo(&'static str, #[source] io::Error), DeviceIo(&'static str, #[source] io::Error),
#[error("Device read error: {0}")]
DeviceRead(#[source] timeout_io::TimeoutIoError),
#[error("File error: {0}")] #[error("File error: {0}")]
FileIo(&'static str, #[source] io::Error), FileIo(&'static str, #[source] io::Error),

View File

@ -6,8 +6,6 @@
extern crate log; extern crate log;
#[macro_use] #[macro_use]
extern crate serde; extern crate serde;
#[macro_use]
extern crate tokio;
#[cfg(test)] #[cfg(test)]
extern crate tempfile; extern crate tempfile;
@ -39,9 +37,8 @@ pub mod wizard;
#[cfg(feature = "websocket")] #[cfg(feature = "websocket")]
pub mod wsproxy; pub mod wsproxy;
use net::SocketBuilder; use net::Socket;
use structopt::StructOpt; use structopt::StructOpt;
use tokio::runtime::Runtime;
use std::{ use std::{
fs::{self, File, Permissions}, fs::{self, File, Permissions},
@ -58,7 +55,7 @@ use std::{
use crate::{ use crate::{
config::{Args, Command, Config, DEFAULT_PORT}, config::{Args, Command, Config, DEFAULT_PORT},
crypto::Crypto, crypto::Crypto,
device::{AsyncTunTapDevice, TunTapDevice, Type}, device::{TunTapDevice, Type},
engine::common::GenericCloud, engine::common::GenericCloud,
net::NetSocket, net::NetSocket,
oldconfig::OldConfigFile, oldconfig::OldConfigFile,
@ -177,7 +174,7 @@ fn setup_device(config: &Config) -> TunTapDevice {
} }
#[allow(clippy::cognitive_complexity)] #[allow(clippy::cognitive_complexity)]
fn run<P: Protocol, S: SocketBuilder>(config: Config, socket: S) { fn run<P: Protocol, S: Socket>(config: Config, socket: S) {
let device = setup_device(&config); let device = setup_device(&config);
let port_forwarding = if config.port_forwarding { socket.create_port_forwarding() } else { None }; let port_forwarding = if config.port_forwarding { socket.create_port_forwarding() } else { None };
let stats_file = match config.stats_file { let stats_file = match config.stats_file {
@ -222,32 +219,25 @@ fn run<P: Protocol, S: SocketBuilder>(config: Config, socket: S) {
} }
try_fail!(pd.apply(), "Failed to drop privileges: {}"); try_fail!(pd.apply(), "Failed to drop privileges: {}");
} }
let rt = Runtime::new().unwrap();
let ifdown = config.ifdown.clone(); let ifdown = config.ifdown.clone();
rt.block_on(async move { let mut cloud = try_fail!(
// Warning: no async code outside this block, or it will break on daemonize GenericCloud::<TunTapDevice, P, S, SystemTimeSource>::new(
let device = AsyncTunTapDevice::from_sync(device); &config,
let socket = try_fail!(socket.build(), "Failed to create async socket: {}"); socket,
let mut cloud = try_fail!( device,
GenericCloud::<AsyncTunTapDevice, P, S::SocketType, SystemTimeSource>::new( port_forwarding,
&config, stats_file
socket, ),
device, "Failed to create engine: {}"
port_forwarding, );
stats_file for mut addr in config.peers {
) if addr.find(':').unwrap_or(0) <= addr.find(']').unwrap_or(0) {
.await, // : not present or only in IPv6 address
"Failed to create engine: {}" addr = format!("{}:{}", addr, DEFAULT_PORT)
);
for mut addr in config.peers {
if addr.find(':').unwrap_or(0) <= addr.find(']').unwrap_or(0) {
// : not present or only in IPv6 address
addr = format!("{}:{}", addr, DEFAULT_PORT)
}
try_fail!(cloud.add_peer(addr.clone()), "Failed to send message to {}: {}", &addr);
} }
cloud.run().await try_fail!(cloud.add_peer(addr.clone()), "Failed to send message to {}: {}", &addr);
}); }
cloud.run();
if let Some(script) = ifdown { if let Some(script) = ifdown {
run_script(&script, &ifname); run_script(&script, &ifname);
} }

View File

@ -5,9 +5,8 @@
use crate::config::DEFAULT_PORT; use crate::config::DEFAULT_PORT;
use crate::port_forwarding::PortForwarding; use crate::port_forwarding::PortForwarding;
use crate::util::{MockTimeSource, MsgBuffer, Time, TimeSource}; use crate::util::{MockTimeSource, MsgBuffer, Time, TimeSource};
use async_trait::async_trait;
use parking_lot::Mutex; use parking_lot::Mutex;
use tokio::net::UdpSocket as AsyncUdpSocket; use std::time::Duration;
use std::{ use std::{
collections::{HashMap, VecDeque}, collections::{HashMap, VecDeque},
io::{self, ErrorKind}, io::{self, ErrorKind},
@ -32,17 +31,11 @@ pub fn get_ip() -> IpAddr {
s.local_addr().unwrap().ip() s.local_addr().unwrap().ip()
} }
pub trait SocketBuilder {
type SocketType: Socket;
fn build(self) -> Result<Self::SocketType, io::Error>;
fn create_port_forwarding(&self) -> Option<PortForwarding>;
}
#[async_trait]
pub trait Socket: Sized + Clone + Send + Sync + 'static { pub trait Socket: Sized + Clone + Send + Sync + 'static {
async fn receive(&mut self, buffer: &mut MsgBuffer) -> Result<SocketAddr, io::Error>; fn receive(&mut self, buffer: &mut MsgBuffer) -> Result<SocketAddr, io::Error>;
async fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize, io::Error>; fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize, io::Error>;
async fn address(&self) -> Result<SocketAddr, io::Error>; fn address(&self) -> Result<SocketAddr, io::Error>;
fn create_port_forwarding(&self) -> Option<PortForwarding>;
} }
pub fn parse_listen(addr: &str, default_port: u16) -> SocketAddr { pub fn parse_listen(addr: &str, default_port: u16) -> SocketAddr {
@ -59,49 +52,40 @@ pub fn parse_listen(addr: &str, default_port: u16) -> SocketAddr {
} }
} }
pub struct NetSocket(UdpSocket); pub struct NetSocket(Arc<UdpSocket>);
impl NetSocket { impl NetSocket {
pub fn listen(addr: &str) -> Result<Self, io::Error> { pub fn listen(addr: &str) -> Result<Self, io::Error> {
let addr = parse_listen(addr, DEFAULT_PORT); let addr = parse_listen(addr, DEFAULT_PORT);
Ok(Self(UdpSocket::bind(addr)?)) let sock = UdpSocket::bind(addr)?;
sock.set_read_timeout(Some(Duration::from_secs(1)))?;
Ok(Self(Arc::new(sock)))
} }
} }
impl SocketBuilder for NetSocket { impl Clone for NetSocket {
type SocketType = AsyncNetSocket;
fn create_port_forwarding(&self) -> Option<PortForwarding> {
PortForwarding::new(self.0.local_addr().unwrap().port())
}
fn build(self) -> Result<Self::SocketType, io::Error> {
Ok(AsyncNetSocket(Arc::new(AsyncUdpSocket::from_std(self.0)?)))
}
}
pub struct AsyncNetSocket(Arc<AsyncUdpSocket>);
impl Clone for AsyncNetSocket {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self(self.0.clone()) Self(self.0.clone())
} }
} }
#[async_trait] impl Socket for NetSocket {
impl Socket for AsyncNetSocket { fn create_port_forwarding(&self) -> Option<PortForwarding> {
async fn receive(&mut self, buffer: &mut MsgBuffer) -> Result<SocketAddr, io::Error> { PortForwarding::new(self.0.local_addr().unwrap().port())
}
fn receive(&mut self, buffer: &mut MsgBuffer) -> Result<SocketAddr, io::Error> {
buffer.clear(); buffer.clear();
let (size, addr) = self.0.recv_from(buffer.buffer()).await?; let (size, addr) = self.0.recv_from(buffer.buffer())?;
buffer.set_length(size); buffer.set_length(size);
Ok(addr) Ok(addr)
} }
async fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize, io::Error> { fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize, io::Error> {
self.0.send_to(data, addr).await self.0.send_to(data, addr)
} }
async fn address(&self) -> Result<SocketAddr, io::Error> { fn address(&self) -> Result<SocketAddr, io::Error> {
let mut addr = self.0.local_addr()?; let mut addr = self.0.local_addr()?;
addr.set_ip(get_ip()); addr.set_ip(get_ip());
Ok(addr) Ok(addr)
@ -160,9 +144,8 @@ impl MockSocket {
} }
} }
#[async_trait]
impl Socket for MockSocket { impl Socket for MockSocket {
async fn receive(&mut self, buffer: &mut MsgBuffer) -> Result<SocketAddr, io::Error> { fn receive(&mut self, buffer: &mut MsgBuffer) -> Result<SocketAddr, io::Error> {
if let Some((addr, data)) = self.inbound.lock().pop_front() { if let Some((addr, data)) = self.inbound.lock().pop_front() {
buffer.clear(); buffer.clear();
buffer.set_length(data.len()); buffer.set_length(data.len());
@ -173,7 +156,7 @@ impl Socket for MockSocket {
} }
} }
async fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize, io::Error> { fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize, io::Error> {
self.outbound.lock().push_back((addr, data.into())); self.outbound.lock().push_back((addr, data.into()));
if self.nat { if self.nat {
self.nat_peers.lock().insert(addr, MockTimeSource::now() + 300); self.nat_peers.lock().insert(addr, MockTimeSource::now() + 300);
@ -181,9 +164,13 @@ impl Socket for MockSocket {
Ok(data.len()) Ok(data.len())
} }
async fn address(&self) -> Result<SocketAddr, io::Error> { fn address(&self) -> Result<SocketAddr, io::Error> {
Ok(self.address) Ok(self.address)
} }
fn create_port_forwarding(&self) -> Option<PortForwarding> {
None
}
} }
#[cfg(feature = "bench")] #[cfg(feature = "bench")]

View File

@ -53,7 +53,7 @@ impl Protocol for Frame {
} }
#[test] #[test]
async fn decode_frame_without_vlan() { fn decode_frame_without_vlan() {
let data = [6, 5, 4, 3, 2, 1, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 8]; let data = [6, 5, 4, 3, 2, 1, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 8];
let (src, dst) = Frame::parse(&data).unwrap(); let (src, dst) = Frame::parse(&data).unwrap();
assert_eq!(src, Address { data: [1, 2, 3, 4, 5, 6, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], len: 6 }); assert_eq!(src, Address { data: [1, 2, 3, 4, 5, 6, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], len: 6 });
@ -61,7 +61,7 @@ async fn decode_frame_without_vlan() {
} }
#[test] #[test]
async fn decode_frame_with_vlan() { fn decode_frame_with_vlan() {
let data = [6, 5, 4, 3, 2, 1, 1, 2, 3, 4, 5, 6, 0x81, 0, 4, 210, 1, 2, 3, 4, 5, 6, 7, 8]; let data = [6, 5, 4, 3, 2, 1, 1, 2, 3, 4, 5, 6, 0x81, 0, 4, 210, 1, 2, 3, 4, 5, 6, 7, 8];
let (src, dst) = Frame::parse(&data).unwrap(); let (src, dst) = Frame::parse(&data).unwrap();
assert_eq!(src, Address { data: [4, 210, 1, 2, 3, 4, 5, 6, 0, 0, 0, 0, 0, 0, 0, 0], len: 8 }); assert_eq!(src, Address { data: [4, 210, 1, 2, 3, 4, 5, 6, 0, 0, 0, 0, 0, 0, 0, 0], len: 8 });
@ -69,7 +69,7 @@ async fn decode_frame_with_vlan() {
} }
#[test] #[test]
async fn decode_invalid_frame() { fn decode_invalid_frame() {
assert!(Frame::parse(&[6, 5, 4, 3, 2, 1, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 8]).is_ok()); assert!(Frame::parse(&[6, 5, 4, 3, 2, 1, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 8]).is_ok());
// truncated frame // truncated frame
assert!(Frame::parse(&[]).is_err()); assert!(Frame::parse(&[]).is_err());
@ -118,7 +118,7 @@ impl Protocol for Packet {
} }
#[test] #[test]
async fn decode_ipv4_packet() { fn decode_ipv4_packet() {
let data = [0x40, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 192, 168, 1, 1, 192, 168, 1, 2]; let data = [0x40, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 192, 168, 1, 1, 192, 168, 1, 2];
let (src, dst) = Packet::parse(&data).unwrap(); let (src, dst) = Packet::parse(&data).unwrap();
assert_eq!(src, Address { data: [192, 168, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], len: 4 }); assert_eq!(src, Address { data: [192, 168, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], len: 4 });
@ -126,7 +126,7 @@ async fn decode_ipv4_packet() {
} }
#[test] #[test]
async fn decode_ipv6_packet() { fn decode_ipv6_packet() {
let data = [ let data = [
0x60, 0, 0, 0, 0, 0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 6, 5, 0x60, 0, 0, 0, 0, 0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 6, 5,
4, 3, 2, 1, 4, 3, 2, 1,
@ -137,7 +137,7 @@ async fn decode_ipv6_packet() {
} }
#[test] #[test]
async fn decode_invalid_packet() { fn decode_invalid_packet() {
assert!(Packet::parse(&[0x40, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 192, 168, 1, 1, 192, 168, 1, 2]).is_ok()); assert!(Packet::parse(&[0x40, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 192, 168, 1, 1, 192, 168, 1, 2]).is_ok());
assert!(Packet::parse(&[ assert!(Packet::parse(&[
0x60, 0, 0, 0, 0, 0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 6, 5, 0x60, 0, 0, 0, 0, 0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 6, 5,

View File

@ -79,7 +79,7 @@ impl<P: Protocol> Simulator<P> {
Self { next_port: 1, nodes: HashMap::default(), messages: VecDeque::default() } Self { next_port: 1, nodes: HashMap::default(), messages: VecDeque::default() }
} }
pub async fn add_node(&mut self, nat: bool, config: &Config) -> SocketAddr { pub fn add_node(&mut self, nat: bool, config: &Config) -> SocketAddr {
let mut config = config.clone(); let mut config = config.clone();
MockSocket::set_nat(nat); MockSocket::set_nat(nat);
config.listen = format!("[::]:{}", self.next_port); config.listen = format!("[::]:{}", self.next_port);
@ -89,7 +89,7 @@ impl<P: Protocol> Simulator<P> {
} }
DebugLogger::set_node(self.next_port as usize); DebugLogger::set_node(self.next_port as usize);
self.next_port += 1; self.next_port += 1;
let node = TestNode::new(&config, MockSocket::new(addr), MockDevice::new(), None, None).await.unwrap(); let node = TestNode::new(&config, MockSocket::new(addr), MockDevice::new(), None, None).unwrap();
DebugLogger::set_node(0); DebugLogger::set_node(0);
self.nodes.insert(addr, node); self.nodes.insert(addr, node);
@ -97,18 +97,18 @@ impl<P: Protocol> Simulator<P> {
} }
#[allow(dead_code)] #[allow(dead_code)]
pub async fn get_node(&mut self, addr: SocketAddr) -> &mut TestNode<P> { pub fn get_node(&mut self, addr: SocketAddr) -> &mut TestNode<P> {
let node = self.nodes.get_mut(&addr).unwrap(); let node = self.nodes.get_mut(&addr).unwrap();
DebugLogger::set_node(node.get_num().await); DebugLogger::set_node(node.get_num());
node node
} }
pub async fn simulate_next_message(&mut self) { pub fn simulate_next_message(&mut self) {
if let Some((src, dst, data)) = self.messages.pop_front() { if let Some((src, dst, data)) = self.messages.pop_front() {
if let Some(node) = self.nodes.get_mut(&dst) { if let Some(node) = self.nodes.get_mut(&dst) {
if node.socket().put_inbound(src, data) { if node.socket().put_inbound(src, data) {
DebugLogger::set_node(node.get_num().await); DebugLogger::set_node(node.get_num());
node.trigger_socket_event().await; node.trigger_socket_event();
DebugLogger::set_node(0); DebugLogger::set_node(0);
let sock = node.socket(); let sock = node.socket();
let src = dst; let src = dst;
@ -122,16 +122,16 @@ impl<P: Protocol> Simulator<P> {
} }
} }
pub async fn simulate_all_messages(&mut self) { pub fn simulate_all_messages(&mut self) {
while !self.messages.is_empty() { while !self.messages.is_empty() {
self.simulate_next_message().await self.simulate_next_message()
} }
} }
pub async fn trigger_node_housekeep(&mut self, addr: SocketAddr) { pub fn trigger_node_housekeep(&mut self, addr: SocketAddr) {
let node = self.nodes.get_mut(&addr).unwrap(); let node = self.nodes.get_mut(&addr).unwrap();
DebugLogger::set_node(node.get_num().await); DebugLogger::set_node(node.get_num());
node.trigger_housekeep().await; node.trigger_housekeep();
DebugLogger::set_node(0); DebugLogger::set_node(0);
let sock = node.socket(); let sock = node.socket();
while let Some((dst, data)) = sock.pop_outbound() { while let Some((dst, data)) = sock.pop_outbound() {
@ -139,10 +139,10 @@ impl<P: Protocol> Simulator<P> {
} }
} }
pub async fn trigger_housekeep(&mut self) { pub fn trigger_housekeep(&mut self) {
for (src, node) in &mut self.nodes { for (src, node) in &mut self.nodes {
DebugLogger::set_node(node.get_num().await); DebugLogger::set_node(node.get_num());
node.trigger_housekeep().await; node.trigger_housekeep();
DebugLogger::set_node(0); DebugLogger::set_node(0);
let sock = node.socket(); let sock = node.socket();
while let Some((dst, data)) = sock.pop_outbound() { while let Some((dst, data)) = sock.pop_outbound() {
@ -155,20 +155,20 @@ impl<P: Protocol> Simulator<P> {
MockTimeSource::set_time(time); MockTimeSource::set_time(time);
} }
pub async fn simulate_time(&mut self, time: Time) { pub fn simulate_time(&mut self, time: Time) {
let mut t = MockTimeSource::now(); let mut t = MockTimeSource::now();
while t < time { while t < time {
t += 1; t += 1;
self.set_time(t); self.set_time(t);
self.trigger_housekeep().await; self.trigger_housekeep();
self.simulate_all_messages().await; self.simulate_all_messages();
} }
} }
pub async fn connect(&mut self, src: SocketAddr, dst: SocketAddr) { pub fn connect(&mut self, src: SocketAddr, dst: SocketAddr) {
let node = self.nodes.get_mut(&src).unwrap(); let node = self.nodes.get_mut(&src).unwrap();
DebugLogger::set_node(node.get_num().await); DebugLogger::set_node(node.get_num());
node.connect(dst).await.unwrap(); node.connect(dst).unwrap();
DebugLogger::set_node(0); DebugLogger::set_node(0);
let sock = node.socket(); let sock = node.socket();
while let Some((dst, data)) = sock.pop_outbound() { while let Some((dst, data)) = sock.pop_outbound() {
@ -190,11 +190,11 @@ impl<P: Protocol> Simulator<P> {
self.messages.len() self.messages.len()
} }
pub async fn put_payload(&mut self, addr: SocketAddr, data: Vec<u8>) { pub fn put_payload(&mut self, addr: SocketAddr, data: Vec<u8>) {
let node = self.nodes.get_mut(&addr).unwrap(); let node = self.nodes.get_mut(&addr).unwrap();
node.device().put_inbound(data); node.device().put_inbound(data);
DebugLogger::set_node(node.get_num().await); DebugLogger::set_node(node.get_num());
node.trigger_device_event().await; node.trigger_device_event();
DebugLogger::set_node(0); DebugLogger::set_node(0);
let sock = node.socket(); let sock = node.socket();
while let Some((dst, data)) = sock.pop_outbound() { while let Some((dst, data)) = sock.pop_outbound() {

View File

@ -5,35 +5,35 @@
use super::common::*; use super::common::*;
#[test] #[test]
async fn connect_nat_2_peers() { fn connect_nat_2_peers() {
let config = Config { port_forwarding: false, ..Default::default() }; let config = Config { port_forwarding: false, ..Default::default() };
let mut sim = TapSimulator::new(); let mut sim = TapSimulator::new();
let node1 = sim.add_node(true, &config).await; let node1 = sim.add_node(true, &config);
let node2 = sim.add_node(true, &config).await; let node2 = sim.add_node(true, &config);
sim.connect(node1, node2).await; sim.connect(node1, node2);
sim.connect(node2, node1).await; sim.connect(node2, node1);
sim.simulate_time(60).await; sim.simulate_time(60);
assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1)); assert!(sim.is_connected(node2, node1));
} }
#[test] #[test]
async fn connect_nat_3_peers() { fn connect_nat_3_peers() {
let config = Config::default(); let config = Config::default();
let mut sim = TapSimulator::new(); let mut sim = TapSimulator::new();
let node1 = sim.add_node(true, &config).await; let node1 = sim.add_node(true, &config);
let node2 = sim.add_node(true, &config).await; let node2 = sim.add_node(true, &config);
let node3 = sim.add_node(true, &config).await; let node3 = sim.add_node(true, &config);
sim.connect(node1, node2).await; sim.connect(node1, node2);
sim.connect(node2, node1).await; sim.connect(node2, node1);
sim.connect(node1, node3).await; sim.connect(node1, node3);
sim.connect(node3, node1).await; sim.connect(node3, node1);
sim.simulate_time(300).await; sim.simulate_time(300);
assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1)); assert!(sim.is_connected(node2, node1));
assert!(sim.is_connected(node1, node3)); assert!(sim.is_connected(node1, node3));
@ -43,19 +43,19 @@ async fn connect_nat_3_peers() {
} }
#[test] #[test]
async fn nat_keepalive() { fn nat_keepalive() {
let config = Config::default(); let config = Config::default();
let mut sim = TapSimulator::new(); let mut sim = TapSimulator::new();
let node1 = sim.add_node(true, &config).await; let node1 = sim.add_node(true, &config);
let node2 = sim.add_node(true, &config).await; let node2 = sim.add_node(true, &config);
let node3 = sim.add_node(true, &config).await; let node3 = sim.add_node(true, &config);
sim.connect(node1, node2).await; sim.connect(node1, node2);
sim.connect(node2, node1).await; sim.connect(node2, node1);
sim.connect(node1, node3).await; sim.connect(node1, node3);
sim.connect(node3, node1).await; sim.connect(node3, node1);
sim.simulate_time(1000).await; sim.simulate_time(1000);
assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1)); assert!(sim.is_connected(node2, node1));
assert!(sim.is_connected(node1, node3)); assert!(sim.is_connected(node1, node3));
@ -63,7 +63,7 @@ async fn nat_keepalive() {
assert!(sim.is_connected(node2, node3)); assert!(sim.is_connected(node2, node3));
assert!(sim.is_connected(node3, node2)); assert!(sim.is_connected(node3, node2));
sim.simulate_time(10000).await; sim.simulate_time(10000);
assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1)); assert!(sim.is_connected(node2, node1));
assert!(sim.is_connected(node1, node3)); assert!(sim.is_connected(node1, node3));

View File

@ -5,50 +5,52 @@
use super::common::*; use super::common::*;
#[test] #[test]
async fn switch_delivers() { fn switch_delivers() {
let config = Config { device_type: Type::Tap, ..Config::default() }; let config = Config { device_type: Type::Tap, ..Config::default() };
let mut sim = TapSimulator::new(); let mut sim = TapSimulator::new();
let node1 = sim.add_node(false, &config).await; let node1 = sim.add_node(false, &config);
let node2 = sim.add_node(false, &config).await; let node2 = sim.add_node(false, &config);
sim.connect(node1, node2).await; sim.connect(node1, node2);
sim.simulate_all_messages().await; sim.simulate_all_messages();
assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1)); assert!(sim.is_connected(node2, node1));
sim.trigger_housekeep();
let payload = vec![2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4, 5]; let payload = vec![2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4, 5];
sim.put_payload(node1, payload.clone()).await; sim.put_payload(node1, payload.clone());
sim.simulate_all_messages().await; sim.simulate_all_messages();
assert_eq!(Some(payload), sim.pop_payload(node2)); assert_eq!(Some(payload), sim.pop_payload(node2));
} }
#[test] #[test]
async fn switch_learns() { fn switch_learns() {
let config = Config { device_type: Type::Tap, ..Config::default() }; let config = Config { device_type: Type::Tap, ..Config::default() };
let mut sim = TapSimulator::new(); let mut sim = TapSimulator::new();
let node1 = sim.add_node(false, &config).await; let node1 = sim.add_node(false, &config);
let node2 = sim.add_node(false, &config).await; let node2 = sim.add_node(false, &config);
let node3 = sim.add_node(false, &config).await; let node3 = sim.add_node(false, &config);
sim.connect(node1, node2).await; sim.connect(node1, node2);
sim.connect(node1, node3).await; sim.connect(node1, node3);
sim.connect(node2, node3).await; sim.connect(node2, node3);
sim.simulate_all_messages().await; sim.simulate_all_messages();
assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1)); assert!(sim.is_connected(node2, node1));
assert!(sim.is_connected(node1, node3)); assert!(sim.is_connected(node1, node3));
assert!(sim.is_connected(node3, node1)); assert!(sim.is_connected(node3, node1));
assert!(sim.is_connected(node2, node3)); assert!(sim.is_connected(node2, node3));
assert!(sim.is_connected(node3, node2)); assert!(sim.is_connected(node3, node2));
sim.trigger_housekeep();
let payload = vec![2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4, 5]; let payload = vec![2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4, 5];
// Nothing learnt so far, node1 broadcasts // Nothing learnt so far, node1 broadcasts
sim.put_payload(node1, payload.clone()).await; sim.put_payload(node1, payload.clone());
sim.simulate_all_messages().await; sim.simulate_all_messages();
assert_eq!(Some(payload.clone()), sim.pop_payload(node2)); assert_eq!(Some(payload.clone()), sim.pop_payload(node2));
assert_eq!(Some(payload), sim.pop_payload(node3)); assert_eq!(Some(payload), sim.pop_payload(node3));
@ -57,38 +59,39 @@ async fn switch_learns() {
// Node 2 learned the address by receiving it, does not broadcast // Node 2 learned the address by receiving it, does not broadcast
sim.put_payload(node2, payload.clone()).await; sim.put_payload(node2, payload.clone());
sim.simulate_all_messages().await; sim.simulate_all_messages();
assert_eq!(Some(payload), sim.pop_payload(node1)); assert_eq!(Some(payload), sim.pop_payload(node1));
assert_eq!(None, sim.pop_payload(node3)); assert_eq!(None, sim.pop_payload(node3));
} }
#[test] #[test]
async fn switch_honours_vlans() { fn switch_honours_vlans() {
let config = Config { device_type: Type::Tap, ..Config::default() }; let config = Config { device_type: Type::Tap, ..Config::default() };
let mut sim = TapSimulator::new(); let mut sim = TapSimulator::new();
let node1 = sim.add_node(false, &config).await; let node1 = sim.add_node(false, &config);
let node2 = sim.add_node(false, &config).await; let node2 = sim.add_node(false, &config);
let node3 = sim.add_node(false, &config).await; let node3 = sim.add_node(false, &config);
sim.connect(node1, node2).await; sim.connect(node1, node2);
sim.connect(node1, node3).await; sim.connect(node1, node3);
sim.connect(node2, node3).await; sim.connect(node2, node3);
sim.simulate_all_messages().await; sim.simulate_all_messages();
assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1)); assert!(sim.is_connected(node2, node1));
assert!(sim.is_connected(node1, node3)); assert!(sim.is_connected(node1, node3));
assert!(sim.is_connected(node3, node1)); assert!(sim.is_connected(node3, node1));
assert!(sim.is_connected(node2, node3)); assert!(sim.is_connected(node2, node3));
assert!(sim.is_connected(node3, node2)); assert!(sim.is_connected(node3, node2));
sim.trigger_housekeep();
let payload = vec![2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 0x81, 0, 0, 0x67, 1, 2, 3, 4, 5]; let payload = vec![2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 0x81, 0, 0, 0x67, 1, 2, 3, 4, 5];
// Nothing learnt so far, node1 broadcasts // Nothing learnt so far, node1 broadcasts
sim.put_payload(node1, payload.clone()).await; sim.put_payload(node1, payload.clone());
sim.simulate_all_messages().await; sim.simulate_all_messages();
assert_eq!(Some(payload.clone()), sim.pop_payload(node2)); assert_eq!(Some(payload.clone()), sim.pop_payload(node2));
assert_eq!(Some(payload), sim.pop_payload(node3)); assert_eq!(Some(payload), sim.pop_payload(node3));
@ -97,8 +100,8 @@ async fn switch_honours_vlans() {
// Node 2 learned the address by receiving it, does not broadcast // Node 2 learned the address by receiving it, does not broadcast
sim.put_payload(node2, payload.clone()).await; sim.put_payload(node2, payload.clone());
sim.simulate_all_messages().await; sim.simulate_all_messages();
assert_eq!(Some(payload), sim.pop_payload(node1)); assert_eq!(Some(payload), sim.pop_payload(node1));
assert_eq!(None, sim.pop_payload(node3)); assert_eq!(None, sim.pop_payload(node3));
@ -107,8 +110,8 @@ async fn switch_honours_vlans() {
// Different VLANs, node 2 does not learn, still broadcasts // Different VLANs, node 2 does not learn, still broadcasts
sim.put_payload(node2, payload.clone()).await; sim.put_payload(node2, payload.clone());
sim.simulate_all_messages().await; sim.simulate_all_messages();
assert_eq!(Some(payload.clone()), sim.pop_payload(node1)); assert_eq!(Some(payload.clone()), sim.pop_payload(node1));
assert_eq!(Some(payload), sim.pop_payload(node3)); assert_eq!(Some(payload), sim.pop_payload(node3));
@ -116,13 +119,13 @@ async fn switch_honours_vlans() {
#[test] #[test]
#[ignore] #[ignore]
async fn switch_forgets() { fn switch_forgets() {
// TODO Test // TODO Test
unimplemented!() unimplemented!()
} }
#[test] #[test]
async fn router_delivers() { fn router_delivers() {
let config1 = Config { let config1 = Config {
device_type: Type::Tun, device_type: Type::Tun,
auto_claim: false, auto_claim: false,
@ -136,24 +139,24 @@ async fn router_delivers() {
..Config::default() ..Config::default()
}; };
let mut sim = TunSimulator::new(); let mut sim = TunSimulator::new();
let node1 = sim.add_node(false, &config1).await; let node1 = sim.add_node(false, &config1);
let node2 = sim.add_node(false, &config2).await; let node2 = sim.add_node(false, &config2);
sim.connect(node1, node2).await; sim.connect(node1, node2);
sim.simulate_all_messages().await; sim.simulate_all_messages();
assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1)); assert!(sim.is_connected(node2, node1));
let payload = vec![0x40, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2]; let payload = vec![0x40, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2];
sim.put_payload(node1, payload.clone()).await; sim.put_payload(node1, payload.clone());
sim.simulate_all_messages().await; sim.simulate_all_messages();
assert_eq!(Some(payload), sim.pop_payload(node2)); assert_eq!(Some(payload), sim.pop_payload(node2));
} }
#[test] #[test]
async fn router_drops_unknown_dest() { fn router_drops_unknown_dest() {
let config1 = Config { let config1 = Config {
device_type: Type::Tun, device_type: Type::Tun,
auto_claim: false, auto_claim: false,
@ -167,18 +170,18 @@ async fn router_drops_unknown_dest() {
..Config::default() ..Config::default()
}; };
let mut sim = TunSimulator::new(); let mut sim = TunSimulator::new();
let node1 = sim.add_node(false, &config1).await; let node1 = sim.add_node(false, &config1);
let node2 = sim.add_node(false, &config2).await; let node2 = sim.add_node(false, &config2);
sim.connect(node1, node2).await; sim.connect(node1, node2);
sim.simulate_all_messages().await; sim.simulate_all_messages();
assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1)); assert!(sim.is_connected(node2, node1));
let payload = vec![0x40, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 3, 3, 3, 3]; let payload = vec![0x40, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 3, 3, 3, 3];
sim.put_payload(node1, payload).await; sim.put_payload(node1, payload);
sim.simulate_all_messages().await; sim.simulate_all_messages();
assert_eq!(None, sim.pop_payload(node2)); assert_eq!(None, sim.pop_payload(node2));
} }

View File

@ -5,47 +5,47 @@
use super::common::*; use super::common::*;
#[test] #[test]
async fn direct_connect() { fn direct_connect() {
let config = Config::default(); let config = Config::default();
let mut sim = TapSimulator::new(); let mut sim = TapSimulator::new();
let node1 = sim.add_node(false, &config).await; let node1 = sim.add_node(false, &config);
let node2 = sim.add_node(false, &config).await; let node2 = sim.add_node(false, &config);
sim.connect(node1, node2).await; sim.connect(node1, node2);
sim.simulate_all_messages().await; sim.simulate_all_messages();
assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1)); assert!(sim.is_connected(node2, node1));
} }
#[test] #[test]
async fn direct_connect_unencrypted() { fn direct_connect_unencrypted() {
let config = Config { let config = Config {
crypto: CryptoConfig { algorithms: vec!["plain".to_string()], ..CryptoConfig::default() }, crypto: CryptoConfig { algorithms: vec!["plain".to_string()], ..CryptoConfig::default() },
..Config::default() ..Config::default()
}; };
let mut sim = TapSimulator::new(); let mut sim = TapSimulator::new();
let node1 = sim.add_node(false, &config).await; let node1 = sim.add_node(false, &config);
let node2 = sim.add_node(false, &config).await; let node2 = sim.add_node(false, &config);
sim.connect(node1, node2).await; sim.connect(node1, node2);
sim.simulate_all_messages().await; sim.simulate_all_messages();
assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1)); assert!(sim.is_connected(node2, node1));
} }
#[test] #[test]
async fn cross_connect() { fn cross_connect() {
let config = Config::default(); let config = Config::default();
let mut sim = TapSimulator::new(); let mut sim = TapSimulator::new();
let node1 = sim.add_node(false, &config).await; let node1 = sim.add_node(false, &config);
let node2 = sim.add_node(false, &config).await; let node2 = sim.add_node(false, &config);
let node3 = sim.add_node(false, &config).await; let node3 = sim.add_node(false, &config);
sim.connect(node1, node2).await; sim.connect(node1, node2);
sim.connect(node1, node3).await; sim.connect(node1, node3);
sim.simulate_all_messages().await; sim.simulate_all_messages();
sim.simulate_time(120).await; sim.simulate_time(120);
assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1)); assert!(sim.is_connected(node2, node1));
@ -56,124 +56,124 @@ async fn cross_connect() {
} }
#[test] #[test]
async fn connect_via_beacons() { fn connect_via_beacons() {
let mut sim = TapSimulator::new(); let mut sim = TapSimulator::new();
let beacon_path = "target/.vpncloud_test"; let beacon_path = "target/.vpncloud_test";
let config1 = Config { beacon_store: Some(beacon_path.to_string()), ..Default::default() }; let config1 = Config { beacon_store: Some(beacon_path.to_string()), ..Default::default() };
let node1 = sim.add_node(false, &config1).await; let node1 = sim.add_node(false, &config1);
let config2 = Config { beacon_load: Some(beacon_path.to_string()), ..Default::default() }; let config2 = Config { beacon_load: Some(beacon_path.to_string()), ..Default::default() };
let node2 = sim.add_node(false, &config2).await; let node2 = sim.add_node(false, &config2);
sim.set_time(100); sim.set_time(100);
sim.trigger_node_housekeep(node1).await; sim.trigger_node_housekeep(node1);
sim.trigger_node_housekeep(node2).await; sim.trigger_node_housekeep(node2);
sim.simulate_all_messages().await; sim.simulate_all_messages();
assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1)); assert!(sim.is_connected(node2, node1));
} }
#[test] #[test]
async fn reconnect_after_timeout() { fn reconnect_after_timeout() {
let config = Config::default(); let config = Config::default();
let mut sim = TapSimulator::new(); let mut sim = TapSimulator::new();
let node1 = sim.add_node(false, &config).await; let node1 = sim.add_node(false, &config);
let node2 = sim.add_node(false, &config).await; let node2 = sim.add_node(false, &config);
sim.connect(node1, node2).await; sim.connect(node1, node2);
sim.simulate_all_messages().await; sim.simulate_all_messages();
assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1)); assert!(sim.is_connected(node2, node1));
sim.set_time(5000); sim.set_time(5000);
sim.trigger_housekeep().await; sim.trigger_housekeep();
assert!(!sim.is_connected(node1, node2)); assert!(!sim.is_connected(node1, node2));
assert!(!sim.is_connected(node2, node1)); assert!(!sim.is_connected(node2, node1));
sim.simulate_all_messages().await; sim.simulate_all_messages();
assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1)); assert!(sim.is_connected(node2, node1));
} }
#[test] #[test]
async fn lost_init_ping() { fn lost_init_ping() {
let config = Config::default(); let config = Config::default();
let mut sim = TapSimulator::new(); let mut sim = TapSimulator::new();
let node1 = sim.add_node(false, &config).await; let node1 = sim.add_node(false, &config);
let node2 = sim.add_node(false, &config).await; let node2 = sim.add_node(false, &config);
sim.connect(node1, node2).await; sim.connect(node1, node2);
sim.drop_message(); // drop init ping sim.drop_message(); // drop init ping
sim.simulate_time(120).await; sim.simulate_time(120);
assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1)); assert!(sim.is_connected(node2, node1));
} }
#[test] #[test]
async fn lost_init_pong() { fn lost_init_pong() {
let config = Config::default(); let config = Config::default();
let mut sim = TapSimulator::new(); let mut sim = TapSimulator::new();
let node1 = sim.add_node(false, &config).await; let node1 = sim.add_node(false, &config);
let node2 = sim.add_node(false, &config).await; let node2 = sim.add_node(false, &config);
sim.connect(node1, node2).await; sim.connect(node1, node2);
sim.simulate_next_message().await; // init ping sim.simulate_next_message(); // init ping
sim.drop_message(); // drop init pong sim.drop_message(); // drop init pong
sim.simulate_time(120).await; sim.simulate_time(120);
assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1)); assert!(sim.is_connected(node2, node1));
} }
#[test] #[test]
async fn lost_init_peng() { fn lost_init_peng() {
let config = Config::default(); let config = Config::default();
let mut sim = TapSimulator::new(); let mut sim = TapSimulator::new();
let node1 = sim.add_node(false, &config).await; let node1 = sim.add_node(false, &config);
let node2 = sim.add_node(false, &config).await; let node2 = sim.add_node(false, &config);
sim.connect(node1, node2).await; sim.connect(node1, node2);
sim.simulate_next_message().await; // init ping sim.simulate_next_message(); // init ping
sim.simulate_next_message().await; // init pong sim.simulate_next_message(); // init pong
sim.drop_message(); // drop init peng sim.drop_message(); // drop init peng
sim.simulate_time(120).await; sim.simulate_time(120);
assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1)); assert!(sim.is_connected(node2, node1));
} }
#[test] #[test]
#[ignore] #[ignore]
async fn peer_exchange() { fn peer_exchange() {
// TODO Test // TODO Test
unimplemented!() unimplemented!()
} }
#[test] #[test]
#[ignore] #[ignore]
async fn lost_peer_exchange() { fn lost_peer_exchange() {
// TODO Test // TODO Test
unimplemented!() unimplemented!()
} }
#[test] #[test]
#[ignore] #[ignore]
async fn remove_dead_peers() { fn remove_dead_peers() {
// TODO Test // TODO Test
unimplemented!() unimplemented!()
} }
#[test] #[test]
#[ignore] #[ignore]
async fn update_primary_address() { fn update_primary_address() {
// TODO Test // TODO Test
unimplemented!() unimplemented!()
} }
#[test] #[test]
#[ignore] #[ignore]
async fn automatic_peer_timeout() { fn automatic_peer_timeout() {
// TODO Test // TODO Test
unimplemented!() unimplemented!()
} }

View File

@ -70,6 +70,18 @@ impl TrafficEntry {
self.out_bytes = 0; self.out_bytes = 0;
self.in_bytes = 0; self.in_bytes = 0;
} }
fn clear(&mut self) {
self.in_bytes = 0;
self.out_bytes = 0;
self.in_packets = 0;
self.out_packets = 0;
self.idle_periods = 0;
self.in_bytes_total = 0;
self.in_packets_total = 0;
self.out_bytes_total = 0;
self.out_packets_total = 0;
}
} }
#[derive(Default)] #[derive(Default)]
@ -203,4 +215,28 @@ impl TrafficStats {
)?; )?;
Ok(()) Ok(())
} }
pub fn add(&mut self, other: &Self) {
for (addr, data) in &other.peers {
if let Some(entry) = self.peers.get_mut(addr) {
*entry += data
} else {
self.peers.insert(*addr, data.clone());
}
}
for (key, data) in &other.payload {
if let Some(entry) = self.payload.get_mut(key) {
*entry += data
} else {
self.payload.insert(*key, data.clone());
}
}
self.dropped += &other.dropped
}
pub fn clear(&mut self) {
self.peers.clear();
self.payload.clear();
self.dropped.clear();
}
} }

View File

@ -233,7 +233,7 @@ mod tests {
use std::io::Cursor; use std::io::Cursor;
#[test] #[test]
async fn address_parse_fmt() { fn address_parse_fmt() {
assert_eq!(format!("{}", Address::from_str("120.45.22.5").unwrap()), "120.45.22.5"); assert_eq!(format!("{}", Address::from_str("120.45.22.5").unwrap()), "120.45.22.5");
assert_eq!(format!("{}", Address::from_str("78:2d:16:05:01:02").unwrap()), "78:2d:16:05:01:02"); assert_eq!(format!("{}", Address::from_str("78:2d:16:05:01:02").unwrap()), "78:2d:16:05:01:02");
assert_eq!( assert_eq!(
@ -249,7 +249,7 @@ mod tests {
} }
#[test] #[test]
async fn address_decode_encode() { fn address_decode_encode() {
let mut buf = vec![]; let mut buf = vec![];
let addr = Address::from_str("120.45.22.5").unwrap(); let addr = Address::from_str("120.45.22.5").unwrap();
addr.write_to(Cursor::new(&mut buf)); addr.write_to(Cursor::new(&mut buf));
@ -270,7 +270,7 @@ mod tests {
} }
#[test] #[test]
async fn address_eq() { fn address_eq() {
assert_eq!( assert_eq!(
Address::read_from_fixed(Cursor::new(&[1, 2, 3, 4]), 4).unwrap(), Address::read_from_fixed(Cursor::new(&[1, 2, 3, 4]), 4).unwrap(),
Address::read_from_fixed(Cursor::new(&[1, 2, 3, 4]), 4).unwrap() Address::read_from_fixed(Cursor::new(&[1, 2, 3, 4]), 4).unwrap()
@ -290,7 +290,7 @@ mod tests {
} }
#[test] #[test]
async fn address_range_decode_encode() { fn address_range_decode_encode() {
let mut buf = vec![]; let mut buf = vec![];
let range = let range =
Range { base: Address { data: [0, 1, 2, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], len: 4 }, prefix_len: 24 }; Range { base: Address { data: [0, 1, 2, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], len: 4 }, prefix_len: 24 };

View File

@ -3,6 +3,7 @@
// This software is licensed under GPL-3 or newer (see LICENSE.md) // This software is licensed under GPL-3 or newer (see LICENSE.md)
use std::process::Command; use std::process::Command;
use std::time::Instant;
use std::{ use std::{
fmt, fmt,
net::{Ipv4Addr, SocketAddr, ToSocketAddrs, UdpSocket}, net::{Ipv4Addr, SocketAddr, ToSocketAddrs, UdpSocket},
@ -11,6 +12,8 @@ use std::{
use crate::error::Error; use crate::error::Error;
use signal::trap::Trap;
use signal::Signal;
#[cfg(not(target_os = "linux"))] #[cfg(not(target_os = "linux"))]
use time; use time;
@ -260,6 +263,38 @@ impl fmt::Display for Bytes {
} }
} }
pub struct CtrlC {
dummy_time: Instant,
trap: Trap,
}
impl CtrlC {
pub fn new() -> Self {
Default::default()
}
pub fn was_pressed(&self) -> bool {
self.trap.wait(self.dummy_time).is_some()
}
pub fn wait(&self) {
loop {
let deadline = Instant::now() + std::time::Duration::from_secs(10);
if self.trap.wait(deadline).is_some() {
return;
}
}
}
}
impl Default for CtrlC {
fn default() -> Self {
let dummy_time = Instant::now();
let trap = Trap::trap(&[Signal::SIGINT, Signal::SIGTERM, Signal::SIGQUIT]);
Self { dummy_time, trap }
}
}
pub trait TimeSource: Sync + Copy + Send + 'static { pub trait TimeSource: Sync + Copy + Send + 'static {
fn now() -> Time; fn now() -> Time;
} }
@ -402,7 +437,7 @@ pub fn run_cmd(mut cmd: Command) {
} }
#[test] #[test]
async fn base62() { fn base62() {
assert_eq!("", to_base62(&[0])); assert_eq!("", to_base62(&[0]));
assert_eq!("z", to_base62(&[61])); assert_eq!("z", to_base62(&[61]));
assert_eq!("10", to_base62(&[62])); assert_eq!("10", to_base62(&[62]));

View File

@ -3,19 +3,13 @@
// This software is licensed under GPL-3 or newer (see LICENSE.md) // This software is licensed under GPL-3 or newer (see LICENSE.md)
use super::{ use super::{
net::{get_ip, mapped_addr, parse_listen, Socket, SocketBuilder}, net::{get_ip, mapped_addr, parse_listen, Socket},
poll::{WaitImpl, WaitResult}, poll::{WaitImpl, WaitResult},
port_forwarding::PortForwarding, port_forwarding::PortForwarding,
util::MsgBuffer, util::MsgBuffer,
}; };
use async_trait::async_trait;
use byteorder::{NetworkEndian, ReadBytesExt, WriteBytesExt}; use byteorder::{NetworkEndian, ReadBytesExt, WriteBytesExt};
use std::{ use std::{io::{self, Cursor, Read, Write}, net::{Ipv6Addr, SocketAddr, SocketAddrV6, TcpListener, TcpStream, UdpSocket}, os::unix::io::AsRawFd, sync::Arc, thread};
io::{self, Cursor, Read, Write},
net::{Ipv6Addr, SocketAddr, SocketAddrV6, TcpListener, TcpStream, UdpSocket},
os::unix::io::AsRawFd,
sync::Arc,
};
use tungstenite::{client::AutoStream, connect, protocol::WebSocket, server::accept, Message}; use tungstenite::{client::AutoStream, connect, protocol::WebSocket, server::accept, Message};
use url::Url; use url::Url;
@ -99,7 +93,7 @@ pub fn run_proxy(listen: &str) -> Result<(), io::Error> {
for stream in server.incoming() { for stream in server.incoming() {
let stream = stream?; let stream = stream?;
let peer = stream.peer_addr()?; let peer = stream.peer_addr()?;
tokio::spawn(async move { thread::spawn(move || {
if let Err(err) = serve_proxy_connection(stream) { if let Err(err) = serve_proxy_connection(stream) {
error!("Error on connection {}: {}", peer, err); error!("Error on connection {}: {}", peer, err);
} }
@ -138,21 +132,12 @@ impl ProxyConnection {
} }
} }
impl SocketBuilder for ProxyConnection { impl Socket for ProxyConnection {
type SocketType = ProxyConnection;
fn build(self) -> Result<Self::SocketType, io::Error> {
Ok(self)
}
fn create_port_forwarding(&self) -> Option<PortForwarding> { fn create_port_forwarding(&self) -> Option<PortForwarding> {
None None
} }
}
#[async_trait] fn receive(&mut self, buffer: &mut MsgBuffer) -> Result<SocketAddr, io::Error> {
impl Socket for ProxyConnection {
async fn receive(&mut self, buffer: &mut MsgBuffer) -> Result<SocketAddr, io::Error> {
buffer.clear(); buffer.clear();
let data = self.read_message()?; let data = self.read_message()?;
let addr = read_addr(Cursor::new(&data))?; let addr = read_addr(Cursor::new(&data))?;
@ -160,7 +145,7 @@ impl Socket for ProxyConnection {
Ok(addr) Ok(addr)
} }
async fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize, io::Error> { fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize, io::Error> {
let mut msg = Vec::with_capacity(data.len() + 18); let mut msg = Vec::with_capacity(data.len() + 18);
write_addr(addr, &mut msg)?; write_addr(addr, &mut msg)?;
msg.write_all(data)?; msg.write_all(data)?;
@ -171,7 +156,7 @@ impl Socket for ProxyConnection {
*/ */
} }
async fn address(&self) -> Result<SocketAddr, io::Error> { fn address(&self) -> Result<SocketAddr, io::Error> {
Ok(self.addr) Ok(self.addr)
} }
} }