mirror of https://github.com/dswd/zvault
Remote folder
This commit is contained in:
parent
c354abac91
commit
5964625ec7
|
@ -128,20 +128,32 @@ impl Bundle {
|
|||
self.info.id.clone()
|
||||
}
|
||||
|
||||
pub fn load(path: PathBuf, crypto: Arc<Mutex<Crypto>>) -> Result<Self, BundleError> {
|
||||
let mut file = BufReader::new(try!(File::open(&path).context(&path as &Path)));
|
||||
fn load_header<P: AsRef<Path>>(path: P) -> Result<(BundleInfo, u8, usize), BundleError> {
|
||||
let path = path.as_ref();
|
||||
let mut file = BufReader::new(try!(File::open(path).context(path)));
|
||||
let mut header = [0u8; 8];
|
||||
try!(file.read_exact(&mut header).context(&path as &Path));
|
||||
try!(file.read_exact(&mut header).context(path));
|
||||
if header[..HEADER_STRING.len()] != HEADER_STRING {
|
||||
return Err(BundleError::WrongHeader(path.clone()))
|
||||
return Err(BundleError::WrongHeader(path.to_path_buf()))
|
||||
}
|
||||
let version = header[HEADER_STRING.len()];
|
||||
if version != HEADER_VERSION {
|
||||
return Err(BundleError::WrongVersion(path.clone(), version))
|
||||
return Err(BundleError::WrongVersion(path.to_path_buf(), version))
|
||||
}
|
||||
let header: BundleInfo = try!(msgpack::decode_from_stream(&mut file).context(&path as &Path));
|
||||
let header: BundleInfo = try!(msgpack::decode_from_stream(&mut file).context(path));
|
||||
debug!("Load bundle {}", header.id);
|
||||
let content_start = file.seek(SeekFrom::Current(0)).unwrap() as usize + header.chunk_info_size;
|
||||
Ok((header, version, content_start))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn load_info<P: AsRef<Path>>(path: P) -> Result<BundleInfo, BundleError> {
|
||||
Self::load_header(path).map(|b| b.0)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn load(path: PathBuf, crypto: Arc<Mutex<Crypto>>) -> Result<Self, BundleError> {
|
||||
let (header, version, content_start) = try!(Self::load_header(&path));
|
||||
Ok(Bundle::new(path, version, content_start, crypto, header))
|
||||
}
|
||||
|
||||
|
|
|
@ -2,17 +2,103 @@ use ::prelude::*;
|
|||
use super::*;
|
||||
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::fs;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
|
||||
pub fn bundle_path(bundle: &BundleId, mut folder: PathBuf, mut count: usize) -> (PathBuf, PathBuf) {
|
||||
let mut file = bundle.to_string().to_owned() + ".bundle";
|
||||
while count >= 100 {
|
||||
if file.len() < 10 {
|
||||
break
|
||||
}
|
||||
folder = folder.join(&file[0..2]);
|
||||
file = file[2..].to_string();
|
||||
count /= 100;
|
||||
}
|
||||
(folder, file.into())
|
||||
}
|
||||
|
||||
pub fn load_bundles<P: AsRef<Path>>(path: P, bundles: &mut HashMap<BundleId, StoredBundle>) -> Result<(Vec<BundleId>, Vec<BundleInfo>), BundleError> {
|
||||
let mut paths = vec![path.as_ref().to_path_buf()];
|
||||
let mut bundle_paths = HashSet::new();
|
||||
while let Some(path) = paths.pop() {
|
||||
for entry in try!(fs::read_dir(path).map_err(BundleError::List)) {
|
||||
let entry = try!(entry.map_err(BundleError::List));
|
||||
let path = entry.path();
|
||||
if path.is_dir() {
|
||||
paths.push(path);
|
||||
} else {
|
||||
bundle_paths.insert(path);
|
||||
}
|
||||
}
|
||||
}
|
||||
let mut gone = vec![];
|
||||
for (id, bundle) in bundles.iter_mut() {
|
||||
if !bundle_paths.contains(&bundle.path) {
|
||||
gone.push(id.clone());
|
||||
} else {
|
||||
bundle_paths.remove(&bundle.path);
|
||||
}
|
||||
}
|
||||
let gone = gone.iter().map(|id| bundles.remove(&id).unwrap().info).collect();
|
||||
let mut new = vec![];
|
||||
for path in bundle_paths {
|
||||
let bundle = StoredBundle {
|
||||
info: try!(Bundle::load_info(&path)),
|
||||
path: path
|
||||
};
|
||||
new.push(bundle.info.id.clone());
|
||||
bundles.insert(bundle.info.id.clone(), bundle);
|
||||
}
|
||||
Ok((new, gone))
|
||||
}
|
||||
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
pub struct StoredBundle {
|
||||
pub info: BundleInfo,
|
||||
pub path: PathBuf
|
||||
}
|
||||
serde_impl!(StoredBundle(u64) {
|
||||
info: BundleInfo => 0,
|
||||
path: PathBuf => 1
|
||||
});
|
||||
|
||||
impl StoredBundle {
|
||||
#[inline]
|
||||
pub fn id(&self) -> BundleId {
|
||||
self.info.id.clone()
|
||||
}
|
||||
|
||||
pub fn move_to<P: AsRef<Path>>(mut self, path: P) -> Result<Self, BundleError> {
|
||||
let path = path.as_ref();
|
||||
if fs::rename(&self.path, path).is_err() {
|
||||
try!(fs::copy(&self.path, path).context(path));
|
||||
try!(fs::remove_file(&self.path).context(&self.path as &Path));
|
||||
}
|
||||
self.path = path.to_path_buf();
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
pub fn copy_to<P: AsRef<Path>>(&self, path: P) -> Result<Self, BundleError> {
|
||||
let path = path.as_ref();
|
||||
try!(fs::copy(&self.path, path).context(path));
|
||||
let mut bundle = self.clone();
|
||||
bundle.path = path.to_path_buf();
|
||||
Ok(bundle)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
pub struct BundleDb {
|
||||
remote_path: PathBuf,
|
||||
local_path: PathBuf,
|
||||
crypto: Arc<Mutex<Crypto>>,
|
||||
bundles: HashMap<BundleId, Bundle>,
|
||||
bundle_cache: LruCache<BundleId, Vec<u8>>
|
||||
local_bundles: HashMap<BundleId, StoredBundle>,
|
||||
remote_bundles: HashMap<BundleId, StoredBundle>,
|
||||
bundle_cache: LruCache<BundleId, (Bundle, Vec<u8>)>
|
||||
}
|
||||
|
||||
|
||||
|
@ -22,52 +108,28 @@ impl BundleDb {
|
|||
remote_path: remote_path,
|
||||
local_path: local_path,
|
||||
crypto: crypto,
|
||||
bundles: HashMap::new(),
|
||||
local_bundles: HashMap::new(),
|
||||
remote_bundles: HashMap::new(),
|
||||
bundle_cache: LruCache::new(5, 10)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn bundle_path(&self, bundle: &BundleId) -> (PathBuf, PathBuf) {
|
||||
let mut folder = self.remote_path.clone();
|
||||
let mut file = bundle.to_string().to_owned() + ".bundle";
|
||||
let mut count = self.bundles.len();
|
||||
while count >= 100 {
|
||||
if file.len() < 10 {
|
||||
break
|
||||
}
|
||||
folder = folder.join(&file[0..2]);
|
||||
file = file[2..].to_string();
|
||||
count /= 100;
|
||||
}
|
||||
(folder, file.into())
|
||||
fn load_bundle_list(&mut self) -> Result<(Vec<BundleId>, Vec<BundleInfo>), BundleError> {
|
||||
try!(load_bundles(self.local_path.join("cached"), &mut self.local_bundles));
|
||||
load_bundles(&self.remote_path, &mut self.remote_bundles)
|
||||
}
|
||||
|
||||
fn load_bundle_list(&mut self) -> Result<(), BundleError> {
|
||||
self.bundles.clear();
|
||||
let mut paths = Vec::new();
|
||||
paths.push(self.remote_path.clone());
|
||||
while let Some(path) = paths.pop() {
|
||||
for entry in try!(fs::read_dir(path).map_err(BundleError::List)) {
|
||||
let entry = try!(entry.map_err(BundleError::List));
|
||||
let path = entry.path();
|
||||
if path.is_dir() {
|
||||
paths.push(path);
|
||||
} else {
|
||||
let bundle = try!(Bundle::load(path, self.crypto.clone()));
|
||||
self.bundles.insert(bundle.id(), bundle);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
pub fn temp_bundle_path(&self, id: &BundleId) -> PathBuf {
|
||||
self.local_path.join("temp").join(id.to_string().to_owned() + ".bundle")
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn open<R: AsRef<Path>, L: AsRef<Path>>(remote_path: R, local_path: L, crypto: Arc<Mutex<Crypto>>) -> Result<Self, BundleError> {
|
||||
pub fn open<R: AsRef<Path>, L: AsRef<Path>>(remote_path: R, local_path: L, crypto: Arc<Mutex<Crypto>>) -> Result<(Self, Vec<BundleId>, Vec<BundleInfo>), BundleError> {
|
||||
let remote_path = remote_path.as_ref().to_owned();
|
||||
let local_path = local_path.as_ref().to_owned();
|
||||
let mut self_ = Self::new(remote_path, local_path, crypto);
|
||||
try!(self_.load_bundle_list());
|
||||
Ok(self_)
|
||||
let (new, gone) = try!(self_.load_bundle_list());
|
||||
Ok((self_, new, gone))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
|
@ -75,7 +137,8 @@ impl BundleDb {
|
|||
let remote_path = remote_path.as_ref().to_owned();
|
||||
let local_path = local_path.as_ref().to_owned();
|
||||
try!(fs::create_dir_all(&remote_path).context(&remote_path as &Path));
|
||||
try!(fs::create_dir_all(&local_path).context(&local_path as &Path));
|
||||
try!(fs::create_dir_all(local_path.join("cached")).context(&local_path as &Path));
|
||||
try!(fs::create_dir_all(local_path.join("temp")).context(&local_path as &Path));
|
||||
Ok(Self::new(remote_path, local_path, crypto))
|
||||
}
|
||||
|
||||
|
@ -90,41 +153,67 @@ impl BundleDb {
|
|||
BundleWriter::new(mode, hash_method, compression, encryption, self.crypto.clone())
|
||||
}
|
||||
|
||||
fn get_stored_bundle(&self, bundle_id: &BundleId) -> Result<&StoredBundle, BundleError> {
|
||||
if let Some(stored) = self.local_bundles.get(bundle_id).or_else(|| self.remote_bundles.get(bundle_id)) {
|
||||
Ok(stored)
|
||||
} else {
|
||||
Err(BundleError::NoSuchBundle(bundle_id.clone()))
|
||||
}
|
||||
}
|
||||
|
||||
fn get_bundle(&self, stored: &StoredBundle) -> Result<Bundle, BundleError> {
|
||||
Bundle::load(stored.path.clone(), self.crypto.clone())
|
||||
}
|
||||
|
||||
pub fn get_chunk(&mut self, bundle_id: &BundleId, id: usize) -> Result<Vec<u8>, BundleError> {
|
||||
let bundle = try!(self.bundles.get_mut(bundle_id).ok_or(BundleError::NoSuchBundle(bundle_id.clone())));
|
||||
let (pos, len) = try!(bundle.get_chunk_position(id));
|
||||
let mut chunk = Vec::with_capacity(len);
|
||||
if let Some(data) = self.bundle_cache.get(bundle_id) {
|
||||
if let Some(&mut (ref mut bundle, ref data)) = self.bundle_cache.get_mut(bundle_id) {
|
||||
let (pos, len) = try!(bundle.get_chunk_position(id));
|
||||
let mut chunk = Vec::with_capacity(len);
|
||||
chunk.extend_from_slice(&data[pos..pos+len]);
|
||||
return Ok(chunk);
|
||||
}
|
||||
let mut bundle = try!(self.get_stored_bundle(bundle_id).and_then(|s| self.get_bundle(s)));
|
||||
let (pos, len) = try!(bundle.get_chunk_position(id));
|
||||
let mut chunk = Vec::with_capacity(len);
|
||||
let data = try!(bundle.load_contents());
|
||||
chunk.extend_from_slice(&data[pos..pos+len]);
|
||||
self.bundle_cache.put(bundle_id.clone(), data);
|
||||
self.bundle_cache.put(bundle_id.clone(), (bundle, data));
|
||||
Ok(chunk)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn add_bundle(&mut self, bundle: BundleWriter) -> Result<&Bundle, BundleError> {
|
||||
pub fn add_bundle(&mut self, bundle: BundleWriter) -> Result<BundleInfo, BundleError> {
|
||||
let bundle = try!(bundle.finish(&self));
|
||||
let id = bundle.id();
|
||||
self.bundles.insert(id.clone(), bundle);
|
||||
Ok(self.get_bundle(&id).unwrap())
|
||||
if bundle.info.mode == BundleMode::Meta {
|
||||
let (folder, filename) = bundle_path(&id, self.local_path.join("cached"), self.local_bundles.len());
|
||||
try!(fs::create_dir_all(&folder).context(&folder as &Path));
|
||||
let bundle = try!(bundle.copy_to(folder.join(filename)));
|
||||
self.local_bundles.insert(id.clone(), bundle);
|
||||
}
|
||||
let (folder, filename) = bundle_path(&id, self.remote_path.clone(), self.remote_bundles.len());
|
||||
try!(fs::create_dir_all(&folder).context(&folder as &Path));
|
||||
let bundle = try!(bundle.copy_to(folder.join(filename)));
|
||||
self.remote_bundles.insert(bundle.id(), bundle.clone());
|
||||
Ok(bundle.info)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn get_bundle(&self, bundle: &BundleId) -> Option<&Bundle> {
|
||||
self.bundles.get(bundle)
|
||||
pub fn get_bundle_info(&self, bundle: &BundleId) -> Option<&BundleInfo> {
|
||||
self.get_stored_bundle(bundle).ok().map(|stored| &stored.info)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn list_bundles(&self) -> Vec<&Bundle> {
|
||||
self.bundles.values().collect()
|
||||
pub fn list_bundles(&self) -> Vec<&BundleInfo> {
|
||||
self.remote_bundles.values().map(|b| &b.info).collect()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn delete_bundle(&mut self, bundle: &BundleId) -> Result<(), BundleError> {
|
||||
if let Some(bundle) = self.bundles.remove(bundle) {
|
||||
if let Some(bundle) = self.local_bundles.remove(bundle) {
|
||||
try!(fs::remove_file(&bundle.path).map_err(|e| BundleError::Remove(e, bundle.id())))
|
||||
}
|
||||
if let Some(bundle) = self.remote_bundles.remove(bundle) {
|
||||
fs::remove_file(&bundle.path).map_err(|e| BundleError::Remove(e, bundle.id()))
|
||||
} else {
|
||||
Err(BundleError::NoSuchBundle(bundle.clone()))
|
||||
|
@ -133,7 +222,8 @@ impl BundleDb {
|
|||
|
||||
#[inline]
|
||||
pub fn check(&mut self, full: bool) -> Result<(), BundleError> {
|
||||
for bundle in self.bundles.values_mut() {
|
||||
for stored in self.remote_bundles.values() {
|
||||
let mut bundle = try!(self.get_bundle(stored));
|
||||
try!(bundle.check(full))
|
||||
}
|
||||
Ok(())
|
||||
|
|
|
@ -2,8 +2,8 @@ use ::prelude::*;
|
|||
use super::*;
|
||||
|
||||
use std::path::Path;
|
||||
use std::fs::{self, File};
|
||||
use std::io::{Write, Seek, SeekFrom, BufWriter};
|
||||
use std::fs::File;
|
||||
use std::io::{Write, BufWriter};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
|
||||
|
@ -58,7 +58,7 @@ impl BundleWriter {
|
|||
Ok(self.chunk_count-1)
|
||||
}
|
||||
|
||||
pub fn finish(mut self, db: &BundleDb) -> Result<Bundle, BundleError> {
|
||||
pub fn finish(mut self, db: &BundleDb) -> Result<StoredBundle, BundleError> {
|
||||
if let Some(stream) = self.compression_stream {
|
||||
try!(stream.finish(&mut self.data))
|
||||
}
|
||||
|
@ -72,9 +72,7 @@ impl BundleWriter {
|
|||
if let Some(ref encryption) = self.encryption {
|
||||
chunk_data = try!(self.crypto.lock().unwrap().encrypt(&encryption, &chunk_data));
|
||||
}
|
||||
let (folder, file) = db.bundle_path(&id);
|
||||
let path = folder.join(file);
|
||||
try!(fs::create_dir_all(&folder).context(&path as &Path));
|
||||
let path = db.temp_bundle_path(&id);
|
||||
let mut file = BufWriter::new(try!(File::create(&path).context(&path as &Path)));
|
||||
try!(file.write_all(&HEADER_STRING).context(&path as &Path));
|
||||
try!(file.write_all(&[HEADER_VERSION]).context(&path as &Path));
|
||||
|
@ -91,9 +89,8 @@ impl BundleWriter {
|
|||
};
|
||||
try!(msgpack::encode_to_stream(&header, &mut file).context(&path as &Path));
|
||||
try!(file.write_all(&chunk_data).context(&path as &Path));
|
||||
let content_start = file.seek(SeekFrom::Current(0)).unwrap() as usize;
|
||||
try!(file.write_all(&self.data).context(&path as &Path));
|
||||
Ok(Bundle::new(path, HEADER_VERSION, content_start, self.crypto, header))
|
||||
Ok(StoredBundle { path: path, info: header })
|
||||
}
|
||||
|
||||
#[inline]
|
||||
|
|
|
@ -26,10 +26,7 @@ mod prelude;
|
|||
|
||||
// TODO: Seperate remote folder
|
||||
// TODO: - Copy/move backup files to remote folder
|
||||
// TODO: - Keep meta bundles also locally
|
||||
// TODO: - Load and compare remote bundles to bundle map
|
||||
// TODO: - Write backup files there as well
|
||||
// TODO: - Avoid loading remote backups
|
||||
// TODO: - Lock during vacuum
|
||||
// TODO: Remove backup subtrees
|
||||
// TODO: Recompress & combine bundles
|
||||
|
|
|
@ -94,8 +94,8 @@ impl BundleMap {
|
|||
}
|
||||
|
||||
#[inline]
|
||||
pub fn set(&mut self, id: u32, bundle: &Bundle) {
|
||||
let data = BundleData { info: bundle.info.clone() };
|
||||
pub fn set(&mut self, id: u32, bundle: BundleInfo) {
|
||||
let data = BundleData { info: bundle };
|
||||
self.0.insert(id, data);
|
||||
}
|
||||
|
||||
|
|
|
@ -46,13 +46,13 @@ impl Repository {
|
|||
// Lookup bundle id from map
|
||||
let bundle_id = try!(self.get_bundle_id(entry.data.bundle));
|
||||
// Get bundle object from bundledb
|
||||
let bundle = if let Some(bundle) = self.bundles.get_bundle(&bundle_id) {
|
||||
let bundle = if let Some(bundle) = self.bundles.get_bundle_info(&bundle_id) {
|
||||
bundle
|
||||
} else {
|
||||
return Err(RepositoryIntegrityError::MissingBundle(bundle_id.clone()).into())
|
||||
};
|
||||
// Get chunk from bundle
|
||||
if bundle.info.chunk_count <= entry.data.chunk as usize {
|
||||
if bundle.chunk_count <= entry.data.chunk as usize {
|
||||
return Err(RepositoryIntegrityError::NoSuchChunk(bundle_id.clone(), entry.data.chunk).into())
|
||||
}
|
||||
pos += 1;
|
||||
|
|
|
@ -75,7 +75,7 @@ impl Repository {
|
|||
let path = path.as_ref().to_owned();
|
||||
let config = try!(Config::load(path.join("config.yaml")));
|
||||
let crypto = Arc::new(Mutex::new(try!(Crypto::open(path.join("keys")))));
|
||||
let bundles = try!(BundleDb::open(
|
||||
let (bundles, new, gone) = try!(BundleDb::open(
|
||||
path.join("remote/bundles"),
|
||||
path.join("bundles"),
|
||||
crypto.clone()
|
||||
|
|
|
@ -41,6 +41,17 @@ impl<K: Eq+Hash, V> LruCache<K, V> {
|
|||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn get_mut(&mut self, key: &K) -> Option<&mut V> {
|
||||
if let Some(&mut (ref mut item, ref mut n)) = self.items.get_mut(key) {
|
||||
*n = self.next;
|
||||
self.next += 1;
|
||||
Some(item)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn shrink(&mut self) {
|
||||
let mut tags: Vec<u64> = self.items.values().map(|&(_, n)| n).collect();
|
||||
|
|
Loading…
Reference in New Issue