Impl ctrl-c

This commit is contained in:
Dennis Schwerdel 2021-04-13 23:37:01 +02:00
parent 66bef5cd21
commit ba23a7ef6d
9 changed files with 76 additions and 98 deletions

64
Cargo.lock generated
View File

@ -127,12 +127,6 @@ version = "1.0.67"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3c69b077ad434294d3ce9f1f6143a2a4b89a8a2d54ef813d85003a4fd1137fd"
[[package]]
name = "cfg-if"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
[[package]]
name = "cfg-if"
version = "1.0.0"
@ -223,7 +217,7 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dca26ee1f8d361640700bde38b2c37d8c22b3ce2d360e1fc1c74ea4b0aa7d775"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"crossbeam-utils",
]
@ -233,7 +227,7 @@ version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94af6efb46fef72616855b036a624cf27ba656ffc9be1b9a3c931cfc7749a9a9"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"crossbeam-epoch",
"crossbeam-utils",
]
@ -244,7 +238,7 @@ version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2584f639eb95fea8c798496315b297cf81b9b58b6d30ab066a75455333cf4b12"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"crossbeam-utils",
"lazy_static",
"memoffset",
@ -258,7 +252,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7e9d99fa91428effe99c5c6d4634cdeba32b8cf784fc428a2a687f61a952c49"
dependencies = [
"autocfg",
"cfg-if 1.0.0",
"cfg-if",
"lazy_static",
]
@ -371,7 +365,7 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c9495705279e7140bf035dde1f6e750c162df8b625267cd52cc44e0b156732c8"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"libc",
"wasi",
]
@ -462,7 +456,7 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61124eeebbd69b8190558df225adf7e4caafce0d743919e5d6b19652314ec5ec"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
]
[[package]]
@ -531,7 +525,7 @@ version = "0.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
]
[[package]]
@ -578,19 +572,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "nix"
version = "0.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c722bee1037d430d0f8e687bbdbf222f27cc6e4e68d5caf630857bb2b6dbdce"
dependencies = [
"bitflags",
"cc",
"cfg-if 0.1.10",
"libc",
"void",
]
[[package]]
name = "nix"
version = "0.19.1"
@ -599,7 +580,7 @@ checksum = "b2ccba0cfe4fdf15982d1674c69b1fd80bad427d293849982668dfe454bd61f2"
dependencies = [
"bitflags",
"cc",
"cfg-if 1.0.0",
"cfg-if",
"libc",
]
@ -666,7 +647,7 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa7a782938e745763fe6907fc6ba86946d72f49fe7e21de074e08128a99fb018"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"instant",
"libc",
"redox_syscall",
@ -727,7 +708,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebd4c2739642e70439d1c0d9545beec45c1e54128739b3cda29bf2c366028c87"
dependencies = [
"libc",
"nix 0.19.1",
"nix",
]
[[package]]
@ -1005,7 +986,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfebf75d25bd900fd1e7d11501efab59bc846dbc76196839663e6637bba9f25f"
dependencies = [
"block-buffer",
"cfg-if 1.0.0",
"cfg-if",
"cpuid-bool",
"digest",
"opaque-debug",
@ -1017,16 +998,6 @@ version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2579985fda508104f7587689507983eadd6a6e84dd35d6d115361f530916fa0d"
[[package]]
name = "signal"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f6ce83b159ab6984d2419f495134972b48754d13ff2e3f8c998339942b56ed9"
dependencies = [
"libc",
"nix 0.14.1",
]
[[package]]
name = "signal-hook-registry"
version = "1.3.0"
@ -1048,7 +1019,7 @@ version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "122e570113d28d773067fab24266b66753f6ea915758651696b6e35e49f88d6e"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"libc",
"winapi",
]
@ -1164,7 +1135,7 @@ version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dac1c663cfc93810f88aed9b8941d48cabf856a1b111c29a40439018d870eb22"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"libc",
"rand",
"redox_syscall",
@ -1403,12 +1374,6 @@ version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe"
[[package]]
name = "void"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d"
[[package]]
name = "vpncloud"
version = "2.2.0"
@ -1429,7 +1394,6 @@ dependencies = [
"ring",
"serde",
"serde_yaml",
"signal",
"smallvec",
"structopt",
"tempfile",
@ -1464,7 +1428,7 @@ version = "0.2.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83240549659d187488f91f33c0f8547cbfef0b2088bc470c116d1d260ef623d9"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"wasm-bindgen-macro",
]

View File

@ -21,7 +21,6 @@ structopt = "0.3"
serde = { version = "1.0", features = ["derive"] }
serde_yaml = "0.8"
log = { version = "0.4", features = ["std"] }
signal = "0.7"
libc = "0.2"
rand = "0.8"
fnv = "1"

View File

