Bundle sync

This commit is contained in:
Dennis Schwerdel 2017-03-22 12:27:17 +01:00
parent fa01e0bdba
commit 70905fa5e8
9 changed files with 117 additions and 51 deletions

View File

@ -66,7 +66,7 @@ pub fn bundle_path(bundle: &BundleId, mut folder: PathBuf, mut count: usize) ->
(folder, file.into()) (folder, file.into())
} }
pub fn load_bundles<P: AsRef<Path>>(path: P, bundles: &mut HashMap<BundleId, StoredBundle>) -> Result<(Vec<BundleId>, Vec<BundleInfo>), BundleDbError> { pub fn load_bundles<P: AsRef<Path>>(path: P, bundles: &mut HashMap<BundleId, StoredBundle>) -> Result<(Vec<StoredBundle>, Vec<StoredBundle>), BundleDbError> {
let mut paths = vec![path.as_ref().to_path_buf()]; let mut paths = vec![path.as_ref().to_path_buf()];
let mut bundle_paths = HashSet::new(); let mut bundle_paths = HashSet::new();
while let Some(path) = paths.pop() { while let Some(path) = paths.pop() {
@ -88,14 +88,14 @@ pub fn load_bundles<P: AsRef<Path>>(path: P, bundles: &mut HashMap<BundleId, Sto
bundle_paths.remove(&bundle.path); bundle_paths.remove(&bundle.path);
} }
} }
let gone = gone.iter().map(|id| bundles.remove(id).unwrap().info).collect(); let gone = gone.iter().map(|id| bundles.remove(id).unwrap()).collect();
let mut new = vec![]; let mut new = vec![];
for path in bundle_paths { for path in bundle_paths {
let bundle = StoredBundle { let bundle = StoredBundle {
info: try!(BundleReader::load_info(&path)), info: try!(BundleReader::load_info(&path)),
path: path path: path
}; };
new.push(bundle.info.id.clone()); new.push(bundle.clone());
bundles.insert(bundle.info.id.clone(), bundle); bundles.insert(bundle.info.id.clone(), bundle);
} }
Ok((new, gone)) Ok((new, gone))
@ -129,7 +129,7 @@ impl BundleDb {
} }
} }
fn load_bundle_list(&mut self) -> Result<(Vec<BundleId>, Vec<BundleInfo>), BundleDbError> { fn load_bundle_list(&mut self) -> Result<(Vec<StoredBundle>, Vec<StoredBundle>), BundleDbError> {
let bundle_info_cache = &self.remote_cache_path; let bundle_info_cache = &self.remote_cache_path;
if let Ok(list) = StoredBundle::read_list_from(&bundle_info_cache) { if let Ok(list) = StoredBundle::read_list_from(&bundle_info_cache) {
for bundle in list { for bundle in list {
@ -145,16 +145,33 @@ impl BundleDb {
Ok((new, gone)) Ok((new, gone))
} }
pub fn update_cache(&mut self, new: &[StoredBundle], gone: &[StoredBundle]) -> Result<(), BundleDbError> {
for bundle in new {
if bundle.info.mode == BundleMode::Meta {
try!(self.copy_remote_bundle_to_cache(bundle));
}
}
for bundle in gone {
if let Some(bundle) = self.local_bundles.remove(&bundle.id()) {
try!(fs::remove_file(&bundle.path).map_err(|e| BundleDbError::Remove(e, bundle.id())))
}
}
Ok(())
}
pub fn temp_bundle_path(&self, id: &BundleId) -> PathBuf { pub fn temp_bundle_path(&self, id: &BundleId) -> PathBuf {
self.temp_path.join(id.to_string().to_owned() + ".bundle") self.temp_path.join(id.to_string().to_owned() + ".bundle")
} }
#[inline] #[inline]
pub fn open<R: AsRef<Path>, L: AsRef<Path>>(remote_path: R, local_path: L, crypto: Arc<Mutex<Crypto>>) -> Result<(Self, Vec<BundleId>, Vec<BundleInfo>), BundleDbError> { pub fn open<R: AsRef<Path>, L: AsRef<Path>>(remote_path: R, local_path: L, crypto: Arc<Mutex<Crypto>>) -> Result<(Self, Vec<BundleInfo>, Vec<BundleInfo>), BundleDbError> {
let remote_path = remote_path.as_ref().to_owned(); let remote_path = remote_path.as_ref().to_owned();
let local_path = local_path.as_ref().to_owned(); let local_path = local_path.as_ref().to_owned();
let mut self_ = Self::new(remote_path, local_path, crypto); let mut self_ = Self::new(remote_path, local_path, crypto);
let (new, gone) = try!(self_.load_bundle_list()); let (new, gone) = try!(self_.load_bundle_list());
try!(self_.update_cache(&new, &gone));
let new = new.into_iter().map(|s| s.info).collect();
let gone = gone.into_iter().map(|s| s.info).collect();
Ok((self_, new, gone)) Ok((self_, new, gone))
} }
@ -202,15 +219,21 @@ impl BundleDb {
Ok(chunk) Ok(chunk)
} }
fn copy_remote_bundle_to_cache(&mut self, bundle: &StoredBundle) -> Result<(), BundleDbError> {
let id = bundle.id();
let (folder, filename) = bundle_path(&id, self.local_bundles_path.clone(), self.local_bundles.len());
try!(fs::create_dir_all(&folder).context(&folder as &Path));
let bundle = try!(bundle.copy_to(folder.join(filename)));
self.local_bundles.insert(id, bundle);
Ok(())
}
#[inline] #[inline]
pub fn add_bundle(&mut self, bundle: BundleWriter) -> Result<BundleInfo, BundleDbError> { pub fn add_bundle(&mut self, bundle: BundleWriter) -> Result<BundleInfo, BundleDbError> {
let bundle = try!(bundle.finish(&self)); let bundle = try!(bundle.finish(&self));
let id = bundle.id(); let id = bundle.id();
if bundle.info.mode == BundleMode::Meta { if bundle.info.mode == BundleMode::Meta {
let (folder, filename) = bundle_path(&id, self.local_bundles_path.clone(), self.local_bundles.len()); try!(self.copy_remote_bundle_to_cache(&bundle))
try!(fs::create_dir_all(&folder).context(&folder as &Path));
let bundle = try!(bundle.copy_to(folder.join(filename)));
self.local_bundles.insert(id.clone(), bundle);
} }
let (folder, filename) = bundle_path(&id, self.remote_path.clone(), self.remote_bundles.len()); let (folder, filename) = bundle_path(&id, self.remote_path.clone(), self.remote_bundles.len());
try!(fs::create_dir_all(&folder).context(&folder as &Path)); try!(fs::create_dir_all(&folder).context(&folder as &Path));
@ -219,6 +242,12 @@ impl BundleDb {
Ok(bundle.info) Ok(bundle.info)
} }
#[inline]
pub fn get_chunk_list(&self, bundle: &BundleId) -> Result<ChunkList, BundleDbError> {
let mut bundle = try!(self.get_stored_bundle(bundle).and_then(|stored| self.get_bundle(&stored)));
Ok(try!(bundle.get_chunk_list()).clone())
}
#[inline] #[inline]
pub fn get_bundle_info(&self, bundle: &BundleId) -> Option<&BundleInfo> { pub fn get_bundle_info(&self, bundle: &BundleId) -> Option<&BundleInfo> {
self.get_stored_bundle(bundle).ok().map(|stored| &stored.info) self.get_stored_bundle(bundle).ok().map(|stored| &stored.info)

View File

@ -113,7 +113,7 @@ impl BundleReader {
Ok(BundleReader::new(path, version, content_start, crypto, header)) Ok(BundleReader::new(path, version, content_start, crypto, header))
} }
pub fn load_chunklist(&mut self) -> Result<(), BundleReaderError> { fn load_chunklist(&mut self) -> Result<(), BundleReaderError> {
debug!("Load bundle chunklist {} ({:?})", self.info.id, self.info.mode); debug!("Load bundle chunklist {} ({:?})", self.info.id, self.info.mode);
let mut file = BufReader::new(try!(File::open(&self.path).context(&self.path as &Path))); let mut file = BufReader::new(try!(File::open(&self.path).context(&self.path as &Path)));
let len = self.info.chunk_info_size; let len = self.info.chunk_info_size;
@ -137,6 +137,14 @@ impl BundleReader {
Ok(()) Ok(())
} }
#[inline]
pub fn get_chunk_list(&mut self) -> Result<&ChunkList, BundleReaderError> {
if self.chunks.is_none() {
try!(self.load_chunklist());
}
Ok(self.chunks.as_ref().unwrap())
}
#[inline] #[inline]
fn load_encoded_contents(&self) -> Result<Vec<u8>, BundleReaderError> { fn load_encoded_contents(&self) -> Result<Vec<u8>, BundleReaderError> {
debug!("Load bundle data {} ({:?})", self.info.id, self.info.mode); debug!("Load bundle data {} ({:?})", self.info.id, self.info.mode);

View File

@ -25,7 +25,7 @@ mod repository;
mod cli; mod cli;
mod prelude; mod prelude;
// TODO: React on changes in remote bundles // TODO: Keep backup files also remotely and sync them
// TODO: Lock during backup and vacuum // TODO: Lock during backup and vacuum
// TODO: Remove backup subtrees // TODO: Remove backup subtrees
// TODO: Recompress & combine bundles // TODO: Recompress & combine bundles

View File

@ -6,11 +6,7 @@ use std::io::{Read, Write, Cursor};
impl Repository { impl Repository {
pub fn get_bundle_id(&self, id: u32) -> Result<BundleId, RepositoryError> { pub fn get_bundle_id(&self, id: u32) -> Result<BundleId, RepositoryError> {
if let Some(bundle_info) = self.bundle_map.get(id) { self.bundle_map.get(id).ok_or_else(|| RepositoryIntegrityError::MissingBundleId(id).into())
Ok(bundle_info.id())
} else {
Err(RepositoryIntegrityError::MissingBundleId(id).into())
}
} }
pub fn get_chunk(&mut self, hash: Hash) -> Result<Option<Vec<u8>>, RepositoryError> { pub fn get_chunk(&mut self, hash: Hash) -> Result<Option<Vec<u8>>, RepositoryError> {
@ -72,7 +68,7 @@ impl Repository {
let mut finished = None; let mut finished = None;
mem::swap(writer, &mut finished); mem::swap(writer, &mut finished);
let bundle = try!(self.bundles.add_bundle(finished.unwrap())); let bundle = try!(self.bundles.add_bundle(finished.unwrap()));
self.bundle_map.set(bundle_id, bundle); self.bundle_map.set(bundle_id, bundle.id.clone());
if self.next_meta_bundle == bundle_id { if self.next_meta_bundle == bundle_id {
self.next_meta_bundle = next_free_bundle_id self.next_meta_bundle = next_free_bundle_id
} }

View File

@ -39,23 +39,7 @@ quick_error!{
} }
#[derive(Default)] pub struct BundleMap(HashMap<u32, BundleId>);
pub struct BundleData {
pub info: BundleInfo
}
serde_impl!(BundleData(u64) {
info: BundleInfo => 0
});
impl BundleData {
#[inline]
pub fn id(&self) -> BundleId {
self.info.id.clone()
}
}
pub struct BundleMap(HashMap<u32, BundleData>);
impl BundleMap { impl BundleMap {
pub fn create() -> Self { pub fn create() -> Self {
@ -84,23 +68,32 @@ impl BundleMap {
} }
#[inline] #[inline]
pub fn get(&self, id: u32) -> Option<&BundleData> { pub fn get(&self, id: u32) -> Option<BundleId> {
self.0.get(&id) self.0.get(&id).cloned()
} }
#[inline] #[inline]
pub fn remove(&mut self, id: u32) -> Option<BundleData> { pub fn remove(&mut self, id: u32) -> Option<BundleId> {
self.0.remove(&id) self.0.remove(&id)
} }
#[inline] #[inline]
pub fn set(&mut self, id: u32, bundle: BundleInfo) { pub fn find(&self, bundle: &BundleId) -> Option<u32> {
let data = BundleData { info: bundle }; for (id, bundle_id) in &self.0 {
self.0.insert(id, data); if bundle == bundle_id {
return Some(*id)
}
}
None
} }
#[inline] #[inline]
pub fn bundles(&self) -> Vec<(u32, &BundleData)> { pub fn set(&mut self, id: u32, bundle: BundleId) {
self.0.iter().map(|(id, bundle)| (*id, bundle)).collect() self.0.insert(id, bundle);
}
#[inline]
pub fn bundles(&self) -> Vec<(u32, BundleId)> {
self.0.iter().map(|(id, bundle)| (*id, bundle.clone())).collect()
} }
} }

View File

@ -17,7 +17,7 @@ pub struct RepositoryInfo {
impl Repository { impl Repository {
#[inline] #[inline]
pub fn list_bundles(&self) -> Vec<&BundleInfo> { pub fn list_bundles(&self) -> Vec<&BundleInfo> {
self.bundle_map.bundles().into_iter().map(|(_id, b)| &b.info).collect() self.bundles.list_bundles()
} }
pub fn info(&self) -> RepositoryInfo { pub fn info(&self) -> RepositoryInfo {

View File

@ -95,6 +95,12 @@ impl Repository {
content_bundle: None, content_bundle: None,
meta_bundle: None, meta_bundle: None,
}; };
for bundle in new {
try!(repo.add_new_remote_bundle(bundle))
}
for bundle in gone {
try!(repo.remove_gone_remote_bundle(bundle))
}
repo.next_meta_bundle = repo.next_free_bundle_id(); repo.next_meta_bundle = repo.next_free_bundle_id();
repo.next_content_bundle = repo.next_free_bundle_id(); repo.next_content_bundle = repo.next_free_bundle_id();
Ok(repo) Ok(repo)
@ -142,7 +148,7 @@ impl Repository {
mem::swap(&mut self.content_bundle, &mut finished); mem::swap(&mut self.content_bundle, &mut finished);
{ {
let bundle = try!(self.bundles.add_bundle(finished.unwrap())); let bundle = try!(self.bundles.add_bundle(finished.unwrap()));
self.bundle_map.set(self.next_content_bundle, bundle); self.bundle_map.set(self.next_content_bundle, bundle.id.clone());
} }
self.next_content_bundle = self.next_free_bundle_id() self.next_content_bundle = self.next_free_bundle_id()
} }
@ -151,13 +157,41 @@ impl Repository {
mem::swap(&mut self.meta_bundle, &mut finished); mem::swap(&mut self.meta_bundle, &mut finished);
{ {
let bundle = try!(self.bundles.add_bundle(finished.unwrap())); let bundle = try!(self.bundles.add_bundle(finished.unwrap()));
self.bundle_map.set(self.next_meta_bundle, bundle); self.bundle_map.set(self.next_meta_bundle, bundle.id.clone());
} }
self.next_meta_bundle = self.next_free_bundle_id() self.next_meta_bundle = self.next_free_bundle_id()
} }
try!(self.save_bundle_map()); try!(self.save_bundle_map());
Ok(()) Ok(())
} }
fn add_new_remote_bundle(&mut self, bundle: BundleInfo) -> Result<(), RepositoryError> {
info!("Adding new bundle to index: {}", bundle.id);
let bundle_id = match bundle.mode {
BundleMode::Content => self.next_content_bundle,
BundleMode::Meta => self.next_meta_bundle
};
let chunks = try!(self.bundles.get_chunk_list(&bundle.id));
self.bundle_map.set(bundle_id, bundle.id);
if self.next_meta_bundle == bundle_id {
self.next_meta_bundle = self.next_free_bundle_id()
}
if self.next_content_bundle == bundle_id {
self.next_content_bundle = self.next_free_bundle_id()
}
for (i, (hash, _len)) in chunks.into_inner().into_iter().enumerate() {
try!(self.index.set(&hash, &Location{bundle: bundle_id as u32, chunk: i as u32}));
}
Ok(())
}
fn remove_gone_remote_bundle(&mut self, bundle: BundleInfo) -> Result<(), RepositoryError> {
if let Some(id) = self.bundle_map.find(&bundle.id) {
info!("Removing bundle from index: {}", bundle.id);
self.bundle_map.remove(id);
}
Ok(())
}
} }
impl Drop for Repository { impl Drop for Repository {

View File

@ -38,11 +38,12 @@ impl Repository {
pub fn analyze_usage(&mut self) -> Result<HashMap<u32, BundleUsage>, RepositoryError> { pub fn analyze_usage(&mut self) -> Result<HashMap<u32, BundleUsage>, RepositoryError> {
let mut usage = HashMap::new(); let mut usage = HashMap::new();
for (id, bundle) in self.bundle_map.bundles() { for (id, bundle) in self.bundle_map.bundles() {
let bundle = try!(self.bundles.get_bundle_info(&bundle).ok_or_else(|| RepositoryIntegrityError::MissingBundle(bundle)));
usage.insert(id, BundleUsage { usage.insert(id, BundleUsage {
used: Bitmap::new(bundle.info.chunk_count), used: Bitmap::new(bundle.chunk_count),
mode: Bitmap::new(bundle.info.chunk_count), mode: Bitmap::new(bundle.chunk_count),
chunk_count: bundle.info.chunk_count, chunk_count: bundle.chunk_count,
total_size: bundle.info.raw_size, total_size: bundle.raw_size,
used_size: 0 used_size: 0
}); });
} }
@ -82,7 +83,7 @@ impl Repository {
fn delete_bundle(&mut self, id: u32) -> Result<(), RepositoryError> { fn delete_bundle(&mut self, id: u32) -> Result<(), RepositoryError> {
if let Some(bundle) = self.bundle_map.remove(id) { if let Some(bundle) = self.bundle_map.remove(id) {
try!(self.bundles.delete_bundle(&bundle.id())); try!(self.bundles.delete_bundle(&bundle));
Ok(()) Ok(())
} else { } else {
Err(RepositoryIntegrityError::MissingBundleId(id).into()) Err(RepositoryIntegrityError::MissingBundleId(id).into())
@ -110,7 +111,7 @@ impl Repository {
} }
for id in &rewrite_bundles { for id in &rewrite_bundles {
let bundle = &usage[id]; let bundle = &usage[id];
let bundle_id = self.bundle_map.get(*id).unwrap().id(); let bundle_id = self.bundle_map.get(*id).unwrap();
for chunk in 0..bundle.chunk_count { for chunk in 0..bundle.chunk_count {
let data = try!(self.bundles.get_chunk(&bundle_id, chunk)); let data = try!(self.bundles.get_chunk(&bundle_id, chunk));
let hash = self.config.hash.hash(&data); let hash = self.config.hash.hash(&data);

View File

@ -72,6 +72,11 @@ impl ChunkList {
pub fn encoded_size(&self) -> usize { pub fn encoded_size(&self) -> usize {
self.0.len() * 20 self.0.len() * 20
} }
#[inline]
pub fn into_inner(self) -> Vec<Chunk> {
self.0
}
} }
impl Default for ChunkList { impl Default for ChunkList {