More sync

This commit is contained in:
Dennis Schwerdel 2021-05-27 08:09:09 +02:00
parent edb841adb1
commit 76bb4aa4b6
2 changed files with 29 additions and 21 deletions

View File

@ -90,8 +90,8 @@ impl<S: Socket, D: Device, P: Protocol, TS: TimeSource> DeviceThread<S, D, P, TS
if let Some(crypto) = crypto { if let Some(crypto) = crypto {
crypto.encrypt(&mut self.broadcast_buffer); crypto.encrypt(&mut self.broadcast_buffer);
} }
traffic.count_out_traffic(addr, self.broadcast_buffer.len()); traffic.count_out_traffic(*addr, self.broadcast_buffer.len());
match socket.send(self.broadcast_buffer.message(), addr).await { match socket.send(self.broadcast_buffer.message(), *addr).await {
Ok(written) if written == self.broadcast_buffer.len() => Ok(()), Ok(written) if written == self.broadcast_buffer.len() => Ok(()),
Ok(_) => Err(Error::Socket("Sent out truncated packet")), Ok(_) => Err(Error::Socket("Sent out truncated packet")),
Err(e) => Err(Error::SocketIo("IOError when sending", e)), Err(e) => Err(Error::SocketIo("IOError when sending", e)),

View File

@ -21,45 +21,53 @@ use super::common::PeerData;
#[derive(Clone)] #[derive(Clone)]
pub struct SharedPeerCrypto { pub struct SharedPeerCrypto {
peers: Arc<Mutex<HashMap<SocketAddr, Option<Arc<CryptoCore>>, Hash>>>, peers: Arc<Mutex<HashMap<SocketAddr, Option<Arc<CryptoCore>>, Hash>>>,
//TODO: local hashmap as cache cache: HashMap<SocketAddr, Option<Arc<CryptoCore>>, Hash>, //TODO: local hashmap as cache
} }
impl SharedPeerCrypto { impl SharedPeerCrypto {
pub fn new() -> Self { pub fn new() -> Self {
SharedPeerCrypto { peers: Arc::new(Mutex::new(HashMap::default())) } SharedPeerCrypto { peers: Arc::new(Mutex::new(HashMap::default())), cache: HashMap::default() }
} }
pub fn encrypt_for(&self, peer: SocketAddr, data: &mut MsgBuffer) -> Result<(), Error> { pub fn encrypt_for(&mut self, peer: SocketAddr, data: &mut MsgBuffer) -> Result<(), Error> {
//TODO: use cache first let crypto = match self.cache.get(&peer) {
let mut peers = self.peers.lock(); Some(crypto) => crypto,
match peers.get_mut(&peer) { None => {
None => Err(Error::InvalidCryptoState("No crypto found for peer")), let peers = self.peers.lock();
Some(None) => Ok(()), if let Some(crypto) = peers.get(&peer) {
Some(Some(crypto)) => { self.cache.insert(peer, crypto.clone());
crypto.encrypt(data); self.cache.get(&peer).unwrap()
Ok(()) } else {
return Err(Error::InvalidCryptoState("No crypto found for peer"));
}
} }
};
if let Some(crypto) = crypto {
crypto.encrypt(data);
} }
Ok(())
} }
pub fn store(&self, data: &HashMap<SocketAddr, PeerData, Hash>) { pub fn store(&mut self, data: &HashMap<SocketAddr, PeerData, Hash>) {
//TODO: store in shared and in cache self.cache.clear();
self.cache.extend(data.iter().map(|(k, v)| (*k, v.crypto.get_core())));
let mut peers = self.peers.lock(); let mut peers = self.peers.lock();
peers.clear(); peers.clear();
peers.extend(data.iter().map(|(k, v)| (*k, v.crypto.get_core()))); peers.extend(self.cache.iter().map(|(k, v)| (*k, v.clone())));
} }
pub fn load(&mut self) { pub fn load(&mut self) {
// TODO sync if needed let peers = self.peers.lock();
self.cache.clear();
self.cache.extend(peers.iter().map(|(k, v)| (*k, v.clone())));
} }
pub fn get_snapshot(&self) -> HashMap<SocketAddr, Option<Arc<CryptoCore>>, Hash> { pub fn get_snapshot(&mut self) -> &HashMap<SocketAddr, Option<Arc<CryptoCore>>, Hash> {
//TODO: return local cache &self.cache
self.peers.lock().clone()
} }
pub fn count(&self) -> usize { pub fn count(&self) -> usize {
self.peers.lock().len() self.cache.len()
} }
} }