@ -1,13 +1,27 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{fs::File, hash::BuildHasherDefault};
use tokio;
use fnv::FnvHasher;
use crate::{config::Config, crypto::PeerCrypto, device::Device, engine::{
use crate::{
config::Config,
crypto::PeerCrypto,
device::Device,
engine::{
device_thread::DeviceThread,
shared::{SharedPeerCrypto, SharedTable, SharedTraffic},
socket_thread::{SocketThread, ReconnectEntry},
}, error::Error, messages::AddrList, net::Socket, payload::Protocol, port_forwarding::PortForwarding, types::NodeId, util::{CtrlC, Time, TimeSource, resolve}};
socket_thread::{ReconnectEntry, SocketThread},
},
error::Error,
messages::AddrList,
net::Socket,
payload::Protocol,
port_forwarding::PortForwarding,
types::NodeId,
util::{resolve, Time, TimeSource},
};
pub type Hash = BuildHasherDefault<FnvHasher>;
@ -27,6 +41,7 @@ pub struct PeerData {
pub struct GenericCloud<D: Device, P: Protocol, S: Socket, TS: TimeSource> {
socket_thread: SocketThread<S, D, P, TS>,
device_thread: DeviceThread<S, D, P, TS>,
running: Arc<AtomicBool>,
}
impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS> {
@ -37,6 +52,7 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
let table = SharedTable::<TS>::new(&config);
let traffic = SharedTraffic::new();
let peer_crypto = SharedPeerCrypto::new();
let running = Arc::new(AtomicBool::new(true));
let device_thread = DeviceThread::<S, D, P, TS>::new(
config.clone(),
device.duplicate().await?,
@ -44,6 +60,7 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
traffic.clone(),
peer_crypto.clone(),
table.clone(),
running.clone(),
);
let mut socket_thread = SocketThread::<S, D, P, TS>::new(
config.clone(),
@ -54,9 +71,10 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
table,
port_forwarding,
stats_file,
running.clone(),
);
socket_thread.housekeep().await?;
Ok(Self { socket_thread, device_thread })
Ok(Self { socket_thread, device_thread, running })
}
pub fn add_peer(&mut self, addr: String) -> Result<(), Error> {
@ -67,19 +85,23 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
tries: 0,
timeout: 1,
next: TS::now(),
final_timeout: None
final_timeout: None,
});
Ok(())
}
pub async fn run(self) {
let ctrlc = CtrlC::new();
debug!("Starting threads");
let running = self.running.clone();
let device_thread_handle = tokio::spawn(self.device_thread.run());
let socket_thread_handle = tokio::spawn(self.socket_thread.run());
// TODO: wait for ctrl-c
try_fail!(tokio::signal::ctrl_c().await, "Failed to set ctrl-c handler: {}");
running.store(false, Ordering::SeqCst);
debug!("Waiting for threads to end");
let (dev_ret, sock_ret) = join!(device_thread_handle, socket_thread_handle);
dev_ret.unwrap();
sock_ret.unwrap();
debug!("Threads stopped");
}
}

View File

@ -1,6 +1,6 @@
use super::{
shared::{SharedPeerCrypto, SharedTable, SharedTraffic},
common::SPACE_BEFORE,
shared::{SharedPeerCrypto, SharedTable, SharedTraffic},
};
use crate::{
config::Config,
@ -11,6 +11,8 @@ use crate::{
util::{MsgBuffer, Time, TimeSource},
Protocol,
};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{marker::PhantomData, net::SocketAddr};
use tokio::time::timeout;
@ -29,12 +31,13 @@ pub struct DeviceThread<S: Socket, D: Device, P: Protocol, TS: TimeSource> {
traffic: SharedTraffic,
peer_crypto: SharedPeerCrypto,
table: SharedTable<TS>,
running: Arc<AtomicBool>,
}
impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS> {
pub fn new(
config: Config, device: D, socket: S, traffic: SharedTraffic, peer_crypto: SharedPeerCrypto,
table: SharedTable<TS>,
table: SharedTable<TS>, running: Arc<AtomicBool>,
) -> Self {
Self {
_dummy_ts: PhantomData,
@ -47,7 +50,8 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS
peer_crypto,
table,
buffer: MsgBuffer::new(SPACE_BEFORE),
broadcast_buffer: MsgBuffer::new(SPACE_BEFORE)
broadcast_buffer: MsgBuffer::new(SPACE_BEFORE),
running,
}
}
@ -144,7 +148,11 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS
pub async fn run(mut self) {
loop {
self.iteration().await
self.iteration().await;
if !self.running.load(Ordering::SeqCst) {
debug!("Device: end");
return
}
}
}
}

View File

@ -22,6 +22,8 @@ use crate::{
};
use rand::{random, seq::SliceRandom, thread_rng};
use smallvec::{smallvec, SmallVec};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{
cmp::{max, min},
collections::HashMap,
@ -82,6 +84,7 @@ pub struct SocketThread<S: Socket, D: Device, P: Protocol, TS: TimeSource> {
peer_crypto: SharedPeerCrypto,
traffic: SharedTraffic,
table: SharedTable<TS>,
running: Arc<AtomicBool>,
// Should not be here
port_forwarding: Option<PortForwarding>, // TODO: 3rd thread
}
@ -90,6 +93,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
pub fn new(
config: Config, device: D, socket: S, traffic: SharedTraffic, peer_crypto: SharedPeerCrypto,
table: SharedTable<TS>, port_forwarding: Option<PortForwarding>, stats_file: Option<File>,
running: Arc<AtomicBool>,
) -> Self {
let mut claims = SmallVec::with_capacity(config.claims.len());
for s in &config.claims {
@ -142,6 +146,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
config,
buffer: MsgBuffer::new(SPACE_BEFORE),
broadcast_buffer: MsgBuffer::new(SPACE_BEFORE),
running,
}
}
@ -747,7 +752,11 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
pub async fn run(mut self) {
loop {
self.iteration().await
self.iteration().await;
if !self.running.load(Ordering::SeqCst) {
debug!("Socket: end");
return;
}
}
}
}

View File

@ -339,4 +339,5 @@ async fn main() {
Type::Tap => run::<payload::Frame, _>(config, socket).await,
Type::Tun => run::<payload::Packet, _>(config, socket).await
}
std::process::exit(0)
}

