From a03d3672939d38d3a91c6bd931d63eb57c9a6803 Mon Sep 17 00:00:00 2001 From: Dennis Schwerdel Date: Tue, 11 May 2021 23:38:03 +0200 Subject: [PATCH] Fix daemonize, break websocket and port-forward --- Cargo.lock | 4 +- Cargo.toml | 2 +- contrib/aws/common.py | 41 +++++----- contrib/aws/example.py | 2 +- contrib/aws/performance.py | 14 ++-- contrib/aws/quick_perf.py | 100 +++++++++++++++++++++++ contrib/aws/testnet.py | 4 +- src/device.rs | 159 +++++++++++++++++++----------------- src/engine/shared.rs | 17 ++++ src/engine/socket_thread.rs | 1 + src/main.rs | 115 ++++++++++++++++---------- src/net.rs | 30 +++---- src/wsproxy.rs | 2 + 13 files changed, 327 insertions(+), 164 deletions(-) create mode 100755 contrib/aws/quick_perf.py diff --git a/Cargo.lock b/Cargo.lock index 038e311..4e939e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1143,9 +1143,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.2.0" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8190d04c665ea9e6b6a0dc45523ade572c088d2e6566244c1122671dbf4ae3a" +checksum = "83f0c8e7c0addab50b663055baf787d0af7f413a46e6e7fb9559a4e4db7137a5" dependencies = [ "autocfg", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 5316af7..bd23ad1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,7 +36,7 @@ dialoguer = { version = "0.8", optional = true } tungstenite = { version = "0.13", optional = true, default-features = false } url = { version = "2.2", optional = true } igd = { version = "0.12", optional = true } -tokio = { version = "1", features = ["full"] } +tokio = { version = "^1.5", features = ["full"] } async-trait = "0.1" diff --git a/contrib/aws/common.py b/contrib/aws/common.py index bb7e9fc..35ccf88 100644 --- a/contrib/aws/common.py +++ b/contrib/aws/common.py @@ -49,25 +49,27 @@ class Node: def run_cmd(self, cmd): return run_cmd(self.connection, cmd) - def start_vpncloud(self, ip=None, crypto=None, password="test", device_type="tun", listen="3210", mode="normal", peers=[], claims=[]): + def start_vpncloud(self, ip=None, crypto=None, password="test", device_type="tun", listen="3210", mode="normal", peers=[], claims=[], logfile="/var/log/vpncloud.log", extra_args=[]): args = [ "--daemon", "--no-port-forwarding", - "-t {}".format(device_type), - "-m {}".format(mode), - "-l {}".format(listen), - "--password '{}'".format(password) + f"-t {device_type}", + f"-m {mode}", + f"-l {listen}", + f"--password '{password}'", + f"--log-file '{logfile}'", + *extra_args ] if ip: - args.append("--ip {}".format(ip)) + args.append(f"--ip {ip}") if crypto: - args.append("--algo {}".format(crypto)) + args.append(f"--algo {crypto}") for p in peers: - args.append("-c {}".format(p)) + args.append(f"-c {p}") for c in claims: - args.append("--claim {}".format(c)) + args.append(f"--claim {c}") args = " ".join(args) - self.run_cmd("sudo vpncloud {}".format(args)) + self.run_cmd(f"sudo vpncloud {args}") def stop_vpncloud(self, wait=True): self.run_cmd("sudo killall vpncloud") @@ -75,7 +77,7 @@ class Node: time.sleep(3.0) def ping(self, dst, size=100, count=10, interval=0.001): - (out, _) = self.run_cmd('sudo ping {dst} -c {count} -i {interval} -s {size} -U -q'.format(dst=dst, size=size, count=count, interval=interval)) + (out, _) = self.run_cmd(f'sudo ping {dst} -c {count} -i {interval} -s {size} -U -q') match = re.search(r'([\d]*\.[\d]*)/([\d]*\.[\d]*)/([\d]*\.[\d]*)/([\d]*\.[\d]*)', out) ping_min = float(match.group(1)) ping_avg = float(match.group(2)) @@ -97,7 +99,7 @@ class Node: self.run_cmd('killall iperf3') def run_iperf(self, dst, duration): - (out, _) = self.run_cmd('iperf3 -c {dst} -t {duration} --json'.format(dst=dst, duration=duration)) + (out, _) = self.run_cmd(f'iperf3 -c {dst} -t {duration} --json') data = json.loads(out) return { "throughput": data['end']['streams'][0]['receiver']['bits_per_second'], @@ -197,7 +199,7 @@ class EC2Environment: if self.subnet == CREATE: self.setup_vpc() else: - eprint("\tUsing subnet {}".format(self.subnet)) + eprint(f"\tUsing subnet {self.subnet}") vpc = ec2.Subnet(self.subnet).vpc @@ -209,7 +211,7 @@ class EC2Environment: sg.authorize_ingress(CidrIp='172.16.1.0/24', IpProtocol='udp', FromPort=0, ToPort=65535) if self.keyname == CREATE: - key_pair = ec2.create_key_pair(KeyName="{}-keypair".format(self.tag)) + key_pair = ec2.create_key_pair(KeyName=f"{self.tag}-keypair") self.track_resource(key_pair) self.keyname = key_pair.name self.privatekey = key_pair.key_material @@ -217,7 +219,7 @@ class EC2Environment: placement = {} if self.cluster_nodes: - placement_group = ec2.create_placement_group(GroupName="{}-placement".format(self.tag), Strategy="cluster") + placement_group = ec2.create_placement_group(GroupName=f"{self.tag}-placement", Strategy="cluster") self.track_resource(placement_group) placement = { 'GroupName': placement_group.name } @@ -227,12 +229,11 @@ packages: - socat """ if not self.vpncloud_file: - userdata += """ + userdata += f""" runcmd: - - wget https://github.com/dswd/vpncloud/releases/download/v{version}/vpncloud_{version}.x86_64.rpm -O /tmp/vpncloud.rpm + - wget https://github.com/dswd/vpncloud/releases/download/v{self.vpncloud_version}/vpncloud_{self.vpncloud_version}.x86_64.rpm -O /tmp/vpncloud.rpm - yum install -y /tmp/vpncloud.rpm -""".format(version=self.vpncloud_version) - +""" if self.use_spot: response = ec2client.request_spot_instances( SpotPrice = self.max_price, @@ -368,7 +369,7 @@ runcmd: eprint("Deleting resources...") ec2client = boto3.client('ec2', region_name=self.region) for res in reversed(self.resources): - eprint("\t{} {}".format(res.__class__.__name__, res.id if hasattr(res, "id") else "")) + eprint(f"\t{res.__class__.__name__} {res.id if hasattr(res, 'id') else ''}") if isinstance(res, SpotInstanceRequest): ec2client.cancel_spot_instance_requests(SpotInstanceRequestIds=[res.id]) if hasattr(res, "attachments"): diff --git a/contrib/aws/example.py b/contrib/aws/example.py index 1be4286..bd02e0b 100755 --- a/contrib/aws/example.py +++ b/contrib/aws/example.py @@ -14,7 +14,7 @@ sender = setup.nodes[0] receiver = setup.nodes[1] sender.start_vpncloud(ip="10.0.0.1/24") -receiver.start_vpncloud(ip="10.0.0.2/24", peers=["{}:3210".format(sender.private_ip)]) +receiver.start_vpncloud(ip="10.0.0.2/24", peers=[f"{sender.private_ip}:3210"]) time.sleep(1.0) sender.ping("10.0.0.2") diff --git a/contrib/aws/performance.py b/contrib/aws/performance.py index 15d7f8e..d914b35 100755 --- a/contrib/aws/performance.py +++ b/contrib/aws/performance.py @@ -47,11 +47,11 @@ class PerfTest: return cls(env.nodes[0], env.nodes[1], meta) def run_ping(self, dst, size): - eprint("\tRunning ping {} with size {} ...".format(dst, size)) + eprint(f"\tRunning ping {dst} with size {size} ...") return self.sender.ping(dst=dst, size=size, count=30000, interval=0.001) def run_iperf(self, dst): - eprint("\tRunning iperf on {} ...".format(dst)) + eprint(f"\tRunning iperf on {dst} ...") self.receiver.start_iperf_server() time.sleep(0.1) result = self.sender.run_iperf(dst=dst, duration=30) @@ -68,9 +68,9 @@ class PerfTest: def start_vpncloud(self, crypto=None): eprint("\tSetting up vpncloud on receiver") - self.receiver.start_vpncloud(crypto=crypto, ip="{}/24".format(self.receiver_ip_vpncloud)) + self.receiver.start_vpncloud(crypto=crypto, ip=f"{self.receiver_ip_vpncloud}/24") eprint("\tSetting up vpncloud on sender") - self.sender.start_vpncloud(crypto=crypto, peers=["{}:3210".format(self.receiver.private_ip)], ip="{}/24".format(self.sender_ip_vpncloud)) + self.sender.start_vpncloud(crypto=crypto, peers=[f"{self.receiver_ip_vpncloud}:3210"], ip=f"{self.sender_ip_vpncloud}/24") time.sleep(1.0) def stop_vpncloud(self): @@ -84,7 +84,7 @@ class PerfTest: "native": self.run_suite(self.receiver.private_ip) } for crypto in CRYPTO: - eprint("Running with crypto {}".format(crypto)) + eprint(f"Running with crypto {crypto}") self.start_vpncloud(crypto=crypto) res = self.run_suite(self.receiver_ip_vpncloud) self.stop_vpncloud() @@ -109,8 +109,8 @@ duration = time.time() - start results["meta"]["duration"] = duration -name = "measurements/{date}_{version}_perf.json".format(date=date.today().strftime('%Y-%m-%d'), version=VERSION) -eprint('Storing results in {}'.format(name)) +name = f"measurements/{date.today().strftime('%Y-%m-%d')}_{VERSION}_perf.json" +eprint(f'Storing results in {name}') with open(name, 'w') as fp: json.dump(results, fp, indent=2) eprint("done.") diff --git a/contrib/aws/quick_perf.py b/contrib/aws/quick_perf.py new file mode 100755 index 0000000..87d324f --- /dev/null +++ b/contrib/aws/quick_perf.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python3 + +from common import EC2Environment, CREATE, eprint +import time, json, os, atexit +from datetime import date + + +# Note: this script will run for ~8 minutes and incur costs of about $ 0.02 + +FILE = "../../target/release/vpncloud" +VERSION = "2.2.0" +REGION = "eu-central-1" + +env = EC2Environment( + region = REGION, + node_count = 2, + instance_type = "m5.large", + use_spot = True, + max_price = "0.08", # USD per hour per VM + vpncloud_version = VERSION, + vpncloud_file = FILE, + cluster_nodes = True, + subnet = CREATE, + keyname = CREATE +) + + +CRYPTO = ["plain", "aes256", "aes128", "chacha20"] + + +class PerfTest: + def __init__(self, sender, receiver): + self.sender = sender + self.receiver = receiver + self.sender_ip_vpncloud = "10.0.0.1" + self.receiver_ip_vpncloud = "10.0.0.2" + + @classmethod + def from_ec2_env(cls, env): + return cls(env.nodes[0], env.nodes[1]) + + def run_ping(self, dst, size): + eprint(f"\tRunning ping {dst} with size {size} ...") + return self.sender.ping(dst=dst, size=size, count=30000, interval=0.001) + + def run_iperf(self, dst): + eprint(f"\tRunning iperf on {dst} ...") + self.receiver.start_iperf_server() + time.sleep(0.1) + result = self.sender.run_iperf(dst=dst, duration=30) + self.receiver.stop_iperf_server() + return result + + def start_vpncloud(self): + eprint("\tSetting up vpncloud on receiver") + self.receiver.start_vpncloud(ip=f"{self.receiver_ip_vpncloud}/24") + eprint("\tSetting up vpncloud on sender") + self.sender.start_vpncloud(peers=[f"{self.receiver.private_ip}:3210"], ip=f"{self.sender_ip_vpncloud}/24") + time.sleep(1.0) + + def stop_vpncloud(self): + self.sender.stop_vpncloud(wait=False) + self.receiver.stop_vpncloud(wait=True) + + def run(self): + print() + self.start_vpncloud() + throughput = self.run_iperf(self.receiver_ip_vpncloud)["throughput"] + print(f"Throughput: {throughput / 1_000_000.0} MBit/s") + native_ping_100 = self.run_ping(self.receiver.private_ip, 100)["rtt_avg"] + ping_100 = self.run_ping(self.receiver_ip_vpncloud, 100)["rtt_avg"] + print(f"Latency 100: +{(ping_100 - native_ping_100)*1000.0/2.0} µs") + native_ping_1000 = self.run_ping(self.receiver.private_ip, 1000)["rtt_avg"] + ping_1000 = self.run_ping(self.receiver_ip_vpncloud, 1000)["rtt_avg"] + print(f"Latency 1000: +{(ping_1000 - native_ping_1000)*1000.0/2.0} µs") + self.stop_vpncloud() + +keyfile = "key.pem" +assert not os.path.exists(keyfile) +with open(keyfile, 'x') as fp: + fp.write(env.privatekey) +os.chmod(keyfile, 0o400) +print(f"SSH private key written to {keyfile}") +atexit.register(lambda : os.remove(keyfile)) +print() +print("Nodes:") +for node in env.nodes: + print(f"\t {env.username}@{node.public_ip}\tprivate: {node.private_ip}") +print() + +perf = PerfTest.from_ec2_env(env) + +try: + perf.run() +except Exception as e: + eprint(f"Exception: {e}") + print("Press ENTER to shut down") + input() + +eprint("done.") \ No newline at end of file diff --git a/contrib/aws/testnet.py b/contrib/aws/testnet.py index af52448..2baae45 100755 --- a/contrib/aws/testnet.py +++ b/contrib/aws/testnet.py @@ -48,13 +48,13 @@ if not args.keyname: with open(args.keyfile, 'x') as fp: fp.write(setup.privatekey) os.chmod(args.keyfile, 0o400) - print("SSH private key written to {}".format(args.keyfile)) + print(f"SSH private key written to {args.keyfile}") atexit.register(lambda : os.remove(args.keyfile)) print() print("Nodes:") for node in setup.nodes: - print("\t {}@{}\tprivate: {}".format(setup.username, node.public_ip, node.private_ip)) + print(f"\t {setup.username}@{node.public_ip}\tprivate: {node.private_ip}") print() print("Press ENTER to shut down") diff --git a/src/device.rs b/src/device.rs index 57d2115..5c61fd4 100644 --- a/src/device.rs +++ b/src/device.rs @@ -9,15 +9,16 @@ use std::{ collections::VecDeque, convert::TryInto, fmt, - io::{self, Cursor, Error as IoError}, + io::{self, Cursor, Read, Write, Error as IoError, BufReader, BufRead}, net::{Ipv4Addr, UdpSocket}, + fs::{self, File}, os::unix::io::AsRawFd, str, str::FromStr, sync::Arc, }; -use tokio::fs::{self, File}; -use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}; +use tokio::fs::{File as AsyncFile}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use crate::{crypto, error::Error, util::MsgBuffer}; @@ -83,9 +84,6 @@ pub trait Device: Send + 'static + Sized { /// Returns the type of this device fn get_type(&self) -> Type; - /// Returns the interface name of this device. - fn ifname(&self) -> &str; - /// Reads a packet/frame from the device /// /// This method reads one packet or frame (depending on the device type) into the `buffer`. @@ -141,9 +139,9 @@ impl TunTapDevice { /// # Panics /// This method panics if the interface name is longer than 31 bytes. #[allow(clippy::useless_conversion)] - pub async fn new(ifname: &str, type_: Type, path: Option<&str>) -> io::Result { + pub fn new(ifname: &str, type_: Type, path: Option<&str>) -> io::Result { let path = path.unwrap_or_else(|| Self::default_path(type_)); - let fd = fs::OpenOptions::new().read(true).write(true).open(path).await?; + let fd = fs::OpenOptions::new().read(true).write(true).open(path)?; let flags = match type_ { Type::Tun => libc::IFF_TUN | libc::IFF_NO_PI, Type::Tap => libc::IFF_TAP | libc::IFF_NO_PI, @@ -155,7 +153,7 @@ impl TunTapDevice { 0 => { let mut ifname = String::with_capacity(32); let mut cursor = Cursor::new(ifreq.ifr_name); - cursor.read_to_string(&mut ifname).await?; + Read::read_to_string(&mut cursor, &mut ifname)?; ifname = ifname.trim_end_matches('\0').to_owned(); Ok(Self { fd, ifname, type_ }) } @@ -163,6 +161,10 @@ impl TunTapDevice { } } + pub fn ifname(&self) -> &str { + &self.ifname + } + /// Returns the default device path for a given type #[inline] pub fn default_path(type_: Type) -> &'static str { @@ -171,6 +173,69 @@ impl TunTapDevice { } } + pub fn get_overhead(&self) -> usize { + 40 /* for outer IPv6 header, can't be sure to only have IPv4 peers */ + + 8 /* for outer UDP header */ + + crypto::EXTRA_LEN + crypto::TAG_LEN /* crypto overhead */ + + 1 /* message type header */ + + match self.type_ { + Type::Tap => 14, /* inner ethernet header */ + Type::Tun => 0 + } + } + + pub fn set_mtu(&self, value: Option) -> io::Result<()> { + let value = match value { + Some(value) => value, + None => { + let default_device = get_default_device()?; + info!("Deriving MTU from default device {}", default_device); + get_device_mtu(&default_device)? - self.get_overhead() + } + }; + info!("Setting MTU {} on device {}", value, self.ifname); + set_device_mtu(&self.ifname, value) + } + + pub fn configure(&self, addr: Ipv4Addr, netmask: Ipv4Addr) -> io::Result<()> { + set_device_addr(&self.ifname, addr)?; + set_device_netmask(&self.ifname, netmask)?; + set_device_enabled(&self.ifname, true) + } + + pub fn get_rp_filter(&self) -> io::Result { + Ok(cmp::max(get_rp_filter("all")?, get_rp_filter(&self.ifname)?)) + } + + pub fn fix_rp_filter(&self) -> io::Result<()> { + if get_rp_filter("all")? > 1 { + info!("Setting net.ipv4.conf.all.rp_filter=1"); + set_rp_filter("all", 1)? + } + if get_rp_filter(&self.ifname)? != 1 { + info!("Setting net.ipv4.conf.{}.rp_filter=1", self.ifname); + set_rp_filter(&self.ifname, 1)? + } + Ok(()) + } +} + +/// Represents a tun/tap device +pub struct AsyncTunTapDevice { + fd: AsyncFile, + ifname: String, + type_: Type, +} + +impl AsyncTunTapDevice { + pub fn from_sync(dev: TunTapDevice) -> Self { + Self { + fd: AsyncFile::from_std(dev.fd), + ifname: dev.ifname, + type_: dev.type_ + } + } + #[cfg(any(target_os = "linux", target_os = "android"))] #[inline] fn correct_data_after_read(&mut self, _buffer: &mut MsgBuffer) {} @@ -219,64 +284,14 @@ impl TunTapDevice { } } } - - pub fn get_overhead(&self) -> usize { - 40 /* for outer IPv6 header, can't be sure to only have IPv4 peers */ - + 8 /* for outer UDP header */ - + crypto::EXTRA_LEN + crypto::TAG_LEN /* crypto overhead */ - + 1 /* message type header */ - + match self.type_ { - Type::Tap => 14, /* inner ethernet header */ - Type::Tun => 0 - } - } - - pub async fn set_mtu(&self, value: Option) -> io::Result<()> { - let value = match value { - Some(value) => value, - None => { - let default_device = get_default_device().await?; - info!("Deriving MTU from default device {}", default_device); - get_device_mtu(&default_device)? - self.get_overhead() - } - }; - info!("Setting MTU {} on device {}", value, self.ifname); - set_device_mtu(&self.ifname, value) - } - - pub fn configure(&self, addr: Ipv4Addr, netmask: Ipv4Addr) -> io::Result<()> { - set_device_addr(&self.ifname, addr)?; - set_device_netmask(&self.ifname, netmask)?; - set_device_enabled(&self.ifname, true) - } - - pub async fn get_rp_filter(&self) -> io::Result { - Ok(cmp::max(get_rp_filter("all").await?, get_rp_filter(&self.ifname).await?)) - } - - pub async fn fix_rp_filter(&self) -> io::Result<()> { - if get_rp_filter("all").await? > 1 { - info!("Setting net.ipv4.conf.all.rp_filter=1"); - set_rp_filter("all", 1).await? - } - if get_rp_filter(&self.ifname).await? != 1 { - info!("Setting net.ipv4.conf.{}.rp_filter=1", self.ifname); - set_rp_filter(&self.ifname, 1).await? - } - Ok(()) - } } #[async_trait] -impl Device for TunTapDevice { +impl Device for AsyncTunTapDevice { fn get_type(&self) -> Type { self.type_ } - fn ifname(&self) -> &str { - &self.ifname - } - async fn duplicate(&self) -> Result { Ok(Self { fd: self.fd.try_clone().await.map_err(|e| Error::DeviceIo("Failed to clone device", e))?, @@ -340,10 +355,6 @@ impl Device for MockDevice { Type::Tun } - fn ifname(&self) -> &str { - "mock0" - } - async fn read(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error> { if let Some(data) = self.inbound.lock().pop_front() { buffer.clear(); @@ -477,13 +488,13 @@ fn set_device_enabled(ifname: &str, up: bool) -> io::Result<()> { } } -async fn get_default_device() -> io::Result { - let mut fd = BufReader::new(File::open("/proc/net/route").await?); +fn get_default_device() -> io::Result { + let mut fd = BufReader::new(File::open("/proc/net/route")?); let mut best = None; let mut line = String::with_capacity(80); - fd.read_line(&mut line).await?; + fd.read_line(&mut line)?; line.clear(); - while let Ok(read) = fd.read_line(&mut line).await { + while let Ok(read) = fd.read_line(&mut line) { if read == 0 { break; } @@ -504,14 +515,14 @@ async fn get_default_device() -> io::Result { } } -async fn get_rp_filter(device: &str) -> io::Result { - let mut fd = File::open(format!("/proc/sys/net/ipv4/conf/{}/rp_filter", device)).await?; +fn get_rp_filter(device: &str) -> io::Result { + let mut fd = File::open(format!("/proc/sys/net/ipv4/conf/{}/rp_filter", device))?; let mut contents = String::with_capacity(10); - fd.read_to_string(&mut contents).await?; + fd.read_to_string(&mut contents)?; u8::from_str(contents.trim()).map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Invalid rp_filter value")) } -async fn set_rp_filter(device: &str, val: u8) -> io::Result<()> { - let mut fd = File::create(format!("/proc/sys/net/ipv4/conf/{}/rp_filter", device)).await?; - fd.write_all(format!("{}", val).as_bytes()).await +fn set_rp_filter(device: &str, val: u8) -> io::Result<()> { + let mut fd = File::create(format!("/proc/sys/net/ipv4/conf/{}/rp_filter", device))?; + fd.write_all(format!("{}", val).as_bytes()) } diff --git a/src/engine/shared.rs b/src/engine/shared.rs index e678e92..4b13b87 100644 --- a/src/engine/shared.rs +++ b/src/engine/shared.rs @@ -21,6 +21,7 @@ use super::common::PeerData; #[derive(Clone)] pub struct SharedPeerCrypto { peers: Arc>, Hash>>>, + //TODO: local hashmap as cache } impl SharedPeerCrypto { @@ -29,6 +30,7 @@ impl SharedPeerCrypto { } pub fn encrypt_for(&self, peer: SocketAddr, data: &mut MsgBuffer) -> Result<(), Error> { + //TODO: use cache first let mut peers = self.peers.lock(); match peers.get_mut(&peer) { None => Err(Error::InvalidCryptoState("No crypto found for peer")), @@ -41,6 +43,7 @@ impl SharedPeerCrypto { } pub fn store(&self, data: &HashMap) { + //TODO: store in shared and in cache let mut peers = self.peers.lock(); peers.clear(); peers.extend(data.iter().map(|(k, v)| (*k, v.crypto.get_core()))); @@ -51,6 +54,7 @@ impl SharedPeerCrypto { } pub fn get_snapshot(&self) -> HashMap>, Hash> { + //TODO: return local cache self.peers.lock().clone() } @@ -121,6 +125,8 @@ impl SharedTraffic { #[derive(Clone)] pub struct SharedTable { table: Arc>>, + //TODO: local reader lookup table Addr => Option + //TODO: local writer cache Addr => SocketAddr } impl SharedTable { @@ -131,21 +137,29 @@ impl SharedTable { pub fn sync(&mut self) { // TODO sync if needed + // once every x seconds + // fetch reader cache + // clear writer cache } pub fn lookup(&mut self, addr: Address) -> Option { + // TODO: use local reader cache + // if not found, use shared table and put into cache self.table.lock().lookup(addr) } pub fn set_claims(&mut self, peer: SocketAddr, claims: RangeList) { + // clear writer cache self.table.lock().set_claims(peer, claims) } pub fn remove_claims(&mut self, peer: SocketAddr) { + // clear writer cache self.table.lock().remove_claims(peer) } pub fn cache(&mut self, addr: Address, peer: SocketAddr) { + // check writer cache and only write real updates to shared table self.table.lock().cache(addr, peer) } @@ -154,14 +168,17 @@ impl SharedTable { } pub fn write_out(&self, out: &mut W) -> Result<(), io::Error> { + //TODO: stats call self.table.lock().write_out(out) } pub fn cache_len(&self) -> usize { + //TODO: stats call self.table.lock().cache_len() } pub fn claim_len(&self) -> usize { + //TODO: stats call self.table.lock().claim_len() } } diff --git a/src/engine/socket_thread.rs b/src/engine/socket_thread.rs index 907f7d0..3a456b2 100644 --- a/src/engine/socket_thread.rs +++ b/src/engine/socket_thread.rs @@ -386,6 +386,7 @@ impl SocketThread Result<(Ipv4Addr, Ipv4Addr), String> { Ok((ip, netmask)) } -async fn setup_device(config: &Config) -> TunTapDevice { +fn setup_device(config: &Config) -> TunTapDevice { let device = try_fail!( - TunTapDevice::new(&config.device_name, config.device_type, config.device_path.as_ref().map(|s| s as &str)).await, + TunTapDevice::new(&config.device_name, config.device_type, config.device_path.as_ref().map(|s| s as &str)), "Failed to open virtual {} interface {}: {}", config.device_type, config.device_name ); info!("Opened device {}", device.ifname()); config.call_hook("device_setup", vec![("IFNAME", device.ifname())], true); - if let Err(err) = device.set_mtu(config.device_mtu).await { + if let Err(err) = device.set_mtu(config.device_mtu) { error!("Error setting MTU on {}: {}", device.ifname(), err); } if let Some(ip) = &config.ip { @@ -160,9 +164,9 @@ async fn setup_device(config: &Config) -> TunTapDevice { run_script(script, device.ifname()); } if config.fix_rp_filter { - try_fail!(device.fix_rp_filter().await, "Failed to change rp_filter settings: {}"); + try_fail!(device.fix_rp_filter(), "Failed to change rp_filter settings: {}"); } - if let Ok(val) = device.get_rp_filter().await { + if let Ok(val) = device.get_rp_filter() { if val != 1 { warn!("Your networking configuration might be affected by a vulnerability (https://vpncloud.ddswd.de/docs/security/cve-2019-14899/), please change your rp_filter setting to 1 (currently {}).", val); } @@ -172,9 +176,10 @@ async fn setup_device(config: &Config) -> TunTapDevice { } #[allow(clippy::cognitive_complexity)] -async fn run(config: Config, socket: S) { - let device = setup_device(&config).await; - let port_forwarding = if config.port_forwarding { socket.create_port_forwarding().await } else { None }; +fn run(config: Config, socket: UdpSocket) { + let device = setup_device(&config); + let port_forwarding = None; + //if config.port_forwarding { rt.block_on(socket.create_port_forwarding()) } else { None }; let stats_file = match config.stats_file { None => None, Some(ref name) => { @@ -191,25 +196,16 @@ async fn run(config: Config, socket: S) { } }; let ifname = device.ifname().to_string(); - let mut cloud = - try_fail!(GenericCloud::::new(&config, socket, device, port_forwarding, stats_file).await, "Failed to create engine: {}"); - for mut addr in config.peers { - if addr.find(':').unwrap_or(0) <= addr.find(']').unwrap_or(0) { - // : not present or only in IPv6 address - addr = format!("{}:{}", addr, DEFAULT_PORT) - } - try_fail!(cloud.add_peer(addr.clone()), "Failed to send message to {}: {}", &addr); - } if config.daemonize { info!("Running process as daemon"); let mut daemonize = daemonize::Daemonize::new(); - if let Some(user) = config.user { - daemonize = daemonize.user(&user as &str); + if let Some(user) = &config.user { + daemonize = daemonize.user(user as &str); } - if let Some(group) = config.group { - daemonize = daemonize.group(&group as &str); + if let Some(group) = &config.group { + daemonize = daemonize.group(group as &str); } - if let Some(pid_file) = config.pid_file { + if let Some(pid_file) = &config.pid_file { daemonize = daemonize.pid_file(pid_file).chown_pid_file(true); // Give child process some time to write PID file daemonize = daemonize.exit_action(|| thread::sleep(std::time::Duration::from_millis(10))); @@ -218,22 +214,47 @@ async fn run(config: Config, socket: S) { } else if config.user.is_some() || config.group.is_some() { info!("Dropping privileges"); let mut pd = privdrop::PrivDrop::default(); - if let Some(user) = config.user { + if let Some(user) = &config.user { pd = pd.user(user); } - if let Some(group) = config.group { + if let Some(group) = &config.group { pd = pd.group(group); } try_fail!(pd.apply(), "Failed to drop privileges: {}"); } - cloud.run().await; - if let Some(script) = config.ifdown { + let rt = Runtime::new().unwrap(); + let ifdown = config.ifdown.clone(); + rt.block_on(async move { + // Warning: no async code outside this block, or it will break on daemonize + let device = AsyncTunTapDevice::from_sync(device); + let socket = try_fail!(AsyncNetSocket::from_socket(socket), "Failed to create async socket: {}"); + let mut cloud = try_fail!( + GenericCloud::::new( + &config, + socket, + device, + port_forwarding, + stats_file + ) + .await, + "Failed to create engine: {}" + ); + for mut addr in config.peers { + if addr.find(':').unwrap_or(0) <= addr.find(']').unwrap_or(0) { + // : not present or only in IPv6 address + addr = format!("{}:{}", addr, DEFAULT_PORT) + } + try_fail!(cloud.add_peer(addr.clone()), "Failed to send message to {}: {}", &addr); + } + cloud.run().await + }); + if let Some(script) = ifdown { run_script(&script, &ifname); } + std::process::exit(0) } -#[tokio::main] -async fn main() { +fn main() { let args: Args = Args::from_args(); if args.version { println!("VpnCloud v{}", env!("CARGO_PKG_VERSION")); @@ -325,19 +346,27 @@ async fn main() { error!("Either password or private key must be set in config or given as parameter"); return; } + /* #[cfg(feature = "websocket")] if config.listen.starts_with("ws://") { - let socket = try_fail!(ProxyConnection::listen(&config.listen).await, "Failed to open socket {}: {}", config.listen); + let socket = { + let rt = Runtime::new().unwrap(); + try_fail!( + rt.block_on(ProxyConnection::listen(&config.listen)), + "Failed to open socket {}: {}", + config.listen + ) + }; match config.device_type { - Type::Tap => run::(config, socket).await, - Type::Tun => run::(config, socket).await + Type::Tap => run::(config, socket), + Type::Tun => run::(config, socket), } return; } - let socket = try_fail!(NetSocket::listen(&config.listen).await, "Failed to open socket {}: {}", config.listen); + */ + let socket = try_fail!(listen_udp(&config.listen), "Failed to open socket {}: {}", config.listen); match config.device_type { - Type::Tap => run::(config, socket).await, - Type::Tun => run::(config, socket).await + Type::Tap => run::(config, socket), + Type::Tun => run::(config, socket), } - std::process::exit(0) } diff --git a/src/net.rs b/src/net.rs index a453f83..559bd26 100644 --- a/src/net.rs +++ b/src/net.rs @@ -7,11 +7,11 @@ use crate::port_forwarding::PortForwarding; use crate::util::{MockTimeSource, MsgBuffer, Time, TimeSource}; use async_trait::async_trait; use parking_lot::Mutex; -use tokio::net::UdpSocket; +use tokio::net::UdpSocket as AsyncUdpSocket; use std::{ collections::{HashMap, VecDeque}, io::{self, ErrorKind}, - net::{IpAddr, Ipv6Addr, SocketAddr}, + net::{UdpSocket, IpAddr, Ipv6Addr, SocketAddr}, sync::{ atomic::{AtomicBool, Ordering}, Arc, @@ -34,7 +34,6 @@ pub fn get_ip() -> IpAddr { #[async_trait] pub trait Socket: Sized + Clone + Send + Sync + 'static { - async fn listen(addr: &str) -> Result; async fn receive(&mut self, buffer: &mut MsgBuffer) -> Result; async fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result; async fn address(&self) -> Result; @@ -55,21 +54,28 @@ pub fn parse_listen(addr: &str, default_port: u16) -> SocketAddr { } } -pub struct NetSocket(Arc); +pub fn listen_udp(addr: &str) -> Result { + let addr = parse_listen(addr, DEFAULT_PORT); + UdpSocket::bind(addr) +} -impl Clone for NetSocket { +pub struct AsyncNetSocket(Arc); + +impl Clone for AsyncNetSocket { fn clone(&self) -> Self { Self(self.0.clone()) } } -#[async_trait] -impl Socket for NetSocket { - async fn listen(addr: &str) -> Result { - let addr = parse_listen(addr, DEFAULT_PORT); - Ok(NetSocket(Arc::new(UdpSocket::bind(addr).await?))) +impl AsyncNetSocket { + pub fn from_socket(sock: UdpSocket) -> Result { + Ok(Self(Arc::new(AsyncUdpSocket::from_std(sock)?))) } +} + +#[async_trait] +impl Socket for AsyncNetSocket { async fn receive(&mut self, buffer: &mut MsgBuffer) -> Result { buffer.clear(); let (size, addr) = self.0.recv_from(buffer.buffer()).await?; @@ -146,10 +152,6 @@ impl MockSocket { #[async_trait] impl Socket for MockSocket { - async fn listen(addr: &str) -> Result { - Ok(Self::new(parse_listen(addr, DEFAULT_PORT))) - } - async fn receive(&mut self, buffer: &mut MsgBuffer) -> Result { if let Some((addr, data)) = self.inbound.lock().pop_front() { buffer.clear(); diff --git a/src/wsproxy.rs b/src/wsproxy.rs index d8a86f2..6e738d5 100644 --- a/src/wsproxy.rs +++ b/src/wsproxy.rs @@ -129,6 +129,7 @@ impl ProxyConnection { #[async_trait] impl Socket for ProxyConnection { + /* async fn listen(url: &str) -> Result { let parsed_url = io_error!(Url::parse(url), "Invalid URL {}: {}", url)?; let (mut socket, _) = io_error!(connect(parsed_url), "Failed to connect to URL {}: {}", url)?; @@ -139,6 +140,7 @@ impl Socket for ProxyConnection { con.addr = read_addr(Cursor::new(&addr_data))?; Ok(con) } + */ async fn receive(&mut self, buffer: &mut MsgBuffer) -> Result { buffer.clear();