Fixes & Tests

This commit is contained in:
Dennis Schwerdel 2023-10-09 00:15:37 +02:00
parent 8f40cce6c4
commit 13fec09d74
7 changed files with 39 additions and 12 deletions

7
Cargo.lock generated
View File

@ -1299,6 +1299,7 @@ dependencies = [
"tungstenite",
"url",
"yaml-rust",
"yansi",
]
[[package]]
@ -1583,6 +1584,12 @@ dependencies = [
"linked-hash-map",
]
[[package]]
name = "yansi"
version = "1.0.0-rc.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1367295b8f788d371ce2dbc842c7b709c73ee1364d30351dd300ec2203b12377"
[[package]]
name = "zeroize"
version = "1.6.0"

View File

@ -43,6 +43,7 @@ signal = "0.7"
tempfile = "3"
criterion = { version = "0.5", features = ["html_reports"] }
iai = "0.1"
yansi = "1.0.0-gamma"
[features]
default = ["nat", "websocket", "wizard"]

View File

@ -53,6 +53,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS
}
pub fn run(mut self) {
panic!();
while self.iteration() {}
}
}

View File

@ -54,6 +54,7 @@ impl<S: Socket, P: Protocol, TS: TimeSource> ExtrasThread<S, P, TS> {
}
pub fn run(mut self) {
panic!();
while self.iteration() {}
}
}

View File

@ -62,6 +62,7 @@ impl<S: Socket, P: Protocol, TS: TimeSource> HousekeepThread<S, P, TS> {
}
pub fn run(mut self) {
panic!();
while self.iteration() {}
}
}

View File

@ -149,7 +149,9 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
let mut init = self.crypto.peer_instance(payload);
init.send_ping(&mut self.buffer);
self.pending_inits.insert(addr, init);
self.coms.send_to(addr, &mut self.buffer)
self.coms.send_to(addr, &mut self.buffer)?;
self.buffer.clear();
Ok(())
}
pub fn connect<Addr: ToSocketAddrs + fmt::Debug + Clone>(&mut self, addr: Addr) -> Result<(), Error> {
@ -233,6 +235,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
self.update_peer_info(addr, Some(info))?;
if !self.buffer.is_empty() {
self.coms.send_to(addr, &mut self.buffer)?;
self.buffer.clear();
}
} else {
error!("No init for new peer {}", addr_nice(addr));
@ -320,7 +323,10 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
return Err(Error::Message("Unknown message type"));
}
},
MessageResult::Reply => self.coms.send_to(src, &mut self.buffer)?,
MessageResult::Reply => {
self.coms.send_to(src, &mut self.buffer)?;
self.buffer.clear();
},
MessageResult::None => {
self.buffer.clear();
}
@ -347,7 +353,8 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
}
}) {
if !buffer.is_empty() {
self.coms.send_to(src, &mut self.buffer)?
self.coms.send_to(src, &mut self.buffer)?;
self.buffer.clear();
}
if let InitResult::Success { peer_payload, .. } = result? {
self.add_new_peer(src, peer_payload)?
@ -365,7 +372,9 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
match msg_result {
Ok(_) => {
self.pending_inits.insert(src, init);
self.coms.send_to(src, &mut self.buffer)
self.coms.send_to(src, &mut self.buffer)?;
self.buffer.clear();
Ok(())
}
Err(err) => {
self.coms.traffic.count_invalid_protocol(self.buffer.len());
@ -401,9 +410,10 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
let info = self.create_node_info();
info.encode(&mut self.buffer);
self.coms.broadcast_msg(MESSAGE_TYPE_NODE_INFO, &mut self.buffer)?;
self.buffer.clear();
// Reschedule for next update
let min_peer_timeout = self.coms.get_peers().iter().map(|p| p.1.peer_timeout).min().unwrap_or(DEFAULT_PEER_TIMEOUT);
let interval = min(self.update_freq as u16, max(min_peer_timeout / 2 - 60, 1));
let interval = min(self.update_freq, max(min_peer_timeout / 2 - 60, 1));
self.next_peers = now + Time::from(interval);
}
self.reconnect_to_peers()?;
@ -444,14 +454,16 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
if self.pending_inits.get_mut(&addr).unwrap().every_second(&mut self.buffer).is_err() {
del.push(addr)
} else if !self.buffer.is_empty() {
self.coms.send_to(addr, &mut self.buffer)?
self.coms.send_to(addr, &mut self.buffer)?;
self.buffer.clear();
}
}
for (addr, crypto) in self.peer_crypto.iter_mut() {
self.buffer.clear();
crypto.every_second(&mut self.buffer);
if !self.buffer.is_empty() {
self.coms.send_to(*addr, &mut self.buffer)?
self.coms.send_to(*addr, &mut self.buffer)?;
self.buffer.clear();
}
}
for addr in del {
@ -697,6 +709,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
}
pub fn run(mut self) {
panic!();
while self.iteration() {}
}
}

View File

@ -31,13 +31,16 @@ pub fn init_debug_logger() {
})
}
static CURRENT_NODE: AtomicUsize = AtomicUsize::new(0);
thread_local! {
static CURRENT_NODE: AtomicUsize = AtomicUsize::new(0);
}
static COLORS: [yansi::Style; 6] = [yansi::Style::new(), yansi::Style::new().red(), yansi::Style::new().blue(), yansi::Style::new().green(), yansi::Style::new().magenta(), yansi::Style::new().yellow()];
struct DebugLogger;
impl DebugLogger {
pub fn set_node(node: usize) {
CURRENT_NODE.store(node, Ordering::SeqCst);
CURRENT_NODE.with(|n| n.store(node, Ordering::SeqCst));
}
}
@ -50,7 +53,8 @@ impl log::Log for DebugLogger {
#[inline]
fn log(&self, record: &log::Record) {
if self.enabled(record.metadata()) {
eprintln!("Node {} - {} - {}", CURRENT_NODE.load(Ordering::SeqCst), record.level(), record.args());
let node = CURRENT_NODE.with(|n| n.load(Ordering::SeqCst));
eprintln!("{}Node {} - {} - {}{}", COLORS[node].prefix(), node, record.level(), record.args(), COLORS[node].suffix());
}
}
@ -90,7 +94,6 @@ impl<P: Protocol> Simulator<P> {
DebugLogger::set_node(self.next_port as usize);
self.next_port += 1;
let node = TestNode::new(&config, MockSocket::new(addr), MockDevice::new(), None, None).unwrap();
DebugLogger::set_node(0);
self.nodes.insert(addr, node);
addr
@ -99,7 +102,7 @@ impl<P: Protocol> Simulator<P> {
#[allow(dead_code)]
pub fn get_node(&mut self, addr: SocketAddr) -> &mut TestNode<P> {
let node = self.nodes.get_mut(&addr).unwrap();
DebugLogger::set_node(node.get_num());
//DebugLogger::set_node(node.get_num());
node
}