2017-07-21 09:21:59 +00:00
|
|
|
use prelude::*;
|
2017-04-10 06:53:55 +00:00
|
|
|
|
|
|
|
use std::sync::atomic::{Ordering, AtomicBool, AtomicUsize};
|
|
|
|
use std::sync::{Mutex, Condvar, Arc};
|
|
|
|
use std::{mem, fs, thread};
|
|
|
|
use std::path::{Path, PathBuf};
|
|
|
|
|
|
|
|
use crossbeam::sync::MsQueue;
|
|
|
|
|
|
|
|
|
|
|
|
pub struct BundleUploader {
|
|
|
|
capacity: usize,
|
|
|
|
error_present: AtomicBool,
|
|
|
|
error: Mutex<Option<BundleDbError>>,
|
|
|
|
waiting: AtomicUsize,
|
|
|
|
queue: MsQueue<Option<(PathBuf, PathBuf)>>,
|
|
|
|
wait: (Condvar, Mutex<()>)
|
|
|
|
}
|
|
|
|
|
|
|
|
impl BundleUploader {
|
|
|
|
pub fn new(capacity: usize) -> Arc<Self> {
|
|
|
|
let self_ = Arc::new(BundleUploader {
|
|
|
|
capacity: capacity,
|
|
|
|
error_present: AtomicBool::new(false),
|
|
|
|
error: Mutex::new(None),
|
|
|
|
waiting: AtomicUsize::new(0),
|
|
|
|
queue: MsQueue::new(),
|
|
|
|
wait: (Condvar::new(), Mutex::new(()))
|
|
|
|
});
|
|
|
|
let self2 = self_.clone();
|
2017-07-21 09:21:59 +00:00
|
|
|
thread::Builder::new()
|
|
|
|
.name("uploader".to_string())
|
|
|
|
.spawn(move || self2.worker_thread())
|
|
|
|
.unwrap();
|
2017-04-10 06:53:55 +00:00
|
|
|
self_
|
|
|
|
}
|
|
|
|
|
|
|
|
fn get_status(&self) -> Result<(), BundleDbError> {
|
|
|
|
if self.error_present.load(Ordering::SeqCst) {
|
|
|
|
let mut error = None;
|
|
|
|
mem::swap(&mut error, &mut self.error.lock().unwrap());
|
2017-04-12 09:34:31 +00:00
|
|
|
if let Some(err) = error {
|
|
|
|
Err(err)
|
|
|
|
} else {
|
|
|
|
Err(BundleDbError::UploadFailed)
|
|
|
|
}
|
2017-04-10 06:53:55 +00:00
|
|
|
} else {
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn queue(&self, local_path: PathBuf, remote_path: PathBuf) -> Result<(), BundleDbError> {
|
|
|
|
while self.waiting.load(Ordering::SeqCst) >= self.capacity {
|
2017-04-12 08:32:46 +00:00
|
|
|
debug!("Upload queue is full, waiting for slots");
|
2017-04-10 06:53:55 +00:00
|
|
|
let _ = self.wait.0.wait(self.wait.1.lock().unwrap()).unwrap();
|
|
|
|
}
|
2017-04-12 08:32:46 +00:00
|
|
|
trace!("Adding to upload queue: {:?}", local_path);
|
2017-04-10 06:53:55 +00:00
|
|
|
if !self.error_present.load(Ordering::SeqCst) {
|
|
|
|
self.waiting.fetch_add(1, Ordering::SeqCst);
|
|
|
|
self.queue.push(Some((local_path, remote_path)));
|
|
|
|
}
|
|
|
|
self.get_status()
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn finish(&self) -> Result<(), BundleDbError> {
|
|
|
|
if !self.error_present.load(Ordering::SeqCst) {
|
|
|
|
self.waiting.fetch_add(1, Ordering::SeqCst);
|
|
|
|
self.queue.push(None);
|
|
|
|
}
|
|
|
|
while self.waiting.load(Ordering::SeqCst) > 0 {
|
|
|
|
let _ = self.wait.0.wait(self.wait.1.lock().unwrap());
|
|
|
|
}
|
|
|
|
self.get_status()
|
|
|
|
}
|
|
|
|
|
|
|
|
fn worker_thread_inner(&self) -> Result<(), BundleDbError> {
|
|
|
|
while let Some((src_path, dst_path)) = self.queue.pop() {
|
2017-04-12 08:32:46 +00:00
|
|
|
trace!("Uploading {:?} to {:?}", src_path, dst_path);
|
|
|
|
self.waiting.fetch_sub(1, Ordering::SeqCst);
|
|
|
|
self.wait.0.notify_all();
|
2017-04-10 06:53:55 +00:00
|
|
|
let folder = dst_path.parent().unwrap();
|
2017-04-19 20:43:08 +00:00
|
|
|
try!(fs::create_dir_all(&folder).context(folder as &Path));
|
2017-04-12 08:32:46 +00:00
|
|
|
try!(fs::copy(&src_path, &dst_path).context(&dst_path as &Path));
|
|
|
|
try!(fs::remove_file(&src_path).context(&src_path as &Path));
|
|
|
|
debug!("Uploaded {:?} to {:?}", src_path, dst_path);
|
2017-04-10 06:53:55 +00:00
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
fn worker_thread(&self) {
|
|
|
|
if let Err(err) = self.worker_thread_inner() {
|
2017-04-12 09:34:31 +00:00
|
|
|
debug!("Upload thread failed with error: {}", err);
|
2017-04-10 06:53:55 +00:00
|
|
|
*self.error.lock().unwrap() = Some(err);
|
|
|
|
self.error_present.store(true, Ordering::SeqCst);
|
|
|
|
}
|
2017-04-12 09:34:31 +00:00
|
|
|
self.waiting.store(0, Ordering::SeqCst);
|
2017-04-10 06:53:55 +00:00
|
|
|
self.wait.0.notify_all();
|
|
|
|
}
|
|
|
|
}
|