diff --git a/Cargo.lock b/Cargo.lock index 4e939e1..94a98eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9,17 +9,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "async-trait" -version = "0.1.42" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d3a45e77e34375a7923b1e8febb049bb011f064714a8e17a1a616fef01da13d" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "attohttpc" version = "0.16.3" @@ -121,6 +110,12 @@ version = "1.0.67" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3c69b077ad434294d3ce9f1f6143a2a4b89a8a2d54ef813d85003a4fd1137fd" +[[package]] +name = "cfg-if" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" + [[package]] name = "cfg-if" version = "1.0.0" @@ -217,7 +212,7 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "crossbeam-utils", ] @@ -227,7 +222,7 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94af6efb46fef72616855b036a624cf27ba656ffc9be1b9a3c931cfc7749a9a9" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "crossbeam-epoch", "crossbeam-utils", ] @@ -238,7 +233,7 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2584f639eb95fea8c798496315b297cf81b9b58b6d30ab066a75455333cf4b12" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "crossbeam-utils", "lazy_static", "memoffset", @@ -252,7 +247,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e7e9d99fa91428effe99c5c6d4634cdeba32b8cf784fc428a2a687f61a952c49" dependencies = [ "autocfg", - "cfg-if", + "cfg-if 1.0.0", "lazy_static", ] @@ -359,7 +354,7 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c9495705279e7140bf035dde1f6e750c162df8b625267cd52cc44e0b156732c8" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "libc", "wasi", ] @@ -450,7 +445,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61124eeebbd69b8190558df225adf7e4caafce0d743919e5d6b19652314ec5ec" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", ] [[package]] @@ -519,7 +514,7 @@ version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", ] [[package]] @@ -544,26 +539,16 @@ dependencies = [ ] [[package]] -name = "mio" -version = "0.7.9" +name = "nix" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5dede4e2065b3842b8b0af444119f3aa331cc7cc2dd20388bfb0f5d5a38823a" +checksum = "6c722bee1037d430d0f8e687bbdbf222f27cc6e4e68d5caf630857bb2b6dbdce" dependencies = [ + "bitflags", + "cc", + "cfg-if 0.1.10", "libc", - "log", - "miow", - "ntapi", - "winapi", -] - -[[package]] -name = "miow" -version = "0.3.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a33c1b55807fbed163481b5ba66db4b2fa6cde694a5027be10fb724206c5897" -dependencies = [ - "socket2", - "winapi", + "void", ] [[package]] @@ -574,19 +559,10 @@ checksum = "b2ccba0cfe4fdf15982d1674c69b1fd80bad427d293849982668dfe454bd61f2" dependencies = [ "bitflags", "cc", - "cfg-if", + "cfg-if 1.0.0", "libc", ] -[[package]] -name = "ntapi" -version = "0.3.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44" -dependencies = [ - "winapi", -] - [[package]] name = "num-integer" version = "0.1.44" @@ -651,7 +627,7 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fa7a782938e745763fe6907fc6ba86946d72f49fe7e21de074e08128a99fb018" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "instant", "libc", "redox_syscall", @@ -665,12 +641,6 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" -[[package]] -name = "pin-project-lite" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "439697af366c49a6d0a010c56a0d97685bc140ce0d377b13a2ea2aa42d64a827" - [[package]] name = "plotters" version = "0.3.0" @@ -712,7 +682,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ebd4c2739642e70439d1c0d9545beec45c1e54128739b3cda29bf2c366028c87" dependencies = [ "libc", - "nix", + "nix 0.19.1", ] [[package]] @@ -984,19 +954,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfebf75d25bd900fd1e7d11501efab59bc846dbc76196839663e6637bba9f25f" dependencies = [ "block-buffer", - "cfg-if", + "cfg-if 1.0.0", "cpuid-bool", "digest", "opaque-debug", ] [[package]] -name = "signal-hook-registry" -version = "1.3.0" +name = "signal" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16f1d0fef1604ba8f7a073c7e701f213e056707210e9020af4528e0101ce11a6" +checksum = "2f6ce83b159ab6984d2419f495134972b48754d13ff2e3f8c998339942b56ed9" dependencies = [ "libc", + "nix 0.14.1", ] [[package]] @@ -1005,17 +976,6 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e" -[[package]] -name = "socket2" -version = "0.3.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "122e570113d28d773067fab24266b66753f6ea915758651696b6e35e49f88d6e" -dependencies = [ - "cfg-if", - "libc", - "winapi", -] - [[package]] name = "spin" version = "0.5.2" @@ -1069,7 +1029,7 @@ version = "3.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dac1c663cfc93810f88aed9b8941d48cabf856a1b111c29a40439018d870eb22" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "libc", "rand", "redox_syscall", @@ -1116,6 +1076,15 @@ dependencies = [ "syn", ] +[[package]] +name = "timeout_io" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d89ed29f92507ee770ef31f747ec1ad5ba7d8989a9f3489783da7d19a155c60" +dependencies = [ + "cc", +] + [[package]] name = "tinytemplate" version = "1.2.1" @@ -1141,37 +1110,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" -[[package]] -name = "tokio" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83f0c8e7c0addab50b663055baf787d0af7f413a46e6e7fb9559a4e4db7137a5" -dependencies = [ - "autocfg", - "bytes", - "libc", - "memchr", - "mio", - "num_cpus", - "once_cell", - "parking_lot", - "pin-project-lite", - "signal-hook-registry", - "tokio-macros", - "winapi", -] - -[[package]] -name = "tokio-macros" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "caf7b11a536f46a809a8a9f0bb4237020f70ecbf115b842360afb127ea2fda57" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "tungstenite" version = "0.13.0" @@ -1270,11 +1208,16 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe" +[[package]] +name = "void" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" + [[package]] name = "vpncloud" version = "2.2.0" dependencies = [ - "async-trait", "byteorder", "chrono", "criterion", @@ -1291,11 +1234,12 @@ dependencies = [ "ring", "serde", "serde_yaml", + "signal", "smallvec", "structopt", "tempfile", "thiserror", - "tokio", + "timeout_io", "tungstenite", "url", "yaml-rust", @@ -1324,7 +1268,7 @@ version = "0.2.73" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "83240549659d187488f91f33c0f8547cbfef0b2088bc470c116d1d260ef623d9" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "wasm-bindgen-macro", ] diff --git a/Cargo.toml b/Cargo.toml index bd23ad1..58ebbb5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,9 +36,8 @@ dialoguer = { version = "0.8", optional = true } tungstenite = { version = "0.13", optional = true, default-features = false } url = { version = "2.2", optional = true } igd = { version = "0.12", optional = true } -tokio = { version = "^1.5", features = ["full"] } -async-trait = "0.1" - +timeout_io = "0.6" +signal = "0.7" [dev-dependencies] tempfile = "3" diff --git a/ROADMAP.md b/ROADMAP.md new file mode 100644 index 0000000..1e9dd42 --- /dev/null +++ b/ROADMAP.md @@ -0,0 +1,13 @@ +# Roadmap + +## Multithreading functionality + +- [x] Timeout for device read +- [x] Timeout for net read +- [x] Check how async affects performance +- [x] Sync traffic stats +- [x] Sync forwarding table +- [ ] Fix WS Proxy code + + +## REST API \ No newline at end of file diff --git a/benches/criterion.rs b/benches/criterion.rs index 7acdec9..8c929bb 100644 --- a/benches/criterion.rs +++ b/benches/criterion.rs @@ -136,7 +136,6 @@ fn crypto_aes256(c: &mut Criterion) { } fn full_communication_tun_router(c: &mut Criterion) { - let runtime = tokio::runtime::Runtime::new().unwrap(); log::set_max_level(log::LevelFilter::Error); let config1 = Config { device_type: Type::Tun, @@ -152,60 +151,53 @@ fn full_communication_tun_router(c: &mut Criterion) { }; let mut sim = TunSimulator::new(); - let (node1, node2) = runtime.block_on(async { - log::set_max_level(log::LevelFilter::Error); - let node1 = sim.add_node(false, &config1).await; - let node2 = sim.add_node(false, &config2).await; + let node1 = sim.add_node(false, &config1); + let node2 = sim.add_node(false, &config2); - sim.connect(node1, node2).await; - sim.simulate_all_messages().await; - assert!(sim.is_connected(node1, node2)); - assert!(sim.is_connected(node2, node1)); - (node1, node2) - }); + sim.connect(node1, node2); + sim.simulate_all_messages(); + assert!(sim.is_connected(node1, node2)); + assert!(sim.is_connected(node2, node1)); + sim.trigger_housekeep(); let mut payload = vec![0x40, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2]; payload.append(&mut vec![0; 1400]); let mut g = c.benchmark_group("full_communication"); g.throughput(Throughput::Bytes(2 * 1400)); g.bench_function("tun_router", |b| { - b.iter(|| runtime.block_on(async { - sim.put_payload(node1, payload.clone()).await; - sim.simulate_all_messages().await; + b.iter(|| { + sim.put_payload(node1, payload.clone()); + sim.simulate_all_messages(); assert_eq!(Some(&payload), sim.pop_payload(node2).as_ref()); - })); + }); }); g.finish() } fn full_communication_tap_switch(c: &mut Criterion) { - let runtime = tokio::runtime::Runtime::new().unwrap(); log::set_max_level(log::LevelFilter::Error); let config = Config { device_type: Type::Tap, ..Config::default() }; let mut sim = TapSimulator::new(); - let (node1, node2) = runtime.block_on(async { - log::set_max_level(log::LevelFilter::Error); - let node1 = sim.add_node(false, &config).await; - let node2 = sim.add_node(false, &config).await; + let node1 = sim.add_node(false, &config); + let node2 = sim.add_node(false, &config); - sim.connect(node1, node2).await; - sim.simulate_all_messages().await; - assert!(sim.is_connected(node1, node2)); - assert!(sim.is_connected(node2, node1)); - (node1, node2) - }); + sim.connect(node1, node2); + sim.simulate_all_messages(); + assert!(sim.is_connected(node1, node2)); + assert!(sim.is_connected(node2, node1)); + sim.trigger_housekeep(); let mut payload = vec![2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4, 5]; payload.append(&mut vec![0; 1400]); let mut g = c.benchmark_group("full_communication"); g.throughput(Throughput::Bytes(2 * 1400)); g.bench_function("tap_switch", |b| { - b.iter(|| runtime.block_on(async { - sim.put_payload(node1, payload.clone()).await; - sim.simulate_all_messages().await; + b.iter(|| { + sim.put_payload(node1, payload.clone()); + sim.simulate_all_messages(); assert_eq!(Some(&payload), sim.pop_payload(node2).as_ref()); - })); + }); }); g.finish() } diff --git a/benches/valgrind.rs b/benches/valgrind.rs index ece2d03..c5c81e3 100644 --- a/benches/valgrind.rs +++ b/benches/valgrind.rs @@ -98,7 +98,6 @@ fn crypto_aes256() { } fn full_communication_tun_router() { - let runtime = tokio::runtime::Runtime::new().unwrap(); log::set_max_level(log::LevelFilter::Error); let config1 = Config { device_type: Type::Tun, @@ -113,55 +112,44 @@ fn full_communication_tun_router() { ..Config::default() }; let mut sim = TunSimulator::new(); - let (node1, node2) = runtime.block_on(async { - log::set_max_level(log::LevelFilter::Error); - let node1 = sim.add_node(false, &config1).await; - let node2 = sim.add_node(false, &config2).await; + let node1 = sim.add_node(false, &config1); + let node2 = sim.add_node(false, &config2); - sim.connect(node1, node2).await; - sim.simulate_all_messages().await; - assert!(sim.is_connected(node1, node2)); - assert!(sim.is_connected(node2, node1)); - (node1, node2) - }); + sim.connect(node1, node2); + sim.simulate_all_messages(); + assert!(sim.is_connected(node1, node2)); + assert!(sim.is_connected(node2, node1)); + sim.trigger_housekeep(); let mut payload = vec![0x40, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2]; payload.append(&mut vec![0; 1400]); for _ in 0..1000 { - runtime.block_on(async { - sim.put_payload(node1, payload.clone()).await; - sim.simulate_all_messages().await; - assert_eq!(Some(&payload), black_box(sim.pop_payload(node2).as_ref())); - }); + sim.put_payload(node1, payload.clone()); + sim.simulate_all_messages(); + assert_eq!(Some(&payload), black_box(sim.pop_payload(node2).as_ref())); } } fn full_communication_tap_switch() { - let runtime = tokio::runtime::Runtime::new().unwrap(); log::set_max_level(log::LevelFilter::Error); let config = Config { device_type: Type::Tap, ..Config::default() }; let mut sim = TapSimulator::new(); - let (node1, node2) = runtime.block_on(async { - log::set_max_level(log::LevelFilter::Error); - let node1 = sim.add_node(false, &config).await; - let node2 = sim.add_node(false, &config).await; + let node1 = sim.add_node(false, &config); + let node2 = sim.add_node(false, &config); - sim.connect(node1, node2).await; - sim.simulate_all_messages().await; - assert!(sim.is_connected(node1, node2)); - assert!(sim.is_connected(node2, node1)); - (node1, node2) - }); + sim.connect(node1, node2); + sim.simulate_all_messages(); + assert!(sim.is_connected(node1, node2)); + assert!(sim.is_connected(node2, node1)); + sim.trigger_housekeep(); let mut payload = vec![2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4, 5]; payload.append(&mut vec![0; 1400]); for _ in 0..1000 { - runtime.block_on(async { - sim.put_payload(node1, payload.clone()).await; - sim.simulate_all_messages().await; - assert_eq!(Some(&payload), black_box(sim.pop_payload(node2).as_ref())); - }); + sim.put_payload(node1, payload.clone()); + sim.simulate_all_messages(); + assert_eq!(Some(&payload), black_box(sim.pop_payload(node2).as_ref())); } } diff --git a/contrib/aws/quick_perf.py b/contrib/aws/quick_perf.py index 87d324f..e5d6524 100755 --- a/contrib/aws/quick_perf.py +++ b/contrib/aws/quick_perf.py @@ -4,9 +4,6 @@ from common import EC2Environment, CREATE, eprint import time, json, os, atexit from datetime import date - -# Note: this script will run for ~8 minutes and incur costs of about $ 0.02 - FILE = "../../target/release/vpncloud" VERSION = "2.2.0" REGION = "eu-central-1" @@ -25,9 +22,6 @@ env = EC2Environment( ) -CRYPTO = ["plain", "aes256", "aes128", "chacha20"] - - class PerfTest: def __init__(self, sender, receiver): self.sender = sender diff --git a/src/beacon.rs b/src/beacon.rs index 3f58044..abb5cec 100644 --- a/src/beacon.rs +++ b/src/beacon.rs @@ -323,7 +323,7 @@ use std::str::FromStr; use std::time::Duration; #[test] -async fn encode() { +fn encode() { MockTimeSource::set_time(2000 * 3600); let ser = BeaconSerializer::::new(b"mysecretkey"); let mut peers = vec![SocketAddr::from_str("1.2.3.4:5678").unwrap(), SocketAddr::from_str("6.6.6.6:53").unwrap()]; @@ -335,7 +335,7 @@ async fn encode() { } #[test] -async fn decode() { +fn decode() { MockTimeSource::set_time(2000 * 3600); let ser = BeaconSerializer::::new(b"mysecretkey"); let mut peers = vec![SocketAddr::from_str("1.2.3.4:5678").unwrap(), SocketAddr::from_str("6.6.6.6:53").unwrap()]; @@ -348,7 +348,7 @@ async fn decode() { } #[test] -async fn decode_split() { +fn decode_split() { MockTimeSource::set_time(2000 * 3600); let ser = BeaconSerializer::::new(b"mysecretkey"); let peers = vec![SocketAddr::from_str("1.2.3.4:5678").unwrap(), SocketAddr::from_str("6.6.6.6:53").unwrap()]; @@ -363,7 +363,7 @@ async fn decode_split() { } #[test] -async fn decode_offset() { +fn decode_offset() { MockTimeSource::set_time(2000 * 3600); let ser = BeaconSerializer::::new(b"mysecretkey"); let peers = vec![SocketAddr::from_str("1.2.3.4:5678").unwrap(), SocketAddr::from_str("6.6.6.6:53").unwrap()]; @@ -374,7 +374,7 @@ async fn decode_offset() { } #[test] -async fn decode_multiple() { +fn decode_multiple() { MockTimeSource::set_time(2000 * 3600); let ser = BeaconSerializer::::new(b"mysecretkey"); let peers = vec![SocketAddr::from_str("1.2.3.4:5678").unwrap(), SocketAddr::from_str("6.6.6.6:53").unwrap()]; @@ -385,7 +385,7 @@ async fn decode_multiple() { } #[test] -async fn decode_ttl() { +fn decode_ttl() { MockTimeSource::set_time(2000 * 3600); let ser = BeaconSerializer::::new(b"mysecretkey"); MockTimeSource::set_time(2000 * 3600); @@ -409,7 +409,7 @@ async fn decode_ttl() { } #[test] -async fn decode_invalid() { +fn decode_invalid() { MockTimeSource::set_time(2000 * 3600); let ser = BeaconSerializer::::new(b"mysecretkey"); assert_eq!(0, ser.decode("", None).len()); @@ -422,7 +422,7 @@ async fn decode_invalid() { } #[test] -async fn encode_decode() { +fn encode_decode() { MockTimeSource::set_time(2000 * 3600); let ser = BeaconSerializer::::new(b"mysecretkey"); let peers = vec![SocketAddr::from_str("1.2.3.4:5678").unwrap(), SocketAddr::from_str("6.6.6.6:53").unwrap()]; @@ -432,7 +432,7 @@ async fn encode_decode() { } #[test] -async fn encode_decode_file() { +fn encode_decode_file() { MockTimeSource::set_time(2000 * 3600); let ser = BeaconSerializer::::new(b"mysecretkey"); let peers = vec![SocketAddr::from_str("1.2.3.4:5678").unwrap(), SocketAddr::from_str("6.6.6.6:53").unwrap()]; @@ -444,7 +444,7 @@ async fn encode_decode_file() { } #[test] -async fn encode_decode_cmd() { +fn encode_decode_cmd() { MockTimeSource::set_time(2000 * 3600); let ser = BeaconSerializer::::new(b"mysecretkey"); let peers = vec![SocketAddr::from_str("1.2.3.4:5678").unwrap(), SocketAddr::from_str("6.6.6.6:53").unwrap()]; diff --git a/src/config.rs b/src/config.rs index 2c2e18b..6252e79 100644 --- a/src/config.rs +++ b/src/config.rs @@ -358,29 +358,25 @@ impl Config { pub fn is_learning(&self) -> bool { match self.mode { - Mode::Normal => { - match self.device_type { - Type::Tap => true, - Type::Tun => false - } - } + Mode::Normal => match self.device_type { + Type::Tap => true, + Type::Tun => false, + }, Mode::Router => false, Mode::Switch => true, - Mode::Hub => false + Mode::Hub => false, } } pub fn is_broadcasting(&self) -> bool { match self.mode { - Mode::Normal => { - match self.device_type { - Type::Tap => true, - Type::Tun => false - } - } + Mode::Normal => match self.device_type { + Type::Tap => true, + Type::Tun => false, + }, Mode::Router => false, Mode::Switch => true, - Mode::Hub => true + Mode::Hub => true, } } @@ -685,7 +681,7 @@ pub struct ConfigFile { } #[test] -async fn config_file() { +fn config_file() { let config_file = " device: type: tun @@ -766,12 +762,12 @@ statsd: } #[test] -async fn parse_example_config() { +fn parse_example_config() { serde_yaml::from_str::(include_str!("../assets/example.net.disabled")).unwrap(); } #[test] -async fn config_merge() { +fn config_merge() { let mut config = Config::default(); config.merge_file(ConfigFile { device: Some(ConfigFileDevice { @@ -871,42 +867,45 @@ async fn config_merge() { group: Some("root".to_string()), ..Default::default() }); - assert_eq!(config, Config { - device_type: Type::Tap, - device_name: "vpncloud0".to_string(), - device_path: Some("/dev/null".to_string()), - device_mtu: None, - fix_rp_filter: false, - advertise_addresses: vec![], - ip: None, - ifup: Some("ifconfig $IFNAME 10.0.1.2/16 mtu 1400 up".to_string()), - ifdown: Some("ifconfig $IFNAME down".to_string()), - crypto: CryptoConfig { password: Some("anothersecret".to_string()), ..CryptoConfig::default() }, - listen: "[::]:3211".to_string(), - peers: vec![ - "remote.machine.foo:3210".to_string(), - "remote.machine.bar:3210".to_string(), - "another:3210".to_string() - ], - peer_timeout: 1801, - keepalive: Some(850), - switch_timeout: 301, - beacon_store: Some("/run/vpncloud.beacon.out2".to_string()), - beacon_load: Some("/run/vpncloud.beacon.in2".to_string()), - beacon_interval: 3600, - beacon_password: Some("test1234".to_string()), - mode: Mode::Switch, - port_forwarding: false, - claims: vec!["10.0.1.0/24".to_string()], - auto_claim: true, - user: Some("root".to_string()), - group: Some("root".to_string()), - pid_file: Some("/run/vpncloud-mynet.run".to_string()), - stats_file: Some("/var/log/vpncloud-mynet.stats".to_string()), - statsd_server: Some("example.com:2345".to_string()), - statsd_prefix: Some("prefix2".to_string()), - daemonize: true, - hook: None, - hooks: HashMap::new() - }); + assert_eq!( + config, + Config { + device_type: Type::Tap, + device_name: "vpncloud0".to_string(), + device_path: Some("/dev/null".to_string()), + device_mtu: None, + fix_rp_filter: false, + advertise_addresses: vec![], + ip: None, + ifup: Some("ifconfig $IFNAME 10.0.1.2/16 mtu 1400 up".to_string()), + ifdown: Some("ifconfig $IFNAME down".to_string()), + crypto: CryptoConfig { password: Some("anothersecret".to_string()), ..CryptoConfig::default() }, + listen: "[::]:3211".to_string(), + peers: vec![ + "remote.machine.foo:3210".to_string(), + "remote.machine.bar:3210".to_string(), + "another:3210".to_string() + ], + peer_timeout: 1801, + keepalive: Some(850), + switch_timeout: 301, + beacon_store: Some("/run/vpncloud.beacon.out2".to_string()), + beacon_load: Some("/run/vpncloud.beacon.in2".to_string()), + beacon_interval: 3600, + beacon_password: Some("test1234".to_string()), + mode: Mode::Switch, + port_forwarding: false, + claims: vec!["10.0.1.0/24".to_string()], + auto_claim: true, + user: Some("root".to_string()), + group: Some("root".to_string()), + pid_file: Some("/run/vpncloud-mynet.run".to_string()), + stats_file: Some("/var/log/vpncloud-mynet.stats".to_string()), + statsd_server: Some("example.com:2345".to_string()), + statsd_prefix: Some("prefix2".to_string()), + daemonize: true, + hook: None, + hooks: HashMap::new() + } + ); } diff --git a/src/crypto/common.rs b/src/crypto/common.rs index 403eede..a964670 100644 --- a/src/crypto/common.rs +++ b/src/crypto/common.rs @@ -349,7 +349,7 @@ mod tests { } #[test] - async fn normal() { + fn normal() { let config = Config { password: Some("test".to_string()), ..Default::default() }; let mut node1 = create_node(&config); let mut node2 = create_node(&config); diff --git a/src/crypto/core.rs b/src/crypto/core.rs index 0a21dfa..2a52784 100644 --- a/src/crypto/core.rs +++ b/src/crypto/core.rs @@ -277,7 +277,7 @@ mod tests { use ring::aead::{self, LessSafeKey, UnboundKey}; #[test] - async fn test_nonce() { + fn test_nonce() { let mut nonce = Nonce::zero(); assert_eq!(nonce.as_bytes(), &[0; 12]); nonce.increment(); @@ -299,17 +299,17 @@ mod tests { } #[test] - async fn test_encrypt_decrypt_aes128() { + fn test_encrypt_decrypt_aes128() { test_encrypt_decrypt(&aead::AES_128_GCM) } #[test] - async fn test_encrypt_decrypt_aes256() { + fn test_encrypt_decrypt_aes256() { test_encrypt_decrypt(&aead::AES_256_GCM) } #[test] - async fn test_encrypt_decrypt_chacha() { + fn test_encrypt_decrypt_chacha() { test_encrypt_decrypt(&aead::CHACHA20_POLY1305) } @@ -339,17 +339,17 @@ mod tests { } #[test] - async fn test_tampering_aes128() { + fn test_tampering_aes128() { test_tampering(&aead::AES_128_GCM) } #[test] - async fn test_tampering_aes256() { + fn test_tampering_aes256() { test_tampering(&aead::AES_256_GCM) } #[test] - async fn test_tampering_chacha() { + fn test_tampering_chacha() { test_tampering(&aead::CHACHA20_POLY1305) } @@ -380,17 +380,17 @@ mod tests { } #[test] - async fn test_nonce_pinning_aes128() { + fn test_nonce_pinning_aes128() { test_nonce_pinning(&aead::AES_128_GCM) } #[test] - async fn test_nonce_pinning_aes256() { + fn test_nonce_pinning_aes256() { test_nonce_pinning(&aead::AES_256_GCM) } #[test] - async fn test_nonce_pinning_chacha() { + fn test_nonce_pinning_chacha() { test_nonce_pinning(&aead::CHACHA20_POLY1305) } @@ -429,39 +429,39 @@ mod tests { } #[test] - async fn test_key_rotation_aes128() { + fn test_key_rotation_aes128() { test_key_rotation(&aead::AES_128_GCM); } #[test] - async fn test_key_rotation_aes256() { + fn test_key_rotation_aes256() { test_key_rotation(&aead::AES_256_GCM); } #[test] - async fn test_key_rotation_chacha() { + fn test_key_rotation_chacha() { test_key_rotation(&aead::CHACHA20_POLY1305); } #[test] - async fn test_core_size() { + fn test_core_size() { assert_eq!(2400, mem::size_of::()); } #[test] - async fn test_speed_aes128() { + fn test_speed_aes128() { let speed = test_speed(&aead::AES_128_GCM, &Duration::from_secs_f32(0.2)); assert!(speed > 10.0); } #[test] - async fn test_speed_aes256() { + fn test_speed_aes256() { let speed = test_speed(&aead::AES_256_GCM, &Duration::from_secs_f32(0.2)); assert!(speed > 10.0); } #[test] - async fn test_speed_chacha() { + fn test_speed_chacha() { let speed = test_speed(&aead::CHACHA20_POLY1305, &Duration::from_secs_f32(0.2)); assert!(speed > 10.0); } diff --git a/src/crypto/init.rs b/src/crypto/init.rs index 10a6da3..dd99a1a 100644 --- a/src/crypto/init.rs +++ b/src/crypto/init.rs @@ -731,7 +731,7 @@ mod tests { } #[test] - async fn normal_init() { + fn normal_init() { let (mut sender, mut receiver) = create_pair(); let mut out = MsgBuffer::new(8); sender.send_ping(&mut out); @@ -753,7 +753,7 @@ mod tests { } #[test] - async fn lost_init_sender_recovers() { + fn lost_init_sender_recovers() { let (mut sender, mut receiver) = create_pair(); let mut out = MsgBuffer::new(8); sender.send_ping(&mut out); @@ -786,7 +786,7 @@ mod tests { } #[test] - async fn lost_init_receiver_recovers() { + fn lost_init_receiver_recovers() { let (mut sender, mut receiver) = create_pair(); let mut out = MsgBuffer::new(8); sender.send_ping(&mut out); @@ -818,7 +818,7 @@ mod tests { } #[test] - async fn timeout() { + fn timeout() { let (mut sender, _receiver) = create_pair(); let mut out = MsgBuffer::new(8); sender.send_ping(&mut out); @@ -833,7 +833,7 @@ mod tests { } #[test] - async fn untrusted_peer() { + fn untrusted_peer() { let (mut sender, _) = create_pair(); let (_, mut receiver) = create_pair(); let mut out = MsgBuffer::new(8); @@ -843,7 +843,7 @@ mod tests { } #[test] - async fn manipulated_message() { + fn manipulated_message() { let (mut sender, mut receiver) = create_pair(); let mut out = MsgBuffer::new(8); sender.send_ping(&mut out); @@ -853,7 +853,7 @@ mod tests { } #[test] - async fn connect_to_self() { + fn connect_to_self() { let (mut sender, _) = create_pair(); let mut out = MsgBuffer::new(8); sender.send_ping(&mut out); @@ -881,7 +881,7 @@ mod tests { } #[test] - async fn algorithm_negotiation() { + fn algorithm_negotiation() { // Equal algorithms test_algorithm_negotiation( Algorithms { diff --git a/src/crypto/rotate.rs b/src/crypto/rotate.rs index 7f2b5a3..4deec81 100644 --- a/src/crypto/rotate.rs +++ b/src/crypto/rotate.rs @@ -228,7 +228,7 @@ mod tests { } #[test] - async fn test_encode_decode_message() { + fn test_encode_decode_message() { let mut data = Vec::with_capacity(100); let (_, key) = RotationState::create_key(); let msg = RotationMessage { message_id: 1, propose: key, confirm: None }; @@ -249,7 +249,7 @@ mod tests { } #[test] - async fn test_normal_rotation() { + fn test_normal_rotation() { let mut out1 = MsgBuffer::new(8); let mut out2 = MsgBuffer::new(8); @@ -323,7 +323,7 @@ mod tests { } #[test] - async fn test_duplication() { + fn test_duplication() { let mut out1 = MsgBuffer::new(8); let mut out2 = MsgBuffer::new(8); @@ -359,7 +359,7 @@ mod tests { } #[test] - async fn test_lost_message() { + fn test_lost_message() { let mut out1 = MsgBuffer::new(8); let mut out2 = MsgBuffer::new(8); @@ -385,7 +385,7 @@ mod tests { } #[test] - async fn test_reflect_back() { + fn test_reflect_back() { let mut out1 = MsgBuffer::new(8); let mut out2 = MsgBuffer::new(8); diff --git a/src/device.rs b/src/device.rs index 5c61fd4..f40bedf 100644 --- a/src/device.rs +++ b/src/device.rs @@ -2,23 +2,9 @@ // Copyright (C) 2015-2021 Dennis Schwerdel // This software is licensed under GPL-3 or newer (see LICENSE.md) -use async_trait::async_trait; use parking_lot::Mutex; -use std::{ - cmp, - collections::VecDeque, - convert::TryInto, - fmt, - io::{self, Cursor, Read, Write, Error as IoError, BufReader, BufRead}, - net::{Ipv4Addr, UdpSocket}, - fs::{self, File}, - os::unix::io::AsRawFd, - str, - str::FromStr, - sync::Arc, -}; -use tokio::fs::{File as AsyncFile}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use std::{cmp, collections::VecDeque, convert::TryInto, fmt, fs::{self, File}, io::{self, Cursor, Read, Write, Error as IoError, BufReader, BufRead}, net::{Ipv4Addr, UdpSocket}, os::unix::io::AsRawFd, str, str::FromStr, sync::Arc, time::Duration}; +use timeout_io::Reader; use crate::{crypto, error::Error, util::MsgBuffer}; @@ -79,7 +65,6 @@ impl FromStr for Type { } } -#[async_trait] pub trait Device: Send + 'static + Sized { /// Returns the type of this device fn get_type(&self) -> Type; @@ -95,7 +80,7 @@ pub trait Device: Send + 'static + Sized { /// /// # Errors /// This method will return an error if the underlying read call fails. - async fn read(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error>; + fn read(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error>; /// Writes a packet/frame to the device /// @@ -106,9 +91,9 @@ pub trait Device: Send + 'static + Sized { /// /// # Errors /// This method will return an error if the underlying read call fails. - async fn write(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error>; + fn write(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error>; - async fn duplicate(&self) -> Result; + fn duplicate(&self) -> Result; fn get_ip(&self) -> Result; } @@ -218,23 +203,6 @@ impl TunTapDevice { } Ok(()) } -} - -/// Represents a tun/tap device -pub struct AsyncTunTapDevice { - fd: AsyncFile, - ifname: String, - type_: Type, -} - -impl AsyncTunTapDevice { - pub fn from_sync(dev: TunTapDevice) -> Self { - Self { - fd: AsyncFile::from_std(dev.fd), - ifname: dev.ifname, - type_: dev.type_ - } - } #[cfg(any(target_os = "linux", target_os = "android"))] #[inline] @@ -286,32 +254,32 @@ impl AsyncTunTapDevice { } } -#[async_trait] -impl Device for AsyncTunTapDevice { +impl Device for TunTapDevice { fn get_type(&self) -> Type { self.type_ } - async fn duplicate(&self) -> Result { + fn duplicate(&self) -> Result { Ok(Self { - fd: self.fd.try_clone().await.map_err(|e| Error::DeviceIo("Failed to clone device", e))?, + fd: self.fd.try_clone().map_err(|e| Error::DeviceIo("Failed to clone device", e))?, ifname: self.ifname.clone(), type_: self.type_, }) } - async fn read(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error> { + fn read(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error> { buffer.clear(); - let read = self.fd.read(buffer.buffer()).await.map_err(|e| Error::DeviceIo("Read error", e))?; + let mut read = 0; + self.fd.try_read(buffer.buffer(), &mut read, Duration::from_secs(1)).map_err(|e| Error::DeviceRead(e))?; buffer.set_length(read); self.correct_data_after_read(buffer); Ok(()) } - async fn write(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error> { + fn write(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error> { self.correct_data_before_write(buffer); - match self.fd.write_all(buffer.message()).await { - Ok(_) => self.fd.flush().await.map_err(|e| Error::DeviceIo("Flush error", e)), + match self.fd.write_all(buffer.message()) { + Ok(_) => self.fd.flush().map_err(|e| Error::DeviceIo("Flush error", e)), Err(e) => Err(Error::DeviceIo("Write error", e)), } } @@ -345,9 +313,8 @@ impl MockDevice { } } -#[async_trait] impl Device for MockDevice { - async fn duplicate(&self) -> Result { + fn duplicate(&self) -> Result { Ok(self.clone()) } @@ -355,7 +322,7 @@ impl Device for MockDevice { Type::Tun } - async fn read(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error> { + fn read(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error> { if let Some(data) = self.inbound.lock().pop_front() { buffer.clear(); buffer.set_length(data.len()); @@ -366,7 +333,7 @@ impl Device for MockDevice { } } - async fn write(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error> { + fn write(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error> { self.outbound.lock().push_back(buffer.message().into()); Ok(()) } diff --git a/src/engine/common.rs b/src/engine/common.rs index 254d26c..bf691fd 100644 --- a/src/engine/common.rs +++ b/src/engine/common.rs @@ -1,10 +1,11 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use std::thread; use std::{fs::File, hash::BuildHasherDefault}; -use tokio; use fnv::FnvHasher; +use crate::util::CtrlC; use crate::{ config::Config, crypto::PeerCrypto, @@ -46,7 +47,7 @@ pub struct GenericCloud { impl GenericCloud { #[allow(clippy::too_many_arguments)] - pub async fn new( + pub fn new( config: &Config, socket: S, device: D, port_forwarding: Option, stats_file: Option, ) -> Result { let table = SharedTable::::new(&config); @@ -55,7 +56,7 @@ impl GenericCloud::new( config.clone(), - device.duplicate().await?, + device.duplicate()?, socket.clone(), traffic.clone(), peer_crypto.clone(), @@ -73,7 +74,7 @@ impl GenericCloud GenericCloud GenericCloud { &mut self.device_thread.device } - pub async fn connect(&mut self, addr: SocketAddr) -> Result<(), Error> { - self.socket_thread.connect(addr).await + pub fn connect(&mut self, addr: SocketAddr) -> Result<(), Error> { + self.socket_thread.connect(addr) } - pub async fn trigger_socket_event(&mut self) { - self.socket_thread.iteration().await + pub fn trigger_socket_event(&mut self) { + self.socket_thread.iteration() } - pub async fn trigger_device_event(&mut self) { - self.device_thread.iteration().await + pub fn trigger_device_event(&mut self) { + self.device_thread.iteration() } - pub async fn trigger_housekeep(&mut self) { - try_fail!(self.socket_thread.housekeep().await, "Housekeep failed: {}"); - try_fail!(self.device_thread.housekeep().await, "Housekeep failed: {}"); + pub fn trigger_housekeep(&mut self) { + try_fail!(self.socket_thread.housekeep(), "Housekeep failed: {}"); + try_fail!(self.device_thread.housekeep(), "Housekeep failed: {}"); } pub fn is_connected(&self, addr: &SocketAddr) -> bool { @@ -149,7 +152,7 @@ impl GenericCloud { &self.socket_thread.own_addresses } - pub async fn get_num(&self) -> usize { - self.socket_thread.socket.address().await.unwrap().port() as usize + pub fn get_num(&self) -> usize { + self.socket_thread.socket.address().unwrap().port() as usize } } diff --git a/src/engine/device_thread.rs b/src/engine/device_thread.rs index 7938219..f016101 100644 --- a/src/engine/device_thread.rs +++ b/src/engine/device_thread.rs @@ -14,7 +14,6 @@ use crate::{ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::{marker::PhantomData, net::SocketAddr}; -use tokio::time::timeout; pub struct DeviceThread { // Read-only fields @@ -56,11 +55,11 @@ impl DeviceThread Result<(), Error> { + fn send_to(&mut self, addr: SocketAddr) -> Result<(), Error> { let size = self.buffer.len(); debug!("Sending msg with {} bytes to {}", size, addr); self.traffic.count_out_traffic(addr, size); - match self.socket.send(self.buffer.message(), addr).await { + match self.socket.send(self.buffer.message(), addr) { Ok(written) if written == size => Ok(()), Ok(_) => Err(Error::Socket("Sent out truncated packet")), Err(e) => Err(Error::SocketIo("IOError when sending", e)), @@ -68,15 +67,15 @@ impl DeviceThread Result<(), Error> { + fn send_msg(&mut self, addr: SocketAddr, type_: u8) -> Result<(), Error> { debug!("Sending msg with {} bytes to {}", self.buffer.len(), addr); self.buffer.prepend_byte(type_); self.peer_crypto.encrypt_for(addr, &mut self.buffer)?; - self.send_to(addr).await + self.send_to(addr) } #[inline] - async fn broadcast_msg(&mut self, type_: u8) -> Result<(), Error> { + fn broadcast_msg(&mut self, type_: u8) -> Result<(), Error> { let size = self.buffer.len(); debug!("Broadcasting message type {}, {:?} bytes to {} peers", type_, size, self.peer_crypto.count()); let traffic = &mut self.traffic; @@ -91,7 +90,7 @@ impl DeviceThread Ok(()), Ok(_) => Err(Error::Socket("Sent out truncated packet")), Err(e) => Err(Error::SocketIo("IOError when sending", e)), @@ -100,7 +99,7 @@ impl DeviceThread Result<(), Error> { + fn forward_packet(&mut self) -> Result<(), Error> { let (src, dst) = P::parse(self.buffer.message())?; debug!("Read data from interface: src: {}, dst: {}, {} bytes", src, dst, self.buffer.len()); self.traffic.count_out_payload(dst, src, self.buffer.len()); @@ -108,12 +107,12 @@ impl DeviceThread { // Peer found for destination debug!("Found destination for {} => {}", dst, addr); - self.send_msg(addr, MESSAGE_TYPE_DATA).await?; + self.send_msg(addr, MESSAGE_TYPE_DATA)?; } None => { if self.broadcast { debug!("No destination for {} found, broadcasting", dst); - self.broadcast_msg(MESSAGE_TYPE_DATA).await?; + self.broadcast_msg(MESSAGE_TYPE_DATA)?; } else { debug!("No destination for {} found, dropping", dst); self.traffic.count_dropped_payload(self.buffer.len()); @@ -123,35 +122,35 @@ impl DeviceThread Result<(), Error> { + pub fn housekeep(&mut self) -> Result<(), Error> { self.peer_crypto.load(); self.table.sync(); self.traffic.sync(); Ok(()) } - pub async fn iteration(&mut self) { - if let Ok(result) = timeout(std::time::Duration::from_millis(1000), self.device.read(&mut self.buffer)).await { - try_fail!(result, "Failed to read from device: {}"); - if let Err(e) = self.forward_packet().await { + pub fn iteration(&mut self) { + if self.device.read(&mut self.buffer).is_ok() { + //try_fail!(result, "Failed to read from device: {}"); + if let Err(e) = self.forward_packet() { error!("{}", e); } } let now = TS::now(); if self.next_housekeep < now { - if let Err(e) = self.housekeep().await { + if let Err(e) = self.housekeep() { error!("{}", e) } self.next_housekeep = now + 1 } } - pub async fn run(mut self) { + pub fn run(mut self) { loop { - self.iteration().await; + self.iteration(); if !self.running.load(Ordering::SeqCst) { debug!("Device: end"); - return + return; } } } diff --git a/src/engine/shared.rs b/src/engine/shared.rs index 2e5d829..66c248a 100644 --- a/src/engine/shared.rs +++ b/src/engine/shared.rs @@ -71,42 +71,49 @@ impl SharedPeerCrypto { } } -#[derive(Clone)] pub struct SharedTraffic { + cache: TrafficStats, traffic: Arc>, } +impl Clone for SharedTraffic { + fn clone(&self) -> Self { + Self { cache: TrafficStats::default(), traffic: self.traffic.clone() } + } +} + impl SharedTraffic { pub fn new() -> Self { - Self { traffic: Arc::new(Mutex::new(Default::default())) } + Self { cache: TrafficStats::default(), traffic: Arc::new(Mutex::new(Default::default())) } } pub fn sync(&mut self) { - // TODO sync if needed + self.traffic.lock().add(&self.cache); + self.cache.clear(); } - pub fn count_out_traffic(&self, peer: SocketAddr, bytes: usize) { - self.traffic.lock().count_out_traffic(peer, bytes); + pub fn count_out_traffic(&mut self, peer: SocketAddr, bytes: usize) { + self.cache.count_out_traffic(peer, bytes); } - pub fn count_in_traffic(&self, peer: SocketAddr, bytes: usize) { - self.traffic.lock().count_in_traffic(peer, bytes); + pub fn count_in_traffic(&mut self, peer: SocketAddr, bytes: usize) { + self.cache.count_in_traffic(peer, bytes); } - pub fn count_out_payload(&self, remote: Address, local: Address, bytes: usize) { - self.traffic.lock().count_out_payload(remote, local, bytes); + pub fn count_out_payload(&mut self, remote: Address, local: Address, bytes: usize) { + self.cache.count_out_payload(remote, local, bytes); } - pub fn count_in_payload(&self, remote: Address, local: Address, bytes: usize) { - self.traffic.lock().count_in_payload(remote, local, bytes); + pub fn count_in_payload(&mut self, remote: Address, local: Address, bytes: usize) { + self.cache.count_in_payload(remote, local, bytes); } - pub fn count_dropped_payload(&self, bytes: usize) { - self.traffic.lock().count_dropped_payload(bytes); + pub fn count_dropped_payload(&mut self, bytes: usize) { + self.cache.count_dropped_payload(bytes); } - pub fn count_invalid_protocol(&self, bytes: usize) { - self.traffic.lock().count_invalid_protocol(bytes); + pub fn count_invalid_protocol(&mut self, bytes: usize) { + self.cache.count_invalid_protocol(bytes); } pub fn period(&mut self, cleanup_idle: Option) { @@ -133,60 +140,60 @@ impl SharedTraffic { #[derive(Clone)] pub struct SharedTable { table: Arc>>, - //TODO: local reader lookup table Addr => Option - //TODO: local writer cache Addr => SocketAddr + cache: HashMap, Hash>, } impl SharedTable { pub fn new(config: &Config) -> Self { let table = ClaimTable::new(config.switch_timeout as Duration, config.peer_timeout as Duration); - SharedTable { table: Arc::new(Mutex::new(table)) } + SharedTable { table: Arc::new(Mutex::new(table)), cache: Default::default() } } pub fn sync(&mut self) { - // TODO sync if needed - // once every x seconds - // fetch reader cache - // clear writer cache + self.cache.clear(); } pub fn lookup(&mut self, addr: Address) -> Option { - // TODO: use local reader cache + if let Some(val) = self.cache.get(&addr) { + return *val; + } // if not found, use shared table and put into cache - self.table.lock().lookup(addr) + let val = self.table.lock().lookup(addr); + self.cache.insert(addr, val); + val } pub fn set_claims(&mut self, peer: SocketAddr, claims: RangeList) { - // clear writer cache - self.table.lock().set_claims(peer, claims) + self.table.lock().set_claims(peer, claims); + self.cache.clear(); } pub fn remove_claims(&mut self, peer: SocketAddr) { - // clear writer cache - self.table.lock().remove_claims(peer) + self.table.lock().remove_claims(peer); + self.cache.clear(); } pub fn cache(&mut self, addr: Address, peer: SocketAddr) { - // check writer cache and only write real updates to shared table - self.table.lock().cache(addr, peer) + if self.cache.get(&addr) != Some(&Some(peer)) { + self.table.lock().cache(addr, peer); + self.cache.insert(addr, Some(peer)); + } } pub fn housekeep(&mut self) { - self.table.lock().housekeep() + self.table.lock().housekeep(); + self.cache.clear(); } pub fn write_out(&self, out: &mut W) -> Result<(), io::Error> { - //TODO: stats call self.table.lock().write_out(out) } pub fn cache_len(&self) -> usize { - //TODO: stats call self.table.lock().cache_len() } pub fn claim_len(&self) -> usize { - //TODO: stats call self.table.lock().claim_len() } } diff --git a/src/engine/socket_thread.rs b/src/engine/socket_thread.rs index 3a456b2..7ae2835 100644 --- a/src/engine/socket_thread.rs +++ b/src/engine/socket_thread.rs @@ -35,7 +35,6 @@ use std::{ net::{SocketAddr, ToSocketAddrs}, str::FromStr, }; -use tokio::time::timeout; const MAX_RECONNECT_INTERVAL: u16 = 3600; const RESOLVE_INTERVAL: Time = 300; @@ -151,11 +150,11 @@ impl SocketThread Result<(), Error> { + fn send_to(&mut self, addr: SocketAddr) -> Result<(), Error> { let size = self.buffer.len(); debug!("Sending msg with {} bytes to {}", size, addr); self.traffic.count_out_traffic(addr, size); - match self.socket.send(self.buffer.message(), addr).await { + match self.socket.send(self.buffer.message(), addr) { Ok(written) if written == size => { self.buffer.clear(); Ok(()) @@ -166,7 +165,7 @@ impl SocketThread Result<(), Error> { + fn broadcast_msg(&mut self, type_: u8) -> Result<(), Error> { debug!("Broadcasting message type {}, {:?} bytes to {} peers", type_, self.buffer.len(), self.peers.len()); for (addr, peer) in &mut self.peers { self.broadcast_buffer.set_start(self.buffer.get_start()); @@ -175,7 +174,7 @@ impl SocketThread Ok(()), Ok(_) => Err(Error::Socket("Sent out truncated packet")), Err(e) => Err(Error::SocketIo("IOError when sending", e)), @@ -185,7 +184,7 @@ impl SocketThread Result<(), Error> { + fn connect_sock(&mut self, addr: SocketAddr) -> Result<(), Error> { let addr = mapped_addr(addr); if self.peers.contains_key(&addr) || self.own_addresses.contains(&addr) @@ -198,10 +197,10 @@ impl SocketThread(&mut self, addr: Addr) -> Result<(), Error> { + pub fn connect(&mut self, addr: Addr) -> Result<(), Error> { let addrs = resolve(&addr)?.into_iter().map(mapped_addr).collect::>(); for addr in &addrs { if self.own_addresses.contains(addr) @@ -214,7 +213,7 @@ impl SocketThread SocketThread) -> Result<(), Error> { + fn update_peer_info(&mut self, addr: SocketAddr, info: Option) -> Result<(), Error> { if let Some(peer) = self.peers.get_mut(&addr) { peer.last_seen = TS::now(); peer.timeout = TS::now() + self.config.peer_timeout as Time; @@ -260,12 +259,12 @@ impl SocketThread Result<(), Error> { + fn add_new_peer(&mut self, addr: SocketAddr, info: NodeInfo) -> Result<(), Error> { info!("Added peer {}", addr_nice(addr)); if let Some(init) = self.pending_inits.remove(&addr) { self.buffer.clear(); @@ -281,9 +280,9 @@ impl SocketThread SocketThread Result<(), Error> { + fn connect_to_peers(&mut self, peers: &[PeerInfo]) -> Result<(), Error> { 'outer: for peer in peers { for addr in &peer.addrs { if self.peers.contains_key(addr) { @@ -315,7 +314,7 @@ impl SocketThread SocketThread Result<(), Error> { + fn handle_payload_from(&mut self, peer: SocketAddr) -> Result<(), Error> { let (src, dst) = P::parse(self.buffer.message())?; let len = self.buffer.len(); debug!("Writing data to device: {} bytes", len); self.traffic.count_in_payload(src, dst, len); - if let Err(e) = self.device.write(&mut self.buffer).await { + if let Err(e) = self.device.write(&mut self.buffer) { error!("Failed to send via device: {}", e); return Err(e); } @@ -345,10 +344,10 @@ impl SocketThread Result<(), Error> { + fn process_message(&mut self, src: SocketAddr, msg_result: MessageResult) -> Result<(), Error> { match msg_result { MessageResult::Message(type_) => match type_ { - MESSAGE_TYPE_DATA => self.handle_payload_from(src).await?, + MESSAGE_TYPE_DATA => self.handle_payload_from(src)?, MESSAGE_TYPE_NODE_INFO => { let info = match NodeInfo::decode(Cursor::new(self.buffer.message())) { Ok(val) => val, @@ -358,11 +357,11 @@ impl SocketThread { self.buffer.clear(); - self.update_peer_info(src, None).await?; + self.update_peer_info(src, None)?; } MESSAGE_TYPE_CLOSE => { self.buffer.clear(); @@ -374,7 +373,7 @@ impl SocketThread self.send_to(src).await?, + MessageResult::Reply => self.send_to(src)?, MessageResult::None => { self.buffer.clear(); } @@ -382,13 +381,13 @@ impl SocketThread Result<(), Error> { + fn handle_message(&mut self, src: SocketAddr) -> Result<(), Error> { let src = mapped_addr(src); debug!("Received {} bytes from {}", self.buffer.len(), src); let buffer = &mut self.buffer; self.traffic.count_in_traffic(src, buffer.len()); if let Some(result) = self.peers.get_mut(&src).map(|peer| peer.crypto.handle_message(buffer)) { - return self.process_message(src, result?).await; + return self.process_message(src, result?); } let is_init = is_init_message(buffer.message()); if let Some(result) = self.pending_inits.get_mut(&src).map(|init| { @@ -401,10 +400,10 @@ impl SocketThread SocketThread { self.pending_inits.insert(src, init); - self.send_to(src).await + self.send_to(src) } Err(err) => { self.traffic.count_invalid_protocol(self.buffer.len()); @@ -428,7 +427,7 @@ impl SocketThread Result<(), Error> { + pub fn housekeep(&mut self) -> Result<(), Error> { let now = TS::now(); let mut del: SmallVec<[SocketAddr; 3]> = SmallVec::new(); for (&addr, ref data) in &self.peers { @@ -440,10 +439,10 @@ impl SocketThread SocketThread SocketThread Result<(), Error> { + fn crypto_housekeep(&mut self) -> Result<(), Error> { let mut del: SmallVec<[SocketAddr; 4]> = smallvec![]; for addr in self.pending_inits.keys().copied().collect::>() { self.buffer.clear(); if self.pending_inits.get_mut(&addr).unwrap().every_second(&mut self.buffer).is_err() { del.push(addr) } else if !self.buffer.is_empty() { - self.send_to(addr).await? + self.send_to(addr)? } } for addr in self.peers.keys().copied().collect::>() { self.buffer.clear(); self.peers.get_mut(&addr).unwrap().crypto.every_second(&mut self.buffer); if !self.buffer.is_empty() { - self.send_to(addr).await? + self.send_to(addr)? } } for addr in del { self.pending_inits.remove(&addr); if self.peers.remove(&addr).is_some() { - self.connect_sock(addr).await?; + self.connect_sock(addr)?; } } Ok(()) } - async fn reset_own_addresses(&mut self) -> io::Result<()> { + fn reset_own_addresses(&mut self) -> io::Result<()> { self.own_addresses.clear(); - let socket_addr = self.socket.address().await.map(mapped_addr)?; + let socket_addr = self.socket.address().map(mapped_addr)?; // 1) Specified advertise addresses for addr in &self.config.advertise_addresses { self.own_addresses.push(parse_listen(addr, socket_addr.port())); @@ -555,7 +554,7 @@ impl SocketThread Result<(), Error> { + fn load_beacon(&mut self) -> Result<(), Error> { let peers; if let Some(ref path) = self.config.beacon_load { if let Some(path) = path.strip_prefix('|') { @@ -574,7 +573,7 @@ impl SocketThread SocketThread Result<(), Error> { + fn send_stats_to_statsd(&mut self) -> Result<(), Error> { if let Some(ref endpoint) = self.statsd_server { let peer_traffic = self.traffic.total_peer_traffic(); let payload_traffic = self.traffic.total_payload_traffic(); @@ -652,7 +651,7 @@ impl SocketThread Ok(()), Ok(_) => Err(Error::Socket("Sent out truncated packet")), Err(e) => Err(Error::SocketIo("IOError when sending", e)), @@ -664,14 +663,14 @@ impl SocketThread Result<(), Error> { + fn reconnect_to_peers(&mut self) -> Result<(), Error> { let now = TS::now(); // Connect to those reconnect_peers that are due for entry in self.reconnect_peers.clone() { if entry.next > now { continue; } - self.connect(&entry.resolved as &[SocketAddr]).await?; + self.connect(&entry.resolved as &[SocketAddr])?; } for entry in &mut self.reconnect_peers { // Schedule for next second if node is connected @@ -717,11 +716,10 @@ impl SocketThread { debug!("Fatal crypto init error from {}: {}", src, e); info!("Closing pending connection to {} due to error in crypto init", addr_nice(src)); @@ -743,7 +741,7 @@ impl SocketThread SocketThread TunTapDevice { } #[allow(clippy::cognitive_complexity)] -fn run(config: Config, socket: S) { +fn run(config: Config, socket: S) { let device = setup_device(&config); let port_forwarding = if config.port_forwarding { socket.create_port_forwarding() } else { None }; let stats_file = match config.stats_file { @@ -222,32 +219,25 @@ fn run(config: Config, socket: S) { } try_fail!(pd.apply(), "Failed to drop privileges: {}"); } - let rt = Runtime::new().unwrap(); let ifdown = config.ifdown.clone(); - rt.block_on(async move { - // Warning: no async code outside this block, or it will break on daemonize - let device = AsyncTunTapDevice::from_sync(device); - let socket = try_fail!(socket.build(), "Failed to create async socket: {}"); - let mut cloud = try_fail!( - GenericCloud::::new( - &config, - socket, - device, - port_forwarding, - stats_file - ) - .await, - "Failed to create engine: {}" - ); - for mut addr in config.peers { - if addr.find(':').unwrap_or(0) <= addr.find(']').unwrap_or(0) { - // : not present or only in IPv6 address - addr = format!("{}:{}", addr, DEFAULT_PORT) - } - try_fail!(cloud.add_peer(addr.clone()), "Failed to send message to {}: {}", &addr); + let mut cloud = try_fail!( + GenericCloud::::new( + &config, + socket, + device, + port_forwarding, + stats_file + ), + "Failed to create engine: {}" + ); + for mut addr in config.peers { + if addr.find(':').unwrap_or(0) <= addr.find(']').unwrap_or(0) { + // : not present or only in IPv6 address + addr = format!("{}:{}", addr, DEFAULT_PORT) } - cloud.run().await - }); + try_fail!(cloud.add_peer(addr.clone()), "Failed to send message to {}: {}", &addr); + } + cloud.run(); if let Some(script) = ifdown { run_script(&script, &ifname); } diff --git a/src/net.rs b/src/net.rs index 14aa3b4..365c63a 100644 --- a/src/net.rs +++ b/src/net.rs @@ -5,9 +5,8 @@ use crate::config::DEFAULT_PORT; use crate::port_forwarding::PortForwarding; use crate::util::{MockTimeSource, MsgBuffer, Time, TimeSource}; -use async_trait::async_trait; use parking_lot::Mutex; -use tokio::net::UdpSocket as AsyncUdpSocket; +use std::time::Duration; use std::{ collections::{HashMap, VecDeque}, io::{self, ErrorKind}, @@ -32,17 +31,11 @@ pub fn get_ip() -> IpAddr { s.local_addr().unwrap().ip() } -pub trait SocketBuilder { - type SocketType: Socket; - fn build(self) -> Result; - fn create_port_forwarding(&self) -> Option; -} - -#[async_trait] pub trait Socket: Sized + Clone + Send + Sync + 'static { - async fn receive(&mut self, buffer: &mut MsgBuffer) -> Result; - async fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result; - async fn address(&self) -> Result; + fn receive(&mut self, buffer: &mut MsgBuffer) -> Result; + fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result; + fn address(&self) -> Result; + fn create_port_forwarding(&self) -> Option; } pub fn parse_listen(addr: &str, default_port: u16) -> SocketAddr { @@ -59,49 +52,40 @@ pub fn parse_listen(addr: &str, default_port: u16) -> SocketAddr { } } -pub struct NetSocket(UdpSocket); +pub struct NetSocket(Arc); impl NetSocket { pub fn listen(addr: &str) -> Result { let addr = parse_listen(addr, DEFAULT_PORT); - Ok(Self(UdpSocket::bind(addr)?)) + let sock = UdpSocket::bind(addr)?; + sock.set_read_timeout(Some(Duration::from_secs(1)))?; + Ok(Self(Arc::new(sock))) } } -impl SocketBuilder for NetSocket { - type SocketType = AsyncNetSocket; - - fn create_port_forwarding(&self) -> Option { - PortForwarding::new(self.0.local_addr().unwrap().port()) - } - - fn build(self) -> Result { - Ok(AsyncNetSocket(Arc::new(AsyncUdpSocket::from_std(self.0)?))) - } -} - -pub struct AsyncNetSocket(Arc); - -impl Clone for AsyncNetSocket { +impl Clone for NetSocket { fn clone(&self) -> Self { Self(self.0.clone()) } } -#[async_trait] -impl Socket for AsyncNetSocket { - async fn receive(&mut self, buffer: &mut MsgBuffer) -> Result { +impl Socket for NetSocket { + fn create_port_forwarding(&self) -> Option { + PortForwarding::new(self.0.local_addr().unwrap().port()) + } + + fn receive(&mut self, buffer: &mut MsgBuffer) -> Result { buffer.clear(); - let (size, addr) = self.0.recv_from(buffer.buffer()).await?; + let (size, addr) = self.0.recv_from(buffer.buffer())?; buffer.set_length(size); Ok(addr) } - async fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result { - self.0.send_to(data, addr).await + fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result { + self.0.send_to(data, addr) } - async fn address(&self) -> Result { + fn address(&self) -> Result { let mut addr = self.0.local_addr()?; addr.set_ip(get_ip()); Ok(addr) @@ -160,9 +144,8 @@ impl MockSocket { } } -#[async_trait] impl Socket for MockSocket { - async fn receive(&mut self, buffer: &mut MsgBuffer) -> Result { + fn receive(&mut self, buffer: &mut MsgBuffer) -> Result { if let Some((addr, data)) = self.inbound.lock().pop_front() { buffer.clear(); buffer.set_length(data.len()); @@ -173,7 +156,7 @@ impl Socket for MockSocket { } } - async fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result { + fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result { self.outbound.lock().push_back((addr, data.into())); if self.nat { self.nat_peers.lock().insert(addr, MockTimeSource::now() + 300); @@ -181,9 +164,13 @@ impl Socket for MockSocket { Ok(data.len()) } - async fn address(&self) -> Result { + fn address(&self) -> Result { Ok(self.address) } + + fn create_port_forwarding(&self) -> Option { + None + } } #[cfg(feature = "bench")] diff --git a/src/payload.rs b/src/payload.rs index 6d42821..7a75ad3 100644 --- a/src/payload.rs +++ b/src/payload.rs @@ -53,7 +53,7 @@ impl Protocol for Frame { } #[test] -async fn decode_frame_without_vlan() { +fn decode_frame_without_vlan() { let data = [6, 5, 4, 3, 2, 1, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 8]; let (src, dst) = Frame::parse(&data).unwrap(); assert_eq!(src, Address { data: [1, 2, 3, 4, 5, 6, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], len: 6 }); @@ -61,7 +61,7 @@ async fn decode_frame_without_vlan() { } #[test] -async fn decode_frame_with_vlan() { +fn decode_frame_with_vlan() { let data = [6, 5, 4, 3, 2, 1, 1, 2, 3, 4, 5, 6, 0x81, 0, 4, 210, 1, 2, 3, 4, 5, 6, 7, 8]; let (src, dst) = Frame::parse(&data).unwrap(); assert_eq!(src, Address { data: [4, 210, 1, 2, 3, 4, 5, 6, 0, 0, 0, 0, 0, 0, 0, 0], len: 8 }); @@ -69,7 +69,7 @@ async fn decode_frame_with_vlan() { } #[test] -async fn decode_invalid_frame() { +fn decode_invalid_frame() { assert!(Frame::parse(&[6, 5, 4, 3, 2, 1, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 8]).is_ok()); // truncated frame assert!(Frame::parse(&[]).is_err()); @@ -118,7 +118,7 @@ impl Protocol for Packet { } #[test] -async fn decode_ipv4_packet() { +fn decode_ipv4_packet() { let data = [0x40, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 192, 168, 1, 1, 192, 168, 1, 2]; let (src, dst) = Packet::parse(&data).unwrap(); assert_eq!(src, Address { data: [192, 168, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], len: 4 }); @@ -126,7 +126,7 @@ async fn decode_ipv4_packet() { } #[test] -async fn decode_ipv6_packet() { +fn decode_ipv6_packet() { let data = [ 0x60, 0, 0, 0, 0, 0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 6, 5, 4, 3, 2, 1, @@ -137,7 +137,7 @@ async fn decode_ipv6_packet() { } #[test] -async fn decode_invalid_packet() { +fn decode_invalid_packet() { assert!(Packet::parse(&[0x40, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 192, 168, 1, 1, 192, 168, 1, 2]).is_ok()); assert!(Packet::parse(&[ 0x60, 0, 0, 0, 0, 0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 6, 5, diff --git a/src/tests/common.rs b/src/tests/common.rs index 5a09fe2..6a84748 100644 --- a/src/tests/common.rs +++ b/src/tests/common.rs @@ -79,7 +79,7 @@ impl Simulator

