vpncloud/src/cloud.rs

704 lines
27 KiB
Rust
Raw Normal View History

// VpnCloud - Peer-to-Peer VPN
2017-07-22 14:49:53 +00:00
// Copyright (C) 2015-2017 Dennis Schwerdel
// This software is licensed under GPL-3 or newer (see LICENSE.md)
2016-05-02 07:05:34 +00:00
use std::net::{SocketAddr, ToSocketAddrs};
use std::collections::{HashMap, HashSet};
2015-11-19 15:34:20 +00:00
use std::net::UdpSocket;
2019-01-09 16:45:12 +00:00
use std::io::{self, Write};
2015-11-23 00:04:30 +00:00
use std::fmt;
2015-11-20 08:11:54 +00:00
use std::os::unix::io::AsRawFd;
2015-11-21 17:09:13 +00:00
use std::marker::PhantomData;
2016-03-29 08:45:54 +00:00
use std::hash::BuildHasherDefault;
use std::time::Instant;
2017-01-11 13:31:28 +00:00
use std::cmp::min;
2019-01-09 16:45:12 +00:00
use std::fs::File;
2015-11-19 15:34:20 +00:00
2016-03-29 08:45:54 +00:00
use fnv::FnvHasher;
2019-01-01 23:35:14 +00:00
use signal::{trap::Trap, Signal};
use rand::{prelude::*, random, thread_rng};
2016-05-02 06:35:11 +00:00
use net2::UdpBuilder;
2015-11-19 15:34:20 +00:00
2019-01-10 18:36:50 +00:00
use super::config::Config;
use super::types::{Table, Protocol, Range, Error, HeaderMagic, NodeId};
2019-01-10 18:36:50 +00:00
use super::device::{Device, Type};
use super::udpmessage::{encode, decode, Message};
2015-11-24 19:55:14 +00:00
use super::crypto::Crypto;
use super::port_forwarding::PortForwarding;
2016-06-27 13:43:30 +00:00
use super::util::{now, Time, Duration, resolve};
2019-01-01 23:35:14 +00:00
use super::poll::{Poll, Flags};
2019-01-09 16:45:12 +00:00
use super::traffic::TrafficStats;
2015-11-19 15:34:20 +00:00
2019-01-09 16:45:12 +00:00
pub type Hash = BuildHasherDefault<FnvHasher>;
2016-03-29 08:45:54 +00:00
const MAX_RECONNECT_INTERVAL: u16 = 3600;
const RESOLVE_INTERVAL: Time = 300;
2019-01-09 16:45:12 +00:00
pub const STATS_INTERVAL: Time = 60;
2019-01-09 16:45:12 +00:00
struct PeerData {
timeout: Time,
node_id: NodeId,
alt_addrs: Vec<SocketAddr>,
}
2015-11-19 15:34:20 +00:00
struct PeerList {
timeout: Duration,
2019-01-09 16:45:12 +00:00
peers: HashMap<SocketAddr, PeerData, Hash>,
2016-05-02 07:05:34 +00:00
nodes: HashMap<NodeId, SocketAddr, Hash>,
addresses: HashSet<SocketAddr, Hash>
2015-11-19 15:34:20 +00:00
}
impl PeerList {
fn new(timeout: Duration) -> PeerList {
2016-05-02 07:05:34 +00:00
PeerList{
peers: HashMap::default(),
2019-01-01 23:35:14 +00:00
timeout,
2016-05-02 07:05:34 +00:00
nodes: HashMap::default(),
addresses: HashSet::default()
}
2015-11-19 15:34:20 +00:00
}
2015-11-20 11:09:07 +00:00
fn timeout(&mut self) -> Vec<SocketAddr> {
2015-11-25 20:55:30 +00:00
let now = now();
2015-11-19 15:34:20 +00:00
let mut del: Vec<SocketAddr> = Vec::new();
2019-01-09 16:45:12 +00:00
for (&addr, ref data) in &self.peers {
if data.timeout < now {
2015-11-19 15:34:20 +00:00
del.push(addr);
}
}
2015-11-20 11:09:07 +00:00
for addr in &del {
2016-08-12 06:30:13 +00:00
info!("Forgot peer: {}", addr);
2019-01-09 16:45:12 +00:00
if let Some(data) = self.peers.remove(addr) {
self.nodes.remove(&data.node_id);
2016-05-02 07:05:34 +00:00
self.addresses.remove(addr);
2019-01-09 16:45:12 +00:00
for addr in &data.alt_addrs {
2016-05-02 07:05:34 +00:00
self.addresses.remove(addr);
}
}
2015-11-19 15:34:20 +00:00
}
2015-11-20 11:09:07 +00:00
del
2015-11-19 15:34:20 +00:00
}
2016-06-11 14:08:57 +00:00
#[inline]
fn contains_addr(&self, addr: &SocketAddr) -> bool {
2016-05-02 07:05:34 +00:00
self.addresses.contains(addr)
}
#[inline]
2016-08-29 13:20:32 +00:00
fn is_connected<Addr: ToSocketAddrs+fmt::Debug>(&self, addr: Addr) -> Result<bool, Error> {
2017-05-04 05:26:21 +00:00
for addr in try!(resolve(&addr)) {
2016-06-11 14:08:57 +00:00
if self.contains_addr(&addr) {
return Ok(true);
}
}
Ok(false)
}
2016-06-11 14:08:57 +00:00
#[inline]
fn contains_node(&self, node_id: &NodeId) -> bool {
2016-05-02 07:05:34 +00:00
self.nodes.contains_key(node_id)
2015-11-19 15:34:20 +00:00
}
2016-05-02 07:05:34 +00:00
2015-11-20 12:34:54 +00:00
#[inline]
2016-05-02 07:05:34 +00:00
fn add(&mut self, node_id: NodeId, addr: SocketAddr) {
if self.nodes.insert(node_id, addr).is_none() {
2016-02-02 21:03:56 +00:00
info!("New peer: {}", addr);
2019-01-09 16:45:12 +00:00
self.peers.insert(addr, PeerData {
timeout: now() + Time::from(self.timeout),
node_id,
alt_addrs: vec![]
});
2016-05-25 11:30:18 +00:00
self.addresses.insert(addr);
2016-05-02 07:05:34 +00:00
}
}
2016-08-12 06:31:21 +00:00
#[inline]
fn refresh(&mut self, addr: &SocketAddr) {
2019-01-09 16:45:12 +00:00
if let Some(ref mut data) = self.peers.get_mut(addr) {
data.timeout = now()+Time::from(self.timeout);
2016-08-12 06:31:21 +00:00
}
}
2016-05-02 07:05:34 +00:00
#[inline]
fn add_alt_addr(&mut self, node_id: NodeId, addr: SocketAddr) {
if let Some(main_addr) = self.nodes.get(&node_id) {
2019-01-09 16:45:12 +00:00
if let Some(ref mut data) = self.peers.get_mut(main_addr) {
data.alt_addrs.push(addr);
2016-05-25 11:30:18 +00:00
self.addresses.insert(addr);
2016-05-02 07:05:34 +00:00
} else {
error!("Main address for node is not connected");
}
} else {
error!("Node not connected");
2015-11-19 15:34:20 +00:00
}
}
2015-11-20 12:34:54 +00:00
#[inline]
2015-11-19 15:34:20 +00:00
fn as_vec(&self) -> Vec<SocketAddr> {
2016-06-11 14:08:57 +00:00
self.addresses.iter().cloned().collect()
2015-11-19 15:34:20 +00:00
}
2016-06-11 14:08:57 +00:00
#[inline]
2015-11-20 12:34:54 +00:00
fn len(&self) -> usize {
self.peers.len()
}
2016-06-11 14:08:57 +00:00
#[inline]
#[allow(dead_code)]
fn is_empty(&self) -> bool {
self.peers.is_empty()
}
2015-11-20 12:34:54 +00:00
#[inline]
2015-12-04 10:25:14 +00:00
fn subset(&self, size: usize) -> Vec<SocketAddr> {
2019-01-01 23:35:14 +00:00
self.addresses.iter().choose_multiple(&mut thread_rng(), size).into_iter().cloned().collect()
2015-11-20 12:34:54 +00:00
}
#[inline]
2015-11-19 15:34:20 +00:00
fn remove(&mut self, addr: &SocketAddr) {
2019-01-09 16:45:12 +00:00
if let Some(data) = self.peers.remove(addr) {
2016-02-02 21:03:56 +00:00
info!("Removed peer: {}", addr);
2019-01-09 16:45:12 +00:00
self.nodes.remove(&data.node_id);
2016-05-02 07:05:34 +00:00
self.addresses.remove(addr);
2019-01-09 16:45:12 +00:00
for addr in data.alt_addrs {
2016-05-02 07:05:34 +00:00
self.addresses.remove(&addr);
}
2015-11-19 15:34:20 +00:00
}
}
2019-01-09 16:45:12 +00:00
#[inline]
fn write_out<W: Write>(&self, out: &mut W) -> Result<(), io::Error> {
try!(writeln!(out, "Peers:"));
for (addr, data) in &self.peers {
try!(writeln!(out, " - {} (ttl: {} s)", addr, data.timeout-now()));
}
Ok(())
}
2015-11-19 15:34:20 +00:00
}
#[derive(Clone)]
pub struct ReconnectEntry {
address: String,
2016-08-29 13:20:32 +00:00
resolved: Vec<SocketAddr>,
next_resolve: Time,
tries: u16,
timeout: u16,
next: Time
}
2015-11-22 16:28:04 +00:00
2016-11-23 10:27:29 +00:00
pub struct GenericCloud<P: Protocol, T: Table> {
magic: HeaderMagic,
node_id: NodeId,
2015-11-20 08:11:54 +00:00
peers: PeerList,
2015-11-22 23:49:58 +00:00
addresses: Vec<Range>,
2015-11-22 21:00:34 +00:00
learning: bool,
2015-11-22 21:45:04 +00:00
broadcast: bool,
reconnect_peers: Vec<ReconnectEntry>,
blacklist_peers: Vec<SocketAddr>,
table: T,
2016-05-02 06:35:11 +00:00
socket4: UdpSocket,
socket6: UdpSocket,
2015-11-23 00:40:47 +00:00
device: Device,
2015-11-23 14:40:04 +00:00
crypto: Crypto,
2015-11-25 20:55:30 +00:00
next_peerlist: Time,
2015-11-19 19:51:53 +00:00
update_freq: Duration,
2015-11-20 09:59:01 +00:00
buffer_out: [u8; 64*1024],
2015-11-25 20:55:30 +00:00
next_housekeep: Time,
2019-01-09 16:45:12 +00:00
next_stats_out: Time,
port_forwarding: Option<PortForwarding>,
2019-01-09 16:45:12 +00:00
traffic: TrafficStats,
stats_file: Option<String>,
2015-11-23 00:40:47 +00:00
_dummy_p: PhantomData<P>,
2015-11-19 16:11:59 +00:00
}
impl<P: Protocol, T: Table> GenericCloud<P, T> {
2019-01-10 18:36:50 +00:00
pub fn new(config: &Config, device: Device, table: T,
learning: bool, broadcast: bool, addresses: Vec<Range>,
crypto: Crypto, port_forwarding: Option<PortForwarding>
2019-01-09 16:45:12 +00:00
) -> Self {
2016-05-02 06:35:11 +00:00
let socket4 = match UdpBuilder::new_v4().expect("Failed to obtain ipv4 socket builder")
2019-01-10 18:36:50 +00:00
.reuse_address(true).expect("Failed to set so_reuseaddr").bind(("0.0.0.0", config.port)) {
2015-11-19 15:34:20 +00:00
Ok(socket) => socket,
2019-01-10 18:36:50 +00:00
Err(err) => fail!("Failed to open ipv4 address 0.0.0.0:{}: {}", config.port, err)
2016-05-02 06:35:11 +00:00
};
let socket6 = match UdpBuilder::new_v6().expect("Failed to obtain ipv6 socket builder")
.only_v6(true).expect("Failed to set only_v6")
2019-01-10 18:36:50 +00:00
.reuse_address(true).expect("Failed to set so_reuseaddr").bind(("::", config.port)) {
2016-05-02 06:35:11 +00:00
Ok(socket) => socket,
2019-01-10 18:36:50 +00:00
Err(err) => fail!("Failed to open ipv6 address ::{}: {}", config.port, err)
2015-11-19 15:34:20 +00:00
};
2015-11-22 16:28:04 +00:00
GenericCloud{
2019-01-10 18:36:50 +00:00
magic: config.get_magic(),
node_id: random(),
2019-01-10 18:36:50 +00:00
peers: PeerList::new(config.peer_timeout),
2019-01-01 23:35:14 +00:00
addresses,
learning,
broadcast,
2015-11-20 11:09:07 +00:00
reconnect_peers: Vec::new(),
blacklist_peers: Vec::new(),
2019-01-01 23:35:14 +00:00
table,
socket4,
socket6,
device,
crypto,
2015-11-25 20:55:30 +00:00
next_peerlist: now(),
2019-01-10 18:36:50 +00:00
update_freq: config.get_keepalive(),
2015-11-20 09:59:01 +00:00
buffer_out: [0; 64*1024],
2015-11-25 20:55:30 +00:00
next_housekeep: now(),
2019-01-09 16:45:12 +00:00
next_stats_out: now() + STATS_INTERVAL,
2019-01-01 23:35:14 +00:00
port_forwarding,
2019-01-09 16:45:12 +00:00
traffic: TrafficStats::new(),
2019-01-10 18:36:50 +00:00
stats_file: config.stats_file.clone(),
2015-11-23 00:40:47 +00:00
_dummy_p: PhantomData,
2015-11-20 08:11:54 +00:00
}
2015-11-19 15:34:20 +00:00
}
2015-12-22 21:45:52 +00:00
#[inline]
2015-11-23 18:06:25 +00:00
pub fn ifname(&self) -> &str {
self.device.ifname()
}
2016-06-27 13:43:30 +00:00
/// Sends the message to all peers
///
/// # Errors
/// Returns an `Error::SocketError` when the underlying system call fails or only part of the
/// message could be sent (can this even happen?).
/// Some messages could have been sent.
2015-12-22 21:45:52 +00:00
#[inline]
fn broadcast_msg(&mut self, msg: &mut Message) -> Result<(), Error> {
debug!("Broadcasting {:?}", msg);
2016-06-27 13:43:30 +00:00
// Encrypt and encode once and send several times
let msg_data = encode(msg, &mut self.buffer_out, self.magic, &mut self.crypto);
for addr in self.peers.peers.keys() {
2019-01-09 16:45:12 +00:00
self.traffic.count_out_traffic(*addr, msg_data.len());
let socket = match *addr {
2016-06-11 14:08:57 +00:00
SocketAddr::V4(_) => &self.socket4,
SocketAddr::V6(_) => &self.socket6
2016-05-02 06:35:11 +00:00
};
try!(match socket.send_to(msg_data, addr) {
2015-12-22 21:45:52 +00:00
Ok(written) if written == msg_data.len() => Ok(()),
2016-07-06 20:35:42 +00:00
Ok(_) => Err(Error::Socket("Sent out truncated packet", io::Error::new(io::ErrorKind::Other, "truncated"))),
2016-08-08 20:24:04 +00:00
Err(e) => Err(Error::Socket("IOError when sending", e))
2015-12-22 21:45:52 +00:00
})
}
Ok(())
}
2016-06-27 13:43:30 +00:00
/// Sends a message to one peer
///
/// # Errors
/// Returns an `Error::SocketError` when the underlying system call fails or only part of the
/// message could be sent (can this even happen?).
2015-12-22 21:45:52 +00:00
#[inline]
fn send_msg(&mut self, addr: SocketAddr, msg: &mut Message) -> Result<(), Error> {
2015-11-19 15:34:20 +00:00
debug!("Sending {:?} to {}", msg, addr);
2016-06-27 13:43:30 +00:00
// Encrypt and encode
let msg_data = encode(msg, &mut self.buffer_out, self.magic, &mut self.crypto);
2019-01-09 16:45:12 +00:00
self.traffic.count_out_traffic(addr, msg_data.len());
2016-06-11 14:08:57 +00:00
let socket = match addr {
SocketAddr::V4(_) => &self.socket4,
SocketAddr::V6(_) => &self.socket6
2016-05-02 06:35:11 +00:00
};
match socket.send_to(msg_data, addr) {
2015-12-13 21:03:06 +00:00
Ok(written) if written == msg_data.len() => Ok(()),
2016-07-06 20:35:42 +00:00
Ok(_) => Err(Error::Socket("Sent out truncated packet", io::Error::new(io::ErrorKind::Other, "truncated"))),
2016-08-08 20:24:04 +00:00
Err(e) => Err(Error::Socket("IOError when sending", e))
2015-11-19 15:34:20 +00:00
}
}
2016-06-27 13:43:30 +00:00
/// Returns the self-perceived addresses (IPv4 and IPv6) of this node
///
/// Note that those addresses could be private addresses that are not reachable by other nodes,
/// or only some other nodes inside the same network.
///
/// # Errors
/// Returns an IOError if the underlying system call fails
2016-02-08 19:37:06 +00:00
#[allow(dead_code)]
2016-07-06 16:48:58 +00:00
pub fn address(&self) -> io::Result<(SocketAddr, SocketAddr)> {
2016-05-02 06:35:11 +00:00
Ok((try!(self.socket4.local_addr()), try!(self.socket6.local_addr())))
2016-02-08 19:37:06 +00:00
}
2016-06-27 13:43:30 +00:00
/// Returns the number of peers
2016-02-08 19:37:06 +00:00
#[allow(dead_code)]
pub fn peer_count(&self) -> usize {
self.peers.len()
}
2016-06-27 13:43:30 +00:00
/// Adds a peer to the reconnect list
///
/// This method adds a peer to the list of nodes to reconnect to. A periodic task will try to
/// connect to the peer if it is not already connected.
pub fn add_reconnect_peer(&mut self, add: String) {
self.reconnect_peers.push(ReconnectEntry {
address: add,
tries: 0,
timeout: 1,
2016-08-29 13:20:32 +00:00
resolved: vec![],
next_resolve: now(),
next: now()
})
}
2016-06-27 13:43:30 +00:00
/// Returns whether the address is blacklisted
///
/// # Errors
/// Returns an `Error::SocketError` if the given address is a name that failed to resolve to
/// actual addresses.
2016-08-29 13:20:32 +00:00
fn is_blacklisted<Addr: ToSocketAddrs+fmt::Debug>(&self, addr: Addr) -> Result<bool, Error> {
2017-05-04 05:26:21 +00:00
for addr in try!(resolve(&addr)) {
2016-06-11 14:08:57 +00:00
if self.blacklist_peers.contains(&addr) {
return Ok(true);
2015-11-20 11:09:07 +00:00
}
}
Ok(false)
}
2016-06-27 13:43:30 +00:00
/// Connects to a node given by its address
///
/// This method connects to node by sending a `Message::Init` to it. If `addr` is a name that
/// resolves to multiple addresses, one message is sent to each of them.
/// If the node is already a connected peer or the address is blacklisted, no message is sent.
///
/// # Errors
/// This method returns `Error::NameError` if the address is a name that fails to resolve.
2016-08-29 13:20:32 +00:00
pub fn connect<Addr: ToSocketAddrs+fmt::Debug+Clone>(&mut self, addr: Addr) -> Result<(), Error> {
if try!(self.peers.is_connected(addr.clone())) || try!(self.is_blacklisted(addr.clone())) {
return Ok(())
}
2016-08-29 13:20:32 +00:00
debug!("Connecting to {:?}", addr);
2015-12-22 21:44:25 +00:00
let subnets = self.addresses.clone();
2016-06-11 14:08:57 +00:00
let node_id = self.node_id;
2016-06-27 13:43:30 +00:00
// Send a message to each resolved address
2017-05-04 05:26:21 +00:00
for a in try!(resolve(&addr)) {
2016-06-27 13:43:30 +00:00
// Ignore error this time
let mut msg = Message::Init(0, node_id, subnets.clone());
self.send_msg(a, &mut msg).ok();
2015-12-22 21:44:25 +00:00
}
Ok(())
2015-11-19 15:34:20 +00:00
}
2016-06-27 13:43:30 +00:00
/// Run all periodic housekeeping tasks
///
/// This method executes several tasks:
/// - Remove peers that have timed out
/// - Remove switch table entries that have timed out
/// - Periodically send the peers list to all peers
/// - Periodically reconnect to peers in the reconnect list
///
/// # Errors
/// This method returns errors if sending a message fails or resolving an address fails.
2015-11-20 08:11:54 +00:00
fn housekeep(&mut self) -> Result<(), Error> {
for peer in self.peers.timeout() {
self.table.remove_all(&peer);
}
2015-11-21 17:09:13 +00:00
self.table.housekeep();
// Periodically extend the port-forwarding
if let Some(ref mut pfw) = self.port_forwarding {
pfw.check_extend();
}
2016-06-27 13:43:30 +00:00
// Periodically send peer list to peers
let now = now();
if self.next_peerlist <= now {
2015-11-19 15:34:20 +00:00
debug!("Send peer list to all peers");
2015-11-20 12:34:54 +00:00
let mut peer_num = self.peers.len();
2016-06-27 13:43:30 +00:00
// If the number of peers is high, send only a fraction of the full peer list to
2017-01-11 13:31:28 +00:00
// reduce the management traffic. The number of peers to send is limited by 20.
peer_num = min(peer_num, 20);
2016-06-27 13:43:30 +00:00
// Select that many peers...
2015-12-04 10:25:14 +00:00
let peers = self.peers.subset(peer_num);
2016-06-27 13:43:30 +00:00
// ...and send them to all peers
2015-12-13 21:03:06 +00:00
let mut msg = Message::Peers(peers);
2015-12-22 21:45:52 +00:00
try!(self.broadcast_msg(&mut msg));
2016-06-27 13:43:30 +00:00
// Reschedule for next update
2019-01-01 23:35:14 +00:00
self.next_peerlist = now + Time::from(self.update_freq);
}
2016-06-27 13:43:30 +00:00
// Connect to those reconnect_peers that are due
for entry in self.reconnect_peers.clone() {
if entry.next > now {
continue
}
2016-08-29 13:20:32 +00:00
try!(self.connect(&entry.resolved as &[SocketAddr]));
2015-11-19 15:34:20 +00:00
}
for entry in &mut self.reconnect_peers {
2016-06-27 13:43:30 +00:00
// Schedule for next second if node is connected
2016-08-29 13:20:32 +00:00
if try!(self.peers.is_connected(&entry.resolved as &[SocketAddr])) {
entry.tries = 0;
entry.timeout = 1;
entry.next = now + 1;
continue
}
2016-08-29 13:20:32 +00:00
// Resolve entries anew
if entry.next_resolve <= now {
if let Ok(addrs) = resolve(&entry.address as &str) {
entry.resolved = addrs;
}
entry.next_resolve = now + RESOLVE_INTERVAL;
2016-08-29 13:20:32 +00:00
}
2016-06-27 13:43:30 +00:00
// Ignore if next attempt is already in the future
if entry.next > now {
continue
}
2016-06-27 13:43:30 +00:00
// Exponential backoff: every 10 tries, the interval doubles
entry.tries += 1;
if entry.tries > 10 {
entry.tries = 0;
entry.timeout *= 2;
}
2016-06-27 13:43:30 +00:00
// Maximum interval is one hour
if entry.timeout > MAX_RECONNECT_INTERVAL {
entry.timeout = MAX_RECONNECT_INTERVAL;
}
2016-06-27 13:43:30 +00:00
// Schedule next connection attempt
2019-01-01 23:35:14 +00:00
entry.next = now + Time::from(entry.timeout);
2015-11-20 11:09:07 +00:00
}
2019-01-09 16:45:12 +00:00
if self.next_stats_out < now {
// Write out the statistics
try!(self.write_out_stats().map_err(|err| Error::File("Failed to write stats file", err)));
self.next_stats_out = now + STATS_INTERVAL;
self.traffic.period(Some(60));
}
Ok(())
}
/// Calculates, resets and writes out the statistics to a file
fn write_out_stats(&mut self) -> Result<(), io::Error> {
if self.stats_file.is_none() { return Ok(()) }
debug!("Writing out stats");
let mut f = try!(File::create(self.stats_file.as_ref().unwrap()));
try!(self.peers.write_out(&mut f));
try!(writeln!(&mut f));
try!(self.table.write_out(&mut f));
try!(writeln!(&mut f));
try!(self.traffic.write_out(&mut f));
try!(writeln!(&mut f));
2015-11-19 15:34:20 +00:00
Ok(())
}
2016-06-27 13:43:30 +00:00
/// Handles payload data coming in from the local network device
///
/// This method takes payload data received from the local device and parses it to obtain the
/// destination address. Then it checks the lookup table to get the peer for that destination
/// address. If a peer is found, the message is sent to it, otherwise the message is either
/// broadcast to all peers or dropped (depending on mode).
///
/// The parameter `payload` contains the payload data starting at position `start` and ending
/// at `end`. It is important that the buffer has enough space before the payload data to
/// prepend a header of max 64 bytes and enough space after the payload data to append a mac of
/// max 64 bytes.
///
/// # Errors
/// This method fails
/// - with `Error::ParseError` if the payload data failed to parse
/// - with `Error::SocketError` if sending a message fails
2016-02-08 19:37:06 +00:00
pub fn handle_interface_data(&mut self, payload: &mut [u8], start: usize, end: usize) -> Result<(), Error> {
2015-12-22 21:47:41 +00:00
let (src, dst) = try!(P::parse(&payload[start..end]));
debug!("Read data from interface: src: {}, dst: {}, {} bytes", src, dst, end-start);
2019-01-09 16:45:12 +00:00
self.traffic.count_out_payload(dst, src, end-start);
2015-11-22 17:05:15 +00:00
match self.table.lookup(&dst) {
2016-06-27 13:43:30 +00:00
Some(addr) => { // Peer found for destination
2015-11-26 21:16:51 +00:00
debug!("Found destination for {} => {}", dst, addr);
try!(self.send_msg(addr, &mut Message::Data(payload, start, end)));
if !self.peers.contains_addr(&addr) {
2019-01-09 16:45:12 +00:00
// If the peer is not actually connected, remove the entry in the table and try
2016-06-27 13:43:30 +00:00
// to reconnect.
2015-11-26 21:16:51 +00:00
warn!("Destination for {} not found in peers: {}", dst, addr);
self.table.remove(&dst);
try!(self.connect(&addr));
2015-11-22 21:00:34 +00:00
}
2015-11-19 15:34:20 +00:00
},
None => {
2016-06-27 13:43:30 +00:00
if self.broadcast {
debug!("No destination for {} found, broadcasting", dst);
let mut msg = Message::Data(payload, start, end);
try!(self.broadcast_msg(&mut msg));
} else {
2015-11-26 21:16:51 +00:00
debug!("No destination for {} found, dropping", dst);
2015-11-22 21:45:04 +00:00
}
2015-11-19 15:34:20 +00:00
}
}
Ok(())
}
2016-06-27 13:43:30 +00:00
/// Handles a message received from the network
///
/// This method handles messages from the network, i.e. from peers. `peer` contains the sender
/// of the message and `msg` contains the message.
2016-06-27 13:43:30 +00:00
///
/// Then this method will check the message type and will handle each message type differently.
///
/// # `Message::Data` messages
/// This message type contains payload data and therefore this path is optimized for speed.
///
/// The payload of data messages is written to the local network device and if the node is in
/// a learning mode it will associate the sender peer with the source address.
///
/// # `Message::Peers` messages
/// If this message is received, the local node will use all the node addresses in the message
/// as well as the senders address to connect to.
///
/// # `Message::Init` messages
/// This message is used in the peer connection handshake.
///
/// To make sure, the node does not connect to itself, it will compare the remote `node_id` to
/// the local one. If the id is the same, it will ignore the message and blacklist the address
/// so that it won't be used in the future.
///
/// If the message is coming from a different node, the nodes address is added to the peer list
/// and its claimed addresses are associated with it.
///
/// If the `stage` of the message is 1, a `Message::Init` message with `stage=1` is sent in
/// reply, together with a peer list.
///
/// # `Message::Close` message
/// If this message is received, the sender is removed from the peer list and its claimed
/// addresses are removed from the table.
pub fn handle_net_message(&mut self, peer: SocketAddr, msg: Message) -> Result<(), Error> {
debug!("Received {:?} from {}", msg, peer);
2015-11-19 15:34:20 +00:00
match msg {
2015-12-13 21:03:06 +00:00
Message::Data(payload, start, end) => {
2019-01-09 16:45:12 +00:00
let (src, dst) = try!(P::parse(&payload[start..end]));
2015-12-13 21:03:06 +00:00
debug!("Writing data to device: {} bytes", end-start);
2019-01-09 16:45:12 +00:00
self.traffic.count_in_payload(src, dst, end-start);
2016-07-06 20:35:42 +00:00
if let Err(e) = self.device.write(&mut payload[..end], start) {
error!("Failed to send via device: {}", e);
return Err(e);
2015-11-19 15:34:20 +00:00
}
2015-11-22 21:00:34 +00:00
if self.learning {
2016-06-27 13:43:30 +00:00
// Learn single address
2015-11-22 23:49:58 +00:00
self.table.learn(src, None, peer);
2015-11-22 21:00:34 +00:00
}
2016-06-27 13:43:30 +00:00
// Not adding peer in this case to increase performance
2015-11-19 15:34:20 +00:00
},
2015-11-20 17:09:51 +00:00
Message::Peers(peers) => {
2016-06-27 13:43:30 +00:00
// Connect to sender if not connected
if !self.peers.contains_addr(&peer) {
try!(self.connect(&peer));
}
2019-02-12 18:29:36 +00:00
//TODO: make this address primary
2016-06-27 13:43:30 +00:00
// Connect to all peers in the message
2015-11-19 15:34:20 +00:00
for p in &peers {
2016-05-02 07:05:34 +00:00
if ! self.peers.contains_addr(p) && ! self.blacklist_peers.contains(p) {
try!(self.connect(p));
2015-11-19 15:34:20 +00:00
}
}
2016-08-12 06:31:21 +00:00
// Refresh peer
self.peers.refresh(&peer);
2015-11-19 15:34:20 +00:00
},
Message::Init(stage, node_id, ranges) => {
2016-06-27 13:43:30 +00:00
// Avoid connecting to self
if node_id == self.node_id {
self.blacklist_peers.push(peer);
return Ok(())
}
2016-06-27 13:43:30 +00:00
// Add sender as peer or as alternative address to existing peer
2016-05-02 07:05:34 +00:00
if self.peers.contains_node(&node_id) {
2019-02-12 18:29:36 +00:00
//TODO: make this address primary
2016-05-02 07:05:34 +00:00
self.peers.add_alt_addr(node_id, peer);
} else {
self.peers.add(node_id, peer);
for range in ranges {
2016-06-11 14:08:57 +00:00
self.table.learn(range.base, Some(range.prefix_len), peer);
2016-05-02 07:05:34 +00:00
}
2015-11-22 21:00:34 +00:00
}
2016-06-27 13:43:30 +00:00
// Reply with stage=1 if stage is 0
2015-11-26 09:52:58 +00:00
if stage == 0 {
let peers = self.peers.as_vec();
let own_addrs = self.addresses.clone();
2016-06-11 14:08:57 +00:00
let own_node_id = self.node_id;
2015-12-13 21:03:06 +00:00
try!(self.send_msg(peer, &mut Message::Init(stage+1, own_node_id, own_addrs)));
try!(self.send_msg(peer, &mut Message::Peers(peers)));
2015-11-26 09:52:58 +00:00
}
2015-11-19 15:34:20 +00:00
},
2015-11-20 17:09:51 +00:00
Message::Close => {
2015-11-20 09:59:01 +00:00
self.peers.remove(&peer);
self.table.remove_all(&peer);
2015-11-20 09:59:01 +00:00
}
2015-11-19 15:34:20 +00:00
}
Ok(())
}
2016-06-27 13:43:30 +00:00
/// The main method of the node
///
/// This method will use epoll to wait in the sockets and the device at the same time.
/// It will read from the sockets, decode and decrypt the message and then call the
/// `handle_net_message` method. It will also read from the device and call
/// `handle_interface_data` for each packet read.
/// Also, this method will call `housekeep` every second.
2019-01-01 23:35:14 +00:00
#[allow(unknown_lints,clippy::cyclomatic_complexity)]
2015-11-20 08:11:54 +00:00
pub fn run(&mut self) {
let dummy_time = Instant::now();
2019-01-01 23:35:14 +00:00
let trap = Trap::trap(&[Signal::SIGINT, Signal::SIGTERM, Signal::SIGQUIT]);
2016-06-30 08:05:37 +00:00
let mut poll_handle = try_fail!(Poll::new(3), "Failed to create poll handle: {}");
2016-05-02 06:35:11 +00:00
let socket4_fd = self.socket4.as_raw_fd();
let socket6_fd = self.socket6.as_raw_fd();
2015-11-22 15:48:01 +00:00
let device_fd = self.device.as_raw_fd();
2019-01-01 23:35:14 +00:00
try_fail!(poll_handle.register(socket4_fd, Flags::READ), "Failed to add ipv4 socket to poll handle: {}");
try_fail!(poll_handle.register(socket6_fd, Flags::READ), "Failed to add ipv6 socket to poll handle: {}");
2019-01-10 18:36:50 +00:00
if let Err(err) = poll_handle.register(device_fd, Flags::READ) {
if self.device.get_type() != Type::Dummy {
fail!("Failed to add device to poll handle: {}", err);
} else {
warn!("Failed to add device to poll handle: {}", err);
}
}
2015-11-20 08:11:54 +00:00
let mut buffer = [0; 64*1024];
let mut poll_error = false;
2015-11-19 15:34:20 +00:00
loop {
let evts = match poll_handle.wait(1000) {
Ok(evts) => evts,
Err(err) => {
if poll_error {
fail!("Poll wait failed again: {}", err);
}
error!("Poll wait failed: {}, retrying...", err);
poll_error = true;
continue
}
};
for evt in evts {
2016-06-30 08:05:37 +00:00
match evt.fd() {
fd if (fd == socket4_fd || fd == socket6_fd) => {
let (size, src) = match evt.fd() {
fd if fd == socket4_fd => try_fail!(self.socket4.recv_from(&mut buffer), "Failed to read from ipv4 network socket: {}"),
fd if fd == socket6_fd => try_fail!(self.socket6.recv_from(&mut buffer), "Failed to read from ipv6 network socket: {}"),
2016-06-29 06:43:39 +00:00
_ => unreachable!()
};
2019-01-10 18:36:50 +00:00
if let Err(e) = decode(&mut buffer[..size], self.magic, &mut self.crypto).and_then(|msg| {
self.traffic.count_in_traffic(src, size);
self.handle_net_message(src, msg)
}) {
2016-06-29 06:43:39 +00:00
error!("Error: {}, from: {}", e, src);
2016-05-02 06:35:11 +00:00
}
},
2016-06-30 08:05:37 +00:00
fd if (fd == device_fd) => {
2016-07-03 09:59:06 +00:00
let mut start = 64;
2016-07-03 07:47:58 +00:00
let (offset, size) = try_fail!(self.device.read(&mut buffer[start..]), "Failed to read from tap device: {}");
start += offset;
2016-06-29 06:43:39 +00:00
if let Err(e) = self.handle_interface_data(&mut buffer, start, start+size) {
error!("Error: {}", e);
}
2015-11-20 08:11:54 +00:00
},
_ => unreachable!()
}
2015-11-19 15:34:20 +00:00
}
2015-11-25 20:55:30 +00:00
if self.next_housekeep < now() {
poll_error = false;
2015-11-25 20:55:30 +00:00
// Check for signals
if trap.wait(dummy_time).is_some() {
break;
}
// Do the housekeeping
2016-06-29 06:43:39 +00:00
if let Err(e) = self.housekeep() {
error!("Error: {}", e)
2015-11-20 09:59:01 +00:00
}
2015-11-25 20:55:30 +00:00
self.next_housekeep = now() + 1
2015-11-20 09:59:01 +00:00
}
2015-11-19 19:51:53 +00:00
}
2015-11-25 13:31:05 +00:00
info!("Shutting down...");
2015-12-22 21:45:52 +00:00
self.broadcast_msg(&mut Message::Close).ok();
2015-11-19 19:51:53 +00:00
}
2015-11-19 15:34:20 +00:00
}