This commit is contained in:
Dennis Schwerdel 2021-04-11 21:50:02 +02:00
parent a7a7ab3a1f
commit 66bef5cd21
5 changed files with 40 additions and 21 deletions

View File

@ -8,7 +8,6 @@ use crate::{
types::NodeId,
util::{from_base62, to_base62, MsgBuffer},
};
use libc::BPF_FS_MAGIC;
use ring::{
aead::{self, Algorithm, LessSafeKey, UnboundKey},
agreement::{EphemeralPrivateKey, UnparsedPublicKey},
@ -253,8 +252,8 @@ impl PeerCrypto {
if msg.stage() == STAGE_PONG {
buffer.set_length(last_init_message.len());
buffer.message_mut().copy_from_slice(last_init_message);
return Ok(MessageResult::Reply)
}
return Ok(MessageResult::Reply)
}
}
Ok(MessageResult::None)

View File

@ -9,4 +9,3 @@ mod rotate;
pub use self::core::{EXTRA_LEN, TAG_LEN};
pub use common::*;
pub use self::core::{EXTRA_LEN, TAG_LEN};

View File

@ -1,20 +1,20 @@
use super::{
shared::{SharedPeerCrypto, SharedTable, SharedTraffic},
common::SPACE_BEFORE,
shared::{SharedPeerCrypto, SharedTable, SharedTraffic},
};
use crate::{
beacon::BeaconSerializer,
config::{DEFAULT_PEER_TIMEOUT, DEFAULT_PORT},
crypto::{is_init_message, InitResult, InitState, MessageResult, Crypto},
device::{Type, Device},
crypto::{is_init_message, Crypto, InitResult, InitState, MessageResult},
device::{Device, Type},
engine::common::{Hash, PeerData},
error::Error,
messages::{
AddrList, NodeInfo, PeerInfo, MESSAGE_TYPE_CLOSE, MESSAGE_TYPE_DATA, MESSAGE_TYPE_KEEPALIVE,
MESSAGE_TYPE_NODE_INFO,
},
net::{mapped_addr, Socket},
net::{mapped_addr, parse_listen, Socket},
port_forwarding::PortForwarding,
types::{Address, NodeId, Range, RangeList},
util::{addr_nice, resolve, MsgBuffer, StatsdMsg, Time, TimeSource},
@ -236,7 +236,17 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
async fn update_peer_info(&mut self, addr: SocketAddr, info: Option<NodeInfo>) -> Result<(), Error> {
if let Some(peer) = self.peers.get_mut(&addr) {
peer.last_seen = TS::now();
peer.timeout = TS::now() + self.config.peer_timeout as Time
peer.timeout = TS::now() + self.config.peer_timeout as Time;
if let Some(info) = &info {
// Update peer addresses, always add seen address
peer.addrs.clear();
peer.addrs.push(addr);
for addr in &info.addrs {
if !peer.addrs.contains(addr) {
peer.addrs.push(*addr);
}
}
}
} else {
error!("Received peer update from non peer {}", addr_nice(addr));
return Ok(());
@ -281,6 +291,12 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
'outer: for peer in peers {
for addr in &peer.addrs {
if self.peers.contains_key(addr) {
// Check addresses and add addresses that we don't know to own addresses
for addr in &peer.addrs {
if !self.own_addresses.contains(addr) {
self.own_addresses.push(*addr)
}
}
continue 'outer;
}
}
@ -427,11 +443,6 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
pfw.check_extend();
}
let now = TS::now();
// Periodically reset own peers
if self.next_own_address_reset <= now {
self.reset_own_addresses().await.map_err(|err| Error::SocketIo("Failed to get own addresses", err))?;
self.next_own_address_reset = now + OWN_ADDRESS_RESET_INTERVAL;
}
// Periodically send peer list to peers
if self.next_peers <= now {
debug!("Send peer list to all peers");
@ -465,6 +476,11 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
self.table.sync();
self.traffic.sync();
self.peer_crypto.store(&self.peers);
// Periodically reset own peers
if self.next_own_address_reset <= now {
self.reset_own_addresses().await.map_err(|err| Error::SocketIo("Failed to get own addresses", err))?;
self.next_own_address_reset = now + OWN_ADDRESS_RESET_INTERVAL;
}
assert!(self.buffer.is_empty());
Ok(())
}
@ -497,7 +513,14 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
async fn reset_own_addresses(&mut self) -> io::Result<()> {
self.own_addresses.clear();
self.own_addresses.push(self.socket.address().await.map(mapped_addr)?);
let socket_addr = self.socket.address().await.map(mapped_addr)?;
// 1) Specified advertise addresses
for addr in &self.config.advertise_addresses {
self.own_addresses.push(parse_listen(addr, socket_addr.port()));
}
// 2) Address of UDP socket
self.own_addresses.push(socket_addr);
// 3) Addresses from port forwarding
if let Some(ref pfw) = self.port_forwarding {
self.own_addresses.push(pfw.get_internal_ip().into());
self.own_addresses.push(pfw.get_external_ip().into());

View File

@ -2,15 +2,15 @@
// Copyright (C) 2015-2021 Dennis Schwerdel
// This software is licensed under GPL-3 or newer (see LICENSE.md)
use super::util::{MockTimeSource, MsgBuffer, Time, TimeSource};
use crate::config::DEFAULT_PORT;
use crate::port_forwarding::PortForwarding;
use crate::util::{MockTimeSource, MsgBuffer, Time, TimeSource};
use async_trait::async_trait;
use parking_lot::Mutex;
use std::{
collections::{HashMap, VecDeque},
io::{self, ErrorKind},
net::{IpAddr, Ipv6Addr, SocketAddr, UdpSocket},
os::unix::io::AsRawFd,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
@ -62,11 +62,10 @@ impl Clone for NetSocket {
}
}
#[async_trait]
impl Socket for NetSocket {
async fn listen(addr: &str) -> Result<Self, io::Error> {
let addr = parse_listen(addr);
let addr = parse_listen(addr, DEFAULT_PORT);
Ok(NetSocket(UdpSocket::bind(addr)?))
}
@ -144,11 +143,10 @@ impl MockSocket {
}
}
#[async_trait]
impl Socket for MockSocket {
async fn listen(addr: &str) -> Result<Self, io::Error> {
Ok(Self::new(parse_listen(addr)))
Ok(Self::new(parse_listen(addr, DEFAULT_PORT)))
}
async fn receive(&mut self, buffer: &mut MsgBuffer) -> Result<SocketAddr, io::Error> {

View File

@ -8,6 +8,6 @@ mod payload;
mod peers;
#[test]
fn test_time_format() {
async fn test_time_format() {
assert!(time::OffsetDateTime::try_now_local().is_ok());
}