
Reformatted using rustfmt

Branch: pull/10/head
Author: Dennis Schwerdel, 4 years ago
Commit: d062aaa6d4
Changed files (lines changed):

 1. CHANGELOG.md (1)
 2. rustfmt.toml (2)
 3. src/bundledb/cache.rs (26)
 4. src/bundledb/db.rs (187)
 5. src/bundledb/mod.rs (10)
 6. src/bundledb/reader.rs (146)
 7. src/bundledb/uploader.rs (7)
 8. src/bundledb/writer.rs (41)
 9. src/chunker.rs (18)
10. src/cli/algotest.rs (162)
11. src/cli/args.rs (260)
12. src/cli/logger.rs (22)
13. src/cli/mod.rs (780)
14. src/main.rs (20)
15. src/mount.rs (390)
16. src/prelude.rs (15)
17. src/repository/backup.rs (223)
18. src/repository/backup_file.rs (61)
19. src/repository/basic_io.rs (98)
20. src/repository/bundle_map.rs (13)
21. src/repository/config.rs (23)
22. src/repository/error.rs (2)
23. src/repository/info.rs (34)
24. src/repository/integrity.rs (218)
25. src/repository/layout.rs (19)
26. src/repository/metadata.rs (129)
27. src/repository/mod.rs (106)
28. src/repository/tarfile.rs (142)
29. src/repository/vacuum.rs (40)
30. src/util/bitmap.rs (4)
31. src/util/chunk.rs (72)
32. src/util/cli.rs (8)
33. src/util/compression.rs (143)
34. src/util/encryption.rs (124)
35. src/util/fs.rs (8)
36. src/util/hash.rs (59)
37. src/util/hex.rs (15)
38. src/util/hostname.rs (10)
39. src/util/lock.rs (43)
40. src/util/lru_cache.rs (6)

CHANGELOG.md (1 line changed)

@@ -8,6 +8,7 @@ This project follows [semantic versioning](http://semver.org).
* [added] Added support for xattrs in fuse mount
* [added] Added support for block/char devices
* [added] Added support for fifo files
* [modified] Reformatted sources using rustfmt
* [modified] Also documenting common flags in subcommands
* [modified] Using repository aliases (**conversion needed**)
* [modified] Remote path must be absolute

rustfmt.toml (2 lines changed)

@@ -0,0 +1,2 @@
trailing_semicolon = false
trailing_comma = "Never"

src/bundledb/cache.rs (26 lines changed)

@@ -1,4 +1,4 @@
use ::prelude::*;
use prelude::*;
use std::path::{Path, PathBuf};
use std::fs::{self, File};
@@ -62,7 +62,11 @@ impl StoredBundle {
self.info.id.clone()
}
pub fn copy_to<P: AsRef<Path>>(&self, base_path: &Path, path: P) -> Result<Self, BundleDbError> {
pub fn copy_to<P: AsRef<Path>>(
&self,
base_path: &Path,
path: P,
) -> Result<Self, BundleDbError> {
let src_path = base_path.join(&self.path);
let dst_path = path.as_ref();
try!(fs::copy(&src_path, dst_path).context(dst_path));
@@ -71,7 +75,11 @@ impl StoredBundle {
Ok(bundle)
}
pub fn move_to<P: AsRef<Path>>(&mut self, base_path: &Path, path: P) -> Result<(), BundleDbError> {
pub fn move_to<P: AsRef<Path>>(
&mut self,
base_path: &Path,
path: P,
) -> Result<(), BundleDbError> {
let src_path = base_path.join(&self.path);
let dst_path = path.as_ref();
if fs::rename(&src_path, dst_path).is_err() {
@@ -88,11 +96,11 @@ impl StoredBundle {
let mut header = [0u8; 8];
try!(file.read_exact(&mut header).map_err(BundleCacheError::Read));
if header[..CACHE_FILE_STRING.len()] != CACHE_FILE_STRING {
return Err(BundleCacheError::WrongHeader)
return Err(BundleCacheError::WrongHeader);
}
let version = header[CACHE_FILE_STRING.len()];
if version != CACHE_FILE_VERSION {
return Err(BundleCacheError::UnsupportedVersion(version))
return Err(BundleCacheError::UnsupportedVersion(version));
}
Ok(try!(msgpack::decode_from_stream(&mut file)))
}
@@ -100,8 +108,12 @@ impl StoredBundle {
pub fn save_list_to<P: AsRef<Path>>(list: &[Self], path: P) -> Result<(), BundleCacheError> {
let path = path.as_ref();
let mut file = BufWriter::new(try!(File::create(path).map_err(BundleCacheError::Write)));
try!(file.write_all(&CACHE_FILE_STRING).map_err(BundleCacheError::Write));
try!(file.write_all(&[CACHE_FILE_VERSION]).map_err(BundleCacheError::Write));
try!(file.write_all(&CACHE_FILE_STRING).map_err(
BundleCacheError::Write
));
try!(file.write_all(&[CACHE_FILE_VERSION]).map_err(
BundleCacheError::Write
));
try!(msgpack::encode_to_stream(&list, &mut file));
Ok(())
}

src/bundledb/db.rs (187 lines changed)

@@ -1,4 +1,4 @@
use ::prelude::*;
use prelude::*;
use super::*;
use std::path::{Path, PathBuf};
@@ -57,7 +57,12 @@ quick_error!{
}
fn load_bundles(path: &Path, base: &Path, bundles: &mut HashMap<BundleId, StoredBundle>, crypto: Arc<Mutex<Crypto>>) -> Result<(Vec<StoredBundle>, Vec<StoredBundle>), BundleDbError> {
fn load_bundles(
path: &Path,
base: &Path,
bundles: &mut HashMap<BundleId, StoredBundle>,
crypto: Arc<Mutex<Crypto>>,
) -> Result<(Vec<StoredBundle>, Vec<StoredBundle>), BundleDbError> {
let mut paths = vec![path.to_path_buf()];
let mut bundle_paths = HashSet::new();
while let Some(path) = paths.pop() {
@@ -68,7 +73,7 @@ fn load_bundles(path: &Path, base: &Path, bundles: &mut HashMap<BundleId, Stored
paths.push(path);
} else {
if path.extension() != Some("bundle".as_ref()) {
continue
continue;
}
bundle_paths.insert(path.strip_prefix(base).unwrap().to_path_buf());
}
@@ -89,10 +94,13 @@ fn load_bundles(path: &Path, base: &Path, bundles: &mut HashMap<BundleId, Stored
Err(err) => {
warn!("Failed to read bundle {:?}\n\tcaused by: {}", path, err);
info!("Ignoring unreadable bundle");
continue
continue;
}
};
let bundle = StoredBundle { info: info, path: path };
let bundle = StoredBundle {
info: info,
path: path
};
let id = bundle.info.id.clone();
if !bundles.contains_key(&id) {
new.push(bundle.clone());
@@ -129,7 +137,9 @@ impl BundleDb {
}
}
fn load_bundle_list(&mut self) -> Result<(Vec<StoredBundle>, Vec<StoredBundle>), BundleDbError> {
fn load_bundle_list(
&mut self,
) -> Result<(Vec<StoredBundle>, Vec<StoredBundle>), BundleDbError> {
if let Ok(list) = StoredBundle::read_list_from(&self.layout.local_bundle_cache_path()) {
for bundle in list {
self.local_bundles.insert(bundle.id(), bundle);
@@ -145,15 +155,31 @@ impl BundleDb {
warn!("Failed to read remote bundle cache, rebuilding cache");
}
let base_path = self.layout.base_path();
let (new, gone) = try!(load_bundles(&self.layout.local_bundles_path(), base_path, &mut self.local_bundles, self.crypto.clone()));
let (new, gone) = try!(load_bundles(
&self.layout.local_bundles_path(),
base_path,
&mut self.local_bundles,
self.crypto.clone()
));
if !new.is_empty() || !gone.is_empty() {
let bundles: Vec<_> = self.local_bundles.values().cloned().collect();
try!(StoredBundle::save_list_to(&bundles, &self.layout.local_bundle_cache_path()));
try!(StoredBundle::save_list_to(
&bundles,
&self.layout.local_bundle_cache_path()
));
}
let (new, gone) = try!(load_bundles(&self.layout.remote_bundles_path(), base_path, &mut self.remote_bundles, self.crypto.clone()));
let (new, gone) = try!(load_bundles(
&self.layout.remote_bundles_path(),
base_path,
&mut self.remote_bundles,
self.crypto.clone()
));
if !new.is_empty() || !gone.is_empty() {
let bundles: Vec<_> = self.remote_bundles.values().cloned().collect();
try!(StoredBundle::save_list_to(&bundles, &self.layout.remote_bundle_cache_path()));
try!(StoredBundle::save_list_to(
&bundles,
&self.layout.remote_bundle_cache_path()
));
}
Ok((new, gone))
}
@@ -164,9 +190,15 @@ impl BundleDb {
fn save_cache(&self) -> Result<(), BundleDbError> {
let bundles: Vec<_> = self.local_bundles.values().cloned().collect();
try!(StoredBundle::save_list_to(&bundles, &self.layout.local_bundle_cache_path()));
try!(StoredBundle::save_list_to(
&bundles,
&self.layout.local_bundle_cache_path()
));
let bundles: Vec<_> = self.remote_bundles.values().cloned().collect();
Ok(try!(StoredBundle::save_list_to(&bundles, &self.layout.remote_bundle_cache_path())))
Ok(try!(StoredBundle::save_list_to(
&bundles,
&self.layout.remote_bundle_cache_path()
)))
}
fn update_cache(&mut self) -> Result<(), BundleDbError> {
@@ -192,13 +224,18 @@ impl BundleDb {
let base_path = self.layout.base_path();
for id in remove {
if let Some(bundle) = self.local_bundles.remove(&id) {
try!(fs::remove_file(base_path.join(&bundle.path)).map_err(|e| BundleDbError::Remove(e, id)))
try!(fs::remove_file(base_path.join(&bundle.path)).map_err(|e| {
BundleDbError::Remove(e, id)
}))
}
}
Ok(())
}
pub fn open(layout: RepositoryLayout, crypto: Arc<Mutex<Crypto>>) -> Result<(Self, Vec<BundleInfo>, Vec<BundleInfo>), BundleDbError> {
pub fn open(
layout: RepositoryLayout,
crypto: Arc<Mutex<Crypto>>,
) -> Result<(Self, Vec<BundleInfo>, Vec<BundleInfo>), BundleDbError> {
let mut self_ = Self::new(layout, crypto);
let (new, gone) = try!(self_.load_bundle_list());
try!(self_.update_cache());
@@ -208,21 +245,51 @@ impl BundleDb {
}
pub fn create(layout: RepositoryLayout) -> Result<(), BundleDbError> {
try!(fs::create_dir_all(layout.remote_bundles_path()).context(&layout.remote_bundles_path() as &Path));
try!(fs::create_dir_all(layout.local_bundles_path()).context(&layout.local_bundles_path() as &Path));
try!(fs::create_dir_all(layout.temp_bundles_path()).context(&layout.temp_bundles_path() as &Path));
try!(StoredBundle::save_list_to(&[], layout.local_bundle_cache_path()));
try!(StoredBundle::save_list_to(&[], layout.remote_bundle_cache_path()));
try!(fs::create_dir_all(layout.remote_bundles_path()).context(
&layout.remote_bundles_path() as
&Path
));
try!(fs::create_dir_all(layout.local_bundles_path()).context(
&layout.local_bundles_path() as
&Path
));
try!(fs::create_dir_all(layout.temp_bundles_path()).context(
&layout.temp_bundles_path() as
&Path
));
try!(StoredBundle::save_list_to(
&[],
layout.local_bundle_cache_path()
));
try!(StoredBundle::save_list_to(
&[],
layout.remote_bundle_cache_path()
));
Ok(())
}
#[inline]
pub fn create_bundle(&self, mode: BundleMode, hash_method: HashMethod, compression: Option<Compression>, encryption: Option<Encryption>) -> Result<BundleWriter, BundleDbError> {
Ok(try!(BundleWriter::new(mode, hash_method, compression, encryption, self.crypto.clone())))
pub fn create_bundle(
&self,
mode: BundleMode,
hash_method: HashMethod,
compression: Option<Compression>,
encryption: Option<Encryption>,
) -> Result<BundleWriter, BundleDbError> {
Ok(try!(BundleWriter::new(
mode,
hash_method,
compression,
encryption,
self.crypto.clone()
)))
}
fn get_stored_bundle(&self, bundle_id: &BundleId) -> Result<&StoredBundle, BundleDbError> {
if let Some(stored) = self.local_bundles.get(bundle_id).or_else(|| self.remote_bundles.get(bundle_id)) {
if let Some(stored) = self.local_bundles.get(bundle_id).or_else(|| {
self.remote_bundles.get(bundle_id)
})
{
Ok(stored)
} else {
Err(BundleDbError::NoSuchBundle(bundle_id.clone()))
@@ -232,21 +299,26 @@ impl BundleDb {
#[inline]
fn get_bundle(&self, stored: &StoredBundle) -> Result<BundleReader, BundleDbError> {
let base_path = self.layout.base_path();
Ok(try!(BundleReader::load(base_path.join(&stored.path), self.crypto.clone())))
Ok(try!(BundleReader::load(
base_path.join(&stored.path),
self.crypto.clone()
)))
}
pub fn get_chunk(&mut self, bundle_id: &BundleId, id: usize) -> Result<Vec<u8>, BundleDbError> {
if let Some(&mut (ref mut bundle, ref data)) = self.bundle_cache.get_mut(bundle_id) {
let (pos, len) = try!(bundle.get_chunk_position(id));
let mut chunk = Vec::with_capacity(len);
chunk.extend_from_slice(&data[pos..pos+len]);
chunk.extend_from_slice(&data[pos..pos + len]);
return Ok(chunk);
}
let mut bundle = try!(self.get_stored_bundle(bundle_id).and_then(|s| self.get_bundle(s)));
let mut bundle = try!(self.get_stored_bundle(bundle_id).and_then(
|s| self.get_bundle(s)
));
let (pos, len) = try!(bundle.get_chunk_position(id));
let mut chunk = Vec::with_capacity(len);
let data = try!(bundle.load_contents());
chunk.extend_from_slice(&data[pos..pos+len]);
chunk.extend_from_slice(&data[pos..pos + len]);
self.bundle_cache.put(bundle_id.clone(), (bundle, data));
Ok(chunk)
}
@@ -255,7 +327,10 @@ impl BundleDb {
let id = bundle.id();
let (folder, filename) = self.layout.local_bundle_path(&id, self.local_bundles.len());
try!(fs::create_dir_all(&folder).context(&folder as &Path));
let bundle = try!(bundle.copy_to(self.layout.base_path(), folder.join(filename)));
let bundle = try!(bundle.copy_to(
self.layout.base_path(),
folder.join(filename)
));
self.local_bundles.insert(id, bundle);
Ok(())
}
@@ -268,7 +343,10 @@ impl BundleDb {
let (folder, filename) = self.layout.remote_bundle_path(self.remote_bundles.len());
let dst_path = folder.join(filename);
let src_path = self.layout.base_path().join(bundle.path);
bundle.path = dst_path.strip_prefix(self.layout.base_path()).unwrap().to_path_buf();
bundle.path = dst_path
.strip_prefix(self.layout.base_path())
.unwrap()
.to_path_buf();
if self.uploader.is_none() {
self.uploader = Some(BundleUploader::new(5));
}
@@ -288,7 +366,9 @@ impl BundleDb {
}
pub fn get_chunk_list(&self, bundle: &BundleId) -> Result<ChunkList, BundleDbError> {
let mut bundle = try!(self.get_stored_bundle(bundle).and_then(|stored| self.get_bundle(stored)));
let mut bundle = try!(self.get_stored_bundle(bundle).and_then(|stored| {
self.get_bundle(stored)
}));
Ok(try!(bundle.get_chunk_list()).clone())
}
@@ -305,7 +385,9 @@ impl BundleDb {
pub fn delete_local_bundle(&mut self, bundle: &BundleId) -> Result<(), BundleDbError> {
if let Some(bundle) = self.local_bundles.remove(bundle) {
let path = self.layout.base_path().join(&bundle.path);
try!(fs::remove_file(path).map_err(|e| BundleDbError::Remove(e, bundle.id())))
try!(fs::remove_file(path).map_err(|e| {
BundleDbError::Remove(e, bundle.id())
}))
}
Ok(())
}
@@ -322,24 +404,29 @@ impl BundleDb {
pub fn check(&mut self, full: bool, repair: bool) -> Result<bool, BundleDbError> {
let mut to_repair = vec![];
for (id, stored) in ProgressIter::new("checking bundles", self.remote_bundles.len(), self.remote_bundles.iter()) {
for (id, stored) in ProgressIter::new(
"checking bundles",
self.remote_bundles.len(),
self.remote_bundles.iter()
)
{
let mut bundle = match self.get_bundle(stored) {
Ok(bundle) => bundle,
Err(err) => {
if repair {
to_repair.push(id.clone());
continue
continue;
} else {
return Err(err)
return Err(err);
}
}
};
if let Err(err) = bundle.check(full) {
if repair {
to_repair.push(id.clone());
continue
continue;
} else {
return Err(err.into())
return Err(err.into());
}
}
}
@@ -371,35 +458,52 @@ impl BundleDb {
let mut bundle = match self.get_bundle(&stored) {
Ok(bundle) => bundle,
Err(err) => {
warn!("Problem detected: failed to read bundle header: {}\n\tcaused by: {}", id, err);
warn!(
"Problem detected: failed to read bundle header: {}\n\tcaused by: {}",
id,
err
);
return self.evacuate_broken_bundle(stored);
}
};
let chunks = match bundle.get_chunk_list() {
Ok(chunks) => chunks.clone(),
Err(err) => {
warn!("Problem detected: failed to read bundle chunks: {}\n\tcaused by: {}", id, err);
warn!(
"Problem detected: failed to read bundle chunks: {}\n\tcaused by: {}",
id,
err
);
return self.evacuate_broken_bundle(stored);
}
};
let data = match bundle.load_contents() {
Ok(data) => data,
Err(err) => {
warn!("Problem detected: failed to read bundle data: {}\n\tcaused by: {}", id, err);
warn!(
"Problem detected: failed to read bundle data: {}\n\tcaused by: {}",
id,
err
);
return self.evacuate_broken_bundle(stored);
}
};
warn!("Problem detected: bundle data was truncated: {}", id);
info!("Copying readable data into new bundle");
let info = stored.info.clone();
let mut new_bundle = try!(self.create_bundle(info.mode, info.hash_method, info.compression, info.encryption));
let mut new_bundle = try!(self.create_bundle(
info.mode,
info.hash_method,
info.compression,
info.encryption
));
let mut pos = 0;
for (hash, mut len) in chunks.into_inner() {
if pos >= data.len() {
break
break;
}
len = min(len, (data.len() - pos) as u32);
try!(new_bundle.add(&data[pos..pos+len as usize], hash));
try!(new_bundle.add(&data[pos..pos + len as usize], hash));
pos += len as usize;
}
let bundle = try!(self.add_bundle(new_bundle));
@@ -411,5 +515,4 @@ impl BundleDb {
pub fn len(&self) -> usize {
self.remote_bundles.len()
}
}

src/bundledb/mod.rs (10 lines changed)

@@ -10,7 +10,7 @@ pub use self::reader::{BundleReader, BundleReaderError};
pub use self::db::*;
pub use self::uploader::BundleUploader;
use ::prelude::*;
use prelude::*;
use std::fmt;
use serde;
@@ -47,7 +47,10 @@ impl BundleId {
#[inline]
pub fn random() -> Self {
BundleId(Hash{high: rand::random(), low: rand::random()})
BundleId(Hash {
high: rand::random(),
low: rand::random()
})
}
}
@@ -68,7 +71,8 @@ impl fmt::Debug for BundleId {
#[derive(Eq, Debug, PartialEq, Clone, Copy)]
pub enum BundleMode {
Data, Meta
Data,
Meta
}
serde_impl!(BundleMode(u8) {
Data => 0,

src/bundledb/reader.rs (146 lines changed)

@@ -1,4 +1,4 @@
use ::prelude::*;
use prelude::*;
use super::*;
use std::path::{Path, PathBuf};
@@ -67,7 +67,13 @@ pub struct BundleReader {
}
impl BundleReader {
pub fn new(path: PathBuf, version: u8, content_start: usize, crypto: Arc<Mutex<Crypto>>, info: BundleInfo) -> Self {
pub fn new(
path: PathBuf,
version: u8,
content_start: usize,
crypto: Arc<Mutex<Crypto>>,
info: BundleInfo,
) -> Self {
BundleReader {
info: info,
chunks: None,
@@ -84,54 +90,90 @@ impl BundleReader {
self.info.id.clone()
}
fn load_header<P: AsRef<Path>>(path: P, crypto: Arc<Mutex<Crypto>>) -> Result<(BundleInfo, u8, usize), BundleReaderError> {
fn load_header<P: AsRef<Path>>(
path: P,
crypto: Arc<Mutex<Crypto>>,
) -> Result<(BundleInfo, u8, usize), BundleReaderError> {
let path = path.as_ref();
let mut file = BufReader::new(try!(File::open(path).context(path)));
let mut header = [0u8; 8];
try!(file.read_exact(&mut header).context(path));
if header[..HEADER_STRING.len()] != HEADER_STRING {
return Err(BundleReaderError::WrongHeader(path.to_path_buf()))
return Err(BundleReaderError::WrongHeader(path.to_path_buf()));
}
let version = header[HEADER_STRING.len()];
if version != HEADER_VERSION {
return Err(BundleReaderError::UnsupportedVersion(path.to_path_buf(), version))
return Err(BundleReaderError::UnsupportedVersion(
path.to_path_buf(),
version
));
}
let header: BundleHeader = try!(msgpack::decode_from_stream(&mut file).context(path));
let mut info_data = Vec::with_capacity(header.info_size);
info_data.resize(header.info_size, 0);
try!(file.read_exact(&mut info_data).context(path));
if let Some(ref encryption) = header.encryption {
info_data = try!(crypto.lock().unwrap().decrypt(encryption, &info_data).context(path));
info_data = try!(
crypto
.lock()
.unwrap()
.decrypt(encryption, &info_data)
.context(path)
);
}
let mut info: BundleInfo = try!(msgpack::decode(&info_data).context(path));
info.encryption = header.encryption;
debug!("Load bundle {}", info.id);
let content_start = file.seek(SeekFrom::Current(0)).unwrap() as usize + info.chunk_list_size;
let content_start = file.seek(SeekFrom::Current(0)).unwrap() as usize +
info.chunk_list_size;
Ok((info, version, content_start))
}
#[inline]
pub fn load_info<P: AsRef<Path>>(path: P, crypto: Arc<Mutex<Crypto>>) -> Result<BundleInfo, BundleReaderError> {
pub fn load_info<P: AsRef<Path>>(
path: P,
crypto: Arc<Mutex<Crypto>>,
) -> Result<BundleInfo, BundleReaderError> {
Self::load_header(path, crypto).map(|b| b.0)
}
#[inline]
pub fn load(path: PathBuf, crypto: Arc<Mutex<Crypto>>) -> Result<Self, BundleReaderError> {
let (header, version, content_start) = try!(Self::load_header(&path, crypto.clone()));
Ok(BundleReader::new(path, version, content_start, crypto, header))
Ok(BundleReader::new(
path,
version,
content_start,
crypto,
header
))
}
fn load_chunklist(&mut self) -> Result<(), BundleReaderError> {
debug!("Load bundle chunklist {} ({:?})", self.info.id, self.info.mode);
debug!(
"Load bundle chunklist {} ({:?})",
self.info.id,
self.info.mode
);
let mut file = BufReader::new(try!(File::open(&self.path).context(&self.path as &Path)));
let len = self.info.chunk_list_size;
let start = self.content_start - len;
try!(file.seek(SeekFrom::Start(start as u64)).context(&self.path as &Path));
try!(file.seek(SeekFrom::Start(start as u64)).context(
&self.path as &Path
));
let mut chunk_data = Vec::with_capacity(len);
chunk_data.resize(self.info.chunk_list_size, 0);
try!(file.read_exact(&mut chunk_data).context(&self.path as &Path));
try!(file.read_exact(&mut chunk_data).context(
&self.path as &Path
));
if let Some(ref encryption) = self.info.encryption {
chunk_data = try!(self.crypto.lock().unwrap().decrypt(encryption, &chunk_data).context(&self.path as &Path));
chunk_data = try!(
self.crypto
.lock()
.unwrap()
.decrypt(encryption, &chunk_data)
.context(&self.path as &Path)
);
}
let chunks = ChunkList::read_from(&chunk_data);
let mut chunk_positions = Vec::with_capacity(chunks.len());
@@ -156,20 +198,31 @@ impl BundleReader {
fn load_encoded_contents(&self) -> Result<Vec<u8>, BundleReaderError> {
debug!("Load bundle data {} ({:?})", self.info.id, self.info.mode);
let mut file = BufReader::new(try!(File::open(&self.path).context(&self.path as &Path)));
try!(file.seek(SeekFrom::Start(self.content_start as u64)).context(&self.path as &Path));
let mut data = Vec::with_capacity(max(self.info.encoded_size, self.info.raw_size)+1024);
try!(
file.seek(SeekFrom::Start(self.content_start as u64))
.context(&self.path as &Path)
);
let mut data = Vec::with_capacity(max(self.info.encoded_size, self.info.raw_size) + 1024);
try!(file.read_to_end(&mut data).context(&self.path as &Path));
Ok(data)
}
fn decode_contents(&self, mut data: Vec<u8>) -> Result<Vec<u8>, BundleReaderError> {
if let Some(ref encryption) = self.info.encryption {
data = try!(self.crypto.lock().unwrap().decrypt(encryption, &data).context(&self.path as &Path));
data = try!(
self.crypto
.lock()
.unwrap()
.decrypt(encryption, &data)
.context(&self.path as &Path)
);
}
if let Some(ref compression) = self.info.compression {
let mut stream = try!(compression.decompress_stream().context(&self.path as &Path));
let mut buffer = Vec::with_capacity(self.info.raw_size);
try!(stream.process(&data, &mut buffer).context(&self.path as &Path));
try!(stream.process(&data, &mut buffer).context(
&self.path as &Path
));
try!(stream.finish(&mut buffer).context(&self.path as &Path));
data = buffer;
}
@@ -178,12 +231,14 @@ impl BundleReader {
#[inline]
pub fn load_contents(&self) -> Result<Vec<u8>, BundleReaderError> {
self.load_encoded_contents().and_then(|data| self.decode_contents(data))
self.load_encoded_contents().and_then(|data| {
self.decode_contents(data)
})
}
pub fn get_chunk_position(&mut self, id: usize) -> Result<(usize, usize), BundleReaderError> {
if id >= self.info.chunk_count {
return Err(BundleReaderError::NoSuchChunk(self.id(), id))
return Err(BundleReaderError::NoSuchChunk(self.id(), id));
}
if self.chunks.is_none() || self.chunk_positions.is_none() {
try!(self.load_chunklist());
@@ -198,30 +253,46 @@ impl BundleReader {
try!(self.load_chunklist());
}
if self.info.chunk_count != self.chunks.as_ref().unwrap().len() {
return Err(BundleReaderError::Integrity(self.id(),
"Chunk list size does not match chunk count"))
return Err(BundleReaderError::Integrity(
self.id(),
"Chunk list size does not match chunk count"
));
}
if self.chunks.as_ref().unwrap().iter().map(|c| c.1 as usize).sum::<usize>() != self.info.raw_size {
return Err(BundleReaderError::Integrity(self.id(),
"Individual chunk sizes do not add up to total size"))
if self.chunks
.as_ref()
.unwrap()
.iter()
.map(|c| c.1 as usize)
.sum::<usize>() != self.info.raw_size
{
return Err(BundleReaderError::Integrity(
self.id(),
"Individual chunk sizes do not add up to total size"
));
}
if !full {
let size = try!(fs::metadata(&self.path).context(&self.path as &Path)).len();
if size as usize != self.info.encoded_size + self.content_start {
return Err(BundleReaderError::Integrity(self.id(),
"File size does not match size in header, truncated file"))
return Err(BundleReaderError::Integrity(
self.id(),
"File size does not match size in header, truncated file"
));
}
return Ok(())
return Ok(());
}
let encoded_contents = try!(self.load_encoded_contents());
if self.info.encoded_size != encoded_contents.len() {
return Err(BundleReaderError::Integrity(self.id(),
"Encoded data size does not match size in header, truncated bundle"))
return Err(BundleReaderError::Integrity(
self.id(),
"Encoded data size does not match size in header, truncated bundle"
));
}
let contents = try!(self.decode_contents(encoded_contents));
if self.info.raw_size != contents.len() {
return Err(BundleReaderError::Integrity(self.id(),
"Raw data size does not match size in header, truncated bundle"))
return Err(BundleReaderError::Integrity(
self.id(),
"Raw data size does not match size in header, truncated bundle"
));
}
//TODO: verify checksum
Ok(())
@@ -230,8 +301,15 @@ impl BundleReader {
impl Debug for BundleReader {
fn fmt(&self, fmt: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(fmt, "Bundle(\n\tid: {}\n\tpath: {:?}\n\tchunks: {}\n\tsize: {}, encoded: {}\n\tcompression: {:?}\n)",
self.info.id.to_string(), self.path, self.info.chunk_count, self.info.raw_size,
self.info.encoded_size, self.info.compression)
write!(
fmt,
"Bundle(\n\tid: {}\n\tpath: {:?}\n\tchunks: {}\n\tsize: {}, encoded: {}\n\tcompression: {:?}\n)",
self.info.id.to_string(),
self.path,
self.info.chunk_count,
self.info.raw_size,
self.info.encoded_size,
self.info.compression
)
}
}

src/bundledb/uploader.rs (7 lines changed)

@@ -1,4 +1,4 @@
use ::prelude::*;
use prelude::*;
use std::sync::atomic::{Ordering, AtomicBool, AtomicUsize};
use std::sync::{Mutex, Condvar, Arc};
@@ -28,7 +28,10 @@ impl BundleUploader {
wait: (Condvar::new(), Mutex::new(()))
});
let self2 = self_.clone();
thread::Builder::new().name("uploader".to_string()).spawn(move || self2.worker_thread()).unwrap();
thread::Builder::new()
.name("uploader".to_string())
.spawn(move || self2.worker_thread())
.unwrap();
self_
}
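
The reformatted spawn above uses `std::thread::Builder` rather than bare `thread::spawn` so the worker thread gets a name, which shows up in panic messages and debuggers. A self-contained sketch of the same pattern; the `Uploader` type and its `worker_thread` body here are hypothetical stand-ins:

```rust
use std::sync::Arc;
use std::thread;

struct Uploader;

impl Uploader {
    fn worker_thread(&self) {
        // stand-in for the real upload loop
        println!("running on thread {:?}", thread::current().name());
    }
}

fn main() {
    let uploader = Arc::new(Uploader);
    let worker = uploader.clone();
    let handle = thread::Builder::new()
        .name("uploader".to_string())          // same shape as the code above
        .spawn(move || worker.worker_thread())
        .unwrap();
    handle.join().unwrap();
}
```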

src/bundledb/writer.rs (41 lines changed)

@@ -1,4 +1,4 @@
use ::prelude::*;
use prelude::*;
use super::*;
use std::path::{Path, PathBuf};
@@ -54,14 +54,22 @@ pub struct BundleWriter {
crypto: Arc<Mutex<Crypto>>,
raw_size: usize,
chunk_count: usize,
chunks: ChunkList,
chunks: ChunkList
}
impl BundleWriter {
pub fn new(mode: BundleMode, hash_method: HashMethod, compression: Option<Compression>, encryption: Option<Encryption>, crypto: Arc<Mutex<Crypto>>) -> Result<Self, BundleWriterError> {
pub fn new(
mode: BundleMode,
hash_method: HashMethod,
compression: Option<Compression>,
encryption: Option<Encryption>,
crypto: Arc<Mutex<Crypto>>,
) -> Result<Self, BundleWriterError> {
let compression_stream = match compression {
Some(ref compression) => Some(try!(compression.compress_stream().map_err(BundleWriterError::CompressionSetup))),
None => None
Some(ref compression) => Some(try!(compression.compress_stream().map_err(
BundleWriterError::CompressionSetup
))),
None => None,
};
Ok(BundleWriter {
mode: mode,
@@ -79,19 +87,23 @@ impl BundleWriter {
pub fn add(&mut self, chunk: &[u8], hash: Hash) -> Result<usize, BundleWriterError> {
if let Some(ref mut stream) = self.compression_stream {
try!(stream.process(chunk, &mut self.data).map_err(BundleWriterError::Compression))
try!(stream.process(chunk, &mut self.data).map_err(
BundleWriterError::Compression
))
} else {
self.data.extend_from_slice(chunk)
}
self.raw_size += chunk.len();
self.chunk_count += 1;
self.chunks.push((hash, chunk.len() as u32));
Ok(self.chunk_count-1)
Ok(self.chunk_count - 1)
}
pub fn finish(mut self, db: &BundleDb) -> Result<StoredBundle, BundleWriterError> {
if let Some(stream) = self.compression_stream {
try!(stream.finish(&mut self.data).map_err(BundleWriterError::Compression))
try!(stream.finish(&mut self.data).map_err(
BundleWriterError::Compression
))
}
if let Some(ref encryption) = self.encryption {
self.data = try!(self.crypto.lock().unwrap().encrypt(encryption, &self.data));
@@ -127,12 +139,19 @@ impl BundleWriter {
encryption: self.encryption,
info_size: info_data.len()
};
try!(msgpack::encode_to_stream(&header, &mut file).context(&path as &Path));
try!(msgpack::encode_to_stream(&header, &mut file).context(
&path as &Path
));
try!(file.write_all(&info_data).context(&path as &Path));
try!(file.write_all(&chunk_data).context(&path as &Path));
try!(file.write_all(&self.data).context(&path as &Path));
path = path.strip_prefix(db.layout.base_path()).unwrap().to_path_buf();
Ok(StoredBundle { path: path, info: info })
path = path.strip_prefix(db.layout.base_path())
.unwrap()
.to_path_buf();
Ok(StoredBundle {
path: path,
info: info
})
}
#[inline]

src/chunker.rs (18 lines changed)

@@ -25,13 +25,15 @@ impl ChunkerType {
"rabin" => Ok(ChunkerType::Rabin((avg_size, seed as u32))),
"fastcdc" => Ok(ChunkerType::FastCdc((avg_size, seed))),
"fixed" => Ok(ChunkerType::Fixed(avg_size)),
_ => Err("Unsupported chunker type")
_ => Err("Unsupported chunker type"),
}
}
pub fn from_string(name: &str) -> Result<Self, &'static str> {
let (name, size) = if let Some(pos) = name.find('/') {
let size = try!(usize::from_str(&name[pos+1..]).map_err(|_| "Chunk size must be a number"));
let size = try!(usize::from_str(&name[pos + 1..]).map_err(
|_| "Chunk size must be a number"
));
let name = &name[..pos];
(name, size)
} else {
@@ -62,21 +64,23 @@ impl ChunkerType {
pub fn avg_size(&self) -> usize {
match *self {
ChunkerType::Ae(size) | ChunkerType::Fixed(size) => size,
ChunkerType::Ae(size) |
ChunkerType::Fixed(size) => size,
ChunkerType::Rabin((size, _seed)) => size,
ChunkerType::FastCdc((size, _seed)) => size
ChunkerType::FastCdc((size, _seed)) => size,
}
}
pub fn to_string(&self) -> String {
format!("{}/{}", self.name(), self.avg_size()/1024)
format!("{}/{}", self.name(), self.avg_size() / 1024)
}
pub fn seed(&self) -> u64 {
match *self {
ChunkerType::Ae(_size) | ChunkerType::Fixed(_size) => 0,
ChunkerType::Ae(_size) |
ChunkerType::Fixed(_size) => 0,
ChunkerType::Rabin((_size, seed)) => seed as u64,
ChunkerType::FastCdc((_size, seed)) => seed
ChunkerType::FastCdc((_size, seed)) => seed,
}
}
}
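
For reference, `from_string` above accepts a `name/size` spec, splitting at the `/` and parsing the numeric suffix. A self-contained toy version of just that splitting step, in the same `try!` style; the default size in the `else` branch is hypothetical, since the hunk cuts that branch off:

```rust
use std::str::FromStr;

// Toy mirror of the name/size parsing in ChunkerType::from_string
// (edition 2015, matching the codebase's try! style).
fn parse_spec(spec: &str) -> Result<(&str, usize), &'static str> {
    if let Some(pos) = spec.find('/') {
        let size = try!(usize::from_str(&spec[pos + 1..]).map_err(
            |_| "Chunk size must be a number"
        ));
        Ok((&spec[..pos], size))
    } else {
        Ok((spec, 8)) // hypothetical default when no size is given
    }
}

fn main() {
    assert_eq!(parse_spec("fastcdc/16"), Ok(("fastcdc", 16)));
    assert_eq!(parse_spec("rabin"), Ok(("rabin", 8)));
}
```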

src/cli/algotest.rs (162 lines changed)

@@ -1,4 +1,4 @@
use ::prelude::*;
use prelude::*;
use std::io::{self, Cursor, Read, Write};
use std::fs::File;
@@ -41,7 +41,14 @@ fn chunk(data: &[u8], mut chunker: Box<Chunker>, sink: &mut ChunkSink) {
}
#[allow(dead_code)]
pub fn run(path: &str, bundle_size: usize, chunker: ChunkerType, compression: Option<Compression>, encrypt: bool,hash: HashMethod) {
pub fn run(
path: &str,
bundle_size: usize,
chunker: ChunkerType,
compression: Option<Compression>,
encrypt: bool,
hash: HashMethod,
) {
let mut total_write_time = 0.0;
let mut total_read_time = 0.0;
@@ -50,42 +57,64 @@ pub fn run(path: &str, bundle_size: usize, chunker: ChunkerType, compression: Op
let total_size = file.metadata().unwrap().len();
let mut size = total_size;
let mut data = Vec::with_capacity(size as usize);
let read_time = Duration::span(|| {
file.read_to_end(&mut data).unwrap();
}).num_milliseconds() as f32 / 1_000.0;
println!("- {}, {}", to_duration(read_time), to_speed(size, read_time));
let read_time = Duration::span(|| { file.read_to_end(&mut data).unwrap(); })
.num_milliseconds() as f32 / 1_000.0;
println!(
"- {}, {}",
to_duration(read_time),
to_speed(size, read_time)
);
println!();
println!("Chunking data with {}, avg chunk size {} ...", chunker.name(), to_file_size(chunker.avg_size() as u64));
println!(
"Chunking data with {}, avg chunk size {} ...",
chunker.name(),
to_file_size(chunker.avg_size() as u64)
);
let mut chunk_sink = ChunkSink {
chunks: Vec::with_capacity(2*size as usize/chunker.avg_size()),
chunks: Vec::with_capacity(2 * size as usize / chunker.avg_size()),
written: 0,
pos: 0
};
let chunker = chunker.create();
let chunk_time = Duration::span(|| {
chunk(&data, chunker, &mut chunk_sink)
}).num_milliseconds() as f32 / 1_000.0;
let chunk_time = Duration::span(|| chunk(&data, chunker, &mut chunk_sink))
.num_milliseconds() as f32 / 1_000.0;
total_write_time += chunk_time;
println!("- {}, {}", to_duration(chunk_time), to_speed(size, chunk_time));
println!(
"- {}, {}",
to_duration(chunk_time),
to_speed(size, chunk_time)
);
let mut chunks = chunk_sink.chunks;
assert_eq!(chunks.iter().map(|c| c.1).sum::<usize>(), size as usize);
let chunk_size_avg = size as f32 / chunks.len() as f32;
let chunk_size_stddev = (chunks.iter().map(|c| (c.1 as f32 - chunk_size_avg).powi(2)).sum::<f32>() / (chunks.len() as f32 - 1.0)).sqrt();
println!("- {} chunks, avg size: {} ±{}", chunks.len(), to_file_size(chunk_size_avg as u64), to_file_size(chunk_size_stddev as u64));
let chunk_size_stddev = (chunks
.iter()
.map(|c| (c.1 as f32 - chunk_size_avg).powi(2))
.sum::<f32>() /
(chunks.len() as f32 - 1.0))
.sqrt();
println!(
"- {} chunks, avg size: {} ±{}",
chunks.len(),
to_file_size(chunk_size_avg as u64),
to_file_size(chunk_size_stddev as u64)
);
println!();
println!("Hashing chunks with {} ...", hash.name());
let mut hashes = Vec::with_capacity(chunks.len());
let hash_time = Duration::span(|| {
for &(pos, len) in &chunks {
hashes.push(hash.hash(&data[pos..pos+len]))
}
let hash_time = Duration::span(|| for &(pos, len) in &chunks {
hashes.push(hash.hash(&data[pos..pos + len]))
}).num_milliseconds() as f32 / 1_000.0;
total_write_time += hash_time;
println!("- {}, {}", to_duration(hash_time), to_speed(size, hash_time));
println!(
"- {}, {}",
to_duration(hash_time),
to_speed(size, hash_time)
);
let mut seen_hashes = HashSet::with_capacity(hashes.len());
let mut dups = Vec::new();
for (i, hash) in hashes.into_iter().enumerate() {
@@ -99,7 +128,12 @@ pub fn run(path: &str, bundle_size: usize, chunker: ChunkerType, compression: Op
let (_, len) = chunks.remove(*i);
dup_size += len;
}
println!("- {} duplicate chunks, {}, {:.1}% saved", dups.len(), to_file_size(dup_size as u64), dup_size as f32 / size as f32*100.0);
println!(
"- {} duplicate chunks, {}, {:.1}% saved",
dups.len(),
to_file_size(dup_size as u64),
dup_size as f32 / size as f32 * 100.0
);
size -= dup_size as u64;
let mut bundles = Vec::new();
@@ -109,14 +143,14 @@ pub fn run(path: &str, bundle_size: usize, chunker: ChunkerType, compression: Op
println!("Compressing chunks with {} ...", compression.to_string());
let compress_time = Duration::span(|| {
let mut bundle = Vec::with_capacity(bundle_size + 2*chunk_size_avg as usize);
let mut bundle = Vec::with_capacity(bundle_size + 2 * chunk_size_avg as usize);
let mut c = compression.compress_stream().unwrap();
for &(pos, len) in &chunks {
c.process(&data[pos..pos+len], &mut bundle).unwrap();
c.process(&data[pos..pos + len], &mut bundle).unwrap();
if bundle.len() >= bundle_size {
c.finish(&mut bundle).unwrap();
bundles.push(bundle);
bundle = Vec::with_capacity(bundle_size + 2*chunk_size_avg as usize);
bundle = Vec::with_capacity(bundle_size + 2 * chunk_size_avg as usize);
c = compression.compress_stream().unwrap();
}
}
@@ -124,17 +158,26 @@ pub fn run(path: &str, bundle_size: usize, chunker: ChunkerType, compression: Op
bundles.push(bundle);
}).num_milliseconds() as f32 / 1_000.0;
total_write_time += compress_time;
println!("- {}, {}", to_duration(compress_time), to_speed(size, compress_time));
println!(
"- {}, {}",
to_duration(compress_time),
to_speed(size, compress_time)
);
let compressed_size = bundles.iter().map(|b| b.len()).sum::<usize>();
println!("- {} bundles, {}, {:.1}% saved", bundles.len(), to_file_size(compressed_size as u64), (size as f32 - compressed_size as f32)/size as f32*100.0);
println!(
"- {} bundles, {}, {:.1}% saved",
bundles.len(),
to_file_size(compressed_size as u64),
(size as f32 - compressed_size as f32) / size as f32 * 100.0
);
size = compressed_size as u64;
} else {
let mut bundle = Vec::with_capacity(bundle_size + 2*chunk_size_avg as usize);
let mut bundle = Vec::with_capacity(bundle_size + 2 * chunk_size_avg as usize);
for &(pos, len) in &chunks {
bundle.extend_from_slice(&data[pos..pos+len]);
bundle.extend_from_slice(&data[pos..pos + len]);
if bundle.len() >= bundle_size {
bundles.push(bundle);
bundle = Vec::with_capacity(bundle_size + 2*chunk_size_avg as usize);
bundle = Vec::with_capacity(bundle_size + 2 * chunk_size_avg as usize);
}
}
bundles.push(bundle);
@@ -151,24 +194,28 @@ pub fn run(path: &str, bundle_size: usize, chunker: ChunkerType, compression: Op
println!("Encrypting bundles...");
let mut encrypted_bundles = Vec::with_capacity(bundles.len());
let encrypt_time = Duration::span(|| {
for bundle in bundles {
encrypted_bundles.push(crypto.encrypt(&encryption, &bundle).unwrap());
}
let encrypt_time = Duration::span(|| for bundle in bundles {
encrypted_bundles.push(crypto.encrypt(&encryption, &bundle).unwrap());
}).num_milliseconds() as f32 / 1_000.0;
println!("- {}, {}", to_duration(encrypt_time), to_speed(size, encrypt_time));
println!(
"- {}, {}",
to_duration(encrypt_time),
to_speed(size, encrypt_time)
);
total_write_time += encrypt_time;
println!();
println!("Decrypting bundles...");
bundles = Vec::with_capacity(encrypted_bundles.len());
let decrypt_time = Duration::span(|| {
for bundle in encrypted_bundles {
bundles.push(crypto.decrypt(&encryption, &bundle).unwrap());
}
let decrypt_time = Duration::span(|| for bundle in encrypted_bundles {
bundles.push(crypto.decrypt(&encryption, &bundle).unwrap());
}).num_milliseconds() as f32 / 1_000.0;
println!("- {}, {}", to_duration(decrypt_time), to_speed(size, decrypt_time));
println!(
"- {}, {}",
to_duration(decrypt_time),
to_speed(size, decrypt_time)
);
total_read_time += decrypt_time;
}
@@ -176,21 +223,38 @@ pub fn run(path: &str, bundle_size: usize, chunker: ChunkerType, compression: Op
println!();
println!("Decompressing bundles with {} ...", compression.to_string());
let mut dummy = ChunkSink { chunks: vec![], written: 0, pos: 0 };
let decompress_time = Duration::span(|| {
for bundle in &bundles {
let mut c = compression.decompress_stream().unwrap();
c.process(bundle, &mut dummy).unwrap();
c.finish(&mut dummy).unwrap();
}
let mut dummy = ChunkSink {
chunks: vec![],
written: 0,
pos: 0
};
let decompress_time = Duration::span(|| for bundle in &bundles {
let mut c = compression.decompress_stream().unwrap();
c.process(bundle, &mut dummy).unwrap();
c.finish(&mut dummy).unwrap();
}).num_milliseconds() as f32 / 1_000.0;
println!("- {}, {}", to_duration(decompress_time), to_speed(total_size - dup_size as u64, decompress_time));
println!(
"- {}, {}",
to_duration(decompress_time),
to_speed(total_size - dup_size as u64, decompress_time)
);
total_read_time += decompress_time;
}
println!();
println!("Total storage size: {} / {}, ratio: {:.1}%", to_file_size(size as u64), to_file_size(total_size as u64), size as f32/total_size as f32*100.0);
println!("Total processing speed: {}", to_speed(total_size, total_write_time));
println!("Total read speed: {}", to_speed(total_size, total_read_time));
println!(
"Total storage size: {} / {}, ratio: {:.1}%",
to_file_size(size as u64),
to_file_size(total_size as u64),
size as f32 / total_size as f32 * 100.0
);
println!(
"Total processing speed: {}",
to_speed(total_size, total_write_time)
);
println!(
"Total read speed: {}",
to_speed(total_size, total_read_time)
);
}
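
Every phase above is timed with `Duration::span`, which in the old time 0.1 crate runs a closure and returns the elapsed `Duration`; the benchmark then converts milliseconds to fractional seconds. A minimal sketch of that pattern, assuming time 0.1 as a dependency (`Duration::span` is not part of the crate's later API):

```rust
// Cargo.toml (assumed): time = "0.1"
extern crate time;

use time::Duration;

fn main() {
    let elapsed: Duration = Duration::span(|| {
        // stand-in workload for the chunk/hash/compress phases above
        let _sum: u64 = (0..10_000_000u64).sum();
    });
    // Same conversion the benchmark uses: milliseconds -> seconds as f32
    let secs = elapsed.num_milliseconds() as f32 / 1_000.0;
    println!("- took {:.3}s", secs);
}
```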

src/cli/args.rs (260 lines changed)

@@ -1,4 +1,4 @@
use ::prelude::*;
use prelude::*;
use super::*;
use std::path::{Path, PathBuf};
@@ -78,7 +78,7 @@ pub enum Arguments {
repo_path_src: PathBuf,
backup_name_src: String,
repo_path_dst: PathBuf,
backup_name_dst: String,
backup_name_dst: String
},
Mount {
repo_path: PathBuf,
@@ -86,10 +86,7 @@ pub enum Arguments {
inode: Option<String>,
mount_point: String
},
Versions {
repo_path: PathBuf,
path: String
},
Versions { repo_path: PathBuf, path: String },
Diff {
repo_path_old: PathBuf,
backup_name_old: String,
@@ -98,12 +95,8 @@ pub enum Arguments {
backup_name_new: String,
inode_new: Option<String>
},
Analyze {
repo_path: PathBuf
},
BundleList {
repo_path: PathBuf
},
Analyze { repo_path: PathBuf },
BundleList { repo_path: PathBuf },
BundleInfo {
repo_path: PathBuf,
bundle_id: BundleId
@@ -154,7 +147,12 @@ fn convert_repo_path(mut path_str: &str) -> PathBuf {
}
}
fn parse_repo_path(repo_path: &str, existing: bool, backup_restr: Option<bool>, path_restr: Option<bool>) -> Result<(PathBuf, Option<&str>, Option<&str>), String> {
fn parse_repo_path(
repo_path: &str,
existing: bool,
backup_restr: Option<bool>,
path_restr: Option<bool>,
) -> Result<(PathBuf, Option<&str>, Option<&str>), String> {
let mut parts = repo_path.splitn(3, "::");
let repo = convert_repo_path(parts.next().unwrap_or(""));
if existing && !repo.join("config.yaml").exists() {
@@ -194,8 +192,13 @@ fn parse_repo_path(repo_path: &str, existing: bool, backup_restr: Option<bool>,
Ok((repo, backup, path))
}
#[allow(unknown_lints,needless_pass_by_value)]
fn validate_repo_path(repo_path: String, existing: bool, backup_restr: Option<bool>