Flexible options

This commit is contained in:
Dennis Schwerdel 2015-11-20 18:09:51 +01:00
parent 80859f4671
commit 9ea2dcb862
5 changed files with 160 additions and 76 deletions

BIN
Measurements_Glab.ods Normal file

Binary file not shown.

View File

@ -10,6 +10,8 @@ use time::{Duration, SteadyTime, precise_time_ns};
use epoll;
use super::{ethernet, udpmessage};
use super::udpmessage::{Options, Message};
use super::ethernet::VlanId;
use super::tapdev::TapDevice;
@ -23,13 +25,12 @@ impl fmt::Debug for Mac {
}
}
pub type Token = u64;
pub type NetworkId = u64;
#[derive(Debug)]
pub enum Error {
ParseError(&'static str),
WrongToken(Token),
WrongNetwork(Option<NetworkId>),
SocketError(&'static str),
TapdevError(&'static str),
}
@ -106,7 +107,7 @@ impl PeerList {
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct MacTableKey {
mac: Mac,
vlan: u16
vlan: VlanId
}
struct MacTableValue {
@ -139,7 +140,7 @@ impl MacTable {
}
#[inline]
fn learn(&mut self, mac: &Mac, vlan: u16, addr: &SocketAddr) {
fn learn(&mut self, mac: &Mac, vlan: VlanId, addr: &SocketAddr) {
let key = MacTableKey{mac: *mac, vlan: vlan};
let value = MacTableValue{address: *addr, timeout: SteadyTime::now()+self.timeout};
if self.table.insert(key, value).is_none() {
@ -148,7 +149,7 @@ impl MacTable {
}
#[inline]
fn lookup(&self, mac: &Mac, vlan: u16) -> Option<SocketAddr> {
fn lookup(&self, mac: &Mac, vlan: VlanId) -> Option<SocketAddr> {
let key = MacTableKey{mac: *mac, vlan: vlan};
match self.table.get(&key) {
Some(value) => Some(value.address),
@ -163,7 +164,7 @@ pub struct EthCloud {
mactable: MacTable,
socket: UdpSocket,
tapdev: TapDevice,
token: Token,
network_id: Option<NetworkId>,
next_peerlist: SteadyTime,
update_freq: Duration,
buffer_out: [u8; 64*1024],
@ -171,7 +172,7 @@ pub struct EthCloud {
}
impl EthCloud {
pub fn new(device: &str, listen: String, token: Token, mac_timeout: Duration, peer_timeout: Duration) -> Self {
pub fn new(device: &str, listen: String, network_id: Option<NetworkId>, mac_timeout: Duration, peer_timeout: Duration) -> Self {
let socket = match UdpSocket::bind(&listen as &str) {
Ok(socket) => socket,
_ => panic!("Failed to open socket")
@ -187,7 +188,7 @@ impl EthCloud {
mactable: MacTable::new(mac_timeout),
socket: socket,
tapdev: tapdev,
token: token,
network_id: network_id,
next_peerlist: SteadyTime::now(),
update_freq: peer_timeout/2,
buffer_out: [0; 64*1024],
@ -195,9 +196,11 @@ impl EthCloud {
}
}
fn send_msg<A: ToSocketAddrs + fmt::Display>(&mut self, addr: A, msg: &udpmessage::Message) -> Result<(), Error> {
fn send_msg<A: ToSocketAddrs + fmt::Display>(&mut self, addr: A, msg: &Message) -> Result<(), Error> {
debug!("Sending {:?} to {}", msg, addr);
let size = udpmessage::encode(self.token, msg, &mut self.buffer_out);
let mut options = Options::default();
options.network_id = self.network_id;
let size = udpmessage::encode(&options, msg, &mut self.buffer_out);
match self.socket.send_to(&self.buffer_out[..size], addr) {
Ok(written) if written == size => Ok(()),
Ok(_) => Err(Error::SocketError("Sent out truncated packet")),
@ -209,7 +212,6 @@ impl EthCloud {
}
pub fn connect<A: ToSocketAddrs + fmt::Display>(&mut self, addr: A, reconnect: bool) -> Result<(), Error> {
info!("Connecting to {}", addr);
if let Ok(mut addrs) = addr.to_socket_addrs() {
while let Some(addr) = addrs.next() {
if self.peers.contains(&addr) {
@ -217,11 +219,12 @@ impl EthCloud {
}
}
}
info!("Connecting to {}", addr);
if reconnect {
let addr = addr.to_socket_addrs().unwrap().next().unwrap();
self.reconnect_peers.push(addr);
}
self.send_msg(addr, &udpmessage::Message::GetPeers)
self.send_msg(addr, &Message::GetPeers)
}
fn housekeep(&mut self) -> Result<(), Error> {
@ -238,7 +241,7 @@ impl EthCloud {
}
}
let peers = self.peers.subset(peer_num, precise_time_ns() as u32);
let msg = udpmessage::Message::Peers(peers);
let msg = Message::Peers(peers);
for addr in &self.peers.as_vec() {
try!(self.send_msg(addr, &msg));
}
@ -255,11 +258,11 @@ impl EthCloud {
match self.mactable.lookup(frame.dst, frame.vlan) {
Some(addr) => {
debug!("Found destination for {:?} (vlan {}) => {}", frame.dst, frame.vlan, addr);
try!(self.send_msg(addr, &udpmessage::Message::Frame(frame)))
try!(self.send_msg(addr, &Message::Frame(frame)))
},
None => {
debug!("No destination for {:?} (vlan {}) found, broadcasting", frame.dst, frame.vlan);
let msg = udpmessage::Message::Frame(frame);
let msg = Message::Frame(frame);
for addr in &self.peers.as_vec() {
try!(self.send_msg(addr, &msg));
}
@ -268,14 +271,16 @@ impl EthCloud {
Ok(())
}
fn handle_net_message(&mut self, peer: SocketAddr, token: Token, msg: udpmessage::Message) -> Result<(), Error> {
if token != self.token {
info!("Ignoring message from {} with wrong token {}", peer, token);
return Err(Error::WrongToken(token));
fn handle_net_message(&mut self, peer: SocketAddr, options: Options, msg: Message) -> Result<(), Error> {
if let Some(id) = self.network_id {
if options.network_id != Some(id) {
info!("Ignoring message from {} with wrong token {:?}", peer, options.network_id);
return Err(Error::WrongNetwork(options.network_id));
}
}
debug!("Recieved {:?} from {}", msg, peer);
match msg {
udpmessage::Message::Frame(frame) => {
Message::Frame(frame) => {
let size = ethernet::encode(&frame, &mut self.buffer_out);
debug!("Writing ethernet frame to tap: {:?}", frame);
match self.tapdev.write(&self.buffer_out[..size]) {
@ -288,7 +293,7 @@ impl EthCloud {
self.peers.add(&peer);
self.mactable.learn(frame.src, frame.vlan, &peer);
},
udpmessage::Message::Peers(peers) => {
Message::Peers(peers) => {
self.peers.add(&peer);
for p in &peers {
if ! self.peers.contains(p) {
@ -296,12 +301,12 @@ impl EthCloud {
}
}
},
udpmessage::Message::GetPeers => {
Message::GetPeers => {
self.peers.add(&peer);
let peers = self.peers.as_vec();
try!(self.send_msg(peer, &udpmessage::Message::Peers(peers)));
try!(self.send_msg(peer, &Message::Peers(peers)));
},
udpmessage::Message::Close => {
Message::Close => {
self.peers.remove(&peer);
}
}
@ -325,7 +330,7 @@ impl EthCloud {
match &events[i as usize].data {
&0 => match self.socket.recv_from(&mut buffer) {
Ok((size, src)) => {
match udpmessage::decode(&buffer[..size]).and_then(|(token, msg)| self.handle_net_message(src, token, msg)) {
match udpmessage::decode(&buffer[..size]).and_then(|(options, msg)| self.handle_net_message(src, options, msg)) {
Ok(_) => (),
Err(e) => error!("Error: {:?}", e)
}

View File

@ -3,9 +3,11 @@ use std::{mem, ptr, fmt};
use super::ethcloud::{Mac, Error};
use super::util::{as_bytes, as_obj};
pub type VlanId = u16;
#[derive(PartialEq)]
pub struct Frame<'a> {
pub vlan: u16,
pub vlan: VlanId,
pub src: &'a Mac,
pub dst: &'a Mac,
pub payload: &'a [u8]

View File

@ -13,7 +13,7 @@ mod ethcloud;
use time::Duration;
use docopt::Docopt;
use ethcloud::{Error, Token, EthCloud};
use ethcloud::{Error, NetworkId, EthCloud};
//TODO: Implement IPv6
@ -38,13 +38,13 @@ impl log::Log for SimpleLogger {
static USAGE: &'static str = "
Usage:
ethcloud [options] [-d <device>] [-l <listen>] [-t <token>] [-c <connect>...]
ethcloud [options] [-d <device>] [-l <listen>] [-c <connect>...]
Options:
-d <device>, --device <device> Name of the tap device [default: ethcloud%d]
-l <listen>, --listen <listen> Address to listen on [default: 0.0.0.0:3210]
-t <token>, --token <token> Token that identifies the network [default: 0]
-c <connect>, --connect <connect> List of peers (addr:port) to connect to
--network-id <network_id> Optional token that identifies the network
--peer-timeout <peer_timeout> Peer timeout in seconds [default: 1800]
--mac-timeout <mac_timeout> Mac table entry timeout in seconds [default: 300]
-v, --verbose Log verbosely
@ -56,7 +56,7 @@ Options:
struct Args {
flag_device: String,
flag_listen: String,
flag_token: Token,
flag_network_id: Option<NetworkId>,
flag_connect: Vec<String>,
flag_peer_timeout: usize,
flag_mac_timeout: usize,
@ -81,7 +81,7 @@ fn main() {
let mut tapcloud = EthCloud::new(
&args.flag_device,
args.flag_listen,
args.flag_token,
args.flag_network_id,
Duration::seconds(args.flag_mac_timeout as i64),
Duration::seconds(args.flag_peer_timeout as i64)
);

View File

@ -2,9 +2,32 @@ use std::{mem, ptr, fmt};
use std::net::{SocketAddr, SocketAddrV4, Ipv4Addr};
use std::u16;
use super::ethcloud::{Error, Token};
use super::ethcloud::{Error, NetworkId};
use super::ethernet;
use super::util::as_obj;
use super::util::{as_obj, as_bytes};
const MAGIC: [u8; 3] = [0x76, 0x70, 0x6e];
const VERSION: u8 = 0;
#[repr(packed)]
struct TopHeader {
magic: [u8; 3],
version: u8,
_reserved: [u8; 2],
option_count: u8,
msgtype: u8
}
impl Default for TopHeader {
fn default() -> Self {
TopHeader{magic: MAGIC, version: VERSION, _reserved: [0; 2], option_count: 0, msgtype: 0}
}
}
#[derive(Default, Debug, PartialEq, Eq)]
pub struct Options {
pub network_id: Option<NetworkId>
}
#[derive(PartialEq)]
@ -37,19 +60,44 @@ impl<'a> fmt::Debug for Message<'a> {
}
}
pub fn decode(data: &[u8]) -> Result<(Token, Message), Error> {
if data.len() < mem::size_of::<Token>() {
pub fn decode(data: &[u8]) -> Result<(Options, Message), Error> {
if data.len() < mem::size_of::<TopHeader>() {
return Err(Error::ParseError("Empty message"));
}
let mut pos = 0;
let mut token = Token::from_be(* unsafe { as_obj::<Token>(&data[pos..]) });
pos += mem::size_of::<Token>();
let switch = token & 0xff;
token = token >> 8;
match switch {
0 => {
Ok((token, Message::Frame(try!(ethernet::decode(&data[pos..])))))
},
let header = unsafe { as_obj::<TopHeader>(&data[pos..]) };
pos += mem::size_of::<TopHeader>();
if header.magic != MAGIC {
return Err(Error::ParseError("Wrong protocol"));
}
if header.version != VERSION {
return Err(Error::ParseError("Wrong version"));
}
let mut options = Options::default();
for _ in 0..header.option_count {
if data.len() < pos + 2 {
return Err(Error::ParseError("Truncated options"));
}
let opt_type = data[pos];
let opt_len = data[pos+1];
pos += 2;
if data.len() < pos + opt_len as usize {
return Err(Error::ParseError("Truncated options"));
}
match opt_type {
0 => {
if opt_len != 8 {
return Err(Error::ParseError("Invalid message_id length"));
}
let id = u64::from_be(*unsafe { as_obj::<u64>(&data[pos..]) });
options.network_id = Some(id);
},
_ => return Err(Error::ParseError("Unknown option"))
}
pos += opt_len as usize;
}
let msg = match header.msgtype {
0 => Message::Frame(try!(ethernet::decode(&data[pos..]))),
1 => {
if data.len() < pos + 1 {
return Err(Error::ParseError("Empty peers"));
@ -73,27 +121,42 @@ pub fn decode(data: &[u8]) -> Result<(Token, Message), Error> {
let addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(ip[0], ip[1], ip[2], ip[3]), port));
peers.push(addr);
}
Ok((token, Message::Peers(peers)))
Message::Peers(peers)
},
2 => Ok((token, Message::GetPeers)),
3 => Ok((token, Message::Close)),
_ => Err(Error::ParseError("Unknown message type"))
}
2 => Message::GetPeers,
3 => Message::Close,
_ => return Err(Error::ParseError("Unknown message type"))
};
Ok((options, msg))
}
pub fn encode(token: Token, msg: &Message, buf: &mut [u8]) -> usize {
assert!(buf.len() >= mem::size_of::<Token>());
pub fn encode(options: &Options, msg: &Message, buf: &mut [u8]) -> usize {
assert!(buf.len() >= mem::size_of::<TopHeader>());
let mut pos = 0;
let switch = match msg {
let mut header = TopHeader::default();
header.msgtype = match msg {
&Message::Frame(_) => 0,
&Message::Peers(_) => 1,
&Message::GetPeers => 2,
&Message::Close => 3
};
let token = (token << 8) | switch;
let token_dat = unsafe { mem::transmute::<Token, [u8; 8]>(token.to_be()) };
unsafe { ptr::copy_nonoverlapping(token_dat.as_ptr(), buf[pos..].as_mut_ptr(), token_dat.len()) };
pos += token_dat.len();
if options.network_id.is_some() {
header.option_count += 1;
}
let header_dat = unsafe { as_bytes(&header) };
unsafe { ptr::copy_nonoverlapping(header_dat.as_ptr(), buf[pos..].as_mut_ptr(), header_dat.len()) };
pos += header_dat.len();
if let Some(id) = options.network_id {
assert!(buf.len() >= pos + 2 + 8);
buf[pos] = 0;
buf[pos+1] = 8;
pos += 2;
unsafe {
let id_dat = mem::transmute::<u64, [u8; 8]>(id.to_be());
ptr::copy_nonoverlapping(id_dat.as_ptr(), buf[pos..].as_mut_ptr(), id_dat.len());
}
pos += 8;
}
match msg {
&Message::Frame(ref frame) => {
pos += ethernet::encode(&frame, &mut buf[pos..])
@ -136,56 +199,70 @@ pub fn encode(token: Token, msg: &Message, buf: &mut [u8]) -> usize {
#[test]
fn encode_message_packet() {
use super::ethcloud::Mac;
let token = 134;
let options = Options::default();
let src = Mac([1,2,3,4,5,6]);
let dst = Mac([7,8,9,10,11,12]);
let payload = [1,2,3,4,5];
let msg = Message::Frame(ethernet::Frame{src: &src, dst: &dst, vlan: 0, payload: &payload});
let mut buf = [0; 1024];
let size = encode(token, &msg, &mut buf[..]);
let size = encode(&options, &msg, &mut buf[..]);
assert_eq!(size, 25);
assert_eq!(&buf[..8], &[0,0,0,0,0,0,134,0]);
let (token2, msg2) = decode(&buf[..size]).unwrap();
assert_eq!(token, token2);
assert_eq!(&buf[..8], &[118,112,110,0,0,0,0,0]);
let (options2, msg2) = decode(&buf[..size]).unwrap();
assert_eq!(options, options2);
assert_eq!(msg, msg2);
}
#[test]
fn encode_message_peers() {
use std::str::FromStr;
let token = 134;
let options = Options::default();
let msg = Message::Peers(vec![SocketAddr::from_str("1.2.3.4:123").unwrap(), SocketAddr::from_str("5.6.7.8:12345").unwrap()]);
let mut buf = [0; 1024];
let size = encode(token, &msg, &mut buf[..]);
let size = encode(&options, &msg, &mut buf[..]);
assert_eq!(size, 22);
assert_eq!(&buf[..size], &[0,0,0,0,0,0,134,1,2,1,2,3,4,0,123,5,6,7,8,48,57,0]);
let (token2, msg2) = decode(&buf[..size]).unwrap();
assert_eq!(token, token2);
assert_eq!(&buf[..size], &[118,112,110,0,0,0,0,1,2,1,2,3,4,0,123,5,6,7,8,48,57,0]);
let (options2, msg2) = decode(&buf[..size]).unwrap();
assert_eq!(options, options2);
assert_eq!(msg, msg2);
}
#[test]
fn encode_option_network_id() {
let mut options = Options::default();
options.network_id = Some(134);
let msg = Message::GetPeers;
let mut buf = [0; 1024];
let size = encode(&options, &msg, &mut buf[..]);
assert_eq!(size, 18);
assert_eq!(&buf[..size], &[118,112,110,0,0,0,1,2,0,8,0,0,0,0,0,0,0,134]);
let (options2, msg2) = decode(&buf[..size]).unwrap();
assert_eq!(options, options2);
assert_eq!(msg, msg2);
}
#[test]
fn encode_message_getpeers() {
let token = 134;
let options = Options::default();
let msg = Message::GetPeers;
let mut buf = [0; 1024];
let size = encode(token, &msg, &mut buf[..]);
let size = encode(&options, &msg, &mut buf[..]);
assert_eq!(size, 8);
assert_eq!(&buf[..size], &[0,0,0,0,0,0,134,2]);
let (token2, msg2) = decode(&buf[..size]).unwrap();
assert_eq!(token, token2);
assert_eq!(&buf[..size], &[118,112,110,0,0,0,0,2]);
let (options2, msg2) = decode(&buf[..size]).unwrap();
assert_eq!(options, options2);
assert_eq!(msg, msg2);
}
#[test]
fn encode_message_close() {
let token = 134;
let options = Options::default();
let msg = Message::Close;
let mut buf = [0; 1024];
let size = encode(token, &msg, &mut buf[..]);
let size = encode(&options, &msg, &mut buf[..]);
assert_eq!(size, 8);
assert_eq!(&buf[..size], &[0,0,0,0,0,0,134,3]);
let (token2, msg2) = decode(&buf[..size]).unwrap();
assert_eq!(token, token2);
assert_eq!(&buf[..size], &[118,112,110,0,0,0,0,3]);
let (options2, msg2) = decode(&buf[..size]).unwrap();
assert_eq!(options, options2);
assert_eq!(msg, msg2);
}