Statsd support

pull/61/head
Dennis Schwerdel 2020-05-29 11:51:04 +02:00
parent ed0fdbc366
commit 1f94d0deff
7 changed files with 129 additions and 13 deletions

View File

@ -73,7 +73,7 @@ impl<TS: TimeSource> PeerList<TS> {
} }
} }
for addr in &del { for addr in &del {
info!("Forgot peer: {}", addr); info!("Forgot peer: {}", addr_nice(*addr));
if let Some(data) = self.peers.remove(addr) { if let Some(data) = self.peers.remove(addr) {
self.nodes.remove(&data.node_id); self.nodes.remove(&data.node_id);
self.addresses.remove(addr); self.addresses.remove(addr);
@ -113,7 +113,7 @@ impl<TS: TimeSource> PeerList<TS> {
#[inline] #[inline]
fn add(&mut self, node_id: NodeId, addr: SocketAddr, peer_timeout: u16) { fn add(&mut self, node_id: NodeId, addr: SocketAddr, peer_timeout: u16) {
if self.nodes.insert(node_id, addr).is_none() { if self.nodes.insert(node_id, addr).is_none() {
info!("New peer: {}", addr); info!("New peer: {}", addr_nice(addr));
let mut alt_addrs = vec![]; let mut alt_addrs = vec![];
if let SocketAddr::V6(v6_addr) = addr { if let SocketAddr::V6(v6_addr) = addr {
if let Some(ipv4) = v6_addr.ip().to_ipv4() { if let Some(ipv4) = v6_addr.ip().to_ipv4() {
@ -188,7 +188,7 @@ impl<TS: TimeSource> PeerList<TS> {
#[inline] #[inline]
fn remove(&mut self, addr: &SocketAddr) { fn remove(&mut self, addr: &SocketAddr) {
if let Some(data) = self.peers.remove(addr) { if let Some(data) = self.peers.remove(addr) {
info!("Removed peer: {}", addr); info!("Removed peer: {}", addr_nice(*addr));
self.nodes.remove(&data.node_id); self.nodes.remove(&data.node_id);
self.addresses.remove(addr); self.addresses.remove(addr);
for addr in data.alt_addrs { for addr in data.alt_addrs {
@ -238,6 +238,7 @@ pub struct GenericCloud<D: Device, P: Protocol, T: Table, S: Socket, TS: TimeSou
update_freq: u16, update_freq: u16,
buffer_out: [u8; 64 * 1024], buffer_out: [u8; 64 * 1024],
stats_file: Option<File>, stats_file: Option<File>,
statsd_server: Option<String>,
next_housekeep: Time, next_housekeep: Time,
next_stats_out: Time, next_stats_out: Time,
next_beacon: Time, next_beacon: Time,
@ -277,6 +278,7 @@ impl<D: Device, P: Protocol, T: Table, S: Socket, TS: TimeSource> GenericCloud<D
next_peerlist: now, next_peerlist: now,
update_freq, update_freq,
stats_file, stats_file,
statsd_server: config.statsd_server.clone(),
buffer_out: [0; 64 * 1024], buffer_out: [0; 64 * 1024],
next_housekeep: now, next_housekeep: now,
next_stats_out: now + STATS_INTERVAL, next_stats_out: now + STATS_INTERVAL,
@ -516,6 +518,7 @@ impl<D: Device, P: Protocol, T: Table, S: Socket, TS: TimeSource> GenericCloud<D
if self.next_stats_out < now { if self.next_stats_out < now {
// Write out the statistics // Write out the statistics
self.write_out_stats().map_err(|err| Error::File("Failed to write stats file", err))?; self.write_out_stats().map_err(|err| Error::File("Failed to write stats file", err))?;
self.send_stats_to_statsd()?;
self.next_stats_out = now + STATS_INTERVAL; self.next_stats_out = now + STATS_INTERVAL;
self.traffic.period(Some(5)); self.traffic.period(Some(5));
} }
@ -575,7 +578,7 @@ impl<D: Device, P: Protocol, T: Table, S: Socket, TS: TimeSource> GenericCloud<D
Ok(()) Ok(())
} }
/// Calculates, resets and writes out the statistics to a file /// Writes out the statistics to a file
fn write_out_stats(&mut self) -> Result<(), io::Error> { fn write_out_stats(&mut self) -> Result<(), io::Error> {
if let Some(ref mut f) = self.stats_file { if let Some(ref mut f) = self.stats_file {
debug!("Writing out stats"); debug!("Writing out stats");
@ -591,6 +594,61 @@ impl<D: Device, P: Protocol, T: Table, S: Socket, TS: TimeSource> GenericCloud<D
Ok(()) Ok(())
} }
/// Sends the statistics to a statsd endpoint
fn send_stats_to_statsd(&mut self) -> Result<(), Error> {
if let Some(ref endpoint) = self.statsd_server {
let peer_traffic = self.traffic.total_peer_traffic();
let payload_traffic = self.traffic.total_payload_traffic();
let dropped = &self.traffic.dropped;
let msg = format!(
"peer_count:{}|g\ntable_entries:{}|g\n\
traffic.protocol.inbound.bytes:{}\n\
traffic.protocol.inbound.packets:{}\n\
traffic.protocol.outbound.bytes:{}\n\
traffic.protocol.outbound.packets:{}\n\
traffic.payload.inbound.bytes:{}\n\
traffic.payload.inbound.packets:{}\n\
traffic.payload.outbound.bytes:{}\n\
traffic.payload.outbound.packets:{}\n\
invalid_protocol_traffic.bytes:{}\n\
invalid_protocol_traffic.packets:{}\n\
dropped_payload.bytes:{}\n\
dropped_payload.packets:{}",
self.peers.len(),
self.table.len(),
peer_traffic.in_bytes,
peer_traffic.in_packets,
peer_traffic.out_bytes,
peer_traffic.out_packets,
payload_traffic.in_bytes,
payload_traffic.in_packets,
payload_traffic.out_bytes,
payload_traffic.out_packets,
dropped.in_bytes,
dropped.in_packets,
dropped.out_bytes,
dropped.out_packets
);
let msg_data = msg.as_bytes();
let addrs = resolve(endpoint)?;
if let Some(addr) = addrs.first() {
match self.socket.send(msg_data, *addr) {
Ok(written) if written == msg_data.len() => Ok(()),
Ok(_) => {
Err(Error::Socket(
"Sent out truncated packet",
io::Error::new(io::ErrorKind::Other, "truncated")
))
}
Err(e) => Err(Error::Socket("IOError when sending", e))
}?
} else {
error!("Failed to resolve statsd server {}", endpoint);
}
}
Ok(())
}
/// Handles payload data coming in from the local network device /// Handles payload data coming in from the local network device
/// ///
/// This method takes payload data received from the local device and parses it to obtain the /// This method takes payload data received from the local device and parses it to obtain the
@ -619,7 +677,7 @@ impl<D: Device, P: Protocol, T: Table, S: Socket, TS: TimeSource> GenericCloud<D
if !self.peers.contains_addr(&addr) { if !self.peers.contains_addr(&addr) {
// If the peer is not actually connected, remove the entry in the table and try // If the peer is not actually connected, remove the entry in the table and try
// to reconnect. // to reconnect.
warn!("Destination for {} not found in peers: {}", dst, addr); warn!("Destination for {} not found in peers: {}", dst, addr_nice(addr));
self.table.remove(&dst); self.table.remove(&dst);
self.connect_sock(addr)?; self.connect_sock(addr)?;
} }
@ -715,7 +773,7 @@ impl<D: Device, P: Protocol, T: Table, S: Socket, TS: TimeSource> GenericCloud<D
} else { } else {
self.peers.add(node_id, peer, peer_timeout); self.peers.add(node_id, peer, peer_timeout);
if self.learning && !ranges.is_empty() { if self.learning && !ranges.is_empty() {
warn!("Ignoring claimed addresses received from {} in learning mode.", peer); warn!("Ignoring claimed addresses received from {} in learning mode.", addr_nice(peer));
} else { } else {
for range in ranges { for range in ranges {
self.table.learn(range.base, Some(range.prefix_len), peer); self.table.learn(range.base, Some(range.prefix_len), peer);
@ -756,7 +814,7 @@ impl<D: Device, P: Protocol, T: Table, S: Socket, TS: TimeSource> GenericCloud<D
self.traffic.count_in_traffic(src, size); self.traffic.count_in_traffic(src, size);
self.handle_net_message(src, msg) self.handle_net_message(src, msg)
}) { }) {
error!("Error: {}, from: {}", e, src); error!("Error: {}, from: {}", e, addr_nice(src));
self.traffic.count_invalid_protocol(size); self.traffic.count_invalid_protocol(size);
} }
} }

View File

@ -58,6 +58,7 @@ pub struct Config {
pub daemonize: bool, pub daemonize: bool,
pub pid_file: Option<String>, pub pid_file: Option<String>,
pub stats_file: Option<String>, pub stats_file: Option<String>,
pub statsd_server: Option<String>,
pub user: Option<String>, pub user: Option<String>,
pub group: Option<String> pub group: Option<String>
} }
@ -87,6 +88,7 @@ impl Default for Config {
daemonize: false, daemonize: false,
pid_file: None, pid_file: None,
stats_file: None, stats_file: None,
statsd_server: None,
user: None, user: None,
group: None group: None
} }
@ -163,6 +165,9 @@ impl Config {
if let Some(val) = file.stats_file { if let Some(val) = file.stats_file {
self.stats_file = Some(val); self.stats_file = Some(val);
} }
if let Some(val) = file.statsd_server {
self.statsd_server = Some(val);
}
if let Some(val) = file.user { if let Some(val) = file.user {
self.user = Some(val); self.user = Some(val);
} }
@ -238,6 +243,9 @@ impl Config {
if let Some(val) = args.stats_file { if let Some(val) = args.stats_file {
self.stats_file = Some(val); self.stats_file = Some(val);
} }
if let Some(val) = args.statsd_server {
self.statsd_server = Some(val);
}
if let Some(val) = args.user { if let Some(val) = args.user {
self.user = Some(val); self.user = Some(val);
} }
@ -298,6 +306,7 @@ pub struct ConfigFile {
pub port_forwarding: Option<bool>, pub port_forwarding: Option<bool>,
pub pid_file: Option<String>, pub pid_file: Option<String>,
pub stats_file: Option<String>, pub stats_file: Option<String>,
pub statsd_server: Option<String>,
pub user: Option<String>, pub user: Option<String>,
pub group: Option<String> pub group: Option<String>
} }
@ -332,6 +341,7 @@ user: nobody
group: nogroup group: nogroup
pid_file: /run/vpncloud.run pid_file: /run/vpncloud.run
stats_file: /var/log/vpncloud.stats stats_file: /var/log/vpncloud.stats
statsd_server: example.com:1234
"; ";
assert_eq!(serde_yaml::from_str::<ConfigFile>(config_file).unwrap(), ConfigFile { assert_eq!(serde_yaml::from_str::<ConfigFile>(config_file).unwrap(), ConfigFile {
device_type: Some(Type::Tun), device_type: Some(Type::Tun),
@ -357,7 +367,8 @@ stats_file: /var/log/vpncloud.stats
user: Some("nobody".to_string()), user: Some("nobody".to_string()),
group: Some("nogroup".to_string()), group: Some("nogroup".to_string()),
pid_file: Some("/run/vpncloud.run".to_string()), pid_file: Some("/run/vpncloud.run".to_string()),
stats_file: Some("/var/log/vpncloud.stats".to_string()) stats_file: Some("/var/log/vpncloud.stats".to_string()),
statsd_server: Some("example.com:1234".to_string())
}) })
} }
@ -388,7 +399,8 @@ fn config_merge() {
user: Some("nobody".to_string()), user: Some("nobody".to_string()),
group: Some("nogroup".to_string()), group: Some("nogroup".to_string()),
pid_file: Some("/run/vpncloud.run".to_string()), pid_file: Some("/run/vpncloud.run".to_string()),
stats_file: Some("/var/log/vpncloud.stats".to_string()) stats_file: Some("/var/log/vpncloud.stats".to_string()),
statsd_server: Some("example.com:1234".to_string())
}); });
assert_eq!(config, Config { assert_eq!(config, Config {
device_type: Type::Tun, device_type: Type::Tun,
@ -414,6 +426,7 @@ fn config_merge() {
group: Some("nogroup".to_string()), group: Some("nogroup".to_string()),
pid_file: Some("/run/vpncloud.run".to_string()), pid_file: Some("/run/vpncloud.run".to_string()),
stats_file: Some("/var/log/vpncloud.stats".to_string()), stats_file: Some("/var/log/vpncloud.stats".to_string()),
statsd_server: Some("example.com:1234".to_string()),
..Default::default() ..Default::default()
}); });
config.merge_args(Args { config.merge_args(Args {
@ -439,6 +452,7 @@ fn config_merge() {
daemon: true, daemon: true,
pid_file: Some("/run/vpncloud-mynet.run".to_string()), pid_file: Some("/run/vpncloud-mynet.run".to_string()),
stats_file: Some("/var/log/vpncloud-mynet.stats".to_string()), stats_file: Some("/var/log/vpncloud-mynet.stats".to_string()),
statsd_server: Some("example.com:2345".to_string()),
user: Some("root".to_string()), user: Some("root".to_string()),
group: Some("root".to_string()), group: Some("root".to_string()),
..Default::default() ..Default::default()
@ -471,6 +485,7 @@ fn config_merge() {
group: Some("root".to_string()), group: Some("root".to_string()),
pid_file: Some("/run/vpncloud-mynet.run".to_string()), pid_file: Some("/run/vpncloud-mynet.run".to_string()),
stats_file: Some("/var/log/vpncloud-mynet.stats".to_string()), stats_file: Some("/var/log/vpncloud-mynet.stats".to_string()),
statsd_server: Some("example.com:2345".to_string()),
daemonize: true daemonize: true
}); });
} }

View File

@ -130,7 +130,7 @@ impl<TS: TimeSource> Table for SwitchTable<TS> {
match self.table.entry(key) { match self.table.entry(key) {
Entry::Vacant(entry) => { Entry::Vacant(entry) => {
entry.insert(SwitchTableValue { address: addr, timeout: deadline }); entry.insert(SwitchTableValue { address: addr, timeout: deadline });
info!("Learned address {} => {}", key, addr); info!("Learned address {} => {}", key, addr_nice(addr));
} }
Entry::Occupied(mut entry) => { Entry::Occupied(mut entry) => {
let mut entry = entry.get_mut(); let mut entry = entry.get_mut();
@ -171,6 +171,10 @@ impl<TS: TimeSource> Table for SwitchTable<TS> {
self.table.remove(&key); self.table.remove(&key);
} }
} }
fn len(&self) -> usize {
self.table.len()
}
} }

View File

@ -88,7 +88,7 @@ impl Table for RoutingTable {
Some(val) => val, Some(val) => val,
None => addr.len * 8 None => addr.len * 8
}; };
info!("New routing entry: {}/{} => {}", addr, prefix_len, address); info!("New routing entry: {}/{} => {}", addr, prefix_len, addr_nice(address));
// Round the prefix length down to the next multiple of 8 and extraxt a prefix of that // Round the prefix length down to the next multiple of 8 and extraxt a prefix of that
// length. // length.
let group_len = prefix_len as usize / 8; let group_len = prefix_len as usize / 8;
@ -184,6 +184,10 @@ impl Table for RoutingTable {
entry.retain(|entr| &entr.address != addr); entry.retain(|entr| &entr.address != addr);
} }
} }
fn len(&self) -> usize {
self.0.len()
}
} }

View File

@ -169,6 +169,10 @@ pub struct Args {
#[structopt(long)] #[structopt(long)]
stats_file: Option<String>, stats_file: Option<String>,
/// Send statistics to this statsd server
#[structopt(long)]
statsd_server: Option<String>,
/// Run as other user /// Run as other user
#[structopt(long)] #[structopt(long)]
user: Option<String>, user: Option<String>,

View File

@ -5,7 +5,8 @@
use std::{ use std::{
collections::HashMap, collections::HashMap,
io::{self, Write}, io::{self, Write},
net::SocketAddr net::SocketAddr,
ops::AddAssign
}; };
use super::{ use super::{
@ -28,6 +29,19 @@ pub struct TrafficEntry {
pub idle_periods: usize pub idle_periods: usize
} }
impl AddAssign<&TrafficEntry> for TrafficEntry {
fn add_assign(&mut self, other: &TrafficEntry) {
self.out_bytes_total += other.out_bytes_total;
self.out_packets_total += other.out_packets_total;
self.out_bytes += other.out_bytes;
self.out_packets += other.out_packets;
self.in_bytes_total += other.in_bytes_total;
self.in_packets_total += other.in_packets_total;
self.in_bytes += other.in_bytes;
self.in_packets += other.in_packets;
}
}
impl TrafficEntry { impl TrafficEntry {
#[inline] #[inline]
fn count_out(&mut self, bytes: usize) { fn count_out(&mut self, bytes: usize) {
@ -63,7 +77,7 @@ impl TrafficEntry {
pub struct TrafficStats { pub struct TrafficStats {
peers: HashMap<SocketAddr, TrafficEntry, Hash>, peers: HashMap<SocketAddr, TrafficEntry, Hash>,
payload: HashMap<(Address, Address), TrafficEntry, Hash>, payload: HashMap<(Address, Address), TrafficEntry, Hash>,
dropped: TrafficEntry pub dropped: TrafficEntry
} }
impl TrafficStats { impl TrafficStats {
@ -117,6 +131,22 @@ impl TrafficStats {
self.payload.iter() self.payload.iter()
} }
pub fn total_peer_traffic(&self) -> TrafficEntry {
let mut total = TrafficEntry::default();
for e in self.peers.values() {
total += e
}
total
}
pub fn total_payload_traffic(&self) -> TrafficEntry {
let mut total = TrafficEntry::default();
for e in self.payload.values() {
total += e
}
total
}
#[inline] #[inline]
pub fn write_out<W: Write>(&self, out: &mut W) -> Result<(), io::Error> { pub fn write_out<W: Write>(&self, out: &mut W) -> Result<(), io::Error> {
writeln!(out, "peer_traffic:")?; writeln!(out, "peer_traffic:")?;

View File

@ -227,6 +227,7 @@ pub trait Table {
fn write_out<W: Write>(&self, out: &mut W) -> Result<(), io::Error>; fn write_out<W: Write>(&self, out: &mut W) -> Result<(), io::Error>;
fn remove(&mut self, _: &Address) -> bool; fn remove(&mut self, _: &Address) -> bool;
fn remove_all(&mut self, _: &SocketAddr); fn remove_all(&mut self, _: &SocketAddr);
fn len(&self) -> usize;
} }
pub trait Protocol: Sized { pub trait Protocol: Sized {