From 150f219e0471a74a048c5efe5cd99d307c8fa5c5 Mon Sep 17 00:00:00 2001 From: Dennis Schwerdel Date: Tue, 3 Nov 2020 18:49:35 +0100 Subject: [PATCH] Some fixes/improvements --- CHANGELOG.md | 3 + Cargo.lock | 8 +-- src/beacon.rs | 14 +++-- src/cloud.rs | 127 +++++++++++++++++++++++------------------ src/crypto/init.rs | 2 +- src/device.rs | 2 +- src/messages.rs | 10 ++-- src/port_forwarding.rs | 9 ++- src/types.rs | 2 +- src/util.rs | 5 +- 10 files changed, 104 insertions(+), 78 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0b7382e..bb514f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,10 +7,13 @@ This project follows [semantic versioning](http://semver.org). - [changed] Changed documentation - [changed] Updated dependencies - [changed] Retrying connections for 120 secs +- [changed] Resetting own addresses periodically +- [changed] Using smallvec everywhere - [fixed] Fixed corner case with lost init message - [fixed] Do not reconnect to timed out pending connections - [fixed] Most specific claims beat less specific claims - [fixed] Count all invalid protocol traffic +- [fixed] Fixed compile with musl ### v2.0.0 (2020-10-30) diff --git a/Cargo.lock b/Cargo.lock index b717857..2de37f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -96,9 +96,9 @@ dependencies = [ [[package]] name = "const_fn" -version = "0.4.2" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce90df4c658c62f12d78f7508cf92f9173e5184a539c10bfe54a3107b3ffd0f2" +checksum = "c478836e029dcef17fb47c89023448c64f781a046e0300e257ad8225ae59afab" [[package]] name = "daemonize" @@ -279,9 +279,9 @@ checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" [[package]] name = "ppv-lite86" -version = "0.2.9" +version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c36fa947111f5c62a733b652544dd0016a43ce89619538a8ef92724a6f501a20" +checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857" [[package]] name = "privdrop" diff --git a/src/beacon.rs b/src/beacon.rs index c8c5a4c..a33827c 100644 --- a/src/beacon.rs +++ b/src/beacon.rs @@ -21,6 +21,7 @@ use std::{ }; use super::util::{from_base62, to_base62, Encoder, TimeSource}; +use smallvec::SmallVec; use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; @@ -33,8 +34,8 @@ fn base_62_sanitize(data: &str) -> String { data.chars().filter(|c| c.is_ascii_alphanumeric()).collect() } -fn sha512(data: &[u8]) -> Vec { - digest::digest(&digest::SHA512, data).as_ref().to_vec() +fn sha512(data: &[u8]) -> SmallVec<[u8; 64]> { + digest::digest(&digest::SHA512, data).as_ref().into() } struct FutureResult { @@ -62,8 +63,8 @@ impl BeaconSerializer { ((TS::now() / 3600) & 0xffff) as u16 } - fn get_keystream(&self, type_: u8, seed: u8, iter: u8) -> Vec { - let mut data = Vec::new(); + fn get_keystream(&self, type_: u8, seed: u8, iter: u8) -> SmallVec<[u8; 64]> { + let mut data = SmallVec::<[u8; 128]>::new(); data.extend_from_slice(&[type_, seed, iter]); data.extend_from_slice(&self.shared_key); sha512(&data) @@ -92,6 +93,7 @@ impl BeaconSerializer { fn end(&self) -> String { to_base62(&self.get_keystream(TYPE_END, 0, 0))[0..5].to_string() } + fn encrypt_data(&self, data: &mut Vec) { // Note: the 1 byte seed is only meant to protect from random changes, // not malicious ones. For full protection, at least 8 bytes (~12 @@ -115,8 +117,8 @@ impl BeaconSerializer { // Add timestamp data.extend_from_slice(&Self::now_hour_16().to_be_bytes()); // Split addresses into v4 and v6 - let mut v4addrs = Vec::new(); - let mut v6addrs = Vec::new(); + let mut v4addrs = SmallVec::<[SocketAddrV4; 256]>::new(); + let mut v6addrs = SmallVec::<[SocketAddrV6; 256]>::new(); for p in peers { match *p { SocketAddr::V4(addr) => v4addrs.push(addr), diff --git a/src/cloud.rs b/src/cloud.rs index cabf99b..99198f0 100644 --- a/src/cloud.rs +++ b/src/cloud.rs @@ -43,6 +43,7 @@ pub type Hash = BuildHasherDefault; const MAX_RECONNECT_INTERVAL: u16 = 3600; const RESOLVE_INTERVAL: Time = 300; pub const STATS_INTERVAL: Time = 60; +const OWN_ADDRESS_RESET_INTERVAL: Time = 300; const SPACE_BEFORE: usize = 100; struct PeerData { @@ -56,7 +57,7 @@ struct PeerData { #[derive(Clone)] pub struct ReconnectEntry { address: Option<(String, Time)>, - resolved: Vec, + resolved: SmallVec<[SocketAddr; 3]>, tries: u16, timeout: u16, next: Time, @@ -70,8 +71,8 @@ pub struct GenericCloud { learning: bool, broadcast: bool, peers: HashMap, - reconnect_peers: Vec, - own_addresses: Vec, + reconnect_peers: SmallVec<[ReconnectEntry; 3]>, + own_addresses: SmallVec<[SocketAddr; 3]>, pending_inits: HashMap, Hash>, table: ClaimTable, socket: S, @@ -86,6 +87,7 @@ pub struct GenericCloud { next_housekeep: Time, next_stats_out: Time, next_beacon: Time, + next_own_address_reset: Time, port_forwarding: Option, traffic: TrafficStats, beacon_serializer: BeaconSerializer, @@ -137,8 +139,8 @@ impl GenericCloud GenericCloud GenericCloud io::Result { - Ok(self.socket.address().map(mapped_addr)?) + pub fn reset_own_addresses(&mut self) -> io::Result<()> { + self.own_addresses.clear(); + self.own_addresses.push(self.socket.address().map(mapped_addr)?); + if let Some(ref pfw) = self.port_forwarding { + self.own_addresses.push(pfw.get_internal_ip().into()); + self.own_addresses.push(pfw.get_external_ip().into()); + } + Ok(()) } /// Returns the number of peers @@ -246,7 +247,7 @@ impl GenericCloud addrs, Err(err) => { warn!("Failed to resolve {}: {:?}", add, err); - vec![] + smallvec![] } }; self.reconnect_peers.push(ReconnectEntry { @@ -268,7 +269,7 @@ impl GenericCloud(&mut self, addr: Addr) -> Result<(), Error> { - let addrs = resolve(&addr)?.into_iter().map(mapped_addr).collect::>(); + let addrs = resolve(&addr)?.into_iter().map(mapped_addr).collect::>(); for addr in &addrs { if self.own_addresses.contains(addr) || self.peers.contains_key(addr) @@ -305,7 +306,10 @@ impl GenericCloud Result<(), Error> { let addr = mapped_addr(addr); - if self.peers.contains_key(&addr) || self.own_addresses.contains(&addr) { + if self.peers.contains_key(&addr) + || self.own_addresses.contains(&addr) + || self.pending_inits.contains_key(&addr) + { return Ok(()) } debug!("Connecting to {:?}", addr); @@ -320,7 +324,7 @@ impl GenericCloud Result<(), Error> { let mut msg = MsgBuffer::new(SPACE_BEFORE); let mut del: SmallVec<[SocketAddr; 4]> = smallvec![]; - for addr in self.pending_inits.keys().copied().collect::>() { + for addr in self.pending_inits.keys().copied().collect::>() { msg.clear(); match self.pending_inits.get_mut(&addr).unwrap().every_second(&mut msg) { Err(_) => del.push(addr), @@ -329,7 +333,7 @@ impl GenericCloud unreachable!() } } - for addr in self.peers.keys().copied().collect::>() { + for addr in self.peers.keys().copied().collect::>() { msg.clear(); match self.peers.get_mut(&addr).unwrap().crypto.every_second(&mut msg) { Err(_) => del.push(addr), @@ -347,39 +351,8 @@ impl GenericCloud Result<(), Error> { + fn reconnect_to_peers(&mut self) -> Result<(), Error> { let now = TS::now(); - let mut buffer = MsgBuffer::new(SPACE_BEFORE); - let mut del: Vec = Vec::new(); - for (&addr, ref data) in &self.peers { - if data.timeout < now { - del.push(addr); - } - } - for addr in del { - info!("Forgot peer {} due to timeout", addr_nice(addr)); - self.peers.remove(&addr); - self.table.remove_claims(addr); - self.connect_sock(addr)?; // Try to reconnect - } - self.table.housekeep(); - self.crypto_housekeep()?; - // Periodically extend the port-forwarding - if let Some(ref mut pfw) = self.port_forwarding { - pfw.check_extend(); - } - // Periodically send peer list to peers - let now = TS::now(); - if self.next_peers <= now { - debug!("Send peer list to all peers"); - let info = self.create_node_info(); - info.encode(&mut buffer); - self.broadcast_msg(MESSAGE_TYPE_NODE_INFO, &mut buffer)?; - // Reschedule for next update - let min_peer_timeout = self.peers.iter().map(|p| p.1.peer_timeout).min().unwrap_or(DEFAULT_PEER_TIMEOUT); - let interval = min(self.update_freq as u16, max(min_peer_timeout / 2 - 60, 1)); - self.next_peers = now + Time::from(interval); - } // Connect to those reconnect_peers that are due for entry in self.reconnect_peers.clone() { if entry.next > now { @@ -424,6 +397,48 @@ impl GenericCloud= now); + Ok(()) + } + + fn housekeep(&mut self) -> Result<(), Error> { + let now = TS::now(); + let mut buffer = MsgBuffer::new(SPACE_BEFORE); + let mut del: SmallVec<[SocketAddr; 3]> = SmallVec::new(); + for (&addr, ref data) in &self.peers { + if data.timeout < now { + del.push(addr); + } + } + for addr in del { + info!("Forgot peer {} due to timeout", addr_nice(addr)); + self.peers.remove(&addr); + self.table.remove_claims(addr); + self.connect_sock(addr)?; // Try to reconnect + } + self.table.housekeep(); + self.crypto_housekeep()?; + // Periodically extend the port-forwarding + if let Some(ref mut pfw) = self.port_forwarding { + pfw.check_extend(); + } + let now = TS::now(); + // Periodically reset own peers + if self.next_own_address_reset <= now { + self.reset_own_addresses().map_err(|err| Error::SocketIo("Failed to get own addresses", err))?; + self.next_own_address_reset = now + OWN_ADDRESS_RESET_INTERVAL; + } + // Periodically send peer list to peers + if self.next_peers <= now { + debug!("Send peer list to all peers"); + let info = self.create_node_info(); + info.encode(&mut buffer); + self.broadcast_msg(MESSAGE_TYPE_NODE_INFO, &mut buffer)?; + // Reschedule for next update + let min_peer_timeout = self.peers.iter().map(|p| p.1.peer_timeout).min().unwrap_or(DEFAULT_PEER_TIMEOUT); + let interval = min(self.update_freq as u16, max(min_peer_timeout / 2 - 60, 1)); + self.next_peers = now + Time::from(interval); + } + self.reconnect_to_peers()?; if self.next_stats_out < now { // Write out the statistics self.write_out_stats().map_err(|err| Error::FileIo("Failed to write stats file", err))?; @@ -448,7 +463,8 @@ impl GenericCloud Result<(), Error> { if let Some(ref path) = self.config.beacon_store { - let peers: Vec<_> = self.own_addresses.choose_multiple(&mut thread_rng(), 3).cloned().collect(); + let peers: SmallVec<[SocketAddr; 3]> = + self.own_addresses.choose_multiple(&mut thread_rng(), 3).cloned().collect(); if let Some(path) = path.strip_prefix('|') { self.beacon_serializer .write_to_cmd(&peers, path) @@ -762,9 +778,8 @@ impl GenericCloud error!("Failed to obtain local addresses: {}", err), - Ok(addr) => self.own_addresses.push(addr) + if let Err(err) = self.reset_own_addresses() { + error!("Failed to obtain local addresses: {}", err) } } diff --git a/src/crypto/init.rs b/src/crypto/init.rs index b8d4bd6..2895294 100644 --- a/src/crypto/init.rs +++ b/src/crypto/init.rs @@ -229,7 +229,7 @@ impl InitMsg { let pos = r.position() as usize; let signature_len = r.read_u8().map_err(|_| Error::Parse("Init message too short"))? as usize; - let mut signature = vec![0; signature_len]; + let mut signature: SmallVec<[u8; 32]> = smallvec![0; signature_len]; r.read_exact(&mut signature).map_err(|_| Error::Parse("Init message too short"))?; let signed_data = &r.into_inner()[0..pos]; diff --git a/src/device.rs b/src/device.rs index e315e09..169c191 100644 --- a/src/device.rs +++ b/src/device.rs @@ -254,7 +254,7 @@ impl TunTapDevice { + crypto::EXTRA_LEN + crypto::TAG_LEN /* crypto overhead */ + 1 /* message type header */ + match self.type_ { - Type::Tap => 12, /* inner ethernet header */ + Type::Tap => 14, /* inner ethernet header */ Type::Tun | Type::Dummy => 0 } } diff --git a/src/messages.rs b/src/messages.rs index da2a423..3734026 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -40,9 +40,9 @@ pub struct NodeInfo { } impl NodeInfo { - const PART_NODEID: u8 = 4; const PART_CLAIMS: u8 = 2; const PART_END: u8 = 0; + const PART_NODEID: u8 = 4; const PART_PEERS: u8 = 1; const PART_PEER_TIMEOUT: u8 = 3; @@ -133,8 +133,8 @@ impl NodeInfo { fn encode_peer_list_part(&self, mut out: W) -> Result<(), io::Error> { for p in &self.peers { - let mut addr_ipv4 = vec![]; - let mut addr_ipv6 = vec![]; + let mut addr_ipv4: SmallVec<[SocketAddrV4; 16]> = smallvec![]; + let mut addr_ipv6: SmallVec<[SocketAddrV6; 16]> = smallvec![]; for a in &p.addrs { match a { SocketAddr::V4(addr) => addr_ipv4.push(*addr), @@ -186,9 +186,7 @@ impl NodeInfo { let len; { let mut cursor = Cursor::new(buffer.buffer()); - Self::encode_part(&mut cursor, Self::PART_NODEID, |cursor| { - cursor.write_all(&self.node_id) - })?; + Self::encode_part(&mut cursor, Self::PART_NODEID, |cursor| cursor.write_all(&self.node_id))?; Self::encode_part(&mut cursor, Self::PART_PEERS, |cursor| self.encode_peer_list_part(cursor))?; Self::encode_part(&mut cursor, Self::PART_CLAIMS, |mut cursor| { for c in &self.claims { diff --git a/src/port_forwarding.rs b/src/port_forwarding.rs index 89311f1..1891614 100644 --- a/src/port_forwarding.rs +++ b/src/port_forwarding.rs @@ -103,7 +103,6 @@ mod internal { } } } else { - gateway.remove_port(PortMappingProtocol::UDP, port).ok(); match gateway.add_port(PortMappingProtocol::UDP, port, addr, LEASE_TIME, DESCRIPTION) { Ok(()) => Ok((port, LEASE_TIME)), Err(AddPortError::OnlyPermanentLeasesSupported) => { @@ -150,6 +149,14 @@ mod internal { Err(err) => debug!("Port-forwarding: failed to deactivate port forwarding: {}", err) } } + + pub fn get_internal_ip(&self) -> SocketAddrV4 { + self.internal_addr + } + + pub fn get_external_ip(&self) -> SocketAddrV4 { + self.external_addr + } } impl Drop for PortForwarding { diff --git a/src/types.rs b/src/types.rs index c7ff17a..3bfccc4 100644 --- a/src/types.rs +++ b/src/types.rs @@ -118,7 +118,7 @@ impl FromStr for Address { } return Ok(Address { data: res, len: 16 }) } - let parts: Vec<&str> = text.split(':').collect(); + let parts: SmallVec<[&str; 10]> = text.split(':').collect(); if parts.len() == 6 { let mut bytes = [0; 16]; for i in 0..6 { diff --git a/src/util.rs b/src/util.rs index 0922edc..f14278f 100644 --- a/src/util.rs +++ b/src/util.rs @@ -14,6 +14,7 @@ use crate::error::Error; use signal::{trap::Trap, Signal}; use std::time::Instant; +use smallvec::SmallVec; pub type Duration = u32; @@ -216,10 +217,10 @@ pub fn get_internal_ip() -> Ipv4Addr { #[allow(unknown_lints, clippy::needless_pass_by_value)] -pub fn resolve(addr: Addr) -> Result, Error> { +pub fn resolve(addr: Addr) -> Result, Error> { let addrs = addr.to_socket_addrs().map_err(|_| Error::NameUnresolvable(format!("{:?}", addr)))?; // Remove duplicates in addrs (why are there duplicates???) - let mut addrs = addrs.collect::>(); + let mut addrs = addrs.collect::>(); // Try IPv4 first as it usually is faster addrs.sort_by_key(|addr| { match *addr {