From 70905fa5e8729d55e200f194c70b85380eccb57b Mon Sep 17 00:00:00 2001 From: Dennis Schwerdel Date: Wed, 22 Mar 2017 12:27:17 +0100 Subject: [PATCH] Bundle sync --- src/bundledb/db.rs | 47 +++++++++++++++++++++++++++++------- src/bundledb/reader.rs | 10 +++++++- src/main.rs | 2 +- src/repository/basic_io.rs | 8 ++---- src/repository/bundle_map.rs | 43 ++++++++++++++------------------- src/repository/info.rs | 2 +- src/repository/mod.rs | 38 +++++++++++++++++++++++++++-- src/repository/vacuum.rs | 13 +++++----- src/util/chunk.rs | 5 ++++ 9 files changed, 117 insertions(+), 51 deletions(-) diff --git a/src/bundledb/db.rs b/src/bundledb/db.rs index d1c555b..7190706 100644 --- a/src/bundledb/db.rs +++ b/src/bundledb/db.rs @@ -66,7 +66,7 @@ pub fn bundle_path(bundle: &BundleId, mut folder: PathBuf, mut count: usize) -> (folder, file.into()) } -pub fn load_bundles>(path: P, bundles: &mut HashMap) -> Result<(Vec, Vec), BundleDbError> { +pub fn load_bundles>(path: P, bundles: &mut HashMap) -> Result<(Vec, Vec), BundleDbError> { let mut paths = vec![path.as_ref().to_path_buf()]; let mut bundle_paths = HashSet::new(); while let Some(path) = paths.pop() { @@ -88,14 +88,14 @@ pub fn load_bundles>(path: P, bundles: &mut HashMap Result<(Vec, Vec), BundleDbError> { + fn load_bundle_list(&mut self) -> Result<(Vec, Vec), BundleDbError> { let bundle_info_cache = &self.remote_cache_path; if let Ok(list) = StoredBundle::read_list_from(&bundle_info_cache) { for bundle in list { @@ -145,16 +145,33 @@ impl BundleDb { Ok((new, gone)) } + pub fn update_cache(&mut self, new: &[StoredBundle], gone: &[StoredBundle]) -> Result<(), BundleDbError> { + for bundle in new { + if bundle.info.mode == BundleMode::Meta { + try!(self.copy_remote_bundle_to_cache(bundle)); + } + } + for bundle in gone { + if let Some(bundle) = self.local_bundles.remove(&bundle.id()) { + try!(fs::remove_file(&bundle.path).map_err(|e| BundleDbError::Remove(e, bundle.id()))) + } + } + Ok(()) + } + pub fn temp_bundle_path(&self, id: &BundleId) -> PathBuf { self.temp_path.join(id.to_string().to_owned() + ".bundle") } #[inline] - pub fn open, L: AsRef>(remote_path: R, local_path: L, crypto: Arc>) -> Result<(Self, Vec, Vec), BundleDbError> { + pub fn open, L: AsRef>(remote_path: R, local_path: L, crypto: Arc>) -> Result<(Self, Vec, Vec), BundleDbError> { let remote_path = remote_path.as_ref().to_owned(); let local_path = local_path.as_ref().to_owned(); let mut self_ = Self::new(remote_path, local_path, crypto); let (new, gone) = try!(self_.load_bundle_list()); + try!(self_.update_cache(&new, &gone)); + let new = new.into_iter().map(|s| s.info).collect(); + let gone = gone.into_iter().map(|s| s.info).collect(); Ok((self_, new, gone)) } @@ -202,15 +219,21 @@ impl BundleDb { Ok(chunk) } + fn copy_remote_bundle_to_cache(&mut self, bundle: &StoredBundle) -> Result<(), BundleDbError> { + let id = bundle.id(); + let (folder, filename) = bundle_path(&id, self.local_bundles_path.clone(), self.local_bundles.len()); + try!(fs::create_dir_all(&folder).context(&folder as &Path)); + let bundle = try!(bundle.copy_to(folder.join(filename))); + self.local_bundles.insert(id, bundle); + Ok(()) + } + #[inline] pub fn add_bundle(&mut self, bundle: BundleWriter) -> Result { let bundle = try!(bundle.finish(&self)); let id = bundle.id(); if bundle.info.mode == BundleMode::Meta { - let (folder, filename) = bundle_path(&id, self.local_bundles_path.clone(), self.local_bundles.len()); - try!(fs::create_dir_all(&folder).context(&folder as &Path)); - let bundle = try!(bundle.copy_to(folder.join(filename))); - self.local_bundles.insert(id.clone(), bundle); + try!(self.copy_remote_bundle_to_cache(&bundle)) } let (folder, filename) = bundle_path(&id, self.remote_path.clone(), self.remote_bundles.len()); try!(fs::create_dir_all(&folder).context(&folder as &Path)); @@ -219,6 +242,12 @@ impl BundleDb { Ok(bundle.info) } + #[inline] + pub fn get_chunk_list(&self, bundle: &BundleId) -> Result { + let mut bundle = try!(self.get_stored_bundle(bundle).and_then(|stored| self.get_bundle(&stored))); + Ok(try!(bundle.get_chunk_list()).clone()) + } + #[inline] pub fn get_bundle_info(&self, bundle: &BundleId) -> Option<&BundleInfo> { self.get_stored_bundle(bundle).ok().map(|stored| &stored.info) diff --git a/src/bundledb/reader.rs b/src/bundledb/reader.rs index 6631b89..e6fa629 100644 --- a/src/bundledb/reader.rs +++ b/src/bundledb/reader.rs @@ -113,7 +113,7 @@ impl BundleReader { Ok(BundleReader::new(path, version, content_start, crypto, header)) } - pub fn load_chunklist(&mut self) -> Result<(), BundleReaderError> { + fn load_chunklist(&mut self) -> Result<(), BundleReaderError> { debug!("Load bundle chunklist {} ({:?})", self.info.id, self.info.mode); let mut file = BufReader::new(try!(File::open(&self.path).context(&self.path as &Path))); let len = self.info.chunk_info_size; @@ -137,6 +137,14 @@ impl BundleReader { Ok(()) } + #[inline] + pub fn get_chunk_list(&mut self) -> Result<&ChunkList, BundleReaderError> { + if self.chunks.is_none() { + try!(self.load_chunklist()); + } + Ok(self.chunks.as_ref().unwrap()) + } + #[inline] fn load_encoded_contents(&self) -> Result, BundleReaderError> { debug!("Load bundle data {} ({:?})", self.info.id, self.info.mode); diff --git a/src/main.rs b/src/main.rs index 21d5716..3de5284 100644 --- a/src/main.rs +++ b/src/main.rs @@ -25,7 +25,7 @@ mod repository; mod cli; mod prelude; -// TODO: React on changes in remote bundles +// TODO: Keep backup files also remotely and sync them // TODO: Lock during backup and vacuum // TODO: Remove backup subtrees // TODO: Recompress & combine bundles diff --git a/src/repository/basic_io.rs b/src/repository/basic_io.rs index 6af220e..5b624b7 100644 --- a/src/repository/basic_io.rs +++ b/src/repository/basic_io.rs @@ -6,11 +6,7 @@ use std::io::{Read, Write, Cursor}; impl Repository { pub fn get_bundle_id(&self, id: u32) -> Result { - if let Some(bundle_info) = self.bundle_map.get(id) { - Ok(bundle_info.id()) - } else { - Err(RepositoryIntegrityError::MissingBundleId(id).into()) - } + self.bundle_map.get(id).ok_or_else(|| RepositoryIntegrityError::MissingBundleId(id).into()) } pub fn get_chunk(&mut self, hash: Hash) -> Result>, RepositoryError> { @@ -72,7 +68,7 @@ impl Repository { let mut finished = None; mem::swap(writer, &mut finished); let bundle = try!(self.bundles.add_bundle(finished.unwrap())); - self.bundle_map.set(bundle_id, bundle); + self.bundle_map.set(bundle_id, bundle.id.clone()); if self.next_meta_bundle == bundle_id { self.next_meta_bundle = next_free_bundle_id } diff --git a/src/repository/bundle_map.rs b/src/repository/bundle_map.rs index e9e26d6..c5fb0a4 100644 --- a/src/repository/bundle_map.rs +++ b/src/repository/bundle_map.rs @@ -39,23 +39,7 @@ quick_error!{ } -#[derive(Default)] -pub struct BundleData { - pub info: BundleInfo -} -serde_impl!(BundleData(u64) { - info: BundleInfo => 0 -}); - -impl BundleData { - #[inline] - pub fn id(&self) -> BundleId { - self.info.id.clone() - } -} - - -pub struct BundleMap(HashMap); +pub struct BundleMap(HashMap); impl BundleMap { pub fn create() -> Self { @@ -84,23 +68,32 @@ impl BundleMap { } #[inline] - pub fn get(&self, id: u32) -> Option<&BundleData> { - self.0.get(&id) + pub fn get(&self, id: u32) -> Option { + self.0.get(&id).cloned() } #[inline] - pub fn remove(&mut self, id: u32) -> Option { + pub fn remove(&mut self, id: u32) -> Option { self.0.remove(&id) } #[inline] - pub fn set(&mut self, id: u32, bundle: BundleInfo) { - let data = BundleData { info: bundle }; - self.0.insert(id, data); + pub fn find(&self, bundle: &BundleId) -> Option { + for (id, bundle_id) in &self.0 { + if bundle == bundle_id { + return Some(*id) + } + } + None } #[inline] - pub fn bundles(&self) -> Vec<(u32, &BundleData)> { - self.0.iter().map(|(id, bundle)| (*id, bundle)).collect() + pub fn set(&mut self, id: u32, bundle: BundleId) { + self.0.insert(id, bundle); + } + + #[inline] + pub fn bundles(&self) -> Vec<(u32, BundleId)> { + self.0.iter().map(|(id, bundle)| (*id, bundle.clone())).collect() } } diff --git a/src/repository/info.rs b/src/repository/info.rs index d53ec3b..4a9d03e 100644 --- a/src/repository/info.rs +++ b/src/repository/info.rs @@ -17,7 +17,7 @@ pub struct RepositoryInfo { impl Repository { #[inline] pub fn list_bundles(&self) -> Vec<&BundleInfo> { - self.bundle_map.bundles().into_iter().map(|(_id, b)| &b.info).collect() + self.bundles.list_bundles() } pub fn info(&self) -> RepositoryInfo { diff --git a/src/repository/mod.rs b/src/repository/mod.rs index 946df35..3783d4a 100644 --- a/src/repository/mod.rs +++ b/src/repository/mod.rs @@ -95,6 +95,12 @@ impl Repository { content_bundle: None, meta_bundle: None, }; + for bundle in new { + try!(repo.add_new_remote_bundle(bundle)) + } + for bundle in gone { + try!(repo.remove_gone_remote_bundle(bundle)) + } repo.next_meta_bundle = repo.next_free_bundle_id(); repo.next_content_bundle = repo.next_free_bundle_id(); Ok(repo) @@ -142,7 +148,7 @@ impl Repository { mem::swap(&mut self.content_bundle, &mut finished); { let bundle = try!(self.bundles.add_bundle(finished.unwrap())); - self.bundle_map.set(self.next_content_bundle, bundle); + self.bundle_map.set(self.next_content_bundle, bundle.id.clone()); } self.next_content_bundle = self.next_free_bundle_id() } @@ -151,13 +157,41 @@ impl Repository { mem::swap(&mut self.meta_bundle, &mut finished); { let bundle = try!(self.bundles.add_bundle(finished.unwrap())); - self.bundle_map.set(self.next_meta_bundle, bundle); + self.bundle_map.set(self.next_meta_bundle, bundle.id.clone()); } self.next_meta_bundle = self.next_free_bundle_id() } try!(self.save_bundle_map()); Ok(()) } + + fn add_new_remote_bundle(&mut self, bundle: BundleInfo) -> Result<(), RepositoryError> { + info!("Adding new bundle to index: {}", bundle.id); + let bundle_id = match bundle.mode { + BundleMode::Content => self.next_content_bundle, + BundleMode::Meta => self.next_meta_bundle + }; + let chunks = try!(self.bundles.get_chunk_list(&bundle.id)); + self.bundle_map.set(bundle_id, bundle.id); + if self.next_meta_bundle == bundle_id { + self.next_meta_bundle = self.next_free_bundle_id() + } + if self.next_content_bundle == bundle_id { + self.next_content_bundle = self.next_free_bundle_id() + } + for (i, (hash, _len)) in chunks.into_inner().into_iter().enumerate() { + try!(self.index.set(&hash, &Location{bundle: bundle_id as u32, chunk: i as u32})); + } + Ok(()) + } + + fn remove_gone_remote_bundle(&mut self, bundle: BundleInfo) -> Result<(), RepositoryError> { + if let Some(id) = self.bundle_map.find(&bundle.id) { + info!("Removing bundle from index: {}", bundle.id); + self.bundle_map.remove(id); + } + Ok(()) + } } impl Drop for Repository { diff --git a/src/repository/vacuum.rs b/src/repository/vacuum.rs index 508cd22..09efdef 100644 --- a/src/repository/vacuum.rs +++ b/src/repository/vacuum.rs @@ -38,11 +38,12 @@ impl Repository { pub fn analyze_usage(&mut self) -> Result, RepositoryError> { let mut usage = HashMap::new(); for (id, bundle) in self.bundle_map.bundles() { + let bundle = try!(self.bundles.get_bundle_info(&bundle).ok_or_else(|| RepositoryIntegrityError::MissingBundle(bundle))); usage.insert(id, BundleUsage { - used: Bitmap::new(bundle.info.chunk_count), - mode: Bitmap::new(bundle.info.chunk_count), - chunk_count: bundle.info.chunk_count, - total_size: bundle.info.raw_size, + used: Bitmap::new(bundle.chunk_count), + mode: Bitmap::new(bundle.chunk_count), + chunk_count: bundle.chunk_count, + total_size: bundle.raw_size, used_size: 0 }); } @@ -82,7 +83,7 @@ impl Repository { fn delete_bundle(&mut self, id: u32) -> Result<(), RepositoryError> { if let Some(bundle) = self.bundle_map.remove(id) { - try!(self.bundles.delete_bundle(&bundle.id())); + try!(self.bundles.delete_bundle(&bundle)); Ok(()) } else { Err(RepositoryIntegrityError::MissingBundleId(id).into()) @@ -110,7 +111,7 @@ impl Repository { } for id in &rewrite_bundles { let bundle = &usage[id]; - let bundle_id = self.bundle_map.get(*id).unwrap().id(); + let bundle_id = self.bundle_map.get(*id).unwrap(); for chunk in 0..bundle.chunk_count { let data = try!(self.bundles.get_chunk(&bundle_id, chunk)); let hash = self.config.hash.hash(&data); diff --git a/src/util/chunk.rs b/src/util/chunk.rs index 50d8642..09009d0 100644 --- a/src/util/chunk.rs +++ b/src/util/chunk.rs @@ -72,6 +72,11 @@ impl ChunkList { pub fn encoded_size(&self) -> usize { self.0.len() * 20 } + + #[inline] + pub fn into_inner(self) -> Vec { + self.0 + } } impl Default for ChunkList {