Some fixes/improvements

pull/109/head
Dennis Schwerdel 2020-11-03 18:49:35 +01:00
parent 35bdfafabf
commit 150f219e04
10 changed files with 104 additions and 78 deletions

View File

@ -7,10 +7,13 @@ This project follows [semantic versioning](http://semver.org).
- [changed] Changed documentation
- [changed] Updated dependencies
- [changed] Retrying connections for 120 secs
- [changed] Resetting own addresses periodically
- [changed] Using smallvec everywhere
- [fixed] Fixed corner case with lost init message
- [fixed] Do not reconnect to timed out pending connections
- [fixed] Most specific claims beat less specific claims
- [fixed] Count all invalid protocol traffic
- [fixed] Fixed compile with musl
### v2.0.0 (2020-10-30)

8
Cargo.lock generated
View File

@ -96,9 +96,9 @@ dependencies = [
[[package]]
name = "const_fn"
version = "0.4.2"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce90df4c658c62f12d78f7508cf92f9173e5184a539c10bfe54a3107b3ffd0f2"
checksum = "c478836e029dcef17fb47c89023448c64f781a046e0300e257ad8225ae59afab"
[[package]]
name = "daemonize"
@ -279,9 +279,9 @@ checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
[[package]]
name = "ppv-lite86"
version = "0.2.9"
version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c36fa947111f5c62a733b652544dd0016a43ce89619538a8ef92724a6f501a20"
checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857"
[[package]]
name = "privdrop"

View File

@ -21,6 +21,7 @@ use std::{
};
use super::util::{from_base62, to_base62, Encoder, TimeSource};
use smallvec::SmallVec;
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
@ -33,8 +34,8 @@ fn base_62_sanitize(data: &str) -> String {
data.chars().filter(|c| c.is_ascii_alphanumeric()).collect()
}
fn sha512(data: &[u8]) -> Vec<u8> {
digest::digest(&digest::SHA512, data).as_ref().to_vec()
fn sha512(data: &[u8]) -> SmallVec<[u8; 64]> {
digest::digest(&digest::SHA512, data).as_ref().into()
}
struct FutureResult<T> {
@ -62,8 +63,8 @@ impl<TS: TimeSource> BeaconSerializer<TS> {
((TS::now() / 3600) & 0xffff) as u16
}
fn get_keystream(&self, type_: u8, seed: u8, iter: u8) -> Vec<u8> {
let mut data = Vec::new();
fn get_keystream(&self, type_: u8, seed: u8, iter: u8) -> SmallVec<[u8; 64]> {
let mut data = SmallVec::<[u8; 128]>::new();
data.extend_from_slice(&[type_, seed, iter]);
data.extend_from_slice(&self.shared_key);
sha512(&data)
@ -92,6 +93,7 @@ impl<TS: TimeSource> BeaconSerializer<TS> {
fn end(&self) -> String {
to_base62(&self.get_keystream(TYPE_END, 0, 0))[0..5].to_string()
}
fn encrypt_data(&self, data: &mut Vec<u8>) {
// Note: the 1 byte seed is only meant to protect from random changes,
// not malicious ones. For full protection, at least 8 bytes (~12
@ -115,8 +117,8 @@ impl<TS: TimeSource> BeaconSerializer<TS> {
// Add timestamp
data.extend_from_slice(&Self::now_hour_16().to_be_bytes());
// Split addresses into v4 and v6
let mut v4addrs = Vec::new();
let mut v6addrs = Vec::new();
let mut v4addrs = SmallVec::<[SocketAddrV4; 256]>::new();
let mut v6addrs = SmallVec::<[SocketAddrV6; 256]>::new();
for p in peers {
match *p {
SocketAddr::V4(addr) => v4addrs.push(addr),

View File

@ -43,6 +43,7 @@ pub type Hash = BuildHasherDefault<FnvHasher>;
const MAX_RECONNECT_INTERVAL: u16 = 3600;
const RESOLVE_INTERVAL: Time = 300;
pub const STATS_INTERVAL: Time = 60;
const OWN_ADDRESS_RESET_INTERVAL: Time = 300;
const SPACE_BEFORE: usize = 100;
struct PeerData {
@ -56,7 +57,7 @@ struct PeerData {
#[derive(Clone)]
pub struct ReconnectEntry {
address: Option<(String, Time)>,
resolved: Vec<SocketAddr>,
resolved: SmallVec<[SocketAddr; 3]>,
tries: u16,
timeout: u16,
next: Time,
@ -70,8 +71,8 @@ pub struct GenericCloud<D: Device, P: Protocol, S: Socket, TS: TimeSource> {
learning: bool,
broadcast: bool,
peers: HashMap<SocketAddr, PeerData, Hash>,
reconnect_peers: Vec<ReconnectEntry>,
own_addresses: Vec<SocketAddr>,
reconnect_peers: SmallVec<[ReconnectEntry; 3]>,
own_addresses: SmallVec<[SocketAddr; 3]>,
pending_inits: HashMap<SocketAddr, PeerCrypto<NodeInfo>, Hash>,
table: ClaimTable<TS>,
socket: S,
@ -86,6 +87,7 @@ pub struct GenericCloud<D: Device, P: Protocol, S: Socket, TS: TimeSource> {
next_housekeep: Time,
next_stats_out: Time,
next_beacon: Time,
next_own_address_reset: Time,
port_forwarding: Option<PortForwarding>,
traffic: TrafficStats,
beacon_serializer: BeaconSerializer<TS>,
@ -137,8 +139,8 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
learning,
broadcast,
pending_inits: HashMap::default(),
reconnect_peers: Vec::new(),
own_addresses: Vec::new(),
reconnect_peers: SmallVec::new(),
own_addresses: SmallVec::new(),
peer_timeout_publish: config.peer_timeout as u16,
table: ClaimTable::new(config.switch_timeout as Duration, config.peer_timeout as Duration),
socket,
@ -150,6 +152,7 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
next_housekeep: now,
next_stats_out: now + STATS_INTERVAL,
next_beacon: now,
next_own_address_reset: now + OWN_ADDRESS_RESET_INTERVAL,
port_forwarding,
traffic: TrafficStats::default(),
beacon_serializer: BeaconSerializer::new(beacon_key),
@ -214,16 +217,14 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
self.send_to(addr, msg)
}
/// Returns the self-perceived addresses (IPv4 and IPv6) of this node
///
/// Note that those addresses could be private addresses that are not reachable by other nodes,
/// or only some other nodes inside the same network.
///
/// # Errors
/// Returns an IOError if the underlying system call fails
#[allow(dead_code)]
pub fn address(&self) -> io::Result<SocketAddr> {
Ok(self.socket.address().map(mapped_addr)?)
pub fn reset_own_addresses(&mut self) -> io::Result<()> {
self.own_addresses.clear();
self.own_addresses.push(self.socket.address().map(mapped_addr)?);
if let Some(ref pfw) = self.port_forwarding {
self.own_addresses.push(pfw.get_internal_ip().into());
self.own_addresses.push(pfw.get_external_ip().into());
}
Ok(())
}
/// Returns the number of peers
@ -246,7 +247,7 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
Ok(addrs) => addrs,
Err(err) => {
warn!("Failed to resolve {}: {:?}", add, err);
vec![]
smallvec![]
}
};
self.reconnect_peers.push(ReconnectEntry {
@ -268,7 +269,7 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
/// # Errors
/// This method returns `Error::NameError` if the address is a name that fails to resolve.
pub fn connect<Addr: ToSocketAddrs + fmt::Debug + Clone>(&mut self, addr: Addr) -> Result<(), Error> {
let addrs = resolve(&addr)?.into_iter().map(mapped_addr).collect::<Vec<_>>();
let addrs = resolve(&addr)?.into_iter().map(mapped_addr).collect::<SmallVec<[SocketAddr; 3]>>();
for addr in &addrs {
if self.own_addresses.contains(addr)
|| self.peers.contains_key(addr)
@ -305,7 +306,10 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
fn connect_sock(&mut self, addr: SocketAddr) -> Result<(), Error> {
let addr = mapped_addr(addr);
if self.peers.contains_key(&addr) || self.own_addresses.contains(&addr) {
if self.peers.contains_key(&addr)
|| self.own_addresses.contains(&addr)
|| self.pending_inits.contains_key(&addr)
{
return Ok(())
}
debug!("Connecting to {:?}", addr);
@ -320,7 +324,7 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
fn crypto_housekeep(&mut self) -> Result<(), Error> {
let mut msg = MsgBuffer::new(SPACE_BEFORE);
let mut del: SmallVec<[SocketAddr; 4]> = smallvec![];
for addr in self.pending_inits.keys().copied().collect::<Vec<_>>() {
for addr in self.pending_inits.keys().copied().collect::<SmallVec<[SocketAddr; 4]>>() {
msg.clear();
match self.pending_inits.get_mut(&addr).unwrap().every_second(&mut msg) {
Err(_) => del.push(addr),
@ -329,7 +333,7 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
Ok(_) => unreachable!()
}
}
for addr in self.peers.keys().copied().collect::<Vec<_>>() {
for addr in self.peers.keys().copied().collect::<SmallVec<[SocketAddr; 16]>>() {
msg.clear();
match self.peers.get_mut(&addr).unwrap().crypto.every_second(&mut msg) {
Err(_) => del.push(addr),
@ -347,39 +351,8 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
Ok(())
}
fn housekeep(&mut self) -> Result<(), Error> {
fn reconnect_to_peers(&mut self) -> Result<(), Error> {
let now = TS::now();
let mut buffer = MsgBuffer::new(SPACE_BEFORE);
let mut del: Vec<SocketAddr> = Vec::new();
for (&addr, ref data) in &self.peers {
if data.timeout < now {
del.push(addr);
}
}
for addr in del {
info!("Forgot peer {} due to timeout", addr_nice(addr));
self.peers.remove(&addr);
self.table.remove_claims(addr);
self.connect_sock(addr)?; // Try to reconnect
}
self.table.housekeep();
self.crypto_housekeep()?;
// Periodically extend the port-forwarding
if let Some(ref mut pfw) = self.port_forwarding {
pfw.check_extend();
}
// Periodically send peer list to peers
let now = TS::now();
if self.next_peers <= now {
debug!("Send peer list to all peers");
let info = self.create_node_info();
info.encode(&mut buffer);
self.broadcast_msg(MESSAGE_TYPE_NODE_INFO, &mut buffer)?;
// Reschedule for next update
let min_peer_timeout = self.peers.iter().map(|p| p.1.peer_timeout).min().unwrap_or(DEFAULT_PEER_TIMEOUT);
let interval = min(self.update_freq as u16, max(min_peer_timeout / 2 - 60, 1));
self.next_peers = now + Time::from(interval);
}
// Connect to those reconnect_peers that are due
for entry in self.reconnect_peers.clone() {
if entry.next > now {
@ -424,6 +397,48 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
entry.next = now + Time::from(entry.timeout);
}
self.reconnect_peers.retain(|e| e.final_timeout.unwrap_or(now) >= now);
Ok(())
}
fn housekeep(&mut self) -> Result<(), Error> {
let now = TS::now();
let mut buffer = MsgBuffer::new(SPACE_BEFORE);
let mut del: SmallVec<[SocketAddr; 3]> = SmallVec::new();
for (&addr, ref data) in &self.peers {
if data.timeout < now {
del.push(addr);
}
}
for addr in del {
info!("Forgot peer {} due to timeout", addr_nice(addr));
self.peers.remove(&addr);
self.table.remove_claims(addr);
self.connect_sock(addr)?; // Try to reconnect
}
self.table.housekeep();
self.crypto_housekeep()?;
// Periodically extend the port-forwarding
if let Some(ref mut pfw) = self.port_forwarding {
pfw.check_extend();
}
let now = TS::now();
// Periodically reset own peers
if self.next_own_address_reset <= now {
self.reset_own_addresses().map_err(|err| Error::SocketIo("Failed to get own addresses", err))?;
self.next_own_address_reset = now + OWN_ADDRESS_RESET_INTERVAL;
}
// Periodically send peer list to peers
if self.next_peers <= now {
debug!("Send peer list to all peers");
let info = self.create_node_info();
info.encode(&mut buffer);
self.broadcast_msg(MESSAGE_TYPE_NODE_INFO, &mut buffer)?;
// Reschedule for next update
let min_peer_timeout = self.peers.iter().map(|p| p.1.peer_timeout).min().unwrap_or(DEFAULT_PEER_TIMEOUT);
let interval = min(self.update_freq as u16, max(min_peer_timeout / 2 - 60, 1));
self.next_peers = now + Time::from(interval);
}
self.reconnect_to_peers()?;
if self.next_stats_out < now {
// Write out the statistics
self.write_out_stats().map_err(|err| Error::FileIo("Failed to write stats file", err))?;
@ -448,7 +463,8 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
/// Stores the beacon
fn store_beacon(&mut self) -> Result<(), Error> {
if let Some(ref path) = self.config.beacon_store {
let peers: Vec<_> = self.own_addresses.choose_multiple(&mut thread_rng(), 3).cloned().collect();
let peers: SmallVec<[SocketAddr; 3]> =
self.own_addresses.choose_multiple(&mut thread_rng(), 3).cloned().collect();
if let Some(path) = path.strip_prefix('|') {
self.beacon_serializer
.write_to_cmd(&peers, path)
@ -762,9 +778,8 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
}
fn initialize(&mut self) {
match self.address() {
Err(err) => error!("Failed to obtain local addresses: {}", err),
Ok(addr) => self.own_addresses.push(addr)
if let Err(err) = self.reset_own_addresses() {
error!("Failed to obtain local addresses: {}", err)
}
}

View File

@ -229,7 +229,7 @@ impl InitMsg {
let pos = r.position() as usize;
let signature_len = r.read_u8().map_err(|_| Error::Parse("Init message too short"))? as usize;
let mut signature = vec![0; signature_len];
let mut signature: SmallVec<[u8; 32]> = smallvec![0; signature_len];
r.read_exact(&mut signature).map_err(|_| Error::Parse("Init message too short"))?;
let signed_data = &r.into_inner()[0..pos];

View File

@ -254,7 +254,7 @@ impl TunTapDevice {
+ crypto::EXTRA_LEN + crypto::TAG_LEN /* crypto overhead */
+ 1 /* message type header */
+ match self.type_ {
Type::Tap => 12, /* inner ethernet header */
Type::Tap => 14, /* inner ethernet header */
Type::Tun | Type::Dummy => 0
}
}

View File

@ -40,9 +40,9 @@ pub struct NodeInfo {
}
impl NodeInfo {
const PART_NODEID: u8 = 4;
const PART_CLAIMS: u8 = 2;
const PART_END: u8 = 0;
const PART_NODEID: u8 = 4;
const PART_PEERS: u8 = 1;
const PART_PEER_TIMEOUT: u8 = 3;
@ -133,8 +133,8 @@ impl NodeInfo {
fn encode_peer_list_part<W: Write>(&self, mut out: W) -> Result<(), io::Error> {
for p in &self.peers {
let mut addr_ipv4 = vec![];
let mut addr_ipv6 = vec![];
let mut addr_ipv4: SmallVec<[SocketAddrV4; 16]> = smallvec![];
let mut addr_ipv6: SmallVec<[SocketAddrV6; 16]> = smallvec![];
for a in &p.addrs {
match a {
SocketAddr::V4(addr) => addr_ipv4.push(*addr),
@ -186,9 +186,7 @@ impl NodeInfo {
let len;
{
let mut cursor = Cursor::new(buffer.buffer());
Self::encode_part(&mut cursor, Self::PART_NODEID, |cursor| {
cursor.write_all(&self.node_id)
})?;
Self::encode_part(&mut cursor, Self::PART_NODEID, |cursor| cursor.write_all(&self.node_id))?;
Self::encode_part(&mut cursor, Self::PART_PEERS, |cursor| self.encode_peer_list_part(cursor))?;
Self::encode_part(&mut cursor, Self::PART_CLAIMS, |mut cursor| {
for c in &self.claims {

View File

@ -103,7 +103,6 @@ mod internal {
}
}
} else {
gateway.remove_port(PortMappingProtocol::UDP, port).ok();
match gateway.add_port(PortMappingProtocol::UDP, port, addr, LEASE_TIME, DESCRIPTION) {
Ok(()) => Ok((port, LEASE_TIME)),
Err(AddPortError::OnlyPermanentLeasesSupported) => {
@ -150,6 +149,14 @@ mod internal {
Err(err) => debug!("Port-forwarding: failed to deactivate port forwarding: {}", err)
}
}
pub fn get_internal_ip(&self) -> SocketAddrV4 {
self.internal_addr
}
pub fn get_external_ip(&self) -> SocketAddrV4 {
self.external_addr
}
}
impl Drop for PortForwarding {

View File

@ -118,7 +118,7 @@ impl FromStr for Address {
}
return Ok(Address { data: res, len: 16 })
}
let parts: Vec<&str> = text.split(':').collect();
let parts: SmallVec<[&str; 10]> = text.split(':').collect();
if parts.len() == 6 {
let mut bytes = [0; 16];
for i in 0..6 {

View File

@ -14,6 +14,7 @@ use crate::error::Error;
use signal::{trap::Trap, Signal};
use std::time::Instant;
use smallvec::SmallVec;
pub type Duration = u32;
@ -216,10 +217,10 @@ pub fn get_internal_ip() -> Ipv4Addr {
#[allow(unknown_lints, clippy::needless_pass_by_value)]
pub fn resolve<Addr: ToSocketAddrs + fmt::Debug>(addr: Addr) -> Result<Vec<SocketAddr>, Error> {
pub fn resolve<Addr: ToSocketAddrs + fmt::Debug>(addr: Addr) -> Result<SmallVec<[SocketAddr; 3]>, Error> {
let addrs = addr.to_socket_addrs().map_err(|_| Error::NameUnresolvable(format!("{:?}", addr)))?;
// Remove duplicates in addrs (why are there duplicates???)
let mut addrs = addrs.collect::<Vec<_>>();
let mut addrs = addrs.collect::<SmallVec<_>>();
// Try IPv4 first as it usually is faster
addrs.sort_by_key(|addr| {
match *addr {