mirror of https://github.com/dswd/vpncloud.git
Coms
This commit is contained in:
parent
a9b960b04e
commit
2fcb7646c7
|
@ -12,8 +12,8 @@ use crate::{
|
||||||
device::Device,
|
device::Device,
|
||||||
engine::{
|
engine::{
|
||||||
device_thread::DeviceThread,
|
device_thread::DeviceThread,
|
||||||
shared::{SharedPeerCrypto, SharedTable, SharedTraffic},
|
|
||||||
socket_thread::{ReconnectEntry, SocketThread},
|
socket_thread::{ReconnectEntry, SocketThread},
|
||||||
|
coms::Coms
|
||||||
},
|
},
|
||||||
error::Error,
|
error::Error,
|
||||||
messages::AddrList,
|
messages::AddrList,
|
||||||
|
@ -50,26 +50,17 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
|
||||||
pub fn new(
|
pub fn new(
|
||||||
config: &Config, socket: S, device: D, port_forwarding: Option<PortForwarding>, stats_file: Option<File>,
|
config: &Config, socket: S, device: D, port_forwarding: Option<PortForwarding>, stats_file: Option<File>,
|
||||||
) -> Result<Self, Error> {
|
) -> Result<Self, Error> {
|
||||||
let table = SharedTable::<TS>::new(config);
|
|
||||||
let traffic = SharedTraffic::new();
|
|
||||||
let peer_crypto = SharedPeerCrypto::new();
|
|
||||||
let running = Arc::new(AtomicBool::new(true));
|
let running = Arc::new(AtomicBool::new(true));
|
||||||
|
let coms = Coms::<S, TS, P>::new(config, socket.try_clone().map_err(|e| Error::SocketIo("Failed to clone socket", e))?);
|
||||||
let device_thread = DeviceThread::<S, D, P, TS>::new(
|
let device_thread = DeviceThread::<S, D, P, TS>::new(
|
||||||
config.clone(),
|
|
||||||
device.duplicate()?,
|
device.duplicate()?,
|
||||||
socket.try_clone().map_err(|e| Error::SocketIo("Failed to clone socket", e))?,
|
coms.try_clone()?,
|
||||||
traffic.clone(),
|
|
||||||
peer_crypto.clone(),
|
|
||||||
table.clone(),
|
|
||||||
running.clone(),
|
running.clone(),
|
||||||
);
|
);
|
||||||
let mut socket_thread = SocketThread::<S, D, P, TS>::new(
|
let mut socket_thread = SocketThread::<S, D, P, TS>::new(
|
||||||
config.clone(),
|
config.clone(),
|
||||||
|
coms,
|
||||||
device,
|
device,
|
||||||
socket,
|
|
||||||
traffic,
|
|
||||||
peer_crypto,
|
|
||||||
table,
|
|
||||||
port_forwarding,
|
port_forwarding,
|
||||||
stats_file,
|
stats_file,
|
||||||
running.clone(),
|
running.clone(),
|
||||||
|
@ -120,7 +111,7 @@ use std::net::SocketAddr;
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
impl<P: Protocol> GenericCloud<MockDevice, P, MockSocket, MockTimeSource> {
|
impl<P: Protocol> GenericCloud<MockDevice, P, MockSocket, MockTimeSource> {
|
||||||
pub fn socket(&mut self) -> &mut MockSocket {
|
pub fn socket(&mut self) -> &mut MockSocket {
|
||||||
&mut self.socket_thread.socket
|
&mut self.socket_thread.coms.socket
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn device(&mut self) -> &mut MockDevice {
|
pub fn device(&mut self) -> &mut MockDevice {
|
||||||
|
@ -153,6 +144,6 @@ impl<P: Protocol> GenericCloud<MockDevice, P, MockSocket, MockTimeSource> {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_num(&self) -> usize {
|
pub fn get_num(&self) -> usize {
|
||||||
self.socket_thread.socket.address().unwrap().port() as usize
|
self.socket_thread.coms.socket.address().unwrap().port() as usize
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,152 @@
|
||||||
|
use std::{net::SocketAddr, marker::PhantomData, io};
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
config::Config,
|
||||||
|
error::Error,
|
||||||
|
messages::MESSAGE_TYPE_DATA,
|
||||||
|
net::{Socket, mapped_addr},
|
||||||
|
util::{MsgBuffer, TimeSource}, payload::Protocol,
|
||||||
|
};
|
||||||
|
|
||||||
|
use super::{
|
||||||
|
common::{SPACE_BEFORE, PeerData},
|
||||||
|
shared::{SharedPeerCrypto, SharedTable, SharedTraffic},
|
||||||
|
};
|
||||||
|
|
||||||
|
pub struct Coms<S: Socket, TS: TimeSource, P: Protocol> {
|
||||||
|
_dummy_p: PhantomData<P>,
|
||||||
|
broadcast: bool,
|
||||||
|
broadcast_buffer: MsgBuffer,
|
||||||
|
peer_crypto: SharedPeerCrypto,
|
||||||
|
pub table: SharedTable<TS>,
|
||||||
|
pub traffic: SharedTraffic,
|
||||||
|
pub socket: S,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S: Socket, TS: TimeSource, P: Protocol> Coms<S, TS, P> {
|
||||||
|
pub fn new(config: &Config, socket: S) -> Self {
|
||||||
|
Self {
|
||||||
|
_dummy_p: PhantomData,
|
||||||
|
broadcast: config.is_broadcasting(),
|
||||||
|
broadcast_buffer: MsgBuffer::new(SPACE_BEFORE),
|
||||||
|
traffic: SharedTraffic::new(),
|
||||||
|
peer_crypto: SharedPeerCrypto::new(),
|
||||||
|
table: SharedTable::<TS>::new(config),
|
||||||
|
socket,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn try_clone(&self) -> Result<Self, Error> {
|
||||||
|
Ok(Self {
|
||||||
|
_dummy_p: PhantomData,
|
||||||
|
broadcast: self.broadcast,
|
||||||
|
broadcast_buffer: MsgBuffer::new(SPACE_BEFORE),
|
||||||
|
traffic: self.traffic.clone(),
|
||||||
|
peer_crypto: self.peer_crypto.clone(),
|
||||||
|
table: self.table.clone(),
|
||||||
|
socket: self.socket.try_clone().map_err(|e| Error::SocketIo("Failed to clone socket", e))?,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn sync(&mut self) -> Result<(), Error> {
|
||||||
|
self.peer_crypto.load();
|
||||||
|
self.table.sync();
|
||||||
|
self.traffic.sync();
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_address(&self) -> Result<SocketAddr, io::Error> {
|
||||||
|
self.socket.address().map(mapped_addr)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn send_raw(&mut self, data: &[u8], addr: SocketAddr) -> Result<(), Error> {
|
||||||
|
match self.socket.send(data, addr) {
|
||||||
|
Ok(written) if written == data.len() => Ok(()),
|
||||||
|
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
|
||||||
|
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn receive(&mut self, buffer: &mut MsgBuffer) -> Result<SocketAddr, io::Error> {
|
||||||
|
self.socket.receive(buffer)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
pub fn send_to(&mut self, addr: SocketAddr, data: &mut MsgBuffer) -> Result<(), Error> {
|
||||||
|
let size = data.len();
|
||||||
|
debug!("Sending msg with {} bytes to {}", size, addr);
|
||||||
|
self.traffic.count_out_traffic(addr, size);
|
||||||
|
match self.socket.send(data.message(), addr) {
|
||||||
|
Ok(written) if written == size => Ok(()),
|
||||||
|
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
|
||||||
|
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
pub fn send_msg(&mut self, addr: SocketAddr, type_: u8, data: &mut MsgBuffer) -> Result<(), Error> {
|
||||||
|
debug!("Sending msg with {} bytes to {}", data.len(), addr);
|
||||||
|
data.prepend_byte(type_);
|
||||||
|
self.peer_crypto.encrypt_for(addr, data)?;
|
||||||
|
self.send_to(addr, data)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
pub fn broadcast_msg(&mut self, type_: u8, data: &mut MsgBuffer) -> Result<(), Error> {
|
||||||
|
let size = data.len();
|
||||||
|
debug!("Broadcasting message type {}, {:?} bytes to {} peers", type_, size, self.peer_crypto.count());
|
||||||
|
let traffic = &mut self.traffic;
|
||||||
|
let socket = &mut self.socket;
|
||||||
|
let peers = self.peer_crypto.get_snapshot();
|
||||||
|
for (addr, crypto) in peers {
|
||||||
|
self.broadcast_buffer.set_start(data.get_start());
|
||||||
|
self.broadcast_buffer.set_length(data.len());
|
||||||
|
self.broadcast_buffer.message_mut().clone_from_slice(data.message());
|
||||||
|
self.broadcast_buffer.prepend_byte(type_);
|
||||||
|
if let Some(crypto) = crypto {
|
||||||
|
crypto.encrypt(&mut self.broadcast_buffer);
|
||||||
|
}
|
||||||
|
traffic.count_out_traffic(*addr, self.broadcast_buffer.len());
|
||||||
|
match socket.send(self.broadcast_buffer.message(), *addr) {
|
||||||
|
Ok(written) if written == self.broadcast_buffer.len() => Ok(()),
|
||||||
|
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
|
||||||
|
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
|
||||||
|
}?
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn forward_packet(&mut self, data: &mut MsgBuffer) -> Result<(), Error> {
|
||||||
|
let (src, dst) = P::parse(data.message())?;
|
||||||
|
debug!("Read data from interface: src: {}, dst: {}, {} bytes", src, dst, data.len());
|
||||||
|
self.traffic.count_out_payload(dst.clone(), src, data.len());
|
||||||
|
match self.table.lookup(&dst) {
|
||||||
|
Some(addr) => {
|
||||||
|
// Peer found for destination
|
||||||
|
debug!("Found destination for {} => {}", dst, addr);
|
||||||
|
self.send_msg(addr, MESSAGE_TYPE_DATA, data)?;
|
||||||
|
}
|
||||||
|
//TODO: VIA: find relay peer and relay message
|
||||||
|
None => {
|
||||||
|
if self.broadcast {
|
||||||
|
debug!("No destination for {} found, broadcasting", dst);
|
||||||
|
self.broadcast_msg(MESSAGE_TYPE_DATA, data)?;
|
||||||
|
} else {
|
||||||
|
debug!("No destination for {} found, dropping", dst);
|
||||||
|
self.traffic.count_dropped_payload(data.len());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn add_peer(&mut self, addr: SocketAddr, peer: &PeerData) {
|
||||||
|
self.peer_crypto.add(addr, peer.crypto.get_core());
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn remove_peer(&mut self, addr: &SocketAddr) {
|
||||||
|
self.table.remove_claims(*addr);
|
||||||
|
self.peer_crypto.remove(addr);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -1,139 +1,43 @@
|
||||||
use super::{
|
use super::{common::SPACE_BEFORE, coms::Coms};
|
||||||
common::SPACE_BEFORE,
|
|
||||||
shared::{SharedPeerCrypto, SharedTable, SharedTraffic},
|
|
||||||
};
|
|
||||||
use crate::{
|
use crate::{
|
||||||
config::Config,
|
|
||||||
device::Device,
|
device::Device,
|
||||||
error::Error,
|
error::Error,
|
||||||
messages::MESSAGE_TYPE_DATA,
|
|
||||||
net::Socket,
|
net::Socket,
|
||||||
util::{MsgBuffer, Time, TimeSource},
|
util::{MsgBuffer, Time, TimeSource},
|
||||||
Protocol,
|
Protocol,
|
||||||
};
|
};
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::{marker::PhantomData, net::SocketAddr};
|
|
||||||
|
|
||||||
pub struct DeviceThread<S: Socket, D: Device, P: Protocol, TS: TimeSource> {
|
pub struct DeviceThread<S: Socket, D: Device, P: Protocol, TS: TimeSource> {
|
||||||
// Read-only fields
|
|
||||||
_dummy_ts: PhantomData<TS>,
|
|
||||||
_dummy_p: PhantomData<P>,
|
|
||||||
broadcast: bool,
|
|
||||||
// Device-only fields
|
// Device-only fields
|
||||||
socket: S,
|
coms: Coms<S, TS, P>,
|
||||||
pub device: D,
|
pub device: D,
|
||||||
next_housekeep: Time,
|
next_housekeep: Time,
|
||||||
buffer: MsgBuffer,
|
buffer: MsgBuffer,
|
||||||
broadcast_buffer: MsgBuffer,
|
|
||||||
// Shared fields
|
// Shared fields
|
||||||
traffic: SharedTraffic,
|
|
||||||
peer_crypto: SharedPeerCrypto,
|
|
||||||
table: SharedTable<TS>,
|
|
||||||
running: Arc<AtomicBool>,
|
running: Arc<AtomicBool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS> {
|
impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS> {
|
||||||
pub fn new(
|
pub fn new(device: D, coms: Coms<S, TS, P>, running: Arc<AtomicBool>) -> Self {
|
||||||
config: Config, device: D, socket: S, traffic: SharedTraffic, peer_crypto: SharedPeerCrypto,
|
|
||||||
table: SharedTable<TS>, running: Arc<AtomicBool>,
|
|
||||||
) -> Self {
|
|
||||||
Self {
|
Self {
|
||||||
_dummy_ts: PhantomData,
|
|
||||||
_dummy_p: PhantomData,
|
|
||||||
broadcast: config.is_broadcasting(),
|
|
||||||
socket,
|
|
||||||
device,
|
device,
|
||||||
next_housekeep: TS::now(),
|
next_housekeep: TS::now(),
|
||||||
traffic,
|
|
||||||
peer_crypto,
|
|
||||||
table,
|
|
||||||
buffer: MsgBuffer::new(SPACE_BEFORE),
|
buffer: MsgBuffer::new(SPACE_BEFORE),
|
||||||
broadcast_buffer: MsgBuffer::new(SPACE_BEFORE),
|
|
||||||
running,
|
running,
|
||||||
|
coms,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn send_to(&mut self, addr: SocketAddr) -> Result<(), Error> {
|
|
||||||
let size = self.buffer.len();
|
|
||||||
debug!("Sending msg with {} bytes to {}", size, addr);
|
|
||||||
self.traffic.count_out_traffic(addr, size);
|
|
||||||
match self.socket.send(self.buffer.message(), addr) {
|
|
||||||
Ok(written) if written == size => Ok(()),
|
|
||||||
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
|
|
||||||
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn send_msg(&mut self, addr: SocketAddr, type_: u8) -> Result<(), Error> {
|
|
||||||
debug!("Sending msg with {} bytes to {}", self.buffer.len(), addr);
|
|
||||||
self.buffer.prepend_byte(type_);
|
|
||||||
self.peer_crypto.encrypt_for(addr, &mut self.buffer)?;
|
|
||||||
self.send_to(addr)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn broadcast_msg(&mut self, type_: u8) -> Result<(), Error> {
|
|
||||||
let size = self.buffer.len();
|
|
||||||
debug!("Broadcasting message type {}, {:?} bytes to {} peers", type_, size, self.peer_crypto.count());
|
|
||||||
let traffic = &mut self.traffic;
|
|
||||||
let socket = &mut self.socket;
|
|
||||||
let peers = self.peer_crypto.get_snapshot();
|
|
||||||
for (addr, crypto) in peers {
|
|
||||||
self.broadcast_buffer.set_start(self.buffer.get_start());
|
|
||||||
self.broadcast_buffer.set_length(self.buffer.len());
|
|
||||||
self.broadcast_buffer.message_mut().clone_from_slice(self.buffer.message());
|
|
||||||
self.broadcast_buffer.prepend_byte(type_);
|
|
||||||
if let Some(crypto) = crypto {
|
|
||||||
crypto.encrypt(&mut self.broadcast_buffer);
|
|
||||||
}
|
|
||||||
traffic.count_out_traffic(*addr, self.broadcast_buffer.len());
|
|
||||||
match socket.send(self.broadcast_buffer.message(), *addr) {
|
|
||||||
Ok(written) if written == self.broadcast_buffer.len() => Ok(()),
|
|
||||||
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
|
|
||||||
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
|
|
||||||
}?
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn forward_packet(&mut self) -> Result<(), Error> {
|
|
||||||
let (src, dst) = P::parse(self.buffer.message())?;
|
|
||||||
debug!("Read data from interface: src: {}, dst: {}, {} bytes", src, dst, self.buffer.len());
|
|
||||||
self.traffic.count_out_payload(dst.clone(), src, self.buffer.len());
|
|
||||||
match self.table.lookup(&dst) {
|
|
||||||
Some(addr) => {
|
|
||||||
// Peer found for destination
|
|
||||||
debug!("Found destination for {} => {}", dst, addr);
|
|
||||||
self.send_msg(addr, MESSAGE_TYPE_DATA)?;
|
|
||||||
}
|
|
||||||
//TODO: VIA: find relay peer and relay message
|
|
||||||
None => {
|
|
||||||
if self.broadcast {
|
|
||||||
debug!("No destination for {} found, broadcasting", dst);
|
|
||||||
self.broadcast_msg(MESSAGE_TYPE_DATA)?;
|
|
||||||
} else {
|
|
||||||
debug!("No destination for {} found, dropping", dst);
|
|
||||||
self.traffic.count_dropped_payload(self.buffer.len());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn housekeep(&mut self) -> Result<(), Error> {
|
pub fn housekeep(&mut self) -> Result<(), Error> {
|
||||||
self.peer_crypto.load();
|
self.coms.sync()
|
||||||
self.table.sync();
|
|
||||||
self.traffic.sync();
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn iteration(&mut self) -> bool {
|
pub fn iteration(&mut self) -> bool {
|
||||||
if self.device.read(&mut self.buffer).is_ok() {
|
if self.device.read(&mut self.buffer).is_ok() {
|
||||||
//try_fail!(result, "Failed to read from device: {}");
|
//try_fail!(result, "Failed to read from device: {}");
|
||||||
if let Err(e) = self.forward_packet() {
|
if let Err(e) = self.coms.forward_packet(&mut self.buffer) {
|
||||||
error!("{}", e);
|
error!("{}", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,4 +5,5 @@
|
||||||
mod device_thread;
|
mod device_thread;
|
||||||
mod shared;
|
mod shared;
|
||||||
mod socket_thread;
|
mod socket_thread;
|
||||||
|
pub mod coms;
|
||||||
pub mod common;
|
pub mod common;
|
|
@ -49,6 +49,18 @@ impl SharedPeerCrypto {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn add(&mut self, addr: SocketAddr, crypto: Option<Arc<CryptoCore>>) {
|
||||||
|
self.cache.insert(addr, crypto.clone());
|
||||||
|
let mut peers = self.peers.lock();
|
||||||
|
peers.insert(addr, crypto);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn remove(&mut self, addr: &SocketAddr) {
|
||||||
|
self.cache.remove(addr);
|
||||||
|
let mut peers = self.peers.lock();
|
||||||
|
peers.remove(addr);
|
||||||
|
}
|
||||||
|
|
||||||
pub fn store(&mut self, data: &HashMap<SocketAddr, PeerData, Hash>) {
|
pub fn store(&mut self, data: &HashMap<SocketAddr, PeerData, Hash>) {
|
||||||
self.cache.clear();
|
self.cache.clear();
|
||||||
self.cache.extend(data.iter().map(|(k, v)| (*k, v.crypto.get_core())));
|
self.cache.extend(data.iter().map(|(k, v)| (*k, v.crypto.get_core())));
|
||||||
|
@ -138,6 +150,12 @@ impl SharedTraffic {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Default for SharedTraffic {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::new()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct SharedTable<TS: TimeSource> {
|
pub struct SharedTable<TS: TimeSource> {
|
||||||
table: Arc<Mutex<ClaimTable<TS>>>,
|
table: Arc<Mutex<ClaimTable<TS>>>,
|
||||||
|
|
|
@ -1,7 +1,4 @@
|
||||||
use super::{
|
use super::{common::SPACE_BEFORE, coms::Coms};
|
||||||
common::SPACE_BEFORE,
|
|
||||||
shared::{SharedPeerCrypto, SharedTable, SharedTraffic},
|
|
||||||
};
|
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
beacon::BeaconSerializer,
|
beacon::BeaconSerializer,
|
||||||
|
@ -62,7 +59,7 @@ pub struct SocketThread<S: Socket, D: Device, P: Protocol, TS: TimeSource> {
|
||||||
_dummy_ts: PhantomData<TS>,
|
_dummy_ts: PhantomData<TS>,
|
||||||
_dummy_p: PhantomData<P>,
|
_dummy_p: PhantomData<P>,
|
||||||
// Socket-only fields
|
// Socket-only fields
|
||||||
pub socket: S,
|
pub coms: Coms<S, TS, P>,
|
||||||
device: D,
|
device: D,
|
||||||
next_housekeep: Time,
|
next_housekeep: Time,
|
||||||
pub own_addresses: AddrList,
|
pub own_addresses: AddrList,
|
||||||
|
@ -78,11 +75,8 @@ pub struct SocketThread<S: Socket, D: Device, P: Protocol, TS: TimeSource> {
|
||||||
statsd_server: Option<String>,
|
statsd_server: Option<String>,
|
||||||
pub reconnect_peers: SmallVec<[ReconnectEntry; 3]>,
|
pub reconnect_peers: SmallVec<[ReconnectEntry; 3]>,
|
||||||
buffer: MsgBuffer,
|
buffer: MsgBuffer,
|
||||||
broadcast_buffer: MsgBuffer,
|
|
||||||
// Shared fields
|
// Shared fields
|
||||||
peer_crypto: SharedPeerCrypto,
|
//table: SharedTable<TS>,
|
||||||
traffic: SharedTraffic,
|
|
||||||
table: SharedTable<TS>,
|
|
||||||
running: Arc<AtomicBool>,
|
running: Arc<AtomicBool>,
|
||||||
// Should not be here
|
// Should not be here
|
||||||
port_forwarding: Option<PortForwarding>, // TODO: 3rd thread
|
port_forwarding: Option<PortForwarding>, // TODO: 3rd thread
|
||||||
|
@ -91,9 +85,8 @@ pub struct SocketThread<S: Socket, D: Device, P: Protocol, TS: TimeSource> {
|
||||||
impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS> {
|
impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS> {
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
pub fn new(
|
pub fn new(
|
||||||
config: Config, device: D, socket: S, traffic: SharedTraffic, peer_crypto: SharedPeerCrypto,
|
config: Config, coms: Coms<S, TS, P>, device: D,
|
||||||
table: SharedTable<TS>, port_forwarding: Option<PortForwarding>, stats_file: Option<File>,
|
port_forwarding: Option<PortForwarding>, stats_file: Option<File>, running: Arc<AtomicBool>,
|
||||||
running: Arc<AtomicBool>,
|
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let mut claims = SmallVec::with_capacity(config.claims.len());
|
let mut claims = SmallVec::with_capacity(config.claims.len());
|
||||||
for s in &config.claims {
|
for s in &config.claims {
|
||||||
|
@ -122,10 +115,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
node_id,
|
node_id,
|
||||||
claims,
|
claims,
|
||||||
device,
|
device,
|
||||||
socket,
|
coms,
|
||||||
peer_crypto,
|
|
||||||
traffic,
|
|
||||||
table,
|
|
||||||
learning: config.is_learning(),
|
learning: config.is_learning(),
|
||||||
next_housekeep: now,
|
next_housekeep: now,
|
||||||
next_beacon: now,
|
next_beacon: now,
|
||||||
|
@ -145,46 +135,10 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
crypto: Crypto::new(node_id, &config.crypto).unwrap(),
|
crypto: Crypto::new(node_id, &config.crypto).unwrap(),
|
||||||
config,
|
config,
|
||||||
buffer: MsgBuffer::new(SPACE_BEFORE),
|
buffer: MsgBuffer::new(SPACE_BEFORE),
|
||||||
broadcast_buffer: MsgBuffer::new(SPACE_BEFORE),
|
|
||||||
running,
|
running,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn send_to(&mut self, addr: SocketAddr) -> Result<(), Error> {
|
|
||||||
let size = self.buffer.len();
|
|
||||||
debug!("Sending msg with {} bytes to {}", size, addr);
|
|
||||||
self.traffic.count_out_traffic(addr, size);
|
|
||||||
match self.socket.send(self.buffer.message(), addr) {
|
|
||||||
Ok(written) if written == size => {
|
|
||||||
self.buffer.clear();
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
|
|
||||||
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn broadcast_msg(&mut self, type_: u8) -> Result<(), Error> {
|
|
||||||
debug!("Broadcasting message type {}, {:?} bytes to {} peers", type_, self.buffer.len(), self.peers.len());
|
|
||||||
for (addr, peer) in &mut self.peers {
|
|
||||||
self.broadcast_buffer.set_start(self.buffer.get_start());
|
|
||||||
self.broadcast_buffer.set_length(self.buffer.len());
|
|
||||||
self.broadcast_buffer.message_mut().clone_from_slice(self.buffer.message());
|
|
||||||
self.broadcast_buffer.prepend_byte(type_);
|
|
||||||
peer.crypto.encrypt_message(&mut self.broadcast_buffer);
|
|
||||||
self.traffic.count_out_traffic(*addr, self.broadcast_buffer.len());
|
|
||||||
match self.socket.send(self.broadcast_buffer.message(), *addr) {
|
|
||||||
Ok(written) if written == self.broadcast_buffer.len() => Ok(()),
|
|
||||||
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
|
|
||||||
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
|
|
||||||
}?
|
|
||||||
}
|
|
||||||
self.buffer.clear();
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn connect_sock(&mut self, addr: SocketAddr) -> Result<(), Error> {
|
fn connect_sock(&mut self, addr: SocketAddr) -> Result<(), Error> {
|
||||||
let addr = mapped_addr(addr);
|
let addr = mapped_addr(addr);
|
||||||
if self.peers.contains_key(&addr)
|
if self.peers.contains_key(&addr)
|
||||||
|
@ -198,7 +152,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
let mut init = self.crypto.peer_instance(payload);
|
let mut init = self.crypto.peer_instance(payload);
|
||||||
init.send_ping(&mut self.buffer);
|
init.send_ping(&mut self.buffer);
|
||||||
self.pending_inits.insert(addr, init);
|
self.pending_inits.insert(addr, init);
|
||||||
self.send_to(addr)
|
self.coms.send_to(addr, &mut self.buffer)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn connect<Addr: ToSocketAddrs + fmt::Debug + Clone>(&mut self, addr: Addr) -> Result<(), Error> {
|
pub fn connect<Addr: ToSocketAddrs + fmt::Debug + Clone>(&mut self, addr: Addr) -> Result<(), Error> {
|
||||||
|
@ -258,7 +212,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
}
|
}
|
||||||
if let Some(info) = info {
|
if let Some(info) = info {
|
||||||
debug!("Adding claims of peer {}: {:?}", addr_nice(addr), info.claims);
|
debug!("Adding claims of peer {}: {:?}", addr_nice(addr), info.claims);
|
||||||
self.table.set_claims(addr, info.claims);
|
self.coms.table.set_claims(addr, info.claims);
|
||||||
debug!("Received {} peers from {}: {:?}", info.peers.len(), addr_nice(addr), info.peers);
|
debug!("Received {} peers from {}: {:?}", info.peers.len(), addr_nice(addr), info.peers);
|
||||||
self.connect_to_peers(&info.peers)?;
|
self.connect_to_peers(&info.peers)?;
|
||||||
}
|
}
|
||||||
|
@ -270,22 +224,20 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
if let Some(init) = self.pending_inits.remove(&addr) {
|
if let Some(init) = self.pending_inits.remove(&addr) {
|
||||||
self.buffer.clear();
|
self.buffer.clear();
|
||||||
let crypto = init.finish(&mut self.buffer);
|
let crypto = init.finish(&mut self.buffer);
|
||||||
self.peers.insert(
|
let peer_data = PeerData {
|
||||||
addr,
|
|
||||||
PeerData {
|
|
||||||
addrs: info.addrs.clone(),
|
addrs: info.addrs.clone(),
|
||||||
crypto,
|
crypto,
|
||||||
node_id: info.node_id,
|
node_id: info.node_id,
|
||||||
peer_timeout: info.peer_timeout.unwrap_or(DEFAULT_PEER_TIMEOUT),
|
peer_timeout: info.peer_timeout.unwrap_or(DEFAULT_PEER_TIMEOUT),
|
||||||
last_seen: TS::now(),
|
last_seen: TS::now(),
|
||||||
timeout: TS::now() + self.config.peer_timeout as Time,
|
timeout: TS::now() + self.config.peer_timeout as Time,
|
||||||
},
|
};
|
||||||
);
|
self.coms.add_peer(addr, &peer_data);
|
||||||
|
self.peers.insert(addr, peer_data);
|
||||||
self.update_peer_info(addr, Some(info))?;
|
self.update_peer_info(addr, Some(info))?;
|
||||||
if !self.buffer.is_empty() {
|
if !self.buffer.is_empty() {
|
||||||
self.send_to(addr)?;
|
self.coms.send_to(addr, &mut self.buffer)?;
|
||||||
}
|
}
|
||||||
self.peer_crypto.store(&self.peers);
|
|
||||||
} else {
|
} else {
|
||||||
error!("No init for new peer {}", addr_nice(addr));
|
error!("No init for new peer {}", addr_nice(addr));
|
||||||
}
|
}
|
||||||
|
@ -323,8 +275,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
fn remove_peer(&mut self, addr: SocketAddr) {
|
fn remove_peer(&mut self, addr: SocketAddr) {
|
||||||
if let Some(_peer) = self.peers.remove(&addr) {
|
if let Some(_peer) = self.peers.remove(&addr) {
|
||||||
info!("Closing connection to {}", addr_nice(addr));
|
info!("Closing connection to {}", addr_nice(addr));
|
||||||
self.table.remove_claims(addr);
|
self.coms.remove_peer(&addr);
|
||||||
self.peer_crypto.store(&self.peers);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -332,7 +283,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
let (src, dst) = P::parse(self.buffer.message())?;
|
let (src, dst) = P::parse(self.buffer.message())?;
|
||||||
let len = self.buffer.len();
|
let len = self.buffer.len();
|
||||||
debug!("Writing data to device: {} bytes", len);
|
debug!("Writing data to device: {} bytes", len);
|
||||||
self.traffic.count_in_payload(src.clone(), dst, len);
|
self.coms.traffic.count_in_payload(src.clone(), dst, len);
|
||||||
if let Err(e) = self.device.write(&mut self.buffer) {
|
if let Err(e) = self.device.write(&mut self.buffer) {
|
||||||
error!("Failed to send via device: {}", e);
|
error!("Failed to send via device: {}", e);
|
||||||
return Err(e);
|
return Err(e);
|
||||||
|
@ -340,7 +291,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
self.buffer.clear();
|
self.buffer.clear();
|
||||||
if self.learning {
|
if self.learning {
|
||||||
// Learn single address
|
// Learn single address
|
||||||
self.table.cache(&src, peer);
|
self.coms.table.cache(&src, peer);
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -354,7 +305,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
let info = match NodeInfo::decode(Cursor::new(self.buffer.message())) {
|
let info = match NodeInfo::decode(Cursor::new(self.buffer.message())) {
|
||||||
Ok(val) => val,
|
Ok(val) => val,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
self.traffic.count_invalid_protocol(self.buffer.len());
|
self.coms.traffic.count_invalid_protocol(self.buffer.len());
|
||||||
return Err(err);
|
return Err(err);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -371,11 +322,11 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
self.buffer.clear();
|
self.buffer.clear();
|
||||||
self.traffic.count_invalid_protocol(self.buffer.len());
|
self.coms.traffic.count_invalid_protocol(self.buffer.len());
|
||||||
return Err(Error::Message("Unknown message type"));
|
return Err(Error::Message("Unknown message type"));
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
MessageResult::Reply => self.send_to(src)?,
|
MessageResult::Reply => self.coms.send_to(src, &mut self.buffer)?,
|
||||||
MessageResult::None => {
|
MessageResult::None => {
|
||||||
self.buffer.clear();
|
self.buffer.clear();
|
||||||
}
|
}
|
||||||
|
@ -387,7 +338,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
let src = mapped_addr(src);
|
let src = mapped_addr(src);
|
||||||
debug!("Received {} bytes from {}", self.buffer.len(), src);
|
debug!("Received {} bytes from {}", self.buffer.len(), src);
|
||||||
let buffer = &mut self.buffer;
|
let buffer = &mut self.buffer;
|
||||||
self.traffic.count_in_traffic(src, buffer.len());
|
self.coms.traffic.count_in_traffic(src, buffer.len());
|
||||||
if let Some(result) = self.peers.get_mut(&src).map(|peer| peer.crypto.handle_message(buffer)) {
|
if let Some(result) = self.peers.get_mut(&src).map(|peer| peer.crypto.handle_message(buffer)) {
|
||||||
return self.process_message(src, result?);
|
return self.process_message(src, result?);
|
||||||
}
|
}
|
||||||
|
@ -402,7 +353,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
}
|
}
|
||||||
}) {
|
}) {
|
||||||
if !buffer.is_empty() {
|
if !buffer.is_empty() {
|
||||||
self.send_to(src)?
|
self.coms.send_to(src, &mut self.buffer)?
|
||||||
}
|
}
|
||||||
if let InitResult::Success { peer_payload, .. } = result? {
|
if let InitResult::Success { peer_payload, .. } = result? {
|
||||||
self.add_new_peer(src, peer_payload)?
|
self.add_new_peer(src, peer_payload)?
|
||||||
|
@ -411,7 +362,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
}
|
}
|
||||||
if !is_init_message(self.buffer.message()) {
|
if !is_init_message(self.buffer.message()) {
|
||||||
info!("Ignoring non-init message from unknown peer {}", addr_nice(src));
|
info!("Ignoring non-init message from unknown peer {}", addr_nice(src));
|
||||||
self.traffic.count_invalid_protocol(self.buffer.len());
|
self.coms.traffic.count_invalid_protocol(self.buffer.len());
|
||||||
self.buffer.clear();
|
self.buffer.clear();
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
@ -420,10 +371,10 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
match msg_result {
|
match msg_result {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
self.pending_inits.insert(src, init);
|
self.pending_inits.insert(src, init);
|
||||||
self.send_to(src)
|
self.coms.send_to(src, &mut self.buffer)
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
self.traffic.count_invalid_protocol(self.buffer.len());
|
self.coms.traffic.count_invalid_protocol(self.buffer.len());
|
||||||
Err(err)
|
Err(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -440,10 +391,10 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
for addr in del {
|
for addr in del {
|
||||||
info!("Forgot peer {} due to timeout", addr_nice(addr));
|
info!("Forgot peer {} due to timeout", addr_nice(addr));
|
||||||
self.peers.remove(&addr);
|
self.peers.remove(&addr);
|
||||||
self.table.remove_claims(addr);
|
self.coms.remove_peer(&addr);
|
||||||
self.connect_sock(addr)?; // Try to reconnect
|
self.connect_sock(addr)?; // Try to reconnect
|
||||||
}
|
}
|
||||||
self.table.housekeep();
|
self.coms.table.housekeep();
|
||||||
self.crypto_housekeep()?;
|
self.crypto_housekeep()?;
|
||||||
// Periodically extend the port-forwarding
|
// Periodically extend the port-forwarding
|
||||||
//TODO: extra thread
|
//TODO: extra thread
|
||||||
|
@ -456,7 +407,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
debug!("Send peer list to all peers");
|
debug!("Send peer list to all peers");
|
||||||
let info = self.create_node_info();
|
let info = self.create_node_info();
|
||||||
info.encode(&mut self.buffer);
|
info.encode(&mut self.buffer);
|
||||||
self.broadcast_msg(MESSAGE_TYPE_NODE_INFO)?;
|
self.coms.broadcast_msg(MESSAGE_TYPE_NODE_INFO, &mut self.buffer)?;
|
||||||
// Reschedule for next update
|
// Reschedule for next update
|
||||||
let min_peer_timeout = self.peers.iter().map(|p| p.1.peer_timeout).min().unwrap_or(DEFAULT_PEER_TIMEOUT);
|
let min_peer_timeout = self.peers.iter().map(|p| p.1.peer_timeout).min().unwrap_or(DEFAULT_PEER_TIMEOUT);
|
||||||
let interval = min(self.update_freq as u16, max(min_peer_timeout / 2 - 60, 1));
|
let interval = min(self.update_freq as u16, max(min_peer_timeout / 2 - 60, 1));
|
||||||
|
@ -470,7 +421,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
//TODO: extra thread
|
//TODO: extra thread
|
||||||
self.send_stats_to_statsd()?;
|
self.send_stats_to_statsd()?;
|
||||||
self.next_stats_out = now + STATS_INTERVAL;
|
self.next_stats_out = now + STATS_INTERVAL;
|
||||||
self.traffic.period(Some(5));
|
self.coms.traffic.period(Some(5));
|
||||||
}
|
}
|
||||||
if let Some(peers) = self.beacon_serializer.get_cmd_results() {
|
if let Some(peers) = self.beacon_serializer.get_cmd_results() {
|
||||||
debug!("Loaded beacon with peers: {:?}", peers);
|
debug!("Loaded beacon with peers: {:?}", peers);
|
||||||
|
@ -483,9 +434,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
self.load_beacon()?;
|
self.load_beacon()?;
|
||||||
self.next_beacon = now + Time::from(self.config.beacon_interval);
|
self.next_beacon = now + Time::from(self.config.beacon_interval);
|
||||||
}
|
}
|
||||||
self.table.sync();
|
self.coms.sync()?;
|
||||||
self.traffic.sync();
|
|
||||||
self.peer_crypto.store(&self.peers);
|
|
||||||
// Periodically reset own peers
|
// Periodically reset own peers
|
||||||
if self.next_own_address_reset <= now {
|
if self.next_own_address_reset <= now {
|
||||||
self.reset_own_addresses().map_err(|err| Error::SocketIo("Failed to get own addresses", err))?;
|
self.reset_own_addresses().map_err(|err| Error::SocketIo("Failed to get own addresses", err))?;
|
||||||
|
@ -502,14 +451,14 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
if self.pending_inits.get_mut(&addr).unwrap().every_second(&mut self.buffer).is_err() {
|
if self.pending_inits.get_mut(&addr).unwrap().every_second(&mut self.buffer).is_err() {
|
||||||
del.push(addr)
|
del.push(addr)
|
||||||
} else if !self.buffer.is_empty() {
|
} else if !self.buffer.is_empty() {
|
||||||
self.send_to(addr)?
|
self.coms.send_to(addr, &mut self.buffer)?
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for addr in self.peers.keys().copied().collect::<SmallVec<[SocketAddr; 16]>>() {
|
for addr in self.peers.keys().copied().collect::<SmallVec<[SocketAddr; 16]>>() {
|
||||||
self.buffer.clear();
|
self.buffer.clear();
|
||||||
self.peers.get_mut(&addr).unwrap().crypto.every_second(&mut self.buffer);
|
self.peers.get_mut(&addr).unwrap().crypto.every_second(&mut self.buffer);
|
||||||
if !self.buffer.is_empty() {
|
if !self.buffer.is_empty() {
|
||||||
self.send_to(addr)?
|
self.coms.send_to(addr, &mut self.buffer)?
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for addr in del {
|
for addr in del {
|
||||||
|
@ -523,7 +472,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
|
|
||||||
fn reset_own_addresses(&mut self) -> io::Result<()> {
|
fn reset_own_addresses(&mut self) -> io::Result<()> {
|
||||||
self.own_addresses.clear();
|
self.own_addresses.clear();
|
||||||
let socket_addr = self.socket.address().map(mapped_addr)?;
|
let socket_addr = self.coms.get_address()?;
|
||||||
// 1) Specified advertise addresses
|
// 1) Specified advertise addresses
|
||||||
for addr in &self.config.advertise_addresses {
|
for addr in &self.config.advertise_addresses {
|
||||||
self.own_addresses.push(parse_listen(addr, socket_addr.port()));
|
self.own_addresses.push(parse_listen(addr, socket_addr.port()));
|
||||||
|
@ -601,9 +550,9 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
)?;
|
)?;
|
||||||
}
|
}
|
||||||
writeln!(f)?;
|
writeln!(f)?;
|
||||||
self.table.write_out(f)?;
|
self.coms.table.write_out(f)?;
|
||||||
writeln!(f)?;
|
writeln!(f)?;
|
||||||
self.traffic.write_out(f)?;
|
self.coms.traffic.write_out(f)?;
|
||||||
writeln!(f)?;
|
writeln!(f)?;
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -612,15 +561,15 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
/// Sends the statistics to a statsd endpoint
|
/// Sends the statistics to a statsd endpoint
|
||||||
fn send_stats_to_statsd(&mut self) -> Result<(), Error> {
|
fn send_stats_to_statsd(&mut self) -> Result<(), Error> {
|
||||||
if let Some(ref endpoint) = self.statsd_server {
|
if let Some(ref endpoint) = self.statsd_server {
|
||||||
let peer_traffic = self.traffic.total_peer_traffic();
|
let peer_traffic = self.coms.traffic.total_peer_traffic();
|
||||||
let payload_traffic = self.traffic.total_payload_traffic();
|
let payload_traffic = self.coms.traffic.total_payload_traffic();
|
||||||
let dropped = &self.traffic.dropped();
|
let dropped = &self.coms.traffic.dropped();
|
||||||
let prefix = self.config.statsd_prefix.as_ref().map(|s| s as &str).unwrap_or("vpncloud");
|
let prefix = self.config.statsd_prefix.as_ref().map(|s| s as &str).unwrap_or("vpncloud");
|
||||||
let msg = StatsdMsg::new()
|
let msg = StatsdMsg::new()
|
||||||
.with_ns(prefix, |msg| {
|
.with_ns(prefix, |msg| {
|
||||||
msg.add("peer_count", self.peers.len(), "g");
|
msg.add("peer_count", self.peers.len(), "g");
|
||||||
msg.add("table_cache_entries", self.table.cache_len(), "g");
|
msg.add("table_cache_entries", self.coms.table.cache_len(), "g");
|
||||||
msg.add("table_claims", self.table.claim_len(), "g");
|
msg.add("table_claims", self.coms.table.claim_len(), "g");
|
||||||
msg.with_ns("traffic", |msg| {
|
msg.with_ns("traffic", |msg| {
|
||||||
msg.with_ns("protocol", |msg| {
|
msg.with_ns("protocol", |msg| {
|
||||||
msg.with_ns("inbound", |msg| {
|
msg.with_ns("inbound", |msg| {
|
||||||
|
@ -653,14 +602,9 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
});
|
});
|
||||||
})
|
})
|
||||||
.build();
|
.build();
|
||||||
let msg_data = msg.as_bytes();
|
|
||||||
let addrs = resolve(endpoint)?;
|
let addrs = resolve(endpoint)?;
|
||||||
if let Some(addr) = addrs.first() {
|
if let Some(addr) = addrs.first() {
|
||||||
match self.socket.send(msg_data, *addr) {
|
self.coms.send_raw(msg.as_bytes(), *addr)?;
|
||||||
Ok(written) if written == msg_data.len() => Ok(()),
|
|
||||||
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
|
|
||||||
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
|
|
||||||
}?
|
|
||||||
} else {
|
} else {
|
||||||
error!("Failed to resolve statsd server {}", endpoint);
|
error!("Failed to resolve statsd server {}", endpoint);
|
||||||
}
|
}
|
||||||
|
@ -722,8 +666,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn iteration(&mut self) -> bool {
|
pub fn iteration(&mut self) -> bool {
|
||||||
if let Ok(src) = self.socket.receive(&mut self.buffer)
|
if let Ok(src) = self.coms.receive(&mut self.buffer) {
|
||||||
{
|
|
||||||
match self.handle_message(src) {
|
match self.handle_message(src) {
|
||||||
Err(e @ Error::CryptoInitFatal(_)) => {
|
Err(e @ Error::CryptoInitFatal(_)) => {
|
||||||
debug!("Fatal crypto init error from {}: {}", src, e);
|
debug!("Fatal crypto init error from {}: {}", src, e);
|
||||||
|
|
Loading…
Reference in New Issue