Document hot paths

pull/148/head
Dennis Schwerdel 2021-01-28 23:19:20 +01:00
parent ca7df77532
commit cbd38ed712
6 changed files with 72 additions and 8 deletions

View File

@ -202,6 +202,7 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
#[inline]
fn send_to(&mut self, addr: SocketAddr, msg: &mut MsgBuffer) -> Result<(), Error> {
// HOT PATH
debug!("Sending msg with {} bytes to {}", msg.len(), addr);
self.traffic.count_out_traffic(addr, msg.len());
match self.socket.send(msg.message(), addr) {
@ -213,6 +214,7 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
#[inline]
fn send_msg(&mut self, addr: SocketAddr, type_: u8, msg: &mut MsgBuffer) -> Result<(), Error> {
// HOT PATH
debug!("Sending msg with {} bytes to {}", msg.len(), addr);
let peer = match self.peers.get_mut(&addr) {
Some(peer) => peer,
@ -610,15 +612,18 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
}
pub fn handle_interface_data(&mut self, data: &mut MsgBuffer) -> Result<(), Error> {
// HOT PATH
let (src, dst) = P::parse(data.message())?;
debug!("Read data from interface: src: {}, dst: {}, {} bytes", src, dst, data.len());
self.traffic.count_out_payload(dst, src, data.len());
match self.table.lookup(dst) {
Some(addr) => {
// HOT PATH
// Peer found for destination
debug!("Found destination for {} => {}", dst, addr);
self.send_msg(addr, MESSAGE_TYPE_DATA, data)?;
if !self.peers.contains_key(&addr) {
// COLD PATH
// If the peer is not actually connected, remove the entry in the table and try
// to reconnect.
warn!("Destination for {} not found in peers: {}", dst, addr_nice(addr));
@ -627,6 +632,7 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
}
}
None => {
// COLD PATH
if self.broadcast {
debug!("No destination for {} found, broadcasting", dst);
self.broadcast_msg(MESSAGE_TYPE_DATA, data)?;
@ -723,6 +729,7 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
}
fn handle_payload_from(&mut self, peer: SocketAddr, data: &mut MsgBuffer) -> Result<(), Error> {
// HOT PATH
let (src, dst) = P::parse(data.message())?;
let len = data.len();
debug!("Writing data to device: {} bytes", len);
@ -741,11 +748,17 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
fn handle_message(
&mut self, src: SocketAddr, msg_result: MessageResult<NodeInfo>, data: &mut MsgBuffer
) -> Result<(), Error> {
// HOT PATH
match msg_result {
MessageResult::Message(type_) => {
// HOT PATH
match type_ {
MESSAGE_TYPE_DATA => self.handle_payload_from(src, data)?,
MESSAGE_TYPE_DATA => {
// HOT PATH
self.handle_payload_from(src, data)?
}
MESSAGE_TYPE_NODE_INFO => {
// COLD PATH
let info = match NodeInfo::decode(Cursor::new(data.message())) {
Ok(val) => val,
Err(err) => {
@ -755,31 +768,50 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
};
self.update_peer_info(src, Some(info))?
}
MESSAGE_TYPE_KEEPALIVE => self.update_peer_info(src, None)?,
MESSAGE_TYPE_CLOSE => self.remove_peer(src),
MESSAGE_TYPE_KEEPALIVE => {
// COLD PATH
self.update_peer_info(src, None)?
}
MESSAGE_TYPE_CLOSE => {
// COLD PATH
self.remove_peer(src)
}
_ => {
// COLD PATH
self.traffic.count_invalid_protocol(data.len());
return Err(Error::Message("Unknown message type"))
}
}
}
MessageResult::Initialized(info) => self.add_new_peer(src, info)?,
MessageResult::Initialized(info) => {
// COLD PATH
self.add_new_peer(src, info)?
}
MessageResult::InitializedWithReply(info) => {
// COLD PATH
self.add_new_peer(src, info)?;
self.send_to(src, data)?
}
MessageResult::Reply => self.send_to(src, data)?,
MessageResult::None => ()
MessageResult::Reply => {
// COLD PATH
self.send_to(src, data)?
}
MessageResult::None => {
// COLD PATH
}
}
Ok(())
}
pub fn handle_net_message(&mut self, src: SocketAddr, data: &mut MsgBuffer) -> Result<(), Error> {
// HOT PATH
let src = mapped_addr(src);
debug!("Received {} bytes from {}", data.len(), src);
let msg_result = if let Some(init) = self.pending_inits.get_mut(&src) {
// COLD PATH
init.handle_message(data)
} else if is_init_message(data.message()) {
// COLD PATH
let mut result = None;
if let Some(peer) = self.peers.get_mut(&src) {
if peer.crypto.has_init() {
@ -811,15 +843,22 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
}
}
} else if let Some(peer) = self.peers.get_mut(&src) {
// HOT PATH
peer.crypto.handle_message(data)
} else {
// COLD PATH
info!("Ignoring non-init message from unknown peer {}", addr_nice(src));
self.traffic.count_invalid_protocol(data.len());
return Ok(())
};
// HOT PATH
match msg_result {
Ok(val) => self.handle_message(src, val, data),
Ok(val) => {
// HOT PATH
self.handle_message(src, val, data)
},
Err(err) => {
// COLD PATH
self.traffic.count_invalid_protocol(data.len());
Err(err)
}
@ -833,10 +872,12 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
}
fn handle_socket_event(&mut self, buffer: &mut MsgBuffer) {
// HOT PATH
let src = try_fail!(self.socket.receive(buffer), "Failed to read from network socket: {}");
self.traffic.count_in_traffic(src, buffer.len());
match self.handle_net_message(src, buffer) {
Err(e @ Error::CryptoInitFatal(_)) => {
// COLD PATH
debug!("Fatal crypto init error from {}: {}", src, e);
info!("Closing pending connection to {} due to error in crypto init", addr_nice(src));
self.pending_inits.remove(&src);
@ -847,17 +888,20 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
);
}
Err(e @ Error::CryptoInit(_)) => {
// COLD PATH
debug!("Recoverable init error from {}: {}", src, e);
info!("Ignoring invalid init message from peer {}", addr_nice(src));
}
Err(e) => {
// COLD PATH
error!("{}", e);
}
Ok(_) => {}
Ok(_) => {} // HOT PATH
}
}
fn handle_device_event(&mut self, buffer: &mut MsgBuffer) {
// HOT PATH
try_fail!(self.device.read(buffer), "Failed to read from device: {}");
if let Err(e) = self.handle_interface_data(buffer) {
error!("{}", e);
@ -878,8 +922,10 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
let mut poll_error = false;
self.config.call_hook("vpn_started", vec![("IFNAME", self.device.ifname())], true);
for evt in waiter {
// HOT PATH
match evt {
WaitResult::Error(err) => {
// COLD PATH
if poll_error {
fail!("Poll wait failed again: {}", err);
}
@ -891,6 +937,7 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
WaitResult::Device => self.handle_device_event(&mut buffer)
}
if self.next_housekeep < TS::now() {
// COLD PATH
poll_error = false;
if ctrlc.was_pressed() {
break

View File

@ -347,6 +347,7 @@ impl<P: Payload> PeerCrypto<P> {
}
fn decrypt_message(&mut self, buffer: &mut MsgBuffer) -> Result<(), Error> {
// HOT PATH
if self.unencrypted {
return Ok(())
}
@ -354,18 +355,22 @@ impl<P: Payload> PeerCrypto<P> {
}
pub fn handle_message(&mut self, buffer: &mut MsgBuffer) -> Result<MessageResult<P>, Error> {
// HOT PATH
if buffer.is_empty() {
return Err(Error::InvalidCryptoState("No message in buffer"))
}
if is_init_message(buffer.buffer()) {
// COLD PATH
debug!("Received init message");
buffer.take_prefix();
self.handle_init_message(buffer)
} else {
// HOT PATH
debug!("Received encrypted message");
self.decrypt_message(buffer)?;
let msg_type = buffer.take_prefix();
if msg_type == MESSAGE_TYPE_ROTATION {
// COLD PATH
debug!("Received rotation message");
self.handle_rotate_message(buffer.buffer())?;
buffer.clear();
@ -377,6 +382,7 @@ impl<P: Payload> PeerCrypto<P> {
}
pub fn send_message(&mut self, type_: u8, buffer: &mut MsgBuffer) -> Result<(), Error> {
// HOT PATH
assert_ne!(type_, MESSAGE_TYPE_ROTATION);
buffer.prepend_byte(type_);
self.encrypt_message(buffer)
@ -419,6 +425,7 @@ impl<P: Payload> PeerCrypto<P> {
}
pub fn is_init_message(msg: &[u8]) -> bool {
// HOT PATH
!msg.is_empty() && msg[0] == INIT_MESSAGE_FIRST_BYTE
}

View File

@ -13,6 +13,7 @@ use std::{
use super::util::{MockTimeSource, MsgBuffer, Time, TimeSource};
pub fn mapped_addr(addr: SocketAddr) -> SocketAddr {
// HOT PATH
match addr {
SocketAddr::V4(addr4) => SocketAddr::new(IpAddr::V6(addr4.ip().to_ipv6_mapped()), addr4.port()),
_ => addr

View File

@ -23,6 +23,7 @@ impl Protocol for Frame {
/// # Errors
/// This method will fail when the given data is not a valid ethernet frame.
fn parse(data: &[u8]) -> Result<(Address, Address), Error> {
// HOT PATH
let mut cursor = Cursor::new(data);
let mut src = [0; 16];
let mut dst = [0; 16];
@ -90,6 +91,7 @@ impl Protocol for Packet {
/// # Errors
/// This method will fail when the given data is not a valid ipv4 and ipv6 packet.
fn parse(data: &[u8]) -> Result<(Address, Address), Error> {
// HOT PATH
if data.is_empty() {
return Err(Error::Parse("Empty header"))
}

View File

@ -41,6 +41,7 @@ impl<TS: TimeSource> ClaimTable<TS> {
}
pub fn cache(&mut self, addr: Address, peer: SocketAddr) {
// HOT PATH
self.cache.insert(addr, CacheValue { peer, timeout: TS::now() + self.cache_timeout as Time });
}
@ -89,9 +90,11 @@ impl<TS: TimeSource> ClaimTable<TS> {
}
pub fn lookup(&mut self, addr: Address) -> Option<SocketAddr> {
// HOT PATH
if let Some(entry) = self.cache.get(&addr) {
return Some(entry.peer)
}
// COLD PATH
let mut found = None;
let mut prefix_len = -1;
for entry in &self.claims {

View File

@ -83,21 +83,25 @@ pub struct TrafficStats {
impl TrafficStats {
#[inline]
pub fn count_out_traffic(&mut self, peer: SocketAddr, bytes: usize) {
// HOT PATH
self.peers.entry(peer).or_insert_with(TrafficEntry::default).count_out(bytes);
}
#[inline]
pub fn count_in_traffic(&mut self, peer: SocketAddr, bytes: usize) {
// HOT PATH
self.peers.entry(peer).or_insert_with(TrafficEntry::default).count_in(bytes);
}
#[inline]
pub fn count_out_payload(&mut self, remote: Address, local: Address, bytes: usize) {
// HOT PATH
self.payload.entry((remote, local)).or_insert_with(TrafficEntry::default).count_out(bytes);
}
#[inline]
pub fn count_in_payload(&mut self, remote: Address, local: Address, bytes: usize) {
// HOT PATH
self.payload.entry((remote, local)).or_insert_with(TrafficEntry::default).count_in(bytes);
}