New bundle format

pull/10/head
Dennis Schwerdel 2017-03-17 12:58:22 +01:00
parent 01c2ab16f9
commit 1b9cf888e7
6 changed files with 198 additions and 44 deletions

View File

@ -1,7 +1,7 @@
use std::path::{Path, PathBuf};
use std::collections::HashMap;
use std::fs::{self, File};
use std::io::{self, Read, Write, Seek, SeekFrom, BufWriter, BufReader};
use std::io::{self, Read, Write, Seek, SeekFrom, BufWriter, BufReader, Cursor};
use std::cmp::max;
use std::fmt::{self, Debug, Write as FmtWrite};
use std::sync::{Arc, Mutex};
@ -13,6 +13,17 @@ use util::*;
static HEADER_STRING: [u8; 7] = *b"zbundle";
static HEADER_VERSION: u8 = 1;
/*
Bundle format
- Magic header + version
- Encoded header structure (contains size of next structure)
- Encoded contents structure (with chunk sizes and hashes)
- Chunk data
*/
quick_error!{
#[derive(Debug)]
@ -132,22 +143,24 @@ pub struct BundleInfo {
pub mode: BundleMode,
pub compression: Option<Compression>,
pub encryption: Option<Encryption>,
pub hash_method: HashMethod,
pub checksum: Checksum,
pub raw_size: usize,
pub encoded_size: usize,
pub chunk_count: usize,
pub chunk_sizes: Vec<usize>
pub contents_info_size: usize
}
serde_impl!(BundleInfo(u64) {
id: BundleId => 0,
mode: BundleMode => 8,
compression: Option<Compression> => 1,
encryption: Option<Encryption> => 2,
checksum: Checksum => 3,
raw_size: usize => 4,
encoded_size: usize => 5,
chunk_count: usize => 6,
chunk_sizes: Vec<usize> => 7
mode: BundleMode => 1,
compression: Option<Compression> => 2,
encryption: Option<Encryption> => 3,
hash_method: HashMethod => 4,
checksum: Checksum => 5,
raw_size: usize => 6,
encoded_size: usize => 7,
chunk_count: usize => 8,
contents_info_size: usize => 9
});
impl Default for BundleInfo {
@ -156,19 +169,32 @@ impl Default for BundleInfo {
id: BundleId(vec![]),
compression: None,
encryption: None,
hash_method: HashMethod::Blake2,
checksum: (ChecksumType::Blake2_256, msgpack::Bytes::new()),
raw_size: 0,
encoded_size: 0,
chunk_count: 0,
chunk_sizes: vec![],
mode: BundleMode::Content
mode: BundleMode::Content,
contents_info_size: 0
}
}
}
#[derive(Clone, Default)]
pub struct BundleContentInfo {
pub chunk_sizes: Vec<usize>,
pub chunk_hashes: Vec<Hash>
}
serde_impl!(BundleContentInfo(u64) {
chunk_sizes: Vec<usize> => 0,
chunk_hashes: Vec<Hash> => 1
});
pub struct Bundle {
pub info: BundleInfo,
pub contents: BundleContentInfo,
pub version: u8,
pub path: PathBuf,
crypto: Arc<Mutex<Crypto>>,
@ -177,15 +203,16 @@ pub struct Bundle {
}
impl Bundle {
fn new(path: PathBuf, version: u8, content_start: usize, crypto: Arc<Mutex<Crypto>>, info: BundleInfo) -> Self {
let mut chunk_positions = Vec::with_capacity(info.chunk_sizes.len());
fn new(path: PathBuf, version: u8, content_start: usize, crypto: Arc<Mutex<Crypto>>, info: BundleInfo, contents: BundleContentInfo) -> Self {
let mut chunk_positions = Vec::with_capacity(contents.chunk_sizes.len());
let mut pos = 0;
for len in &info.chunk_sizes {
for len in &contents.chunk_sizes {
chunk_positions.push(pos);
pos += *len;
}
Bundle {
info: info,
contents: contents,
version: version,
path: path,
crypto: crypto,
@ -210,10 +237,18 @@ impl Bundle {
if version != HEADER_VERSION {
return Err(BundleError::WrongVersion(path.clone(), version))
}
let header = try!(msgpack::decode_from_stream(&mut file)
let header: BundleInfo = try!(msgpack::decode_from_stream(&mut file)
.map_err(|e| BundleError::Decode(e, path.clone())));
let mut contents_data = Vec::with_capacity(header.contents_info_size);
contents_data.resize(header.contents_info_size, 0);
try!(file.read_exact(&mut contents_data).map_err(|e| BundleError::Read(e, path.clone())));
if let Some(ref encryption) = header.encryption {
contents_data = try!(crypto.lock().unwrap().decrypt(encryption.clone(), &contents_data));
}
let contents = try!(msgpack::decode_from_stream(&mut Cursor::new(&contents_data))
.map_err(|e| BundleError::Decode(e, path.clone())));
let content_start = file.seek(SeekFrom::Current(0)).unwrap() as usize;
Ok(Bundle::new(path, version, content_start, crypto, header))
Ok(Bundle::new(path, version, content_start, crypto, header, contents))
}
#[inline]
@ -246,15 +281,16 @@ impl Bundle {
if id >= self.info.chunk_count {
return Err(BundleError::NoSuchChunk(self.id(), id))
}
Ok((self.chunk_positions[id], self.info.chunk_sizes[id]))
Ok((self.chunk_positions[id], self.contents.chunk_sizes[id]))
}
pub fn check(&self, full: bool) -> Result<(), BundleError> {
if self.info.chunk_count != self.info.chunk_sizes.len() {
//FIXME: adapt to new format
if self.info.chunk_count != self.contents.chunk_sizes.len() {
return Err(BundleError::Integrity(self.id(),
"Chunk list size does not match chunk count"))
}
if self.info.chunk_sizes.iter().sum::<usize>() != self.info.raw_size {
if self.contents.chunk_sizes.iter().sum::<usize>() != self.info.raw_size {
return Err(BundleError::Integrity(self.id(),
"Individual chunk sizes do not add up to total size"))
}
@ -294,6 +330,8 @@ impl Debug for Bundle {
pub struct BundleWriter {
mode: BundleMode,
hash_method: HashMethod,
hashes: Vec<Hash>,
data: Vec<u8>,
compression: Option<Compression>,
compression_stream: Option<CompressionStream>,
@ -306,13 +344,22 @@ pub struct BundleWriter {
}
impl BundleWriter {
fn new(mode: BundleMode, compression: Option<Compression>, encryption: Option<Encryption>, crypto: Arc<Mutex<Crypto>>, checksum: ChecksumType) -> Result<Self, BundleError> {
fn new(
mode: BundleMode,
hash_method: HashMethod,
compression: Option<Compression>,
encryption: Option<Encryption>,
crypto: Arc<Mutex<Crypto>>,
checksum: ChecksumType
) -> Result<Self, BundleError> {
let compression_stream = match compression {
Some(ref compression) => Some(try!(compression.compress_stream())),
None => None
};
Ok(BundleWriter {
mode: mode,
hash_method: hash_method,
hashes: vec![],
data: vec![],
compression: compression,
compression_stream: compression_stream,
@ -325,7 +372,7 @@ impl BundleWriter {
})
}
pub fn add(&mut self, chunk: &[u8]) -> Result<usize, BundleError> {
pub fn add(&mut self, chunk: &[u8], hash: Hash) -> Result<usize, BundleError> {
if let Some(ref mut stream) = self.compression_stream {
try!(stream.process(chunk, &mut self.data))
} else {
@ -335,6 +382,7 @@ impl BundleWriter {
self.raw_size += chunk.len();
self.chunk_count += 1;
self.chunk_sizes.push(chunk.len());
self.hashes.push(hash);
Ok(self.chunk_count-1)
}
@ -354,8 +402,19 @@ impl BundleWriter {
let mut file = BufWriter::new(try!(File::create(&path).map_err(|e| BundleError::Write(e, path.clone()))));
try!(file.write_all(&HEADER_STRING).map_err(|e| BundleError::Write(e, path.clone())));
try!(file.write_all(&[HEADER_VERSION]).map_err(|e| BundleError::Write(e, path.clone())));
let contents = BundleContentInfo {
chunk_sizes: self.chunk_sizes,
chunk_hashes: self.hashes
};
let mut contents_data = Vec::new();
try!(msgpack::encode_to_stream(&contents, &mut contents_data)
.map_err(|e| BundleError::Encode(e, path.clone())));
if let Some(ref encryption) = self.encryption {
contents_data = try!(self.crypto.lock().unwrap().encrypt(encryption.clone(), &contents_data));
}
let header = BundleInfo {
mode: self.mode,
hash_method: self.hash_method,
checksum: checksum,
compression: self.compression,
encryption: self.encryption,
@ -363,13 +422,14 @@ impl BundleWriter {
id: id.clone(),
raw_size: self.raw_size,
encoded_size: encoded_size,
chunk_sizes: self.chunk_sizes
contents_info_size: contents_data.len()
};
try!(msgpack::encode_to_stream(&header, &mut file)
.map_err(|e| BundleError::Encode(e, path.clone())));
try!(file.write_all(&contents_data).map_err(|e| BundleError::Write(e, path.clone())));
let content_start = file.seek(SeekFrom::Current(0)).unwrap() as usize;
try!(file.write_all(&self.data).map_err(|e| BundleError::Write(e, path.clone())));
Ok(Bundle::new(path, HEADER_VERSION, content_start, self.crypto, header))
Ok(Bundle::new(path, HEADER_VERSION, content_start, self.crypto, header, contents))
}
#[inline]
@ -469,8 +529,8 @@ impl BundleDb {
}
#[inline]
pub fn create_bundle(&self, mode: BundleMode) -> Result<BundleWriter, BundleError> {
BundleWriter::new(mode, self.compression.clone(), self.encryption.clone(), self.crypto.clone(), self.checksum)
pub fn create_bundle(&self, mode: BundleMode, hash_method: HashMethod) -> Result<BundleWriter, BundleError> {
BundleWriter::new(mode, hash_method, self.compression.clone(), self.encryption.clone(), self.crypto.clone(), self.checksum)
}
pub fn get_chunk(&mut self, bundle_id: &BundleId, id: usize) -> Result<Vec<u8>, BundleError> {

View File

@ -24,6 +24,15 @@ pub enum Arguments {
inode: Option<String>,
dst_path: String
},
Remove {
repo_path: String,
backup_name: String,
inode: Option<String>
},
Vacuum {
repo_path: String,
ratio: f32
},
Check {
repo_path: String,
backup_name: Option<String>,
@ -43,6 +52,10 @@ pub enum Arguments {
ListBundles {
repo_path: String
},
Import {
repo_path: String,
remote_path: String
},
AlgoTest {
file: String,
bundle_size: usize,
@ -70,6 +83,16 @@ fn parse_num(num: &str, name: &str) -> u64 {
}
}
fn parse_float(num: &str, name: &str) -> f64 {
if let Ok(num) = num.parse::<f64>() {
num
} else {
error!("{} must be a floating-point number, was '{}'", name, num);
exit(1);
}
}
fn parse_chunker(val: Option<&str>) -> ChunkerType {
if let Ok(chunker) = ChunkerType::from_string(val.unwrap_or("fastcdc/8")) {
chunker
@ -136,12 +159,21 @@ pub fn parse() -> Arguments {
(@arg SRC: +required "source path to backup")
)
(@subcommand restore =>
(about: "restores a backup")
(about: "restores a backup (or subpath)")
(@arg BACKUP: +required "repository::backup[::subpath] path")
(@arg DST: +required "destination path for backup")
)
(@subcommand remove =>
(about: "removes a backup or a subpath")
(@arg BACKUP: +required "repository::backup[::subpath] path")
)
(@subcommand vacuum =>
(about: "saves space by combining and recompressing bundles")
(@arg ratio: --ratio -r "ratio of unused chunks in a bundle to rewrite that bundle")
(@arg REPO: +required "path of the repository")
)
(@subcommand check =>
(about: "checks the repository")
(about: "checks the repository, a backup or a backup subpath")
(@arg full: --full "also check file contents")
(@arg PATH: +required "repository[::backup] path")
)
@ -151,7 +183,12 @@ pub fn parse() -> Arguments {
)
(@subcommand listbundles =>
(about: "lists bundles in a repository")
(@arg PATH: +required "repository path")
(@arg REPO: +required "path of the repository")
)
(@subcommand import =>
(about: "reconstruct a repository from the remote files")
(@arg REPO: +required "path of the local repository to create")
(@arg REMOTE: +required "remote repository path")
)
(@subcommand info =>
(about: "displays information on a repository, a backup or a path in a backup")
@ -210,6 +247,29 @@ pub fn parse() -> Arguments {
dst_path: args.value_of("DST").unwrap().to_string()
}
}
if let Some(args) = args.subcommand_matches("remove") {
let (repository, backup, inode) = split_repo_path(args.value_of("BACKUP").unwrap());
if backup.is_none() {
println!("A backup must be specified");
exit(1);
}
return Arguments::Remove {
repo_path: repository.to_string(),
backup_name: backup.unwrap().to_string(),
inode: inode.map(|v| v.to_string())
}
}
if let Some(args) = args.subcommand_matches("vacuum") {
let (repository, backup, inode) = split_repo_path(args.value_of("REPO").unwrap());
if backup.is_some() || inode.is_some() {
println!("No backups or subpaths may be given here");
exit(1);
}
return Arguments::Vacuum {
repo_path: repository.to_string(),
ratio: parse_float(args.value_of("ratio").unwrap_or("0.5"), "ratio") as f32
}
}
if let Some(args) = args.subcommand_matches("check") {
let (repository, backup, inode) = split_repo_path(args.value_of("PATH").unwrap());
return Arguments::Check {
@ -228,7 +288,7 @@ pub fn parse() -> Arguments {
}
}
if let Some(args) = args.subcommand_matches("listbundles") {
let (repository, backup, inode) = split_repo_path(args.value_of("PATH").unwrap());
let (repository, backup, inode) = split_repo_path(args.value_of("REPO").unwrap());
if backup.is_some() || inode.is_some() {
println!("No backups or subpaths may be given here");
exit(1);
@ -245,6 +305,17 @@ pub fn parse() -> Arguments {
inode: inode.map(|v| v.to_string())
}
}
if let Some(args) = args.subcommand_matches("import") {
let (repository, backup, inode) = split_repo_path(args.value_of("REPO").unwrap());
if backup.is_some() || inode.is_some() {
println!("No backups or subpaths may be given here");
exit(1);
}
return Arguments::Import {
repo_path: repository.to_string(),
remote_path: args.value_of("REMOTE").unwrap().to_string()
}
}
if let Some(args) = args.subcommand_matches("algotest") {
return Arguments::AlgoTest {
bundle_size: (parse_num(args.value_of("bundle_size").unwrap_or("25"), "Bundle size") * 1024 * 1024) as usize,

View File

@ -5,7 +5,7 @@ mod algotest;
use chrono::prelude::*;
use std::process::exit;
use ::repository::{Repository, Config, Inode, Backup};
use ::repository::{Repository, Config, Backup};
use ::util::ChecksumType;
use ::util::cli::*;
use self::args::Arguments;
@ -64,14 +64,32 @@ pub fn run() {
repo.restore_backup(&backup, &dst_path).unwrap();
}
},
Arguments::Remove{repo_path, backup_name, inode} => {
let repo = open_repository(&repo_path);
let _backup = get_backup(&repo, &backup_name);
if let Some(_inode) = inode {
error!("Removing backup subtrees is not implemented yet");
return
} else {
error!("Removing backups is not implemented yet");
return
}
},
Arguments::Vacuum{repo_path, ..} => {
let _repo = open_repository(&repo_path);
error!("Vaccum is not implemented yet");
return
},
Arguments::Check{repo_path, backup_name, inode, full} => {
let mut repo = open_repository(&repo_path);
if let Some(backup_name) = backup_name {
let backup = get_backup(&repo, &backup_name);
if let Some(inode) = inode {
unimplemented!()
let _backup = get_backup(&repo, &backup_name);
if let Some(_inode) = inode {
error!("Checking backup subtrees is not implemented yet");
return
} else {
unimplemented!()
error!("Checking backups is not implemented yet");
return
}
} else {
repo.check(full).unwrap()
@ -94,13 +112,14 @@ pub fn run() {
println!("{}", backup);
}
}
}
},
Arguments::Info{repo_path, backup_name, inode} => {
let repo = open_repository(&repo_path);
if let Some(backup_name) = backup_name {
let backup = get_backup(&repo, &backup_name);
if let Some(inode) = inode {
unimplemented!()
if let Some(_inode) = inode {
error!("Displaying information on single inodes is not implemented yet");
return
} else {
println!("Date: {}", Local.timestamp(backup.date, 0).to_rfc2822());
println!("Duration: {}", to_duration(backup.duration));
@ -124,12 +143,13 @@ pub fn run() {
let index_usage = info.index_entries as f32 / info.index_capacity as f32;
println!("Index: {}, {:.0}% full", to_file_size(info.index_size as u64), index_usage * 100.0);
}
}
},
Arguments::ListBundles{repo_path} => {
let repo = open_repository(&repo_path);
for bundle in repo.list_bundles() {
println!("Bundle {}", bundle.id);
println!(" - Mode: {:?}", bundle.mode);
println!(" - Hash method: {:?}", bundle.hash_method);
println!(" - Chunks: {}", bundle.chunk_count);
println!(" - Size: {}", to_file_size(bundle.encoded_size as u64));
println!(" - Data size: {}", to_file_size(bundle.raw_size as u64));
@ -142,7 +162,11 @@ pub fn run() {
println!(" - Compression: {}, ratio: {:.1}%", compression, ratio * 100.0);
println!();
}
}
},
Arguments::Import{..} => {
error!("Import is not implemented yet");
return
},
Arguments::AlgoTest{bundle_size, chunker, compression, hash, file} => {
algotest::run(&file, bundle_size, chunker, compression, hash);
}

View File

@ -24,7 +24,7 @@ mod cli;
// TODO: - Keep meta bundles also locally
// TODO: - Load and compare remote bundles to bundle map
// TODO: - Write backup files there as well
// TODO: Store list of hashes in bundle
// TODO: Store list of hashes and hash method in bundle
// TODO: Remove backups/subtrees
// TODO: Recompress & combine bundles
// TODO: Prune backups (based on age like attic)

View File

@ -49,7 +49,7 @@ impl Repository {
};
// ...allocate one if needed
if writer.is_none() {
*writer = Some(try!(self.bundles.create_bundle(mode)));
*writer = Some(try!(self.bundles.create_bundle(mode, self.config.hash)));
}
debug_assert!(writer.is_some());
let chunk_id;
@ -58,7 +58,7 @@ impl Repository {
{
// Add chunk to bundle writer and determine the size of the bundle
let writer_obj = writer.as_mut().unwrap();
chunk_id = try!(writer_obj.add(data));
chunk_id = try!(writer_obj.add(data, hash));
size = writer_obj.size();
raw_size = writer_obj.raw_size();
}

View File

@ -77,7 +77,6 @@ impl BundleMap {
Ok(BundleMap(try!(msgpack::decode_from_stream(&mut file))))
}
pub fn save<P: AsRef<Path>>(&self, path: P) -> Result<(), BundleMapError> {
let mut file = BufWriter::new(try!(File::create(path)));
try!(file.write_all(&HEADER_STRING));