More efficient chunk list encoding

Dennis Schwerdel, 2017-03-18 15:41:59 +01:00 (committed by Dennis Schwerdel)
parent 1b9cf888e7
commit e67ddbb275
14 changed files with 228 additions and 146 deletions

Cargo.lock (generated)

@@ -3,6 +3,7 @@ name = "zvault"
 version = "0.1.0"
 dependencies = [
  "blake2-rfc 0.2.17 (registry+https://github.com/rust-lang/crates.io-index)",
+ "byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "chrono 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "clap 2.21.1 (registry+https://github.com/rust-lang/crates.io-index)",
  "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",

Cargo.toml

@@ -17,3 +17,4 @@ rustc-serialize = "0.3"
 chrono = "0.3"
 clap = "2.19"
 log = "0.3"
+byteorder = "1.0"

src/bundle.rs

@@ -1,9 +1,9 @@
 use std::path::{Path, PathBuf};
 use std::collections::HashMap;
 use std::fs::{self, File};
-use std::io::{self, Read, Write, Seek, SeekFrom, BufWriter, BufReader, Cursor};
+use std::io::{self, Read, Write, Seek, SeekFrom, BufWriter, BufReader};
 use std::cmp::max;
-use std::fmt::{self, Debug, Write as FmtWrite};
+use std::fmt::{self, Debug};
 use std::sync::{Arc, Mutex};
 use serde::{self, Serialize, Deserialize};
@@ -86,29 +86,25 @@ quick_error!{
 #[derive(Hash, PartialEq, Eq, Clone, Default)]
-pub struct BundleId(pub Vec<u8>);
+pub struct BundleId(pub Hash);
 
 impl Serialize for BundleId {
     fn serialize<S: serde::Serializer>(&self, ser: S) -> Result<S::Ok, S::Error> {
-        ser.serialize_bytes(&self.0)
+        self.0.serialize(ser)
     }
 }
 
 impl Deserialize for BundleId {
     fn deserialize<D: serde::Deserializer>(de: D) -> Result<Self, D::Error> {
-        let bytes = try!(msgpack::Bytes::deserialize(de));
-        Ok(BundleId(bytes.into()))
+        let hash = try!(Hash::deserialize(de));
+        Ok(BundleId(hash))
     }
 }
 
 impl BundleId {
     #[inline]
     fn to_string(&self) -> String {
-        let mut buf = String::with_capacity(self.0.len()*2);
-        for b in &self.0 {
-            write!(&mut buf, "{:02x}", b).unwrap()
-        }
-        buf
+        self.0.to_string()
     }
 }
@@ -144,11 +140,10 @@ pub struct BundleInfo {
     pub compression: Option<Compression>,
     pub encryption: Option<Encryption>,
     pub hash_method: HashMethod,
-    pub checksum: Checksum,
     pub raw_size: usize,
     pub encoded_size: usize,
     pub chunk_count: usize,
-    pub contents_info_size: usize
+    pub chunk_info_size: usize
 }
 serde_impl!(BundleInfo(u64) {
     id: BundleId => 0,
@@ -156,45 +151,32 @@ serde_impl!(BundleInfo(u64) {
     compression: Option<Compression> => 2,
     encryption: Option<Encryption> => 3,
     hash_method: HashMethod => 4,
-    checksum: Checksum => 5,
     raw_size: usize => 6,
     encoded_size: usize => 7,
     chunk_count: usize => 8,
-    contents_info_size: usize => 9
+    chunk_info_size: usize => 9
 });
 
 impl Default for BundleInfo {
     fn default() -> Self {
         BundleInfo {
-            id: BundleId(vec![]),
+            id: BundleId(Hash::empty()),
             compression: None,
             encryption: None,
             hash_method: HashMethod::Blake2,
-            checksum: (ChecksumType::Blake2_256, msgpack::Bytes::new()),
             raw_size: 0,
             encoded_size: 0,
             chunk_count: 0,
             mode: BundleMode::Content,
-            contents_info_size: 0
+            chunk_info_size: 0
         }
     }
 }
-
-#[derive(Clone, Default)]
-pub struct BundleContentInfo {
-    pub chunk_sizes: Vec<usize>,
-    pub chunk_hashes: Vec<Hash>
-}
-
-serde_impl!(BundleContentInfo(u64) {
-    chunk_sizes: Vec<usize> => 0,
-    chunk_hashes: Vec<Hash> => 1
-});
 
 pub struct Bundle {
     pub info: BundleInfo,
-    pub contents: BundleContentInfo,
+    pub chunks: ChunkList,
     pub version: u8,
     pub path: PathBuf,
     crypto: Arc<Mutex<Crypto>>,
@@ -203,16 +185,16 @@ pub struct Bundle {
 }
 
 impl Bundle {
-    fn new(path: PathBuf, version: u8, content_start: usize, crypto: Arc<Mutex<Crypto>>, info: BundleInfo, contents: BundleContentInfo) -> Self {
-        let mut chunk_positions = Vec::with_capacity(contents.chunk_sizes.len());
+    fn new(path: PathBuf, version: u8, content_start: usize, crypto: Arc<Mutex<Crypto>>, info: BundleInfo, chunks: ChunkList) -> Self {
+        let mut chunk_positions = Vec::with_capacity(chunks.len());
         let mut pos = 0;
-        for len in &contents.chunk_sizes {
+        for &(_, len) in (&chunks).iter() {
             chunk_positions.push(pos);
-            pos += *len;
+            pos += len as usize;
         }
         Bundle {
             info: info,
-            contents: contents,
+            chunks: chunks,
             version: version,
             path: path,
             crypto: crypto,
@@ -239,16 +221,15 @@ impl Bundle {
         }
         let header: BundleInfo = try!(msgpack::decode_from_stream(&mut file)
            .map_err(|e| BundleError::Decode(e, path.clone())));
-        let mut contents_data = Vec::with_capacity(header.contents_info_size);
-        contents_data.resize(header.contents_info_size, 0);
-        try!(file.read_exact(&mut contents_data).map_err(|e| BundleError::Read(e, path.clone())));
+        let mut chunk_data = Vec::with_capacity(header.chunk_info_size);
+        chunk_data.resize(header.chunk_info_size, 0);
+        try!(file.read_exact(&mut chunk_data).map_err(|e| BundleError::Read(e, path.clone())));
         if let Some(ref encryption) = header.encryption {
-            contents_data = try!(crypto.lock().unwrap().decrypt(encryption.clone(), &contents_data));
+            chunk_data = try!(crypto.lock().unwrap().decrypt(encryption.clone(), &chunk_data));
         }
-        let contents = try!(msgpack::decode_from_stream(&mut Cursor::new(&contents_data))
-            .map_err(|e| BundleError::Decode(e, path.clone())));
+        let chunks = ChunkList::read_from(&chunk_data);
         let content_start = file.seek(SeekFrom::Current(0)).unwrap() as usize;
-        Ok(Bundle::new(path, version, content_start, crypto, header, contents))
+        Ok(Bundle::new(path, version, content_start, crypto, header, chunks))
     }
 
     #[inline]
@@ -281,16 +262,16 @@ impl Bundle {
         if id >= self.info.chunk_count {
             return Err(BundleError::NoSuchChunk(self.id(), id))
         }
-        Ok((self.chunk_positions[id], self.contents.chunk_sizes[id]))
+        Ok((self.chunk_positions[id], self.chunks[id].1 as usize))
     }
 
     pub fn check(&self, full: bool) -> Result<(), BundleError> {
         //FIXME: adapt to new format
-        if self.info.chunk_count != self.contents.chunk_sizes.len() {
+        if self.info.chunk_count != self.chunks.len() {
             return Err(BundleError::Integrity(self.id(),
                 "Chunk list size does not match chunk count"))
         }
-        if self.contents.chunk_sizes.iter().sum::<usize>() != self.info.raw_size {
+        if self.chunks.iter().map(|c| c.1 as usize).sum::<usize>() != self.info.raw_size {
             return Err(BundleError::Integrity(self.id(),
                 "Individual chunk sizes do not add up to total size"))
         }
@@ -331,16 +312,14 @@ impl Debug for Bundle {
 pub struct BundleWriter {
     mode: BundleMode,
     hash_method: HashMethod,
-    hashes: Vec<Hash>,
     data: Vec<u8>,
     compression: Option<Compression>,
     compression_stream: Option<CompressionStream>,
     encryption: Option<Encryption>,
     crypto: Arc<Mutex<Crypto>>,
-    checksum: ChecksumCreator,
     raw_size: usize,
     chunk_count: usize,
-    chunk_sizes: Vec<usize>
+    chunks: ChunkList,
 }
 
 impl BundleWriter {
@@ -349,8 +328,7 @@ impl BundleWriter {
         hash_method: HashMethod,
         compression: Option<Compression>,
         encryption: Option<Encryption>,
-        crypto: Arc<Mutex<Crypto>>,
-        checksum: ChecksumType
+        crypto: Arc<Mutex<Crypto>>
     ) -> Result<Self, BundleError> {
         let compression_stream = match compression {
             Some(ref compression) => Some(try!(compression.compress_stream())),
@@ -359,16 +337,14 @@ impl BundleWriter {
         Ok(BundleWriter {
             mode: mode,
             hash_method: hash_method,
-            hashes: vec![],
             data: vec![],
             compression: compression,
             compression_stream: compression_stream,
             encryption: encryption,
             crypto: crypto,
-            checksum: ChecksumCreator::new(checksum),
             raw_size: 0,
             chunk_count: 0,
-            chunk_sizes: vec![]
+            chunks: ChunkList::new()
         })
     }
@@ -378,11 +354,9 @@ impl BundleWriter {
         } else {
             self.data.extend_from_slice(chunk)
         }
-        self.checksum.update(chunk);
         self.raw_size += chunk.len();
         self.chunk_count += 1;
-        self.chunk_sizes.push(chunk.len());
-        self.hashes.push(hash);
+        self.chunks.push((hash, chunk.len() as u32));
         Ok(self.chunk_count-1)
     }
@@ -394,42 +368,35 @@ impl BundleWriter {
             self.data = try!(self.crypto.lock().unwrap().encrypt(encryption.clone(), &self.data));
         }
         let encoded_size = self.data.len();
-        let checksum = self.checksum.finish();
-        let id = BundleId(checksum.1.to_vec());
+        let mut chunk_data = Vec::with_capacity(self.chunks.encoded_size());
+        self.chunks.write_to(&mut chunk_data).unwrap();
+        let id = BundleId(self.hash_method.hash(&chunk_data));
+        if let Some(ref encryption) = self.encryption {
+            chunk_data = try!(self.crypto.lock().unwrap().encrypt(encryption.clone(), &chunk_data));
+        }
         let (folder, file) = db.bundle_path(&id);
         let path = folder.join(file);
         try!(fs::create_dir_all(&folder).map_err(|e| BundleError::Write(e, path.clone())));
         let mut file = BufWriter::new(try!(File::create(&path).map_err(|e| BundleError::Write(e, path.clone()))));
         try!(file.write_all(&HEADER_STRING).map_err(|e| BundleError::Write(e, path.clone())));
         try!(file.write_all(&[HEADER_VERSION]).map_err(|e| BundleError::Write(e, path.clone())));
-        let contents = BundleContentInfo {
-            chunk_sizes: self.chunk_sizes,
-            chunk_hashes: self.hashes
-        };
-        let mut contents_data = Vec::new();
-        try!(msgpack::encode_to_stream(&contents, &mut contents_data)
-            .map_err(|e| BundleError::Encode(e, path.clone())));
-        if let Some(ref encryption) = self.encryption {
-            contents_data = try!(self.crypto.lock().unwrap().encrypt(encryption.clone(), &contents_data));
-        }
         let header = BundleInfo {
             mode: self.mode,
             hash_method: self.hash_method,
-            checksum: checksum,
             compression: self.compression,
             encryption: self.encryption,
             chunk_count: self.chunk_count,
             id: id.clone(),
             raw_size: self.raw_size,
             encoded_size: encoded_size,
-            contents_info_size: contents_data.len()
+            chunk_info_size: chunk_data.len()
         };
         try!(msgpack::encode_to_stream(&header, &mut file)
             .map_err(|e| BundleError::Encode(e, path.clone())));
-        try!(file.write_all(&contents_data).map_err(|e| BundleError::Write(e, path.clone())));
+        try!(file.write_all(&chunk_data).map_err(|e| BundleError::Write(e, path.clone())));
         let content_start = file.seek(SeekFrom::Current(0)).unwrap() as usize;
         try!(file.write_all(&self.data).map_err(|e| BundleError::Write(e, path.clone())));
-        Ok(Bundle::new(path, HEADER_VERSION, content_start, self.crypto, header, contents))
+        Ok(Bundle::new(path, HEADER_VERSION, content_start, self.crypto, header, self.chunks))
     }
 
     #[inline]
@@ -449,21 +416,19 @@ pub struct BundleDb {
     compression: Option<Compression>,
     encryption: Option<Encryption>,
     crypto: Arc<Mutex<Crypto>>,
-    checksum: ChecksumType,
     bundles: HashMap<BundleId, Bundle>,
     bundle_cache: LruCache<BundleId, Vec<u8>>
 }
 
 impl BundleDb {
-    fn new(path: PathBuf, compression: Option<Compression>, encryption: Option<Encryption>, checksum: ChecksumType) -> Self {
+    fn new(path: PathBuf, compression: Option<Compression>, encryption: Option<Encryption>) -> Self {
         BundleDb {
             path: path,
             compression: compression,
             crypto: Arc::new(Mutex::new(Crypto::new())),
             encryption: encryption,
-            checksum: checksum,
             bundles: HashMap::new(),
             bundle_cache: LruCache::new(5, 10)
         }
@@ -504,33 +469,33 @@ impl BundleDb {
     }
 
     #[inline]
-    pub fn open<P: AsRef<Path>>(path: P, compression: Option<Compression>, encryption: Option<Encryption>, checksum: ChecksumType) -> Result<Self, BundleError> {
+    pub fn open<P: AsRef<Path>>(path: P, compression: Option<Compression>, encryption: Option<Encryption>) -> Result<Self, BundleError> {
         let path = path.as_ref().to_owned();
-        let mut self_ = Self::new(path, compression, encryption, checksum);
+        let mut self_ = Self::new(path, compression, encryption);
         try!(self_.load_bundle_list());
         Ok(self_)
     }
 
     #[inline]
-    pub fn create<P: AsRef<Path>>(path: P, compression: Option<Compression>, encryption: Option<Encryption>, checksum: ChecksumType) -> Result<Self, BundleError> {
+    pub fn create<P: AsRef<Path>>(path: P, compression: Option<Compression>, encryption: Option<Encryption>) -> Result<Self, BundleError> {
         let path = path.as_ref().to_owned();
         try!(fs::create_dir_all(&path)
             .map_err(|e| BundleError::Write(e, path.clone())));
-        Ok(Self::new(path, compression, encryption, checksum))
+        Ok(Self::new(path, compression, encryption))
     }
 
     #[inline]
-    pub fn open_or_create<P: AsRef<Path>>(path: P, compression: Option<Compression>, encryption: Option<Encryption>, checksum: ChecksumType) -> Result<Self, BundleError> {
+    pub fn open_or_create<P: AsRef<Path>>(path: P, compression: Option<Compression>, encryption: Option<Encryption>) -> Result<Self, BundleError> {
         if path.as_ref().exists() {
-            Self::open(path, compression, encryption, checksum)
+            Self::open(path, compression, encryption)
         } else {
-            Self::create(path, compression, encryption, checksum)
+            Self::create(path, compression, encryption)
         }
     }
 
     #[inline]
     pub fn create_bundle(&self, mode: BundleMode, hash_method: HashMethod) -> Result<BundleWriter, BundleError> {
-        BundleWriter::new(mode, hash_method, self.compression.clone(), self.encryption.clone(), self.crypto.clone(), self.checksum)
+        BundleWriter::new(mode, hash_method, self.compression.clone(), self.encryption.clone(), self.crypto.clone())
     }
 
     pub fn get_chunk(&mut self, bundle_id: &BundleId, id: usize) -> Result<Vec<u8>, BundleError> {
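
The BundleWriter changes above also change what ends up on disk: the msgpack-encoded BundleContentInfo block is gone, and the bundle id is now derived by hashing the encoded chunk table. A rough sketch of the resulting file layout, pieced together from the write path in this diff (not authoritative documentation):

    // Bundle file layout after this commit (sketch derived from the diff):
    //
    //   HEADER_STRING        magic bytes
    //   HEADER_VERSION       one byte
    //   BundleInfo           msgpack-encoded header; carries chunk_info_size
    //                        and encoded_size
    //   chunk table          chunk_info_size bytes: the ChunkList, 20 bytes per
    //                        chunk, encrypted if an encryption method is set
    //                        (the bundle id is the hash of this table, taken
    //                        before encryption)
    //   chunk data           encoded_size bytes: the concatenated chunks,
    //                        compressed/encrypted as configured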

src/cli/args.rs

@@ -1,5 +1,5 @@
 use ::chunker::ChunkerType;
-use ::util::{Compression, HashMethod, ChecksumType};
+use ::util::{Compression, HashMethod};
 use std::process::exit;
@@ -115,16 +115,6 @@ fn parse_compression(val: Option<&str>) -> Option<Compression> {
     }
 }
 
-#[allow(dead_code)]
-fn parse_checksum(val: Option<&str>) -> ChecksumType {
-    if let Ok(checksum) = ChecksumType::from(val.unwrap_or("blake2")) {
-        checksum
-    } else {
-        error!("Invalid checksum method: {}", val.unwrap());
-        exit(1);
-    }
-}
-
 fn parse_hash(val: Option<&str>) -> HashMethod {
     if let Ok(hash) = HashMethod::from(val.unwrap_or("blake2")) {
         hash

src/cli/mod.rs

@@ -6,7 +6,6 @@ use chrono::prelude::*;
 use std::process::exit;
 
 use ::repository::{Repository, Config, Backup};
-use ::util::ChecksumType;
 use ::util::cli::*;
 use self::args::Arguments;
@@ -40,7 +39,6 @@ pub fn run() {
         Arguments::Init{repo_path, bundle_size, chunker, compression, hash} => {
             Repository::create(repo_path, Config {
                 bundle_size: bundle_size,
-                checksum: ChecksumType::Blake2_256,
                 chunker: chunker,
                 compression: compression,
                 hash: hash

src/main.rs

@@ -11,6 +11,8 @@ extern crate rustc_serialize;
 extern crate chrono;
 #[macro_use] extern crate clap;
 #[macro_use] extern crate log;
+extern crate byteorder;
 
 pub mod util;
 pub mod bundle;

src/repository/backup.rs

@@ -1,4 +1,4 @@
-use super::{Repository, Chunk, RepositoryError};
+use super::{Repository, RepositoryError};
 use super::metadata::{FileType, Inode};
 use ::util::*;
@@ -12,7 +12,7 @@ use chrono::prelude::*;
 #[derive(Default, Debug, Clone)]
 pub struct Backup {
-    pub root: Vec<Chunk>,
+    pub root: ChunkList,
     pub total_data_size: u64, // Sum of all raw sizes of all entities
     pub changed_data_size: u64, // Sum of all raw sizes of all entities actively stored
     pub deduplicated_data_size: u64, // Sum of all raw sizes of all new bundles

src/repository/basic_io.rs

@@ -6,13 +6,10 @@ use ::index::Location;
 use ::bundle::{BundleId, BundleMode};
 use super::integrity::RepositoryIntegrityError;
-use ::util::Hash;
+use ::util::*;
 use ::chunker::{IChunker, ChunkerStatus};
 
-pub type Chunk = (Hash, usize);
-
 impl Repository {
     pub fn get_bundle_id(&self, id: u32) -> Result<BundleId, RepositoryError> {
         if let Some(bundle_info) = self.bundle_map.get(id) {
@@ -86,12 +83,12 @@ impl Repository {
     }
 
     #[inline]
-    pub fn put_data(&mut self, mode: BundleMode, data: &[u8]) -> Result<Vec<Chunk>, RepositoryError> {
+    pub fn put_data(&mut self, mode: BundleMode, data: &[u8]) -> Result<ChunkList, RepositoryError> {
         let mut input = Cursor::new(data);
         self.put_stream(mode, &mut input)
     }
 
-    pub fn put_stream<R: Read>(&mut self, mode: BundleMode, data: &mut R) -> Result<Vec<Chunk>, RepositoryError> {
+    pub fn put_stream<R: Read>(&mut self, mode: BundleMode, data: &mut R) -> Result<ChunkList, RepositoryError> {
         let avg_size = self.config.chunker.avg_size();
         let mut chunks = Vec::new();
         let mut chunk = Vec::with_capacity(avg_size * 2);
@@ -102,17 +99,17 @@ impl Repository {
             chunk = output.into_inner();
             let hash = self.config.hash.hash(&chunk);
             try!(self.put_chunk(mode, hash, &chunk));
-            chunks.push((hash, chunk.len()));
+            chunks.push((hash, chunk.len() as u32));
             if res == ChunkerStatus::Finished {
                 break
             }
         }
-        Ok(chunks)
+        Ok(chunks.into())
     }
 
     #[inline]
     pub fn get_data(&mut self, chunks: &[Chunk]) -> Result<Vec<u8>, RepositoryError> {
-        let mut data = Vec::with_capacity(chunks.iter().map(|&(_, size)| size).sum());
+        let mut data = Vec::with_capacity(chunks.iter().map(|&(_, size)| size).sum::<u32>() as usize);
         try!(self.get_stream(chunks, &mut data));
         Ok(data)
     }
@@ -121,7 +118,7 @@ impl Repository {
     pub fn get_stream<W: Write>(&mut self, chunks: &[Chunk], w: &mut W) -> Result<(), RepositoryError> {
         for &(ref hash, len) in chunks {
             let data = try!(try!(self.get_chunk(*hash)).ok_or_else(|| RepositoryIntegrityError::MissingChunk(hash.clone())));
-            debug_assert_eq!(data.len(), len);
+            debug_assert_eq!(data.len() as u32, len);
             try!(w.write_all(&data));
         }
         Ok(())

src/repository/config.rs

@@ -40,19 +40,6 @@ impl HashMethod {
 }
 
-impl ChecksumType {
-    fn from_yaml(yaml: String) -> Result<Self, ConfigError> {
-        ChecksumType::from(&yaml).map_err(ConfigError::Parse)
-    }
-
-    fn to_yaml(&self) -> String {
-        self.name().to_string()
-    }
-}
-
 struct ChunkerYaml {
     method: String,
     avg_size: usize,
@@ -107,7 +94,6 @@ struct ConfigYaml {
     compression: Option<String>,
     bundle_size: usize,
     chunker: ChunkerYaml,
-    checksum: String,
     hash: String,
 }
 
 impl Default for ConfigYaml {
@@ -116,7 +102,6 @@ impl Default for ConfigYaml {
             compression: Some("brotli/5".to_string()),
             bundle_size: 25*1024*1024,
             chunker: ChunkerYaml::default(),
-            checksum: "blake2_256".to_string(),
             hash: "blake2".to_string()
         }
     }
@@ -125,7 +110,6 @@ serde_impl!(ConfigYaml(String) {
     compression: Option<String> => "compression",
     bundle_size: usize => "bundle_size",
     chunker: ChunkerYaml => "chunker",
-    checksum: String => "checksum",
     hash: String => "hash"
 });
@@ -136,7 +120,6 @@ pub struct Config {
     pub compression: Option<Compression>,
     pub bundle_size: usize,
     pub chunker: ChunkerType,
-    pub checksum: ChecksumType,
     pub hash: HashMethod
 }
 
 impl Config {
@@ -150,7 +133,6 @@ impl Config {
             compression: compression,
             bundle_size: yaml.bundle_size,
             chunker: try!(ChunkerType::from_yaml(yaml.chunker)),
-            checksum: try!(ChecksumType::from_yaml(yaml.checksum)),
             hash: try!(HashMethod::from_yaml(yaml.hash))
         })
     }
@@ -160,7 +142,6 @@ impl Config {
             compression: self.compression.as_ref().map(|c| c.to_yaml()),
             bundle_size: self.bundle_size,
             chunker: self.chunker.to_yaml(),
-            checksum: self.checksum.to_yaml(),
             hash: self.hash.to_yaml()
         }
     }

src/repository/metadata.rs

@@ -6,7 +6,7 @@ use std::os::unix::fs::{PermissionsExt, symlink};
 use std::io::{Read, Write};
 
 use ::util::*;
-use super::{Repository, RepositoryError, Chunk};
+use super::{Repository, RepositoryError};
 use super::integrity::RepositoryIntegrityError;
 use ::bundle::BundleMode;
@@ -27,8 +27,8 @@ serde_impl!(FileType(u8) {
 #[derive(Debug)]
 pub enum FileContents {
     Inline(msgpack::Bytes),
-    ChunkedDirect(Vec<Chunk>),
-    ChunkedIndirect(Vec<Chunk>)
+    ChunkedDirect(ChunkList),
+    ChunkedIndirect(ChunkList)
 }
 
 serde_impl!(FileContents(u8) {
     Inline(ByteBuf) => 0,
@@ -50,7 +50,7 @@ pub struct Inode {
     pub create_time: i64,
     pub symlink_target: Option<String>,
     pub contents: Option<FileContents>,
-    pub children: Option<HashMap<String, Vec<Chunk>>>
+    pub children: Option<HashMap<String, ChunkList>>
 }
 
 impl Default for Inode {
     fn default() -> Self {
@@ -82,7 +82,7 @@ serde_impl!(Inode(u8) {
     create_time: i64 => 8,
     symlink_target: Option<String> => 9,
     contents: Option<FileContents> => 10,
-    children: HashMap<String, Vec<Chunk>> => 11
+    children: HashMap<String, ChunkList> => 11
 });
 
 impl Inode {
@@ -162,8 +162,9 @@ impl Repository {
             if chunks.len() < 10 {
                 inode.contents = Some(FileContents::ChunkedDirect(chunks));
             } else {
-                let chunks_data = try!(msgpack::encode(&chunks));
-                chunks = try!(self.put_data(BundleMode::Content, &chunks_data));
+                let mut chunk_data = Vec::with_capacity(chunks.encoded_size());
+                chunks.write_to(&mut chunk_data).unwrap();
+                chunks = try!(self.put_data(BundleMode::Content, &chunk_data));
                 inode.contents = Some(FileContents::ChunkedIndirect(chunks));
             }
         }
@@ -172,7 +173,7 @@ impl Repository {
     }
 
     #[inline]
-    pub fn put_inode(&mut self, inode: &Inode) -> Result<Vec<Chunk>, RepositoryError> {
+    pub fn put_inode(&mut self, inode: &Inode) -> Result<ChunkList, RepositoryError> {
         self.put_data(BundleMode::Meta, &try!(msgpack::encode(inode)))
     }
@@ -194,7 +195,7 @@ impl Repository {
             },
             FileContents::ChunkedIndirect(ref chunks) => {
                 let chunk_data = try!(self.get_data(chunks));
-                let chunks: Vec<Chunk> = try!(msgpack::decode(&chunk_data));
+                let chunks = ChunkList::read_from(&chunk_data);
                 try!(self.get_stream(&chunks, &mut file));
             }
         }
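
For reference, the rule implemented above: chunk lists with fewer than 10 entries are stored inline in the inode (ChunkedDirect), while longer lists are encoded with ChunkList::write_to and stored as content data themselves, so the inode only holds the short list addressing that blob (ChunkedIndirect). A hypothetical helper restating the same logic (a sketch, not part of the commit; file_contents and its signature are invented for illustration):

    fn file_contents(repo: &mut Repository, chunks: ChunkList) -> Result<FileContents, RepositoryError> {
        if chunks.len() < 10 {
            // Short lists are embedded directly in the inode.
            Ok(FileContents::ChunkedDirect(chunks))
        } else {
            // Long lists are serialized (20 bytes per chunk) and stored as
            // ordinary content data; the inode keeps only the indirection list.
            let mut encoded = Vec::with_capacity(chunks.encoded_size());
            chunks.write_to(&mut encoded).unwrap();
            let indirect = try!(repo.put_data(BundleMode::Content, &encoded));
            Ok(FileContents::ChunkedIndirect(indirect))
        }
    }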

src/repository/mod.rs

@@ -19,7 +19,6 @@ use super::chunker::Chunker;
 pub use self::error::RepositoryError;
 pub use self::config::Config;
 pub use self::metadata::{Inode, FileType};
-pub use self::basic_io::Chunk;
 pub use self::backup::Backup;
 
 use self::bundle_map::BundleMap;
@@ -46,7 +45,6 @@ impl Repository {
             path.join("bundles"),
             config.compression.clone(),
             None, //FIXME: store encryption in config
-            config.checksum
         ));
         let index = try!(Index::create(&path.join("index")));
         try!(config.save(path.join("config.yaml")));
@@ -74,7 +72,6 @@ impl Repository {
             path.join("bundles"),
             config.compression.clone(),
             None, //FIXME: load encryption from config
-            config.checksum
         ));
         let index = try!(Index::open(&path.join("index")));
         let bundle_map = try!(BundleMap::load(path.join("bundles.map")));

src/util/chunk.rs (new file, 125 lines added)

@@ -0,0 +1,125 @@
use std::io::{self, Write, Read, Cursor};
use std::ops::{Deref, DerefMut};

use serde::{self, Serialize, Deserialize};
use serde::bytes::{Bytes, ByteBuf};
use serde::de::Error;
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};

use super::Hash;

pub type Chunk = (Hash, u32);

#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub struct ChunkList(Vec<Chunk>);

impl ChunkList {
    #[inline]
    pub fn new() -> Self {
        ChunkList(Vec::new())
    }

    #[inline]
    pub fn with_capacity(num: usize) -> Self {
        ChunkList(Vec::with_capacity(num))
    }

    #[inline]
    pub fn len(&self) -> usize {
        self.0.len()
    }

    #[inline]
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    #[inline]
    pub fn push(&mut self, chunk: Chunk) {
        self.0.push(chunk)
    }

    #[inline]
    pub fn write_to(&self, dst: &mut Write) -> Result<(), io::Error> {
        for chunk in &self.0 {
            try!(chunk.0.write_to(dst));
            try!(dst.write_u32::<LittleEndian>(chunk.1));
        }
        Ok(())
    }

    #[inline]
    pub fn read_n_from(n: usize, src: &mut Read) -> Result<Self, io::Error> {
        let mut chunks = Vec::with_capacity(n);
        for _ in 0..n {
            let hash = try!(Hash::read_from(src));
            let len = try!(src.read_u32::<LittleEndian>());
            chunks.push((hash, len));
        }
        Ok(ChunkList(chunks))
    }

    #[inline]
    pub fn read_from(src: &[u8]) -> Self {
        if src.len() % 20 != 0 {
            warn!("Reading truncated chunk list");
        }
        ChunkList::read_n_from(src.len()/20, &mut Cursor::new(src)).unwrap()
    }

    #[inline]
    pub fn encoded_size(&self) -> usize {
        self.0.len() * 20
    }
}

impl Default for ChunkList {
    #[inline]
    fn default() -> Self {
        ChunkList(Vec::new())
    }
}

impl From<Vec<Chunk>> for ChunkList {
    fn from(val: Vec<Chunk>) -> Self {
        ChunkList(val)
    }
}

impl Into<Vec<Chunk>> for ChunkList {
    fn into(self) -> Vec<Chunk> {
        self.0
    }
}

impl Deref for ChunkList {
    type Target = [Chunk];
    fn deref(&self) -> &[Chunk] {
        &self.0
    }
}

impl DerefMut for ChunkList {
    fn deref_mut(&mut self) -> &mut [Chunk] {
        &mut self.0
    }
}

impl Serialize for ChunkList {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> where S: serde::Serializer {
        let mut buf = Vec::with_capacity(self.encoded_size());
        self.write_to(&mut buf).unwrap();
        Bytes::from(&buf as &[u8]).serialize(serializer)
    }
}

impl Deserialize for ChunkList {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> where D: serde::Deserializer {
        let data: Vec<u8> = try!(ByteBuf::deserialize(deserializer)).into();
        if data.len() % 20 != 0 {
            return Err(D::Error::custom("Invalid chunk list length"));
        }
        Ok(ChunkList::read_n_from(data.len()/20, &mut Cursor::new(data)).unwrap())
    }
}
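
Each entry in this format is exactly 20 bytes: a 16-byte hash followed by a little-endian u32 length, so a list of n chunks always encodes to 20*n bytes with no per-entry framing. That fixed width is what makes encoded_size and read_from trivial compared to the previous msgpack encoding. A minimal round-trip sketch using the types above (a test added for illustration, not part of the commit):

    #[test]
    fn chunk_list_roundtrip() {
        let mut list = ChunkList::new();
        list.push((Hash { high: 1, low: 2 }, 4096));
        list.push((Hash { high: 3, low: 4 }, 517));

        // Two chunks encode to exactly 2 * 20 = 40 bytes.
        let mut buf = Vec::with_capacity(list.encoded_size());
        list.write_to(&mut buf).unwrap();
        assert_eq!(buf.len(), 40);

        // The entry count is implied by the byte length.
        let decoded = ChunkList::read_from(&buf);
        assert_eq!(decoded, list);
    }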

src/util/hash.rs

@@ -4,15 +4,17 @@ use serde::bytes::{ByteBuf, Bytes};
 use murmurhash3::murmurhash3_x64_128;
 use blake2::blake2b::blake2b;
+use byteorder::{LittleEndian, ByteOrder, WriteBytesExt, ReadBytesExt};
 
 use std::mem;
 use std::fmt;
 use std::u64;
+use std::io::{self, Read, Write};
 
 #[repr(packed)]
-#[derive(Clone, Copy, PartialEq, Hash, Eq)]
+#[derive(Clone, Copy, PartialEq, Hash, Eq, Default)]
 pub struct Hash {
     pub high: u64,
     pub low: u64
@@ -28,6 +30,24 @@ impl Hash {
     pub fn empty() -> Self {
         Hash{high: 0, low: 0}
     }
+
+    #[inline]
+    pub fn to_string(&self) -> String {
+        format!("{:016x}{:016x}", self.high, self.low)
+    }
+
+    #[inline]
+    pub fn write_to(&self, dst: &mut Write) -> Result<(), io::Error> {
+        try!(dst.write_u64::<LittleEndian>(self.high));
+        dst.write_u64::<LittleEndian>(self.low)
+    }
+
+    #[inline]
+    pub fn read_from(src: &mut Read) -> Result<Self, io::Error> {
+        let high = try!(src.read_u64::<LittleEndian>());
+        let low = try!(src.read_u64::<LittleEndian>());
+        Ok(Hash { high: high, low: low })
+    }
 }
 
 impl fmt::Display for Hash {
@@ -47,8 +67,9 @@ impl fmt::Debug for Hash {
 impl Serialize for Hash {
     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> where S: serde::Serializer {
-        let hash = Hash{high: u64::to_le(self.high), low: u64::to_le(self.low)};
-        let dat: [u8; 16] = unsafe { mem::transmute(hash) };
+        let mut dat = [0u8; 16];
+        LittleEndian::write_u64(&mut dat[..8], self.high);
+        LittleEndian::write_u64(&mut dat[8..], self.low);
         Bytes::from(&dat as &[u8]).serialize(serializer)
     }
 }
@@ -59,8 +80,10 @@ impl Deserialize for Hash {
         if dat.len() != 16 {
             return Err(D::Error::custom("Invalid key length"));
         }
-        let hash = unsafe { &*(dat.as_ptr() as *const Hash) };
-        Ok(Hash{high: u64::from_le(hash.high), low: u64::from_le(hash.low)})
+        Ok(Hash{
+            high: LittleEndian::read_u64(&dat[..8]),
+            low: LittleEndian::read_u64(&dat[8..])
+        })
     }
 }
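
The Serialize/Deserialize change swaps the unsafe transmute of the #[repr(packed)] struct for explicit byteorder calls; on little-endian targets both produce the same 16-byte layout (high in bytes 0..8, low in bytes 8..16, least significant byte first), but the explicit version is portable and avoids unsafe code. A standalone sketch of that layout (for illustration, not part of the commit):

    use byteorder::{ByteOrder, LittleEndian};

    fn encode(high: u64, low: u64) -> [u8; 16] {
        let mut dat = [0u8; 16];
        LittleEndian::write_u64(&mut dat[..8], high);
        LittleEndian::write_u64(&mut dat[8..], low);
        dat
    }

    fn main() {
        let dat = encode(0x0123456789abcdef, 0xfedcba9876543210);
        assert_eq!(dat[0], 0xef);  // least significant byte of `high` first
        assert_eq!(dat[7], 0x01);
        assert_eq!(dat[8], 0x10);  // then `low`, also little-endian
        assert_eq!(dat[15], 0xfe);
    }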

src/util/mod.rs

@@ -1,12 +1,13 @@
-mod checksum;
+//mod checksum; not used
 mod compression;
 mod encryption;
 mod hash;
 mod lru_cache;
+mod chunk;
 pub mod cli;
 pub mod msgpack;
 
-pub use self::checksum::*;
+pub use self::chunk::*;
 pub use self::compression::*;
 pub use self::encryption::*;
 pub use self::hash::*;