mirror of https://github.com/dswd/zvault
10 changed files with 121 additions and 111 deletions
@ -1,96 +0,0 @@
|
||||
use ::prelude::*; |
||||
use super::*; |
||||
|
||||
use std::path::{Path, PathBuf}; |
||||
use std::io; |
||||
|
||||
|
||||
quick_error!{ |
||||
#[derive(Debug)] |
||||
pub enum BundleDbError { |
||||
List(err: io::Error) { |
||||
cause(err) |
||||
description("Failed to list bundles") |
||||
display("Failed to list bundles: {}", err) |
||||
} |
||||
Io(err: io::Error, path: PathBuf) { |
||||
cause(err) |
||||
context(path: &'a Path, err: io::Error) -> (err, path.to_path_buf()) |
||||
description("Failed to read/write bundle") |
||||
display("Failed to read/write bundle {:?}: {}", path, err) |
||||
} |
||||
Decode(err: msgpack::DecodeError, path: PathBuf) { |
||||
cause(err) |
||||
context(path: &'a Path, err: msgpack::DecodeError) -> (err, path.to_path_buf()) |
||||
description("Failed to decode bundle header") |
||||
display("Failed to decode bundle header of {:?}: {}", path, err) |
||||
} |
||||
Encode(err: msgpack::EncodeError, path: PathBuf) { |
||||
cause(err) |
||||
context(path: &'a Path, err: msgpack::EncodeError) -> (err, path.to_path_buf()) |
||||
description("Failed to encode bundle header") |
||||
display("Failed to encode bundle header of {:?}: {}", path, err) |
||||
} |
||||
WrongHeader(path: PathBuf) { |
||||
description("Wrong header") |
||||
display("Wrong header on bundle {:?}", path) |
||||
} |
||||
WrongVersion(path: PathBuf, version: u8) { |
||||
description("Wrong version") |
||||
display("Wrong version on bundle {:?}: {}", path, version) |
||||
} |
||||
Integrity(bundle: BundleId, reason: &'static str) { |
||||
description("Bundle has an integrity error") |
||||
display("Bundle {:?} has an integrity error: {}", bundle, reason) |
||||
} |
||||
NoSuchBundle(bundle: BundleId) { |
||||
description("No such bundle") |
||||
display("No such bundle: {:?}", bundle) |
||||
} |
||||
NoSuchChunk(bundle: BundleId, id: usize) { |
||||
description("Bundle has no such chunk") |
||||
display("Bundle {:?} has no chunk with that id: {}", bundle, id) |
||||
} |
||||
Decompression(err: CompressionError, path: PathBuf) { |
||||
cause(err) |
||||
context(path: &'a Path, err: CompressionError) -> (err, path.to_path_buf()) |
||||
description("Decompression failed") |
||||
display("Decompression failed on bundle {:?}: {}", path, err) |
||||
} |
||||
Compression(err: CompressionError) { |
||||
from() |
||||
cause(err) |
||||
description("Compression failed") |
||||
display("Compression failed: {}", err) |
||||
} |
||||
Decryption(err: EncryptionError, path: PathBuf) { |
||||
cause(err) |
||||
context(path: &'a Path, err: EncryptionError) -> (err, path.to_path_buf()) |
||||
description("Decryption failed") |
||||
display("Decryption failed on bundle {:?}: {}", path, err) |
||||
} |
||||
Encryption(err: EncryptionError) { |
||||
from() |
||||
cause(err) |
||||
description("Encryption failed") |
||||
display("Encryption failed: {}", err) |
||||
} |
||||
Remove(err: io::Error, bundle: BundleId) { |
||||
cause(err) |
||||
description("Failed to remove bundle") |
||||
display("Failed to remove bundle {}", bundle) |
||||
} |
||||
Writer(err: BundleWriterError) { |
||||
from() |
||||
cause(err) |
||||
description("Failed to write new bundle") |
||||
display("Bundle db error: failed to write new bundle\n\tcaused by: {}", err) |
||||
} |
||||
Reader(err: BundleReaderError) { |
||||
from() |
||||
cause(err) |
||||
description("Failed to read bundle") |
||||
display("Bundle db error: failed to read a bundle\n\tcaused by: {}", err) |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,87 @@
|
||||
use ::prelude::*; |
||||
|
||||
use std::sync::atomic::{Ordering, AtomicBool, AtomicUsize}; |
||||
use std::sync::{Mutex, Condvar, Arc}; |
||||
use std::{mem, fs, thread}; |
||||
use std::path::{Path, PathBuf}; |
||||
|
||||
use crossbeam::sync::MsQueue; |
||||
|
||||
|
||||
pub struct BundleUploader { |
||||
capacity: usize, |
||||
error_present: AtomicBool, |
||||
error: Mutex<Option<BundleDbError>>, |
||||
waiting: AtomicUsize, |
||||
queue: MsQueue<Option<(PathBuf, PathBuf)>>, |
||||
wait: (Condvar, Mutex<()>) |
||||
} |
||||
|
||||
impl BundleUploader { |
||||
pub fn new(capacity: usize) -> Arc<Self> { |
||||
let self_ = Arc::new(BundleUploader { |
||||
capacity: capacity, |
||||
error_present: AtomicBool::new(false), |
||||
error: Mutex::new(None), |
||||
waiting: AtomicUsize::new(0), |
||||
queue: MsQueue::new(), |
||||
wait: (Condvar::new(), Mutex::new(())) |
||||
}); |
||||
let self2 = self_.clone(); |
||||
thread::Builder::new().name("uploader".to_string()).spawn(move || self2.worker_thread()).unwrap(); |
||||
self_ |
||||
} |
||||
|
||||
fn get_status(&self) -> Result<(), BundleDbError> { |
||||
if self.error_present.load(Ordering::SeqCst) { |
||||
let mut error = None; |
||||
mem::swap(&mut error, &mut self.error.lock().unwrap()); |
||||
Err(error.unwrap()) |
||||
} else { |
||||
Ok(()) |
||||
} |
||||
} |
||||
|
||||
pub fn queue(&self, local_path: PathBuf, remote_path: PathBuf) -> Result<(), BundleDbError> { |
||||
while self.waiting.load(Ordering::SeqCst) >= self.capacity { |
||||
let _ = self.wait.0.wait(self.wait.1.lock().unwrap()).unwrap(); |
||||
} |
||||
if !self.error_present.load(Ordering::SeqCst) { |
||||
self.waiting.fetch_add(1, Ordering::SeqCst); |
||||
self.queue.push(Some((local_path, remote_path))); |
||||
} |
||||
self.get_status() |
||||
} |
||||
|
||||
pub fn finish(&self) -> Result<(), BundleDbError> { |
||||
if !self.error_present.load(Ordering::SeqCst) { |
||||
self.waiting.fetch_add(1, Ordering::SeqCst); |
||||
self.queue.push(None); |
||||
} |
||||
while self.waiting.load(Ordering::SeqCst) > 0 { |
||||
let _ = self.wait.0.wait(self.wait.1.lock().unwrap()); |
||||
} |
||||
self.get_status() |
||||
} |
||||
|
||||
fn worker_thread_inner(&self) -> Result<(), BundleDbError> { |
||||
while let Some((src_path, dst_path)) = self.queue.pop() { |
||||
let folder = dst_path.parent().unwrap(); |
||||
try!(fs::create_dir_all(&folder).context(&folder as &Path)); |
||||
if fs::rename(&src_path, &dst_path).is_err() { |
||||
try!(fs::copy(&src_path, &dst_path).context(&dst_path as &Path)); |
||||
try!(fs::remove_file(&src_path).context(&src_path as &Path)); |
||||
} |
||||
} |
||||
Ok(()) |
||||
} |
||||
|
||||
fn worker_thread(&self) { |
||||
if let Err(err) = self.worker_thread_inner() { |
||||
*self.error.lock().unwrap() = Some(err); |
||||
self.error_present.store(true, Ordering::SeqCst); |
||||
} |
||||
self.waiting.swap(0, Ordering::SeqCst); |
||||
self.wait.0.notify_all(); |
||||
} |
||||
} |
Loading…
Reference in new issue