zvault/src/bundledb/db.rs

329 lines
13 KiB
Rust
Raw Normal View History

2017-03-21 10:28:11 +00:00
use ::prelude::*;
use super::*;
2017-03-21 10:08:01 +00:00
use std::path::{Path, PathBuf};
2017-03-21 14:38:42 +00:00
use std::collections::{HashMap, HashSet};
2017-03-22 08:19:16 +00:00
use std::fs;
2017-03-21 10:08:01 +00:00
use std::sync::{Arc, Mutex};
2017-03-22 08:19:16 +00:00
use std::io;
2017-04-10 06:53:55 +00:00
use std::mem;
2017-03-21 18:18:18 +00:00
2017-03-22 08:19:16 +00:00
quick_error!{
    /// Errors reported by the bundle database.
    #[derive(Debug)]
    pub enum BundleDbError {
        /// A bundle directory could not be listed.
        ListBundles(err: io::Error) {
            cause(err)
            description("Failed to list bundles")
            display("Bundle db error: failed to list bundles\n\tcaused by: {}", err)
        }
        /// Reading a bundle failed; converted automatically from `BundleReaderError`.
        Reader(err: BundleReaderError) {
            from()
            cause(err)
            description("Failed to read bundle")
            display("Bundle db error: failed to read bundle\n\tcaused by: {}", err)
        }
        /// Writing a bundle failed; converted automatically from `BundleWriterError`.
        Writer(err: BundleWriterError) {
            from()
            cause(err)
            description("Failed to write bundle")
            display("Bundle db error: failed to write bundle\n\tcaused by: {}", err)
        }
        /// The bundle cache could not be read or written; converted automatically
        /// from `BundleCacheError`.
        Cache(err: BundleCacheError) {
            from()
            cause(err)
            description("Failed to read/write bundle cache")
            display("Bundle db error: failed to read/write bundle cache\n\tcaused by: {}", err)
        }
        /// Uploading a bundle failed.
        UploadFailed {
            description("Uploading a bundle failed")
        }
        /// An io error on a specific path; created via `.context(path)` on an
        /// `io::Result`.
        Io(err: io::Error, path: PathBuf) {
            cause(err)
            context(path: &'a Path, err: io::Error) -> (err, path.to_path_buf())
            description("Io error")
            display("Bundle db error: io error on {:?}\n\tcaused by: {}", path, err)
        }
        /// The requested bundle id is not known to the database.
        NoSuchBundle(bundle: BundleId) {
            description("No such bundle")
            display("Bundle db error: no such bundle: {:?}", bundle)
        }
        /// The file of the given bundle could not be removed.
        Remove(err: io::Error, bundle: BundleId) {
            cause(err)
            description("Failed to remove bundle")
            display("Bundle db error: failed to remove bundle {}\n\tcaused by: {}", bundle, err)
        }
    }
}
2017-03-21 10:08:01 +00:00
2017-04-08 12:35:10 +00:00
pub fn load_bundles(path: &Path, base: &Path, bundles: &mut HashMap<BundleId, StoredBundle>, crypto: Arc<Mutex<Crypto>>) -> Result<(Vec<StoredBundle>, Vec<StoredBundle>), BundleDbError> {
let mut paths = vec![path.to_path_buf()];
2017-03-21 14:38:42 +00:00
let mut bundle_paths = HashSet::new();
while let Some(path) = paths.pop() {
2017-03-22 08:19:16 +00:00
for entry in try!(fs::read_dir(path).map_err(BundleDbError::ListBundles)) {
let entry = try!(entry.map_err(BundleDbError::ListBundles));
2017-03-21 14:38:42 +00:00
let path = entry.path();
if path.is_dir() {
paths.push(path);
} else {
2017-04-08 07:59:55 +00:00
bundle_paths.insert(path.strip_prefix(base).unwrap().to_path_buf());
2017-03-21 14:38:42 +00:00
}
}
}
2017-03-24 06:09:43 +00:00
let mut gone = HashSet::new();
2017-04-10 16:21:26 +00:00
for (id, bundle) in bundles.iter() {
2017-03-21 14:38:42 +00:00
if !bundle_paths.contains(&bundle.path) {
2017-03-24 06:09:43 +00:00
gone.insert(id.clone());
2017-03-21 14:38:42 +00:00
} else {
bundle_paths.remove(&bundle.path);
}
}
let mut new = vec![];
for path in bundle_paths {
2017-04-11 06:49:45 +00:00
let info = match BundleReader::load_info(base.join(&path), crypto.clone()) {
Ok(info) => info,
Err(BundleReaderError::TruncatedBundle(path)) => {
warn!("Ignoring truncated bundle {:?}", path);
continue
},
Err(err) => return Err(err.into())
2017-03-21 14:38:42 +00:00
};
2017-04-11 06:49:45 +00:00
let bundle = StoredBundle { info: info, path: path };
2017-03-24 06:09:43 +00:00
let id = bundle.info.id.clone();
if !bundles.contains_key(&id) {
new.push(bundle.clone());
} else {
gone.remove(&id);
}
bundles.insert(id, bundle);
2017-03-21 14:38:42 +00:00
}
2017-03-24 06:09:43 +00:00
let gone = gone.iter().map(|id| bundles.remove(id).unwrap()).collect();
2017-03-21 14:38:42 +00:00
Ok((new, gone))
}
2017-03-21 10:08:01 +00:00
pub struct BundleDb {
2017-04-08 12:35:10 +00:00
pub layout: RepositoryLayout,
2017-04-10 06:53:55 +00:00
uploader: Option<Arc<BundleUploader>>,
2017-03-21 10:08:01 +00:00
crypto: Arc<Mutex<Crypto>>,
2017-03-21 14:38:42 +00:00
local_bundles: HashMap<BundleId, StoredBundle>,
remote_bundles: HashMap<BundleId, StoredBundle>,
2017-03-22 08:19:16 +00:00
bundle_cache: LruCache<BundleId, (BundleReader, Vec<u8>)>
2017-03-21 10:08:01 +00:00
}
impl BundleDb {
2017-04-08 12:35:10 +00:00
fn new(layout: RepositoryLayout, crypto: Arc<Mutex<Crypto>>) -> Self {
2017-03-21 10:08:01 +00:00
BundleDb {
2017-04-08 12:35:10 +00:00
layout: layout,
2017-03-21 10:08:01 +00:00
crypto: crypto,
2017-04-10 06:53:55 +00:00
uploader: None,
2017-03-21 14:38:42 +00:00
local_bundles: HashMap::new(),
remote_bundles: HashMap::new(),
2017-03-21 10:08:01 +00:00
bundle_cache: LruCache::new(5, 10)
}
}
2017-03-22 11:27:17 +00:00
fn load_bundle_list(&mut self) -> Result<(Vec<StoredBundle>, Vec<StoredBundle>), BundleDbError> {
2017-04-08 12:35:10 +00:00
if let Ok(list) = StoredBundle::read_list_from(&self.layout.local_bundle_cache_path()) {
2017-04-01 16:43:36 +00:00
for bundle in list {
self.local_bundles.insert(bundle.id(), bundle);
}
2017-04-09 10:04:28 +00:00
} else {
warn!("Failed to read local bundle cache, rebuilding cache");
2017-04-01 16:43:36 +00:00
}
2017-04-08 12:35:10 +00:00
if let Ok(list) = StoredBundle::read_list_from(&self.layout.remote_bundle_cache_path()) {
2017-03-21 18:18:18 +00:00
for bundle in list {
self.remote_bundles.insert(bundle.id(), bundle);
}
2017-04-09 10:04:28 +00:00
} else {
warn!("Failed to read remote bundle cache, rebuilding cache");
2017-03-21 18:18:18 +00:00
}
2017-04-08 12:35:10 +00:00
let base_path = self.layout.base_path();
let (new, gone) = try!(load_bundles(&self.layout.local_bundles_path(), base_path, &mut self.local_bundles, self.crypto.clone()));
2017-04-01 16:43:36 +00:00
if !new.is_empty() || !gone.is_empty() {
let bundles: Vec<_> = self.local_bundles.values().cloned().collect();
2017-04-08 12:35:10 +00:00
try!(StoredBundle::save_list_to(&bundles, &self.layout.local_bundle_cache_path()));
2017-04-01 16:43:36 +00:00
}
2017-04-08 12:35:10 +00:00
let (new, gone) = try!(load_bundles(&self.layout.remote_bundles_path(), base_path, &mut self.remote_bundles, self.crypto.clone()));
2017-03-21 18:18:18 +00:00
if !new.is_empty() || !gone.is_empty() {
let bundles: Vec<_> = self.remote_bundles.values().cloned().collect();
2017-04-08 12:35:10 +00:00
try!(StoredBundle::save_list_to(&bundles, &self.layout.remote_bundle_cache_path()));
2017-03-21 18:18:18 +00:00
}
Ok((new, gone))
2017-03-21 14:38:42 +00:00
}
2017-03-22 13:10:42 +00:00
pub fn save_cache(&self) -> Result<(), BundleDbError> {
2017-04-01 16:43:36 +00:00
let bundles: Vec<_> = self.local_bundles.values().cloned().collect();
2017-04-08 12:35:10 +00:00
try!(StoredBundle::save_list_to(&bundles, &self.layout.local_bundle_cache_path()));
2017-03-22 13:10:42 +00:00
let bundles: Vec<_> = self.remote_bundles.values().cloned().collect();
2017-04-08 12:35:10 +00:00
Ok(try!(StoredBundle::save_list_to(&bundles, &self.layout.remote_bundle_cache_path())))
2017-03-22 13:10:42 +00:00
}
2017-04-10 17:09:50 +00:00
pub fn update_cache(&mut self) -> Result<(), BundleDbError> {
let mut meta_bundles = HashSet::new();
for (id, bundle) in &self.remote_bundles {
2017-03-22 11:27:17 +00:00
if bundle.info.mode == BundleMode::Meta {
2017-04-10 17:09:50 +00:00
meta_bundles.insert(id.clone());
}
}
let mut remove = vec![];
for id in self.local_bundles.keys() {
if !meta_bundles.contains(id) {
remove.push(id.clone());
}
}
for id in meta_bundles {
if !self.local_bundles.contains_key(&id) {
let bundle = self.remote_bundles[&id].clone();
2017-04-10 16:21:26 +00:00
debug!("Copying new meta bundle to local cache: {}", bundle.info.id);
2017-04-10 17:09:50 +00:00
try!(self.copy_remote_bundle_to_cache(&bundle));
2017-03-22 11:27:17 +00:00
}
}
2017-04-08 12:35:10 +00:00
let base_path = self.layout.base_path();
2017-04-10 17:09:50 +00:00
for id in remove {
if let Some(bundle) = self.local_bundles.remove(&id) {
try!(fs::remove_file(base_path.join(&bundle.path)).map_err(|e| BundleDbError::Remove(e, id)))
2017-03-22 11:27:17 +00:00
}
}
Ok(())
}
2017-04-08 12:35:10 +00:00
pub fn open(layout: RepositoryLayout, crypto: Arc<Mutex<Crypto>>) -> Result<(Self, Vec<BundleInfo>, Vec<BundleInfo>), BundleDbError> {
let mut self_ = Self::new(layout, crypto);
2017-03-21 14:38:42 +00:00
let (new, gone) = try!(self_.load_bundle_list());
2017-04-10 17:09:50 +00:00
try!(self_.update_cache());
2017-03-22 11:27:17 +00:00
let new = new.into_iter().map(|s| s.info).collect();
let gone = gone.into_iter().map(|s| s.info).collect();
2017-03-21 14:38:42 +00:00
Ok((self_, new, gone))
2017-03-21 10:08:01 +00:00
}
2017-04-08 12:35:10 +00:00
/// Creates the on-disk structure of a new bundle database.
///
/// Creates the remote, local and temporary bundle directories (including
/// missing parents) and writes empty local and remote bundle cache files.
///
/// # Errors
///
/// `BundleDbError::Io` carrying the affected path if a directory cannot be
/// created; an error converted from `save_list_to` if a cache file cannot be
/// written.
pub fn create(layout: RepositoryLayout) -> Result<(), BundleDbError> {
    let remote_dir = layout.remote_bundles_path();
    try!(fs::create_dir_all(&remote_dir).context(&remote_dir as &Path));

    let local_dir = layout.local_bundles_path();
    try!(fs::create_dir_all(&local_dir).context(&local_dir as &Path));

    let temp_dir = layout.temp_bundles_path();
    try!(fs::create_dir_all(&temp_dir).context(&temp_dir as &Path));

    try!(StoredBundle::save_list_to(&[], layout.local_bundle_cache_path()));
    try!(StoredBundle::save_list_to(&[], layout.remote_bundle_cache_path()));
    Ok(())
}
#[inline]
2017-03-22 08:19:16 +00:00
pub fn create_bundle(&self, mode: BundleMode, hash_method: HashMethod, compression: Option<Compression>, encryption: Option<Encryption>) -> Result<BundleWriter, BundleDbError> {
Ok(try!(BundleWriter::new(mode, hash_method, compression, encryption, self.crypto.clone())))
2017-03-21 10:08:01 +00:00
}
2017-03-22 08:19:16 +00:00
fn get_stored_bundle(&self, bundle_id: &BundleId) -> Result<&StoredBundle, BundleDbError> {
2017-03-21 14:38:42 +00:00
if let Some(stored) = self.local_bundles.get(bundle_id).or_else(|| self.remote_bundles.get(bundle_id)) {
Ok(stored)
} else {
2017-03-22 08:19:16 +00:00
Err(BundleDbError::NoSuchBundle(bundle_id.clone()))
2017-03-21 14:38:42 +00:00
}
}
2017-04-10 18:35:28 +00:00
#[inline]
2017-03-22 08:19:16 +00:00
fn get_bundle(&self, stored: &StoredBundle) -> Result<BundleReader, BundleDbError> {
2017-04-08 12:35:10 +00:00
let base_path = self.layout.base_path();
Ok(try!(BundleReader::load(base_path.join(&stored.path), self.crypto.clone())))
2017-03-21 14:38:42 +00:00
}
2017-03-22 08:19:16 +00:00
pub fn get_chunk(&mut self, bundle_id: &BundleId, id: usize) -> Result<Vec<u8>, BundleDbError> {
2017-03-21 14:38:42 +00:00
if let Some(&mut (ref mut bundle, ref data)) = self.bundle_cache.get_mut(bundle_id) {
let (pos, len) = try!(bundle.get_chunk_position(id));
let mut chunk = Vec::with_capacity(len);
2017-03-21 10:08:01 +00:00
chunk.extend_from_slice(&data[pos..pos+len]);
return Ok(chunk);
}
2017-03-21 14:38:42 +00:00
let mut bundle = try!(self.get_stored_bundle(bundle_id).and_then(|s| self.get_bundle(s)));
let (pos, len) = try!(bundle.get_chunk_position(id));
let mut chunk = Vec::with_capacity(len);
2017-03-21 10:08:01 +00:00
let data = try!(bundle.load_contents());
chunk.extend_from_slice(&data[pos..pos+len]);
2017-03-21 14:38:42 +00:00
self.bundle_cache.put(bundle_id.clone(), (bundle, data));
2017-03-21 10:08:01 +00:00
Ok(chunk)
}
2017-03-22 11:27:17 +00:00
fn copy_remote_bundle_to_cache(&mut self, bundle: &StoredBundle) -> Result<(), BundleDbError> {
let id = bundle.id();
2017-04-08 12:35:10 +00:00
let (folder, filename) = self.layout.local_bundle_path(&id, self.local_bundles.len());
2017-03-22 11:27:17 +00:00
try!(fs::create_dir_all(&folder).context(&folder as &Path));
2017-04-08 12:35:10 +00:00
let bundle = try!(bundle.copy_to(&self.layout.base_path(), folder.join(filename)));
2017-03-22 11:27:17 +00:00
self.local_bundles.insert(id, bundle);
Ok(())
}
2017-03-22 08:19:16 +00:00
pub fn add_bundle(&mut self, bundle: BundleWriter) -> Result<BundleInfo, BundleDbError> {
2017-04-10 06:53:55 +00:00
let mut bundle = try!(bundle.finish(&self));
2017-03-21 14:38:42 +00:00
if bundle.info.mode == BundleMode::Meta {
2017-03-22 11:27:17 +00:00
try!(self.copy_remote_bundle_to_cache(&bundle))
2017-03-21 14:38:42 +00:00
}
2017-04-08 12:35:10 +00:00
let (folder, filename) = self.layout.remote_bundle_path(self.remote_bundles.len());
2017-04-10 06:53:55 +00:00
let dst_path = folder.join(filename);
let src_path = bundle.path;
bundle.path = dst_path.clone();
if self.uploader.is_none() {
self.uploader = Some(BundleUploader::new(5));
}
try!(self.uploader.as_ref().unwrap().queue(src_path, dst_path));
2017-03-21 14:38:42 +00:00
self.remote_bundles.insert(bundle.id(), bundle.clone());
Ok(bundle.info)
2017-03-21 10:08:01 +00:00
}
2017-04-10 06:53:55 +00:00
pub fn finish_uploads(&mut self) -> Result<(), BundleDbError> {
let mut uploader = None;
mem::swap(&mut self.uploader, &mut uploader);
if let Some(uploader) = uploader {
uploader.finish()
} else {
Ok(())
}
}
2017-03-22 11:27:17 +00:00
/// Returns a copy of the chunk list of the given bundle.
///
/// The bundle is opened from its local copy if there is one, otherwise from
/// the remote copy.
pub fn get_chunk_list(&self, bundle: &BundleId) -> Result<ChunkList, BundleDbError> {
    let stored = try!(self.get_stored_bundle(bundle));
    let mut reader = try!(self.get_bundle(stored));
    let chunks = try!(reader.get_chunk_list());
    Ok(chunks.clone())
}
2017-03-21 10:08:01 +00:00
#[inline]
2017-04-07 16:57:49 +00:00
pub fn get_bundle_info(&self, bundle: &BundleId) -> Option<&StoredBundle> {
self.remote_bundles.get(bundle)
2017-03-21 10:08:01 +00:00
}
#[inline]
2017-03-21 14:38:42 +00:00
pub fn list_bundles(&self) -> Vec<&BundleInfo> {
self.remote_bundles.values().map(|b| &b.info).collect()
2017-03-21 10:08:01 +00:00
}
pub fn delete_local_bundle(&mut self, bundle: &BundleId) -> Result<(), BundleDbError> {
2017-03-21 14:38:42 +00:00
if let Some(bundle) = self.local_bundles.remove(bundle) {
2017-04-08 12:35:10 +00:00
let path = self.layout.base_path().join(&bundle.path);
try!(fs::remove_file(path).map_err(|e| BundleDbError::Remove(e, bundle.id())))
2017-03-21 14:38:42 +00:00
}
Ok(())
}
pub fn delete_bundle(&mut self, bundle: &BundleId) -> Result<(), BundleDbError> {
try!(self.delete_local_bundle(bundle));
2017-03-21 14:38:42 +00:00
if let Some(bundle) = self.remote_bundles.remove(bundle) {
2017-04-08 12:35:10 +00:00
let path = self.layout.base_path().join(&bundle.path);
fs::remove_file(path).map_err(|e| BundleDbError::Remove(e, bundle.id()))
2017-03-21 10:08:01 +00:00
} else {
2017-03-22 08:19:16 +00:00
Err(BundleDbError::NoSuchBundle(bundle.clone()))
2017-03-21 10:08:01 +00:00
}
}
2017-03-22 08:19:16 +00:00
pub fn check(&mut self, full: bool) -> Result<(), BundleDbError> {
2017-04-11 06:49:45 +00:00
for stored in ProgressIter::new("checking bundles", self.remote_bundles.len(), self.remote_bundles.values()) {
2017-03-21 14:38:42 +00:00
let mut bundle = try!(self.get_bundle(stored));
2017-03-21 10:08:01 +00:00
try!(bundle.check(full))
}
Ok(())
}
2017-04-10 16:21:26 +00:00
/// Returns the number of remote bundles known to the database.
///
/// Bundles in the local cache are not counted.
#[inline]
pub fn len(&self) -> usize {
self.remote_bundles.len()
}
2017-03-21 10:08:01 +00:00
}