View File

@ -7,10 +7,11 @@ use crate::port_forwarding::PortForwarding;
use crate::util::{MockTimeSource, MsgBuffer, Time, TimeSource};
use async_trait::async_trait;
use parking_lot::Mutex;
use tokio::net::UdpSocket;
use std::{
collections::{HashMap, VecDeque},
io::{self, ErrorKind},
net::{IpAddr, Ipv6Addr, SocketAddr, UdpSocket},
net::{IpAddr, Ipv6Addr, SocketAddr},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
@ -26,7 +27,7 @@ pub fn mapped_addr(addr: SocketAddr) -> SocketAddr {
}
pub fn get_ip() -> IpAddr {
let s = UdpSocket::bind("[::]:0").unwrap();
let s = std::net::UdpSocket::bind("[::]:0").unwrap();
s.connect("8.8.8.8:0").unwrap();
s.local_addr().unwrap().ip()
}
@ -54,11 +55,11 @@ pub fn parse_listen(addr: &str, default_port: u16) -> SocketAddr {
}
}
pub struct NetSocket(UdpSocket);
pub struct NetSocket(Arc<UdpSocket>);
impl Clone for NetSocket {
fn clone(&self) -> Self {
Self(try_fail!(self.0.try_clone(), "Failed to clone socket: {}"))
Self(self.0.clone())
}
}
@ -66,18 +67,18 @@ impl Clone for NetSocket {
impl Socket for NetSocket {
async fn listen(addr: &str) -> Result<Self, io::Error> {
let addr = parse_listen(addr, DEFAULT_PORT);
Ok(NetSocket(UdpSocket::bind(addr)?))
Ok(NetSocket(Arc::new(UdpSocket::bind(addr).await?)))
}
async fn receive(&mut self, buffer: &mut MsgBuffer) -> Result<SocketAddr, io::Error> {
buffer.clear();
let (size, addr) = self.0.recv_from(buffer.buffer())?;
let (size, addr) = self.0.recv_from(buffer.buffer()).await?;
buffer.set_length(size);
Ok(addr)
}
async fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize, io::Error> {
self.0.send_to(data, addr)
self.0.send_to(data, addr).await
}
async fn address(&self) -> Result<SocketAddr, io::Error> {

View File

@ -14,9 +14,7 @@ use crate::error::Error;
#[cfg(not(target_os = "linux"))]
use time;
use signal::{trap::Trap, Signal};
use smallvec::SmallVec;
use std::time::Instant;
pub type Duration = u32;
pub type Time = i64;
@ -262,29 +260,6 @@ impl fmt::Display for Bytes {
}
}
pub struct CtrlC {
dummy_time: Instant,
trap: Trap,
}
impl CtrlC {
pub fn new() -> Self {
Default::default()
}
pub fn was_pressed(&self) -> bool {
self.trap.wait(self.dummy_time).is_some()
}
}
impl Default for CtrlC {
fn default() -> Self {
let dummy_time = Instant::now();
let trap = Trap::trap(&[Signal::SIGINT, Signal::SIGTERM, Signal::SIGQUIT]);
Self { dummy_time, trap }
}
}
pub trait TimeSource: Sync + Copy + Send + 'static {
fn now() -> Time;
}

View File

@ -15,7 +15,6 @@ use std::{
net::{Ipv6Addr, SocketAddr, SocketAddrV6, TcpListener, TcpStream, UdpSocket},
os::unix::io::AsRawFd,
sync::Arc,
thread::spawn,
};
use tungstenite::{client::AutoStream, connect, protocol::WebSocket, server::accept, Message};
use url::Url;
@ -100,7 +99,7 @@ pub fn run_proxy(listen: &str) -> Result<(), io::Error> {
for stream in server.incoming() {
let stream = stream?;
let peer = stream.peer_addr()?;
spawn(move || {
tokio::spawn(async move {
if let Err(err) = serve_proxy_connection(stream) {
error!("Error on connection {}: {}", peer, err);
}