From 1f94d0deffcc8b08b4eb1789ea2936e49bb2a057 Mon Sep 17 00:00:00 2001 From: Dennis Schwerdel Date: Fri, 29 May 2020 11:51:04 +0200 Subject: [PATCH] Statsd support --- src/cloud.rs | 72 ++++++++++++++++++++++++++++++++++++++++++++----- src/config.rs | 19 +++++++++++-- src/ethernet.rs | 6 ++++- src/ip.rs | 6 ++++- src/main.rs | 4 +++ src/traffic.rs | 34 +++++++++++++++++++++-- src/types.rs | 1 + 7 files changed, 129 insertions(+), 13 deletions(-) diff --git a/src/cloud.rs b/src/cloud.rs index 0fe0cc3..4c2745a 100644 --- a/src/cloud.rs +++ b/src/cloud.rs @@ -73,7 +73,7 @@ impl PeerList { } } for addr in &del { - info!("Forgot peer: {}", addr); + info!("Forgot peer: {}", addr_nice(*addr)); if let Some(data) = self.peers.remove(addr) { self.nodes.remove(&data.node_id); self.addresses.remove(addr); @@ -113,7 +113,7 @@ impl PeerList { #[inline] fn add(&mut self, node_id: NodeId, addr: SocketAddr, peer_timeout: u16) { if self.nodes.insert(node_id, addr).is_none() { - info!("New peer: {}", addr); + info!("New peer: {}", addr_nice(addr)); let mut alt_addrs = vec![]; if let SocketAddr::V6(v6_addr) = addr { if let Some(ipv4) = v6_addr.ip().to_ipv4() { @@ -188,7 +188,7 @@ impl PeerList { #[inline] fn remove(&mut self, addr: &SocketAddr) { if let Some(data) = self.peers.remove(addr) { - info!("Removed peer: {}", addr); + info!("Removed peer: {}", addr_nice(*addr)); self.nodes.remove(&data.node_id); self.addresses.remove(addr); for addr in data.alt_addrs { @@ -238,6 +238,7 @@ pub struct GenericCloud, + statsd_server: Option, next_housekeep: Time, next_stats_out: Time, next_beacon: Time, @@ -277,6 +278,7 @@ impl GenericCloud GenericCloud GenericCloud Result<(), io::Error> { if let Some(ref mut f) = self.stats_file { debug!("Writing out stats"); @@ -591,6 +594,61 @@ impl GenericCloud Result<(), Error> { + if let Some(ref endpoint) = self.statsd_server { + let peer_traffic = self.traffic.total_peer_traffic(); + let payload_traffic = self.traffic.total_payload_traffic(); + let dropped = &self.traffic.dropped; + let msg = format!( + "peer_count:{}|g\ntable_entries:{}|g\n\ + traffic.protocol.inbound.bytes:{}\n\ + traffic.protocol.inbound.packets:{}\n\ + traffic.protocol.outbound.bytes:{}\n\ + traffic.protocol.outbound.packets:{}\n\ + traffic.payload.inbound.bytes:{}\n\ + traffic.payload.inbound.packets:{}\n\ + traffic.payload.outbound.bytes:{}\n\ + traffic.payload.outbound.packets:{}\n\ + invalid_protocol_traffic.bytes:{}\n\ + invalid_protocol_traffic.packets:{}\n\ + dropped_payload.bytes:{}\n\ + dropped_payload.packets:{}", + self.peers.len(), + self.table.len(), + peer_traffic.in_bytes, + peer_traffic.in_packets, + peer_traffic.out_bytes, + peer_traffic.out_packets, + payload_traffic.in_bytes, + payload_traffic.in_packets, + payload_traffic.out_bytes, + payload_traffic.out_packets, + dropped.in_bytes, + dropped.in_packets, + dropped.out_bytes, + dropped.out_packets + ); + let msg_data = msg.as_bytes(); + let addrs = resolve(endpoint)?; + if let Some(addr) = addrs.first() { + match self.socket.send(msg_data, *addr) { + Ok(written) if written == msg_data.len() => Ok(()), + Ok(_) => { + Err(Error::Socket( + "Sent out truncated packet", + io::Error::new(io::ErrorKind::Other, "truncated") + )) + } + Err(e) => Err(Error::Socket("IOError when sending", e)) + }? + } else { + error!("Failed to resolve statsd server {}", endpoint); + } + } + Ok(()) + } + /// Handles payload data coming in from the local network device /// /// This method takes payload data received from the local device and parses it to obtain the @@ -619,7 +677,7 @@ impl GenericCloud GenericCloud GenericCloud, pub stats_file: Option, + pub statsd_server: Option, pub user: Option, pub group: Option } @@ -87,6 +88,7 @@ impl Default for Config { daemonize: false, pid_file: None, stats_file: None, + statsd_server: None, user: None, group: None } @@ -163,6 +165,9 @@ impl Config { if let Some(val) = file.stats_file { self.stats_file = Some(val); } + if let Some(val) = file.statsd_server { + self.statsd_server = Some(val); + } if let Some(val) = file.user { self.user = Some(val); } @@ -238,6 +243,9 @@ impl Config { if let Some(val) = args.stats_file { self.stats_file = Some(val); } + if let Some(val) = args.statsd_server { + self.statsd_server = Some(val); + } if let Some(val) = args.user { self.user = Some(val); } @@ -298,6 +306,7 @@ pub struct ConfigFile { pub port_forwarding: Option, pub pid_file: Option, pub stats_file: Option, + pub statsd_server: Option, pub user: Option, pub group: Option } @@ -332,6 +341,7 @@ user: nobody group: nogroup pid_file: /run/vpncloud.run stats_file: /var/log/vpncloud.stats +statsd_server: example.com:1234 "; assert_eq!(serde_yaml::from_str::(config_file).unwrap(), ConfigFile { device_type: Some(Type::Tun), @@ -357,7 +367,8 @@ stats_file: /var/log/vpncloud.stats user: Some("nobody".to_string()), group: Some("nogroup".to_string()), pid_file: Some("/run/vpncloud.run".to_string()), - stats_file: Some("/var/log/vpncloud.stats".to_string()) + stats_file: Some("/var/log/vpncloud.stats".to_string()), + statsd_server: Some("example.com:1234".to_string()) }) } @@ -388,7 +399,8 @@ fn config_merge() { user: Some("nobody".to_string()), group: Some("nogroup".to_string()), pid_file: Some("/run/vpncloud.run".to_string()), - stats_file: Some("/var/log/vpncloud.stats".to_string()) + stats_file: Some("/var/log/vpncloud.stats".to_string()), + statsd_server: Some("example.com:1234".to_string()) }); assert_eq!(config, Config { device_type: Type::Tun, @@ -414,6 +426,7 @@ fn config_merge() { group: Some("nogroup".to_string()), pid_file: Some("/run/vpncloud.run".to_string()), stats_file: Some("/var/log/vpncloud.stats".to_string()), + statsd_server: Some("example.com:1234".to_string()), ..Default::default() }); config.merge_args(Args { @@ -439,6 +452,7 @@ fn config_merge() { daemon: true, pid_file: Some("/run/vpncloud-mynet.run".to_string()), stats_file: Some("/var/log/vpncloud-mynet.stats".to_string()), + statsd_server: Some("example.com:2345".to_string()), user: Some("root".to_string()), group: Some("root".to_string()), ..Default::default() @@ -471,6 +485,7 @@ fn config_merge() { group: Some("root".to_string()), pid_file: Some("/run/vpncloud-mynet.run".to_string()), stats_file: Some("/var/log/vpncloud-mynet.stats".to_string()), + statsd_server: Some("example.com:2345".to_string()), daemonize: true }); } diff --git a/src/ethernet.rs b/src/ethernet.rs index 83d03a1..2d0faa2 100644 --- a/src/ethernet.rs +++ b/src/ethernet.rs @@ -130,7 +130,7 @@ impl Table for SwitchTable { match self.table.entry(key) { Entry::Vacant(entry) => { entry.insert(SwitchTableValue { address: addr, timeout: deadline }); - info!("Learned address {} => {}", key, addr); + info!("Learned address {} => {}", key, addr_nice(addr)); } Entry::Occupied(mut entry) => { let mut entry = entry.get_mut(); @@ -171,6 +171,10 @@ impl Table for SwitchTable { self.table.remove(&key); } } + + fn len(&self) -> usize { + self.table.len() + } } diff --git a/src/ip.rs b/src/ip.rs index f7ff15b..a94136e 100644 --- a/src/ip.rs +++ b/src/ip.rs @@ -88,7 +88,7 @@ impl Table for RoutingTable { Some(val) => val, None => addr.len * 8 }; - info!("New routing entry: {}/{} => {}", addr, prefix_len, address); + info!("New routing entry: {}/{} => {}", addr, prefix_len, addr_nice(address)); // Round the prefix length down to the next multiple of 8 and extraxt a prefix of that // length. let group_len = prefix_len as usize / 8; @@ -184,6 +184,10 @@ impl Table for RoutingTable { entry.retain(|entr| &entr.address != addr); } } + + fn len(&self) -> usize { + self.0.len() + } } diff --git a/src/main.rs b/src/main.rs index eda6ef6..f5c062e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -169,6 +169,10 @@ pub struct Args { #[structopt(long)] stats_file: Option, + /// Send statistics to this statsd server + #[structopt(long)] + statsd_server: Option, + /// Run as other user #[structopt(long)] user: Option, diff --git a/src/traffic.rs b/src/traffic.rs index 9c9ecd4..f446f43 100644 --- a/src/traffic.rs +++ b/src/traffic.rs @@ -5,7 +5,8 @@ use std::{ collections::HashMap, io::{self, Write}, - net::SocketAddr + net::SocketAddr, + ops::AddAssign }; use super::{ @@ -28,6 +29,19 @@ pub struct TrafficEntry { pub idle_periods: usize } +impl AddAssign<&TrafficEntry> for TrafficEntry { + fn add_assign(&mut self, other: &TrafficEntry) { + self.out_bytes_total += other.out_bytes_total; + self.out_packets_total += other.out_packets_total; + self.out_bytes += other.out_bytes; + self.out_packets += other.out_packets; + self.in_bytes_total += other.in_bytes_total; + self.in_packets_total += other.in_packets_total; + self.in_bytes += other.in_bytes; + self.in_packets += other.in_packets; + } +} + impl TrafficEntry { #[inline] fn count_out(&mut self, bytes: usize) { @@ -63,7 +77,7 @@ impl TrafficEntry { pub struct TrafficStats { peers: HashMap, payload: HashMap<(Address, Address), TrafficEntry, Hash>, - dropped: TrafficEntry + pub dropped: TrafficEntry } impl TrafficStats { @@ -117,6 +131,22 @@ impl TrafficStats { self.payload.iter() } + pub fn total_peer_traffic(&self) -> TrafficEntry { + let mut total = TrafficEntry::default(); + for e in self.peers.values() { + total += e + } + total + } + + pub fn total_payload_traffic(&self) -> TrafficEntry { + let mut total = TrafficEntry::default(); + for e in self.payload.values() { + total += e + } + total + } + #[inline] pub fn write_out(&self, out: &mut W) -> Result<(), io::Error> { writeln!(out, "peer_traffic:")?; diff --git a/src/types.rs b/src/types.rs index 9838522..6526cf0 100644 --- a/src/types.rs +++ b/src/types.rs @@ -227,6 +227,7 @@ pub trait Table { fn write_out(&self, out: &mut W) -> Result<(), io::Error>; fn remove(&mut self, _: &Address) -> bool; fn remove_all(&mut self, _: &SocketAddr); + fn len(&self) -> usize; } pub trait Protocol: Sized {