zvault/src/repository/basic_io.rs

132 lines
4.9 KiB
Rust
Raw Normal View History

2017-03-10 11:43:32 +00:00
use std::mem;
use std::io::{Read, Write, Cursor};
2017-03-17 06:15:19 +00:00
use super::{Repository, RepositoryError};
2017-03-10 11:43:32 +00:00
use ::index::Location;
2017-03-17 06:15:19 +00:00
use ::bundle::{BundleId, BundleMode};
2017-03-16 08:42:30 +00:00
use super::integrity::RepositoryIntegrityError;
2017-03-10 11:43:32 +00:00
2017-03-18 14:41:59 +00:00
use ::util::*;
2017-03-10 11:43:32 +00:00
use ::chunker::{IChunker, ChunkerStatus};
impl Repository {
2017-03-16 08:42:30 +00:00
pub fn get_bundle_id(&self, id: u32) -> Result<BundleId, RepositoryError> {
if let Some(bundle_info) = self.bundle_map.get(id) {
Ok(bundle_info.id())
} else {
Err(RepositoryIntegrityError::MissingBundleId(id).into())
}
}
pub fn get_chunk(&mut self, hash: Hash) -> Result<Option<Vec<u8>>, RepositoryError> {
2017-03-10 11:43:32 +00:00
// Find bundle and chunk id in index
let found = if let Some(found) = self.index.get(&hash) {
found
} else {
return Ok(None)
};
// Lookup bundle id from map
2017-03-16 08:42:30 +00:00
let bundle_id = try!(self.get_bundle_id(found.bundle));
2017-03-10 11:43:32 +00:00
// Get chunk from bundle
2017-03-16 08:42:30 +00:00
Ok(Some(try!(self.bundles.get_chunk(&bundle_id, found.chunk as usize))))
2017-03-10 11:43:32 +00:00
}
2017-03-17 06:15:19 +00:00
pub fn put_chunk(&mut self, mode: BundleMode, hash: Hash, data: &[u8]) -> Result<(), RepositoryError> {
2017-03-10 11:43:32 +00:00
// If this chunk is in the index, ignore it
if self.index.contains(&hash) {
return Ok(())
}
// Calculate the next free bundle id now (late lifetime prevents this)
let next_free_bundle_id = self.next_free_bundle_id();
// Select a bundle writer according to the mode and...
let writer = match mode {
2017-03-17 06:15:19 +00:00
BundleMode::Content => &mut self.content_bundle,
BundleMode::Meta => &mut self.meta_bundle
2017-03-10 11:43:32 +00:00
};
// ...alocate one if needed
if writer.is_none() {
2017-03-18 16:22:11 +00:00
*writer = Some(try!(self.bundles.create_bundle(
mode,
self.config.hash,
self.config.compression.clone(),
self.config.encryption.clone()
)));
2017-03-10 11:43:32 +00:00
}
debug_assert!(writer.is_some());
let chunk_id;
let size;
2017-03-15 07:27:27 +00:00
let raw_size;
2017-03-10 11:43:32 +00:00
{
// Add chunk to bundle writer and determine the size of the bundle
let writer_obj = writer.as_mut().unwrap();
2017-03-17 11:58:22 +00:00
chunk_id = try!(writer_obj.add(data, hash));
2017-03-10 11:43:32 +00:00
size = writer_obj.size();
2017-03-15 07:27:27 +00:00
raw_size = writer_obj.raw_size();
2017-03-10 11:43:32 +00:00
}
let bundle_id = match mode {
2017-03-17 06:15:19 +00:00
BundleMode::Content => self.next_content_bundle,
BundleMode::Meta => self.next_meta_bundle
2017-03-10 11:43:32 +00:00
};
// Finish bundle if over maximum size
2017-03-15 07:27:27 +00:00
if size >= self.config.bundle_size || raw_size >= 4 * self.config.bundle_size {
2017-03-10 11:43:32 +00:00
let mut finished = None;
mem::swap(writer, &mut finished);
2017-03-16 08:42:30 +00:00
let bundle = try!(self.bundles.add_bundle(finished.unwrap()));
2017-03-15 07:27:27 +00:00
self.bundle_map.set(bundle_id, bundle);
2017-03-10 11:43:32 +00:00
if self.next_meta_bundle == bundle_id {
self.next_meta_bundle = next_free_bundle_id
}
if self.next_content_bundle == bundle_id {
self.next_content_bundle = next_free_bundle_id
}
// Not saving the bundle map, this will be done by flush
}
// Add location to the index
2017-03-16 08:42:30 +00:00
try!(self.index.set(&hash, &Location::new(bundle_id, chunk_id as u32)));
2017-03-10 11:43:32 +00:00
Ok(())
}
#[inline]
2017-03-18 14:41:59 +00:00
pub fn put_data(&mut self, mode: BundleMode, data: &[u8]) -> Result<ChunkList, RepositoryError> {
2017-03-10 11:43:32 +00:00
let mut input = Cursor::new(data);
self.put_stream(mode, &mut input)
}
2017-03-18 14:41:59 +00:00
pub fn put_stream<R: Read>(&mut self, mode: BundleMode, data: &mut R) -> Result<ChunkList, RepositoryError> {
2017-03-10 11:43:32 +00:00
let avg_size = self.config.chunker.avg_size();
let mut chunks = Vec::new();
let mut chunk = Vec::with_capacity(avg_size * 2);
loop {
chunk.clear();
let mut output = Cursor::new(chunk);
2017-03-16 08:42:30 +00:00
let res = try!(self.chunker.chunk(data, &mut output));
2017-03-10 11:43:32 +00:00
chunk = output.into_inner();
let hash = self.config.hash.hash(&chunk);
2017-03-16 08:42:30 +00:00
try!(self.put_chunk(mode, hash, &chunk));
2017-03-18 14:41:59 +00:00
chunks.push((hash, chunk.len() as u32));
2017-03-10 11:43:32 +00:00
if res == ChunkerStatus::Finished {
break
}
}
2017-03-18 14:41:59 +00:00
Ok(chunks.into())
2017-03-10 11:43:32 +00:00
}
#[inline]
2017-03-16 08:42:30 +00:00
pub fn get_data(&mut self, chunks: &[Chunk]) -> Result<Vec<u8>, RepositoryError> {
2017-03-18 14:41:59 +00:00
let mut data = Vec::with_capacity(chunks.iter().map(|&(_, size)| size).sum::<u32>() as usize);
2017-03-10 11:43:32 +00:00
try!(self.get_stream(chunks, &mut data));
Ok(data)
}
#[inline]
2017-03-16 08:42:30 +00:00
pub fn get_stream<W: Write>(&mut self, chunks: &[Chunk], w: &mut W) -> Result<(), RepositoryError> {
2017-03-10 11:43:32 +00:00
for &(ref hash, len) in chunks {
2017-03-16 08:42:30 +00:00
let data = try!(try!(self.get_chunk(*hash)).ok_or_else(|| RepositoryIntegrityError::MissingChunk(hash.clone())));
2017-03-18 14:41:59 +00:00
debug_assert_eq!(data.len() as u32, len);
2017-03-16 08:42:30 +00:00
try!(w.write_all(&data));
2017-03-10 11:43:32 +00:00
}
Ok(())
}
}