From 5e816fe7949a848624b6d57e778e11ddcc39b2d9 Mon Sep 17 00:00:00 2001 From: Dennis Schwerdel Date: Tue, 21 Mar 2017 11:08:01 +0100 Subject: [PATCH] Rename and restructure bundledb --- src/bundle.rs | 559 ----------------------------------- src/bundledb/bundle.rs | 240 +++++++++++++++ src/bundledb/db.rs | 138 +++++++++ src/bundledb/error.rs | 83 ++++++ src/bundledb/mod.rs | 19 ++ src/bundledb/writer.rs | 114 +++++++ src/main.rs | 2 +- src/repository/basic_io.rs | 2 +- src/repository/bundle_map.rs | 2 +- src/repository/error.rs | 2 +- src/repository/info.rs | 2 +- src/repository/integrity.rs | 2 +- src/repository/metadata.rs | 2 +- src/repository/mod.rs | 2 +- src/repository/vacuum.rs | 2 +- 15 files changed, 603 insertions(+), 568 deletions(-) delete mode 100644 src/bundle.rs create mode 100644 src/bundledb/bundle.rs create mode 100644 src/bundledb/db.rs create mode 100644 src/bundledb/error.rs create mode 100644 src/bundledb/mod.rs create mode 100644 src/bundledb/writer.rs diff --git a/src/bundle.rs b/src/bundle.rs deleted file mode 100644 index 7ba0032..0000000 --- a/src/bundle.rs +++ /dev/null @@ -1,559 +0,0 @@ -use std::path::{Path, PathBuf}; -use std::collections::HashMap; -use std::fs::{self, File}; -use std::io::{self, Read, Write, Seek, SeekFrom, BufWriter, BufReader}; -use std::cmp::max; -use std::fmt::{self, Debug}; -use std::sync::{Arc, Mutex}; - -use serde::{self, Serialize, Deserialize}; -use quick_error::ResultExt; - -use util::*; - -static HEADER_STRING: [u8; 7] = *b"zvault\x01"; -static HEADER_VERSION: u8 = 1; - -/* - -Bundle format -- Magic header + version -- Encoded header structure (contains size of next structure) -- Encoded chunk list (with chunk hashes and sizes) -- Chunk data - -*/ - - - -quick_error!{ - #[derive(Debug)] - pub enum BundleError { - List(err: io::Error) { - cause(err) - description("Failed to list bundles") - display("Failed to list bundles: {}", err) - } - Io(err: io::Error, path: PathBuf) { - cause(err) - context(path: &'a Path, err: io::Error) -> (err, path.to_path_buf()) - description("Failed to read/write bundle") - display("Failed to read/write bundle {:?}: {}", path, err) - } - Decode(err: msgpack::DecodeError, path: PathBuf) { - cause(err) - context(path: &'a Path, err: msgpack::DecodeError) -> (err, path.to_path_buf()) - description("Failed to decode bundle header") - display("Failed to decode bundle header of {:?}: {}", path, err) - } - Encode(err: msgpack::EncodeError, path: PathBuf) { - cause(err) - context(path: &'a Path, err: msgpack::EncodeError) -> (err, path.to_path_buf()) - description("Failed to encode bundle header") - display("Failed to encode bundle header of {:?}: {}", path, err) - } - WrongHeader(path: PathBuf) { - description("Wrong header") - display("Wrong header on bundle {:?}", path) - } - WrongVersion(path: PathBuf, version: u8) { - description("Wrong version") - display("Wrong version on bundle {:?}: {}", path, version) - } - Integrity(bundle: BundleId, reason: &'static str) { - description("Bundle has an integrity error") - display("Bundle {:?} has an integrity error: {}", bundle, reason) - } - NoSuchBundle(bundle: BundleId) { - description("No such bundle") - display("No such bundle: {:?}", bundle) - } - NoSuchChunk(bundle: BundleId, id: usize) { - description("Bundle has no such chunk") - display("Bundle {:?} has no chunk with that id: {}", bundle, id) - } - Decompression(err: CompressionError, path: PathBuf) { - cause(err) - context(path: &'a Path, err: CompressionError) -> (err, path.to_path_buf()) - description("Decompression failed") - display("Decompression failed on bundle {:?}: {}", path, err) - } - Compression(err: CompressionError) { - from() - cause(err) - description("Compression failed") - display("Compression failed: {}", err) - } - Decryption(err: EncryptionError, path: PathBuf) { - cause(err) - context(path: &'a Path, err: EncryptionError) -> (err, path.to_path_buf()) - description("Decryption failed") - display("Decryption failed on bundle {:?}: {}", path, err) - } - Encryption(err: EncryptionError) { - from() - cause(err) - description("Encryption failed") - display("Encryption failed: {}", err) - } - Remove(err: io::Error, bundle: BundleId) { - cause(err) - description("Failed to remove bundle") - display("Failed to remove bundle {}", bundle) - } - } -} - - -#[derive(Hash, PartialEq, Eq, Clone, Default)] -pub struct BundleId(pub Hash); - -impl Serialize for BundleId { - fn serialize(&self, ser: S) -> Result { - self.0.serialize(ser) - } -} - -impl Deserialize for BundleId { - fn deserialize(de: D) -> Result { - let hash = try!(Hash::deserialize(de)); - Ok(BundleId(hash)) - } -} - -impl BundleId { - #[inline] - fn to_string(&self) -> String { - self.0.to_string() - } -} - -impl fmt::Display for BundleId { - #[inline] - fn fmt(&self, fmt: &mut fmt::Formatter) -> Result<(), fmt::Error> { - write!(fmt, "{}", self.to_string()) - } -} - -impl fmt::Debug for BundleId { - #[inline] - fn fmt(&self, fmt: &mut fmt::Formatter) -> Result<(), fmt::Error> { - write!(fmt, "{}", self.to_string()) - } -} - - -#[derive(Eq, Debug, PartialEq, Clone, Copy)] -pub enum BundleMode { - Content, Meta -} -serde_impl!(BundleMode(u8) { - Content => 0, - Meta => 1 -}); - - -#[derive(Clone)] -pub struct BundleInfo { - pub id: BundleId, - pub mode: BundleMode, - pub compression: Option, - pub encryption: Option, - pub hash_method: HashMethod, - pub raw_size: usize, - pub encoded_size: usize, - pub chunk_count: usize, - pub chunk_info_size: usize -} -serde_impl!(BundleInfo(u64) { - id: BundleId => 0, - mode: BundleMode => 1, - compression: Option => 2, - encryption: Option => 3, - hash_method: HashMethod => 4, - raw_size: usize => 6, - encoded_size: usize => 7, - chunk_count: usize => 8, - chunk_info_size: usize => 9 -}); - -impl Default for BundleInfo { - fn default() -> Self { - BundleInfo { - id: BundleId(Hash::empty()), - compression: None, - encryption: None, - hash_method: HashMethod::Blake2, - raw_size: 0, - encoded_size: 0, - chunk_count: 0, - mode: BundleMode::Content, - chunk_info_size: 0 - } - } -} - - -pub struct Bundle { - pub info: BundleInfo, - pub chunks: ChunkList, - pub version: u8, - pub path: PathBuf, - crypto: Arc>, - pub content_start: usize, - pub chunk_positions: Vec -} - -impl Bundle { - fn new(path: PathBuf, version: u8, content_start: usize, crypto: Arc>, info: BundleInfo, chunks: ChunkList) -> Self { - let mut chunk_positions = Vec::with_capacity(chunks.len()); - let mut pos = 0; - for &(_, len) in (&chunks).iter() { - chunk_positions.push(pos); - pos += len as usize; - } - Bundle { - info: info, - chunks: chunks, - version: version, - path: path, - crypto: crypto, - content_start: content_start, - chunk_positions: chunk_positions - } - } - - #[inline] - pub fn id(&self) -> BundleId { - self.info.id.clone() - } - - pub fn load(path: PathBuf, crypto: Arc>) -> Result { - let mut file = BufReader::new(try!(File::open(&path).context(&path as &Path))); - let mut header = [0u8; 8]; - try!(file.read_exact(&mut header).context(&path as &Path)); - if header[..HEADER_STRING.len()] != HEADER_STRING { - return Err(BundleError::WrongHeader(path.clone())) - } - let version = header[HEADER_STRING.len()]; - if version != HEADER_VERSION { - return Err(BundleError::WrongVersion(path.clone(), version)) - } - let header: BundleInfo = try!(msgpack::decode_from_stream(&mut file).context(&path as &Path)); - debug!("Load bundle {}", header.id); - let mut chunk_data = Vec::with_capacity(header.chunk_info_size); - chunk_data.resize(header.chunk_info_size, 0); - try!(file.read_exact(&mut chunk_data).context(&path as &Path)); - if let Some(ref encryption) = header.encryption { - chunk_data = try!(crypto.lock().unwrap().decrypt(&encryption, &chunk_data).context(&path as &Path)); - } - let chunks = ChunkList::read_from(&chunk_data); - let content_start = file.seek(SeekFrom::Current(0)).unwrap() as usize; - Ok(Bundle::new(path, version, content_start, crypto, header, chunks)) - } - - #[inline] - fn load_encoded_contents(&self) -> Result, BundleError> { - debug!("Load bundle data {} ({:?})", self.info.id, self.info.mode); - let mut file = BufReader::new(try!(File::open(&self.path).context(&self.path as &Path))); - try!(file.seek(SeekFrom::Start(self.content_start as u64)).context(&self.path as &Path)); - let mut data = Vec::with_capacity(max(self.info.encoded_size, self.info.raw_size)+1024); - try!(file.read_to_end(&mut data).context(&self.path as &Path)); - Ok(data) - } - - #[inline] - fn decode_contents(&self, mut data: Vec) -> Result, BundleError> { - if let Some(ref encryption) = self.info.encryption { - data = try!(self.crypto.lock().unwrap().decrypt(&encryption, &data).context(&self.path as &Path)); - } - if let Some(ref compression) = self.info.compression { - data = try!(compression.decompress(&data).context(&self.path as &Path)); - } - Ok(data) - } - - #[inline] - pub fn load_contents(&self) -> Result, BundleError> { - self.load_encoded_contents().and_then(|data| self.decode_contents(data)) - } - - #[inline] - pub fn get_chunk_position(&self, id: usize) -> Result<(usize, usize), BundleError> { - if id >= self.info.chunk_count { - return Err(BundleError::NoSuchChunk(self.id(), id)) - } - Ok((self.chunk_positions[id], self.chunks[id].1 as usize)) - } - - pub fn check(&self, full: bool) -> Result<(), BundleError> { - //FIXME: adapt to new format - if self.info.chunk_count != self.chunks.len() { - return Err(BundleError::Integrity(self.id(), - "Chunk list size does not match chunk count")) - } - if self.chunks.iter().map(|c| c.1 as usize).sum::() != self.info.raw_size { - return Err(BundleError::Integrity(self.id(), - "Individual chunk sizes do not add up to total size")) - } - if !full { - let size = try!(fs::metadata(&self.path).context(&self.path as &Path)).len(); - if size as usize != self.info.encoded_size + self.content_start { - return Err(BundleError::Integrity(self.id(), - "File size does not match size in header, truncated file")) - } - return Ok(()) - } - let encoded_contents = try!(self.load_encoded_contents()); - if self.info.encoded_size != encoded_contents.len() { - return Err(BundleError::Integrity(self.id(), - "Encoded data size does not match size in header, truncated bundle")) - } - let contents = try!(self.decode_contents(encoded_contents)); - if self.info.raw_size != contents.len() { - return Err(BundleError::Integrity(self.id(), - "Raw data size does not match size in header, truncated bundle")) - } - //TODO: verify checksum - Ok(()) - } -} - -impl Debug for Bundle { - fn fmt(&self, fmt: &mut fmt::Formatter) -> Result<(), fmt::Error> { - write!(fmt, "Bundle(\n\tid: {}\n\tpath: {:?}\n\tchunks: {}\n\tsize: {}, encoded: {}\n\tcompression: {:?}\n)", - self.info.id.to_string(), self.path, self.info.chunk_count, self.info.raw_size, - self.info.encoded_size, self.info.compression) - } -} - - - -pub struct BundleWriter { - mode: BundleMode, - hash_method: HashMethod, - data: Vec, - compression: Option, - compression_stream: Option, - encryption: Option, - crypto: Arc>, - raw_size: usize, - chunk_count: usize, - chunks: ChunkList, -} - -impl BundleWriter { - fn new( - mode: BundleMode, - hash_method: HashMethod, - compression: Option, - encryption: Option, - crypto: Arc> - ) -> Result { - let compression_stream = match compression { - Some(ref compression) => Some(try!(compression.compress_stream())), - None => None - }; - Ok(BundleWriter { - mode: mode, - hash_method: hash_method, - data: vec![], - compression: compression, - compression_stream: compression_stream, - encryption: encryption, - crypto: crypto, - raw_size: 0, - chunk_count: 0, - chunks: ChunkList::new() - }) - } - - pub fn add(&mut self, chunk: &[u8], hash: Hash) -> Result { - if let Some(ref mut stream) = self.compression_stream { - try!(stream.process(chunk, &mut self.data)) - } else { - self.data.extend_from_slice(chunk) - } - self.raw_size += chunk.len(); - self.chunk_count += 1; - self.chunks.push((hash, chunk.len() as u32)); - Ok(self.chunk_count-1) - } - - fn finish(mut self, db: &BundleDb) -> Result { - if let Some(stream) = self.compression_stream { - try!(stream.finish(&mut self.data)) - } - if let Some(ref encryption) = self.encryption { - self.data = try!(self.crypto.lock().unwrap().encrypt(&encryption, &self.data)); - } - let encoded_size = self.data.len(); - let mut chunk_data = Vec::with_capacity(self.chunks.encoded_size()); - self.chunks.write_to(&mut chunk_data).unwrap(); - let id = BundleId(self.hash_method.hash(&chunk_data)); - if let Some(ref encryption) = self.encryption { - chunk_data = try!(self.crypto.lock().unwrap().encrypt(&encryption, &chunk_data)); - } - let (folder, file) = db.bundle_path(&id); - let path = folder.join(file); - try!(fs::create_dir_all(&folder).context(&path as &Path)); - let mut file = BufWriter::new(try!(File::create(&path).context(&path as &Path))); - try!(file.write_all(&HEADER_STRING).context(&path as &Path)); - try!(file.write_all(&[HEADER_VERSION]).context(&path as &Path)); - let header = BundleInfo { - mode: self.mode, - hash_method: self.hash_method, - compression: self.compression, - encryption: self.encryption, - chunk_count: self.chunk_count, - id: id.clone(), - raw_size: self.raw_size, - encoded_size: encoded_size, - chunk_info_size: chunk_data.len() - }; - try!(msgpack::encode_to_stream(&header, &mut file).context(&path as &Path)); - try!(file.write_all(&chunk_data).context(&path as &Path)); - let content_start = file.seek(SeekFrom::Current(0)).unwrap() as usize; - try!(file.write_all(&self.data).context(&path as &Path)); - Ok(Bundle::new(path, HEADER_VERSION, content_start, self.crypto, header, self.chunks)) - } - - #[inline] - pub fn size(&self) -> usize { - self.data.len() - } - - #[inline] - pub fn raw_size(&self) -> usize { - self.raw_size - } -} - - -pub struct BundleDb { - path: PathBuf, - crypto: Arc>, - bundles: HashMap, - bundle_cache: LruCache> -} - - -impl BundleDb { - fn new(path: PathBuf, crypto: Arc>) -> Self { - BundleDb { - path: path, - crypto: crypto, - bundles: HashMap::new(), - bundle_cache: LruCache::new(5, 10) - } - } - - fn bundle_path(&self, bundle: &BundleId) -> (PathBuf, PathBuf) { - let mut folder = self.path.clone(); - let mut file = bundle.to_string().to_owned() + ".bundle"; - let mut count = self.bundles.len(); - while count >= 100 { - if file.len() < 10 { - break - } - folder = folder.join(&file[0..2]); - file = file[2..].to_string(); - count /= 100; - } - (folder, file.into()) - } - - fn load_bundle_list(&mut self) -> Result<(), BundleError> { - self.bundles.clear(); - let mut paths = Vec::new(); - paths.push(self.path.clone()); - while let Some(path) = paths.pop() { - for entry in try!(fs::read_dir(path).map_err(BundleError::List)) { - let entry = try!(entry.map_err(BundleError::List)); - let path = entry.path(); - if path.is_dir() { - paths.push(path); - } else { - let bundle = try!(Bundle::load(path, self.crypto.clone())); - self.bundles.insert(bundle.id(), bundle); - } - } - } - Ok(()) - } - - #[inline] - pub fn open>(path: P, crypto: Arc>) -> Result { - let path = path.as_ref().to_owned(); - let mut self_ = Self::new(path, crypto); - try!(self_.load_bundle_list()); - Ok(self_) - } - - #[inline] - pub fn create>(path: P, crypto: Arc>) -> Result { - let path = path.as_ref().to_owned(); - try!(fs::create_dir_all(&path).context(&path as &Path)); - Ok(Self::new(path, crypto)) - } - - #[inline] - pub fn create_bundle( - &self, - mode: BundleMode, - hash_method: HashMethod, - compression: Option, - encryption: Option - ) -> Result { - BundleWriter::new(mode, hash_method, compression, encryption, self.crypto.clone()) - } - - pub fn get_chunk(&mut self, bundle_id: &BundleId, id: usize) -> Result, BundleError> { - let bundle = try!(self.bundles.get(bundle_id).ok_or(BundleError::NoSuchBundle(bundle_id.clone()))); - let (pos, len) = try!(bundle.get_chunk_position(id)); - let mut chunk = Vec::with_capacity(len); - if let Some(data) = self.bundle_cache.get(bundle_id) { - chunk.extend_from_slice(&data[pos..pos+len]); - return Ok(chunk); - } - let data = try!(bundle.load_contents()); - chunk.extend_from_slice(&data[pos..pos+len]); - self.bundle_cache.put(bundle_id.clone(), data); - Ok(chunk) - } - - #[inline] - pub fn add_bundle(&mut self, bundle: BundleWriter) -> Result<&Bundle, BundleError> { - let bundle = try!(bundle.finish(&self)); - let id = bundle.id(); - self.bundles.insert(id.clone(), bundle); - Ok(self.get_bundle(&id).unwrap()) - } - - #[inline] - pub fn get_bundle(&self, bundle: &BundleId) -> Option<&Bundle> { - self.bundles.get(bundle) - } - - #[inline] - pub fn list_bundles(&self) -> Vec<&Bundle> { - self.bundles.values().collect() - } - - #[inline] - pub fn delete_bundle(&mut self, bundle: &BundleId) -> Result<(), BundleError> { - if let Some(bundle) = self.bundles.remove(bundle) { - fs::remove_file(&bundle.path).map_err(|e| BundleError::Remove(e, bundle.id())) - } else { - Err(BundleError::NoSuchBundle(bundle.clone())) - } - } - - #[inline] - pub fn check(&self, full: bool) -> Result<(), BundleError> { - for bundle in self.bundles.values() { - try!(bundle.check(full)) - } - Ok(()) - } -} diff --git a/src/bundledb/bundle.rs b/src/bundledb/bundle.rs new file mode 100644 index 0000000..95203ef --- /dev/null +++ b/src/bundledb/bundle.rs @@ -0,0 +1,240 @@ +use std::path::{Path, PathBuf}; +use std::fs::{self, File}; +use std::io::{Read, Seek, SeekFrom, BufReader}; +use std::cmp::max; +use std::fmt::{self, Debug}; +use std::sync::{Arc, Mutex}; + +use serde::{self, Serialize, Deserialize}; +use quick_error::ResultExt; + +use util::*; +use super::*; + + +static HEADER_STRING: [u8; 7] = *b"zvault\x01"; +static HEADER_VERSION: u8 = 1; + + + +#[derive(Hash, PartialEq, Eq, Clone, Default)] +pub struct BundleId(pub Hash); + +impl Serialize for BundleId { + fn serialize(&self, ser: S) -> Result { + self.0.serialize(ser) + } +} + +impl Deserialize for BundleId { + fn deserialize(de: D) -> Result { + let hash = try!(Hash::deserialize(de)); + Ok(BundleId(hash)) + } +} + +impl BundleId { + #[inline] + fn to_string(&self) -> String { + self.0.to_string() + } +} + +impl fmt::Display for BundleId { + #[inline] + fn fmt(&self, fmt: &mut fmt::Formatter) -> Result<(), fmt::Error> { + write!(fmt, "{}", self.to_string()) + } +} + +impl fmt::Debug for BundleId { + #[inline] + fn fmt(&self, fmt: &mut fmt::Formatter) -> Result<(), fmt::Error> { + write!(fmt, "{}", self.to_string()) + } +} + + +#[derive(Eq, Debug, PartialEq, Clone, Copy)] +pub enum BundleMode { + Content, Meta +} +serde_impl!(BundleMode(u8) { + Content => 0, + Meta => 1 +}); + + +#[derive(Clone)] +pub struct BundleInfo { + pub id: BundleId, + pub mode: BundleMode, + pub compression: Option, + pub encryption: Option, + pub hash_method: HashMethod, + pub raw_size: usize, + pub encoded_size: usize, + pub chunk_count: usize, + pub chunk_info_size: usize +} +serde_impl!(BundleInfo(u64) { + id: BundleId => 0, + mode: BundleMode => 1, + compression: Option => 2, + encryption: Option => 3, + hash_method: HashMethod => 4, + raw_size: usize => 6, + encoded_size: usize => 7, + chunk_count: usize => 8, + chunk_info_size: usize => 9 +}); + +impl Default for BundleInfo { + fn default() -> Self { + BundleInfo { + id: BundleId(Hash::empty()), + compression: None, + encryption: None, + hash_method: HashMethod::Blake2, + raw_size: 0, + encoded_size: 0, + chunk_count: 0, + mode: BundleMode::Content, + chunk_info_size: 0 + } + } +} + + +pub struct Bundle { + pub info: BundleInfo, + pub chunks: ChunkList, + pub version: u8, + pub path: PathBuf, + crypto: Arc>, + pub content_start: usize, + pub chunk_positions: Vec +} + +impl Bundle { + pub fn new(path: PathBuf, version: u8, content_start: usize, crypto: Arc>, info: BundleInfo, chunks: ChunkList) -> Self { + let mut chunk_positions = Vec::with_capacity(chunks.len()); + let mut pos = 0; + for &(_, len) in (&chunks).iter() { + chunk_positions.push(pos); + pos += len as usize; + } + Bundle { + info: info, + chunks: chunks, + version: version, + path: path, + crypto: crypto, + content_start: content_start, + chunk_positions: chunk_positions + } + } + + #[inline] + pub fn id(&self) -> BundleId { + self.info.id.clone() + } + + pub fn load(path: PathBuf, crypto: Arc>) -> Result { + let mut file = BufReader::new(try!(File::open(&path).context(&path as &Path))); + let mut header = [0u8; 8]; + try!(file.read_exact(&mut header).context(&path as &Path)); + if header[..HEADER_STRING.len()] != HEADER_STRING { + return Err(BundleError::WrongHeader(path.clone())) + } + let version = header[HEADER_STRING.len()]; + if version != HEADER_VERSION { + return Err(BundleError::WrongVersion(path.clone(), version)) + } + let header: BundleInfo = try!(msgpack::decode_from_stream(&mut file).context(&path as &Path)); + debug!("Load bundle {}", header.id); + let mut chunk_data = Vec::with_capacity(header.chunk_info_size); + chunk_data.resize(header.chunk_info_size, 0); + try!(file.read_exact(&mut chunk_data).context(&path as &Path)); + if let Some(ref encryption) = header.encryption { + chunk_data = try!(crypto.lock().unwrap().decrypt(&encryption, &chunk_data).context(&path as &Path)); + } + let chunks = ChunkList::read_from(&chunk_data); + let content_start = file.seek(SeekFrom::Current(0)).unwrap() as usize; + Ok(Bundle::new(path, version, content_start, crypto, header, chunks)) + } + + #[inline] + fn load_encoded_contents(&self) -> Result, BundleError> { + debug!("Load bundle data {} ({:?})", self.info.id, self.info.mode); + let mut file = BufReader::new(try!(File::open(&self.path).context(&self.path as &Path))); + try!(file.seek(SeekFrom::Start(self.content_start as u64)).context(&self.path as &Path)); + let mut data = Vec::with_capacity(max(self.info.encoded_size, self.info.raw_size)+1024); + try!(file.read_to_end(&mut data).context(&self.path as &Path)); + Ok(data) + } + + #[inline] + fn decode_contents(&self, mut data: Vec) -> Result, BundleError> { + if let Some(ref encryption) = self.info.encryption { + data = try!(self.crypto.lock().unwrap().decrypt(&encryption, &data).context(&self.path as &Path)); + } + if let Some(ref compression) = self.info.compression { + data = try!(compression.decompress(&data).context(&self.path as &Path)); + } + Ok(data) + } + + #[inline] + pub fn load_contents(&self) -> Result, BundleError> { + self.load_encoded_contents().and_then(|data| self.decode_contents(data)) + } + + #[inline] + pub fn get_chunk_position(&self, id: usize) -> Result<(usize, usize), BundleError> { + if id >= self.info.chunk_count { + return Err(BundleError::NoSuchChunk(self.id(), id)) + } + Ok((self.chunk_positions[id], self.chunks[id].1 as usize)) + } + + pub fn check(&self, full: bool) -> Result<(), BundleError> { + //FIXME: adapt to new format + if self.info.chunk_count != self.chunks.len() { + return Err(BundleError::Integrity(self.id(), + "Chunk list size does not match chunk count")) + } + if self.chunks.iter().map(|c| c.1 as usize).sum::() != self.info.raw_size { + return Err(BundleError::Integrity(self.id(), + "Individual chunk sizes do not add up to total size")) + } + if !full { + let size = try!(fs::metadata(&self.path).context(&self.path as &Path)).len(); + if size as usize != self.info.encoded_size + self.content_start { + return Err(BundleError::Integrity(self.id(), + "File size does not match size in header, truncated file")) + } + return Ok(()) + } + let encoded_contents = try!(self.load_encoded_contents()); + if self.info.encoded_size != encoded_contents.len() { + return Err(BundleError::Integrity(self.id(), + "Encoded data size does not match size in header, truncated bundle")) + } + let contents = try!(self.decode_contents(encoded_contents)); + if self.info.raw_size != contents.len() { + return Err(BundleError::Integrity(self.id(), + "Raw data size does not match size in header, truncated bundle")) + } + //TODO: verify checksum + Ok(()) + } +} + +impl Debug for Bundle { + fn fmt(&self, fmt: &mut fmt::Formatter) -> Result<(), fmt::Error> { + write!(fmt, "Bundle(\n\tid: {}\n\tpath: {:?}\n\tchunks: {}\n\tsize: {}, encoded: {}\n\tcompression: {:?}\n)", + self.info.id.to_string(), self.path, self.info.chunk_count, self.info.raw_size, + self.info.encoded_size, self.info.compression) + } +} diff --git a/src/bundledb/db.rs b/src/bundledb/db.rs new file mode 100644 index 0000000..a04a464 --- /dev/null +++ b/src/bundledb/db.rs @@ -0,0 +1,138 @@ +use std::path::{Path, PathBuf}; +use std::collections::HashMap; +use std::fs; +use std::sync::{Arc, Mutex}; + +use quick_error::ResultExt; + +use util::*; +use super::*; + + +pub struct BundleDb { + path: PathBuf, + crypto: Arc>, + bundles: HashMap, + bundle_cache: LruCache> +} + + +impl BundleDb { + fn new(path: PathBuf, crypto: Arc>) -> Self { + BundleDb { + path: path, + crypto: crypto, + bundles: HashMap::new(), + bundle_cache: LruCache::new(5, 10) + } + } + + pub fn bundle_path(&self, bundle: &BundleId) -> (PathBuf, PathBuf) { + let mut folder = self.path.clone(); + let mut file = bundle.to_string().to_owned() + ".bundle"; + let mut count = self.bundles.len(); + while count >= 100 { + if file.len() < 10 { + break + } + folder = folder.join(&file[0..2]); + file = file[2..].to_string(); + count /= 100; + } + (folder, file.into()) + } + + fn load_bundle_list(&mut self) -> Result<(), BundleError> { + self.bundles.clear(); + let mut paths = Vec::new(); + paths.push(self.path.clone()); + while let Some(path) = paths.pop() { + for entry in try!(fs::read_dir(path).map_err(BundleError::List)) { + let entry = try!(entry.map_err(BundleError::List)); + let path = entry.path(); + if path.is_dir() { + paths.push(path); + } else { + let bundle = try!(Bundle::load(path, self.crypto.clone())); + self.bundles.insert(bundle.id(), bundle); + } + } + } + Ok(()) + } + + #[inline] + pub fn open>(path: P, crypto: Arc>) -> Result { + let path = path.as_ref().to_owned(); + let mut self_ = Self::new(path, crypto); + try!(self_.load_bundle_list()); + Ok(self_) + } + + #[inline] + pub fn create>(path: P, crypto: Arc>) -> Result { + let path = path.as_ref().to_owned(); + try!(fs::create_dir_all(&path).context(&path as &Path)); + Ok(Self::new(path, crypto)) + } + + #[inline] + pub fn create_bundle( + &self, + mode: BundleMode, + hash_method: HashMethod, + compression: Option, + encryption: Option + ) -> Result { + BundleWriter::new(mode, hash_method, compression, encryption, self.crypto.clone()) + } + + pub fn get_chunk(&mut self, bundle_id: &BundleId, id: usize) -> Result, BundleError> { + let bundle = try!(self.bundles.get(bundle_id).ok_or(BundleError::NoSuchBundle(bundle_id.clone()))); + let (pos, len) = try!(bundle.get_chunk_position(id)); + let mut chunk = Vec::with_capacity(len); + if let Some(data) = self.bundle_cache.get(bundle_id) { + chunk.extend_from_slice(&data[pos..pos+len]); + return Ok(chunk); + } + let data = try!(bundle.load_contents()); + chunk.extend_from_slice(&data[pos..pos+len]); + self.bundle_cache.put(bundle_id.clone(), data); + Ok(chunk) + } + + #[inline] + pub fn add_bundle(&mut self, bundle: BundleWriter) -> Result<&Bundle, BundleError> { + let bundle = try!(bundle.finish(&self)); + let id = bundle.id(); + self.bundles.insert(id.clone(), bundle); + Ok(self.get_bundle(&id).unwrap()) + } + + #[inline] + pub fn get_bundle(&self, bundle: &BundleId) -> Option<&Bundle> { + self.bundles.get(bundle) + } + + #[inline] + pub fn list_bundles(&self) -> Vec<&Bundle> { + self.bundles.values().collect() + } + + #[inline] + pub fn delete_bundle(&mut self, bundle: &BundleId) -> Result<(), BundleError> { + if let Some(bundle) = self.bundles.remove(bundle) { + fs::remove_file(&bundle.path).map_err(|e| BundleError::Remove(e, bundle.id())) + } else { + Err(BundleError::NoSuchBundle(bundle.clone())) + } + } + + #[inline] + pub fn check(&self, full: bool) -> Result<(), BundleError> { + for bundle in self.bundles.values() { + try!(bundle.check(full)) + } + Ok(()) + } +} diff --git a/src/bundledb/error.rs b/src/bundledb/error.rs new file mode 100644 index 0000000..8b0ab56 --- /dev/null +++ b/src/bundledb/error.rs @@ -0,0 +1,83 @@ +use std::path::{Path, PathBuf}; +use std::io; + +use util::*; +use super::*; + +quick_error!{ + #[derive(Debug)] + pub enum BundleError { + List(err: io::Error) { + cause(err) + description("Failed to list bundles") + display("Failed to list bundles: {}", err) + } + Io(err: io::Error, path: PathBuf) { + cause(err) + context(path: &'a Path, err: io::Error) -> (err, path.to_path_buf()) + description("Failed to read/write bundle") + display("Failed to read/write bundle {:?}: {}", path, err) + } + Decode(err: msgpack::DecodeError, path: PathBuf) { + cause(err) + context(path: &'a Path, err: msgpack::DecodeError) -> (err, path.to_path_buf()) + description("Failed to decode bundle header") + display("Failed to decode bundle header of {:?}: {}", path, err) + } + Encode(err: msgpack::EncodeError, path: PathBuf) { + cause(err) + context(path: &'a Path, err: msgpack::EncodeError) -> (err, path.to_path_buf()) + description("Failed to encode bundle header") + display("Failed to encode bundle header of {:?}: {}", path, err) + } + WrongHeader(path: PathBuf) { + description("Wrong header") + display("Wrong header on bundle {:?}", path) + } + WrongVersion(path: PathBuf, version: u8) { + description("Wrong version") + display("Wrong version on bundle {:?}: {}", path, version) + } + Integrity(bundle: BundleId, reason: &'static str) { + description("Bundle has an integrity error") + display("Bundle {:?} has an integrity error: {}", bundle, reason) + } + NoSuchBundle(bundle: BundleId) { + description("No such bundle") + display("No such bundle: {:?}", bundle) + } + NoSuchChunk(bundle: BundleId, id: usize) { + description("Bundle has no such chunk") + display("Bundle {:?} has no chunk with that id: {}", bundle, id) + } + Decompression(err: CompressionError, path: PathBuf) { + cause(err) + context(path: &'a Path, err: CompressionError) -> (err, path.to_path_buf()) + description("Decompression failed") + display("Decompression failed on bundle {:?}: {}", path, err) + } + Compression(err: CompressionError) { + from() + cause(err) + description("Compression failed") + display("Compression failed: {}", err) + } + Decryption(err: EncryptionError, path: PathBuf) { + cause(err) + context(path: &'a Path, err: EncryptionError) -> (err, path.to_path_buf()) + description("Decryption failed") + display("Decryption failed on bundle {:?}: {}", path, err) + } + Encryption(err: EncryptionError) { + from() + cause(err) + description("Encryption failed") + display("Encryption failed: {}", err) + } + Remove(err: io::Error, bundle: BundleId) { + cause(err) + description("Failed to remove bundle") + display("Failed to remove bundle {}", bundle) + } + } +} diff --git a/src/bundledb/mod.rs b/src/bundledb/mod.rs new file mode 100644 index 0000000..9fa4c8e --- /dev/null +++ b/src/bundledb/mod.rs @@ -0,0 +1,19 @@ +mod error; +mod writer; +mod bundle; +mod db; + +pub use self::error::BundleError; +pub use self::writer::BundleWriter; +pub use self::bundle::*; +pub use self::db::*; + +/* + +Bundle format +- Magic header + version +- Encoded header structure (contains size of next structure) +- Encoded chunk list (with chunk hashes and sizes) +- Chunk data + +*/ diff --git a/src/bundledb/writer.rs b/src/bundledb/writer.rs new file mode 100644 index 0000000..139ff16 --- /dev/null +++ b/src/bundledb/writer.rs @@ -0,0 +1,114 @@ +use std::path::Path; +use std::fs::{self, File}; +use std::io::{Write, Seek, SeekFrom, BufWriter}; +use std::sync::{Arc, Mutex}; + +use quick_error::ResultExt; + +use util::*; +use super::*; + + +static HEADER_STRING: [u8; 7] = *b"zvault\x01"; +static HEADER_VERSION: u8 = 1; + + +pub struct BundleWriter { + mode: BundleMode, + hash_method: HashMethod, + data: Vec, + compression: Option, + compression_stream: Option, + encryption: Option, + crypto: Arc>, + raw_size: usize, + chunk_count: usize, + chunks: ChunkList, +} + +impl BundleWriter { + pub fn new( + mode: BundleMode, + hash_method: HashMethod, + compression: Option, + encryption: Option, + crypto: Arc> + ) -> Result { + let compression_stream = match compression { + Some(ref compression) => Some(try!(compression.compress_stream())), + None => None + }; + Ok(BundleWriter { + mode: mode, + hash_method: hash_method, + data: vec![], + compression: compression, + compression_stream: compression_stream, + encryption: encryption, + crypto: crypto, + raw_size: 0, + chunk_count: 0, + chunks: ChunkList::new() + }) + } + + pub fn add(&mut self, chunk: &[u8], hash: Hash) -> Result { + if let Some(ref mut stream) = self.compression_stream { + try!(stream.process(chunk, &mut self.data)) + } else { + self.data.extend_from_slice(chunk) + } + self.raw_size += chunk.len(); + self.chunk_count += 1; + self.chunks.push((hash, chunk.len() as u32)); + Ok(self.chunk_count-1) + } + + pub fn finish(mut self, db: &BundleDb) -> Result { + if let Some(stream) = self.compression_stream { + try!(stream.finish(&mut self.data)) + } + if let Some(ref encryption) = self.encryption { + self.data = try!(self.crypto.lock().unwrap().encrypt(&encryption, &self.data)); + } + let encoded_size = self.data.len(); + let mut chunk_data = Vec::with_capacity(self.chunks.encoded_size()); + self.chunks.write_to(&mut chunk_data).unwrap(); + let id = BundleId(self.hash_method.hash(&chunk_data)); + if let Some(ref encryption) = self.encryption { + chunk_data = try!(self.crypto.lock().unwrap().encrypt(&encryption, &chunk_data)); + } + let (folder, file) = db.bundle_path(&id); + let path = folder.join(file); + try!(fs::create_dir_all(&folder).context(&path as &Path)); + let mut file = BufWriter::new(try!(File::create(&path).context(&path as &Path))); + try!(file.write_all(&HEADER_STRING).context(&path as &Path)); + try!(file.write_all(&[HEADER_VERSION]).context(&path as &Path)); + let header = BundleInfo { + mode: self.mode, + hash_method: self.hash_method, + compression: self.compression, + encryption: self.encryption, + chunk_count: self.chunk_count, + id: id.clone(), + raw_size: self.raw_size, + encoded_size: encoded_size, + chunk_info_size: chunk_data.len() + }; + try!(msgpack::encode_to_stream(&header, &mut file).context(&path as &Path)); + try!(file.write_all(&chunk_data).context(&path as &Path)); + let content_start = file.seek(SeekFrom::Current(0)).unwrap() as usize; + try!(file.write_all(&self.data).context(&path as &Path)); + Ok(Bundle::new(path, HEADER_VERSION, content_start, self.crypto, header, self.chunks)) + } + + #[inline] + pub fn size(&self) -> usize { + self.data.len() + } + + #[inline] + pub fn raw_size(&self) -> usize { + self.raw_size + } +} diff --git a/src/main.rs b/src/main.rs index f8463ea..5e24c09 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,7 +18,7 @@ extern crate libc; pub mod util; -pub mod bundle; +pub mod bundledb; pub mod index; mod chunker; mod repository; diff --git a/src/repository/basic_io.rs b/src/repository/basic_io.rs index 5b73d77..cf4a34c 100644 --- a/src/repository/basic_io.rs +++ b/src/repository/basic_io.rs @@ -3,7 +3,7 @@ use std::io::{Read, Write, Cursor}; use super::{Repository, RepositoryError}; use ::index::Location; -use ::bundle::{BundleId, BundleMode}; +use ::bundledb::{BundleId, BundleMode}; use super::integrity::RepositoryIntegrityError; use ::util::*; diff --git a/src/repository/bundle_map.rs b/src/repository/bundle_map.rs index 91964a9..770207a 100644 --- a/src/repository/bundle_map.rs +++ b/src/repository/bundle_map.rs @@ -3,7 +3,7 @@ use std::path::Path; use std::io::{self, BufReader, Read, Write, BufWriter}; use std::fs::File; -use ::bundle::{Bundle, BundleId, BundleInfo}; +use ::bundledb::{Bundle, BundleId, BundleInfo}; use ::util::*; diff --git a/src/repository/error.rs b/src/repository/error.rs index 8daa8cc..6c153ea 100644 --- a/src/repository/error.rs +++ b/src/repository/error.rs @@ -6,7 +6,7 @@ use super::bundle_map::BundleMapError; use super::config::ConfigError; use super::integrity::RepositoryIntegrityError; use ::index::IndexError; -use ::bundle::BundleError; +use ::bundledb::BundleError; use ::chunker::ChunkerError; use ::util::*; diff --git a/src/repository/info.rs b/src/repository/info.rs index 50582ea..c739cc7 100644 --- a/src/repository/info.rs +++ b/src/repository/info.rs @@ -1,5 +1,5 @@ use super::Repository; -use ::bundle::BundleInfo; +use ::bundledb::BundleInfo; pub struct RepositoryInfo { pub bundle_count: usize, diff --git a/src/repository/integrity.rs b/src/repository/integrity.rs index 71a35fb..a85811f 100644 --- a/src/repository/integrity.rs +++ b/src/repository/integrity.rs @@ -1,7 +1,7 @@ use super::{Repository, RepositoryError}; use super::metadata::FileContents; -use ::bundle::BundleId; +use ::bundledb::BundleId; use ::util::*; use std::collections::VecDeque; diff --git a/src/repository/metadata.rs b/src/repository/metadata.rs index d93eff4..92342ff 100644 --- a/src/repository/metadata.rs +++ b/src/repository/metadata.rs @@ -8,7 +8,7 @@ use std::io::{Read, Write}; use ::util::*; use super::{Repository, RepositoryError}; use super::integrity::RepositoryIntegrityError; -use ::bundle::BundleMode; +use ::bundledb::BundleMode; #[derive(Debug, Eq, PartialEq)] diff --git a/src/repository/mod.rs b/src/repository/mod.rs index ecd5d1b..2b46b33 100644 --- a/src/repository/mod.rs +++ b/src/repository/mod.rs @@ -15,7 +15,7 @@ use std::fs; use std::sync::{Arc, Mutex}; use super::index::Index; -use super::bundle::{BundleDb, BundleWriter}; +use super::bundledb::{BundleDb, BundleWriter}; use super::chunker::Chunker; use ::util::*; diff --git a/src/repository/vacuum.rs b/src/repository/vacuum.rs index ca02415..818486a 100644 --- a/src/repository/vacuum.rs +++ b/src/repository/vacuum.rs @@ -3,7 +3,7 @@ use super::metadata::FileContents; use std::collections::{HashMap, HashSet, VecDeque}; -use ::bundle::BundleMode; +use ::bundledb::BundleMode; use ::util::*;