Some more code

This commit is contained in:
Dennis Schwerdel 2021-02-20 00:17:06 +01:00
parent 450101d8ec
commit c9a0cc85ab
9 changed files with 209 additions and 67 deletions

View File

@ -350,6 +350,34 @@ impl Config {
} }
} }
pub fn is_learning(&self) -> bool {
match self.mode {
Mode::Normal => {
match self.device_type {
Type::Tap => true,
Type::Tun => false
}
}
Mode::Router => false,
Mode::Switch => true,
Mode::Hub => false
}
}
pub fn is_broadcasting(&self) -> bool {
match self.mode {
Mode::Normal => {
match self.device_type {
Type::Tap => true,
Type::Tun => false
}
}
Mode::Router => false,
Mode::Switch => true,
Mode::Hub => true
}
}
pub fn call_hook( pub fn call_hook(
&self, event: &'static str, envs: impl IntoIterator<Item = (&'static str, impl AsRef<OsStr>)>, detach: bool &self, event: &'static str, envs: impl IntoIterator<Item = (&'static str, impl AsRef<OsStr>)>, detach: bool
) { ) {

View File

@ -2,6 +2,7 @@
// Copyright (C) 2015-2021 Dennis Schwerdel // Copyright (C) 2015-2021 Dennis Schwerdel
// This software is licensed under GPL-3 or newer (see LICENSE.md) // This software is licensed under GPL-3 or newer (see LICENSE.md)
use parking_lot::Mutex;
use std::{ use std::{
cmp, cmp,
collections::VecDeque, collections::VecDeque,
@ -12,7 +13,8 @@ use std::{
net::{Ipv4Addr, UdpSocket}, net::{Ipv4Addr, UdpSocket},
os::unix::io::{AsRawFd, RawFd}, os::unix::io::{AsRawFd, RawFd},
str, str,
str::FromStr str::FromStr,
sync::Arc
}; };
use crate::{crypto, error::Error, util::MsgBuffer}; use crate::{crypto, error::Error, util::MsgBuffer};
@ -75,7 +77,7 @@ impl FromStr for Type {
} }
} }
pub trait Device: AsRawFd { pub trait Device: AsRawFd + Clone {
/// Returns the type of this device /// Returns the type of this device
fn get_type(&self) -> Type; fn get_type(&self) -> Type;
@ -118,6 +120,16 @@ pub struct TunTapDevice {
} }
impl Clone for TunTapDevice {
fn clone(&self) -> Self {
Self {
fd: try_fail!(self.fd.try_clone(), "Failed to clone device: {}"),
ifname: self.ifname.clone(),
type_: self.type_
}
}
}
impl TunTapDevice { impl TunTapDevice {
/// Creates a new tun/tap device /// Creates a new tun/tap device
/// ///
@ -300,9 +312,10 @@ impl AsRawFd for TunTapDevice {
} }
#[derive(Clone)]
pub struct MockDevice { pub struct MockDevice {
inbound: VecDeque<Vec<u8>>, inbound: Arc<Mutex<VecDeque<Vec<u8>>>>,
outbound: VecDeque<Vec<u8>> outbound: Arc<Mutex<VecDeque<Vec<u8>>>>
} }
impl MockDevice { impl MockDevice {
@ -311,15 +324,15 @@ impl MockDevice {
} }
pub fn put_inbound(&mut self, data: Vec<u8>) { pub fn put_inbound(&mut self, data: Vec<u8>) {
self.inbound.push_back(data) self.inbound.lock().push_back(data)
} }
pub fn pop_outbound(&mut self) -> Option<Vec<u8>> { pub fn pop_outbound(&mut self) -> Option<Vec<u8>> {
self.outbound.pop_front() self.outbound.lock().pop_front()
} }
pub fn has_inbound(&self) -> bool { pub fn has_inbound(&self) -> bool {
!self.inbound.is_empty() !self.inbound.lock().is_empty()
} }
} }
@ -333,7 +346,7 @@ impl Device for MockDevice {
} }
fn read(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error> { fn read(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error> {
if let Some(data) = self.inbound.pop_front() { if let Some(data) = self.inbound.lock().pop_front() {
buffer.clear(); buffer.clear();
buffer.set_length(data.len()); buffer.set_length(data.len());
buffer.message_mut().copy_from_slice(&data); buffer.message_mut().copy_from_slice(&data);
@ -344,7 +357,7 @@ impl Device for MockDevice {
} }
fn write(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error> { fn write(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error> {
self.outbound.push_back(buffer.message().into()); self.outbound.lock().push_back(buffer.message().into());
Ok(()) Ok(())
} }
@ -355,7 +368,10 @@ impl Device for MockDevice {
impl Default for MockDevice { impl Default for MockDevice {
fn default() -> Self { fn default() -> Self {
Self { outbound: VecDeque::with_capacity(10), inbound: VecDeque::with_capacity(10) } Self {
outbound: Arc::new(Mutex::new(VecDeque::with_capacity(10))),
inbound: Arc::new(Mutex::new(VecDeque::with_capacity(10)))
}
} }
} }

View File

@ -8,7 +8,8 @@ use crate::{
messages::MESSAGE_TYPE_DATA, messages::MESSAGE_TYPE_DATA,
net::Socket, net::Socket,
util::{MsgBuffer, Time, TimeSource}, util::{MsgBuffer, Time, TimeSource},
Protocol Protocol,
config::Config
}; };
use std::{marker::PhantomData, net::SocketAddr}; use std::{marker::PhantomData, net::SocketAddr};
@ -28,6 +29,20 @@ pub struct DeviceThread<S: Socket, D: Device, P: Protocol, TS: TimeSource> {
} }
impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS> { impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS> {
pub fn new(config: Config, device: D, socket: S, traffic: SharedTraffic, peer_crypto: SharedPeerCrypto, table: SharedTable<TS>) -> Self {
Self {
_dummy_ts: PhantomData,
_dummy_p: PhantomData,
broadcast: config.is_broadcasting(),
socket,
device,
next_housekeep: TS::now(),
traffic,
peer_crypto,
table
}
}
#[inline] #[inline]
fn send_to(&mut self, addr: SocketAddr, msg: &mut MsgBuffer) -> Result<(), Error> { fn send_to(&mut self, addr: SocketAddr, msg: &mut MsgBuffer) -> Result<(), Error> {
debug!("Sending msg with {} bytes to {}", msg.len(), addr); debug!("Sending msg with {} bytes to {}", msg.len(), addr);
@ -104,15 +119,16 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS
let mut buffer = MsgBuffer::new(SPACE_BEFORE); let mut buffer = MsgBuffer::new(SPACE_BEFORE);
loop { loop {
try_fail!(self.device.read(&mut buffer), "Failed to read from device: {}"); try_fail!(self.device.read(&mut buffer), "Failed to read from device: {}");
//TODO: set and handle timeout
if let Err(e) = self.forward_packet(&mut buffer) { if let Err(e) = self.forward_packet(&mut buffer) {
error!("{}", e); error!("{}", e);
} }
let now = TS::now(); let now = TS::now();
if self.next_housekeep < TS::now() { if self.next_housekeep < now {
if let Err(e) = self.housekeep() { if let Err(e) = self.housekeep() {
error!("{}", e) error!("{}", e)
} }
self.next_housekeep = TS::now() + 1 self.next_housekeep = now + 1
} }
} }
} }

View File

@ -3,8 +3,8 @@
// This software is licensed under GPL-3 or newer (see LICENSE.md) // This software is licensed under GPL-3 or newer (see LICENSE.md)
mod device_thread; mod device_thread;
mod socket_thread;
mod shared; mod shared;
mod socket_thread;
use std::{ use std::{
cmp::{max, min}, cmp::{max, min},
@ -16,7 +16,8 @@ use std::{
marker::PhantomData, marker::PhantomData,
net::{SocketAddr, ToSocketAddrs}, net::{SocketAddr, ToSocketAddrs},
path::Path, path::Path,
str::FromStr str::FromStr,
thread
}; };
use fnv::FnvHasher; use fnv::FnvHasher;
@ -26,8 +27,12 @@ use smallvec::{smallvec, SmallVec};
use crate::{ use crate::{
beacon::BeaconSerializer, beacon::BeaconSerializer,
config::{Config, DEFAULT_PEER_TIMEOUT, DEFAULT_PORT}, config::{Config, DEFAULT_PEER_TIMEOUT, DEFAULT_PORT},
crypto::{is_init_message, Crypto, MessageResult, PeerCrypto, InitState, InitResult}, crypto::{is_init_message, Crypto, InitResult, InitState, MessageResult, PeerCrypto},
device::{Device, Type}, device::{Device, Type},
engine::{
device_thread::DeviceThread,
shared::{SharedPeerCrypto, SharedTable, SharedTraffic}
},
error::Error, error::Error,
messages::{ messages::{
AddrList, NodeInfo, PeerInfo, MESSAGE_TYPE_CLOSE, MESSAGE_TYPE_DATA, MESSAGE_TYPE_KEEPALIVE, AddrList, NodeInfo, PeerInfo, MESSAGE_TYPE_CLOSE, MESSAGE_TYPE_DATA, MESSAGE_TYPE_KEEPALIVE,
@ -53,7 +58,7 @@ const SPACE_BEFORE: usize = 100;
struct PeerData { struct PeerData {
addrs: AddrList, addrs: AddrList,
#[allow(dead_code)] //TODO: export in status #[allow(dead_code)] // TODO: export in status
last_seen: Time, last_seen: Time,
timeout: Time, timeout: Time,
peer_timeout: u16, peer_timeout: u16,
@ -104,7 +109,9 @@ pub struct GenericCloud<D: Device, P: Protocol, S: Socket, TS: TimeSource> {
impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS> { impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS> {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub fn new(config: &Config, socket: S, device: D, port_forwarding: Option<PortForwarding>, stats_file: Option<File>) -> Self { pub fn new(
config: &Config, socket: S, device: D, port_forwarding: Option<PortForwarding>, stats_file: Option<File>
) -> Self {
let (learning, broadcast) = match config.mode { let (learning, broadcast) = match config.mode {
Mode::Normal => { Mode::Normal => {
match config.device_type { match config.device_type {
@ -288,8 +295,26 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
} }
pub fn run(&mut self) { pub fn run(&mut self) {
let table = SharedTable::<TS>::new(&self.config);
let traffic = SharedTraffic::new();
let peer_crypto = SharedPeerCrypto::new();
let device_thread = DeviceThread::<S, D, P, TS>::new(
self.config.clone(),
self.device.clone(),
self.socket.clone(),
traffic.clone(),
peer_crypto.clone(),
table.clone()
);
// TODO: create shared data structures
// TODO: create and spawn threads
let ctrlc = CtrlC::new(); let ctrlc = CtrlC::new();
let waiter = try_fail!(WaitImpl::new(self.socket.as_raw_fd(), self.device.as_raw_fd(), 1000), "Failed to setup poll: {}"); // TODO: wait for ctrl-c
let waiter = try_fail!(
WaitImpl::new(self.socket.as_raw_fd(), self.device.as_raw_fd(), 1000),
"Failed to setup poll: {}"
);
let mut buffer = MsgBuffer::new(SPACE_BEFORE); let mut buffer = MsgBuffer::new(SPACE_BEFORE);
let mut poll_error = false; let mut poll_error = false;
self.config.call_hook("vpn_started", vec![("IFNAME", self.device.ifname())], true); self.config.call_hook("vpn_started", vec![("IFNAME", self.device.ifname())], true);

View File

@ -6,7 +6,9 @@ use crate::{
table::ClaimTable, table::ClaimTable,
traffic::{TrafficStats, TrafficEntry}, traffic::{TrafficStats, TrafficEntry},
types::{Address, NodeId, RangeList}, types::{Address, NodeId, RangeList},
util::MsgBuffer util::MsgBuffer,
util::Duration,
config::Config
}; };
use parking_lot::Mutex; use parking_lot::Mutex;
use std::{ use std::{
@ -16,11 +18,16 @@ use std::{
sync::Arc sync::Arc
}; };
#[derive(Clone)]
pub struct SharedPeerCrypto { pub struct SharedPeerCrypto {
peers: Arc<Mutex<HashMap<SocketAddr, Option<Arc<CryptoCore>>, Hash>>> peers: Arc<Mutex<HashMap<SocketAddr, Option<Arc<CryptoCore>>, Hash>>>
} }
impl SharedPeerCrypto { impl SharedPeerCrypto {
pub fn new() -> Self {
SharedPeerCrypto { peers: Arc::new(Mutex::new(HashMap::default())) }
}
pub fn sync(&mut self) { pub fn sync(&mut self) {
// TODO sync if needed // TODO sync if needed
} }
@ -50,11 +57,16 @@ impl SharedPeerCrypto {
} }
#[derive(Clone)]
pub struct SharedTraffic { pub struct SharedTraffic {
traffic: Arc<Mutex<TrafficStats>> traffic: Arc<Mutex<TrafficStats>>
} }
impl SharedTraffic { impl SharedTraffic {
pub fn new() -> Self {
Self { traffic: Arc::new(Mutex::new(Default::default())) }
}
pub fn sync(&mut self) { pub fn sync(&mut self) {
// TODO sync if needed // TODO sync if needed
} }
@ -105,11 +117,17 @@ impl SharedTraffic {
} }
#[derive(Clone)]
pub struct SharedTable<TS: TimeSource> { pub struct SharedTable<TS: TimeSource> {
table: Arc<Mutex<ClaimTable<TS>>> table: Arc<Mutex<ClaimTable<TS>>>
} }
impl<TS: TimeSource> SharedTable<TS> { impl<TS: TimeSource> SharedTable<TS> {
pub fn new(config: &Config) -> Self {
let table = ClaimTable::new(config.switch_timeout as Duration, config.peer_timeout as Duration);
SharedTable { table: Arc::new(Mutex::new(table)) }
}
pub fn sync(&mut self) { pub fn sync(&mut self) {
// TODO sync if needed // TODO sync if needed
} }

View File

@ -24,7 +24,7 @@ use std::{
fmt, fmt,
fs::File, fs::File,
io, io,
io::{Write, Cursor, Seek, SeekFrom}, io::{Cursor, Seek, SeekFrom, Write},
marker::PhantomData, marker::PhantomData,
net::{SocketAddr, ToSocketAddrs} net::{SocketAddr, ToSocketAddrs}
}; };
@ -615,28 +615,38 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
pub fn run(mut self) { pub fn run(mut self) {
let mut buffer = MsgBuffer::new(SPACE_BEFORE); let mut buffer = MsgBuffer::new(SPACE_BEFORE);
loop { loop {
let src = try_fail!(self.socket.receive(&mut buffer), "Failed to read from network socket: {}"); match self.socket.receive(&mut buffer) {
match self.handle_message(src, &mut buffer) { Err(err) => {
Err(e @ Error::CryptoInitFatal(_)) => { if err.kind() == io::ErrorKind::TimedOut || err.kind() == io::ErrorKind::WouldBlock {
debug!("Fatal crypto init error from {}: {}", src, e); // ok, this is a normal timeout
info!("Closing pending connection to {} due to error in crypto init", addr_nice(src)); } else {
self.pending_inits.remove(&src); fail!("Failed to read from network socket: {}", err);
}
} }
Err(e @ Error::CryptoInit(_)) => { Ok(src) => {
debug!("Recoverable init error from {}: {}", src, e); match self.handle_message(src, &mut buffer) {
info!("Ignoring invalid init message from peer {}", addr_nice(src)); Err(e @ Error::CryptoInitFatal(_)) => {
debug!("Fatal crypto init error from {}: {}", src, e);
info!("Closing pending connection to {} due to error in crypto init", addr_nice(src));
self.pending_inits.remove(&src);
}
Err(e @ Error::CryptoInit(_)) => {
debug!("Recoverable init error from {}: {}", src, e);
info!("Ignoring invalid init message from peer {}", addr_nice(src));
}
Err(e) => {
error!("{}", e);
}
Ok(_) => {}
}
} }
Err(e) => {
error!("{}", e);
}
Ok(_) => {}
} }
let now = TS::now(); let now = TS::now();
if self.next_housekeep < now { if self.next_housekeep < now {
if let Err(e) = self.housekeep() { if let Err(e) = self.housekeep() {
error!("{}", e) error!("{}", e)
} }
self.next_housekeep = TS::now() + 1 self.next_housekeep = now + 1
} }
} }
} }

View File

@ -36,7 +36,7 @@ use structopt::StructOpt;
use std::{ use std::{
fs::{self, File, Permissions}, fs::{self, File, Permissions},
io::{self, Write}, io::{self, Write},
net::{Ipv4Addr, UdpSocket}, net::{Ipv4Addr},
os::unix::fs::PermissionsExt, os::unix::fs::PermissionsExt,
path::Path, path::Path,
process, process,
@ -50,7 +50,7 @@ use crate::{
config::{Args, Command, Config, DEFAULT_PORT}, config::{Args, Command, Config, DEFAULT_PORT},
crypto::Crypto, crypto::Crypto,
device::{Device, TunTapDevice, Type}, device::{Device, TunTapDevice, Type},
net::Socket, net::{Socket, NetSocket},
oldconfig::OldConfigFile, oldconfig::OldConfigFile,
payload::Protocol, payload::Protocol,
util::SystemTimeSource, util::SystemTimeSource,
@ -328,7 +328,7 @@ fn main() {
} }
return return
} }
let socket = try_fail!(UdpSocket::listen(&config.listen), "Failed to open socket {}: {}", config.listen); let socket = try_fail!(NetSocket::listen(&config.listen), "Failed to open socket {}: {}", config.listen);
match config.device_type { match config.device_type {
Type::Tap => run::<payload::Frame, _>(config, socket), Type::Tap => run::<payload::Frame, _>(config, socket),
Type::Tun => run::<payload::Packet, _>(config, socket) Type::Tun => run::<payload::Packet, _>(config, socket)

View File

@ -2,17 +2,21 @@
// Copyright (C) 2015-2021 Dennis Schwerdel // Copyright (C) 2015-2021 Dennis Schwerdel
// This software is licensed under GPL-3 or newer (see LICENSE.md) // This software is licensed under GPL-3 or newer (see LICENSE.md)
use super::util::{MockTimeSource, MsgBuffer, Time, TimeSource};
use crate::port_forwarding::PortForwarding;
use parking_lot::Mutex;
use std::{ use std::{
collections::{HashMap, VecDeque}, collections::{HashMap, VecDeque},
io::{self, ErrorKind}, io::{self, ErrorKind},
net::{IpAddr, SocketAddr, UdpSocket, Ipv6Addr}, net::{IpAddr, Ipv6Addr, SocketAddr, UdpSocket},
os::unix::io::{AsRawFd, RawFd}, os::unix::io::{AsRawFd, RawFd},
sync::atomic::{AtomicBool, Ordering} sync::{
atomic::{AtomicBool, Ordering},
Arc
},
time::Duration
}; };
use super::util::{MockTimeSource, MsgBuffer, Time, TimeSource};
use crate::port_forwarding::PortForwarding;
pub fn mapped_addr(addr: SocketAddr) -> SocketAddr { pub fn mapped_addr(addr: SocketAddr) -> SocketAddr {
// HOT PATH // HOT PATH
match addr { match addr {
@ -27,7 +31,7 @@ pub fn get_ip() -> IpAddr {
s.local_addr().unwrap().ip() s.local_addr().unwrap().ip()
} }
pub trait Socket: AsRawFd + Sized { pub trait Socket: AsRawFd + Sized + Clone {
fn listen(addr: &str) -> Result<Self, io::Error>; fn listen(addr: &str) -> Result<Self, io::Error>;
fn receive(&mut self, buffer: &mut MsgBuffer) -> Result<SocketAddr, io::Error>; fn receive(&mut self, buffer: &mut MsgBuffer) -> Result<SocketAddr, io::Error>;
fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize, io::Error>; fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize, io::Error>;
@ -47,25 +51,42 @@ pub fn parse_listen(addr: &str) -> SocketAddr {
} }
} }
impl Socket for UdpSocket { pub struct NetSocket(UdpSocket);
impl Clone for NetSocket {
fn clone(&self) -> Self {
Self(try_fail!(self.0.try_clone(), "Failed to clone socket: {}"))
}
}
impl AsRawFd for NetSocket {
fn as_raw_fd(&self) -> RawFd {
self.0.as_raw_fd()
}
}
impl Socket for NetSocket {
fn listen(addr: &str) -> Result<Self, io::Error> { fn listen(addr: &str) -> Result<Self, io::Error> {
let addr = parse_listen(addr); let addr = parse_listen(addr);
UdpSocket::bind(addr) Ok(NetSocket(UdpSocket::bind(addr).and_then(|s| {
s.set_read_timeout(Some(Duration::from_secs(1)))?;
Ok(s)
})?))
} }
fn receive(&mut self, buffer: &mut MsgBuffer) -> Result<SocketAddr, io::Error> { fn receive(&mut self, buffer: &mut MsgBuffer) -> Result<SocketAddr, io::Error> {
buffer.clear(); buffer.clear();
let (size, addr) = self.recv_from(buffer.buffer())?; let (size, addr) = self.0.recv_from(buffer.buffer())?;
buffer.set_length(size); buffer.set_length(size);
Ok(addr) Ok(addr)
} }
fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize, io::Error> { fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize, io::Error> {
self.send_to(data, addr) self.0.send_to(data, addr)
} }
fn address(&self) -> Result<SocketAddr, io::Error> { fn address(&self) -> Result<SocketAddr, io::Error> {
let mut addr = self.local_addr()?; let mut addr = self.0.local_addr()?;
addr.set_ip(get_ip()); addr.set_ip(get_ip());
Ok(addr) Ok(addr)
} }
@ -79,22 +100,24 @@ thread_local! {
static MOCK_SOCKET_NAT: AtomicBool = AtomicBool::new(false); static MOCK_SOCKET_NAT: AtomicBool = AtomicBool::new(false);
} }
#[derive(Clone)]
pub struct MockSocket { pub struct MockSocket {
nat: bool, nat: bool,
nat_peers: HashMap<SocketAddr, Time>, nat_peers: Arc<Mutex<HashMap<SocketAddr, Time>>>,
address: SocketAddr, address: SocketAddr,
outbound: VecDeque<(SocketAddr, Vec<u8>)>, outbound: Arc<Mutex<VecDeque<(SocketAddr, Vec<u8>)>>>,
inbound: VecDeque<(SocketAddr, Vec<u8>)> inbound: Arc<Mutex<VecDeque<(SocketAddr, Vec<u8>)>>>
} }
impl MockSocket { impl MockSocket {
pub fn new(address: SocketAddr) -> Self { pub fn new(address: SocketAddr) -> Self {
Self { Self {
nat: Self::get_nat(), nat: Self::get_nat(),
nat_peers: HashMap::new(), nat_peers: Default::default(),
address, address,
outbound: VecDeque::with_capacity(10), outbound: Arc::new(Mutex::new(VecDeque::with_capacity(10))),
inbound: VecDeque::with_capacity(10) inbound: Arc::new(Mutex::new(VecDeque::with_capacity(10)))
} }
} }
@ -108,12 +131,12 @@ impl MockSocket {
pub fn put_inbound(&mut self, from: SocketAddr, data: Vec<u8>) -> bool { pub fn put_inbound(&mut self, from: SocketAddr, data: Vec<u8>) -> bool {
if !self.nat { if !self.nat {
self.inbound.push_back((from, data)); self.inbound.lock().push_back((from, data));
return true return true
} }
if let Some(timeout) = self.nat_peers.get(&from) { if let Some(timeout) = self.nat_peers.lock().get(&from) {
if *timeout >= MockTimeSource::now() { if *timeout >= MockTimeSource::now() {
self.inbound.push_back((from, data)); self.inbound.lock().push_back((from, data));
return true return true
} }
} }
@ -122,7 +145,7 @@ impl MockSocket {
} }
pub fn pop_outbound(&mut self) -> Option<(SocketAddr, Vec<u8>)> { pub fn pop_outbound(&mut self) -> Option<(SocketAddr, Vec<u8>)> {
self.outbound.pop_front() self.outbound.lock().pop_front()
} }
} }
@ -138,7 +161,7 @@ impl Socket for MockSocket {
} }
fn receive(&mut self, buffer: &mut MsgBuffer) -> Result<SocketAddr, io::Error> { fn receive(&mut self, buffer: &mut MsgBuffer) -> Result<SocketAddr, io::Error> {
if let Some((addr, data)) = self.inbound.pop_front() { if let Some((addr, data)) = self.inbound.lock().pop_front() {
buffer.clear(); buffer.clear();
buffer.set_length(data.len()); buffer.set_length(data.len());
buffer.message_mut().copy_from_slice(&data); buffer.message_mut().copy_from_slice(&data);
@ -149,9 +172,9 @@ impl Socket for MockSocket {
} }
fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize, io::Error> { fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize, io::Error> {
self.outbound.push_back((addr, data.into())); self.outbound.lock().push_back((addr, data.into()));
if self.nat { if self.nat {
self.nat_peers.insert(addr, MockTimeSource::now() + 300); self.nat_peers.lock().insert(addr, MockTimeSource::now() + 300);
} }
Ok(data.len()) Ok(data.len())
} }
@ -178,4 +201,4 @@ mod bench {
b.iter(|| sock.send_to(&data, &addr).unwrap()); b.iter(|| sock.send_to(&data, &addr).unwrap());
b.bytes = 1400; b.bytes = 1400;
} }
} }

View File

@ -13,6 +13,7 @@ use std::{
io::{self, Cursor, Read, Write}, io::{self, Cursor, Read, Write},
net::{Ipv6Addr, SocketAddr, SocketAddrV6, TcpListener, TcpStream, UdpSocket}, net::{Ipv6Addr, SocketAddr, SocketAddrV6, TcpListener, TcpStream, UdpSocket},
os::unix::io::{AsRawFd, RawFd}, os::unix::io::{AsRawFd, RawFd},
sync::Arc,
thread::spawn thread::spawn
}; };
use tungstenite::{client::AutoStream, connect, protocol::WebSocket, server::accept, Message}; use tungstenite::{client::AutoStream, connect, protocol::WebSocket, server::accept, Message};
@ -108,17 +109,21 @@ pub fn run_proxy(listen: &str) -> Result<(), io::Error> {
Ok(()) Ok(())
} }
#[derive(Clone)]
pub struct ProxyConnection { pub struct ProxyConnection {
addr: SocketAddr, addr: SocketAddr,
socket: WebSocket<AutoStream> socket: Arc<WebSocket<AutoStream>>
} }
impl ProxyConnection { impl ProxyConnection {
fn read_message(&mut self) -> Result<Vec<u8>, io::Error> { fn read_message(&mut self) -> Result<Vec<u8>, io::Error> {
loop { loop {
unimplemented!();
/*
if let Message::Binary(data) = io_error!(self.socket.read_message(), "Failed to read from ws proxy: {}")? { if let Message::Binary(data) = io_error!(self.socket.read_message(), "Failed to read from ws proxy: {}")? {
return Ok(data) return Ok(data)
} }
*/
} }
} }
} }
@ -128,14 +133,14 @@ impl AsRawFd for ProxyConnection {
self.socket.get_ref().as_raw_fd() self.socket.get_ref().as_raw_fd()
} }
} }
impl Socket for ProxyConnection { impl Socket for ProxyConnection {
fn listen(url: &str) -> Result<Self, io::Error> { fn listen(url: &str) -> Result<Self, io::Error> {
let parsed_url = io_error!(Url::parse(url), "Invalid URL {}: {}", url)?; let parsed_url = io_error!(Url::parse(url), "Invalid URL {}: {}", url)?;
let (mut socket, _) = io_error!(connect(parsed_url), "Failed to connect to URL {}: {}", url)?; let (mut socket, _) = io_error!(connect(parsed_url), "Failed to connect to URL {}: {}", url)?;
socket.get_mut().set_nodelay(true)?; socket.get_mut().set_nodelay(true)?;
let addr = "0.0.0.0:0".parse::<SocketAddr>().unwrap(); let addr = "0.0.0.0:0".parse::<SocketAddr>().unwrap();
let mut con = ProxyConnection { addr, socket }; let mut con = ProxyConnection { addr, socket: Arc::new(socket) };
let addr_data = con.read_message()?; let addr_data = con.read_message()?;
con.addr = read_addr(Cursor::new(&addr_data))?; con.addr = read_addr(Cursor::new(&addr_data))?;
Ok(con) Ok(con)
@ -153,7 +158,8 @@ impl Socket for ProxyConnection {
let mut msg = Vec::with_capacity(data.len() + 18); let mut msg = Vec::with_capacity(data.len() + 18);
write_addr(addr, &mut msg)?; write_addr(addr, &mut msg)?;
msg.write_all(data)?; msg.write_all(data)?;
io_error!(self.socket.write_message(Message::Binary(msg)), "Failed to write to ws proxy: {}")?; unimplemented!();
//io_error!(self.socket.write_message(Message::Binary(msg)), "Failed to write to ws proxy: {}")?;
Ok(data.len()) Ok(data.len())
} }