First tests work

This commit is contained in:
Dennis Schwerdel 2021-03-09 08:42:39 +00:00
parent 87e715f475
commit 2ea70ca1cd
9 changed files with 109 additions and 75 deletions

View File

@ -242,6 +242,13 @@ impl PeerCrypto {
}
}
pub fn get_core(&self) -> Option<Arc<CryptoCore>> {
match self {
PeerCrypto::Encrypted { core, .. } => Some(core.clone()),
PeerCrypto::Unencrypted { .. } => None
}
}
fn handle_init_message(&mut self, buffer: &mut MsgBuffer) -> Result<MessageResult, Error> {
// TODO: parse message stage
// TODO: depending on stage resend last message

View File

@ -120,7 +120,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS
}
pub async fn housekeep(&mut self) -> Result<(), Error> {
self.peer_crypto.sync();
self.peer_crypto.load();
self.table.sync();
self.traffic.sync();
Ok(())

View File

@ -34,7 +34,7 @@ pub type Hash = BuildHasherDefault<FnvHasher>;
pub const STATS_INTERVAL: Time = 60;
const SPACE_BEFORE: usize = 100;
struct PeerData {
pub struct PeerData {
addrs: AddrList,
#[allow(dead_code)] // TODO: export in status
last_seen: Time,
@ -122,8 +122,8 @@ impl<P: Protocol> GenericCloud<MockDevice, P, MockSocket, MockTimeSource> {
&mut self.device_thread.device
}
pub fn connect(&mut self, addr: SocketAddr) -> Result<(), Error> {
unimplemented!()
pub async fn connect(&mut self, addr: SocketAddr) -> Result<(), Error> {
self.socket_thread.connect(addr).await
}
pub async fn trigger_socket_event(&mut self) {
@ -140,17 +140,14 @@ impl<P: Protocol> GenericCloud<MockDevice, P, MockSocket, MockTimeSource> {
}
pub fn is_connected(&self, addr: &SocketAddr) -> bool {
unimplemented!()
// self.peers.contains_key(addr)
self.socket_thread.peers.contains_key(addr)
}
pub fn own_addresses(&self) -> &[SocketAddr] {
unimplemented!()
//&self.own_addresses
&self.socket_thread.own_addresses
}
pub fn get_num(&self) -> usize {
unimplemented!()
// self.socket.address().unwrap().port() as usize
pub async fn get_num(&self) -> usize {
self.socket_thread.socket.address().await.unwrap().port() as usize
}
}

View File

@ -16,6 +16,8 @@ use std::{
sync::Arc
};
use super::PeerData;
#[derive(Clone)]
pub struct SharedPeerCrypto {
peers: Arc<Mutex<HashMap<SocketAddr, Option<Arc<CryptoCore>>, Hash>>>
@ -26,11 +28,7 @@ impl SharedPeerCrypto {
SharedPeerCrypto { peers: Arc::new(Mutex::new(HashMap::default())) }
}
pub fn sync(&mut self) {
// TODO sync if needed
}
pub fn encrypt_for(&mut self, peer: SocketAddr, data: &mut MsgBuffer) -> Result<(), Error> {
pub fn encrypt_for(&self, peer: SocketAddr, data: &mut MsgBuffer) -> Result<(), Error> {
let mut peers = self.peers.lock();
match peers.get_mut(&peer) {
None => Err(Error::InvalidCryptoState("No crypto found for peer")),
@ -42,6 +40,16 @@ impl SharedPeerCrypto {
}
}
pub fn store(&self, data: &HashMap<SocketAddr, PeerData, Hash>) {
let mut peers = self.peers.lock();
peers.clear();
peers.extend(data.iter().map(|(k, v)| (*k, v.crypto.get_core())));
}
pub fn load(&mut self) {
// TODO sync if needed
}
pub fn get_snapshot(&self) -> HashMap<SocketAddr, Option<Arc<CryptoCore>>, Hash> {
self.peers.lock().clone()
}

View File

@ -64,11 +64,11 @@ pub struct SocketThread<S: Socket, D: Device, P: Protocol, TS: TimeSource> {
pub socket: S,
device: D,
next_housekeep: Time,
own_addresses: AddrList,
pub own_addresses: AddrList,
next_own_address_reset: Time,
pending_inits: HashMap<SocketAddr, InitState<NodeInfo>, Hash>,
crypto: Crypto,
peers: HashMap<SocketAddr, PeerData, Hash>,
pub peers: HashMap<SocketAddr, PeerData, Hash>,
next_peers: Time,
next_stats_out: Time,
next_beacon: Time,
@ -111,7 +111,6 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
let now = TS::now();
let update_freq = config.get_keepalive() as u16;
let node_id = random();
let crypto = Crypto::new(node_id, &config.crypto).unwrap();
let beacon_key = config.beacon_password.as_ref().map(|s| s.as_bytes()).unwrap_or(&[]);
Self {
_dummy_p: PhantomData,
@ -152,7 +151,10 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
debug!("Sending msg with {} bytes to {}", size, addr);
self.traffic.count_out_traffic(addr, size);
match self.socket.send(self.buffer.message(), addr).await {
Ok(written) if written == size => Ok(()),
Ok(written) if written == size => {
self.buffer.clear();
Ok(())
}
Ok(_) => Err(Error::Socket("Sent out truncated packet")),
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
}
@ -174,6 +176,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
Err(e) => Err(Error::SocketIo("IOError when sending", e)),
}?
}
self.buffer.clear();
Ok(())
}
@ -267,6 +270,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
if !self.buffer.is_empty() {
self.send_to(addr).await?;
}
self.peer_crypto.store(&self.peers);
} else {
error!("No init for new peer {}", addr_nice(addr));
}
@ -299,6 +303,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
if let Some(_peer) = self.peers.remove(&addr) {
info!("Closing connection to {}", addr_nice(addr));
self.table.remove_claims(addr);
self.peer_crypto.store(&self.peers);
}
}
@ -311,6 +316,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
error!("Failed to send via device: {}", e);
return Err(e);
}
self.buffer.clear();
if self.learning {
// Learn single address
self.table.cache(src, peer);
@ -330,17 +336,27 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
return Err(err);
}
};
self.update_peer_info(src, Some(info)).await?
self.update_peer_info(src, Some(info)).await?;
self.buffer.clear();
}
MESSAGE_TYPE_KEEPALIVE => {
self.update_peer_info(src, None).await?;
self.buffer.clear();
}
MESSAGE_TYPE_CLOSE => {
self.remove_peer(src);
self.buffer.clear();
}
MESSAGE_TYPE_KEEPALIVE => self.update_peer_info(src, None).await?,
MESSAGE_TYPE_CLOSE => self.remove_peer(src),
_ => {
self.traffic.count_invalid_protocol(self.buffer.len());
self.buffer.clear();
return Err(Error::Message("Unknown message type"));
}
},
MessageResult::Reply => self.send_to(src).await?,
MessageResult::None => (),
MessageResult::None => {
self.buffer.clear();
}
}
Ok(())
}
@ -362,19 +378,18 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
Ok(InitResult::Continue)
}
}) {
match result? {
InitResult::Continue => {
if !buffer.is_empty() {
self.send_to(src).await?
}
}
InitResult::Success { peer_payload, .. } => self.add_new_peer(src, peer_payload).await?,
if !buffer.is_empty() {
self.send_to(src).await?
}
if let InitResult::Success { peer_payload, .. } = result? {
self.add_new_peer(src, peer_payload).await?
}
return Ok(());
}
if !is_init_message(self.buffer.message()) {
info!("Ignoring non-init message from unknown peer {}", addr_nice(src));
self.traffic.count_invalid_protocol(self.buffer.len());
self.buffer.clear();
return Ok(());
}
let mut init = self.crypto.peer_instance(self.create_node_info());
@ -447,10 +462,11 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
self.load_beacon().await?;
self.next_beacon = now + Time::from(self.config.beacon_interval);
}
// TODO: sync peer_crypto
self.table.sync();
self.traffic.sync();
unimplemented!();
self.peer_crypto.store(&self.peers);
assert!(self.buffer.is_empty());
Ok(())
}
async fn crypto_housekeep(&mut self) -> Result<(), Error> {
@ -673,23 +689,28 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
}
pub async fn iteration(&mut self) {
if let Ok(result) = timeout(std::time::Duration::from_millis(1000), self.socket.receive(&mut self.buffer)).await {
if let Ok(result) = timeout(std::time::Duration::from_millis(1000), self.socket.receive(&mut self.buffer)).await
{
let src = try_fail!(result, "Failed to read from network socket: {}");
match self.handle_message(src).await {
Err(e @ Error::CryptoInitFatal(_)) => {
debug!("Fatal crypto init error from {}: {}", src, e);
info!("Closing pending connection to {} due to error in crypto init", addr_nice(src));
self.pending_inits.remove(&src);
self.buffer.clear();
}
Err(e @ Error::CryptoInit(_)) => {
debug!("Recoverable init error from {}: {}", src, e);
info!("Ignoring invalid init message from peer {}", addr_nice(src));
self.buffer.clear();
}
Err(e) => {
error!("{}", e);
self.buffer.clear();
}
Ok(_) => {}
}
debug_assert!(self.buffer.is_empty());
}
let now = TS::now();
if self.next_housekeep < now {
@ -698,6 +719,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
}
self.next_housekeep = now + 1
}
debug_assert!(self.buffer.is_empty());
}
pub async fn run(mut self) {

View File

@ -99,9 +99,9 @@ impl<P: Protocol> Simulator<P> {
}
#[allow(dead_code)]
pub fn get_node(&mut self, addr: SocketAddr) -> &mut TestNode<P> {
pub async fn get_node(&mut self, addr: SocketAddr) -> &mut TestNode<P> {
let node = self.nodes.get_mut(&addr).unwrap();
DebugLogger::set_node(node.get_num());
DebugLogger::set_node(node.get_num().await);
node
}
@ -109,7 +109,7 @@ impl<P: Protocol> Simulator<P> {
if let Some((src, dst, data)) = self.messages.pop_front() {
if let Some(node) = self.nodes.get_mut(&dst) {
if node.socket().put_inbound(src, data) {
DebugLogger::set_node(node.get_num());
DebugLogger::set_node(node.get_num().await);
node.trigger_socket_event().await;
DebugLogger::set_node(0);
let sock = node.socket();
@ -132,7 +132,7 @@ impl<P: Protocol> Simulator<P> {
pub async fn trigger_node_housekeep(&mut self, addr: SocketAddr) {
let node = self.nodes.get_mut(&addr).unwrap();
DebugLogger::set_node(node.get_num());
DebugLogger::set_node(node.get_num().await);
node.trigger_housekeep().await;
DebugLogger::set_node(0);
let sock = node.socket();
@ -143,7 +143,7 @@ impl<P: Protocol> Simulator<P> {
pub async fn trigger_housekeep(&mut self) {
for (src, node) in &mut self.nodes {
DebugLogger::set_node(node.get_num());
DebugLogger::set_node(node.get_num().await);
node.trigger_housekeep().await;
DebugLogger::set_node(0);
let sock = node.socket();
@ -167,10 +167,10 @@ impl<P: Protocol> Simulator<P> {
}
}
pub fn connect(&mut self, src: SocketAddr, dst: SocketAddr) {
pub async fn connect(&mut self, src: SocketAddr, dst: SocketAddr) {
let node = self.nodes.get_mut(&src).unwrap();
DebugLogger::set_node(node.get_num());
node.connect(dst).unwrap();
DebugLogger::set_node(node.get_num().await);
node.connect(dst).await.unwrap();
DebugLogger::set_node(0);
let sock = node.socket();
while let Some((dst, data)) = sock.pop_outbound() {
@ -195,7 +195,7 @@ impl<P: Protocol> Simulator<P> {
pub async fn put_payload(&mut self, addr: SocketAddr, data: Vec<u8>) {
let node = self.nodes.get_mut(&addr).unwrap();
node.device().put_inbound(data);
DebugLogger::set_node(node.get_num());
DebugLogger::set_node(node.get_num().await);
node.trigger_device_event().await;
DebugLogger::set_node(0);
let sock = node.socket();

View File

@ -11,8 +11,8 @@ async fn connect_nat_2_peers() {
let node1 = sim.add_node(true, &config).await;
let node2 = sim.add_node(true, &config).await;
sim.connect(node1, node2);
sim.connect(node2, node1);
sim.connect(node1, node2).await;
sim.connect(node2, node1).await;
sim.simulate_time(60).await;
@ -28,10 +28,10 @@ async fn connect_nat_3_peers() {
let node2 = sim.add_node(true, &config).await;
let node3 = sim.add_node(true, &config).await;
sim.connect(node1, node2);
sim.connect(node2, node1);
sim.connect(node1, node3);
sim.connect(node3, node1);
sim.connect(node1, node2).await;
sim.connect(node2, node1).await;
sim.connect(node1, node3).await;
sim.connect(node3, node1).await;
sim.simulate_time(300).await;
assert!(sim.is_connected(node1, node2));
@ -50,10 +50,10 @@ async fn nat_keepalive() {
let node2 = sim.add_node(true, &config).await;
let node3 = sim.add_node(true, &config).await;
sim.connect(node1, node2);
sim.connect(node2, node1);
sim.connect(node1, node3);
sim.connect(node3, node1);
sim.connect(node1, node2).await;
sim.connect(node2, node1).await;
sim.connect(node1, node3).await;
sim.connect(node3, node1).await;
sim.simulate_time(1000).await;
assert!(sim.is_connected(node1, node2));

View File

@ -11,7 +11,7 @@ async fn switch_delivers() {
let node1 = sim.add_node(false, &config).await;
let node2 = sim.add_node(false, &config).await;
sim.connect(node1, node2);
sim.connect(node1, node2).await;
sim.simulate_all_messages().await;
assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1));
@ -32,9 +32,9 @@ async fn switch_learns() {
let node2 = sim.add_node(false, &config).await;
let node3 = sim.add_node(false, &config).await;
sim.connect(node1, node2);
sim.connect(node1, node3);
sim.connect(node2, node3);
sim.connect(node1, node2).await;
sim.connect(node1, node3).await;
sim.connect(node2, node3).await;
sim.simulate_all_messages().await;
assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1));
@ -72,9 +72,9 @@ async fn switch_honours_vlans() {
let node2 = sim.add_node(false, &config).await;
let node3 = sim.add_node(false, &config).await;
sim.connect(node1, node2);
sim.connect(node1, node3);
sim.connect(node2, node3);
sim.connect(node1, node2).await;
sim.connect(node1, node3).await;
sim.connect(node2, node3).await;
sim.simulate_all_messages().await;
assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1));
@ -139,17 +139,17 @@ async fn size() {
let mut sim = TunSimulator::new();
let node1 = sim.add_node(false, &config1).await;
let node2 = sim.add_node(false, &config2).await;
sim.connect(node1, node2);
sim.connect(node1, node2).await;
sim.simulate_all_messages().await;
assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1));
let payload = vec![0x40, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2];
sim.put_payload(node1, payload.clone()).await;
sim.simulate_all_messages().await;
assert_eq!(Some(payload), sim.pop_payload(node2));
};
assert_eq!(std::mem::size_of_val(&future), 100);
@ -174,7 +174,7 @@ async fn router_delivers() {
let node1 = sim.add_node(false, &config1).await;
let node2 = sim.add_node(false, &config2).await;
sim.connect(node1, node2);
sim.connect(node1, node2).await;
sim.simulate_all_messages().await;
assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1));
@ -205,7 +205,7 @@ async fn router_drops_unknown_dest() {
let node1 = sim.add_node(false, &config1).await;
let node2 = sim.add_node(false, &config2).await;
sim.connect(node1, node2);
sim.connect(node1, node2).await;
sim.simulate_all_messages().await;
assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1));

View File

@ -11,7 +11,7 @@ async fn direct_connect() {
let node1 = sim.add_node(false, &config).await;
let node2 = sim.add_node(false, &config).await;
sim.connect(node1, node2);
sim.connect(node1, node2).await;
sim.simulate_all_messages().await;
assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1));
@ -27,7 +27,7 @@ async fn direct_connect_unencrypted() {
let node1 = sim.add_node(false, &config).await;
let node2 = sim.add_node(false, &config).await;
sim.connect(node1, node2);
sim.connect(node1, node2).await;
sim.simulate_all_messages().await;
assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1));
@ -41,8 +41,8 @@ async fn cross_connect() {
let node2 = sim.add_node(false, &config).await;
let node3 = sim.add_node(false, &config).await;
sim.connect(node1, node2);
sim.connect(node1, node3);
sim.connect(node1, node2).await;
sim.connect(node1, node3).await;
sim.simulate_all_messages().await;
sim.simulate_time(120).await;
@ -80,7 +80,7 @@ async fn reconnect_after_timeout() {
let node1 = sim.add_node(false, &config).await;
let node2 = sim.add_node(false, &config).await;
sim.connect(node1, node2);
sim.connect(node1, node2).await;
sim.simulate_all_messages().await;
assert!(sim.is_connected(node1, node2));
assert!(sim.is_connected(node2, node1));
@ -102,7 +102,7 @@ async fn lost_init_ping() {
let node1 = sim.add_node(false, &config).await;
let node2 = sim.add_node(false, &config).await;
sim.connect(node1, node2);
sim.connect(node1, node2).await;
sim.drop_message(); // drop init ping
sim.simulate_time(120).await;
@ -117,7 +117,7 @@ async fn lost_init_pong() {
let node1 = sim.add_node(false, &config).await;
let node2 = sim.add_node(false, &config).await;
sim.connect(node1, node2);
sim.connect(node1, node2).await;
sim.simulate_next_message().await; // init ping
sim.drop_message(); // drop init pong
@ -133,7 +133,7 @@ async fn lost_init_peng() {
let node1 = sim.add_node(false, &config).await;
let node2 = sim.add_node(false, &config).await;
sim.connect(node1, node2);
sim.connect(node1, node2).await;
sim.simulate_next_message().await; // init ping
sim.simulate_next_message().await; // init pong
sim.drop_message(); // drop init peng