From 5964625ec7f996689e96a8b659cc14d7e70ffab3 Mon Sep 17 00:00:00 2001 From: Dennis Schwerdel Date: Tue, 21 Mar 2017 15:38:42 +0100 Subject: [PATCH] Remote folder --- src/bundledb/bundle.rs | 24 +++-- src/bundledb/db.rs | 194 +++++++++++++++++++++++++---------- src/bundledb/writer.rs | 13 +-- src/main.rs | 3 - src/repository/bundle_map.rs | 4 +- src/repository/integrity.rs | 4 +- src/repository/mod.rs | 2 +- src/util/lru_cache.rs | 11 ++ 8 files changed, 181 insertions(+), 74 deletions(-) diff --git a/src/bundledb/bundle.rs b/src/bundledb/bundle.rs index bf346ad..f10e764 100644 --- a/src/bundledb/bundle.rs +++ b/src/bundledb/bundle.rs @@ -128,20 +128,32 @@ impl Bundle { self.info.id.clone() } - pub fn load(path: PathBuf, crypto: Arc>) -> Result { - let mut file = BufReader::new(try!(File::open(&path).context(&path as &Path))); + fn load_header>(path: P) -> Result<(BundleInfo, u8, usize), BundleError> { + let path = path.as_ref(); + let mut file = BufReader::new(try!(File::open(path).context(path))); let mut header = [0u8; 8]; - try!(file.read_exact(&mut header).context(&path as &Path)); + try!(file.read_exact(&mut header).context(path)); if header[..HEADER_STRING.len()] != HEADER_STRING { - return Err(BundleError::WrongHeader(path.clone())) + return Err(BundleError::WrongHeader(path.to_path_buf())) } let version = header[HEADER_STRING.len()]; if version != HEADER_VERSION { - return Err(BundleError::WrongVersion(path.clone(), version)) + return Err(BundleError::WrongVersion(path.to_path_buf(), version)) } - let header: BundleInfo = try!(msgpack::decode_from_stream(&mut file).context(&path as &Path)); + let header: BundleInfo = try!(msgpack::decode_from_stream(&mut file).context(path)); debug!("Load bundle {}", header.id); let content_start = file.seek(SeekFrom::Current(0)).unwrap() as usize + header.chunk_info_size; + Ok((header, version, content_start)) + } + + #[inline] + pub fn load_info>(path: P) -> Result { + Self::load_header(path).map(|b| b.0) + } + + #[inline] + pub fn load(path: PathBuf, crypto: Arc>) -> Result { + let (header, version, content_start) = try!(Self::load_header(&path)); Ok(Bundle::new(path, version, content_start, crypto, header)) } diff --git a/src/bundledb/db.rs b/src/bundledb/db.rs index 293ba19..3a83c83 100644 --- a/src/bundledb/db.rs +++ b/src/bundledb/db.rs @@ -2,17 +2,103 @@ use ::prelude::*; use super::*; use std::path::{Path, PathBuf}; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fs; use std::sync::{Arc, Mutex}; +pub fn bundle_path(bundle: &BundleId, mut folder: PathBuf, mut count: usize) -> (PathBuf, PathBuf) { + let mut file = bundle.to_string().to_owned() + ".bundle"; + while count >= 100 { + if file.len() < 10 { + break + } + folder = folder.join(&file[0..2]); + file = file[2..].to_string(); + count /= 100; + } + (folder, file.into()) +} + +pub fn load_bundles>(path: P, bundles: &mut HashMap) -> Result<(Vec, Vec), BundleError> { + let mut paths = vec![path.as_ref().to_path_buf()]; + let mut bundle_paths = HashSet::new(); + while let Some(path) = paths.pop() { + for entry in try!(fs::read_dir(path).map_err(BundleError::List)) { + let entry = try!(entry.map_err(BundleError::List)); + let path = entry.path(); + if path.is_dir() { + paths.push(path); + } else { + bundle_paths.insert(path); + } + } + } + let mut gone = vec![]; + for (id, bundle) in bundles.iter_mut() { + if !bundle_paths.contains(&bundle.path) { + gone.push(id.clone()); + } else { + bundle_paths.remove(&bundle.path); + } + } + let gone = gone.iter().map(|id| bundles.remove(&id).unwrap().info).collect(); + let mut new = vec![]; + for path in bundle_paths { + let bundle = StoredBundle { + info: try!(Bundle::load_info(&path)), + path: path + }; + new.push(bundle.info.id.clone()); + bundles.insert(bundle.info.id.clone(), bundle); + } + Ok((new, gone)) +} + + +#[derive(Clone, Default)] +pub struct StoredBundle { + pub info: BundleInfo, + pub path: PathBuf +} +serde_impl!(StoredBundle(u64) { + info: BundleInfo => 0, + path: PathBuf => 1 +}); + +impl StoredBundle { + #[inline] + pub fn id(&self) -> BundleId { + self.info.id.clone() + } + + pub fn move_to>(mut self, path: P) -> Result { + let path = path.as_ref(); + if fs::rename(&self.path, path).is_err() { + try!(fs::copy(&self.path, path).context(path)); + try!(fs::remove_file(&self.path).context(&self.path as &Path)); + } + self.path = path.to_path_buf(); + Ok(self) + } + + pub fn copy_to>(&self, path: P) -> Result { + let path = path.as_ref(); + try!(fs::copy(&self.path, path).context(path)); + let mut bundle = self.clone(); + bundle.path = path.to_path_buf(); + Ok(bundle) + } +} + + pub struct BundleDb { remote_path: PathBuf, local_path: PathBuf, crypto: Arc>, - bundles: HashMap, - bundle_cache: LruCache> + local_bundles: HashMap, + remote_bundles: HashMap, + bundle_cache: LruCache)> } @@ -22,52 +108,28 @@ impl BundleDb { remote_path: remote_path, local_path: local_path, crypto: crypto, - bundles: HashMap::new(), + local_bundles: HashMap::new(), + remote_bundles: HashMap::new(), bundle_cache: LruCache::new(5, 10) } } - pub fn bundle_path(&self, bundle: &BundleId) -> (PathBuf, PathBuf) { - let mut folder = self.remote_path.clone(); - let mut file = bundle.to_string().to_owned() + ".bundle"; - let mut count = self.bundles.len(); - while count >= 100 { - if file.len() < 10 { - break - } - folder = folder.join(&file[0..2]); - file = file[2..].to_string(); - count /= 100; - } - (folder, file.into()) + fn load_bundle_list(&mut self) -> Result<(Vec, Vec), BundleError> { + try!(load_bundles(self.local_path.join("cached"), &mut self.local_bundles)); + load_bundles(&self.remote_path, &mut self.remote_bundles) } - fn load_bundle_list(&mut self) -> Result<(), BundleError> { - self.bundles.clear(); - let mut paths = Vec::new(); - paths.push(self.remote_path.clone()); - while let Some(path) = paths.pop() { - for entry in try!(fs::read_dir(path).map_err(BundleError::List)) { - let entry = try!(entry.map_err(BundleError::List)); - let path = entry.path(); - if path.is_dir() { - paths.push(path); - } else { - let bundle = try!(Bundle::load(path, self.crypto.clone())); - self.bundles.insert(bundle.id(), bundle); - } - } - } - Ok(()) + pub fn temp_bundle_path(&self, id: &BundleId) -> PathBuf { + self.local_path.join("temp").join(id.to_string().to_owned() + ".bundle") } #[inline] - pub fn open, L: AsRef>(remote_path: R, local_path: L, crypto: Arc>) -> Result { + pub fn open, L: AsRef>(remote_path: R, local_path: L, crypto: Arc>) -> Result<(Self, Vec, Vec), BundleError> { let remote_path = remote_path.as_ref().to_owned(); let local_path = local_path.as_ref().to_owned(); let mut self_ = Self::new(remote_path, local_path, crypto); - try!(self_.load_bundle_list()); - Ok(self_) + let (new, gone) = try!(self_.load_bundle_list()); + Ok((self_, new, gone)) } #[inline] @@ -75,7 +137,8 @@ impl BundleDb { let remote_path = remote_path.as_ref().to_owned(); let local_path = local_path.as_ref().to_owned(); try!(fs::create_dir_all(&remote_path).context(&remote_path as &Path)); - try!(fs::create_dir_all(&local_path).context(&local_path as &Path)); + try!(fs::create_dir_all(local_path.join("cached")).context(&local_path as &Path)); + try!(fs::create_dir_all(local_path.join("temp")).context(&local_path as &Path)); Ok(Self::new(remote_path, local_path, crypto)) } @@ -90,41 +153,67 @@ impl BundleDb { BundleWriter::new(mode, hash_method, compression, encryption, self.crypto.clone()) } + fn get_stored_bundle(&self, bundle_id: &BundleId) -> Result<&StoredBundle, BundleError> { + if let Some(stored) = self.local_bundles.get(bundle_id).or_else(|| self.remote_bundles.get(bundle_id)) { + Ok(stored) + } else { + Err(BundleError::NoSuchBundle(bundle_id.clone())) + } + } + + fn get_bundle(&self, stored: &StoredBundle) -> Result { + Bundle::load(stored.path.clone(), self.crypto.clone()) + } + pub fn get_chunk(&mut self, bundle_id: &BundleId, id: usize) -> Result, BundleError> { - let bundle = try!(self.bundles.get_mut(bundle_id).ok_or(BundleError::NoSuchBundle(bundle_id.clone()))); - let (pos, len) = try!(bundle.get_chunk_position(id)); - let mut chunk = Vec::with_capacity(len); - if let Some(data) = self.bundle_cache.get(bundle_id) { + if let Some(&mut (ref mut bundle, ref data)) = self.bundle_cache.get_mut(bundle_id) { + let (pos, len) = try!(bundle.get_chunk_position(id)); + let mut chunk = Vec::with_capacity(len); chunk.extend_from_slice(&data[pos..pos+len]); return Ok(chunk); } + let mut bundle = try!(self.get_stored_bundle(bundle_id).and_then(|s| self.get_bundle(s))); + let (pos, len) = try!(bundle.get_chunk_position(id)); + let mut chunk = Vec::with_capacity(len); let data = try!(bundle.load_contents()); chunk.extend_from_slice(&data[pos..pos+len]); - self.bundle_cache.put(bundle_id.clone(), data); + self.bundle_cache.put(bundle_id.clone(), (bundle, data)); Ok(chunk) } #[inline] - pub fn add_bundle(&mut self, bundle: BundleWriter) -> Result<&Bundle, BundleError> { + pub fn add_bundle(&mut self, bundle: BundleWriter) -> Result { let bundle = try!(bundle.finish(&self)); let id = bundle.id(); - self.bundles.insert(id.clone(), bundle); - Ok(self.get_bundle(&id).unwrap()) + if bundle.info.mode == BundleMode::Meta { + let (folder, filename) = bundle_path(&id, self.local_path.join("cached"), self.local_bundles.len()); + try!(fs::create_dir_all(&folder).context(&folder as &Path)); + let bundle = try!(bundle.copy_to(folder.join(filename))); + self.local_bundles.insert(id.clone(), bundle); + } + let (folder, filename) = bundle_path(&id, self.remote_path.clone(), self.remote_bundles.len()); + try!(fs::create_dir_all(&folder).context(&folder as &Path)); + let bundle = try!(bundle.copy_to(folder.join(filename))); + self.remote_bundles.insert(bundle.id(), bundle.clone()); + Ok(bundle.info) } #[inline] - pub fn get_bundle(&self, bundle: &BundleId) -> Option<&Bundle> { - self.bundles.get(bundle) + pub fn get_bundle_info(&self, bundle: &BundleId) -> Option<&BundleInfo> { + self.get_stored_bundle(bundle).ok().map(|stored| &stored.info) } #[inline] - pub fn list_bundles(&self) -> Vec<&Bundle> { - self.bundles.values().collect() + pub fn list_bundles(&self) -> Vec<&BundleInfo> { + self.remote_bundles.values().map(|b| &b.info).collect() } #[inline] pub fn delete_bundle(&mut self, bundle: &BundleId) -> Result<(), BundleError> { - if let Some(bundle) = self.bundles.remove(bundle) { + if let Some(bundle) = self.local_bundles.remove(bundle) { + try!(fs::remove_file(&bundle.path).map_err(|e| BundleError::Remove(e, bundle.id()))) + } + if let Some(bundle) = self.remote_bundles.remove(bundle) { fs::remove_file(&bundle.path).map_err(|e| BundleError::Remove(e, bundle.id())) } else { Err(BundleError::NoSuchBundle(bundle.clone())) @@ -133,7 +222,8 @@ impl BundleDb { #[inline] pub fn check(&mut self, full: bool) -> Result<(), BundleError> { - for bundle in self.bundles.values_mut() { + for stored in self.remote_bundles.values() { + let mut bundle = try!(self.get_bundle(stored)); try!(bundle.check(full)) } Ok(()) diff --git a/src/bundledb/writer.rs b/src/bundledb/writer.rs index 43bf4c2..63a0dd3 100644 --- a/src/bundledb/writer.rs +++ b/src/bundledb/writer.rs @@ -2,8 +2,8 @@ use ::prelude::*; use super::*; use std::path::Path; -use std::fs::{self, File}; -use std::io::{Write, Seek, SeekFrom, BufWriter}; +use std::fs::File; +use std::io::{Write, BufWriter}; use std::sync::{Arc, Mutex}; @@ -58,7 +58,7 @@ impl BundleWriter { Ok(self.chunk_count-1) } - pub fn finish(mut self, db: &BundleDb) -> Result { + pub fn finish(mut self, db: &BundleDb) -> Result { if let Some(stream) = self.compression_stream { try!(stream.finish(&mut self.data)) } @@ -72,9 +72,7 @@ impl BundleWriter { if let Some(ref encryption) = self.encryption { chunk_data = try!(self.crypto.lock().unwrap().encrypt(&encryption, &chunk_data)); } - let (folder, file) = db.bundle_path(&id); - let path = folder.join(file); - try!(fs::create_dir_all(&folder).context(&path as &Path)); + let path = db.temp_bundle_path(&id); let mut file = BufWriter::new(try!(File::create(&path).context(&path as &Path))); try!(file.write_all(&HEADER_STRING).context(&path as &Path)); try!(file.write_all(&[HEADER_VERSION]).context(&path as &Path)); @@ -91,9 +89,8 @@ impl BundleWriter { }; try!(msgpack::encode_to_stream(&header, &mut file).context(&path as &Path)); try!(file.write_all(&chunk_data).context(&path as &Path)); - let content_start = file.seek(SeekFrom::Current(0)).unwrap() as usize; try!(file.write_all(&self.data).context(&path as &Path)); - Ok(Bundle::new(path, HEADER_VERSION, content_start, self.crypto, header)) + Ok(StoredBundle { path: path, info: header }) } #[inline] diff --git a/src/main.rs b/src/main.rs index b589f33..4164b6f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -26,10 +26,7 @@ mod prelude; // TODO: Seperate remote folder // TODO: - Copy/move backup files to remote folder -// TODO: - Keep meta bundles also locally // TODO: - Load and compare remote bundles to bundle map -// TODO: - Write backup files there as well -// TODO: - Avoid loading remote backups // TODO: - Lock during vacuum // TODO: Remove backup subtrees // TODO: Recompress & combine bundles diff --git a/src/repository/bundle_map.rs b/src/repository/bundle_map.rs index 7fff73b..e9e26d6 100644 --- a/src/repository/bundle_map.rs +++ b/src/repository/bundle_map.rs @@ -94,8 +94,8 @@ impl BundleMap { } #[inline] - pub fn set(&mut self, id: u32, bundle: &Bundle) { - let data = BundleData { info: bundle.info.clone() }; + pub fn set(&mut self, id: u32, bundle: BundleInfo) { + let data = BundleData { info: bundle }; self.0.insert(id, data); } diff --git a/src/repository/integrity.rs b/src/repository/integrity.rs index 0d05a7b..d96cb77 100644 --- a/src/repository/integrity.rs +++ b/src/repository/integrity.rs @@ -46,13 +46,13 @@ impl Repository { // Lookup bundle id from map let bundle_id = try!(self.get_bundle_id(entry.data.bundle)); // Get bundle object from bundledb - let bundle = if let Some(bundle) = self.bundles.get_bundle(&bundle_id) { + let bundle = if let Some(bundle) = self.bundles.get_bundle_info(&bundle_id) { bundle } else { return Err(RepositoryIntegrityError::MissingBundle(bundle_id.clone()).into()) }; // Get chunk from bundle - if bundle.info.chunk_count <= entry.data.chunk as usize { + if bundle.chunk_count <= entry.data.chunk as usize { return Err(RepositoryIntegrityError::NoSuchChunk(bundle_id.clone(), entry.data.chunk).into()) } pos += 1; diff --git a/src/repository/mod.rs b/src/repository/mod.rs index f6d298d..2d94a0d 100644 --- a/src/repository/mod.rs +++ b/src/repository/mod.rs @@ -75,7 +75,7 @@ impl Repository { let path = path.as_ref().to_owned(); let config = try!(Config::load(path.join("config.yaml"))); let crypto = Arc::new(Mutex::new(try!(Crypto::open(path.join("keys"))))); - let bundles = try!(BundleDb::open( + let (bundles, new, gone) = try!(BundleDb::open( path.join("remote/bundles"), path.join("bundles"), crypto.clone() diff --git a/src/util/lru_cache.rs b/src/util/lru_cache.rs index b387f4f..78ccec8 100644 --- a/src/util/lru_cache.rs +++ b/src/util/lru_cache.rs @@ -41,6 +41,17 @@ impl LruCache { } } + #[inline] + pub fn get_mut(&mut self, key: &K) -> Option<&mut V> { + if let Some(&mut (ref mut item, ref mut n)) = self.items.get_mut(key) { + *n = self.next; + self.next += 1; + Some(item) + } else { + None + } + } + #[inline] fn shrink(&mut self) { let mut tags: Vec = self.items.values().map(|&(_, n)| n).collect();