From 77e396094a3fdcd4cec65b2e4e1608fb01988e27 Mon Sep 17 00:00:00 2001 From: Dennis Schwerdel Date: Mon, 10 Apr 2017 08:53:55 +0200 Subject: [PATCH] Detach bundle upload --- Cargo.lock | 7 +++ Cargo.toml | 1 + TODO.md | 1 - src/bundledb/cache.rs | 11 ----- src/bundledb/db.rs | 25 +++++++++-- src/bundledb/error.rs | 96 ---------------------------------------- src/bundledb/mod.rs | 2 + src/bundledb/uploader.rs | 87 ++++++++++++++++++++++++++++++++++++ src/main.rs | 1 + src/repository/mod.rs | 1 + 10 files changed, 121 insertions(+), 111 deletions(-) delete mode 100644 src/bundledb/error.rs create mode 100644 src/bundledb/uploader.rs diff --git a/Cargo.lock b/Cargo.lock index 24ec19d..966d251 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7,6 +7,7 @@ dependencies = [ "byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "chrono 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "clap 2.23.1 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", "filetime 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", "fuse 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -105,6 +106,11 @@ name = "constant_time_eq" version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "crossbeam" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "filetime" version = "0.1.10" @@ -466,6 +472,7 @@ dependencies = [ "checksum chrono 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "158b0bd7d75cbb6bf9c25967a48a2e9f77da95876b858eadfabaa99cd069de6e" "checksum clap 2.23.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d480c39a2e5f9b3a3798c661613e1b0e7a7ae71e005102d4aa910fc3289df484" "checksum constant_time_eq 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "07dcb7959f0f6f1cf662f9a7ff389bcb919924d99ac41cf31f10d611d8721323" +"checksum crossbeam 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)" = "0c5ea215664ca264da8a9d9c3be80d2eaf30923c259d03e870388eb927508f97" "checksum filetime 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "5363ab8e4139b8568a6237db5248646e5a8a2f89bd5ccb02092182b11fd3e922" "checksum fuse 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5087262ce5b36fed6ccd4abf0a8224e48d055a2bb07fecb5605765de6f114a28" "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" diff --git a/Cargo.toml b/Cargo.toml index f742a4c..d74c456 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ lazy_static = "0.2" rand = "0.3" tar = "0.4" xattr = "0.1" +crossbeam = "0.2" time = "*" libc = "*" diff --git a/TODO.md b/TODO.md index 7a26200..1fb3ef8 100644 --- a/TODO.md +++ b/TODO.md @@ -1,7 +1,6 @@ # TODO ## Functionality -* Detach bundle upload * XAttrs in fuse * XAttrs in tar * `check --repair` diff --git a/src/bundledb/cache.rs b/src/bundledb/cache.rs index 47c8951..0153773 100644 --- a/src/bundledb/cache.rs +++ b/src/bundledb/cache.rs @@ -62,17 +62,6 @@ impl StoredBundle { self.info.id.clone() } - pub fn move_to>(mut self, base_path: &Path, path: P) -> Result { - let src_path = base_path.join(&self.path); - let dst_path = path.as_ref(); - if fs::rename(&src_path, dst_path).is_err() { - try!(fs::copy(&src_path, dst_path).context(dst_path)); - try!(fs::remove_file(&src_path).context(&src_path as &Path)); - } - self.path = dst_path.strip_prefix(base_path).unwrap().to_path_buf(); - Ok(self) - } - pub fn copy_to>(&self, base_path: &Path, path: P) -> Result { let src_path = base_path.join(&self.path); let dst_path = path.as_ref(); diff --git a/src/bundledb/db.rs b/src/bundledb/db.rs index f70baa5..1ff3192 100644 --- a/src/bundledb/db.rs +++ b/src/bundledb/db.rs @@ -6,6 +6,7 @@ use std::collections::{HashMap, HashSet}; use std::fs; use std::sync::{Arc, Mutex}; use std::io; +use std::mem; quick_error!{ @@ -97,6 +98,7 @@ pub fn load_bundles(path: &Path, base: &Path, bundles: &mut HashMap>, crypto: Arc>, local_bundles: HashMap, remote_bundles: HashMap, @@ -109,6 +111,7 @@ impl BundleDb { BundleDb { layout: layout, crypto: crypto, + uploader: None, local_bundles: HashMap::new(), remote_bundles: HashMap::new(), bundle_cache: LruCache::new(5, 10) @@ -230,17 +233,33 @@ impl BundleDb { #[inline] pub fn add_bundle(&mut self, bundle: BundleWriter) -> Result { - let bundle = try!(bundle.finish(&self)); + let mut bundle = try!(bundle.finish(&self)); if bundle.info.mode == BundleMode::Meta { try!(self.copy_remote_bundle_to_cache(&bundle)) } let (folder, filename) = self.layout.remote_bundle_path(self.remote_bundles.len()); - try!(fs::create_dir_all(&folder).context(&folder as &Path)); - let bundle = try!(bundle.move_to(&self.layout.base_path(), folder.join(filename))); + let dst_path = folder.join(filename); + let src_path = bundle.path; + bundle.path = dst_path.clone(); + if self.uploader.is_none() { + self.uploader = Some(BundleUploader::new(5)); + } + try!(self.uploader.as_ref().unwrap().queue(src_path, dst_path)); self.remote_bundles.insert(bundle.id(), bundle.clone()); Ok(bundle.info) } + #[inline] + pub fn finish_uploads(&mut self) -> Result<(), BundleDbError> { + let mut uploader = None; + mem::swap(&mut self.uploader, &mut uploader); + if let Some(uploader) = uploader { + uploader.finish() + } else { + Ok(()) + } + } + #[inline] pub fn get_chunk_list(&self, bundle: &BundleId) -> Result { let mut bundle = try!(self.get_stored_bundle(bundle).and_then(|stored| self.get_bundle(&stored))); diff --git a/src/bundledb/error.rs b/src/bundledb/error.rs deleted file mode 100644 index 8090160..0000000 --- a/src/bundledb/error.rs +++ /dev/null @@ -1,96 +0,0 @@ -use ::prelude::*; -use super::*; - -use std::path::{Path, PathBuf}; -use std::io; - - -quick_error!{ - #[derive(Debug)] - pub enum BundleDbError { - List(err: io::Error) { - cause(err) - description("Failed to list bundles") - display("Failed to list bundles: {}", err) - } - Io(err: io::Error, path: PathBuf) { - cause(err) - context(path: &'a Path, err: io::Error) -> (err, path.to_path_buf()) - description("Failed to read/write bundle") - display("Failed to read/write bundle {:?}: {}", path, err) - } - Decode(err: msgpack::DecodeError, path: PathBuf) { - cause(err) - context(path: &'a Path, err: msgpack::DecodeError) -> (err, path.to_path_buf()) - description("Failed to decode bundle header") - display("Failed to decode bundle header of {:?}: {}", path, err) - } - Encode(err: msgpack::EncodeError, path: PathBuf) { - cause(err) - context(path: &'a Path, err: msgpack::EncodeError) -> (err, path.to_path_buf()) - description("Failed to encode bundle header") - display("Failed to encode bundle header of {:?}: {}", path, err) - } - WrongHeader(path: PathBuf) { - description("Wrong header") - display("Wrong header on bundle {:?}", path) - } - WrongVersion(path: PathBuf, version: u8) { - description("Wrong version") - display("Wrong version on bundle {:?}: {}", path, version) - } - Integrity(bundle: BundleId, reason: &'static str) { - description("Bundle has an integrity error") - display("Bundle {:?} has an integrity error: {}", bundle, reason) - } - NoSuchBundle(bundle: BundleId) { - description("No such bundle") - display("No such bundle: {:?}", bundle) - } - NoSuchChunk(bundle: BundleId, id: usize) { - description("Bundle has no such chunk") - display("Bundle {:?} has no chunk with that id: {}", bundle, id) - } - Decompression(err: CompressionError, path: PathBuf) { - cause(err) - context(path: &'a Path, err: CompressionError) -> (err, path.to_path_buf()) - description("Decompression failed") - display("Decompression failed on bundle {:?}: {}", path, err) - } - Compression(err: CompressionError) { - from() - cause(err) - description("Compression failed") - display("Compression failed: {}", err) - } - Decryption(err: EncryptionError, path: PathBuf) { - cause(err) - context(path: &'a Path, err: EncryptionError) -> (err, path.to_path_buf()) - description("Decryption failed") - display("Decryption failed on bundle {:?}: {}", path, err) - } - Encryption(err: EncryptionError) { - from() - cause(err) - description("Encryption failed") - display("Encryption failed: {}", err) - } - Remove(err: io::Error, bundle: BundleId) { - cause(err) - description("Failed to remove bundle") - display("Failed to remove bundle {}", bundle) - } - Writer(err: BundleWriterError) { - from() - cause(err) - description("Failed to write new bundle") - display("Bundle db error: failed to write new bundle\n\tcaused by: {}", err) - } - Reader(err: BundleReaderError) { - from() - cause(err) - description("Failed to read bundle") - display("Bundle db error: failed to read a bundle\n\tcaused by: {}", err) - } - } -} diff --git a/src/bundledb/mod.rs b/src/bundledb/mod.rs index 9d494cb..3f5378f 100644 --- a/src/bundledb/mod.rs +++ b/src/bundledb/mod.rs @@ -2,11 +2,13 @@ mod writer; mod reader; mod db; mod cache; +mod uploader; pub use self::cache::{StoredBundle, BundleCacheError}; pub use self::writer::{BundleWriter, BundleWriterError}; pub use self::reader::{BundleReader, BundleReaderError}; pub use self::db::*; +pub use self::uploader::BundleUploader; use ::prelude::*; diff --git a/src/bundledb/uploader.rs b/src/bundledb/uploader.rs new file mode 100644 index 0000000..5f24178 --- /dev/null +++ b/src/bundledb/uploader.rs @@ -0,0 +1,87 @@ +use ::prelude::*; + +use std::sync::atomic::{Ordering, AtomicBool, AtomicUsize}; +use std::sync::{Mutex, Condvar, Arc}; +use std::{mem, fs, thread}; +use std::path::{Path, PathBuf}; + +use crossbeam::sync::MsQueue; + + +pub struct BundleUploader { + capacity: usize, + error_present: AtomicBool, + error: Mutex>, + waiting: AtomicUsize, + queue: MsQueue>, + wait: (Condvar, Mutex<()>) +} + +impl BundleUploader { + pub fn new(capacity: usize) -> Arc { + let self_ = Arc::new(BundleUploader { + capacity: capacity, + error_present: AtomicBool::new(false), + error: Mutex::new(None), + waiting: AtomicUsize::new(0), + queue: MsQueue::new(), + wait: (Condvar::new(), Mutex::new(())) + }); + let self2 = self_.clone(); + thread::Builder::new().name("uploader".to_string()).spawn(move || self2.worker_thread()).unwrap(); + self_ + } + + fn get_status(&self) -> Result<(), BundleDbError> { + if self.error_present.load(Ordering::SeqCst) { + let mut error = None; + mem::swap(&mut error, &mut self.error.lock().unwrap()); + Err(error.unwrap()) + } else { + Ok(()) + } + } + + pub fn queue(&self, local_path: PathBuf, remote_path: PathBuf) -> Result<(), BundleDbError> { + while self.waiting.load(Ordering::SeqCst) >= self.capacity { + let _ = self.wait.0.wait(self.wait.1.lock().unwrap()).unwrap(); + } + if !self.error_present.load(Ordering::SeqCst) { + self.waiting.fetch_add(1, Ordering::SeqCst); + self.queue.push(Some((local_path, remote_path))); + } + self.get_status() + } + + pub fn finish(&self) -> Result<(), BundleDbError> { + if !self.error_present.load(Ordering::SeqCst) { + self.waiting.fetch_add(1, Ordering::SeqCst); + self.queue.push(None); + } + while self.waiting.load(Ordering::SeqCst) > 0 { + let _ = self.wait.0.wait(self.wait.1.lock().unwrap()); + } + self.get_status() + } + + fn worker_thread_inner(&self) -> Result<(), BundleDbError> { + while let Some((src_path, dst_path)) = self.queue.pop() { + let folder = dst_path.parent().unwrap(); + try!(fs::create_dir_all(&folder).context(&folder as &Path)); + if fs::rename(&src_path, &dst_path).is_err() { + try!(fs::copy(&src_path, &dst_path).context(&dst_path as &Path)); + try!(fs::remove_file(&src_path).context(&src_path as &Path)); + } + } + Ok(()) + } + + fn worker_thread(&self) { + if let Err(err) = self.worker_thread_inner() { + *self.error.lock().unwrap() = Some(err); + self.error_present.store(true, Ordering::SeqCst); + } + self.waiting.swap(0, Ordering::SeqCst); + self.wait.0.notify_all(); + } +} diff --git a/src/main.rs b/src/main.rs index 9e11ddc..ed08f5b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -23,6 +23,7 @@ extern crate fuse; extern crate rand; extern crate time; extern crate xattr; +extern crate crossbeam; extern crate libc; extern crate tar; diff --git a/src/repository/mod.rs b/src/repository/mod.rs index 9f41e7b..72527ec 100644 --- a/src/repository/mod.rs +++ b/src/repository/mod.rs @@ -208,6 +208,7 @@ impl Repository { } self.next_meta_bundle = self.next_free_bundle_id() } + try!(self.bundles.finish_uploads()); try!(self.save_bundle_map()); try!(self.bundles.save_cache()); Ok(())