mirror of https://github.com/dswd/vpncloud.git
Compare commits
No commits in common. "675031a4ae00074f319de1ef09e5debff6ffc6a9" and "21d58f25a3b86bc83b846253fbb35f12af2a7838" have entirely different histories.
675031a4ae
...
21d58f25a3
|
@ -241,7 +241,7 @@ impl PeerCrypto {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn encrypt_message(&mut self, buffer: &mut MsgBuffer) {
|
fn encrypt_message(&mut self, buffer: &mut MsgBuffer) {
|
||||||
// HOT PATH
|
// HOT PATH
|
||||||
if let PeerCrypto::Encrypted { core, .. } = self {
|
if let PeerCrypto::Encrypted { core, .. } = self {
|
||||||
core.encrypt(buffer)
|
core.encrypt(buffer)
|
||||||
|
@ -283,6 +283,13 @@ impl PeerCrypto {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn send_message(&mut self, type_: u8, buffer: &mut MsgBuffer) {
|
||||||
|
// HOT PATH
|
||||||
|
assert_ne!(type_, MESSAGE_TYPE_ROTATION);
|
||||||
|
buffer.prepend_byte(type_);
|
||||||
|
self.encrypt_message(buffer);
|
||||||
|
}
|
||||||
|
|
||||||
pub fn every_second(&mut self, out: &mut MsgBuffer) -> MessageResult {
|
pub fn every_second(&mut self, out: &mut MsgBuffer) -> MessageResult {
|
||||||
out.clear();
|
out.clear();
|
||||||
if let PeerCrypto::Encrypted { core, rotation, rotate_counter, algorithm, .. } = self {
|
if let PeerCrypto::Encrypted { core, rotation, rotate_counter, algorithm, .. } = self {
|
||||||
|
@ -360,8 +367,7 @@ mod tests {
|
||||||
buffer.set_length(1000);
|
buffer.set_length(1000);
|
||||||
rng.fill(buffer.message_mut()).unwrap();
|
rng.fill(buffer.message_mut()).unwrap();
|
||||||
for _ in 0..1000 {
|
for _ in 0..1000 {
|
||||||
buffer.prepend_byte(1);
|
node1.send_message(1, &mut buffer);
|
||||||
node1.encrypt_message(&mut buffer);
|
|
||||||
let res = node2.handle_message(&mut buffer).unwrap();
|
let res = node2.handle_message(&mut buffer).unwrap();
|
||||||
assert_eq!(res, MessageResult::Message(1));
|
assert_eq!(res, MessageResult::Message(1));
|
||||||
|
|
||||||
|
|
|
@ -552,7 +552,7 @@ impl<P: Payload> InitState<P> {
|
||||||
out.set_length(len);
|
out.set_length(len);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn repeat_last_message(&self, out: &mut MsgBuffer) {
|
fn repeat_last_message(&self, out: &mut MsgBuffer) {
|
||||||
if let Some(ref bytes) = self.last_message {
|
if let Some(ref bytes) = self.last_message {
|
||||||
debug!("Repeating last init message");
|
debug!("Repeating last init message");
|
||||||
let buffer = out.buffer();
|
let buffer = out.buffer();
|
||||||
|
|
|
@ -42,9 +42,11 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS
|
||||||
#[inline]
|
#[inline]
|
||||||
fn send_msg(&mut self, addr: SocketAddr, type_: u8, msg: &mut MsgBuffer) -> Result<(), Error> {
|
fn send_msg(&mut self, addr: SocketAddr, type_: u8, msg: &mut MsgBuffer) -> Result<(), Error> {
|
||||||
debug!("Sending msg with {} bytes to {}", msg.len(), addr);
|
debug!("Sending msg with {} bytes to {}", msg.len(), addr);
|
||||||
msg.prepend_byte(type_);
|
if self.peer_crypto.send_message(addr, type_, msg)? {
|
||||||
self.peer_crypto.encrypt_for(addr, msg)?;
|
self.send_to(addr, msg)
|
||||||
self.send_to(addr, msg)
|
} else {
|
||||||
|
Err(Error::Message("Sending to node that is not a peer"))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
|
@ -55,10 +57,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS
|
||||||
msg_data.set_start(msg.get_start());
|
msg_data.set_start(msg.get_start());
|
||||||
msg_data.set_length(msg.len());
|
msg_data.set_length(msg.len());
|
||||||
msg_data.message_mut().clone_from_slice(msg.message());
|
msg_data.message_mut().clone_from_slice(msg.message());
|
||||||
msg_data.prepend_byte(type_);
|
crypto.send_message(type_, &mut msg_data)?;
|
||||||
if let Some(crypto) = crypto {
|
|
||||||
crypto.encrypt(&mut msg_data);
|
|
||||||
}
|
|
||||||
self.traffic.count_out_traffic(addr, msg_data.len());
|
self.traffic.count_out_traffic(addr, msg_data.len());
|
||||||
match self.socket.send(msg_data.message(), addr) {
|
match self.socket.send(msg_data.message(), addr) {
|
||||||
Ok(written) if written == msg_data.len() => Ok(()),
|
Ok(written) if written == msg_data.len() => Ok(()),
|
||||||
|
|
|
@ -26,7 +26,7 @@ use smallvec::{smallvec, SmallVec};
|
||||||
use crate::{
|
use crate::{
|
||||||
beacon::BeaconSerializer,
|
beacon::BeaconSerializer,
|
||||||
config::{Config, DEFAULT_PEER_TIMEOUT, DEFAULT_PORT},
|
config::{Config, DEFAULT_PEER_TIMEOUT, DEFAULT_PORT},
|
||||||
crypto::{is_init_message, Crypto, MessageResult, PeerCrypto, InitState, InitResult},
|
crypto::{is_init_message, Crypto, MessageResult, PeerCrypto},
|
||||||
device::{Device, Type},
|
device::{Device, Type},
|
||||||
error::Error,
|
error::Error,
|
||||||
messages::{
|
messages::{
|
||||||
|
@ -58,7 +58,7 @@ struct PeerData {
|
||||||
timeout: Time,
|
timeout: Time,
|
||||||
peer_timeout: u16,
|
peer_timeout: u16,
|
||||||
node_id: NodeId,
|
node_id: NodeId,
|
||||||
crypto: PeerCrypto
|
crypto: PeerCrypto<NodeInfo>
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
|
@ -190,8 +190,7 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
|
||||||
msg_data.set_start(msg.get_start());
|
msg_data.set_start(msg.get_start());
|
||||||
msg_data.set_length(msg.len());
|
msg_data.set_length(msg.len());
|
||||||
msg_data.message_mut().clone_from_slice(msg.message());
|
msg_data.message_mut().clone_from_slice(msg.message());
|
||||||
msg_data.prepend_byte(type_);
|
peer.crypto.send_message(type_, &mut msg_data)?;
|
||||||
peer.crypto.encrypt_message(&mut msg_data);
|
|
||||||
self.traffic.count_out_traffic(*addr, msg_data.len());
|
self.traffic.count_out_traffic(*addr, msg_data.len());
|
||||||
match self.socket.send(msg_data.message(), *addr) {
|
match self.socket.send(msg_data.message(), *addr) {
|
||||||
Ok(written) if written == msg_data.len() => Ok(()),
|
Ok(written) if written == msg_data.len() => Ok(()),
|
||||||
|
@ -222,8 +221,7 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
|
||||||
Some(peer) => peer,
|
Some(peer) => peer,
|
||||||
None => return Err(Error::Message("Sending to node that is not a peer"))
|
None => return Err(Error::Message("Sending to node that is not a peer"))
|
||||||
};
|
};
|
||||||
msg.prepend_byte(type_);
|
peer.crypto.send_message(type_, msg)?;
|
||||||
peer.crypto.encrypt_message(msg);
|
|
||||||
self.send_to(addr, msg)
|
self.send_to(addr, msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -332,7 +330,7 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
|
||||||
let payload = self.create_node_info();
|
let payload = self.create_node_info();
|
||||||
let mut peer_crypto = self.crypto.peer_instance(payload);
|
let mut peer_crypto = self.crypto.peer_instance(payload);
|
||||||
let mut msg = MsgBuffer::new(SPACE_BEFORE);
|
let mut msg = MsgBuffer::new(SPACE_BEFORE);
|
||||||
peer_crypto.send_ping(&mut msg);
|
peer_crypto.initialize(&mut msg)?;
|
||||||
self.pending_inits.insert(addr, peer_crypto);
|
self.pending_inits.insert(addr, peer_crypto);
|
||||||
self.send_to(addr, &mut msg)
|
self.send_to(addr, &mut msg)
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,7 +12,7 @@ use parking_lot::Mutex;
|
||||||
use std::{collections::HashMap, net::SocketAddr, sync::Arc};
|
use std::{collections::HashMap, net::SocketAddr, sync::Arc};
|
||||||
|
|
||||||
pub struct SharedPeerCrypto {
|
pub struct SharedPeerCrypto {
|
||||||
peers: Arc<Mutex<HashMap<SocketAddr, Option<Arc<CryptoCore>>, Hash>>>
|
peers: Arc<Mutex<HashMap<SocketAddr, Arc<CryptoCore>, Hash>>>
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SharedPeerCrypto {
|
impl SharedPeerCrypto {
|
||||||
|
@ -20,19 +20,20 @@ impl SharedPeerCrypto {
|
||||||
// TODO sync if needed
|
// TODO sync if needed
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn encrypt_for(&mut self, peer: SocketAddr, data: &mut MsgBuffer) -> Result<(), Error> {
|
pub fn send_message(&mut self, peer: SocketAddr, type_: u8, data: &mut MsgBuffer) -> Result<bool, Error> {
|
||||||
let mut peers = self.peers.lock();
|
let mut peers = self.peers.lock();
|
||||||
match peers.get_mut(&peer) {
|
if let Some(peer) = peers.get_mut(&peer) {
|
||||||
None => Err(Error::InvalidCryptoState("No crypto found for peer")),
|
peer.send_message(type_, data);
|
||||||
Some(None) => Ok(()),
|
Ok(true)
|
||||||
Some(Some(crypto)) => Ok(crypto.encrypt(data))
|
} else {
|
||||||
|
Ok(false)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn for_each(&mut self, mut callback: impl FnMut(SocketAddr, Option<Arc<CryptoCore>>) -> Result<(), Error>) -> Result<(), Error> {
|
pub fn for_each(&mut self, mut callback: impl FnMut(SocketAddr, &mut CryptoCore) -> Result<(), Error>) -> Result<(), Error> {
|
||||||
let mut peers = self.peers.lock();
|
let mut peers = self.peers.lock();
|
||||||
for (k, v) in peers.iter_mut() {
|
for (k, v) in peers.iter_mut() {
|
||||||
callback(*k, v.clone())?
|
callback(*k, v)?
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,7 +5,7 @@ use super::{
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
config::DEFAULT_PEER_TIMEOUT,
|
config::DEFAULT_PEER_TIMEOUT,
|
||||||
crypto::{is_init_message, MessageResult, PeerCrypto, InitState, InitResult},
|
crypto::{is_init_message, MessageResult, PeerCrypto},
|
||||||
engine::{addr_nice, resolve, Hash, PeerData},
|
engine::{addr_nice, resolve, Hash, PeerData},
|
||||||
error::Error,
|
error::Error,
|
||||||
messages::{AddrList, NodeInfo, PeerInfo},
|
messages::{AddrList, NodeInfo, PeerInfo},
|
||||||
|
@ -38,7 +38,7 @@ pub struct SocketThread<S: Socket, D: Device, P: Protocol, TS: TimeSource> {
|
||||||
device: D,
|
device: D,
|
||||||
next_housekeep: Time,
|
next_housekeep: Time,
|
||||||
own_addresses: AddrList,
|
own_addresses: AddrList,
|
||||||
pending_inits: HashMap<SocketAddr, InitState<NodeInfo>, Hash>,
|
pending_inits: HashMap<SocketAddr, PeerCrypto<NodeInfo>, Hash>,
|
||||||
crypto: Crypto,
|
crypto: Crypto,
|
||||||
peers: HashMap<SocketAddr, PeerData, Hash>,
|
peers: HashMap<SocketAddr, PeerData, Hash>,
|
||||||
// Shared fields
|
// Shared fields
|
||||||
|
@ -48,7 +48,7 @@ pub struct SocketThread<S: Socket, D: Device, P: Protocol, TS: TimeSource> {
|
||||||
|
|
||||||
impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS> {
|
impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS> {
|
||||||
#[inline]
|
#[inline]
|
||||||
fn send_to(&mut self, addr: SocketAddr, msg: &MsgBuffer) -> Result<(), Error> {
|
fn send_to(&mut self, addr: SocketAddr, msg: &mut MsgBuffer) -> Result<(), Error> {
|
||||||
debug!("Sending msg with {} bytes to {}", msg.len(), addr);
|
debug!("Sending msg with {} bytes to {}", msg.len(), addr);
|
||||||
self.traffic.count_out_traffic(addr, msg.len());
|
self.traffic.count_out_traffic(addr, msg.len());
|
||||||
match self.socket.send(msg.message(), addr) {
|
match self.socket.send(msg.message(), addr) {
|
||||||
|
@ -68,10 +68,10 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
}
|
}
|
||||||
debug!("Connecting to {:?}", addr);
|
debug!("Connecting to {:?}", addr);
|
||||||
let payload = self.create_node_info();
|
let payload = self.create_node_info();
|
||||||
let mut init = self.crypto.peer_instance(payload);
|
let mut peer_crypto = self.crypto.peer_instance(payload);
|
||||||
let mut msg = MsgBuffer::new(SPACE_BEFORE);
|
let mut msg = MsgBuffer::new(SPACE_BEFORE);
|
||||||
init.send_ping(&mut msg);
|
peer_crypto.initialize(&mut msg)?;
|
||||||
self.pending_inits.insert(addr, init);
|
self.pending_inits.insert(addr, peer_crypto);
|
||||||
self.send_to(addr, &mut msg)
|
self.send_to(addr, &mut msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -129,23 +129,18 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn add_new_peer(&mut self, addr: SocketAddr, info: NodeInfo, msg: &mut MsgBuffer) -> Result<(), Error> {
|
fn add_new_peer(&mut self, addr: SocketAddr, info: NodeInfo) -> Result<(), Error> {
|
||||||
info!("Added peer {}", addr_nice(addr));
|
info!("Added peer {}", addr_nice(addr));
|
||||||
if let Some(init) = self.pending_inits.remove(&addr) {
|
if let Some(init) = self.pending_inits.remove(&addr) {
|
||||||
msg.clear();
|
|
||||||
let crypto = init.finish(&mut msg);
|
|
||||||
self.peers.insert(addr, PeerData {
|
self.peers.insert(addr, PeerData {
|
||||||
addrs: info.addrs.clone(),
|
addrs: info.addrs.clone(),
|
||||||
crypto,
|
crypto: init,
|
||||||
node_id: info.node_id,
|
node_id: info.node_id,
|
||||||
peer_timeout: info.peer_timeout.unwrap_or(DEFAULT_PEER_TIMEOUT),
|
peer_timeout: info.peer_timeout.unwrap_or(DEFAULT_PEER_TIMEOUT),
|
||||||
last_seen: TS::now(),
|
last_seen: TS::now(),
|
||||||
timeout: TS::now() + self.config.peer_timeout as Time
|
timeout: TS::now() + self.config.peer_timeout as Time
|
||||||
});
|
});
|
||||||
self.update_peer_info(addr, Some(info))?;
|
self.update_peer_info(addr, Some(info))?;
|
||||||
if !msg.is_empty() {
|
|
||||||
self.send_to(addr, msg)?;
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
error!("No init for new peer {}", addr_nice(addr));
|
error!("No init for new peer {}", addr_nice(addr));
|
||||||
}
|
}
|
||||||
|
@ -198,7 +193,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
}
|
}
|
||||||
|
|
||||||
fn process_message(
|
fn process_message(
|
||||||
&mut self, src: SocketAddr, msg_result: MessageResult, data: &mut MsgBuffer
|
&mut self, src: SocketAddr, msg_result: MessageResult<NodeInfo>, data: &mut MsgBuffer
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
match msg_result {
|
match msg_result {
|
||||||
MessageResult::Message(type_) => {
|
MessageResult::Message(type_) => {
|
||||||
|
@ -222,6 +217,11 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
MessageResult::Initialized(info) => self.add_new_peer(src, info)?,
|
||||||
|
MessageResult::InitializedWithReply(info) => {
|
||||||
|
self.add_new_peer(src, info)?;
|
||||||
|
self.send_to(src, data)?
|
||||||
|
}
|
||||||
MessageResult::Reply => self.send_to(src, data)?,
|
MessageResult::Reply => self.send_to(src, data)?,
|
||||||
MessageResult::None => ()
|
MessageResult::None => ()
|
||||||
}
|
}
|
||||||
|
@ -231,67 +231,49 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
||||||
fn handle_message(&mut self, src: SocketAddr, data: &mut MsgBuffer) -> Result<(), Error> {
|
fn handle_message(&mut self, src: SocketAddr, data: &mut MsgBuffer) -> Result<(), Error> {
|
||||||
let src = mapped_addr(src);
|
let src = mapped_addr(src);
|
||||||
debug!("Received {} bytes from {}", data.len(), src);
|
debug!("Received {} bytes from {}", data.len(), src);
|
||||||
if let Some(result) = self.peers.get_mut(&src).map(|peer| {
|
let msg_result = if let Some(init) = self.pending_inits.get_mut(&src) {
|
||||||
peer.crypto.handle_message(data)
|
init.handle_message(data)
|
||||||
}) {
|
} else if is_init_message(data.message()) {
|
||||||
return self.process_message(src, result?, data)
|
let mut result = None;
|
||||||
}
|
if let Some(peer) = self.peers.get_mut(&src) {
|
||||||
let is_init = is_init_message(data.message());
|
if peer.crypto.has_init() {
|
||||||
if let Some(result) = self.pending_inits.get_mut(&src).map(|init| {
|
result = Some(peer.crypto.handle_message(data))
|
||||||
if is_init {
|
|
||||||
init.handle_init(data)
|
|
||||||
} else {
|
|
||||||
data.clear();
|
|
||||||
init.repeat_last_message(data);
|
|
||||||
Ok(InitResult::Continue)
|
|
||||||
}
|
|
||||||
}) {
|
|
||||||
match result? {
|
|
||||||
InitResult::Continue => {
|
|
||||||
if !data.is_empty() {
|
|
||||||
self.send_to(src, data)?
|
|
||||||
}
|
|
||||||
},
|
|
||||||
InitResult::Success { peer_payload, is_initiator } => {
|
|
||||||
self.add_new_peer(src, peer_payload, data)?
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return Ok(())
|
if let Some(result) = result {
|
||||||
}
|
result
|
||||||
if !is_init_message(data.message()) {
|
} else {
|
||||||
|
let mut init = self.crypto.peer_instance(self.create_node_info());
|
||||||
|
let msg_result = init.handle_message(data);
|
||||||
|
match msg_result {
|
||||||
|
Ok(res) => {
|
||||||
|
self.pending_inits.insert(src, init);
|
||||||
|
Ok(res)
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
self.traffic.count_invalid_protocol(data.len());
|
||||||
|
return Err(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if let Some(peer) = self.peers.get_mut(&src) {
|
||||||
|
peer.crypto.handle_message(data)
|
||||||
|
} else {
|
||||||
info!("Ignoring non-init message from unknown peer {}", addr_nice(src));
|
info!("Ignoring non-init message from unknown peer {}", addr_nice(src));
|
||||||
self.traffic.count_invalid_protocol(data.len());
|
self.traffic.count_invalid_protocol(data.len());
|
||||||
return Ok(())
|
return Ok(())
|
||||||
}
|
};
|
||||||
let mut init = self.crypto.peer_instance(self.create_node_info());
|
|
||||||
let msg_result = init.handle_init(data);
|
|
||||||
match msg_result {
|
match msg_result {
|
||||||
Ok(res) => {
|
Ok(val) => self.process_message(src, val, data),
|
||||||
self.pending_inits.insert(src, init);
|
|
||||||
self.send_to(src, data)
|
|
||||||
}
|
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
self.traffic.count_invalid_protocol(data.len());
|
self.traffic.count_invalid_protocol(data.len());
|
||||||
return Err(err)
|
Err(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn housekeep(&mut self) -> Result<(), Error> {
|
fn housekeep(&mut self) -> Result<(), Error> {
|
||||||
// self.shared.sync();
|
// self.shared.sync();
|
||||||
// * = can be in different thread, ** only with caching/sync
|
|
||||||
//TODO: peers: timeout **
|
|
||||||
//TODO: table: timeout **
|
|
||||||
//TODO: rotate crypto keys
|
|
||||||
//TODO: time out pending inits
|
|
||||||
//TODO: extend port forwarding *
|
|
||||||
//TODO: reset own address **
|
|
||||||
//TODO: send peer lists **
|
|
||||||
//TODO: reconnect to peers **
|
|
||||||
//TODO: write to statsd **
|
|
||||||
//TODO: write to stats file **
|
|
||||||
//TODO: read beacon **
|
|
||||||
//TODO: write beacon **
|
|
||||||
// TODO: sync
|
// TODO: sync
|
||||||
unimplemented!();
|
unimplemented!();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue