From 1b9cf888e7d12c060a3b1d9c3e949f904f8e71ff Mon Sep 17 00:00:00 2001
From: Dennis Schwerdel
Date: Fri, 17 Mar 2017 12:58:22 +0100
Subject: [PATCH] New bundle format

---
 src/bundle.rs                | 112 +++++++++++++++++++++++++++--------
 src/cli/args.rs              |  79 ++++++++++++++++++++++--
 src/cli/mod.rs               |  44 ++++++++++----
 src/main.rs                  |   2 +-
 src/repository/basic_io.rs   |   4 +-
 src/repository/bundle_map.rs |   1 -
 6 files changed, 198 insertions(+), 44 deletions(-)

diff --git a/src/bundle.rs b/src/bundle.rs
index 284570e..e82831b 100644
--- a/src/bundle.rs
+++ b/src/bundle.rs
@@ -1,7 +1,7 @@
 use std::path::{Path, PathBuf};
 use std::collections::HashMap;
 use std::fs::{self, File};
-use std::io::{self, Read, Write, Seek, SeekFrom, BufWriter, BufReader};
+use std::io::{self, Read, Write, Seek, SeekFrom, BufWriter, BufReader, Cursor};
 use std::cmp::max;
 use std::fmt::{self, Debug, Write as FmtWrite};
 use std::sync::{Arc, Mutex};
@@ -13,6 +13,17 @@ use util::*;
 static HEADER_STRING: [u8; 7] = *b"zbundle";
 static HEADER_VERSION: u8 = 1;
 
+/*
+
+Bundle format
+- Magic header + version
+- Encoded header structure (contains size of next structure)
+- Encoded contents structure (with chunk sizes and hashes)
+- Chunk data
+
+*/
+
+
 quick_error!{
     #[derive(Debug)]
@@ -132,22 +143,24 @@ pub struct BundleInfo {
     pub mode: BundleMode,
     pub compression: Option<Compression>,
     pub encryption: Option<Encryption>,
+    pub hash_method: HashMethod,
     pub checksum: Checksum,
     pub raw_size: usize,
     pub encoded_size: usize,
     pub chunk_count: usize,
-    pub chunk_sizes: Vec<usize>
+    pub contents_info_size: usize
 }
 serde_impl!(BundleInfo(u64) {
     id: BundleId => 0,
-    mode: BundleMode => 8,
-    compression: Option<Compression> => 1,
-    encryption: Option<Encryption> => 2,
-    checksum: Checksum => 3,
-    raw_size: usize => 4,
-    encoded_size: usize => 5,
-    chunk_count: usize => 6,
-    chunk_sizes: Vec<usize> => 7
+    mode: BundleMode => 1,
+    compression: Option<Compression> => 2,
+    encryption: Option<Encryption> => 3,
+    hash_method: HashMethod => 4,
+    checksum: Checksum => 5,
+    raw_size: usize => 6,
+    encoded_size: usize => 7,
+    chunk_count: usize => 8,
+    contents_info_size: usize => 9
 });
@@ -156,19 +169,32 @@
 impl Default for BundleInfo {
     fn default() -> Self {
         BundleInfo {
             id: BundleId(vec![]),
             compression: None,
             encryption: None,
+            hash_method: HashMethod::Blake2,
             checksum: (ChecksumType::Blake2_256, msgpack::Bytes::new()),
             raw_size: 0,
             encoded_size: 0,
             chunk_count: 0,
-            chunk_sizes: vec![],
-            mode: BundleMode::Content
+            mode: BundleMode::Content,
+            contents_info_size: 0
         }
     }
 }
 
+#[derive(Clone, Default)]
+pub struct BundleContentInfo {
+    pub chunk_sizes: Vec<usize>,
+    pub chunk_hashes: Vec<Hash>
+}
+serde_impl!(BundleContentInfo(u64) {
+    chunk_sizes: Vec<usize> => 0,
+    chunk_hashes: Vec<Hash> => 1
+});
+
 
 pub struct Bundle {
     pub info: BundleInfo,
+    pub contents: BundleContentInfo,
     pub version: u8,
     pub path: PathBuf,
     crypto: Arc<Mutex<Crypto>>,
 }
 
 impl Bundle {
-    fn new(path: PathBuf, version: u8, content_start: usize, crypto: Arc<Mutex<Crypto>>, info: BundleInfo) -> Self {
-        let mut chunk_positions = Vec::with_capacity(info.chunk_sizes.len());
+    fn new(path: PathBuf, version: u8, content_start: usize, crypto: Arc<Mutex<Crypto>>, info: BundleInfo, contents: BundleContentInfo) -> Self {
+        let mut chunk_positions = Vec::with_capacity(contents.chunk_sizes.len());
         let mut pos = 0;
-        for len in &info.chunk_sizes {
+        for len in &contents.chunk_sizes {
             chunk_positions.push(pos);
             pos += *len;
         }
         Bundle {
             info: info,
+            contents: contents,
             version: version,
             path: path,
             crypto: crypto,
@@ -210,10 +237,18 @@
         if version != HEADER_VERSION {
             return Err(BundleError::WrongVersion(path.clone(), version))
         }
-        let header = try!(msgpack::decode_from_stream(&mut file)
+        let header: BundleInfo = try!(msgpack::decode_from_stream(&mut file)
+            .map_err(|e| BundleError::Decode(e, path.clone())));
+        let mut contents_data = Vec::with_capacity(header.contents_info_size);
+        contents_data.resize(header.contents_info_size, 0);
+        try!(file.read_exact(&mut contents_data).map_err(|e| BundleError::Read(e, path.clone())));
+        if let Some(ref encryption) = header.encryption {
+            contents_data = try!(crypto.lock().unwrap().decrypt(encryption.clone(), &contents_data));
+        }
+        let contents = try!(msgpack::decode_from_stream(&mut Cursor::new(&contents_data))
             .map_err(|e| BundleError::Decode(e, path.clone())));
         let content_start = file.seek(SeekFrom::Current(0)).unwrap() as usize;
-        Ok(Bundle::new(path, version, content_start, crypto, header))
+        Ok(Bundle::new(path, version, content_start, crypto, header, contents))
     }
 
     #[inline]
@@ -246,15 +281,16 @@
         if id >= self.info.chunk_count {
             return Err(BundleError::NoSuchChunk(self.id(), id))
         }
-        Ok((self.chunk_positions[id], self.info.chunk_sizes[id]))
+        Ok((self.chunk_positions[id], self.contents.chunk_sizes[id]))
     }
 
     pub fn check(&self, full: bool) -> Result<(), BundleError> {
-        if self.info.chunk_count != self.info.chunk_sizes.len() {
+        //FIXME: adapt to new format
+        if self.info.chunk_count != self.contents.chunk_sizes.len() {
             return Err(BundleError::Integrity(self.id(),
                 "Chunk list size does not match chunk count"))
         }
-        if self.info.chunk_sizes.iter().sum::<usize>() != self.info.raw_size {
+        if self.contents.chunk_sizes.iter().sum::<usize>() != self.info.raw_size {
             return Err(BundleError::Integrity(self.id(),
                 "Individual chunk sizes do not add up to total size"))
         }
@@ -294,6 +330,8 @@ impl Debug for Bundle {
 
 pub struct BundleWriter {
     mode: BundleMode,
+    hash_method: HashMethod,
+    hashes: Vec<Hash>,
     data: Vec<u8>,
     compression: Option<Compression>,
     compression_stream: Option<CompressionStream>,
@@ -306,13 +344,22 @@
 }
 
 impl BundleWriter {
-    fn new(mode: BundleMode, compression: Option<Compression>, encryption: Option<Encryption>, crypto: Arc<Mutex<Crypto>>, checksum: ChecksumType) -> Result<Self, BundleError> {
+    fn new(
+        mode: BundleMode,
+        hash_method: HashMethod,
+        compression: Option<Compression>,
+        encryption: Option<Encryption>,
+        crypto: Arc<Mutex<Crypto>>,
+        checksum: ChecksumType
+    ) -> Result<Self, BundleError> {
         let compression_stream = match compression {
             Some(ref compression) => Some(try!(compression.compress_stream())),
             None => None
         };
         Ok(BundleWriter {
             mode: mode,
+            hash_method: hash_method,
+            hashes: vec![],
             data: vec![],
             compression: compression,
             compression_stream: compression_stream,
@@ -325,7 +372,7 @@
         })
     }
 
-    pub fn add(&mut self, chunk: &[u8]) -> Result<usize, BundleError> {
+    pub fn add(&mut self, chunk: &[u8], hash: Hash) -> Result<usize, BundleError> {
         if let Some(ref mut stream) = self.compression_stream {
             try!(stream.process(chunk, &mut self.data))
         } else {
@@ -335,6 +382,7 @@
         self.raw_size += chunk.len();
         self.chunk_count += 1;
         self.chunk_sizes.push(chunk.len());
+        self.hashes.push(hash);
         Ok(self.chunk_count-1)
     }
 
@@ -354,8 +402,19 @@
         let mut file = BufWriter::new(try!(File::create(&path).map_err(|e| BundleError::Write(e, path.clone()))));
         try!(file.write_all(&HEADER_STRING).map_err(|e| BundleError::Write(e, path.clone())));
         try!(file.write_all(&[HEADER_VERSION]).map_err(|e| BundleError::Write(e, path.clone())));
+        let contents = BundleContentInfo {
+            chunk_sizes: self.chunk_sizes,
+            chunk_hashes: self.hashes
+        };
+        let mut contents_data = Vec::new();
+        try!(msgpack::encode_to_stream(&contents, &mut contents_data)
+            .map_err(|e| BundleError::Encode(e, path.clone())));
+        if let Some(ref encryption) = self.encryption {
+            contents_data = try!(self.crypto.lock().unwrap().encrypt(encryption.clone(), &contents_data));
+        }
         let header = BundleInfo {
             mode: self.mode,
+            hash_method: self.hash_method,
             checksum: checksum,
             compression: self.compression,
             encryption: self.encryption,
@@ -363,13 +422,14 @@
             id: id.clone(),
             raw_size: self.raw_size,
             encoded_size: encoded_size,
-            chunk_sizes: self.chunk_sizes
+            contents_info_size: contents_data.len()
         };
         try!(msgpack::encode_to_stream(&header, &mut file)
             .map_err(|e| BundleError::Encode(e, path.clone())));
+        try!(file.write_all(&contents_data).map_err(|e| BundleError::Write(e, path.clone())));
         let content_start = file.seek(SeekFrom::Current(0)).unwrap() as usize;
         try!(file.write_all(&self.data).map_err(|e| BundleError::Write(e, path.clone())));
-        Ok(Bundle::new(path, HEADER_VERSION, content_start, self.crypto, header))
+        Ok(Bundle::new(path, HEADER_VERSION, content_start, self.crypto, header, contents))
     }
 
     #[inline]
@@ -469,8 +529,8 @@ impl BundleDb {
     }
 
     #[inline]
-    pub fn create_bundle(&self, mode: BundleMode) -> Result<BundleWriter, BundleError> {
-        BundleWriter::new(mode, self.compression.clone(), self.encryption.clone(), self.crypto.clone(), self.checksum)
+    pub fn create_bundle(&self, mode: BundleMode, hash_method: HashMethod) -> Result<BundleWriter, BundleError> {
+        BundleWriter::new(mode, hash_method, self.compression.clone(), self.encryption.clone(), self.crypto.clone(), self.checksum)
     }
 
     pub fn get_chunk(&mut self, bundle_id: &BundleId, id: usize) -> Result<Vec<u8>, BundleError> {
diff --git a/src/cli/args.rs b/src/cli/args.rs
index 387d3a5..6286b23 100644
--- a/src/cli/args.rs
+++ b/src/cli/args.rs
@@ -24,6 +24,15 @@ pub enum Arguments {
         inode: Option<String>,
         dst_path: String
     },
+    Remove {
+        repo_path: String,
+        backup_name: String,
+        inode: Option<String>
+    },
+    Vacuum {
+        repo_path: String,
+        ratio: f32
+    },
     Check {
         repo_path: String,
         backup_name: Option<String>,
@@ -43,6 +52,10 @@ pub enum Arguments {
     ListBundles {
         repo_path: String
     },
+    Import {
+        repo_path: String,
+        remote_path: String
+    },
     AlgoTest {
         file: String,
         bundle_size: usize,
@@ -70,6 +83,16 @@ fn parse_num(num: &str, name: &str) -> u64 {
     }
 }
 
+fn parse_float(num: &str, name: &str) -> f64 {
+    if let Ok(num) = num.parse::<f64>() {
+        num
+    } else {
+        error!("{} must be a floating-point number, was '{}'", name, num);
+        exit(1);
+    }
+}
+
+
 fn parse_chunker(val: Option<&str>) -> ChunkerType {
     if let Ok(chunker) = ChunkerType::from_string(val.unwrap_or("fastcdc/8")) {
         chunker
@@ -136,12 +159,21 @@ pub fn parse() -> Arguments {
             (@arg SRC: +required "source path to backup")
         )
         (@subcommand restore =>
-            (about: "restores a backup")
+            (about: "restores a backup (or subpath)")
             (@arg BACKUP: +required "repository::backup[::subpath] path")
             (@arg DST: +required "destination path for backup")
         )
+        (@subcommand remove =>
+            (about: "removes a backup or a subpath")
+            (@arg BACKUP: +required "repository::backup[::subpath] path")
+        )
+        (@subcommand vacuum =>
+            (about: "saves space by combining and recompressing bundles")
+            (@arg ratio: --ratio -r +takes_value "ratio of unused chunks required to rewrite a bundle")
+            (@arg REPO: +required "path of the repository")
+        )
         (@subcommand check =>
-            (about: "checks the repository")
+            (about: "checks the repository, a backup or a backup subpath")
            (@arg full: --full "also check file contents")
            (@arg PATH: +required "repository[::backup] path")
        )
@@ -151,7 +183,12 @@
         )
         (@subcommand listbundles =>
             (about: "lists bundles in a repository")
bundles in a repository") - (@arg PATH: +required "repository path") + (@arg REPO: +required "path of the repository") + ) + (@subcommand import => + (about: "reconstruct a repository from the remote files") + (@arg REPO: +required "path of the local repository to create") + (@arg REMOTE: +required "remote repository path") ) (@subcommand info => (about: "displays information on a repository, a backup or a path in a backup") @@ -210,6 +247,29 @@ pub fn parse() -> Arguments { dst_path: args.value_of("DST").unwrap().to_string() } } + if let Some(args) = args.subcommand_matches("remove") { + let (repository, backup, inode) = split_repo_path(args.value_of("BACKUP").unwrap()); + if backup.is_none() { + println!("A backup must be specified"); + exit(1); + } + return Arguments::Remove { + repo_path: repository.to_string(), + backup_name: backup.unwrap().to_string(), + inode: inode.map(|v| v.to_string()) + } + } + if let Some(args) = args.subcommand_matches("vacuum") { + let (repository, backup, inode) = split_repo_path(args.value_of("REPO").unwrap()); + if backup.is_some() || inode.is_some() { + println!("No backups or subpaths may be given here"); + exit(1); + } + return Arguments::Vacuum { + repo_path: repository.to_string(), + ratio: parse_float(args.value_of("ratio").unwrap_or("0.5"), "ratio") as f32 + } + } if let Some(args) = args.subcommand_matches("check") { let (repository, backup, inode) = split_repo_path(args.value_of("PATH").unwrap()); return Arguments::Check { @@ -228,7 +288,7 @@ pub fn parse() -> Arguments { } } if let Some(args) = args.subcommand_matches("listbundles") { - let (repository, backup, inode) = split_repo_path(args.value_of("PATH").unwrap()); + let (repository, backup, inode) = split_repo_path(args.value_of("REPO").unwrap()); if backup.is_some() || inode.is_some() { println!("No backups or subpaths may be given here"); exit(1); @@ -245,6 +305,17 @@ pub fn parse() -> Arguments { inode: inode.map(|v| v.to_string()) } } + if let Some(args) = args.subcommand_matches("import") { + let (repository, backup, inode) = split_repo_path(args.value_of("REPO").unwrap()); + if backup.is_some() || inode.is_some() { + println!("No backups or subpaths may be given here"); + exit(1); + } + return Arguments::Import { + repo_path: repository.to_string(), + remote_path: args.value_of("REMOTE").unwrap().to_string() + } + } if let Some(args) = args.subcommand_matches("algotest") { return Arguments::AlgoTest { bundle_size: (parse_num(args.value_of("bundle_size").unwrap_or("25"), "Bundle size") * 1024 * 1024) as usize, diff --git a/src/cli/mod.rs b/src/cli/mod.rs index 5629f45..fde3929 100644 --- a/src/cli/mod.rs +++ b/src/cli/mod.rs @@ -5,7 +5,7 @@ mod algotest; use chrono::prelude::*; use std::process::exit; -use ::repository::{Repository, Config, Inode, Backup}; +use ::repository::{Repository, Config, Backup}; use ::util::ChecksumType; use ::util::cli::*; use self::args::Arguments; @@ -64,14 +64,32 @@ pub fn run() { repo.restore_backup(&backup, &dst_path).unwrap(); } }, + Arguments::Remove{repo_path, backup_name, inode} => { + let repo = open_repository(&repo_path); + let _backup = get_backup(&repo, &backup_name); + if let Some(_inode) = inode { + error!("Removing backup subtrees is not implemented yet"); + return + } else { + error!("Removing backups is not implemented yet"); + return + } + }, + Arguments::Vacuum{repo_path, ..} => { + let _repo = open_repository(&repo_path); + error!("Vaccum is not implemented yet"); + return + }, Arguments::Check{repo_path, backup_name, inode, full} => { 
             let mut repo = open_repository(&repo_path);
             if let Some(backup_name) = backup_name {
-                let backup = get_backup(&repo, &backup_name);
-                if let Some(inode) = inode {
-                    unimplemented!()
+                let _backup = get_backup(&repo, &backup_name);
+                if let Some(_inode) = inode {
+                    error!("Checking backup subtrees is not implemented yet");
+                    return
                 } else {
-                    unimplemented!()
+                    error!("Checking backups is not implemented yet");
+                    return
                 }
             } else {
                 repo.check(full).unwrap()
@@ -94,13 +112,14 @@
                     println!("{}", backup);
                 }
             }
-        }
+        },
         Arguments::Info{repo_path, backup_name, inode} => {
             let repo = open_repository(&repo_path);
             if let Some(backup_name) = backup_name {
                 let backup = get_backup(&repo, &backup_name);
-                if let Some(inode) = inode {
-                    unimplemented!()
+                if let Some(_inode) = inode {
+                    error!("Displaying information on single inodes is not implemented yet");
+                    return
                 } else {
                     println!("Date: {}", Local.timestamp(backup.date, 0).to_rfc2822());
                     println!("Duration: {}", to_duration(backup.duration));
@@ -124,12 +143,13 @@
             let index_usage = info.index_entries as f32 / info.index_capacity as f32;
             println!("Index: {}, {:.0}% full", to_file_size(info.index_size as u64), index_usage * 100.0);
         }
-        }
+        },
         Arguments::ListBundles{repo_path} => {
             let repo = open_repository(&repo_path);
             for bundle in repo.list_bundles() {
                 println!("Bundle {}", bundle.id);
                 println!(" - Mode: {:?}", bundle.mode);
+                println!(" - Hash method: {:?}", bundle.hash_method);
                 println!(" - Chunks: {}", bundle.chunk_count);
                 println!(" - Size: {}", to_file_size(bundle.encoded_size as u64));
                 println!(" - Data size: {}", to_file_size(bundle.raw_size as u64));
@@ -142,7 +162,11 @@
                 println!(" - Compression: {}, ratio: {:.1}%", compression, ratio * 100.0);
                 println!();
             }
-        }
+        },
+        Arguments::Import{..} => {
+            error!("Import is not implemented yet");
+            return
+        },
         Arguments::AlgoTest{bundle_size, chunker, compression, hash, file} => {
             algotest::run(&file, bundle_size, chunker, compression, hash);
         }
diff --git a/src/main.rs b/src/main.rs
index 9482388..9cb46e1 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -24,7 +24,7 @@ mod cli;
 // TODO: - Keep meta bundles also locally
 // TODO: - Load and compare remote bundles to bundle map
 // TODO: - Write backup files there as well
-// TODO: Store list of hashes in bundle
+// TODO: Store list of hashes and hash method in bundle
 // TODO: Remove backups/subtrees
 // TODO: Recompress & combine bundles
 // TODO: Prune backups (based on age like attic)
diff --git a/src/repository/basic_io.rs b/src/repository/basic_io.rs
index b67d3c3..4c81c13 100644
--- a/src/repository/basic_io.rs
+++ b/src/repository/basic_io.rs
@@ -49,7 +49,7 @@ impl Repository {
         };
         // ...alocate one if needed
         if writer.is_none() {
-            *writer = Some(try!(self.bundles.create_bundle(mode)));
+            *writer = Some(try!(self.bundles.create_bundle(mode, self.config.hash)));
         }
         debug_assert!(writer.is_some());
         let chunk_id;
@@ -58,7 +58,7 @@
         { // Add chunk to bundle writer and determine the size of the bundle
             let writer_obj = writer.as_mut().unwrap();
-            chunk_id = try!(writer_obj.add(data));
+            chunk_id = try!(writer_obj.add(data, hash));
             size = writer_obj.size();
             raw_size = writer_obj.raw_size();
         }
diff --git a/src/repository/bundle_map.rs b/src/repository/bundle_map.rs
index d27073f..b8aaa04 100644
--- a/src/repository/bundle_map.rs
+++ b/src/repository/bundle_map.rs
@@ -77,7 +77,6 @@ impl BundleMap {
         Ok(BundleMap(try!(msgpack::decode_from_stream(&mut file))))
     }
-
 
     pub fn save<P: AsRef<Path>>(&self, path: P) -> Result<(), BundleMapError> {
         let mut file = BufWriter::new(try!(File::create(path)));
         try!(file.write_all(&HEADER_STRING));
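
Note for reviewers (an illustration, not part of the patch): the comment block added at the top of src/bundle.rs summarizes the new on-disk layout. Below is a minimal sketch of how a reader walks that layout. walk_bundle is a hypothetical helper; the msgpack decoding of BundleInfo/BundleContentInfo and the optional decryption are only indicated as comments, since those steps rely on the crate's own msgpack and Crypto helpers.

    use std::fs::File;
    use std::io::{self, BufReader, Read, Seek, SeekFrom};

    // Hypothetical reader for the layout described above: magic header +
    // version, then the msgpack-encoded header structure, then the
    // (possibly encrypted) contents structure, then raw chunk data.
    fn walk_bundle(path: &str) -> io::Result<u64> {
        let mut file = BufReader::new(File::open(path)?);

        // 1. Magic header + version
        let mut magic = [0u8; 7];
        file.read_exact(&mut magic)?;
        assert_eq!(&magic, b"zbundle");
        let mut version = [0u8; 1];
        file.read_exact(&mut version)?;
        assert_eq!(version[0], 1);

        // 2. Encoded header structure; in the real code this is decoded
        //    as a BundleInfo via msgpack::decode_from_stream and yields
        //    contents_info_size. Hard-coded to 0 in this sketch.
        let contents_info_size = 0usize;

        // 3. Encoded contents structure (chunk sizes and hashes), which
        //    is decrypted first when the header names an encryption method.
        let mut contents_data = vec![0u8; contents_info_size];
        file.read_exact(&mut contents_data)?;

        // 4. Chunk data begins here; chunk offsets are the prefix sums of
        //    the chunk sizes, exactly as Bundle::new computes chunk_positions.
        file.seek(SeekFrom::Current(0))
    }

Sizing the chunk table with contents_info_size in the header lets Bundle::load read (and, when configured, decrypt) the chunk sizes and hashes as one opaque blob before any chunk data, which is what allows the new format to encrypt per-chunk hashes without touching the header.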