This commit is contained in:
Dennis Schwerdel 2017-03-16 09:42:30 +01:00
parent 717fc7472d
commit 0b673d145f
17 changed files with 558 additions and 351 deletions

View File

@ -1,24 +1,77 @@
use std::path::{Path, PathBuf};
use std::collections::HashMap;
use std::fs::{self, File};
use std::io::{Read, Write, Seek, SeekFrom, BufWriter, BufReader};
use std::io::{self, Read, Write, Seek, SeekFrom, BufWriter, BufReader};
use std::cmp::max;
use std::fmt::{self, Debug, Write as FmtWrite};
use std::sync::{Arc, Mutex};
use serde::{self, Serialize, Deserialize};
use serde::bytes::ByteBuf;
use rmp_serde;
use errors::BundleError;
use util::*;
static HEADER_STRING: [u8; 7] = *b"zbundle";
static HEADER_VERSION: u8 = 1;
// TODO: Test cases
// TODO: Benchmarks
quick_error!{
#[derive(Debug)]
pub enum BundleError {
List(err: io::Error) {
cause(err)
description("Failed to list bundles")
}
Read(err: io::Error, path: PathBuf) {
cause(err)
description("Failed to read bundle")
}
Decode(err: msgpack::DecodeError, path: PathBuf) {
cause(err)
description("Failed to decode bundle header")
}
Write(err: io::Error, path: PathBuf) {
cause(err)
description("Failed to write bundle")
}
Encode(err: msgpack::EncodeError, path: PathBuf) {
cause(err)
description("Failed to encode bundle header")
}
WrongHeader(path: PathBuf) {
description("Wrong header")
display("Wrong header on bundle {:?}", path)
}
WrongVersion(path: PathBuf, version: u8) {
description("Wrong version")
display("Wrong version on bundle {:?}: {}", path, version)
}
Integrity(bundle: BundleId, reason: &'static str) {
description("Bundle has an integrity error")
display("Bundle {:?} has an integrity error: {}", bundle, reason)
}
NoSuchBundle(bundle: BundleId) {
description("No such bundle")
display("No such bundle: {:?}", bundle)
}
NoSuchChunk(bundle: BundleId, id: usize) {
description("Bundle has no such chunk")
display("Bundle {:?} has no chunk with that id: {}", bundle, id)
}
Compression(err: CompressionError) {
from()
cause(err)
}
Encryption(err: EncryptionError) {
from()
cause(err)
}
Remove(err: io::Error, bundle: BundleId) {
cause(err)
description("Failed to remove bundle")
display("Failed to remove bundle {}", bundle)
}
}
}
#[derive(Hash, PartialEq, Eq, Clone, Default)]
@ -32,7 +85,7 @@ impl Serialize for BundleId {
impl Deserialize for BundleId {
fn deserialize<D: serde::Deserializer>(de: D) -> Result<Self, D::Error> {
let bytes = try!(ByteBuf::deserialize(de));
let bytes = try!(msgpack::Bytes::deserialize(de));
Ok(BundleId(bytes.into()))
}
}
@ -92,7 +145,7 @@ impl Default for BundleInfo {
id: BundleId(vec![]),
compression: None,
encryption: None,
checksum: (ChecksumType::Blake2_256, ByteBuf::new()),
checksum: (ChecksumType::Blake2_256, msgpack::Bytes::new()),
raw_size: 0,
encoded_size: 0,
chunk_count: 0,
@ -135,34 +188,28 @@ impl Bundle {
}
pub fn load(path: PathBuf, crypto: Arc<Mutex<Crypto>>) -> Result<Self, BundleError> {
let mut file = BufReader::new(try!(File::open(&path)
.map_err(|e| BundleError::Read(e, path.clone(), "Failed to open bundle file"))));
let mut file = BufReader::new(try!(File::open(&path).map_err(|e| BundleError::Read(e, path.clone()))));
let mut header = [0u8; 8];
try!(file.read_exact(&mut header)
.map_err(|e| BundleError::Read(e, path.clone(), "Failed to read bundle header")));
try!(file.read_exact(&mut header).map_err(|e| BundleError::Read(e, path.clone())));
if header[..HEADER_STRING.len()] != HEADER_STRING {
return Err(BundleError::Format(path.clone(), "Wrong header string"))
return Err(BundleError::WrongHeader(path.clone()))
}
let version = header[HEADER_STRING.len()];
if version != HEADER_VERSION {
return Err(BundleError::Format(path.clone(), "Unsupported bundle file version"))
return Err(BundleError::WrongVersion(path.clone(), version))
}
let mut reader = rmp_serde::Deserializer::new(file);
let header = try!(BundleInfo::deserialize(&mut reader)
let header = try!(msgpack::decode_from_stream(&mut file)
.map_err(|e| BundleError::Decode(e, path.clone())));
file = reader.into_inner();
let content_start = file.seek(SeekFrom::Current(0)).unwrap() as usize;
Ok(Bundle::new(path, version, content_start, crypto, header))
}
#[inline]
fn load_encoded_contents(&self) -> Result<Vec<u8>, BundleError> {
let mut file = BufReader::new(try!(File::open(&self.path)
.map_err(|e| BundleError::Read(e, self.path.clone(), "Failed to open bundle file"))));
try!(file.seek(SeekFrom::Start(self.content_start as u64))
.map_err(|e| BundleError::Read(e, self.path.clone(), "Failed to seek to data")));
let mut file = BufReader::new(try!(File::open(&self.path).map_err(|e| BundleError::Read(e, self.path.clone()))));
try!(file.seek(SeekFrom::Start(self.content_start as u64)).map_err(|e| BundleError::Read(e, self.path.clone())));
let mut data = Vec::with_capacity(max(self.info.encoded_size, self.info.raw_size)+1024);
try!(file.read_to_end(&mut data).map_err(|_| "Failed to read data"));
try!(file.read_to_end(&mut data).map_err(|e| BundleError::Read(e, self.path.clone())));
Ok(data)
}
@ -185,7 +232,7 @@ impl Bundle {
#[inline]
pub fn get_chunk_position(&self, id: usize) -> Result<(usize, usize), BundleError> {
if id >= self.info.chunk_count {
return Err("Invalid chunk id".into())
return Err(BundleError::NoSuchChunk(self.id(), id))
}
Ok((self.chunk_positions[id], self.info.chunk_sizes[id]))
}
@ -200,8 +247,7 @@ impl Bundle {
"Individual chunk sizes do not add up to total size"))
}
if !full {
let size = try!(fs::metadata(&self.path)
.map_err(|e| BundleError::Read(e, self.path.clone(), "Failed to get size of file"))
let size = try!(fs::metadata(&self.path).map_err(|e| BundleError::Read(e, self.path.clone()))
).len();
if size as usize != self.info.encoded_size + self.content_start {
return Err(BundleError::Integrity(self.id(),
@ -290,14 +336,10 @@ impl BundleWriter {
let id = BundleId(checksum.1.to_vec());
let (folder, file) = db.bundle_path(&id);
let path = folder.join(file);
try!(fs::create_dir_all(&folder)
.map_err(|e| BundleError::Write(e, path.clone(), "Failed to create folder")));
let mut file = BufWriter::new(try!(File::create(&path)
.map_err(|e| BundleError::Write(e, path.clone(), "Failed to create bundle file"))));
try!(file.write_all(&HEADER_STRING)
.map_err(|e| BundleError::Write(e, path.clone(), "Failed to write bundle header")));
try!(file.write_all(&[HEADER_VERSION])
.map_err(|e| BundleError::Write(e, path.clone(), "Failed to write bundle header")));
try!(fs::create_dir_all(&folder).map_err(|e| BundleError::Write(e, path.clone())));
let mut file = BufWriter::new(try!(File::create(&path).map_err(|e| BundleError::Write(e, path.clone()))));
try!(file.write_all(&HEADER_STRING).map_err(|e| BundleError::Write(e, path.clone())));
try!(file.write_all(&[HEADER_VERSION]).map_err(|e| BundleError::Write(e, path.clone())));
let header = BundleInfo {
checksum: checksum,
compression: self.compression,
@ -308,14 +350,10 @@ impl BundleWriter {
encoded_size: encoded_size,
chunk_sizes: self.chunk_sizes
};
{
let mut writer = rmp_serde::Serializer::new(&mut file);
try!(header.serialize(&mut writer)
.map_err(|e| BundleError::Encode(e, path.clone())));
}
try!(msgpack::encode_to_stream(&header, &mut file)
.map_err(|e| BundleError::Encode(e, path.clone())));
let content_start = file.seek(SeekFrom::Current(0)).unwrap() as usize;
try!(file.write_all(&self.data)
.map_err(|e| BundleError::Write(e, path.clone(), "Failed to write bundle data")));
try!(file.write_all(&self.data).map_err(|e| BundleError::Write(e, path.clone())));
Ok(Bundle::new(path, HEADER_VERSION, content_start, self.crypto, header))
}
@ -402,7 +440,7 @@ impl BundleDb {
pub fn create<P: AsRef<Path>>(path: P, compression: Option<Compression>, encryption: Option<Encryption>, checksum: ChecksumType) -> Result<Self, BundleError> {
let path = path.as_ref().to_owned();
try!(fs::create_dir_all(&path)
.map_err(|e| BundleError::Write(e, path.clone(), "Failed to create folder")));
.map_err(|e| BundleError::Write(e, path.clone())));
Ok(Self::new(path, compression, encryption, checksum))
}
@ -421,7 +459,7 @@ impl BundleDb {
}
pub fn get_chunk(&mut self, bundle_id: &BundleId, id: usize) -> Result<Vec<u8>, BundleError> {
let bundle = try!(self.bundles.get(bundle_id).ok_or("Bundle not found"));
let bundle = try!(self.bundles.get(bundle_id).ok_or(BundleError::NoSuchBundle(bundle_id.clone())));
let (pos, len) = try!(bundle.get_chunk_position(id));
let mut chunk = Vec::with_capacity(len);
if let Some(data) = self.bundle_cache.get(bundle_id) {
@ -457,7 +495,7 @@ impl BundleDb {
if let Some(bundle) = self.bundles.remove(bundle) {
fs::remove_file(&bundle.path).map_err(|e| BundleError::Remove(e, bundle.id()))
} else {
Err("No such bundle".into())
Err(BundleError::NoSuchBundle(bundle.clone()))
}
}

View File

@ -1,6 +1,4 @@
use std::io::{Write, Read};
use super::errors::ChunkerError;
use std::io::{self, Write, Read};
mod ae;
mod rabin;
@ -18,6 +16,27 @@ pub use self::fastcdc::FastCdcChunker;
// https://borgbackup.readthedocs.io/en/stable/internals.html#chunks
// https://github.com/bup/bup/blob/master/lib/bup/bupsplit.c
quick_error!{
#[derive(Debug)]
pub enum ChunkerError {
Read(err: io::Error) {
from(err)
cause(err)
description("Failed to read")
}
Write(err: io::Error) {
from(err)
cause(err)
description("Failed to write")
}
Custom {
from(&'static str)
description("Custom error")
}
}
}
#[derive(Debug, Eq, PartialEq)]
pub enum ChunkerStatus {
Continue,
@ -35,6 +54,7 @@ pub enum Chunker {
FastCdc(Box<FastCdcChunker>)
}
impl IChunker for Chunker {
#[inline]
fn get_type(&self) -> ChunkerType {

View File

@ -1,72 +0,0 @@
use std::io;
use std::path::PathBuf;
use rmp_serde::decode::Error as MsgpackDecode;
use rmp_serde::encode::Error as MsgpackEncode;
use super::bundle::BundleId;
quick_error!{
#[derive(Debug)]
pub enum BundleError {
List(err: io::Error) {
cause(err)
description("Failed to list bundles")
}
Read(err: io::Error, path: PathBuf, reason: &'static str) {
cause(err)
description("Failed to read bundle")
display("Failed to read bundle {:?}: {}", path, reason)
}
Decode(err: MsgpackDecode, path: PathBuf) {
cause(err)
description("Failed to decode bundle header")
}
Write(err: io::Error, path: PathBuf, reason: &'static str) {
cause(err)
description("Failed to write bundle")
display("Failed to write bundle {:?}: {}", path, reason)
}
Encode(err: MsgpackEncode, path: PathBuf) {
cause(err)
description("Failed to encode bundle header")
}
Format(path: PathBuf, reason: &'static str) {
description("Failed to decode bundle")
display("Failed to decode bundle {:?}: {}", path, reason)
}
Integrity(bundle: BundleId, reason: &'static str) {
description("Bundle has an integrity error")
display("Bundle {:?} has an integrity error: {}", bundle, reason)
}
Remove(err: io::Error, bundle: BundleId) {
cause(err)
description("Failed to remove bundle")
display("Failed to remove bundle {}", bundle)
}
Custom {
from(&'static str)
description("Custom error")
}
}
}
quick_error!{
#[derive(Debug)]
pub enum ChunkerError {
Read(err: io::Error) {
from(err)
cause(err)
description("Failed to read")
}
Write(err: io::Error) {
from(err)
cause(err)
description("Failed to write")
}
Custom {
from(&'static str)
description("Custom error")
}
}
}

View File

@ -65,31 +65,56 @@ pub struct Index {
data: &'static mut [Entry]
}
#[derive(Debug)]
pub enum Error {
IOError(io::Error),
MapError(MapError),
NoHeader,
MagicError,
VersionError,
quick_error!{
#[derive(Debug)]
pub enum IndexError {
Io(err: io::Error) {
from()
cause(err)
description("Failed to open index file")
}
Mmap(err: MapError) {
from()
cause(err)
description("Failed to write bundle map")
}
NoHeader {
description("Index file does not contain a header")
}
WrongHeader {
description("Wrong header")
}
WrongVersion(version: u8) {
description("Wrong version")
display("Wrong version: {}", version)
}
WrongPosition(key: Hash, should: usize, is: LocateResult) {
description("Key at wrong position")
display("Key {} has wrong position, expected at: {}, but is at: {:?}", key, should, is)
}
WrongEntryCount(header: usize, actual: usize) {
description("Wrong entry count")
display("Wrong entry count, expected {}, but is {}", header, actual)
}
}
}
#[derive(Debug)]
enum LocateResult {
pub enum LocateResult {
Found(usize), // Found the key at this position
Hole(usize), // Found a hole at this position while searching for a key
Steal(usize) // Found a spot to steal at this position while searching for a key
}
impl Index {
pub fn new(path: &Path, create: bool) -> Result<Index, Error> {
let fd = try!(OpenOptions::new().read(true).write(true).create(create).open(path).map_err(|e| { Error::IOError(e) }));
pub fn new(path: &Path, create: bool) -> Result<Index, IndexError> {
let fd = try!(OpenOptions::new().read(true).write(true).create(create).open(path));
if create {
try!(Index::resize_fd(&fd, INITIAL_SIZE));
}
let mmap = try!(Index::map_fd(&fd));
if mmap.len() < mem::size_of::<Header>() {
return Err(Error::NoHeader);
return Err(IndexError::NoHeader);
}
let data = Index::mmap_as_slice(&mmap, INITIAL_SIZE as usize);
let mut index = Index{capacity: 0, max_entries: 0, min_entries: 0, entries: 0, fd: fd, mmap: mmap, data: data};
@ -105,10 +130,10 @@ impl Index {
header.capacity = INITIAL_SIZE as u64;
} else {
if header.magic != MAGIC {
return Err(Error::MagicError);
return Err(IndexError::WrongHeader);
}
if header.version != VERSION {
return Err(Error::VersionError);
return Err(IndexError::WrongVersion(header.version));
}
}
capacity = header.capacity;
@ -118,34 +143,34 @@ impl Index {
index.set_capacity(capacity as usize);
index.entries = entries as usize;
}
debug_assert!(index.is_consistent(), "Inconsistent after creation");
debug_assert!(index.check().is_ok(), "Inconsistent after creation");
Ok(index)
}
#[inline]
pub fn open(path: &Path) -> Result<Index, Error> {
pub fn open(path: &Path) -> Result<Index, IndexError> {
Index::new(path, false)
}
#[inline]
pub fn create(path: &Path) -> Result<Index, Error> {
pub fn create(path: &Path) -> Result<Index, IndexError> {
Index::new(path, true)
}
#[inline]
fn map_fd(fd: &File) -> Result<MemoryMap, Error> {
fn map_fd(fd: &File) -> Result<MemoryMap, IndexError> {
MemoryMap::new(
try!(fd.metadata().map_err(Error::IOError)).len() as usize,
try!(fd.metadata().map_err(IndexError::Io)).len() as usize,
&[MapOption::MapReadable,
MapOption::MapWritable,
MapOption::MapFd(fd.as_raw_fd()),
MapOption::MapNonStandardFlags(0x0001) //libc::consts::os::posix88::MAP_SHARED
]).map_err(|e| { Error::MapError(e) })
]).map_err(IndexError::Mmap)
}
#[inline]
fn resize_fd(fd: &File, capacity: usize) -> Result<(), Error> {
fd.set_len((mem::size_of::<Header>() + capacity * mem::size_of::<Entry>()) as u64).map_err( Error::IOError)
fn resize_fd(fd: &File, capacity: usize) -> Result<(), IndexError> {
fd.set_len((mem::size_of::<Header>() + capacity * mem::size_of::<Entry>()) as u64).map_err(IndexError::Io)
}
#[inline]
@ -172,7 +197,7 @@ impl Index {
self.max_entries = (capacity as f64 * MAX_USAGE) as usize;
}
fn reinsert(&mut self, start: usize, end: usize) -> Result<(), Error> {
fn reinsert(&mut self, start: usize, end: usize) -> Result<(), IndexError> {
for pos in start..end {
let key;
let data;
@ -191,7 +216,7 @@ impl Index {
Ok(())
}
fn shrink(&mut self) -> Result<bool, Error> {
fn shrink(&mut self) -> Result<bool, IndexError> {
if self.entries >= self.min_entries || self.capacity <= INITIAL_SIZE {
return Ok(false)
}
@ -206,7 +231,7 @@ impl Index {
Ok(true)
}
fn extend(&mut self) -> Result<bool, Error> {
fn extend(&mut self) -> Result<bool, IndexError> {
if self.entries <= self.max_entries {
return Ok(false)
}
@ -220,8 +245,7 @@ impl Index {
Ok(true)
}
#[allow(dead_code)]
pub fn is_consistent(&self) -> bool {
pub fn check(&self) -> Result<(), IndexError> {
let mut entries = 0;
for pos in 0..self.capacity {
let entry = &self.data[pos];
@ -231,30 +255,17 @@ impl Index {
entries += 1;
match self.locate(&entry.key) {
LocateResult::Found(p) if p == pos => true,
found => {
println!("Inconsistency found: Key {:?} should be at {} but is at {:?}", entry.key, pos, found);
return false
}
found => return Err(IndexError::WrongPosition(entry.key, pos, found))
};
}
if entries != self.entries {
println!("Inconsistency found: Index contains {} entries, should contain {}", entries, self.entries);
return false
}
true
}
pub fn check(&self) -> Result<(), &'static str> {
//TODO: proper errors instead of string
if self.is_consistent() {
Ok(())
} else {
Err("Inconsistent")
return Err(IndexError::WrongEntryCount(self.entries, entries));
}
Ok(())
}
#[inline]
fn increase_count(&mut self) -> Result<(), Error> {
fn increase_count(&mut self) -> Result<(), IndexError> {
self.entries += 1;
try!(self.extend());
self.write_header();
@ -262,7 +273,7 @@ impl Index {
}
#[inline]
fn decrease_count(&mut self) -> Result<(), Error> {
fn decrease_count(&mut self) -> Result<(), IndexError> {
self.entries -= 1;
try!(self.shrink());
self.write_header();
@ -328,7 +339,7 @@ impl Index {
/// Adds the key, data pair into the table.
/// If the key existed in the table before, it is overwritten and false is returned.
/// Otherwise it will be added to the table and true is returned.
pub fn set(&mut self, key: &Hash, data: &Location) -> Result<bool, Error> {
pub fn set(&mut self, key: &Hash, data: &Location) -> Result<bool, IndexError> {
match self.locate(key) {
LocateResult::Found(pos) => {
self.data[pos].data = *data;
@ -374,7 +385,7 @@ impl Index {
#[inline]
pub fn contains(&self, key: &Hash) -> bool {
debug_assert!(self.is_consistent(), "Inconsistent before get");
debug_assert!(self.check().is_ok(), "Inconsistent before get");
match self.locate(key) {
LocateResult::Found(_) => true,
_ => false
@ -383,7 +394,7 @@ impl Index {
#[inline]
pub fn get(&self, key: &Hash) -> Option<Location> {
debug_assert!(self.is_consistent(), "Inconsistent before get");
debug_assert!(self.check().is_ok(), "Inconsistent before get");
match self.locate(key) {
LocateResult::Found(pos) => Some(self.data[pos].data),
_ => None
@ -392,7 +403,7 @@ impl Index {
#[inline]
pub fn modify<F>(&mut self, key: &Hash, mut f: F) -> bool where F: FnMut(&mut Location) {
debug_assert!(self.is_consistent(), "Inconsistent before get");
debug_assert!(self.check().is_ok(), "Inconsistent before get");
match self.locate(key) {
LocateResult::Found(pos) => {
f(&mut self.data[pos].data);
@ -403,7 +414,7 @@ impl Index {
}
#[inline]
pub fn delete(&mut self, key: &Hash) -> Result<bool, Error> {
pub fn delete(&mut self, key: &Hash) -> Result<bool, IndexError> {
match self.locate(key) {
LocateResult::Found(pos) => {
self.backshift(pos);
@ -414,7 +425,7 @@ impl Index {
}
}
pub fn filter<F>(&mut self, mut f: F) -> Result<usize, Error> where F: FnMut(&Hash, &Location) -> bool {
pub fn filter<F>(&mut self, mut f: F) -> Result<usize, IndexError> where F: FnMut(&Hash, &Location) -> bool {
//TODO: is it faster to walk in reverse direction?
let mut deleted = 0;
let mut pos = 0;
@ -485,7 +496,7 @@ impl Index {
#[inline]
pub fn size(&self) -> usize {
self.mmap.len()
self.mmap.len()
}
#[inline]

View File

@ -10,7 +10,6 @@ extern crate serde_yaml;
extern crate docopt;
extern crate rustc_serialize;
mod errors;
pub mod util;
pub mod bundle;
pub mod index;

View File

@ -1,7 +1,6 @@
use super::{Repository, Chunk};
use super::{Repository, Chunk, RepositoryError};
use rmp_serde;
use serde::{Deserialize, Serialize};
use ::util::*;
use std::fs::{self, File};
use std::path::Path;
@ -39,19 +38,19 @@ serde_impl!(Backup(u8) {
impl Repository {
pub fn list_backups(&self) -> Result<Vec<String>, &'static str> {
pub fn list_backups(&self) -> Result<Vec<String>, RepositoryError> {
let mut backups = Vec::new();
let mut paths = Vec::new();
let base_path = self.path.join("backups");
paths.push(base_path.clone());
while let Some(path) = paths.pop() {
for entry in try!(fs::read_dir(path).map_err(|_| "Failed to list files")) {
let entry = try!(entry.map_err(|_| "Failed to list files"));
for entry in try!(fs::read_dir(path)) {
let entry = try!(entry);
let path = entry.path();
if path.is_dir() {
paths.push(path);
} else {
let relpath = try!(path.strip_prefix(&base_path).map_err(|_| "Failed to obtain relative path"));
let relpath = path.strip_prefix(&base_path).unwrap();
backups.push(relpath.to_string_lossy().to_string());
}
}
@ -59,25 +58,23 @@ impl Repository {
Ok(backups)
}
pub fn get_backup(&self, name: &str) -> Result<Backup, &'static str> {
let file = try!(File::open(self.path.join("backups").join(name)).map_err(|_| "Failed to load backup"));
let mut reader = rmp_serde::Deserializer::new(file);
Backup::deserialize(&mut reader).map_err(|_| "Failed to read backup data")
pub fn get_backup(&self, name: &str) -> Result<Backup, RepositoryError> {
let mut file = try!(File::open(self.path.join("backups").join(name)));
Ok(try!(msgpack::decode_from_stream(&mut file)))
}
pub fn save_backup(&mut self, backup: &Backup, name: &str) -> Result<(), &'static str> {
let mut file = try!(File::create(self.path.join("backups").join(name)).map_err(|_| "Failed to save backup"));
let mut writer = rmp_serde::Serializer::new(&mut file);
backup.serialize(&mut writer).map_err(|_| "Failed to write backup data")
pub fn save_backup(&mut self, backup: &Backup, name: &str) -> Result<(), RepositoryError> {
let mut file = try!(File::create(self.path.join("backups").join(name)));
Ok(try!(msgpack::encode_to_stream(backup, &mut file)))
}
pub fn restore_backup<P: AsRef<Path>>(&mut self, backup: &Backup, path: P) -> Result<(), &'static str> {
pub fn restore_backup<P: AsRef<Path>>(&mut self, backup: &Backup, path: P) -> Result<(), RepositoryError> {
let inode = try!(self.get_inode(&backup.root));
try!(self.save_inode_at(&inode, path));
Ok(())
}
pub fn create_full_backup<P: AsRef<Path>>(&mut self, path: P) -> Result<Backup, &'static str> {
pub fn create_full_backup<P: AsRef<Path>>(&mut self, path: P) -> Result<Backup, RepositoryError> {
// Maintain a stack of folders still todo
// Maintain a map of path->inode entries
// Work on topmost stack entry

View File

@ -1,8 +1,10 @@
use std::mem;
use std::io::{Read, Write, Cursor};
use super::{Repository, Mode};
use super::{Repository, Mode, RepositoryError};
use ::index::Location;
use ::bundle::BundleId;
use super::integrity::RepositoryIntegrityError;
use ::util::Hash;
use ::chunker::{IChunker, ChunkerStatus};
@ -12,7 +14,15 @@ pub type Chunk = (Hash, usize);
impl Repository {
pub fn get_chunk(&mut self, hash: Hash) -> Result<Option<Vec<u8>>, &'static str> {
pub fn get_bundle_id(&self, id: u32) -> Result<BundleId, RepositoryError> {
if let Some(bundle_info) = self.bundle_map.get(id) {
Ok(bundle_info.id())
} else {
Err(RepositoryIntegrityError::MissingBundleId(id).into())
}
}
pub fn get_chunk(&mut self, hash: Hash) -> Result<Option<Vec<u8>>, RepositoryError> {
// Find bundle and chunk id in index
let found = if let Some(found) = self.index.get(&hash) {
found
@ -20,20 +30,12 @@ impl Repository {
return Ok(None)
};
// Lookup bundle id from map
let bundle_id = if let Some(bundle_info) = self.bundle_map.get(found.bundle) {
bundle_info.id()
} else {
return Err("Bundle id not found in map")
};
let bundle_id = try!(self.get_bundle_id(found.bundle));
// Get chunk from bundle
if let Ok(chunk) = self.bundles.get_chunk(&bundle_id, found.chunk as usize) {
Ok(Some(chunk))
} else {
Err("Failed to load chunk from bundle")
}
Ok(Some(try!(self.bundles.get_chunk(&bundle_id, found.chunk as usize))))
}
pub fn put_chunk(&mut self, mode: Mode, hash: Hash, data: &[u8]) -> Result<(), &'static str> {
pub fn put_chunk(&mut self, mode: Mode, hash: Hash, data: &[u8]) -> Result<(), RepositoryError> {
// If this chunk is in the index, ignore it
if self.index.contains(&hash) {
return Ok(())
@ -47,7 +49,7 @@ impl Repository {
};
// ...alocate one if needed
if writer.is_none() {
*writer = Some(try!(self.bundles.create_bundle().map_err(|_| "Failed to create new bundle")));
*writer = Some(try!(self.bundles.create_bundle()));
}
debug_assert!(writer.is_some());
let chunk_id;
@ -56,7 +58,7 @@ impl Repository {
{
// Add chunk to bundle writer and determine the size of the bundle
let writer_obj = writer.as_mut().unwrap();
chunk_id = try!(writer_obj.add(data).map_err(|_| "Failed to write chunk"));
chunk_id = try!(writer_obj.add(data));
size = writer_obj.size();
raw_size = writer_obj.raw_size();
}
@ -68,7 +70,7 @@ impl Repository {
if size >= self.config.bundle_size || raw_size >= 4 * self.config.bundle_size {
let mut finished = None;
mem::swap(writer, &mut finished);
let bundle = try!(self.bundles.add_bundle(finished.unwrap()).map_err(|_| "Failed to write finished bundle"));
let bundle = try!(self.bundles.add_bundle(finished.unwrap()));
self.bundle_map.set(bundle_id, bundle);
if self.next_meta_bundle == bundle_id {
self.next_meta_bundle = next_free_bundle_id
@ -79,27 +81,27 @@ impl Repository {
// Not saving the bundle map, this will be done by flush
}
// Add location to the index
try!(self.index.set(&hash, &Location::new(bundle_id, chunk_id as u32)).map_err(|_| "Failed to add chunk location to index"));
try!(self.index.set(&hash, &Location::new(bundle_id, chunk_id as u32)));
Ok(())
}
#[inline]
pub fn put_data(&mut self, mode: Mode, data: &[u8]) -> Result<Vec<Chunk>, &'static str> {
pub fn put_data(&mut self, mode: Mode, data: &[u8]) -> Result<Vec<Chunk>, RepositoryError> {
let mut input = Cursor::new(data);
self.put_stream(mode, &mut input)
}
pub fn put_stream<R: Read>(&mut self, mode: Mode, data: &mut R) -> Result<Vec<Chunk>, &'static str> {
pub fn put_stream<R: Read>(&mut self, mode: Mode, data: &mut R) -> Result<Vec<Chunk>, RepositoryError> {
let avg_size = self.config.chunker.avg_size();
let mut chunks = Vec::new();
let mut chunk = Vec::with_capacity(avg_size * 2);
loop {
chunk.clear();
let mut output = Cursor::new(chunk);
let res = try!(self.chunker.chunk(data, &mut output).map_err(|_| "Failed to chunk"));
let res = try!(self.chunker.chunk(data, &mut output));
chunk = output.into_inner();
let hash = self.config.hash.hash(&chunk);
try!(self.put_chunk(mode, hash, &chunk).map_err(|_| "Failed to store chunk"));
try!(self.put_chunk(mode, hash, &chunk));
chunks.push((hash, chunk.len()));
if res == ChunkerStatus::Finished {
break
@ -109,18 +111,18 @@ impl Repository {
}
#[inline]
pub fn get_data(&mut self, chunks: &[Chunk]) -> Result<Vec<u8>, &'static str> {
pub fn get_data(&mut self, chunks: &[Chunk]) -> Result<Vec<u8>, RepositoryError> {
let mut data = Vec::with_capacity(chunks.iter().map(|&(_, size)| size).sum());
try!(self.get_stream(chunks, &mut data));
Ok(data)
}
#[inline]
pub fn get_stream<W: Write>(&mut self, chunks: &[Chunk], w: &mut W) -> Result<(), &'static str> {
pub fn get_stream<W: Write>(&mut self, chunks: &[Chunk], w: &mut W) -> Result<(), RepositoryError> {
for &(ref hash, len) in chunks {
let data = try!(try!(self.get_chunk(*hash).map_err(|_| "Failed to load chunk")).ok_or("Chunk missing"));
let data = try!(try!(self.get_chunk(*hash)).ok_or_else(|| RepositoryIntegrityError::MissingChunk(hash.clone())));
debug_assert_eq!(data.len(), len);
try!(w.write_all(&data).map_err(|_| "Failed to write to sink"));
try!(w.write_all(&data));
}
Ok(())
}

View File

@ -1,19 +1,45 @@
use std::collections::HashMap;
use std::path::Path;
use std::io::{BufReader, Read, Write, BufWriter};
use std::io::{self, BufReader, Read, Write, BufWriter};
use std::fs::File;
use rmp_serde;
use serde::Deserialize;
use serde::Serialize;
use ::bundle::{Bundle, BundleId, BundleInfo};
use ::util::*;
static HEADER_STRING: [u8; 7] = *b"zbunmap";
static HEADER_VERSION: u8 = 1;
quick_error!{
#[derive(Debug)]
pub enum BundleMapError {
Io(err: io::Error) {
from()
cause(err)
description("Failed to read/write bundle map")
}
Decode(err: msgpack::DecodeError) {
from()
cause(err)
description("Failed to decode bundle map")
}
Encode(err: msgpack::EncodeError) {
from()
cause(err)
description("Failed to encode bundle map")
}
WrongHeader {
description("Wrong header")
}
WrongVersion(version: u8) {
description("Wrong version")
display("Wrong version: {}", version)
}
}
}
#[derive(Default)]
pub struct BundleData {
pub info: BundleInfo
@ -37,36 +63,26 @@ impl BundleMap {
BundleMap(Default::default())
}
pub fn load<P: AsRef<Path>>(path: P) -> Result<Self, &'static str> {
let mut file = BufReader::new(try!(File::open(path.as_ref())
.map_err(|_| "Failed to open bundle map file")));
pub fn load<P: AsRef<Path>>(path: P) -> Result<Self, BundleMapError> {
let mut file = BufReader::new(try!(File::open(path.as_ref())));
let mut header = [0u8; 8];
try!(file.read_exact(&mut header)
.map_err(|_| "Failed to read bundle map header"));
try!(file.read_exact(&mut header));
if header[..HEADER_STRING.len()] != HEADER_STRING {
return Err("Wrong header string")
return Err(BundleMapError::WrongHeader)
}
let version = header[HEADER_STRING.len()];
if version != HEADER_VERSION {
return Err("Unsupported bundle map file version")
return Err(BundleMapError::WrongVersion(version))
}
let mut reader = rmp_serde::Deserializer::new(file);
let map = try!(HashMap::deserialize(&mut reader)
.map_err(|_| "Failed to read bundle map data"));
Ok(BundleMap(map))
Ok(BundleMap(try!(msgpack::decode_from_stream(&mut file))))
}
pub fn save<P: AsRef<Path>>(&self, path: P) -> Result<(), &'static str> {
let mut file = BufWriter::new(try!(File::create(path)
.map_err(|_| "Failed to create bundle file")));
try!(file.write_all(&HEADER_STRING)
.map_err(|_| "Failed to write bundle header"));
try!(file.write_all(&[HEADER_VERSION])
.map_err(|_| "Failed to write bundle header"));
let mut writer = rmp_serde::Serializer::new(&mut file);
self.0.serialize(&mut writer)
.map_err(|_| "Failed to write bundle map data")
pub fn save<P: AsRef<Path>>(&self, path: P) -> Result<(), BundleMapError> {
let mut file = BufWriter::new(try!(File::create(path)));
try!(file.write_all(&HEADER_STRING));
try!(file.write_all(&[HEADER_VERSION]));
msgpack::encode_to_stream(&self.0, &mut file).map_err(BundleMapError::Encode)
}
#[inline]

View File

@ -2,14 +2,36 @@ use serde_yaml;
use std::fs::File;
use std::path::Path;
use std::io;
use ::util::*;
use ::chunker::ChunkerType;
quick_error!{
#[derive(Debug)]
pub enum ConfigError {
Io(err: io::Error) {
from()
cause(err)
}
Parse(reason: &'static str) {
from()
description("Failed to parse config")
display("Failed to parse config: {}", reason)
}
Yaml(err: serde_yaml::Error) {
from()
cause(err)
description("Yaml format error")
}
}
}
impl HashMethod {
fn from_yaml(yaml: String) -> Result<Self, &'static str> {
HashMethod::from(&yaml)
fn from_yaml(yaml: String) -> Result<Self, ConfigError> {
HashMethod::from(&yaml).map_err(ConfigError::Parse)
}
fn to_yaml(&self) -> String {
@ -20,8 +42,8 @@ impl HashMethod {
impl ChecksumType {
fn from_yaml(yaml: String) -> Result<Self, &'static str> {
ChecksumType::from(&yaml)
fn from_yaml(yaml: String) -> Result<Self, ConfigError> {
ChecksumType::from(&yaml).map_err(ConfigError::Parse)
}
fn to_yaml(&self) -> String {
@ -52,8 +74,8 @@ serde_impl!(ChunkerYaml(String) {
});
impl ChunkerType {
fn from_yaml(yaml: ChunkerYaml) -> Result<Self, &'static str> {
ChunkerType::from(&yaml.method, yaml.avg_size, yaml.seed)
fn from_yaml(yaml: ChunkerYaml) -> Result<Self, ConfigError> {
ChunkerType::from(&yaml.method, yaml.avg_size, yaml.seed).map_err(ConfigError::Parse)
}
fn to_yaml(&self) -> ChunkerYaml {
@ -69,8 +91,8 @@ impl ChunkerType {
impl Compression {
#[inline]
fn from_yaml(yaml: String) -> Result<Self, &'static str> {
Compression::from_string(&yaml)
fn from_yaml(yaml: String) -> Result<Self, ConfigError> {
Compression::from_string(&yaml).map_err(|_| ConfigError::Parse("Invalid codec"))
}
#[inline]
@ -118,7 +140,7 @@ pub struct Config {
pub hash: HashMethod
}
impl Config {
fn from_yaml(yaml: ConfigYaml) -> Result<Self, &'static str> {
fn from_yaml(yaml: ConfigYaml) -> Result<Self, ConfigError> {
let compression = if let Some(c) = yaml.compression {
Some(try!(Compression::from_yaml(c)))
} else {
@ -143,15 +165,15 @@ impl Config {
}
}
pub fn load<P: AsRef<Path>>(path: P) -> Result<Self, &'static str> {
let f = try!(File::open(path).map_err(|_| "Failed to open config"));
let config = try!(serde_yaml::from_reader(f).map_err(|_| "Failed to parse config"));
pub fn load<P: AsRef<Path>>(path: P) -> Result<Self, ConfigError> {
let f = try!(File::open(path));
let config = try!(serde_yaml::from_reader(f));
Config::from_yaml(config)
}
pub fn save<P: AsRef<Path>>(&self, path: P) -> Result<(), &'static str> {
let mut f = try!(File::create(path).map_err(|_| "Failed to open config"));
try!(serde_yaml::to_writer(&mut f, &self.to_yaml()).map_err(|_| "Failed to wrtie config"));
pub fn save<P: AsRef<Path>>(&self, path: P) -> Result<(), ConfigError> {
let mut f = try!(File::create(path));
try!(serde_yaml::to_writer(&mut f, &self.to_yaml()));
Ok(())
}
}

66
src/repository/error.rs Normal file
View File

@ -0,0 +1,66 @@
use std::io;
use std::path::PathBuf;
use super::bundle_map::BundleMapError;
use super::config::ConfigError;
use super::integrity::RepositoryIntegrityError;
use ::index::IndexError;
use ::bundle::BundleError;
use ::chunker::ChunkerError;
use ::util::*;
quick_error!{
#[derive(Debug)]
pub enum RepositoryError {
Io(err: io::Error) {
from()
cause(err)
description("IO Error")
}
Config(err: ConfigError) {
from()
cause(err)
description("Configuration error")
}
BundleMap(err: BundleMapError) {
from()
cause(err)
description("Bundle map error")
}
Index(err: IndexError) {
from()
cause(err)
description("Index error")
}
Bundle(err: BundleError) {
from()
cause(err)
description("Bundle error")
}
Chunker(err: ChunkerError) {
from()
cause(err)
description("Chunker error")
}
Decode(err: msgpack::DecodeError) {
from()
cause(err)
description("Failed to decode metadata")
}
Encode(err: msgpack::EncodeError) {
from()
cause(err)
description("Failed to encode metadata")
}
Integrity(err: RepositoryIntegrityError) {
from()
cause(err)
description("Integrity error")
}
InvalidFileType(path: PathBuf) {
description("Invalid file type")
display("{:?} has an invalid file type", path)
}
}
}

View File

@ -1,41 +1,66 @@
use super::Repository;
use super::{Repository, RepositoryError};
use ::bundle::BundleId;
use ::util::Hash;
quick_error!{
#[derive(Debug)]
pub enum RepositoryIntegrityError {
MissingChunk(hash: Hash) {
description("Missing chunk")
display("Missing chunk: {}", hash)
}
MissingBundleId(id: u32) {
description("Missing bundle")
display("Missing bundle: {}", id)
}
MissingBundle(id: BundleId) {
description("Missing bundle")
display("Missing bundle: {}", id)
}
NoSuchChunk(bundle: BundleId, chunk: u32) {
description("No such chunk")
display("Bundle {} does not conain the chunk {}", bundle, chunk)
}
InvalidNextBundleId {
description("Invalid next bundle id")
}
SymlinkWithoutTarget {
description("Symlink without target")
}
}
}
impl Repository {
fn check_chunk(&self, hash: Hash) -> Result<(), &'static str> {
fn check_chunk(&self, hash: Hash) -> Result<(), RepositoryError> {
// Find bundle and chunk id in index
let found = if let Some(found) = self.index.get(&hash) {
found
} else {
return Err("Chunk not in index");
return Err(RepositoryIntegrityError::MissingChunk(hash).into());
};
// Lookup bundle id from map
let bundle_id = if let Some(bundle_info) = self.bundle_map.get(found.bundle) {
bundle_info.id()
} else {
return Err("Bundle id not found in map")
};
let bundle_id = try!(self.get_bundle_id(found.bundle));
// Get bundle object from bundledb
let bundle = if let Some(bundle) = self.bundles.get_bundle(&bundle_id) {
bundle
} else {
return Err("Bundle not found in bundledb")
return Err(RepositoryIntegrityError::MissingBundle(bundle_id.clone()).into())
};
// Get chunk from bundle
if bundle.info.chunk_count > found.chunk as usize {
Ok(())
} else {
Err("Bundle does not contain that chunk")
Err(RepositoryIntegrityError::NoSuchChunk(bundle_id.clone(), found.chunk).into())
}
//TODO: check that contents match their hash
}
pub fn check(&mut self, full: bool) -> Result<(), &'static str> {
pub fn check(&mut self, full: bool) -> Result<(), RepositoryError> {
try!(self.flush());
try!(self.bundles.check(full).map_err(|_| "Bundles inconsistent"));
try!(self.index.check().map_err(|_| "Index inconsistent"));
try!(self.bundles.check(full));