2017-03-21 10:28:11 +00:00
|
|
|
use ::prelude::*;
|
|
|
|
|
2017-03-10 11:43:32 +00:00
|
|
|
use std::mem;
|
2017-04-03 13:18:06 +00:00
|
|
|
use std::cmp::min;
|
|
|
|
use std::collections::VecDeque;
|
|
|
|
use std::io::{self, Read, Write, Cursor};
|
|
|
|
|
|
|
|
|
|
|
|
pub struct ChunkReader<'a> {
|
|
|
|
chunks: VecDeque<Chunk>,
|
|
|
|
data: Vec<u8>,
|
|
|
|
pos: usize,
|
|
|
|
repo: &'a mut Repository
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<'a> ChunkReader<'a> {
|
|
|
|
pub fn new(repo: &'a mut Repository, chunks: ChunkList) -> Self {
|
|
|
|
ChunkReader {
|
|
|
|
repo: repo,
|
|
|
|
chunks: chunks.into_inner().into(),
|
|
|
|
data: vec![],
|
|
|
|
pos: 0
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<'a> Read for ChunkReader<'a> {
|
|
|
|
fn read(&mut self, mut buf: &mut [u8]) -> Result<usize, io::Error> {
|
|
|
|
let mut bpos = 0;
|
|
|
|
loop {
|
|
|
|
if buf.len() == bpos {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
if self.data.len() == self.pos {
|
|
|
|
if let Some(chunk) = self.chunks.pop_front() {
|
|
|
|
self.data = match self.repo.get_chunk(chunk.0) {
|
|
|
|
Ok(Some(data)) => data,
|
2017-04-10 15:53:26 +00:00
|
|
|
Ok(None) => return Err(io::Error::new(io::ErrorKind::Other, IntegrityError::MissingChunk(chunk.0))),
|
2017-04-03 13:18:06 +00:00
|
|
|
Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err))
|
|
|
|
};
|
|
|
|
self.pos = 0;
|
|
|
|
} else {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
let l = min(self.data.len()-self.pos, buf.len() - bpos);
|
|
|
|
buf[bpos..bpos+l].copy_from_slice(&self.data[self.pos..self.pos+l]);
|
|
|
|
bpos += l;
|
|
|
|
self.pos += l;
|
|
|
|
}
|
|
|
|
Ok(bpos)
|
|
|
|
}
|
|
|
|
}
|
2017-03-10 11:43:32 +00:00
|
|
|
|
|
|
|
|
|
|
|
impl Repository {
|
2017-04-10 18:35:28 +00:00
|
|
|
#[inline]
|
2017-03-16 08:42:30 +00:00
|
|
|
pub fn get_bundle_id(&self, id: u32) -> Result<BundleId, RepositoryError> {
|
2017-04-10 15:53:26 +00:00
|
|
|
self.bundle_map.get(id).ok_or_else(|| IntegrityError::MissingBundleId(id).into())
|
2017-03-16 08:42:30 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
pub fn get_chunk(&mut self, hash: Hash) -> Result<Option<Vec<u8>>, RepositoryError> {
|
2017-03-10 11:43:32 +00:00
|
|
|
// Find bundle and chunk id in index
|
|
|
|
let found = if let Some(found) = self.index.get(&hash) {
|
|
|
|
found
|
|
|
|
} else {
|
|
|
|
return Ok(None)
|
|
|
|
};
|
|
|
|
// Lookup bundle id from map
|
2017-03-16 08:42:30 +00:00
|
|
|
let bundle_id = try!(self.get_bundle_id(found.bundle));
|
2017-03-10 11:43:32 +00:00
|
|
|
// Get chunk from bundle
|
2017-03-16 08:42:30 +00:00
|
|
|
Ok(Some(try!(self.bundles.get_chunk(&bundle_id, found.chunk as usize))))
|
2017-03-10 11:43:32 +00:00
|
|
|
}
|
|
|
|
|
2017-03-20 13:03:29 +00:00
|
|
|
#[inline]
|
2017-03-17 06:15:19 +00:00
|
|
|
pub fn put_chunk(&mut self, mode: BundleMode, hash: Hash, data: &[u8]) -> Result<(), RepositoryError> {
|
2017-03-10 11:43:32 +00:00
|
|
|
// If this chunk is in the index, ignore it
|
|
|
|
if self.index.contains(&hash) {
|
|
|
|
return Ok(())
|
|
|
|
}
|
2017-03-20 13:03:29 +00:00
|
|
|
self.put_chunk_override(mode, hash, data)
|
|
|
|
}
|
|
|
|
|
2017-04-07 16:57:49 +00:00
|
|
|
fn write_chunk_to_bundle_and_index(&mut self, mode: BundleMode, hash: Hash, data: &[u8]) -> Result<(), RepositoryError> {
|
2017-03-10 11:43:32 +00:00
|
|
|
let writer = match mode {
|
2017-04-03 12:05:16 +00:00
|
|
|
BundleMode::Data => &mut self.data_bundle,
|
2017-03-17 06:15:19 +00:00
|
|
|
BundleMode::Meta => &mut self.meta_bundle
|
2017-03-10 11:43:32 +00:00
|
|
|
};
|
|
|
|
// ...alocate one if needed
|
|
|
|
if writer.is_none() {
|
2017-03-18 16:22:11 +00:00
|
|
|
*writer = Some(try!(self.bundles.create_bundle(
|
|
|
|
mode,
|
|
|
|
self.config.hash,
|
|
|
|
self.config.compression.clone(),
|
|
|
|
self.config.encryption.clone()
|
|
|
|
)));
|
2017-03-10 11:43:32 +00:00
|
|
|
}
|
|
|
|
debug_assert!(writer.is_some());
|
2017-04-07 16:57:49 +00:00
|
|
|
// Add chunk to bundle writer and determine the size of the bundle
|
|
|
|
let writer_obj = writer.as_mut().unwrap();
|
|
|
|
let chunk_id = try!(writer_obj.add(data, hash));
|
|
|
|
let bundle_id = match mode {
|
|
|
|
BundleMode::Data => self.next_data_bundle,
|
|
|
|
BundleMode::Meta => self.next_meta_bundle
|
|
|
|
};
|
|
|
|
// Add location to the index
|
|
|
|
try!(self.index.set(&hash, &Location::new(bundle_id, chunk_id as u32)));
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
fn finish_bundle(&mut self, mode: BundleMode) -> Result<(), RepositoryError> {
|
|
|
|
// Calculate the next free bundle id now (late lifetime prevents this)
|
|
|
|
let next_free_bundle_id = self.next_free_bundle_id();
|
|
|
|
let writer = match mode {
|
|
|
|
BundleMode::Data => &mut self.data_bundle,
|
|
|
|
BundleMode::Meta => &mut self.meta_bundle
|
|
|
|
};
|
|
|
|
if writer.is_none() {
|
|
|
|
return Ok(())
|
2017-03-10 11:43:32 +00:00
|
|
|
}
|
|
|
|
let bundle_id = match mode {
|
2017-04-03 12:05:16 +00:00
|
|
|
BundleMode::Data => self.next_data_bundle,
|
2017-03-17 06:15:19 +00:00
|
|
|
BundleMode::Meta => self.next_meta_bundle
|
2017-03-10 11:43:32 +00:00
|
|
|
};
|
2017-04-07 16:57:49 +00:00
|
|
|
let mut finished = None;
|
|
|
|
mem::swap(writer, &mut finished);
|
|
|
|
let bundle = try!(self.bundles.add_bundle(finished.unwrap()));
|
|
|
|
self.bundle_map.set(bundle_id, bundle.id.clone());
|
|
|
|
if self.next_meta_bundle == bundle_id {
|
|
|
|
self.next_meta_bundle = next_free_bundle_id
|
|
|
|
}
|
|
|
|
if self.next_data_bundle == bundle_id {
|
|
|
|
self.next_data_bundle = next_free_bundle_id
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
fn finish_bundle_if_needed(&mut self, mode: BundleMode) -> Result<(), RepositoryError> {
|
|
|
|
let (size, raw_size) = {
|
|
|
|
let writer = match mode {
|
|
|
|
BundleMode::Data => &mut self.data_bundle,
|
|
|
|
BundleMode::Meta => &mut self.meta_bundle
|
|
|
|
};
|
|
|
|
if let Some(ref writer) = *writer {
|
|
|
|
(writer.size(), writer.raw_size())
|
|
|
|
} else {
|
|
|
|
return Ok(())
|
2017-03-10 11:43:32 +00:00
|
|
|
}
|
2017-04-07 16:57:49 +00:00
|
|
|
};
|
|
|
|
if size >= self.config.bundle_size || raw_size >= 4 * self.config.bundle_size {
|
|
|
|
if mode == BundleMode::Meta {
|
|
|
|
//First store the current data bundle as meta referrs to those chunks
|
|
|
|
try!(self.finish_bundle(BundleMode::Data))
|
2017-03-10 11:43:32 +00:00
|
|
|
}
|
2017-04-07 16:57:49 +00:00
|
|
|
try!(self.finish_bundle(mode))
|
2017-03-10 11:43:32 +00:00
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2017-04-07 16:57:49 +00:00
|
|
|
#[inline]
|
|
|
|
pub fn put_chunk_override(&mut self, mode: BundleMode, hash: Hash, data: &[u8]) -> Result<(), RepositoryError> {
|
|
|
|
try!(self.write_chunk_to_bundle_and_index(mode, hash, data));
|
|
|
|
self.finish_bundle_if_needed(mode)
|
|
|
|
}
|
|
|
|
|
2017-03-10 11:43:32 +00:00
|
|
|
#[inline]
|
2017-03-18 14:41:59 +00:00
|
|
|
pub fn put_data(&mut self, mode: BundleMode, data: &[u8]) -> Result<ChunkList, RepositoryError> {
|
2017-03-10 11:43:32 +00:00
|
|
|
let mut input = Cursor::new(data);
|
|
|
|
self.put_stream(mode, &mut input)
|
|
|
|
}
|
|
|
|
|
2017-03-18 14:41:59 +00:00
|
|
|
pub fn put_stream<R: Read>(&mut self, mode: BundleMode, data: &mut R) -> Result<ChunkList, RepositoryError> {
|
2017-03-10 11:43:32 +00:00
|
|
|
let avg_size = self.config.chunker.avg_size();
|
|
|
|
let mut chunks = Vec::new();
|
|
|
|
let mut chunk = Vec::with_capacity(avg_size * 2);
|
|
|
|
loop {
|
|
|
|
chunk.clear();
|
|
|
|
let mut output = Cursor::new(chunk);
|
2017-03-16 08:42:30 +00:00
|
|
|
let res = try!(self.chunker.chunk(data, &mut output));
|
2017-03-10 11:43:32 +00:00
|
|
|
chunk = output.into_inner();
|
|
|
|
let hash = self.config.hash.hash(&chunk);
|
2017-03-16 08:42:30 +00:00
|
|
|
try!(self.put_chunk(mode, hash, &chunk));
|
2017-03-18 14:41:59 +00:00
|
|
|
chunks.push((hash, chunk.len() as u32));
|
2017-03-10 11:43:32 +00:00
|
|
|
if res == ChunkerStatus::Finished {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
2017-03-18 14:41:59 +00:00
|
|
|
Ok(chunks.into())
|
2017-03-10 11:43:32 +00:00
|
|
|
}
|
|
|
|
|
2017-03-16 08:42:30 +00:00
|
|
|
pub fn get_data(&mut self, chunks: &[Chunk]) -> Result<Vec<u8>, RepositoryError> {
|
2017-03-18 14:41:59 +00:00
|
|
|
let mut data = Vec::with_capacity(chunks.iter().map(|&(_, size)| size).sum::<u32>() as usize);
|
2017-03-10 11:43:32 +00:00
|
|
|
try!(self.get_stream(chunks, &mut data));
|
|
|
|
Ok(data)
|
|
|
|
}
|
|
|
|
|
2017-04-03 13:18:06 +00:00
|
|
|
#[inline]
|
|
|
|
pub fn get_reader(&mut self, chunks: ChunkList) -> ChunkReader {
|
|
|
|
ChunkReader::new(self, chunks)
|
|
|
|
}
|
|
|
|
|
2017-03-16 08:42:30 +00:00
|
|
|
pub fn get_stream<W: Write>(&mut self, chunks: &[Chunk], w: &mut W) -> Result<(), RepositoryError> {
|
2017-03-10 11:43:32 +00:00
|
|
|
for &(ref hash, len) in chunks {
|
2017-04-10 15:53:26 +00:00
|
|
|
let data = try!(try!(self.get_chunk(*hash)).ok_or_else(|| IntegrityError::MissingChunk(hash.clone())));
|
2017-03-18 14:41:59 +00:00
|
|
|
debug_assert_eq!(data.len() as u32, len);
|
2017-03-16 08:42:30 +00:00
|
|
|
try!(w.write_all(&data));
|
2017-03-10 11:43:32 +00:00
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|