Repairing bundles and index (re #3)

This commit is contained in:
Dennis Schwerdel 2017-04-12 20:19:21 +02:00 committed by Dennis Schwerdel
parent 0354a73f54
commit 489a442821
9 changed files with 171 additions and 74 deletions

View File

@ -9,6 +9,7 @@ This project follows [semantic versioning](http://semver.org).
- [added] Storing user/group names in backups
- [modified] No longer trying to upload by rename
- [modified] No longer failing restore if setting file attributes fails
- [modified] Backup files must end with `.backup` (**conversion needed**)
- [fixed] Creating empty bundle cache on init to avoid warnings
- [fixed] Calling sodiumoxide::init for faster algorithms and thread safety (not needed)
- [fixed] Fixed a deadlock in the bundle upload code

View File

@ -71,6 +71,17 @@ impl StoredBundle {
Ok(bundle)
}
pub fn move_to<P: AsRef<Path>>(&mut self, base_path: &Path, path: P) -> Result<(), BundleDbError> {
    // Relocate the bundle file to `path` and record the new location
    // relative to `base_path` in `self.path`.
    let from = base_path.join(&self.path);
    let to = path.as_ref();
    match fs::rename(&from, to) {
        Ok(_) => (),
        // rename cannot cross filesystem boundaries; fall back to copy + delete.
        Err(_) => {
            try!(fs::copy(&from, to).context(to));
            try!(fs::remove_file(&from).context(&from as &Path));
        }
    }
    self.path = to.strip_prefix(base_path).unwrap().to_path_buf();
    Ok(())
}
pub fn read_list_from<P: AsRef<Path>>(path: P) -> Result<Vec<Self>, BundleCacheError> {
let path = path.as_ref();
let mut file = BufReader::new(try!(File::open(path).map_err(BundleCacheError::Read)));

View File

@ -7,6 +7,7 @@ use std::fs;
use std::sync::{Arc, Mutex};
use std::io;
use std::mem;
use std::cmp::min;
quick_error!{
#[derive(Debug)]
@ -66,6 +67,9 @@ pub fn load_bundles(path: &Path, base: &Path, bundles: &mut HashMap<BundleId, St
if path.is_dir() {
paths.push(path);
} else {
if path.extension() != Some("bundle".as_ref()) {
continue
}
bundle_paths.insert(path.strip_prefix(base).unwrap().to_path_buf());
}
}
@ -82,11 +86,11 @@ pub fn load_bundles(path: &Path, base: &Path, bundles: &mut HashMap<BundleId, St
for path in bundle_paths {
let info = match BundleReader::load_info(base.join(&path), crypto.clone()) {
Ok(info) => info,
Err(BundleReaderError::TruncatedBundle(path)) => {
warn!("Ignoring truncated bundle {:?}", path);
Err(err) => {
warn!("Failed to read bundle {:?}\n\tcaused by: {}", path, err);
info!("Ignoring unreadable bundle");
continue
},
Err(err) => return Err(err.into())
}
};
let bundle = StoredBundle { info: info, path: path };
let id = bundle.info.id.clone();
@ -260,7 +264,7 @@ impl BundleDb {
let (folder, filename) = self.layout.remote_bundle_path(self.remote_bundles.len());
let dst_path = folder.join(filename);
let src_path = self.layout.base_path().join(bundle.path);
bundle.path = dst_path.clone();
bundle.path = dst_path.strip_prefix(self.layout.base_path()).unwrap().to_path_buf();
if self.uploader.is_none() {
self.uploader = Some(BundleUploader::new(5));
}
@ -312,14 +316,83 @@ impl BundleDb {
}
}
pub fn check(&mut self, full: bool) -> Result<(), BundleDbError> {
for stored in ProgressIter::new("checking bundles", self.remote_bundles.len(), self.remote_bundles.values()) {
let mut bundle = try!(self.get_bundle(stored));
try!(bundle.check(full))
// Verify all remote bundles. With `repair` set, broken bundles are collected
// and rewritten in a second pass instead of aborting on the first error.
// Returns Ok(true) when at least one bundle was repaired, signalling the
// caller that data derived from bundle ids must be rebuilt.
pub fn check(&mut self, full: bool, repair: bool) -> Result<bool, BundleDbError> {
// Ids of broken bundles; repairing is deferred so the progress counter
// for the checking pass stays accurate.
let mut to_repair = vec![];
for (id, stored) in ProgressIter::new("checking bundles", self.remote_bundles.len(), self.remote_bundles.iter()) {
// A bundle that cannot even be opened counts as broken.
let mut bundle = match self.get_bundle(stored) {
Ok(bundle) => bundle,
Err(err) => {
if repair {
to_repair.push(id.clone());
continue
} else {
return Err(err)
}
}
};
// `full` presumably enables the slow content check (cf. the
// --bundle-data CLI flag) — TODO confirm against BundleReader::check.
if let Err(err) = bundle.check(full) {
if repair {
to_repair.push(id.clone());
continue
} else {
return Err(err.into())
}
}
}
for id in ProgressIter::new("repairing bundles", to_repair.len(), to_repair.iter()) {
try!(self.repair_bundle(id.clone()));
}
// True iff anything was repaired.
Ok(!to_repair.is_empty())
}
fn evacuate_broken_bundle(&mut self, mut bundle: StoredBundle) -> Result<(), BundleDbError> {
    // Park the damaged bundle under a `.bundle.broken` name so the loader
    // no longer treats it as a regular bundle, then forget it.
    let broken_path = self.layout.base_path().join(bundle.path.with_extension("bundle.broken"));
    warn!("Moving bundle to {:?}", broken_path);
    try!(bundle.move_to(self.layout.base_path(), broken_path));
    // Drop the bundle from the remote bundle map as well.
    self.remote_bundles.remove(&bundle.info.id);
    Ok(())
}
// Try to salvage a broken bundle: read whatever header, chunk list and data
// are still intact, copy the readable chunks into a freshly created bundle,
// and move the damaged file out of the way via evacuate_broken_bundle.
fn repair_bundle(&mut self, id: BundleId) -> Result<(), BundleDbError> {
let stored = self.remote_bundles[&id].clone();
let mut bundle = match self.get_bundle(&stored) {
Ok(bundle) => bundle,
Err(err) => {
warn!("Problem detected: failed to read bundle header: {}\n\tcaused by: {}", id, err);
// Header unreadable: nothing can be salvaged, just evacuate.
return self.evacuate_broken_bundle(stored);
}
};
let chunks = match bundle.get_chunk_list() {
Ok(chunks) => chunks.clone(),
Err(err) => {
warn!("Problem detected: failed to read bundle chunks: {}\n\tcaused by: {}", id, err);
// Without the chunk list the data cannot be attributed to hashes.
return self.evacuate_broken_bundle(stored);
}
};
let data = match bundle.load_contents() {
Ok(data) => data,
Err(err) => {
warn!("Problem detected: failed to read bundle data: {}\n\tcaused by: {}", id, err);
return self.evacuate_broken_bundle(stored);
}
};
info!("Copying readable data into new bundle");
// Recreate the replacement bundle with the same settings as the broken one.
let info = stored.info.clone();
let mut new_bundle = try!(self.create_bundle(info.mode, info.hash_method, info.compression, info.encryption));
let mut pos = 0;
for (hash, mut len) in chunks.into_inner() {
// Stop once the readable data is exhausted (the file may be truncated).
if pos >= data.len() {
break
}
// Clamp the final chunk to the data actually present.
len = min(len, (data.len() - pos) as u32);
try!(new_bundle.add(&data[pos..pos+len as usize], hash));
pos += len as usize;
}
let bundle = try!(self.add_bundle(new_bundle));
info!("New bundle id is {}", bundle.id);
// Finally evacuate the broken original.
self.evacuate_broken_bundle(stored)
}
#[inline]
pub fn len(&self) -> usize {
self.remote_bundles.len()

View File

@ -12,10 +12,6 @@ use std::sync::{Arc, Mutex};
quick_error!{
#[derive(Debug)]
pub enum BundleReaderError {
TruncatedBundle(path: PathBuf) {
description("Bundle file is truncated")
display("Bundle reader error: bundle file is truncated {:?}", path)
}
Read(err: io::Error, path: PathBuf) {
cause(err)
context(path: &'a Path, err: io::Error) -> (err, path.to_path_buf())
@ -111,10 +107,6 @@ impl BundleReader {
info.encryption = header.encryption;
debug!("Load bundle {}", info.id);
let content_start = file.seek(SeekFrom::Current(0)).unwrap() as usize + info.chunk_list_size;
let actual_size = try!(fs::metadata(path).context(path)).len();
if content_start + info.encoded_size != actual_size as usize {
return Err(BundleReaderError::TruncatedBundle(path.to_path_buf()));
}
Ok((info, version, content_start))
}

View File

@ -59,7 +59,8 @@ pub enum Arguments {
inode: Option<String>,
bundles: bool,
bundle_data: bool,
index: bool
index: bool,
repair: bool
},
List {
repo_path: String,
@ -342,6 +343,7 @@ pub fn parse() -> Result<(LogLevel, Arguments), ErrorCode> {
.arg(Arg::from_usage("-b --bundles 'Check the bundles'"))
.arg(Arg::from_usage("[bundle_data] --bundle-data 'Check bundle contents (slow)'").requires("bundles").alias("data"))
.arg(Arg::from_usage("-i --index 'Check the chunk index'"))
.arg(Arg::from_usage("-r --repair 'Try to repair errors'"))
.arg(Arg::from_usage("<PATH> 'Path of the repository/backup/subtree, [repository][::backup[::subtree]]'")
.validator(|val| validate_repo_path(val, true, None, None))))
.subcommand(SubCommand::with_name("list").alias("ls").about("List backups or backup contents")
@ -497,7 +499,8 @@ pub fn parse() -> Result<(LogLevel, Arguments), ErrorCode> {
inode: inode.map(|v| v.to_string()),
bundles: args.is_present("bundles"),
bundle_data: args.is_present("bundle_data"),
index: args.is_present("index")
index: args.is_present("index"),
repair: args.is_present("repair")
}
},
("list", Some(args)) => {

View File

@ -420,14 +420,14 @@ pub fn run() -> Result<(), ErrorCode> {
info!("Reclaimed {}", to_file_size(info_before.encoded_data_size - info_after.encoded_data_size));
}
},
Arguments::Check{repo_path, backup_name, inode, bundles, index, bundle_data} => {
Arguments::Check{repo_path, backup_name, inode, bundles, index, bundle_data, repair} => {
let mut repo = try!(open_repository(&repo_path));
checked!(repo.check_repository(), "check repository", ErrorCode::CheckRun);
if bundles {
checked!(repo.check_bundles(bundle_data), "check bundles", ErrorCode::CheckRun);
checked!(repo.check_bundles(bundle_data, repair), "check bundles", ErrorCode::CheckRun);
}
if index {
checked!(repo.check_index(), "check index", ErrorCode::CheckRun);
checked!(repo.check_index(repair), "check index", ErrorCode::CheckRun);
}
if let Some(backup_name) = backup_name {
let backup = try!(get_backup(&repo, &backup_name));

View File

@ -168,7 +168,7 @@ impl Backup {
paths.push(path);
} else {
let relpath = path.strip_prefix(&base_path).unwrap();
if relpath.extension() != Some(".backup".as_ref()) {
if relpath.extension() != Some("backup".as_ref()) {
continue
}
let name = relpath.file_stem().unwrap().to_string_lossy().to_string();

View File

@ -1,5 +1,7 @@
use ::prelude::*;
use super::*;
use std::collections::VecDeque;
use std::path::{Path, PathBuf};
use std::time::Duration;
@ -32,9 +34,6 @@ quick_error!{
MapContainsDuplicates {
description("Map contains duplicates")
}
InvalidNextBundleId {
description("Invalid next bundle id")
}
BrokenInode(path: PathBuf, err: Box<RepositoryError>) {
cause(err)
description("Broken inode")
@ -170,15 +169,6 @@ impl Repository {
pub fn check_repository(&mut self) -> Result<(), RepositoryError> {
info!("Checking repository integrity...");
if self.next_data_bundle == self.next_meta_bundle {
return Err(IntegrityError::InvalidNextBundleId.into())
}
if self.bundle_map.get(self.next_data_bundle).is_some() {
return Err(IntegrityError::InvalidNextBundleId.into())
}
if self.bundle_map.get(self.next_meta_bundle).is_some() {
return Err(IntegrityError::InvalidNextBundleId.into())
}
for (_id, bundle_id) in self.bundle_map.bundles() {
if self.bundles.get_bundle_info(&bundle_id).is_none() {
return Err(IntegrityError::MissingBundle(bundle_id).into())
@ -193,17 +183,75 @@ impl Repository {
Ok(())
}
#[inline]
pub fn check_index(&mut self) -> Result<(), RepositoryError> {
info!("Checking index integrity...");
try!(self.index.check());
info!("Checking index entries...");
self.check_index_chunks()
// Recreate the bundle map from the bundles actually present, handing out
// fresh internal ids and keeping next_data_bundle/next_meta_bundle pointing
// at unused ids throughout.
pub fn rebuild_bundle_map(&mut self) -> Result<(), RepositoryError> {
info!("Rebuilding bundle map from bundles");
self.bundle_map = BundleMap::create();
for bundle in self.bundles.list_bundles() {
// Assign the pending id that matches this bundle's mode.
let bundle_id = match bundle.mode {
BundleMode::Data => self.next_data_bundle,
BundleMode::Meta => self.next_meta_bundle
};
self.bundle_map.set(bundle_id, bundle.id.clone());
// The id just consumed must no longer be handed out for new bundles.
if self.next_meta_bundle == bundle_id {
self.next_meta_bundle = self.next_free_bundle_id()
}
if self.next_data_bundle == bundle_id {
self.next_data_bundle = self.next_free_bundle_id()
}
}
// Persist the rebuilt map; its Result is this function's return value.
self.save_bundle_map()
}
pub fn rebuild_index(&mut self) -> Result<(), RepositoryError> {
    // Wipe the chunk index and repopulate it from the chunk lists of every
    // bundle known to the bundle map.
    info!("Rebuilding index from bundles");
    self.index.clear();
    for (bundle_num, bundle_id) in self.bundle_map.bundles() {
        let chunk_list = try!(self.bundles.get_chunk_list(&bundle_id));
        // Chunk positions are their order of appearance in the bundle.
        let mut chunk_num = 0;
        for (hash, _len) in chunk_list.into_inner() {
            try!(self.index.set(&hash, &Location{bundle: bundle_num as u32, chunk: chunk_num}));
            chunk_num += 1;
        }
    }
    Ok(())
}
#[inline]
pub fn check_bundles(&mut self, full: bool) -> Result<(), RepositoryError> {
// Check the chunk index; with `repair` set, a corrupted or inconsistent
// index is rebuilt from the bundles instead of failing.
pub fn check_index(&mut self, repair: bool) -> Result<(), RepositoryError> {
// Repairing rewrites the index, which requires write access.
if repair {
try!(self.write_mode());
}
info!("Checking index integrity...");
if let Err(err) = self.index.check() {
if repair {
warn!("Problem detected: index was corrupted\n\tcaused by: {}", err);
// The index is derived data, so it can always be recreated
// from the bundle chunk lists.
return self.rebuild_index();
} else {
return Err(err.into())
}
}
info!("Checking index entries...");
if let Err(err) = self.check_index_chunks() {
if repair {
warn!("Problem detected: index entries were inconsistent\n\tcaused by: {}", err);
return self.rebuild_index();
} else {
return Err(err.into())
}
}
Ok(())
}
#[inline]
// Check bundle integrity; with `repair` set, broken bundles are rewritten
// and all data derived from bundle ids is rebuilt afterwards.
pub fn check_bundles(&mut self, full: bool, repair: bool) -> Result<(), RepositoryError> {
// Repairing rewrites bundles, which requires write access.
if repair {
try!(self.write_mode());
}
info!("Checking bundle integrity...");
// NOTE(review): the next line appears to be the pre-repair implementation
// left interleaved in this diff rendering; the block below supersedes it.
Ok(try!(self.bundles.check(full)))
if try!(self.bundles.check(full, repair)) {
// Some bundles got repaired
// Repaired bundles received new ids, so uploads must be flushed and
// the bundle map and chunk index rebuilt to match them.
try!(self.bundles.finish_uploads());
try!(self.rebuild_bundle_map());
try!(self.rebuild_index());
}
Ok(())
}
}

View File

@ -281,37 +281,6 @@ impl Repository {
Ok(())
}
// (Removed in this commit; superseded by the public rebuild_bundle_map.)
// Recreate the bundle map from the bundles on disk, keeping the
// next_data_bundle/next_meta_bundle cursors pointing at free ids.
fn rebuild_bundle_map(&mut self) -> Result<(), RepositoryError> {
info!("Rebuilding bundle map from bundles");
self.bundle_map = BundleMap::create();
for bundle in self.bundles.list_bundles() {
// Assign the pending id matching the bundle's mode.
let bundle_id = match bundle.mode {
BundleMode::Data => self.next_data_bundle,
BundleMode::Meta => self.next_meta_bundle
};
self.bundle_map.set(bundle_id, bundle.id.clone());
// Advance the free-id cursors past the id just assigned.
if self.next_meta_bundle == bundle_id {
self.next_meta_bundle = self.next_free_bundle_id()
}
if self.next_data_bundle == bundle_id {
self.next_data_bundle = self.next_free_bundle_id()
}
}
self.save_bundle_map()
}
// (Removed in this commit; superseded by the public rebuild_index.)
// Clear the chunk index and repopulate it from every bundle's chunk list.
fn rebuild_index(&mut self) -> Result<(), RepositoryError> {
info!("Rebuilding index from bundles");
self.index.clear();
for (num, id) in self.bundle_map.bundles() {
let chunks = try!(self.bundles.get_chunk_list(&id));
// Chunk position within the bundle is its list index.
for (i, (hash, _len)) in chunks.into_inner().into_iter().enumerate() {
try!(self.index.set(&hash, &Location{bundle: num as u32, chunk: i as u32}));
}
}
Ok(())
}
fn remove_gone_remote_bundle(&mut self, bundle: BundleInfo) -> Result<(), RepositoryError> {
if let Some(id) = self.bundle_map.find(&bundle.id) {
debug!("Removing bundle from index: {}", bundle.id);