use ::prelude::*; use std::mem; use std::cmp::min; use std::collections::VecDeque; use std::io::{self, Read, Write, Cursor}; pub struct ChunkReader<'a> { chunks: VecDeque, data: Vec, pos: usize, repo: &'a mut Repository } impl<'a> ChunkReader<'a> { pub fn new(repo: &'a mut Repository, chunks: ChunkList) -> Self { ChunkReader { repo: repo, chunks: chunks.into_inner().into(), data: vec![], pos: 0 } } } impl<'a> Read for ChunkReader<'a> { fn read(&mut self, mut buf: &mut [u8]) -> Result { let mut bpos = 0; loop { if buf.len() == bpos { break } if self.data.len() == self.pos { if let Some(chunk) = self.chunks.pop_front() { self.data = match self.repo.get_chunk(chunk.0) { Ok(Some(data)) => data, Ok(None) => return Err(io::Error::new(io::ErrorKind::Other, IntegrityError::MissingChunk(chunk.0))), Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err)) }; self.pos = 0; } else { break } } let l = min(self.data.len()-self.pos, buf.len() - bpos); buf[bpos..bpos+l].copy_from_slice(&self.data[self.pos..self.pos+l]); bpos += l; self.pos += l; } Ok(bpos) } } impl Repository { #[inline] pub fn get_bundle_id(&self, id: u32) -> Result { self.bundle_map.get(id).ok_or_else(|| IntegrityError::MissingBundleId(id).into()) } pub fn get_chunk(&mut self, hash: Hash) -> Result>, RepositoryError> { // Find bundle and chunk id in index let found = if let Some(found) = self.index.get(&hash) { found } else { return Ok(None) }; // Lookup bundle id from map let bundle_id = try!(self.get_bundle_id(found.bundle)); // Get chunk from bundle Ok(Some(try!(self.bundles.get_chunk(&bundle_id, found.chunk as usize)))) } #[inline] pub fn put_chunk(&mut self, mode: BundleMode, hash: Hash, data: &[u8]) -> Result<(), RepositoryError> { // If this chunk is in the index, ignore it if self.index.contains(&hash) { return Ok(()) } self.put_chunk_override(mode, hash, data) } fn write_chunk_to_bundle_and_index(&mut self, mode: BundleMode, hash: Hash, data: &[u8]) -> Result<(), RepositoryError> { let writer = match mode { BundleMode::Data => &mut self.data_bundle, BundleMode::Meta => &mut self.meta_bundle }; // ...alocate one if needed if writer.is_none() { *writer = Some(try!(self.bundles.create_bundle( mode, self.config.hash, self.config.compression.clone(), self.config.encryption.clone() ))); } debug_assert!(writer.is_some()); // Add chunk to bundle writer and determine the size of the bundle let writer_obj = writer.as_mut().unwrap(); let chunk_id = try!(writer_obj.add(data, hash)); let bundle_id = match mode { BundleMode::Data => self.next_data_bundle, BundleMode::Meta => self.next_meta_bundle }; // Add location to the index try!(self.index.set(&hash, &Location::new(bundle_id, chunk_id as u32))); Ok(()) } fn finish_bundle(&mut self, mode: BundleMode) -> Result<(), RepositoryError> { // Calculate the next free bundle id now (late lifetime prevents this) let next_free_bundle_id = self.next_free_bundle_id(); let writer = match mode { BundleMode::Data => &mut self.data_bundle, BundleMode::Meta => &mut self.meta_bundle }; if writer.is_none() { return Ok(()) } let bundle_id = match mode { BundleMode::Data => self.next_data_bundle, BundleMode::Meta => self.next_meta_bundle }; let mut finished = None; mem::swap(writer, &mut finished); let bundle = try!(self.bundles.add_bundle(finished.unwrap())); self.bundle_map.set(bundle_id, bundle.id.clone()); if self.next_meta_bundle == bundle_id { self.next_meta_bundle = next_free_bundle_id } if self.next_data_bundle == bundle_id { self.next_data_bundle = next_free_bundle_id } Ok(()) } fn finish_bundle_if_needed(&mut self, mode: BundleMode) -> Result<(), RepositoryError> { let (size, raw_size) = { let writer = match mode { BundleMode::Data => &mut self.data_bundle, BundleMode::Meta => &mut self.meta_bundle }; if let Some(ref writer) = *writer { (writer.size(), writer.raw_size()) } else { return Ok(()) } }; if size >= self.config.bundle_size || raw_size >= 4 * self.config.bundle_size { if mode == BundleMode::Meta { //First store the current data bundle as meta referrs to those chunks try!(self.finish_bundle(BundleMode::Data)) } try!(self.finish_bundle(mode)) } Ok(()) } #[inline] pub fn put_chunk_override(&mut self, mode: BundleMode, hash: Hash, data: &[u8]) -> Result<(), RepositoryError> { try!(self.write_chunk_to_bundle_and_index(mode, hash, data)); self.finish_bundle_if_needed(mode) } #[inline] pub fn put_data(&mut self, mode: BundleMode, data: &[u8]) -> Result { let mut input = Cursor::new(data); self.put_stream(mode, &mut input) } pub fn put_stream(&mut self, mode: BundleMode, data: &mut R) -> Result { let avg_size = self.config.chunker.avg_size(); let mut chunks = Vec::new(); let mut chunk = Vec::with_capacity(avg_size * 2); loop { chunk.clear(); let mut output = Cursor::new(chunk); let res = try!(self.chunker.chunk(data, &mut output)); chunk = output.into_inner(); let hash = self.config.hash.hash(&chunk); try!(self.put_chunk(mode, hash, &chunk)); chunks.push((hash, chunk.len() as u32)); if res == ChunkerStatus::Finished { break } } Ok(chunks.into()) } pub fn get_data(&mut self, chunks: &[Chunk]) -> Result, RepositoryError> { let mut data = Vec::with_capacity(chunks.iter().map(|&(_, size)| size).sum::() as usize); try!(self.get_stream(chunks, &mut data)); Ok(data) } #[inline] pub fn get_reader(&mut self, chunks: ChunkList) -> ChunkReader { ChunkReader::new(self, chunks) } pub fn get_stream(&mut self, chunks: &[Chunk], w: &mut W) -> Result<(), RepositoryError> { for &(ref hash, len) in chunks { let data = try!(try!(self.get_chunk(*hash)).ok_or_else(|| IntegrityError::MissingChunk(hash.clone()))); debug_assert_eq!(data.len() as u32, len); try!(w.write_all(&data)); } Ok(()) } }