Exchange peer_timeout and adapt keepalive

pull/46/head
Dennis Schwerdel 2019-12-04 10:04:42 +01:00
parent 04d934491c
commit ee8542307f
3 changed files with 62 additions and 33 deletions

View File

@ -40,6 +40,7 @@ pub const STATS_INTERVAL: Time = 60;
struct PeerData {
timeout: Time,
peer_timeout: u16,
node_id: NodeId,
alt_addrs: Vec<SocketAddr>
}
@ -84,6 +85,10 @@ impl<TS: TimeSource> PeerList<TS> {
del
}
pub fn min_peer_timeout(&self) -> u16 {
self.peers.iter().map(|p| p.1.peer_timeout).min().unwrap_or(1800)
}
#[inline]
pub fn contains_addr(&self, addr: &SocketAddr) -> bool {
self.addresses.contains_key(addr)
@ -106,13 +111,14 @@ impl<TS: TimeSource> PeerList<TS> {
#[inline]
fn add(&mut self, node_id: NodeId, addr: SocketAddr) {
fn add(&mut self, node_id: NodeId, addr: SocketAddr, peer_timeout: u16) {
if self.nodes.insert(node_id, addr).is_none() {
info!("New peer: {}", addr);
self.peers.insert(addr, PeerData {
timeout: TS::now() + Time::from(self.timeout),
node_id,
alt_addrs: vec![]
alt_addrs: vec![],
peer_timeout
});
self.addresses.insert(addr, node_id);
}
@ -221,7 +227,7 @@ pub struct GenericCloud<D: Device, P: Protocol, T: Table, S: Socket, TS: TimeSou
device: D,
crypto: Crypto,
next_peerlist: Time,
update_freq: Duration,
update_freq: u16,
buffer_out: [u8; 64 * 1024],
next_housekeep: Time,
next_stats_out: Time,
@ -252,7 +258,7 @@ impl<D: Device, P: Protocol, T: Table, S: Socket, TS: TimeSource> GenericCloud<D
let mut res = GenericCloud {
magic: config.get_magic(),
node_id: random(),
peers: PeerList::new(config.peer_timeout),
peers: PeerList::new(config.peer_timeout as Duration),
addresses,
learning,
broadcast,
@ -263,7 +269,7 @@ impl<D: Device, P: Protocol, T: Table, S: Socket, TS: TimeSource> GenericCloud<D
socket6,
device,
next_peerlist: now,
update_freq: config.get_keepalive(),
update_freq: config.get_keepalive() as u16,
buffer_out: [0; 64 * 1024],
next_housekeep: now,
next_stats_out: now + STATS_INTERVAL,
@ -408,7 +414,7 @@ impl<D: Device, P: Protocol, T: Table, S: Socket, TS: TimeSource> GenericCloud<D
// Send a message to each resolved address
for a in resolve(&addr)? {
// Ignore error this time
let mut msg = Message::Init(0, node_id, subnets.clone());
let mut msg = Message::Init(0, node_id, subnets.clone(), self.config.peer_timeout as u16);
self.send_msg(a, &mut msg).ok();
}
Ok(())
@ -429,7 +435,7 @@ impl<D: Device, P: Protocol, T: Table, S: Socket, TS: TimeSource> GenericCloud<D
debug!("Connecting to {:?}", addr);
let subnets = self.addresses.clone();
let node_id = self.node_id;
let mut msg = Message::Init(0, node_id, subnets.clone());
let mut msg = Message::Init(0, node_id, subnets.clone(), self.config.peer_timeout as u16);
self.send_msg(addr, &mut msg)
}
@ -466,6 +472,7 @@ impl<D: Device, P: Protocol, T: Table, S: Socket, TS: TimeSource> GenericCloud<D
let mut msg = Message::Peers(peers);
self.broadcast_msg(&mut msg)?;
// Reschedule for next update
self.update_freq = min(self.config.get_keepalive() as u16, self.peers.min_peer_timeout());
self.next_peerlist = now + Time::from(self.update_freq);
}
// Connect to those reconnect_peers that are due
@ -697,7 +704,7 @@ impl<D: Device, P: Protocol, T: Table, S: Socket, TS: TimeSource> GenericCloud<D
// Refresh peer
self.peers.refresh(&peer);
}
Message::Init(stage, node_id, ranges) => {
Message::Init(stage, node_id, ranges, peer_timeout) => {
// Avoid connecting to self
if node_id == self.node_id {
self.own_addresses.push(peer);
@ -707,7 +714,7 @@ impl<D: Device, P: Protocol, T: Table, S: Socket, TS: TimeSource> GenericCloud<D
if self.peers.contains_node(&node_id) {
self.peers.make_primary(node_id, peer);
} else {
self.peers.add(node_id, peer);
self.peers.add(node_id, peer, peer_timeout);
for range in ranges {
self.table.learn(range.base, Some(range.prefix_len), peer);
}
@ -716,7 +723,10 @@ impl<D: Device, P: Protocol, T: Table, S: Socket, TS: TimeSource> GenericCloud<D
if stage == 0 {
let own_addrs = self.addresses.clone();
let own_node_id = self.node_id;
self.send_msg(peer, &mut Message::Init(stage + 1, own_node_id, own_addrs))?;
self.send_msg(
peer,
&mut Message::Init(stage + 1, own_node_id, own_addrs, self.config.peer_timeout as u16)
)?;
}
// Send peers in any case
let peers = self.peers.as_vec();

View File

@ -17,24 +17,24 @@ fn connect_v4() {
node1.connect("2.3.4.5:6789").unwrap();
// Node 1 -> Node 2: Init 0
assert_message4!(node1, node1_addr, node2, node2_addr, Message::Init(0, node1.node_id(), vec![]));
assert_message4!(node1, node1_addr, node2, node2_addr, Message::Init(0, node1.node_id(), vec![], 1800));
assert_clean!(node1);
assert!(node2.peers().contains_node(&node1.node_id()));
// Node 2 -> Node 1: Init 1 | Node 2 -> Node 1: Peers
assert_message4!(node2, node2_addr, node1, node1_addr, Message::Init(1, node2.node_id(), vec![]));
assert_message4!(node2, node2_addr, node1, node1_addr, Message::Init(1, node2.node_id(), vec![], 1800));
assert!(node1.peers().contains_node(&node2.node_id()));
assert_message4!(node2, node2_addr, node1, node1_addr, Message::Peers(vec![node1_addr]));
assert_clean!(node2);
// Node 1 -> Node 2: Peers | Node 1 -> Node 1: Init 0
assert_message4!(node1, node1_addr, node2, node2_addr, Message::Peers(vec![node2_addr]));
assert_message4!(node1, node1_addr, node1, node1_addr, Message::Init(0, node1.node_id(), vec![]));
assert_message4!(node1, node1_addr, node1, node1_addr, Message::Init(0, node1.node_id(), vec![], 1800));
assert!(node1.own_addresses().contains(&node1_addr));
assert_clean!(node1);
// Node 2 -> Node 2: Init 0
assert_message4!(node2, node2_addr, node2, node2_addr, Message::Init(0, node2.node_id(), vec![]));
assert_message4!(node2, node2_addr, node2, node2_addr, Message::Init(0, node2.node_id(), vec![], 1800));
assert_clean!(node2);
assert!(node2.own_addresses().contains(&node2_addr));
@ -156,7 +156,7 @@ fn lost_init1() {
node1.connect("2.3.4.5:6789").unwrap();
// Node 1 -> Node 2: Init 0
assert_message4!(node1, node1_addr, node2, node2_addr, Message::Init(0, node1.node_id(), vec![]));
assert_message4!(node1, node1_addr, node2, node2_addr, Message::Init(0, node1.node_id(), vec![], 1800));
assert_clean!(node1);
// Node 2 -> Node 1: Init 1 | Node 2 -> Node 1: Peers
@ -177,7 +177,7 @@ fn wrong_magic() {
let node2_addr = addr!("2.3.4.5:6789");
node1.connect("2.3.4.5:6789").unwrap();
assert_message4!(node1, node1_addr, node2, node2_addr, Message::Init(0, node1.node_id(), vec![]));
assert_message4!(node1, node1_addr, node2, node2_addr, Message::Init(0, node1.node_id(), vec![], 1800));
assert_clean!(node1, node2);
@ -204,3 +204,8 @@ fn remove_dead_peers() {
fn update_primary_address() {
// TODO
}
#[test]
fn automatic_peer_timeout() {
// TODO
}

View File

@ -53,9 +53,9 @@ impl TopHeader {
}
pub enum Message<'a> {
Data(&'a mut [u8], usize, usize), // data, start, end
Peers(Vec<SocketAddr>), // peers
Init(u8, NodeId, Vec<Range>), // step, node_id, ranges
Data(&'a mut [u8], usize, usize), // data, start, end
Peers(Vec<SocketAddr>), // peers
Init(u8, NodeId, Vec<Range>, u16), // step, node_id, ranges
Close
}
@ -64,7 +64,7 @@ impl<'a> Message<'a> {
match self {
Message::Data(_, start, end) => Message::Data(&mut [], start, end),
Message::Peers(peers) => Message::Peers(peers),
Message::Init(step, node_id, ranges) => Message::Init(step, node_id, ranges),
Message::Init(step, node_id, ranges, timeout) => Message::Init(step, node_id, ranges, timeout),
Message::Close => Message::Close
}
}
@ -86,8 +86,15 @@ impl<'a> fmt::Debug for Message<'a> {
}
write!(formatter, "]")
}
Message::Init(stage, ref node_id, ref peers) => {
write!(formatter, "Init(stage={}, node_id={}, {:?})", stage, bytes_to_hex(node_id), peers)
Message::Init(stage, ref node_id, ref peers, ref peer_timeout) => {
write!(
formatter,
"Init(stage={}, node_id={}, peer_timeout={}, {:?})",
stage,
bytes_to_hex(node_id),
peer_timeout,
peers
)
}
Message::Close => write!(formatter, "Close")
}
@ -183,7 +190,12 @@ pub fn decode<'a>(data: &'a mut [u8], magic: HeaderMagic, crypto: &Crypto) -> Re
pos += read;
addrs.push(range);
}
Message::Init(stage, node_id, addrs)
let mut peer_timeout = 1800;
if data.len() >= pos + 2 {
peer_timeout = Encoder::read_u16(&data[pos..]);
// pos += 2; never read
}
Message::Init(stage, node_id, addrs, peer_timeout)
}
3 => Message::Close,
_ => return Err(Error::Parse("Unknown message type"))
@ -198,7 +210,7 @@ pub fn encode<'a>(
let header_type = match msg {
Message::Data(_, _, _) => 0,
Message::Peers(_) => 1,
Message::Init(_, _, _) => 2,
Message::Init(_, _, _, _) => 2,
Message::Close => 3
};
let mut start = 64;
@ -244,7 +256,7 @@ pub fn encode<'a>(
}
end = pos;
}
Message::Init(stage, ref node_id, ref ranges) => {
Message::Init(stage, ref node_id, ref ranges, peer_timeout) => {
let mut pos = start;
assert!(buf.len() >= pos + 2 + NODE_ID_BYTES);
buf[pos] = stage;
@ -257,6 +269,8 @@ pub fn encode<'a>(
for range in ranges {
pos += range.write_to(&mut buf[pos..]);
}
Encoder::write_u16(peer_timeout, &mut buf[pos..]);
pos += 2;
end = pos;
}
Message::Close => {}
@ -299,9 +313,9 @@ impl<'a> PartialEq for Message<'a> {
false
}
}
Message::Init(step1, node_id1, ref ranges1) => {
if let Message::Init(step2, node_id2, ref ranges2) = *other {
step1 == step2 && node_id1 == node_id2 && ranges1 == ranges2
Message::Init(step1, node_id1, ref ranges1, peer_timeout1) => {
if let Message::Init(step2, node_id2, ref ranges2, peer_timeout2) = *other {
step1 == step2 && node_id1 == node_id2 && ranges1 == ranges2 && peer_timeout1 == peer_timeout2
} else {
false
}
@ -421,15 +435,15 @@ fn udpmessage_init() {
Range { base: Address { data: [0, 1, 2, 3, 4, 5, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], len: 6 }, prefix_len: 16 },
];
let node_id = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15];
let mut msg = Message::Init(0, node_id, addrs);
let mut msg = Message::Init(0, node_id, addrs, 1800);
let mut should = [
118, 112, 110, 1, 0, 0, 0, 2, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 2, 4, 0, 1, 2, 3, 24, 6,
0, 1, 2, 3, 4, 5, 16
0, 1, 2, 3, 4, 5, 16, 7, 8
];
{
let mut buf = [0; 1024];
let res = encode(&mut msg, &mut buf[..], MAGIC, &mut crypto);
assert_eq!(res.len(), 40);
assert_eq!(res.len(), 42);
for i in 0..res.len() {
assert_eq!(res[i], should[i]);
}
@ -502,9 +516,9 @@ fn message_fmt() {
base: Address { data: [0, 1, 2, 3, 4, 5, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], len: 6 },
prefix_len: 16
}
])
], 1800)
),
"Init(stage=0, node_id=000102030405060708090a0b0c0d0e0f, [0.1.2.3/24, 00:01:02:03:04:05/16])"
"Init(stage=0, node_id=000102030405060708090a0b0c0d0e0f, peer_timeout=1800, [0.1.2.3/24, 00:01:02:03:04:05/16])"
);
assert_eq!(format!("{:?}", Message::Close), "Close");
}