{ Self { next_port: 1, nodes: HashMap::default(), messages: VecDeque::default() } } - pub async fn add_node(&mut self, nat: bool, config: &Config) -> SocketAddr { + pub fn add_node(&mut self, nat: bool, config: &Config) -> SocketAddr { let mut config = config.clone(); MockSocket::set_nat(nat); config.listen = format!("[::]:{}", self.next_port); @@ -89,7 +89,7 @@ impl Simulator

{ } DebugLogger::set_node(self.next_port as usize); self.next_port += 1; - let node = TestNode::new(&config, MockSocket::new(addr), MockDevice::new(), None, None).await.unwrap(); + let node = TestNode::new(&config, MockSocket::new(addr), MockDevice::new(), None, None).unwrap(); DebugLogger::set_node(0); self.nodes.insert(addr, node); @@ -97,18 +97,18 @@ impl Simulator

{ } #[allow(dead_code)] - pub async fn get_node(&mut self, addr: SocketAddr) -> &mut TestNode

{ + pub fn get_node(&mut self, addr: SocketAddr) -> &mut TestNode

{ let node = self.nodes.get_mut(&addr).unwrap(); - DebugLogger::set_node(node.get_num().await); + DebugLogger::set_node(node.get_num()); node } - pub async fn simulate_next_message(&mut self) { + pub fn simulate_next_message(&mut self) { if let Some((src, dst, data)) = self.messages.pop_front() { if let Some(node) = self.nodes.get_mut(&dst) { if node.socket().put_inbound(src, data) { - DebugLogger::set_node(node.get_num().await); - node.trigger_socket_event().await; + DebugLogger::set_node(node.get_num()); + node.trigger_socket_event(); DebugLogger::set_node(0); let sock = node.socket(); let src = dst; @@ -122,16 +122,16 @@ impl Simulator

{ } } - pub async fn simulate_all_messages(&mut self) { + pub fn simulate_all_messages(&mut self) { while !self.messages.is_empty() { - self.simulate_next_message().await + self.simulate_next_message() } } - pub async fn trigger_node_housekeep(&mut self, addr: SocketAddr) { + pub fn trigger_node_housekeep(&mut self, addr: SocketAddr) { let node = self.nodes.get_mut(&addr).unwrap(); - DebugLogger::set_node(node.get_num().await); - node.trigger_housekeep().await; + DebugLogger::set_node(node.get_num()); + node.trigger_housekeep(); DebugLogger::set_node(0); let sock = node.socket(); while let Some((dst, data)) = sock.pop_outbound() { @@ -139,10 +139,10 @@ impl Simulator

{ } } - pub async fn trigger_housekeep(&mut self) { + pub fn trigger_housekeep(&mut self) { for (src, node) in &mut self.nodes { - DebugLogger::set_node(node.get_num().await); - node.trigger_housekeep().await; + DebugLogger::set_node(node.get_num()); + node.trigger_housekeep(); DebugLogger::set_node(0); let sock = node.socket(); while let Some((dst, data)) = sock.pop_outbound() { @@ -155,20 +155,20 @@ impl Simulator

{ MockTimeSource::set_time(time); } - pub async fn simulate_time(&mut self, time: Time) { + pub fn simulate_time(&mut self, time: Time) { let mut t = MockTimeSource::now(); while t < time { t += 1; self.set_time(t); - self.trigger_housekeep().await; - self.simulate_all_messages().await; + self.trigger_housekeep(); + self.simulate_all_messages(); } } - pub async fn connect(&mut self, src: SocketAddr, dst: SocketAddr) { + pub fn connect(&mut self, src: SocketAddr, dst: SocketAddr) { let node = self.nodes.get_mut(&src).unwrap(); - DebugLogger::set_node(node.get_num().await); - node.connect(dst).await.unwrap(); + DebugLogger::set_node(node.get_num()); + node.connect(dst).unwrap(); DebugLogger::set_node(0); let sock = node.socket(); while let Some((dst, data)) = sock.pop_outbound() { @@ -190,11 +190,11 @@ impl Simulator

{ self.messages.len() } - pub async fn put_payload(&mut self, addr: SocketAddr, data: Vec) { + pub fn put_payload(&mut self, addr: SocketAddr, data: Vec) { let node = self.nodes.get_mut(&addr).unwrap(); node.device().put_inbound(data); - DebugLogger::set_node(node.get_num().await); - node.trigger_device_event().await; + DebugLogger::set_node(node.get_num()); + node.trigger_device_event(); DebugLogger::set_node(0); let sock = node.socket(); while let Some((dst, data)) = sock.pop_outbound() { diff --git a/src/tests/nat.rs b/src/tests/nat.rs index 62a1452..9593f6c 100644 --- a/src/tests/nat.rs +++ b/src/tests/nat.rs @@ -5,35 +5,35 @@ use super::common::*; #[test] -async fn connect_nat_2_peers() { +fn connect_nat_2_peers() { let config = Config { port_forwarding: false, ..Default::default() }; let mut sim = TapSimulator::new(); - let node1 = sim.add_node(true, &config).await; - let node2 = sim.add_node(true, &config).await; + let node1 = sim.add_node(true, &config); + let node2 = sim.add_node(true, &config); - sim.connect(node1, node2).await; - sim.connect(node2, node1).await; + sim.connect(node1, node2); + sim.connect(node2, node1); - sim.simulate_time(60).await; + sim.simulate_time(60); assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node2, node1)); } #[test] -async fn connect_nat_3_peers() { +fn connect_nat_3_peers() { let config = Config::default(); let mut sim = TapSimulator::new(); - let node1 = sim.add_node(true, &config).await; - let node2 = sim.add_node(true, &config).await; - let node3 = sim.add_node(true, &config).await; + let node1 = sim.add_node(true, &config); + let node2 = sim.add_node(true, &config); + let node3 = sim.add_node(true, &config); - sim.connect(node1, node2).await; - sim.connect(node2, node1).await; - sim.connect(node1, node3).await; - sim.connect(node3, node1).await; + sim.connect(node1, node2); + sim.connect(node2, node1); + sim.connect(node1, node3); + sim.connect(node3, node1); - sim.simulate_time(300).await; + sim.simulate_time(300); assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node2, node1)); assert!(sim.is_connected(node1, node3)); @@ -43,19 +43,19 @@ async fn connect_nat_3_peers() { } #[test] -async fn nat_keepalive() { +fn nat_keepalive() { let config = Config::default(); let mut sim = TapSimulator::new(); - let node1 = sim.add_node(true, &config).await; - let node2 = sim.add_node(true, &config).await; - let node3 = sim.add_node(true, &config).await; + let node1 = sim.add_node(true, &config); + let node2 = sim.add_node(true, &config); + let node3 = sim.add_node(true, &config); - sim.connect(node1, node2).await; - sim.connect(node2, node1).await; - sim.connect(node1, node3).await; - sim.connect(node3, node1).await; + sim.connect(node1, node2); + sim.connect(node2, node1); + sim.connect(node1, node3); + sim.connect(node3, node1); - sim.simulate_time(1000).await; + sim.simulate_time(1000); assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node2, node1)); assert!(sim.is_connected(node1, node3)); @@ -63,7 +63,7 @@ async fn nat_keepalive() { assert!(sim.is_connected(node2, node3)); assert!(sim.is_connected(node3, node2)); - sim.simulate_time(10000).await; + sim.simulate_time(10000); assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node2, node1)); assert!(sim.is_connected(node1, node3)); diff --git a/src/tests/payload.rs b/src/tests/payload.rs index 67663fa..5403c67 100644 --- a/src/tests/payload.rs +++ b/src/tests/payload.rs @@ -5,50 +5,52 @@ use super::common::*; #[test] -async fn switch_delivers() { +fn switch_delivers() { let config = Config { device_type: Type::Tap, ..Config::default() }; let mut sim = TapSimulator::new(); - let node1 = sim.add_node(false, &config).await; - let node2 = sim.add_node(false, &config).await; + let node1 = sim.add_node(false, &config); + let node2 = sim.add_node(false, &config); - sim.connect(node1, node2).await; - sim.simulate_all_messages().await; + sim.connect(node1, node2); + sim.simulate_all_messages(); assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node2, node1)); + sim.trigger_housekeep(); let payload = vec![2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4, 5]; - sim.put_payload(node1, payload.clone()).await; - sim.simulate_all_messages().await; + sim.put_payload(node1, payload.clone()); + sim.simulate_all_messages(); assert_eq!(Some(payload), sim.pop_payload(node2)); } #[test] -async fn switch_learns() { +fn switch_learns() { let config = Config { device_type: Type::Tap, ..Config::default() }; let mut sim = TapSimulator::new(); - let node1 = sim.add_node(false, &config).await; - let node2 = sim.add_node(false, &config).await; - let node3 = sim.add_node(false, &config).await; + let node1 = sim.add_node(false, &config); + let node2 = sim.add_node(false, &config); + let node3 = sim.add_node(false, &config); - sim.connect(node1, node2).await; - sim.connect(node1, node3).await; - sim.connect(node2, node3).await; - sim.simulate_all_messages().await; + sim.connect(node1, node2); + sim.connect(node1, node3); + sim.connect(node2, node3); + sim.simulate_all_messages(); assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node2, node1)); assert!(sim.is_connected(node1, node3)); assert!(sim.is_connected(node3, node1)); assert!(sim.is_connected(node2, node3)); assert!(sim.is_connected(node3, node2)); + sim.trigger_housekeep(); let payload = vec![2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4, 5]; // Nothing learnt so far, node1 broadcasts - sim.put_payload(node1, payload.clone()).await; - sim.simulate_all_messages().await; + sim.put_payload(node1, payload.clone()); + sim.simulate_all_messages(); assert_eq!(Some(payload.clone()), sim.pop_payload(node2)); assert_eq!(Some(payload), sim.pop_payload(node3)); @@ -57,38 +59,39 @@ async fn switch_learns() { // Node 2 learned the address by receiving it, does not broadcast - sim.put_payload(node2, payload.clone()).await; - sim.simulate_all_messages().await; + sim.put_payload(node2, payload.clone()); + sim.simulate_all_messages(); assert_eq!(Some(payload), sim.pop_payload(node1)); assert_eq!(None, sim.pop_payload(node3)); } #[test] -async fn switch_honours_vlans() { +fn switch_honours_vlans() { let config = Config { device_type: Type::Tap, ..Config::default() }; let mut sim = TapSimulator::new(); - let node1 = sim.add_node(false, &config).await; - let node2 = sim.add_node(false, &config).await; - let node3 = sim.add_node(false, &config).await; + let node1 = sim.add_node(false, &config); + let node2 = sim.add_node(false, &config); + let node3 = sim.add_node(false, &config); - sim.connect(node1, node2).await; - sim.connect(node1, node3).await; - sim.connect(node2, node3).await; - sim.simulate_all_messages().await; + sim.connect(node1, node2); + sim.connect(node1, node3); + sim.connect(node2, node3); + sim.simulate_all_messages(); assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node2, node1)); assert!(sim.is_connected(node1, node3)); assert!(sim.is_connected(node3, node1)); assert!(sim.is_connected(node2, node3)); assert!(sim.is_connected(node3, node2)); + sim.trigger_housekeep(); let payload = vec![2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 0x81, 0, 0, 0x67, 1, 2, 3, 4, 5]; // Nothing learnt so far, node1 broadcasts - sim.put_payload(node1, payload.clone()).await; - sim.simulate_all_messages().await; + sim.put_payload(node1, payload.clone()); + sim.simulate_all_messages(); assert_eq!(Some(payload.clone()), sim.pop_payload(node2)); assert_eq!(Some(payload), sim.pop_payload(node3)); @@ -97,8 +100,8 @@ async fn switch_honours_vlans() { // Node 2 learned the address by receiving it, does not broadcast - sim.put_payload(node2, payload.clone()).await; - sim.simulate_all_messages().await; + sim.put_payload(node2, payload.clone()); + sim.simulate_all_messages(); assert_eq!(Some(payload), sim.pop_payload(node1)); assert_eq!(None, sim.pop_payload(node3)); @@ -107,8 +110,8 @@ async fn switch_honours_vlans() { // Different VLANs, node 2 does not learn, still broadcasts - sim.put_payload(node2, payload.clone()).await; - sim.simulate_all_messages().await; + sim.put_payload(node2, payload.clone()); + sim.simulate_all_messages(); assert_eq!(Some(payload.clone()), sim.pop_payload(node1)); assert_eq!(Some(payload), sim.pop_payload(node3)); @@ -116,13 +119,13 @@ async fn switch_honours_vlans() { #[test] #[ignore] -async fn switch_forgets() { +fn switch_forgets() { // TODO Test unimplemented!() } #[test] -async fn router_delivers() { +fn router_delivers() { let config1 = Config { device_type: Type::Tun, auto_claim: false, @@ -136,24 +139,24 @@ async fn router_delivers() { ..Config::default() }; let mut sim = TunSimulator::new(); - let node1 = sim.add_node(false, &config1).await; - let node2 = sim.add_node(false, &config2).await; + let node1 = sim.add_node(false, &config1); + let node2 = sim.add_node(false, &config2); - sim.connect(node1, node2).await; - sim.simulate_all_messages().await; + sim.connect(node1, node2); + sim.simulate_all_messages(); assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node2, node1)); let payload = vec![0x40, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2]; - sim.put_payload(node1, payload.clone()).await; - sim.simulate_all_messages().await; + sim.put_payload(node1, payload.clone()); + sim.simulate_all_messages(); assert_eq!(Some(payload), sim.pop_payload(node2)); } #[test] -async fn router_drops_unknown_dest() { +fn router_drops_unknown_dest() { let config1 = Config { device_type: Type::Tun, auto_claim: false, @@ -167,18 +170,18 @@ async fn router_drops_unknown_dest() { ..Config::default() }; let mut sim = TunSimulator::new(); - let node1 = sim.add_node(false, &config1).await; - let node2 = sim.add_node(false, &config2).await; + let node1 = sim.add_node(false, &config1); + let node2 = sim.add_node(false, &config2); - sim.connect(node1, node2).await; - sim.simulate_all_messages().await; + sim.connect(node1, node2); + sim.simulate_all_messages(); assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node2, node1)); let payload = vec![0x40, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 3, 3, 3, 3]; - sim.put_payload(node1, payload).await; - sim.simulate_all_messages().await; + sim.put_payload(node1, payload); + sim.simulate_all_messages(); assert_eq!(None, sim.pop_payload(node2)); } diff --git a/src/tests/peers.rs b/src/tests/peers.rs index ce65df4..8c56182 100644 --- a/src/tests/peers.rs +++ b/src/tests/peers.rs @@ -5,47 +5,47 @@ use super::common::*; #[test] -async fn direct_connect() { +fn direct_connect() { let config = Config::default(); let mut sim = TapSimulator::new(); - let node1 = sim.add_node(false, &config).await; - let node2 = sim.add_node(false, &config).await; + let node1 = sim.add_node(false, &config); + let node2 = sim.add_node(false, &config); - sim.connect(node1, node2).await; - sim.simulate_all_messages().await; + sim.connect(node1, node2); + sim.simulate_all_messages(); assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node2, node1)); } #[test] -async fn direct_connect_unencrypted() { +fn direct_connect_unencrypted() { let config = Config { crypto: CryptoConfig { algorithms: vec!["plain".to_string()], ..CryptoConfig::default() }, ..Config::default() }; let mut sim = TapSimulator::new(); - let node1 = sim.add_node(false, &config).await; - let node2 = sim.add_node(false, &config).await; + let node1 = sim.add_node(false, &config); + let node2 = sim.add_node(false, &config); - sim.connect(node1, node2).await; - sim.simulate_all_messages().await; + sim.connect(node1, node2); + sim.simulate_all_messages(); assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node2, node1)); } #[test] -async fn cross_connect() { +fn cross_connect() { let config = Config::default(); let mut sim = TapSimulator::new(); - let node1 = sim.add_node(false, &config).await; - let node2 = sim.add_node(false, &config).await; - let node3 = sim.add_node(false, &config).await; + let node1 = sim.add_node(false, &config); + let node2 = sim.add_node(false, &config); + let node3 = sim.add_node(false, &config); - sim.connect(node1, node2).await; - sim.connect(node1, node3).await; - sim.simulate_all_messages().await; + sim.connect(node1, node2); + sim.connect(node1, node3); + sim.simulate_all_messages(); - sim.simulate_time(120).await; + sim.simulate_time(120); assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node2, node1)); @@ -56,124 +56,124 @@ async fn cross_connect() { } #[test] -async fn connect_via_beacons() { +fn connect_via_beacons() { let mut sim = TapSimulator::new(); let beacon_path = "target/.vpncloud_test"; let config1 = Config { beacon_store: Some(beacon_path.to_string()), ..Default::default() }; - let node1 = sim.add_node(false, &config1).await; + let node1 = sim.add_node(false, &config1); let config2 = Config { beacon_load: Some(beacon_path.to_string()), ..Default::default() }; - let node2 = sim.add_node(false, &config2).await; + let node2 = sim.add_node(false, &config2); sim.set_time(100); - sim.trigger_node_housekeep(node1).await; - sim.trigger_node_housekeep(node2).await; - sim.simulate_all_messages().await; + sim.trigger_node_housekeep(node1); + sim.trigger_node_housekeep(node2); + sim.simulate_all_messages(); assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node2, node1)); } #[test] -async fn reconnect_after_timeout() { +fn reconnect_after_timeout() { let config = Config::default(); let mut sim = TapSimulator::new(); - let node1 = sim.add_node(false, &config).await; - let node2 = sim.add_node(false, &config).await; + let node1 = sim.add_node(false, &config); + let node2 = sim.add_node(false, &config); - sim.connect(node1, node2).await; - sim.simulate_all_messages().await; + sim.connect(node1, node2); + sim.simulate_all_messages(); assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node2, node1)); sim.set_time(5000); - sim.trigger_housekeep().await; + sim.trigger_housekeep(); assert!(!sim.is_connected(node1, node2)); assert!(!sim.is_connected(node2, node1)); - sim.simulate_all_messages().await; + sim.simulate_all_messages(); assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node2, node1)); } #[test] -async fn lost_init_ping() { +fn lost_init_ping() { let config = Config::default(); let mut sim = TapSimulator::new(); - let node1 = sim.add_node(false, &config).await; - let node2 = sim.add_node(false, &config).await; + let node1 = sim.add_node(false, &config); + let node2 = sim.add_node(false, &config); - sim.connect(node1, node2).await; + sim.connect(node1, node2); sim.drop_message(); // drop init ping - sim.simulate_time(120).await; + sim.simulate_time(120); assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node2, node1)); } #[test] -async fn lost_init_pong() { +fn lost_init_pong() { let config = Config::default(); let mut sim = TapSimulator::new(); - let node1 = sim.add_node(false, &config).await; - let node2 = sim.add_node(false, &config).await; + let node1 = sim.add_node(false, &config); + let node2 = sim.add_node(false, &config); - sim.connect(node1, node2).await; - sim.simulate_next_message().await; // init ping + sim.connect(node1, node2); + sim.simulate_next_message(); // init ping sim.drop_message(); // drop init pong - sim.simulate_time(120).await; + sim.simulate_time(120); assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node2, node1)); } #[test] -async fn lost_init_peng() { +fn lost_init_peng() { let config = Config::default(); let mut sim = TapSimulator::new(); - let node1 = sim.add_node(false, &config).await; - let node2 = sim.add_node(false, &config).await; + let node1 = sim.add_node(false, &config); + let node2 = sim.add_node(false, &config); - sim.connect(node1, node2).await; - sim.simulate_next_message().await; // init ping - sim.simulate_next_message().await; // init pong + sim.connect(node1, node2); + sim.simulate_next_message(); // init ping + sim.simulate_next_message(); // init pong sim.drop_message(); // drop init peng - sim.simulate_time(120).await; + sim.simulate_time(120); assert!(sim.is_connected(node1, node2)); assert!(sim.is_connected(node2, node1)); } #[test] #[ignore] -async fn peer_exchange() { +fn peer_exchange() { // TODO Test unimplemented!() } #[test] #[ignore] -async fn lost_peer_exchange() { +fn lost_peer_exchange() { // TODO Test unimplemented!() } #[test] #[ignore] -async fn remove_dead_peers() { +fn remove_dead_peers() { // TODO Test unimplemented!() } #[test] #[ignore] -async fn update_primary_address() { +fn update_primary_address() { // TODO Test unimplemented!() } #[test] #[ignore] -async fn automatic_peer_timeout() { +fn automatic_peer_timeout() { // TODO Test unimplemented!() } diff --git a/src/traffic.rs b/src/traffic.rs index f6451e5..a047758 100644 --- a/src/traffic.rs +++ b/src/traffic.rs @@ -70,6 +70,18 @@ impl TrafficEntry { self.out_bytes = 0; self.in_bytes = 0; } + + fn clear(&mut self) { + self.in_bytes = 0; + self.out_bytes = 0; + self.in_packets = 0; + self.out_packets = 0; + self.idle_periods = 0; + self.in_bytes_total = 0; + self.in_packets_total = 0; + self.out_bytes_total = 0; + self.out_packets_total = 0; + } } #[derive(Default)] @@ -203,4 +215,28 @@ impl TrafficStats { )?; Ok(()) } + + pub fn add(&mut self, other: &Self) { + for (addr, data) in &other.peers { + if let Some(entry) = self.peers.get_mut(addr) { + *entry += data + } else { + self.peers.insert(*addr, data.clone()); + } + } + for (key, data) in &other.payload { + if let Some(entry) = self.payload.get_mut(key) { + *entry += data + } else { + self.payload.insert(*key, data.clone()); + } + } + self.dropped += &other.dropped + } + + pub fn clear(&mut self) { + self.peers.clear(); + self.payload.clear(); + self.dropped.clear(); + } } diff --git a/src/types.rs b/src/types.rs index 941f536..dba7d57 100644 --- a/src/types.rs +++ b/src/types.rs @@ -233,7 +233,7 @@ mod tests { use std::io::Cursor; #[test] - async fn address_parse_fmt() { + fn address_parse_fmt() { assert_eq!(format!("{}", Address::from_str("120.45.22.5").unwrap()), "120.45.22.5"); assert_eq!(format!("{}", Address::from_str("78:2d:16:05:01:02").unwrap()), "78:2d:16:05:01:02"); assert_eq!( @@ -249,7 +249,7 @@ mod tests { } #[test] - async fn address_decode_encode() { + fn address_decode_encode() { let mut buf = vec![]; let addr = Address::from_str("120.45.22.5").unwrap(); addr.write_to(Cursor::new(&mut buf)); @@ -270,7 +270,7 @@ mod tests { } #[test] - async fn address_eq() { + fn address_eq() { assert_eq!( Address::read_from_fixed(Cursor::new(&[1, 2, 3, 4]), 4).unwrap(), Address::read_from_fixed(Cursor::new(&[1, 2, 3, 4]), 4).unwrap() @@ -290,7 +290,7 @@ mod tests { } #[test] - async fn address_range_decode_encode() { + fn address_range_decode_encode() { let mut buf = vec![]; let range = Range { base: Address { data: [0, 1, 2, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], len: 4 }, prefix_len: 24 }; diff --git a/src/util.rs b/src/util.rs index bbd9d69..80beddd 100644 --- a/src/util.rs +++ b/src/util.rs @@ -3,6 +3,7 @@ // This software is licensed under GPL-3 or newer (see LICENSE.md) use std::process::Command; +use std::time::Instant; use std::{ fmt, net::{Ipv4Addr, SocketAddr, ToSocketAddrs, UdpSocket}, @@ -11,6 +12,8 @@ use std::{ use crate::error::Error; +use signal::trap::Trap; +use signal::Signal; #[cfg(not(target_os = "linux"))] use time; @@ -260,6 +263,38 @@ impl fmt::Display for Bytes { } } +pub struct CtrlC { + dummy_time: Instant, + trap: Trap, +} + +impl CtrlC { + pub fn new() -> Self { + Default::default() + } + + pub fn was_pressed(&self) -> bool { + self.trap.wait(self.dummy_time).is_some() + } + + pub fn wait(&self) { + loop { + let deadline = Instant::now() + std::time::Duration::from_secs(10); + if self.trap.wait(deadline).is_some() { + return; + } + } + } +} + +impl Default for CtrlC { + fn default() -> Self { + let dummy_time = Instant::now(); + let trap = Trap::trap(&[Signal::SIGINT, Signal::SIGTERM, Signal::SIGQUIT]); + Self { dummy_time, trap } + } +} + pub trait TimeSource: Sync + Copy + Send + 'static { fn now() -> Time; } @@ -402,7 +437,7 @@ pub fn run_cmd(mut cmd: Command) { } #[test] -async fn base62() { +fn base62() { assert_eq!("", to_base62(&[0])); assert_eq!("z", to_base62(&[61])); assert_eq!("10", to_base62(&[62])); diff --git a/src/wsproxy.rs b/src/wsproxy.rs index 26c43fb..0f6eba1 100644 --- a/src/wsproxy.rs +++ b/src/wsproxy.rs @@ -3,19 +3,13 @@ // This software is licensed under GPL-3 or newer (see LICENSE.md) use super::{ - net::{get_ip, mapped_addr, parse_listen, Socket, SocketBuilder}, + net::{get_ip, mapped_addr, parse_listen, Socket}, poll::{WaitImpl, WaitResult}, port_forwarding::PortForwarding, util::MsgBuffer, }; -use async_trait::async_trait; use byteorder::{NetworkEndian, ReadBytesExt, WriteBytesExt}; -use std::{ - io::{self, Cursor, Read, Write}, - net::{Ipv6Addr, SocketAddr, SocketAddrV6, TcpListener, TcpStream, UdpSocket}, - os::unix::io::AsRawFd, - sync::Arc, -}; +use std::{io::{self, Cursor, Read, Write}, net::{Ipv6Addr, SocketAddr, SocketAddrV6, TcpListener, TcpStream, UdpSocket}, os::unix::io::AsRawFd, sync::Arc, thread}; use tungstenite::{client::AutoStream, connect, protocol::WebSocket, server::accept, Message}; use url::Url; @@ -99,7 +93,7 @@ pub fn run_proxy(listen: &str) -> Result<(), io::Error> { for stream in server.incoming() { let stream = stream?; let peer = stream.peer_addr()?; - tokio::spawn(async move { + thread::spawn(move || { if let Err(err) = serve_proxy_connection(stream) { error!("Error on connection {}: {}", peer, err); } @@ -138,21 +132,12 @@ impl ProxyConnection { } } -impl SocketBuilder for ProxyConnection { - type SocketType = ProxyConnection; - - fn build(self) -> Result { - Ok(self) - } - +impl Socket for ProxyConnection { fn create_port_forwarding(&self) -> Option { None } -} -#[async_trait] -impl Socket for ProxyConnection { - async fn receive(&mut self, buffer: &mut MsgBuffer) -> Result { + fn receive(&mut self, buffer: &mut MsgBuffer) -> Result { buffer.clear(); let data = self.read_message()?; let addr = read_addr(Cursor::new(&data))?; @@ -160,7 +145,7 @@ impl Socket for ProxyConnection { Ok(addr) } - async fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result { + fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result { let mut msg = Vec::with_capacity(data.len() + 18); write_addr(addr, &mut msg)?; msg.write_all(data)?; @@ -171,7 +156,7 @@ impl Socket for ProxyConnection { */ } - async fn address(&self) -> Result { + fn address(&self) -> Result { Ok(self.addr) } }