Finish advertise address & exchange own addresses

This commit is contained in:
Dennis Schwerdel 2021-04-09 22:01:35 +02:00
parent 7427be31c8
commit 8c55e6c076
4 changed files with 49 additions and 33 deletions

View File

@ -222,26 +222,18 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
pub fn reset_own_addresses(&mut self) -> io::Result<()> {
self.own_addresses.clear();
if self.config.advertise_addresses.len() > 0 &&
!self.config.listen.starts_with("ws://") {
// Force advertised addresses based on configuration instead
// of discovery. Note: Disables port forwarding
// Because the listen config may contain a color (aka
// both address and port are specified) we parse it and
// then extract just the port.
let sockaddr = parse_listen(&self.config.listen);
let port = sockaddr.port();
for address in &self.config.advertise_addresses {
let sockaddr = try_fail!(SocketAddr::from_str(&format!("{}:{}", address, port)), "Invalid IP Address or port {}");
self.own_addresses.push(sockaddr);
let socket_addr = self.socket.address().map(mapped_addr)?;
// 1) Specified advertise addresses
for addr in &self.config.advertise_addresses {
self.own_addresses.push(parse_listen(addr, socket_addr.port()));
}
} else {
self.own_addresses.push(self.socket.address().map(mapped_addr)?);
// 2) Address of UDP socket
self.own_addresses.push(socket_addr);
// 3) Addresses from port forwarding
if let Some(ref pfw) = self.port_forwarding {
self.own_addresses.push(pfw.get_internal_ip().into());
self.own_addresses.push(pfw.get_external_ip().into());
}
}
debug!("Own addresses: {:?}", self.own_addresses);
// TODO: detect address changes and call event
Ok(())
@ -450,11 +442,6 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
pfw.check_extend();
}
let now = TS::now();
// Periodically reset own peers
if self.next_own_address_reset <= now {
self.reset_own_addresses().map_err(|err| Error::SocketIo("Failed to get own addresses", err))?;
self.next_own_address_reset = now + OWN_ADDRESS_RESET_INTERVAL;
}
// Periodically send peer list to peers
if self.next_peers <= now {
debug!("Send peer list to all peers");
@ -485,6 +472,11 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
self.load_beacon()?;
self.next_beacon = now + Time::from(self.config.beacon_interval);
}
// Periodically reset own peers
if self.next_own_address_reset <= now {
self.reset_own_addresses().map_err(|err| Error::SocketIo("Failed to get own addresses", err))?;
self.next_own_address_reset = now + OWN_ADDRESS_RESET_INTERVAL;
}
Ok(())
}
@ -706,6 +698,12 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
}
if let Some(node_id) = peer.node_id {
if self.node_id == node_id {
// Check addresses and add addresses that we don't know to own addresses
for addr in &peer.addrs {
if !self.own_addresses.contains(addr) {
self.own_addresses.push(*addr)
}
}
continue 'outer;
}
for p in self.peers.values() {
@ -722,7 +720,17 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
fn update_peer_info(&mut self, addr: SocketAddr, info: Option<NodeInfo>) -> Result<(), Error> {
if let Some(peer) = self.peers.get_mut(&addr) {
peer.last_seen = TS::now();
peer.timeout = TS::now() + self.config.peer_timeout as Time
peer.timeout = TS::now() + self.config.peer_timeout as Time;
if let Some(info) = &info {
// Update peer addresses, always add seen address
peer.addrs.clear();
peer.addrs.push(addr);
for addr in &info.addrs {
if !peer.addrs.contains(addr) {
peer.addrs.push(*addr);
}
}
}
} else {
error!("Received peer update from non peer {}", addr_nice(addr));
return Ok(());

View File

@ -11,7 +11,7 @@ use std::{
};
use super::util::{MockTimeSource, MsgBuffer, Time, TimeSource};
use crate::port_forwarding::PortForwarding;
use crate::{config::DEFAULT_PORT, port_forwarding::PortForwarding};
pub fn mapped_addr(addr: SocketAddr) -> SocketAddr {
// HOT PATH
@ -35,21 +35,23 @@ pub trait Socket: AsRawFd + Sized {
fn create_port_forwarding(&self) -> Option<PortForwarding>;
}
pub fn parse_listen(addr: &str) -> SocketAddr {
pub fn parse_listen(addr: &str, default_port: u16) -> SocketAddr {
if let Some(addr) = addr.strip_prefix("*:") {
let port = try_fail!(addr.parse::<u16>(), "Invalid port: {}");
SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), port)
} else if addr.contains(':') {
try_fail!(addr.parse::<SocketAddr>(), "Invalid address: {}: {}", addr)
} else {
let port = try_fail!(addr.parse::<u16>(), "Invalid port: {}");
} else if let Ok(port) = addr.parse::<u16>() {
SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), port)
} else {
let ip = try_fail!(addr.parse::<IpAddr>(), "Invalid addr: {}");
SocketAddr::new(ip, default_port)
}
}
impl Socket for UdpSocket {
fn listen(addr: &str) -> Result<Self, io::Error> {
let addr = parse_listen(addr);
let addr = parse_listen(addr, DEFAULT_PORT);
UdpSocket::bind(addr)
}
@ -134,7 +136,7 @@ impl AsRawFd for MockSocket {
impl Socket for MockSocket {
fn listen(addr: &str) -> Result<Self, io::Error> {
Ok(Self::new(parse_listen(addr)))
Ok(Self::new(parse_listen(addr, DEFAULT_PORT)))
}
fn receive(&mut self, buffer: &mut MsgBuffer) -> Result<SocketAddr, io::Error> {

View File

@ -41,6 +41,12 @@ fn configure_connectivity(config: &mut Config, mode: usize, theme: &ColorfulThem
.interact()?;
}
if mode == MODE_EXPERT {
config.advertise_addresses = str_list(
Input::with_theme(theme)
.with_prompt("Advertise addresses (comma separated)")
.default(config.advertise_addresses.join(","))
.interact_text()?,
);
config.peer_timeout = Input::with_theme(theme)
.with_prompt("Peer timeout (in seconds)")
.default(config.peer_timeout)

View File

@ -92,7 +92,7 @@ fn serve_proxy_connection(stream: TcpStream) -> Result<(), io::Error> {
}
pub fn run_proxy(listen: &str) -> Result<(), io::Error> {
let addr = parse_listen(listen);
let addr = parse_listen(listen, 8080);
let server = TcpListener::bind(addr)?;
info!("Listening on ws://{}", server.local_addr()?);
for stream in server.incoming() {