mirror of https://github.com/dswd/vpncloud.git
Changes
This commit is contained in:
parent
c9a0cc85ab
commit
d118b96e03
|
@ -77,7 +77,7 @@ impl FromStr for Type {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait Device: AsRawFd + Clone {
|
pub trait Device: AsRawFd + Clone + Send + 'static {
|
||||||
/// Returns the type of this device
|
/// Returns the type of this device
|
||||||
fn get_type(&self) -> Type;
|
fn get_type(&self) -> Type;
|
||||||
|
|
||||||
|
|
|
@ -6,54 +6,31 @@ mod device_thread;
|
||||||
mod shared;
|
mod shared;
|
||||||
mod socket_thread;
|
mod socket_thread;
|
||||||
|
|
||||||
use std::{
|
use std::{fs::File, hash::BuildHasherDefault, thread};
|
||||||
cmp::{max, min},
|
|
||||||
collections::HashMap,
|
|
||||||
fmt,
|
|
||||||
fs::{self, File},
|
|
||||||
hash::BuildHasherDefault,
|
|
||||||
io::{self, Cursor, Seek, SeekFrom, Write},
|
|
||||||
marker::PhantomData,
|
|
||||||
net::{SocketAddr, ToSocketAddrs},
|
|
||||||
path::Path,
|
|
||||||
str::FromStr,
|
|
||||||
thread
|
|
||||||
};
|
|
||||||
|
|
||||||
use fnv::FnvHasher;
|
use fnv::FnvHasher;
|
||||||
use rand::{random, seq::SliceRandom, thread_rng};
|
|
||||||
use smallvec::{smallvec, SmallVec};
|
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
beacon::BeaconSerializer,
|
config::Config,
|
||||||
config::{Config, DEFAULT_PEER_TIMEOUT, DEFAULT_PORT},
|
crypto::PeerCrypto,
|
||||||
crypto::{is_init_message, Crypto, InitResult, InitState, MessageResult, PeerCrypto},
|
device::Device,
|
||||||
device::{Device, Type},
|
|
||||||
engine::{
|
engine::{
|
||||||
device_thread::DeviceThread,
|
device_thread::DeviceThread,
|
||||||
shared::{SharedPeerCrypto, SharedTable, SharedTraffic}
|
shared::{SharedPeerCrypto, SharedTable, SharedTraffic},
|
||||||
|
socket_thread::SocketThread
|
||||||
},
|
},
|
||||||
error::Error,
|
error::Error,
|
||||||
messages::{
|
messages::AddrList,
|
||||||
AddrList, NodeInfo, PeerInfo, MESSAGE_TYPE_CLOSE, MESSAGE_TYPE_DATA, MESSAGE_TYPE_KEEPALIVE,
|
net::Socket,
|
||||||
MESSAGE_TYPE_NODE_INFO
|
|
||||||
},
|
|
||||||
net::{mapped_addr, Socket},
|
|
||||||
payload::Protocol,
|
payload::Protocol,
|
||||||
poll::{WaitImpl, WaitResult},
|
|
||||||
port_forwarding::PortForwarding,
|
port_forwarding::PortForwarding,
|
||||||
table::ClaimTable,
|
types::NodeId,
|
||||||
traffic::TrafficStats,
|
util::{addr_nice, resolve, CtrlC, Time, TimeSource}
|
||||||
types::{Address, Mode, NodeId, Range, RangeList},
|
|
||||||
util::{addr_nice, bytes_to_hex, resolve, CtrlC, Duration, MsgBuffer, StatsdMsg, Time, TimeSource}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
pub type Hash = BuildHasherDefault<FnvHasher>;
|
pub type Hash = BuildHasherDefault<FnvHasher>;
|
||||||
|
|
||||||
const MAX_RECONNECT_INTERVAL: u16 = 3600;
|
|
||||||
const RESOLVE_INTERVAL: Time = 300;
|
|
||||||
pub const STATS_INTERVAL: Time = 60;
|
pub const STATS_INTERVAL: Time = 60;
|
||||||
const OWN_ADDRESS_RESET_INTERVAL: Time = 300;
|
|
||||||
const SPACE_BEFORE: usize = 100;
|
const SPACE_BEFORE: usize = 100;
|
||||||
|
|
||||||
struct PeerData {
|
struct PeerData {
|
||||||
|
@ -78,33 +55,8 @@ pub struct ReconnectEntry {
|
||||||
|
|
||||||
|
|
||||||
pub struct GenericCloud<D: Device, P: Protocol, S: Socket, TS: TimeSource> {
|
pub struct GenericCloud<D: Device, P: Protocol, S: Socket, TS: TimeSource> {
|
||||||
node_id: NodeId,
|
socket_thread: SocketThread<S, D, P, TS>,
|
||||||
config: Config,
|
device_thread: DeviceThread<S, D, P, TS>
|
||||||
learning: bool,
|
|
||||||
broadcast: bool,
|
|
||||||
peers: HashMap<SocketAddr, PeerData, Hash>,
|
|
||||||
reconnect_peers: SmallVec<[ReconnectEntry; 3]>,
|
|
||||||
own_addresses: AddrList,
|
|
||||||
pending_inits: HashMap<SocketAddr, InitState<NodeInfo>, Hash>,
|
|
||||||
table: ClaimTable<TS>,
|
|
||||||
socket: S,
|
|
||||||
device: D,
|
|
||||||
claims: RangeList,
|
|
||||||
crypto: Crypto,
|
|
||||||
next_peers: Time,
|
|
||||||
peer_timeout_publish: u16,
|
|
||||||
update_freq: u16,
|
|
||||||
stats_file: Option<File>,
|
|
||||||
statsd_server: Option<String>,
|
|
||||||
next_housekeep: Time,
|
|
||||||
next_stats_out: Time,
|
|
||||||
next_beacon: Time,
|
|
||||||
next_own_address_reset: Time,
|
|
||||||
port_forwarding: Option<PortForwarding>,
|
|
||||||
traffic: TrafficStats,
|
|
||||||
beacon_serializer: BeaconSerializer<TS>,
|
|
||||||
_dummy_p: PhantomData<P>,
|
|
||||||
_dummy_ts: PhantomData<TS>
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS> {
|
impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS> {
|
||||||
|
@ -112,265 +64,63 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
|
||||||
pub fn new(
|
pub fn new(
|
||||||
config: &Config, socket: S, device: D, port_forwarding: Option<PortForwarding>, stats_file: Option<File>
|
config: &Config, socket: S, device: D, port_forwarding: Option<PortForwarding>, stats_file: Option<File>
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let (learning, broadcast) = match config.mode {
|
let table = SharedTable::<TS>::new(&config);
|
||||||
Mode::Normal => {
|
|
||||||
match config.device_type {
|
|
||||||
Type::Tap => (true, true),
|
|
||||||
Type::Tun => (false, false)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Mode::Router => (false, false),
|
|
||||||
Mode::Switch => (true, true),
|
|
||||||
Mode::Hub => (false, true)
|
|
||||||
};
|
|
||||||
let mut claims = SmallVec::with_capacity(config.claims.len());
|
|
||||||
for s in &config.claims {
|
|
||||||
claims.push(try_fail!(Range::from_str(s), "Invalid subnet format: {} ({})", s));
|
|
||||||
}
|
|
||||||
if device.get_type() == Type::Tun && config.auto_claim {
|
|
||||||
match device.get_ip() {
|
|
||||||
Ok(ip) => {
|
|
||||||
let range = Range { base: Address::from_ipv4(ip), prefix_len: 32 };
|
|
||||||
info!("Auto-claiming {} due to interface address", range);
|
|
||||||
claims.push(range);
|
|
||||||
}
|
|
||||||
Err(Error::DeviceIo(_, e)) if e.kind() == io::ErrorKind::AddrNotAvailable => {
|
|
||||||
info!("No address set on interface.")
|
|
||||||
}
|
|
||||||
Err(e) => error!("{}", e)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let now = TS::now();
|
|
||||||
let update_freq = config.get_keepalive() as u16;
|
|
||||||
let node_id = random();
|
|
||||||
let crypto = Crypto::new(node_id, &config.crypto).unwrap();
|
|
||||||
let beacon_key = config.beacon_password.as_ref().map(|s| s.as_bytes()).unwrap_or(&[]);
|
|
||||||
let mut res = GenericCloud {
|
|
||||||
node_id,
|
|
||||||
peers: HashMap::default(),
|
|
||||||
claims,
|
|
||||||
learning,
|
|
||||||
broadcast,
|
|
||||||
pending_inits: HashMap::default(),
|
|
||||||
reconnect_peers: SmallVec::new(),
|
|
||||||
own_addresses: SmallVec::new(),
|
|
||||||
peer_timeout_publish: config.peer_timeout as u16,
|
|
||||||
table: ClaimTable::new(config.switch_timeout as Duration, config.peer_timeout as Duration),
|
|
||||||
socket,
|
|
||||||
device,
|
|
||||||
next_peers: now,
|
|
||||||
update_freq,
|
|
||||||
stats_file,
|
|
||||||
statsd_server: config.statsd_server.clone(),
|
|
||||||
next_housekeep: now,
|
|
||||||
next_stats_out: now + STATS_INTERVAL,
|
|
||||||
next_beacon: now,
|
|
||||||
next_own_address_reset: now + OWN_ADDRESS_RESET_INTERVAL,
|
|
||||||
port_forwarding,
|
|
||||||
traffic: TrafficStats::default(),
|
|
||||||
beacon_serializer: BeaconSerializer::new(beacon_key),
|
|
||||||
crypto,
|
|
||||||
config: config.clone(),
|
|
||||||
_dummy_p: PhantomData,
|
|
||||||
_dummy_ts: PhantomData
|
|
||||||
};
|
|
||||||
res
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
pub fn ifname(&self) -> &str {
|
|
||||||
self.device.ifname()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Sends the message to all peers
|
|
||||||
///
|
|
||||||
/// # Errors
|
|
||||||
/// Returns an `Error::SocketError` when the underlying system call fails or only part of the
|
|
||||||
/// message could be sent (can this even happen?).
|
|
||||||
/// Some messages could have been sent.
|
|
||||||
#[inline]
|
|
||||||
fn broadcast_msg(&mut self, type_: u8, msg: &mut MsgBuffer) -> Result<(), Error> {
|
|
||||||
debug!("Broadcasting message type {}, {:?} bytes to {} peers", type_, msg.len(), self.peers.len());
|
|
||||||
let mut msg_data = MsgBuffer::new(100);
|
|
||||||
for (addr, peer) in &mut self.peers {
|
|
||||||
msg_data.set_start(msg.get_start());
|
|
||||||
msg_data.set_length(msg.len());
|
|
||||||
msg_data.message_mut().clone_from_slice(msg.message());
|
|
||||||
msg_data.prepend_byte(type_);
|
|
||||||
peer.crypto.encrypt_message(&mut msg_data);
|
|
||||||
self.traffic.count_out_traffic(*addr, msg_data.len());
|
|
||||||
match self.socket.send(msg_data.message(), *addr) {
|
|
||||||
Ok(written) if written == msg_data.len() => Ok(()),
|
|
||||||
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
|
|
||||||
Err(e) => Err(Error::SocketIo("IOError when sending", e))
|
|
||||||
}?
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn send_to(&mut self, addr: SocketAddr, msg: &mut MsgBuffer) -> Result<(), Error> {
|
|
||||||
// HOT PATH
|
|
||||||
debug!("Sending msg with {} bytes to {}", msg.len(), addr);
|
|
||||||
self.traffic.count_out_traffic(addr, msg.len());
|
|
||||||
match self.socket.send(msg.message(), addr) {
|
|
||||||
Ok(written) if written == msg.len() => Ok(()),
|
|
||||||
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
|
|
||||||
Err(e) => Err(Error::SocketIo("IOError when sending", e))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn send_msg(&mut self, addr: SocketAddr, type_: u8, msg: &mut MsgBuffer) -> Result<(), Error> {
|
|
||||||
// HOT PATH
|
|
||||||
debug!("Sending msg with {} bytes to {}", msg.len(), addr);
|
|
||||||
let peer = match self.peers.get_mut(&addr) {
|
|
||||||
Some(peer) => peer,
|
|
||||||
None => return Err(Error::Message("Sending to node that is not a peer"))
|
|
||||||
};
|
|
||||||
msg.prepend_byte(type_);
|
|
||||||
peer.crypto.encrypt_message(msg);
|
|
||||||
self.send_to(addr, msg)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn reset_own_addresses(&mut self) -> io::Result<()> {
|
|
||||||
self.own_addresses.clear();
|
|
||||||
self.own_addresses.push(self.socket.address().map(mapped_addr)?);
|
|
||||||
if let Some(ref pfw) = self.port_forwarding {
|
|
||||||
self.own_addresses.push(pfw.get_internal_ip().into());
|
|
||||||
self.own_addresses.push(pfw.get_external_ip().into());
|
|
||||||
}
|
|
||||||
debug!("Own addresses: {:?}", self.own_addresses);
|
|
||||||
// TODO: detect address changes and call event
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the number of peers
|
|
||||||
#[allow(dead_code)]
|
|
||||||
pub fn peer_count(&self) -> usize {
|
|
||||||
self.peers.len()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Adds a peer to the reconnect list
|
|
||||||
///
|
|
||||||
/// This method adds a peer to the list of nodes to reconnect to. A periodic task will try to
|
|
||||||
/// connect to the peer if it is not already connected.
|
|
||||||
pub fn add_reconnect_peer(&mut self, add: String) {
|
|
||||||
let now = TS::now();
|
|
||||||
let resolved = match resolve(&add as &str) {
|
|
||||||
Ok(addrs) => addrs,
|
|
||||||
Err(err) => {
|
|
||||||
warn!("Failed to resolve {}: {:?}", add, err);
|
|
||||||
smallvec![]
|
|
||||||
}
|
|
||||||
};
|
|
||||||
self.reconnect_peers.push(ReconnectEntry {
|
|
||||||
address: Some((add, now)),
|
|
||||||
tries: 0,
|
|
||||||
timeout: 1,
|
|
||||||
resolved,
|
|
||||||
next: now,
|
|
||||||
final_timeout: None
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn connect<Addr: ToSocketAddrs + fmt::Debug + Clone>(&mut self, addr: Addr) -> Result<(), Error> {
|
|
||||||
let addrs = resolve(&addr)?.into_iter().map(mapped_addr).collect::<SmallVec<[SocketAddr; 3]>>();
|
|
||||||
for addr in &addrs {
|
|
||||||
if self.own_addresses.contains(addr)
|
|
||||||
|| self.peers.contains_key(addr)
|
|
||||||
|| self.pending_inits.contains_key(addr)
|
|
||||||
{
|
|
||||||
return Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !addrs.is_empty() {
|
|
||||||
self.config.call_hook(
|
|
||||||
"peer_connecting",
|
|
||||||
vec![("PEER", format!("{:?}", addr_nice(addrs[0]))), ("IFNAME", self.device.ifname().to_owned())],
|
|
||||||
true
|
|
||||||
);
|
|
||||||
}
|
|
||||||
unimplemented!()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn run(&mut self) {
|
|
||||||
let table = SharedTable::<TS>::new(&self.config);
|
|
||||||
let traffic = SharedTraffic::new();
|
let traffic = SharedTraffic::new();
|
||||||
let peer_crypto = SharedPeerCrypto::new();
|
let peer_crypto = SharedPeerCrypto::new();
|
||||||
let device_thread = DeviceThread::<S, D, P, TS>::new(
|
let device_thread = DeviceThread::<S, D, P, TS>::new(
|
||||||
self.config.clone(),
|
config.clone(),
|
||||||
self.device.clone(),
|
device.clone(),
|
||||||
self.socket.clone(),
|
socket.clone(),
|
||||||
traffic.clone(),
|
traffic.clone(),
|
||||||
peer_crypto.clone(),
|
peer_crypto.clone(),
|
||||||
table.clone()
|
table.clone()
|
||||||
);
|
);
|
||||||
|
let socket_thread = SocketThread::<S, D, P, TS>::new(
|
||||||
// TODO: create shared data structures
|
config.clone(),
|
||||||
// TODO: create and spawn threads
|
device,
|
||||||
let ctrlc = CtrlC::new();
|
socket,
|
||||||
// TODO: wait for ctrl-c
|
traffic,
|
||||||
let waiter = try_fail!(
|
peer_crypto,
|
||||||
WaitImpl::new(self.socket.as_raw_fd(), self.device.as_raw_fd(), 1000),
|
table,
|
||||||
"Failed to setup poll: {}"
|
port_forwarding,
|
||||||
|
stats_file
|
||||||
);
|
);
|
||||||
let mut buffer = MsgBuffer::new(SPACE_BEFORE);
|
Self { socket_thread, device_thread }
|
||||||
let mut poll_error = false;
|
}
|
||||||
self.config.call_hook("vpn_started", vec![("IFNAME", self.device.ifname())], true);
|
|
||||||
for evt in waiter {
|
pub fn add_peer(&mut self, addr: String) -> Result<(), Error> {
|
||||||
// HOT PATH
|
unimplemented!()
|
||||||
match evt {
|
}
|
||||||
WaitResult::Error(err) => {
|
|
||||||
// COLD PATH
|
pub fn run(self) {
|
||||||
if poll_error {
|
// TODO: spawn threads
|
||||||
fail!("Poll wait failed again: {}", err);
|
let ctrlc = CtrlC::new();
|
||||||
}
|
let device_thread = self.device_thread;
|
||||||
debug!("Poll wait failed: {}, retrying...", err);
|
let device_thread_handle = thread::spawn(move || device_thread.run());
|
||||||
poll_error = true;
|
let socket_thread = self.socket_thread;
|
||||||
}
|
let socket_thread_handle = thread::spawn(move || socket_thread.run());
|
||||||
WaitResult::Timeout => {}
|
// TODO: wait for ctrl-c
|
||||||
WaitResult::Socket => unimplemented!(),
|
device_thread_handle.join().unwrap();
|
||||||
WaitResult::Device => unimplemented!()
|
socket_thread_handle.join().unwrap();
|
||||||
}
|
|
||||||
if self.next_housekeep < TS::now() {
|
|
||||||
// COLD PATH
|
|
||||||
poll_error = false;
|
|
||||||
if ctrlc.was_pressed() {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
self.next_housekeep = TS::now() + 1
|
|
||||||
}
|
|
||||||
}
|
|
||||||
info!("Shutting down...");
|
|
||||||
self.config.call_hook("vpn_shutdown", vec![("IFNAME", self.device.ifname())], true);
|
|
||||||
buffer.clear();
|
|
||||||
self.broadcast_msg(MESSAGE_TYPE_CLOSE, &mut buffer).ok();
|
|
||||||
if let Some(ref path) = self.config.beacon_store {
|
|
||||||
let path = Path::new(path);
|
|
||||||
if path.exists() {
|
|
||||||
info!("Removing beacon file");
|
|
||||||
if let Err(e) = fs::remove_file(path) {
|
|
||||||
error!("Failed to remove beacon file: {}", e)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
#[cfg(test)] use super::device::MockDevice;
|
#[cfg(test)] use super::device::MockDevice;
|
||||||
#[cfg(test)] use super::net::MockSocket;
|
#[cfg(test)] use super::net::MockSocket;
|
||||||
#[cfg(test)] use super::util::MockTimeSource;
|
#[cfg(test)] use super::util::{MockTimeSource, MsgBuffer};
|
||||||
|
#[cfg(test)] use std::net::SocketAddr;
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
impl<P: Protocol> GenericCloud<MockDevice, P, MockSocket, MockTimeSource> {
|
impl<P: Protocol> GenericCloud<MockDevice, P, MockSocket, MockTimeSource> {
|
||||||
pub fn socket(&mut self) -> &mut MockSocket {
|
pub fn socket(&mut self) -> &mut MockSocket {
|
||||||
&mut self.socket
|
unimplemented!()
|
||||||
|
//&mut self.socket
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn device(&mut self) -> &mut MockDevice {
|
pub fn device(&mut self) -> &mut MockDevice {
|
||||||
&mut self.device
|
unimplemented!()
|
||||||
|
//&mut self.device
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn trigger_socket_event(&mut self) {
|
pub fn trigger_socket_event(&mut self) {
|
||||||
|
@ -388,14 +138,17 @@ impl<P: Protocol> GenericCloud<MockDevice, P, MockSocket, MockTimeSource> {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn is_connected(&self, addr: &SocketAddr) -> bool {
|
pub fn is_connected(&self, addr: &SocketAddr) -> bool {
|
||||||
self.peers.contains_key(addr)
|
unimplemented!()
|
||||||
|
// self.peers.contains_key(addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn own_addresses(&self) -> &[SocketAddr] {
|
pub fn own_addresses(&self) -> &[SocketAddr] {
|
||||||
&self.own_addresses
|
unimplemented!()
|
||||||
|
//&self.own_addresses
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_num(&self) -> usize {
|
pub fn get_num(&self) -> usize {
|
||||||
self.socket.address().unwrap().port() as usize
|
unimplemented!()
|
||||||
|
// self.socket.address().unwrap().port() as usize
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,14 +1,12 @@
|
||||||
use crate::{
|
use crate::{
|
||||||
|
config::Config,
|
||||||
crypto::CryptoCore,
|
crypto::CryptoCore,
|
||||||
engine::{Hash, PeerData, TimeSource},
|
engine::{Hash, TimeSource},
|
||||||
error::Error,
|
error::Error,
|
||||||
messages::NodeInfo,
|
|
||||||
table::ClaimTable,
|
table::ClaimTable,
|
||||||
traffic::{TrafficStats, TrafficEntry},
|
traffic::{TrafficEntry, TrafficStats},
|
||||||
types::{Address, NodeId, RangeList},
|
types::{Address, RangeList},
|
||||||
util::MsgBuffer,
|
util::{Duration, MsgBuffer}
|
||||||
util::Duration,
|
|
||||||
config::Config
|
|
||||||
};
|
};
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
use std::{
|
use std::{
|
||||||
|
@ -37,7 +35,10 @@ impl SharedPeerCrypto {
|
||||||
match peers.get_mut(&peer) {
|
match peers.get_mut(&peer) {
|
||||||
None => Err(Error::InvalidCryptoState("No crypto found for peer")),
|
None => Err(Error::InvalidCryptoState("No crypto found for peer")),
|
||||||
Some(None) => Ok(()),
|
Some(None) => Ok(()),
|
||||||
Some(Some(crypto)) => Ok(crypto.encrypt(data))
|
Some(Some(crypto)) => {
|
||||||
|
crypto.encrypt(data);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -7,12 +7,16 @@ use crate::{
|
||||||
beacon::BeaconSerializer,
|
beacon::BeaconSerializer,
|
||||||
config::{DEFAULT_PEER_TIMEOUT, DEFAULT_PORT},
|
config::{DEFAULT_PEER_TIMEOUT, DEFAULT_PORT},
|
||||||
crypto::{is_init_message, InitResult, InitState, MessageResult},
|
crypto::{is_init_message, InitResult, InitState, MessageResult},
|
||||||
|
device::Type,
|
||||||
engine::{addr_nice, resolve, Hash, PeerData},
|
engine::{addr_nice, resolve, Hash, PeerData},
|
||||||
error::Error,
|
error::Error,
|
||||||
messages::{AddrList, NodeInfo, PeerInfo, MESSAGE_TYPE_NODE_INFO},
|
messages::{
|
||||||
|
AddrList, NodeInfo, PeerInfo, MESSAGE_TYPE_CLOSE, MESSAGE_TYPE_DATA, MESSAGE_TYPE_KEEPALIVE,
|
||||||
|
MESSAGE_TYPE_NODE_INFO
|
||||||
|
},
|
||||||
net::{mapped_addr, Socket},
|
net::{mapped_addr, Socket},
|
||||||
port_forwarding::PortForwarding,
|
port_forwarding::PortForwarding,
|
||||||
types::{NodeId, RangeList},
|
types::{Address, NodeId, Range, RangeList},
|
||||||
util::{MsgBuffer, StatsdMsg, Time, TimeSource},
|
util::{MsgBuffer, StatsdMsg, Time, TimeSource},
|
||||||
Config, Crypto, Device, Protocol
|
Config, Crypto, Device, Protocol
|
||||||
};
|
};
|
||||||
|
@ -26,7 +30,8 @@ use std::{
|
||||||
io,
|
io,
|
||||||
io::{Cursor, Seek, SeekFrom, Write},
|
io::{Cursor, Seek, SeekFrom, Write},
|
||||||
marker::PhantomData,
|
marker::PhantomData,
|
||||||
net::{SocketAddr, ToSocketAddrs}
|
net::{SocketAddr, ToSocketAddrs},
|
||||||
|
str::FromStr
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
@ -81,6 +86,63 @@ pub struct SocketThread<S: Socket, D: Device, P: Protocol, TS: TimeSource> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS> {
|
impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS> {
|
||||||
|
pub fn new(
|
||||||
|
config: Config, device: D, socket: S, traffic: SharedTraffic, peer_crypto: SharedPeerCrypto,
|
||||||
|
table: SharedTable<TS>, port_forwarding: Option<PortForwarding>, stats_file: Option<File>
|
||||||
|
) -> Self {
|
||||||
|
let mut claims = SmallVec::with_capacity(config.claims.len());
|
||||||
|
for s in &config.claims {
|
||||||
|
claims.push(try_fail!(Range::from_str(s), "Invalid subnet format: {} ({})", s));
|
||||||
|
}
|
||||||
|
if device.get_type() == Type::Tun && config.auto_claim {
|
||||||
|
match device.get_ip() {
|
||||||
|
Ok(ip) => {
|
||||||
|
let range = Range { base: Address::from_ipv4(ip), prefix_len: 32 };
|
||||||
|
info!("Auto-claiming {} due to interface address", range);
|
||||||
|
claims.push(range);
|
||||||
|
}
|
||||||
|
Err(Error::DeviceIo(_, e)) if e.kind() == io::ErrorKind::AddrNotAvailable => {
|
||||||
|
info!("No address set on interface.")
|
||||||
|
}
|
||||||
|
Err(e) => error!("{}", e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let now = TS::now();
|
||||||
|
let update_freq = config.get_keepalive() as u16;
|
||||||
|
let node_id = random();
|
||||||
|
let crypto = Crypto::new(node_id, &config.crypto).unwrap();
|
||||||
|
let beacon_key = config.beacon_password.as_ref().map(|s| s.as_bytes()).unwrap_or(&[]);
|
||||||
|
Self {
|
||||||
|
_dummy_p: PhantomData,
|
||||||
|
_dummy_ts: PhantomData,
|
||||||
|
node_id,
|
||||||
|
claims,
|
||||||
|
device,
|
||||||
|
socket,
|
||||||
|
peer_crypto,
|
||||||
|
traffic,
|
||||||
|
table,
|
||||||
|
learning: config.is_learning(),
|
||||||
|
next_housekeep: now,
|
||||||
|
next_beacon: now,
|
||||||
|
next_peers: now,
|
||||||
|
next_stats_out: now + STATS_INTERVAL,
|
||||||
|
next_own_address_reset: now + OWN_ADDRESS_RESET_INTERVAL,
|
||||||
|
pending_inits: HashMap::default(),
|
||||||
|
reconnect_peers: SmallVec::new(),
|
||||||
|
own_addresses: SmallVec::new(),
|
||||||
|
peers: HashMap::default(),
|
||||||
|
peer_timeout_publish: config.peer_timeout as u16,
|
||||||
|
beacon_serializer: BeaconSerializer::new(beacon_key),
|
||||||
|
port_forwarding,
|
||||||
|
stats_file,
|
||||||
|
update_freq,
|
||||||
|
statsd_server: config.statsd_server.clone(),
|
||||||
|
crypto: Crypto::new(node_id, &config.crypto).unwrap(),
|
||||||
|
config
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn send_to(&mut self, addr: SocketAddr, msg: &MsgBuffer) -> Result<(), Error> {
|
fn send_to(&mut self, addr: SocketAddr, msg: &MsgBuffer) -> Result<(), Error> {
|
||||||
debug!("Sending msg with {} bytes to {}", msg.len(), addr);
|
debug!("Sending msg with {} bytes to {}", msg.len(), addr);
|
||||||
|
@ -126,7 +188,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
let mut msg = MsgBuffer::new(SPACE_BEFORE);
|
let mut msg = MsgBuffer::new(SPACE_BEFORE);
|
||||||
init.send_ping(&mut msg);
|
init.send_ping(&mut msg);
|
||||||
self.pending_inits.insert(addr, init);
|
self.pending_inits.insert(addr, init);
|
||||||
self.send_to(addr, &mut msg)
|
self.send_to(addr, &msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn connect<Addr: ToSocketAddrs + fmt::Debug + Clone>(&mut self, addr: Addr) -> Result<(), Error> {
|
pub fn connect<Addr: ToSocketAddrs + fmt::Debug + Clone>(&mut self, addr: Addr) -> Result<(), Error> {
|
||||||
|
@ -322,7 +384,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
self.traffic.count_invalid_protocol(data.len());
|
self.traffic.count_invalid_protocol(data.len());
|
||||||
return Err(err)
|
Err(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -398,14 +460,14 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
if self.pending_inits.get_mut(&addr).unwrap().every_second(&mut msg).is_err() {
|
if self.pending_inits.get_mut(&addr).unwrap().every_second(&mut msg).is_err() {
|
||||||
del.push(addr)
|
del.push(addr)
|
||||||
} else if !msg.is_empty() {
|
} else if !msg.is_empty() {
|
||||||
self.send_to(addr, &mut msg)?
|
self.send_to(addr, &msg)?
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for addr in self.peers.keys().copied().collect::<SmallVec<[SocketAddr; 16]>>() {
|
for addr in self.peers.keys().copied().collect::<SmallVec<[SocketAddr; 16]>>() {
|
||||||
msg.clear();
|
msg.clear();
|
||||||
self.peers.get_mut(&addr).unwrap().crypto.every_second(&mut msg);
|
self.peers.get_mut(&addr).unwrap().crypto.every_second(&mut msg);
|
||||||
if !msg.is_empty() {
|
if !msg.is_empty() {
|
||||||
self.send_to(addr, &mut msg)?
|
self.send_to(addr, &msg)?
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for addr in del {
|
for addr in del {
|
||||||
|
|
|
@ -185,6 +185,7 @@ fn run<P: Protocol, S: Socket>(config: Config, socket: S) {
|
||||||
Some(file)
|
Some(file)
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
let ifname = device.ifname().to_string();
|
||||||
let mut cloud =
|
let mut cloud =
|
||||||
GenericCloud::<TunTapDevice, P, S, SystemTimeSource>::new(&config, socket, device, port_forwarding, stats_file);
|
GenericCloud::<TunTapDevice, P, S, SystemTimeSource>::new(&config, socket, device, port_forwarding, stats_file);
|
||||||
for mut addr in config.peers {
|
for mut addr in config.peers {
|
||||||
|
@ -192,8 +193,7 @@ fn run<P: Protocol, S: Socket>(config: Config, socket: S) {
|
||||||
// : not present or only in IPv6 address
|
// : not present or only in IPv6 address
|
||||||
addr = format!("{}:{}", addr, DEFAULT_PORT)
|
addr = format!("{}:{}", addr, DEFAULT_PORT)
|
||||||
}
|
}
|
||||||
try_fail!(cloud.connect(&addr as &str), "Failed to send message to {}: {}", &addr);
|
try_fail!(cloud.add_peer(addr.clone()), "Failed to send message to {}: {}", &addr);
|
||||||
cloud.add_reconnect_peer(addr);
|
|
||||||
}
|
}
|
||||||
if config.daemonize {
|
if config.daemonize {
|
||||||
info!("Running process as daemon");
|
info!("Running process as daemon");
|
||||||
|
@ -223,7 +223,7 @@ fn run<P: Protocol, S: Socket>(config: Config, socket: S) {
|
||||||
}
|
}
|
||||||
cloud.run();
|
cloud.run();
|
||||||
if let Some(script) = config.ifdown {
|
if let Some(script) = config.ifdown {
|
||||||
run_script(&script, cloud.ifname());
|
run_script(&script, &ifname);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -31,7 +31,7 @@ pub fn get_ip() -> IpAddr {
|
||||||
s.local_addr().unwrap().ip()
|
s.local_addr().unwrap().ip()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait Socket: AsRawFd + Sized + Clone {
|
pub trait Socket: AsRawFd + Sized + Clone + Send + 'static {
|
||||||
fn listen(addr: &str) -> Result<Self, io::Error>;
|
fn listen(addr: &str) -> Result<Self, io::Error>;
|
||||||
fn receive(&mut self, buffer: &mut MsgBuffer) -> Result<SocketAddr, io::Error>;
|
fn receive(&mut self, buffer: &mut MsgBuffer) -> Result<SocketAddr, io::Error>;
|
||||||
fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize, io::Error>;
|
fn send(&mut self, data: &[u8], addr: SocketAddr) -> Result<usize, io::Error>;
|
||||||
|
|
|
@ -5,7 +5,7 @@
|
||||||
use crate::{error::Error, types::Address};
|
use crate::{error::Error, types::Address};
|
||||||
use std::io::{Cursor, Read};
|
use std::io::{Cursor, Read};
|
||||||
|
|
||||||
pub trait Protocol: Sized {
|
pub trait Protocol: Sized + Send + 'static {
|
||||||
fn parse(_: &[u8]) -> Result<(Address, Address), Error>;
|
fn parse(_: &[u8]) -> Result<(Address, Address), Error>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -159,8 +159,10 @@ impl Socket for ProxyConnection {
|
||||||
write_addr(addr, &mut msg)?;
|
write_addr(addr, &mut msg)?;
|
||||||
msg.write_all(data)?;
|
msg.write_all(data)?;
|
||||||
unimplemented!();
|
unimplemented!();
|
||||||
//io_error!(self.socket.write_message(Message::Binary(msg)), "Failed to write to ws proxy: {}")?;
|
/*
|
||||||
|
io_error!(self.socket.write_message(Message::Binary(msg)), "Failed to write to ws proxy: {}")?;
|
||||||
Ok(data.len())
|
Ok(data.len())
|
||||||
|
*/
|
||||||
}
|
}
|
||||||
|
|
||||||
fn address(&self) -> Result<SocketAddr, io::Error> {
|
fn address(&self) -> Result<SocketAddr, io::Error> {
|
||||||
|
|
Loading…
Reference in New Issue