mirror of https://github.com/dswd/vpncloud.git
Compare commits
No commits in common. "675031a4ae00074f319de1ef09e5debff6ffc6a9" and "21d58f25a3b86bc83b846253fbb35f12af2a7838" have entirely different histories.
675031a4ae
...
21d58f25a3
|
@ -241,7 +241,7 @@ impl PeerCrypto {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn encrypt_message(&mut self, buffer: &mut MsgBuffer) {
|
||||
fn encrypt_message(&mut self, buffer: &mut MsgBuffer) {
|
||||
// HOT PATH
|
||||
if let PeerCrypto::Encrypted { core, .. } = self {
|
||||
core.encrypt(buffer)
|
||||
|
@ -283,6 +283,13 @@ impl PeerCrypto {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn send_message(&mut self, type_: u8, buffer: &mut MsgBuffer) {
|
||||
// HOT PATH
|
||||
assert_ne!(type_, MESSAGE_TYPE_ROTATION);
|
||||
buffer.prepend_byte(type_);
|
||||
self.encrypt_message(buffer);
|
||||
}
|
||||
|
||||
pub fn every_second(&mut self, out: &mut MsgBuffer) -> MessageResult {
|
||||
out.clear();
|
||||
if let PeerCrypto::Encrypted { core, rotation, rotate_counter, algorithm, .. } = self {
|
||||
|
@ -360,8 +367,7 @@ mod tests {
|
|||
buffer.set_length(1000);
|
||||
rng.fill(buffer.message_mut()).unwrap();
|
||||
for _ in 0..1000 {
|
||||
buffer.prepend_byte(1);
|
||||
node1.encrypt_message(&mut buffer);
|
||||
node1.send_message(1, &mut buffer);
|
||||
let res = node2.handle_message(&mut buffer).unwrap();
|
||||
assert_eq!(res, MessageResult::Message(1));
|
||||
|
||||
|
|
|
@ -552,7 +552,7 @@ impl<P: Payload> InitState<P> {
|
|||
out.set_length(len);
|
||||
}
|
||||
|
||||
pub fn repeat_last_message(&self, out: &mut MsgBuffer) {
|
||||
fn repeat_last_message(&self, out: &mut MsgBuffer) {
|
||||
if let Some(ref bytes) = self.last_message {
|
||||
debug!("Repeating last init message");
|
||||
let buffer = out.buffer();
|
||||
|
|
|
@ -42,9 +42,11 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS
|
|||
#[inline]
|
||||
fn send_msg(&mut self, addr: SocketAddr, type_: u8, msg: &mut MsgBuffer) -> Result<(), Error> {
|
||||
debug!("Sending msg with {} bytes to {}", msg.len(), addr);
|
||||
msg.prepend_byte(type_);
|
||||
self.peer_crypto.encrypt_for(addr, msg)?;
|
||||
self.send_to(addr, msg)
|
||||
if self.peer_crypto.send_message(addr, type_, msg)? {
|
||||
self.send_to(addr, msg)
|
||||
} else {
|
||||
Err(Error::Message("Sending to node that is not a peer"))
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
|
@ -55,10 +57,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS
|
|||
msg_data.set_start(msg.get_start());
|
||||
msg_data.set_length(msg.len());
|
||||
msg_data.message_mut().clone_from_slice(msg.message());
|
||||
msg_data.prepend_byte(type_);
|
||||
if let Some(crypto) = crypto {
|
||||
crypto.encrypt(&mut msg_data);
|
||||
}
|
||||
crypto.send_message(type_, &mut msg_data)?;
|
||||
self.traffic.count_out_traffic(addr, msg_data.len());
|
||||
match self.socket.send(msg_data.message(), addr) {
|
||||
Ok(written) if written == msg_data.len() => Ok(()),
|
||||
|
|
|
@ -26,7 +26,7 @@ use smallvec::{smallvec, SmallVec};
|
|||
use crate::{
|
||||
beacon::BeaconSerializer,
|
||||
config::{Config, DEFAULT_PEER_TIMEOUT, DEFAULT_PORT},
|
||||
crypto::{is_init_message, Crypto, MessageResult, PeerCrypto, InitState, InitResult},
|
||||
crypto::{is_init_message, Crypto, MessageResult, PeerCrypto},
|
||||
device::{Device, Type},
|
||||
error::Error,
|
||||
messages::{
|
||||
|
@ -58,7 +58,7 @@ struct PeerData {
|
|||
timeout: Time,
|
||||
peer_timeout: u16,
|
||||
node_id: NodeId,
|
||||
crypto: PeerCrypto
|
||||
crypto: PeerCrypto<NodeInfo>
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
|
@ -190,8 +190,7 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
|
|||
msg_data.set_start(msg.get_start());
|
||||
msg_data.set_length(msg.len());
|
||||
msg_data.message_mut().clone_from_slice(msg.message());
|
||||
msg_data.prepend_byte(type_);
|
||||
peer.crypto.encrypt_message(&mut msg_data);
|
||||
peer.crypto.send_message(type_, &mut msg_data)?;
|
||||
self.traffic.count_out_traffic(*addr, msg_data.len());
|
||||
match self.socket.send(msg_data.message(), *addr) {
|
||||
Ok(written) if written == msg_data.len() => Ok(()),
|
||||
|
@ -222,8 +221,7 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
|
|||
Some(peer) => peer,
|
||||
None => return Err(Error::Message("Sending to node that is not a peer"))
|
||||
};
|
||||
msg.prepend_byte(type_);
|
||||
peer.crypto.encrypt_message(msg);
|
||||
peer.crypto.send_message(type_, msg)?;
|
||||
self.send_to(addr, msg)
|
||||
}
|
||||
|
||||
|
@ -332,7 +330,7 @@ impl<D: Device, P: Protocol, S: Socket, TS: TimeSource> GenericCloud<D, P, S, TS
|
|||
let payload = self.create_node_info();
|
||||
let mut peer_crypto = self.crypto.peer_instance(payload);
|
||||
let mut msg = MsgBuffer::new(SPACE_BEFORE);
|
||||
peer_crypto.send_ping(&mut msg);
|
||||
peer_crypto.initialize(&mut msg)?;
|
||||
self.pending_inits.insert(addr, peer_crypto);
|
||||
self.send_to(addr, &mut msg)
|
||||
}
|
||||
|
|
|
@ -12,7 +12,7 @@ use parking_lot::Mutex;
|
|||
use std::{collections::HashMap, net::SocketAddr, sync::Arc};
|
||||
|
||||
pub struct SharedPeerCrypto {
|
||||
peers: Arc<Mutex<HashMap<SocketAddr, Option<Arc<CryptoCore>>, Hash>>>
|
||||
peers: Arc<Mutex<HashMap<SocketAddr, Arc<CryptoCore>, Hash>>>
|
||||
}
|
||||
|
||||
impl SharedPeerCrypto {
|
||||
|
@ -20,19 +20,20 @@ impl SharedPeerCrypto {
|
|||
// TODO sync if needed
|
||||
}
|
||||
|
||||
pub fn encrypt_for(&mut self, peer: SocketAddr, data: &mut MsgBuffer) -> Result<(), Error> {
|
||||
pub fn send_message(&mut self, peer: SocketAddr, type_: u8, data: &mut MsgBuffer) -> Result<bool, Error> {
|
||||
let mut peers = self.peers.lock();
|
||||
match peers.get_mut(&peer) {
|
||||
None => Err(Error::InvalidCryptoState("No crypto found for peer")),
|
||||
Some(None) => Ok(()),
|
||||
Some(Some(crypto)) => Ok(crypto.encrypt(data))
|
||||
}
|
||||
if let Some(peer) = peers.get_mut(&peer) {
|
||||
peer.send_message(type_, data);
|
||||
Ok(true)
|
||||
} else {
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn for_each(&mut self, mut callback: impl FnMut(SocketAddr, Option<Arc<CryptoCore>>) -> Result<(), Error>) -> Result<(), Error> {
|
||||
pub fn for_each(&mut self, mut callback: impl FnMut(SocketAddr, &mut CryptoCore) -> Result<(), Error>) -> Result<(), Error> {
|
||||
let mut peers = self.peers.lock();
|
||||
for (k, v) in peers.iter_mut() {
|
||||
callback(*k, v.clone())?
|
||||
callback(*k, v)?
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -5,7 +5,7 @@ use super::{
|
|||
|
||||
use crate::{
|
||||
config::DEFAULT_PEER_TIMEOUT,
|
||||
crypto::{is_init_message, MessageResult, PeerCrypto, InitState, InitResult},
|
||||
crypto::{is_init_message, MessageResult, PeerCrypto},
|
||||
engine::{addr_nice, resolve, Hash, PeerData},
|
||||
error::Error,
|
||||
messages::{AddrList, NodeInfo, PeerInfo},
|
||||
|
@ -38,7 +38,7 @@ pub struct SocketThread<S: Socket, D: Device, P: Protocol, TS: TimeSource> {
|
|||
device: D,
|
||||
next_housekeep: Time,
|
||||
own_addresses: AddrList,
|
||||
pending_inits: HashMap<SocketAddr, InitState<NodeInfo>, Hash>,
|
||||
pending_inits: HashMap<SocketAddr, PeerCrypto<NodeInfo>, Hash>,
|
||||
crypto: Crypto,
|
||||
peers: HashMap<SocketAddr, PeerData, Hash>,
|
||||
// Shared fields
|
||||
|
@ -48,7 +48,7 @@ pub struct SocketThread<S: Socket, D: Device, P: Protocol, TS: TimeSource> {
|
|||
|
||||
impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS> {
|
||||
#[inline]
|
||||
fn send_to(&mut self, addr: SocketAddr, msg: &MsgBuffer) -> Result<(), Error> {
|
||||
fn send_to(&mut self, addr: SocketAddr, msg: &mut MsgBuffer) -> Result<(), Error> {
|
||||
debug!("Sending msg with {} bytes to {}", msg.len(), addr);
|
||||
self.traffic.count_out_traffic(addr, msg.len());
|
||||
match self.socket.send(msg.message(), addr) {
|
||||
|
@ -68,10 +68,10 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
|||
}
|
||||
debug!("Connecting to {:?}", addr);
|
||||
let payload = self.create_node_info();
|
||||
let mut init = self.crypto.peer_instance(payload);
|
||||
let mut peer_crypto = self.crypto.peer_instance(payload);
|
||||
let mut msg = MsgBuffer::new(SPACE_BEFORE);
|
||||
init.send_ping(&mut msg);
|
||||
self.pending_inits.insert(addr, init);
|
||||
peer_crypto.initialize(&mut msg)?;
|
||||
self.pending_inits.insert(addr, peer_crypto);
|
||||
self.send_to(addr, &mut msg)
|
||||
}
|
||||
|
||||
|
@ -129,23 +129,18 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn add_new_peer(&mut self, addr: SocketAddr, info: NodeInfo, msg: &mut MsgBuffer) -> Result<(), Error> {
|
||||
fn add_new_peer(&mut self, addr: SocketAddr, info: NodeInfo) -> Result<(), Error> {
|
||||
info!("Added peer {}", addr_nice(addr));
|
||||
if let Some(init) = self.pending_inits.remove(&addr) {
|
||||
msg.clear();
|
||||
let crypto = init.finish(&mut msg);
|
||||
self.peers.insert(addr, PeerData {
|
||||
addrs: info.addrs.clone(),
|
||||
crypto,
|
||||
crypto: init,
|
||||
node_id: info.node_id,
|
||||
peer_timeout: info.peer_timeout.unwrap_or(DEFAULT_PEER_TIMEOUT),
|
||||
last_seen: TS::now(),
|
||||
timeout: TS::now() + self.config.peer_timeout as Time
|
||||
});
|
||||
self.update_peer_info(addr, Some(info))?;
|
||||
if !msg.is_empty() {
|
||||
self.send_to(addr, msg)?;
|
||||
}
|
||||
} else {
|
||||
error!("No init for new peer {}", addr_nice(addr));
|
||||
}
|
||||
|
@ -198,7 +193,7 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
|||
}
|
||||
|
||||
fn process_message(
|
||||
&mut self, src: SocketAddr, msg_result: MessageResult, data: &mut MsgBuffer
|
||||
&mut self, src: SocketAddr, msg_result: MessageResult<NodeInfo>, data: &mut MsgBuffer
|
||||
) -> Result<(), Error> {
|
||||
match msg_result {
|
||||
MessageResult::Message(type_) => {
|
||||
|
@ -222,6 +217,11 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
|||
}
|
||||
}
|
||||
}
|
||||
MessageResult::Initialized(info) => self.add_new_peer(src, info)?,
|
||||
MessageResult::InitializedWithReply(info) => {
|
||||
self.add_new_peer(src, info)?;
|
||||
self.send_to(src, data)?
|
||||
}
|
||||
MessageResult::Reply => self.send_to(src, data)?,
|
||||
MessageResult::None => ()
|
||||
}
|
||||
|
@ -231,67 +231,49 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> SocketThread<S, D, P, TS
|
|||
fn handle_message(&mut self, src: SocketAddr, data: &mut MsgBuffer) -> Result<(), Error> {
|
||||
let src = mapped_addr(src);
|
||||
debug!("Received {} bytes from {}", data.len(), src);
|
||||
if let Some(result) = self.peers.get_mut(&src).map(|peer| {
|
||||
peer.crypto.handle_message(data)
|
||||
}) {
|
||||
return self.process_message(src, result?, data)
|
||||
}
|
||||
let is_init = is_init_message(data.message());
|
||||
if let Some(result) = self.pending_inits.get_mut(&src).map(|init| {
|
||||
if is_init {
|
||||
init.handle_init(data)
|
||||
} else {
|
||||
data.clear();
|
||||
init.repeat_last_message(data);
|
||||
Ok(InitResult::Continue)
|
||||
}
|
||||
}) {
|
||||
match result? {
|
||||
InitResult::Continue => {
|
||||
if !data.is_empty() {
|
||||
self.send_to(src, data)?
|
||||
}
|
||||
},
|
||||
InitResult::Success { peer_payload, is_initiator } => {
|
||||
self.add_new_peer(src, peer_payload, data)?
|
||||
let msg_result = if let Some(init) = self.pending_inits.get_mut(&src) {
|
||||
init.handle_message(data)
|
||||
} else if is_init_message(data.message()) {
|
||||
let mut result = None;
|
||||
if let Some(peer) = self.peers.get_mut(&src) {
|
||||
if peer.crypto.has_init() {
|
||||
result = Some(peer.crypto.handle_message(data))
|
||||
}
|
||||
}
|
||||
return Ok(())
|
||||
}
|
||||
if !is_init_message(data.message()) {
|
||||
if let Some(result) = result {
|
||||
result
|
||||
} else {
|
||||
let mut init = self.crypto.peer_instance(self.create_node_info());
|
||||
let msg_result = init.handle_message(data);
|
||||
match msg_result {
|
||||
Ok(res) => {
|
||||
self.pending_inits.insert(src, init);
|
||||
Ok(res)
|
||||
}
|
||||
Err(err) => {
|
||||
self.traffic.count_invalid_protocol(data.len());
|
||||
return Err(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if let Some(peer) = self.peers.get_mut(&src) {
|
||||
peer.crypto.handle_message(data)
|
||||
} else {
|
||||
info!("Ignoring non-init message from unknown peer {}", addr_nice(src));
|
||||
self.traffic.count_invalid_protocol(data.len());
|
||||
return Ok(())
|
||||
}
|
||||
let mut init = self.crypto.peer_instance(self.create_node_info());
|
||||
let msg_result = init.handle_init(data);
|
||||
};
|
||||
match msg_result {
|
||||
Ok(res) => {
|
||||
self.pending_inits.insert(src, init);
|
||||
self.send_to(src, data)
|
||||
}
|
||||
Ok(val) => self.process_message(src, val, data),
|
||||
Err(err) => {
|
||||
self.traffic.count_invalid_protocol(data.len());
|
||||
return Err(err)
|
||||
Err(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn housekeep(&mut self) -> Result<(), Error> {
|
||||
// self.shared.sync();
|
||||
// * = can be in different thread, ** only with caching/sync
|
||||
//TODO: peers: timeout **
|
||||
//TODO: table: timeout **
|
||||
//TODO: rotate crypto keys
|
||||
//TODO: time out pending inits
|
||||
//TODO: extend port forwarding *
|
||||
//TODO: reset own address **
|
||||
//TODO: send peer lists **
|
||||
//TODO: reconnect to peers **
|
||||
//TODO: write to statsd **
|
||||
//TODO: write to stats file **
|
||||
//TODO: read beacon **
|
||||
//TODO: write beacon **
|
||||
// TODO: sync
|
||||
unimplemented!();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue