First working version

pull/10/head
Dennis Schwerdel 2017-03-10 12:43:32 +01:00
parent 1e460b66d5
commit 231aa9fb58
25 changed files with 3222 additions and 4 deletions

.gitignore vendored Normal file (+4)

@@ -0,0 +1,4 @@
target
squash
test.tar
test_data

Algotest1.txt Normal file (+68)

@@ -0,0 +1,68 @@
~/shared
Algorithm comparison on file test.tar
Reading input file... done. 2175416320 bytes
Chunker algorithms
Chunk size: 4 KiB
AE: avg chunk size 2756.5 ± 543.6 bytes, 12.1% saved, speed 748.9 MB/s
Rabin: avg chunk size 4902.3 ± 3826.2 bytes, 11.7% saved, speed 336.7 MB/s
FastCdc: avg chunk size 4783.3 ± 1940.5 bytes, 12.1% saved, speed 544.1 MB/s
Chunk size: 8 KiB
AE: avg chunk size 5245.1 ± 890.8 bytes, 10.0% saved, speed 756.3 MB/s
Rabin: avg chunk size 9774.2 ± 7636.0 bytes, 10.3% saved, speed 344.9 MB/s
FastCdc: avg chunk size 9583.2 ± 3933.2 bytes, 10.7% saved, speed 541.6 MB/s
Chunk size: 16 KiB
AE: avg chunk size 10169.5 ± 1485.8 bytes, 7.4% saved, speed 781.5 MB/s
Rabin: avg chunk size 19641.7 ± 15292.5 bytes, 9.0% saved, speed 345.9 MB/s
FastCdc: avg chunk size 19262.9 ± 7697.4 bytes, 9.0% saved, speed 548.1 MB/s
Chunk size: 32 KiB
AE: avg chunk size 20004.6 ± 2705.6 bytes, 5.6% saved, speed 787.0 MB/s
Rabin: avg chunk size 38963.6 ± 30218.2 bytes, 7.6% saved, speed 345.7 MB/s
FastCdc: avg chunk size 39159.3 ± 16834.6 bytes, 7.7% saved, speed 547.1 MB/s
Chunk size: 64 KiB
AE: avg chunk size 39627.2 ± 5310.6 bytes, 3.8% saved, speed 788.2 MB/s
Rabin: avg chunk size 78339.7 ± 60963.7 bytes, 6.4% saved, speed 345.6 MB/s
FastCdc: avg chunk size 76981.4 ± 30784.6 bytes, 6.1% saved, speed 548.4 MB/s
Hash algorithms
Blake2: 724.2 MB/s
Murmur3: 5358.3 MB/s
Compression algorithms
Snappy: ratio: 83.6%, compress: 301.7 MB/s, decompress: 876.2 MB/s
fatal runtime error: out of memory
ZStd/1: ratio: 77.2%, compress: 493.9 MB/s, decompress: 0.0 MB/s
ZStd/2: ratio: 76.7%, compress: 420.6 MB/s, decompress: 0.0 MB/s
ZStd/3: ratio: 75.4%, compress: 314.6 MB/s, decompress: 0.0 MB/s
ZStd/4: ratio: 75.3%, compress: 273.0 MB/s, decompress: 0.0 MB/s
ZStd/5: ratio: 74.9%, compress: 131.4 MB/s, decompress: 0.0 MB/s
ZStd/6: ratio: 73.6%, compress: 121.4 MB/s, decompress: 0.0 MB/s
ZStd/7: ratio: 73.5%, compress: 88.7 MB/s, decompress: 0.0 MB/s
ZStd/8: ratio: 73.4%, compress: 76.8 MB/s, decompress: 0.0 MB/s
ZStd/9: ratio: 73.3%, compress: 51.8 MB/s, decompress: 0.0 MB/s
Deflate/1: ratio: 78.3%, compress: 95.7 MB/s, decompress: 0.0 MB/s
Deflate/2: ratio: 78.2%, compress: 94.7 MB/s, decompress: 0.0 MB/s
Deflate/3: ratio: 78.1%, compress: 92.5 MB/s, decompress: 0.0 MB/s
Deflate/4: ratio: 78.0%, compress: 87.9 MB/s, decompress: 0.0 MB/s
Deflate/5: ratio: 77.8%, compress: 86.5 MB/s, decompress: 0.0 MB/s
Deflate/6: ratio: 77.7%, compress: 83.8 MB/s, decompress: 0.0 MB/s
Deflate/7: ratio: 77.7%, compress: 73.4 MB/s, decompress: 0.0 MB/s
Deflate/8: ratio: 77.6%, compress: 31.6 MB/s, decompress: 0.0 MB/s
Deflate/9: ratio: 77.4%, compress: 25.8 MB/s, decompress: 0.0 MB/s
Brotli/1: ratio: 77.6%, compress: 433.1 MB/s, decompress: 0.0 MB/s
Brotli/2: ratio: 75.4%, compress: 242.2 MB/s, decompress: 0.0 MB/s
Brotli/3: ratio: 75.3%, compress: 195.5 MB/s, decompress: 0.0 MB/s
Brotli/4: ratio: 72.4%, compress: 81.6 MB/s, decompress: 0.0 MB/s
Brotli/5: ratio: 73.9%, compress: 62.4 MB/s, decompress: 0.0 MB/s
Brotli/6: ratio: 72.9%, compress: 46.6 MB/s, decompress: 0.0 MB/s
Brotli/7: ratio: 71.5%, compress: 23.4 MB/s, decompress: 0.0 MB/s
Brotli/8: ratio: 71.5%, compress: 20.7 MB/s, decompress: 0.0 MB/s
Brotli/9: ratio: 71.2%, compress: 11.2 MB/s, decompress: 0.0 MB/s
Lzma2/1: ratio: 69.8%, compress: 4.2 MB/s, decompress: 0.0 MB/s

Cargo.lock generated Normal file (+246)

@@ -0,0 +1,246 @@
[root]
name = "zvault"
version = "0.1.0"
dependencies = [
"blake2-rfc 0.2.17 (registry+https://github.com/rust-lang/crates.io-index)",
"mmap 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"murmurhash3 0.0.5 (registry+https://github.com/rust-lang/crates.io-index)",
"quick-error 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rmp-serde 0.12.2 (registry+https://github.com/rust-lang/crates.io-index)",
"rust-crypto 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 0.9.11 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_utils 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_yaml 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)",
"squash-sys 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "bitflags"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "blake2-rfc"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"constant_time_eq 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "byteorder"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "constant_time_eq"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "gcc"
version = "0.3.43"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "kernel32-sys"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "libc"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "libc"
version = "0.2.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "linked-hash-map"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "mmap"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"libc 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
"tempdir 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "murmurhash3"
version = "0.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "num-traits"
version = "0.1.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "pkg-config"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "quick-error"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "rand"
version = "0.3.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"libc 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "redox_syscall"
version = "0.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "rmp"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"num-traits 0.1.37 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "rmp-serde"
version = "0.12.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rmp 0.8.4 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 0.9.11 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "rust-crypto"
version = "0.2.36"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"gcc 0.3.43 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "rustc-serialize"
version = "0.3.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "serde"
version = "0.9.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "serde_utils"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"serde 0.9.11 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "serde_yaml"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"linked-hash-map 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 0.9.11 (registry+https://github.com/rust-lang/crates.io-index)",
"yaml-rust 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "squash-sys"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bitflags 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)",
"pkg-config 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tempdir"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"rand 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "time"
version = "0.1.36"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)",
"redox_syscall 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "winapi"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "winapi-build"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "yaml-rust"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"linked-hash-map 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[metadata]
"checksum bitflags 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "aad18937a628ec6abcd26d1489012cc0e18c21798210f491af69ded9b881106d"
"checksum blake2-rfc 0.2.17 (registry+https://github.com/rust-lang/crates.io-index)" = "0c6a476f32fef3402f1161f89d0d39822809627754a126f8441ff2a9d45e2d59"
"checksum byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c40977b0ee6b9885c9013cd41d9feffdd22deb3bb4dc3a71d901cc7a77de18c8"
"checksum constant_time_eq 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "07dcb7959f0f6f1cf662f9a7ff389bcb919924d99ac41cf31f10d611d8721323"
"checksum gcc 0.3.43 (registry+https://github.com/rust-lang/crates.io-index)" = "c07c758b972368e703a562686adb39125707cc1ef3399da8c019fc6c2498a75d"
"checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
"checksum libc 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "e32a70cf75e5846d53a673923498228bbec6a8624708a9ea5645f075d6276122"
"checksum libc 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)" = "88ee81885f9f04bff991e306fea7c1c60a5f0f9e409e99f6b40e3311a3363135"
"checksum linked-hash-map 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6d262045c5b87c0861b3f004610afd0e2c851e2908d08b6c870cbb9d5f494ecd"
"checksum mmap 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "0bc85448a6006dd2ba26a385a564a8a0f1f2c7e78c70f1a70b2e0f4af286b823"
"checksum murmurhash3 0.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "a2983372caf4480544083767bf2d27defafe32af49ab4df3a0b7fc90793a3664"
"checksum num-traits 0.1.37 (registry+https://github.com/rust-lang/crates.io-index)" = "e1cbfa3781f3fe73dc05321bed52a06d2d491eaa764c52335cf4399f046ece99"
"checksum pkg-config 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "3a8b4c6b8165cd1a1cd4b9b120978131389f64bdaf456435caa41e630edba903"
"checksum quick-error 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0aad603e8d7fb67da22dbdf1f4b826ce8829e406124109e73cf1b2454b93a71c"
"checksum rand 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "022e0636ec2519ddae48154b028864bdce4eaf7d35226ab8e65c611be97b189d"
"checksum redox_syscall 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)" = "8dd35cc9a8bdec562c757e3d43c1526b5c6d2653e23e2315065bc25556550753"
"checksum rmp 0.8.4 (registry+https://github.com/rust-lang/crates.io-index)" = "e59917c01f49718a59c644a621a4848aafc6577c4a47d66270d78951a807541a"
"checksum rmp-serde 0.12.2 (registry+https://github.com/rust-lang/crates.io-index)" = "06ec4d0cdea2645de5d0e649f90c3e654205d913e14adefa452257314a24e76e"
"checksum rust-crypto 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)" = "f76d05d3993fd5f4af9434e8e436db163a12a9d40e1a58a726f27a01dfd12a2a"
"checksum rustc-serialize 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)" = "237546c689f20bb44980270c73c3b9edd0891c1be49cc1274406134a66d3957b"
"checksum serde 0.9.11 (registry+https://github.com/rust-lang/crates.io-index)" = "a702319c807c016e51f672e5c77d6f0b46afddd744b5e437d6b8436b888b458f"
"checksum serde_utils 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b34a52969c7fc0254e214b82518c9a95dc88c84fc84cd847add314996a031be6"
"checksum serde_yaml 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f8bd3f24ad8c7bcd34a6d70ba676dc11302b96f4f166aa5f947762e01098844d"
"checksum squash-sys 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "db1f9dde91d819b7746e153bc32489fa19e6a106c3d7f2b92187a4efbdc88b40"
"checksum tempdir 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "87974a6f5c1dfb344d733055601650059a3363de2a6104819293baff662132d6"
"checksum time 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)" = "211b63c112206356ef1ff9b19355f43740fc3f85960c598a93d3a3d3ba7beade"
"checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a"
"checksum winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc"
"checksum yaml-rust 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e66366e18dc58b46801afbf2ca7661a9f59cc8c5962c29892b6039b4f86fa992"

Cargo.toml Normal file (+16)

@@ -0,0 +1,16 @@
[package]
name = "zvault"
version = "0.1.0"
authors = ["Dennis Schwerdel <schwerdel@informatik.uni-kl.de>"]
[dependencies]
serde = "0.9"
rmp-serde = "0.12"
serde_yaml = "0.6"
serde_utils = "0.5.1"
rust-crypto = "0.2"
squash-sys = "0.9"
mmap = "*"
quick-error = "1.1"
blake2-rfc = "*"
murmurhash3 = "*"

README.md (143 changed lines)

@@ -1,10 +1,96 @@
# ZVault Backup solution
## Goals / Features
### Space-efficient storage with deduplication
The backup data is split into chunks. Fingerprints make sure that each chunk is
only stored once. The chunking algorithm is designed so that small changes to a
file only change a few chunks and leave most chunks unchanged.
Multiple backups of the same data set will only take up the space of one copy.
The chunks are combined into bundles. Each bundle holds chunks up to a maximum
data size and is compressed as a whole to save space ("solid archive").
### Independent backups
All backups share common data in the form of chunks but are independent on a
higher level. Backups can be deleted, and chunks that are no longer used by any
backup can be removed.
Other backup solutions use differential backups organized in chains. This makes
those backups dependent on previous backups in the chain, so that earlier
backups cannot be deleted. Also, restoring chained backups is much less efficient.
### Fast backup runs
* Only changed files are added
* In-memory hash table
### Backup verification
* Bundle verification
* Index verification
* File structure verification
## Configuration options
There are several configuration options with trade-offs attached, so they are
exposed to users.
### Chunker algorithm
The chunker algorithm is responsible for splitting files into chunks in a way
that survives small changes, so that a slightly modified file still yields many
matching chunks. The quality of the algorithm affects the deduplication rate,
and its speed affects the backup speed.
There are three algorithms to choose from:
The **Rabin chunker** is a very common algorithm with good quality but
mediocre speed (about 350 MB/s).
The **AE chunker** is a novel approach that can reach very high speeds
(over 750 MB/s), but at the cost of some quality.
The **FastCDC** algorithm has a slightly higher quality than the Rabin chunker
and is quite fast (about 550 MB/s).
The recommendation is **FastCDC**.
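A minimal usage sketch, assuming the `ChunkerType`, `IChunker`, and
`ChunkerStatus` items from `src/chunker/mod.rs` (shown below) are in scope and
`data: &[u8]` holds the input:
```rust
use std::io::Cursor;

// Pick FastCDC with a 16 KiB average chunk size and seed 0.
let chunker_type = ChunkerType::from("fastcdc", 16 * 1024, 0).unwrap();
let mut chunker = chunker_type.create();
let mut input = Cursor::new(data);
loop {
    let mut chunk = Vec::new();
    let status = chunker.chunk(&mut input, &mut chunk).unwrap();
    // `chunk` now holds one content-defined chunk of the input.
    if status == ChunkerStatus::Finished {
        break;
    }
}
```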
### Chunk size
The chunk size determines the memory usage during backup runs. For every chunk
in the backup repository, 24 bytes of memory are needed. That means that for
every GiB stored in the repository the following amount of memory is needed:
- 8 KiB chunks => 3 MiB / GiB
- 16 KiB chunks => 1.5 MiB / GiB
- 32 KiB chunks => 750 KiB / GiB
- 64 KiB chunks => 375 KiB / GiB
On the other hand, bigger chunks reduce the deduplication efficiency. Even small
changes of only one byte will result in at least one complete chunk changing.
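To make the math concrete, here is a small sketch of the calculation
(24 bytes per index entry, as derived in the Index section below):
```rust
// Index memory needed per GiB of stored data, at 24 bytes per chunk entry.
fn index_mem_per_gib(avg_chunk_size: u64) -> u64 {
    ((1u64 << 30) / avg_chunk_size) * 24
}

fn main() {
    // 16 KiB chunks: (2^30 / 2^14) * 24 = 65536 * 24 bytes = 1.5 MiB per GiB
    assert_eq!(index_mem_per_gib(16 * 1024), 3 * 512 * 1024);
}
```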
### Hash algorithm
- **Blake2**: cryptographic, reasonably fast (about 720 MB/s on the test data)
- **Murmur3**: not cryptographic, very fast (about 5.4 GB/s)
Recommended: **Blake2**
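A sketch of the hash API as exercised by `src/algotest.rs` below (assuming
`HashMethod` is in scope and `data: &[u8]`):
```rust
// Blake2 is cryptographic; Murmur3 is much faster but not collision-resistant.
let strong = HashMethod::Blake2.hash(data);
let fast = HashMethod::Murmur3.hash(data);
```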
### Bundle size
- 10 MiB
- 25 MiB
- 100 MiB
Larger bundles compress better as a whole, but fetching a single chunk means
loading and decoding the whole bundle.
Recommended: **25 MiB**
### Compression
Recommended: Brotli/2-7
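A sketch of the compression API as exercised by `src/algotest.rs` below
(assuming the `Compression` type is in scope and `data: &[u8]`; error handling
is elided):
```rust
// Compress a buffer with Brotli at level 3 and verify the round trip.
let method = Compression::Brotli(3);
let compressed = method.compress(data).unwrap();
let restored = method.decompress(&compressed).unwrap();
assert_eq!(&restored[..], data);
```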
## Design
@@ -43,3 +129,52 @@
- Remote block writing and compression/encryption
- Inode data serialization
- Recursive directory scanning, difference calculation, new entry sorting
### ChunkDB
- Stores data in chunks
- A chunk is a file
- Per-chunk properties:
- Format version
- Encryption method
- Encryption key
- Compression method / level
- Chunk ID is the hash of the contents
- No locks needed on a shared chunk repository!
- Chunk ID is calculated after compression and encryption
- Chunk header
- "zvault01"
- Chunk size compressed / raw
- Content hash method / value
- Encryption method / options / key hash
- Compression method / options
- Chunks are write-once read-often
- Chunks are prepared outside the repository
- Only one chunk is being prepared at a time
- Adding data to the chunk returns starting position in raw data
- Operations:
- List available chunks
- Add data
- Flush chunk
- Delete chunk
- Get data
- Check chunk
- Chunk path is `checksum.chunk` or `chec/ksum.chunk` (see the sketch after
this list)
- Data is added to the current chunk and compressed in memory
- Operations on chunk files are just sequential read/write and delete
- Ability to recompress chunks
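A sketch of the path-splitting idea, using a hypothetical `chunk_path` helper
that mirrors the logic of `BundleDb::bundle_path` in `src/bundle.rs` below:
```rust
use std::path::PathBuf;

// Nest a hex checksum into subfolders once the flat directory grows large,
// splitting three leading characters off per factor of 1000 stored objects.
fn chunk_path(mut base: PathBuf, checksum: &str, mut count: usize) -> PathBuf {
    let mut name = format!("{}.chunk", checksum);
    while count >= 1000 && name.len() >= 10 {
        base = base.join(&name[0..3]);
        name = name[3..].to_string();
        count /= 1000;
    }
    base.join(name)
}
```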
### Index
16 bytes per hash key
8 bytes data per entry (4 bytes bundle id, 4 bytes chunk id)
=> 24 bytes per entry
Average chunk sizes:
- 8 KiB => 3 MiB / 1 GiB
- 16 KiB => 1.5 MiB / 1 GiB
- 24 KiB => 1.0 MiB / 1 GiB
- 32 KiB => 750 KiB / 1 GiB
- 64 KiB => 375 KiB / 1 GiB

src/algotest.rs Normal file (+122)

@@ -0,0 +1,122 @@
use std::io::{Cursor, Read};
use std::fs::File;
use std::time;
use super::chunker::*;
use super::util::*;
fn speed_chunk<C: IChunker>(chunker: &mut C, data: &[u8]) {
let mut input = Cursor::new(data);
let mut chunk = Vec::with_capacity(1_000_000);
loop {
chunk.clear();
let result = chunker.chunk(&mut input, &mut chunk).unwrap();
if result == ChunkerStatus::Finished {
return
}
}
}
fn chunk<C: IChunker>(chunker: &mut C, data: &[u8]) -> Vec<Vec<u8>> {
let mut input = Cursor::new(data);
let mut chunks = Vec::with_capacity(100_000);
loop {
let mut chunk = Vec::with_capacity(100_000);
let result = chunker.chunk(&mut input, &mut chunk).unwrap();
chunks.push(chunk);
if result == ChunkerStatus::Finished {
return chunks;
}
}
}
fn analyze_chunks(mut chunks: Vec<Vec<u8>>) -> (usize, f64, f64, f64) {
let count = chunks.len();
let total = chunks.iter().map(|c| c.len()).sum::<usize>();
let avg_size = total as f64 / count as f64;
let stddev = (chunks.iter().map(|c| (c.len() as f64 - avg_size).powi(2)).sum::<f64>() / (count as f64 - 1.0)).sqrt();
chunks.sort();
chunks.dedup();
let non_dup: usize = chunks.iter().map(|c| c.len()).sum();
let saved = 1.0 - non_dup as f64 / total as f64;
(count, avg_size, stddev, saved)
}
fn compare_chunker<C: IChunker>(name: &str, mut chunker: C, data: &[u8]) {
let start = time::Instant::now();
speed_chunk(&mut chunker, data);
let elapsed = start.elapsed();
let chunks = chunk(&mut chunker, data);
let duration = elapsed.as_secs() as f64 * 1.0 + elapsed.subsec_nanos() as f64 / 1_000_000_000.0;
let speed = data.len() as f64 / duration;
assert_eq!(chunks.iter().map(|c| c.len()).sum::<usize>(), data.len());
let (_count, avg_size, stddev, saved) = analyze_chunks(chunks);
println!("{}: \tavg chunk size {:.1}\t± {:.1} bytes, \t{:.1}% saved,\tspeed {:.1} MB/s",
name, avg_size, stddev, saved * 100.0, speed / 1_000_000.0);
}
fn compare_hash(name: &str, hash: HashMethod, data: &[u8]) {
let start = time::Instant::now();
let _ = hash.hash(data);
let elapsed = start.elapsed();
let duration = elapsed.as_secs() as f64 * 1.0 + elapsed.subsec_nanos() as f64 / 1_000_000_000.0;
let speed = data.len() as f64 / duration;
println!("{}: {:.1} MB/s", name, speed / 1_000_000.0);
}
fn compare_compression(name: &str, method: Compression, data: &[u8]) {
let start = time::Instant::now();
let compressed = method.compress(data).unwrap();
let elapsed = start.elapsed();
let duration = elapsed.as_secs() as f64 * 1.0 + elapsed.subsec_nanos() as f64 / 1_000_000_000.0;
let cspeed = data.len() as f64 / duration;
let ratio = compressed.len() as f64 / data.len() as f64;
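// Decompression timing is disabled for now: decompressing via squash ran out
// of memory on the test input (see "fatal runtime error: out of memory" in
// Algotest1.txt), so dspeed is reported as 0.0.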
/*let start = time::Instant::now();
let uncompressed = method.decompress(&compressed).unwrap();
if uncompressed != data {
panic!("{} did not uncompress to the same value", name);
}
let elapsed = start.elapsed();
let duration = elapsed.as_secs() as f64 * 1.0 + elapsed.subsec_nanos() as f64 / 1_000_000_000.0;*/
let dspeed = 0.0;//data.len() as f64 / duration;
println!("{}:\tratio: {:.1}%,\tcompress: {:.1} MB/s,\tdecompress: {:.1} MB/s",
name, ratio * 100.0, cspeed / 1_000_000.0, dspeed / 1_000_000.0);
}
#[allow(dead_code)]
pub fn run(path: &str) {
println!("Algorithm comparison on file {}", path);
println!();
print!("Reading input file...");
let mut file = File::open(path).unwrap();
let mut data = Vec::new();
file.read_to_end(&mut data).unwrap();
println!(" done. {} bytes", data.len());
println!();
println!("Chunker algorithms");
for size in &[4usize, 8, 16, 32, 64] {
println!(" Chunk size: {} KiB", size);
compare_chunker(" AE", AeChunker::new(size*1024), &data);
compare_chunker(" Rabin", RabinChunker::new(size*1024, 0), &data);
compare_chunker(" FastCdc", FastCdcChunker::new(size*1024, 0), &data);
}
println!();
println!("Hash algorithms");
compare_hash(" Blake2", HashMethod::Blake2, &data);
compare_hash(" Murmur3", HashMethod::Murmur3, &data);
println!();
println!("Compression algorithms");
compare_compression(" Snappy", Compression::Snappy(()), &data);
for level in 1..10 {
compare_compression(&format!(" ZStd/{}", level), Compression::ZStd(level), &data);
}
for level in 1..10 {
compare_compression(&format!(" Deflate/{}", level), Compression::Deflate(level), &data);
}
for level in 1..10 {
compare_compression(&format!(" Brotli/{}", level), Compression::Brotli(level), &data);
}
for level in 1..7 {
compare_compression(&format!(" Lzma2/{}", level), Compression::Lzma2(level), &data);
}
}

src/bundle.rs Normal file (+474)

@@ -0,0 +1,474 @@
use std::path::{Path, PathBuf};
use std::collections::HashMap;
use std::fs::{self, File};
use std::io::{Read, Write, Seek, SeekFrom, BufWriter, BufReader};
use std::cmp::max;
use std::fmt::{self, Debug, Write as FmtWrite};
use std::sync::{Arc, Mutex};
use serde::{self, Serialize, Deserialize};
use serde::bytes::ByteBuf;
use rmp_serde;
use errors::BundleError;
use util::*;
static HEADER_STRING: [u8; 7] = *b"zbundle";
static HEADER_VERSION: u8 = 1;
// TODO: Test cases
// TODO: Benchmarks
#[derive(Hash, PartialEq, Eq, Clone, Default)]
pub struct BundleId(pub Vec<u8>);
impl Serialize for BundleId {
fn serialize<S: serde::Serializer>(&self, ser: S) -> Result<S::Ok, S::Error> {
ser.serialize_bytes(&self.0)
}
}
impl Deserialize for BundleId {
fn deserialize<D: serde::Deserializer>(de: D) -> Result<Self, D::Error> {
let bytes = try!(ByteBuf::deserialize(de));
Ok(BundleId(bytes.into()))
}
}
impl BundleId {
#[inline]
fn to_string(&self) -> String {
let mut buf = String::with_capacity(self.0.len()*2);
for b in &self.0 {
write!(&mut buf, "{:02x}", b).unwrap()
}
buf
}
}
impl fmt::Display for BundleId {
#[inline]
fn fmt(&self, fmt: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(fmt, "{}", self.to_string())
}
}
impl fmt::Debug for BundleId {
#[inline]
fn fmt(&self, fmt: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(fmt, "{}", self.to_string())
}
}
#[derive(Clone)]
pub struct BundleHeader {
pub id: BundleId,
pub compression: Option<Compression>,
pub encryption: Option<Encryption>,
pub checksum: Checksum,
pub raw_size: usize,
pub encoded_size: usize,
pub chunk_count: usize,
pub chunk_sizes: Vec<usize>
}
serde_impl!(BundleHeader(u64) {
id: BundleId => 0,
compression: Option<Compression> => 1,
encryption: Option<Encryption> => 2,
checksum: Checksum => 3,
raw_size: usize => 4,
encoded_size: usize => 5,
chunk_count: usize => 6,
chunk_sizes: Vec<usize> => 7
});
impl Default for BundleHeader {
fn default() -> Self {
BundleHeader {
id: BundleId(vec![]),
compression: None,
encryption: None,
checksum: (ChecksumType::Sha3_256, ByteBuf::new()),
raw_size: 0,
encoded_size: 0,
chunk_count: 0,
chunk_sizes: vec![]
}
}
}
pub struct Bundle {
pub id: BundleId,
pub version: u8,
pub path: PathBuf,
crypto: Arc<Mutex<Crypto>>,
pub compression: Option<Compression>,
pub encryption: Option<Encryption>,
pub raw_size: usize,
pub encoded_size: usize,
pub checksum: Checksum,
pub content_start: usize,
pub chunk_count: usize,
pub chunk_sizes: Vec<usize>,
pub chunk_positions: Vec<usize>
}
impl Bundle {
fn new(path: PathBuf, version: u8, content_start: usize, crypto: Arc<Mutex<Crypto>>, header: BundleHeader) -> Self {
let mut chunk_positions = Vec::with_capacity(header.chunk_sizes.len());
let mut pos = 0;
for len in &header.chunk_sizes {
chunk_positions.push(pos);
pos += *len;
}
Bundle {
id: header.id,
version: version,
path: path,
crypto: crypto,
compression: header.compression,
encryption: header.encryption,
raw_size: header.raw_size,
encoded_size: header.encoded_size,
chunk_count: header.chunk_count,
checksum: header.checksum,
content_start: content_start,
chunk_sizes: header.chunk_sizes,
chunk_positions: chunk_positions
}
}
pub fn load(path: PathBuf, crypto: Arc<Mutex<Crypto>>) -> Result<Self, BundleError> {
let mut file = BufReader::new(try!(File::open(&path)
.map_err(|e| BundleError::Read(e, path.clone(), "Failed to open bundle file"))));
let mut header = [0u8; 8];
try!(file.read_exact(&mut header)
.map_err(|e| BundleError::Read(e, path.clone(), "Failed to read bundle header")));
if header[..HEADER_STRING.len()] != HEADER_STRING {
return Err(BundleError::Format(path.clone(), "Wrong header string"))
}
let version = header[HEADER_STRING.len()];
if version != HEADER_VERSION {
return Err(BundleError::Format(path.clone(), "Unsupported bundle file version"))
}
let mut reader = rmp_serde::Deserializer::new(file);
let header = try!(BundleHeader::deserialize(&mut reader)
.map_err(|e| BundleError::Decode(e, path.clone())));
file = reader.into_inner();
let content_start = file.seek(SeekFrom::Current(0)).unwrap() as usize;
Ok(Bundle::new(path, version, content_start, crypto, header))
}
#[inline]
fn load_encoded_contents(&self) -> Result<Vec<u8>, BundleError> {
let mut file = BufReader::new(try!(File::open(&self.path)
.map_err(|e| BundleError::Read(e, self.path.clone(), "Failed to open bundle file"))));
try!(file.seek(SeekFrom::Start(self.content_start as u64))
.map_err(|e| BundleError::Read(e, self.path.clone(), "Failed to seek to data")));
let mut data = Vec::with_capacity(max(self.encoded_size, self.raw_size)+1024);
try!(file.read_to_end(&mut data).map_err(|_| "Failed to read data"));
Ok(data)
}
#[inline]
fn decode_contents(&self, mut data: Vec<u8>) -> Result<Vec<u8>, BundleError> {
if let Some(ref encryption) = self.encryption {
data = try!(self.crypto.lock().unwrap().decrypt(encryption.clone(), &data));
}
if let Some(ref compression) = self.compression {
data = try!(compression.decompress(&data));
}
Ok(data)
}
#[inline]
pub fn load_contents(&self) -> Result<Vec<u8>, BundleError> {
self.load_encoded_contents().and_then(|data| self.decode_contents(data))
}
#[inline]
pub fn get_chunk_position(&self, id: usize) -> Result<(usize, usize), BundleError> {
if id >= self.chunk_count {
return Err("Invalid chunk id".into())
}
Ok((self.chunk_positions[id], self.chunk_sizes[id]))
}
pub fn check(&self, full: bool) -> Result<(), BundleError> {
if self.chunk_count != self.chunk_sizes.len() {
return Err(BundleError::Integrity(self.id.clone(),
"Chunk list size does not match chunk count"))
}
if self.chunk_sizes.iter().sum::<usize>() != self.raw_size {
return Err(BundleError::Integrity(self.id.clone(),
"Individual chunk sizes do not add up to total size"))
}
if !full {
let size = try!(fs::metadata(&self.path)
.map_err(|e| BundleError::Read(e, self.path.clone(), "Failed to get size of file"))
).len();
if size as usize != self.encoded_size + self.content_start {
return Err(BundleError::Integrity(self.id.clone(),
"File size does not match size in header, truncated file"))
}
return Ok(())
}
let encoded_contents = try!(self.load_encoded_contents());
if self.encoded_size != encoded_contents.len() {
return Err(BundleError::Integrity(self.id.clone(),
"Encoded data size does not match size in header, truncated bundle"))
}
let contents = try!(self.decode_contents(encoded_contents));
if self.raw_size != contents.len() {
return Err(BundleError::Integrity(self.id.clone(),
"Raw data size does not match size in header, truncated bundle"))
}
//TODO: verify checksum
Ok(())
}
}
impl Debug for Bundle {
fn fmt(&self, fmt: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(fmt, "Bundle(\n\tid: {}\n\tpath: {:?}\n\tchunks: {}\n\tsize: {}, encoded: {}\n\tcompression: {:?}\n)",
self.id.to_string(), self.path, self.chunk_count, self.raw_size, self.encoded_size, self.compression)
}
}
pub struct BundleWriter {
data: Vec<u8>,
compression: Option<Compression>,
compression_stream: Option<CompressionStream>,
encryption: Option<Encryption>,
crypto: Arc<Mutex<Crypto>>,
checksum: ChecksumCreator,
raw_size: usize,
chunk_count: usize,
chunk_sizes: Vec<usize>
}
impl BundleWriter {
fn new(compression: Option<Compression>, encryption: Option<Encryption>, crypto: Arc<Mutex<Crypto>>, checksum: ChecksumType) -> Result<Self, BundleError> {
let compression_stream = match compression {
Some(ref compression) => Some(try!(compression.compress_stream())),
None => None
};
Ok(BundleWriter {
data: vec![],
compression: compression,
compression_stream: compression_stream,
encryption: encryption,
crypto: crypto,
checksum: ChecksumCreator::new(checksum),
raw_size: 0,
chunk_count: 0,
chunk_sizes: vec![]
})
}
pub fn add(&mut self, chunk: &[u8]) -> Result<usize, BundleError> {
if let Some(ref mut stream) = self.compression_stream {
try!(stream.process(chunk, &mut self.data))
} else {
self.data.extend_from_slice(chunk)
}
self.checksum.update(chunk);
self.raw_size += chunk.len();
self.chunk_count += 1;
self.chunk_sizes.push(chunk.len());
Ok(self.chunk_count-1)
}
fn finish(mut self, db: &BundleDb) -> Result<Bundle, BundleError> {
if let Some(stream) = self.compression_stream {
try!(stream.finish(&mut self.data))
}
if let Some(ref encryption) = self.encryption {
self.data = try!(self.crypto.lock().unwrap().encrypt(encryption.clone(), &self.data));
}
let encoded_size = self.data.len();
let checksum = self.checksum.finish();
let id = BundleId(checksum.1.to_vec());
let (folder, file) = db.bundle_path(&id);
let path = folder.join(file);
try!(fs::create_dir_all(&folder)
.map_err(|e| BundleError::Write(e, path.clone(), "Failed to create folder")));
let mut file = BufWriter::new(try!(File::create(&path)
.map_err(|e| BundleError::Write(e, path.clone(), "Failed to create bundle file"))));
try!(file.write_all(&HEADER_STRING)
.map_err(|e| BundleError::Write(e, path.clone(), "Failed to write bundle header")));
try!(file.write_all(&[HEADER_VERSION])
.map_err(|e| BundleError::Write(e, path.clone(), "Failed to write bundle header")));
let header = BundleHeader {
checksum: checksum,
compression: self.compression,
encryption: self.encryption,
chunk_count: self.chunk_count,
id: id.clone(),
raw_size: self.raw_size,
encoded_size: encoded_size,
chunk_sizes: self.chunk_sizes
};
{
let mut writer = rmp_serde::Serializer::new(&mut file);
try!(header.serialize(&mut writer)
.map_err(|e| BundleError::Encode(e, path.clone())));
}
let content_start = file.seek(SeekFrom::Current(0)).unwrap() as usize;
try!(file.write_all(&self.data)
.map_err(|e| BundleError::Write(e, path.clone(), "Failed to write bundle data")));
Ok(Bundle::new(path, HEADER_VERSION, content_start, self.crypto, header))
}
#[inline]
pub fn size(&self) -> usize {
self.data.len()
}
}
pub struct BundleDb {
path: PathBuf,
compression: Option<Compression>,
encryption: Option<Encryption>,
crypto: Arc<Mutex<Crypto>>,
checksum: ChecksumType,
bundles: HashMap<BundleId, Bundle>,
bundle_cache: LruCache<BundleId, Vec<u8>>
}
impl BundleDb {
fn new(path: PathBuf, compression: Option<Compression>, encryption: Option<Encryption>, checksum: ChecksumType) -> Self {
BundleDb {
path: path,
compression: compression,
crypto: Arc::new(Mutex::new(Crypto::new())),
encryption: encryption,
checksum: checksum,
bundles: HashMap::new(),
bundle_cache: LruCache::new(5, 10)
}
}
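/// Maps a bundle id to a (folder, file name) pair. Once the repository holds
/// many bundles, leading characters of the file name are moved into nested
/// subfolders (three characters per factor of 1000) to keep directories small.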
fn bundle_path(&self, bundle: &BundleId) -> (PathBuf, PathBuf) {
let mut folder = self.path.clone();
let mut file = bundle.to_string() + ".bundle";
let mut count = self.bundles.len();
while count >= 1000 {
if file.len() < 10 {
break
}
folder = folder.join(&file[0..3]);
file = file[3..].to_string();
count /= 1000;
}
(folder, file.into())
}
fn load_bundle_list(&mut self) -> Result<(), BundleError> {
self.bundles.clear();
let mut paths = Vec::new();
paths.push(self.path.clone());
while let Some(path) = paths.pop() {
for entry in try!(fs::read_dir(path).map_err(BundleError::List)) {
let entry = try!(entry.map_err(BundleError::List));
let path = entry.path();
if path.is_dir() {
paths.push(path);
} else {
let bundle = try!(Bundle::load(path, self.crypto.clone()));
self.bundles.insert(bundle.id.clone(), bundle);
}
}
}
Ok(())
}
#[inline]
pub fn open<P: AsRef<Path>>(path: P, compression: Option<Compression>, encryption: Option<Encryption>, checksum: ChecksumType) -> Result<Self, BundleError> {
let path = path.as_ref().to_owned();
let mut self_ = Self::new(path, compression, encryption, checksum);
try!(self_.load_bundle_list());
Ok(self_)
}
#[inline]
pub fn create<P: AsRef<Path>>(path: P, compression: Option<Compression>, encryption: Option<Encryption>, checksum: ChecksumType) -> Result<Self, BundleError> {
let path = path.as_ref().to_owned();
try!(fs::create_dir_all(&path)
.map_err(|e| BundleError::Write(e, path.clone(), "Failed to create folder")));
Ok(Self::new(path, compression, encryption, checksum))
}
#[inline]
pub fn open_or_create<P: AsRef<Path>>(path: P, compression: Option<Compression>, encryption: Option<Encryption>, checksum: ChecksumType) -> Result<Self, BundleError> {
if path.as_ref().exists() {
Self::open(path, compression, encryption, checksum)
} else {
Self::create(path, compression, encryption, checksum)
}
}
#[inline]
pub fn create_bundle(&self) -> Result<BundleWriter, BundleError> {
BundleWriter::new(self.compression.clone(), self.encryption.clone(), self.crypto.clone(), self.checksum)
}
pub fn get_chunk(&mut self, bundle_id: &BundleId, id: usize) -> Result<Vec<u8>, BundleError> {
let bundle = try!(self.bundles.get(bundle_id).ok_or("Bundle not found"));
let (pos, len) = try!(bundle.get_chunk_position(id));
let mut chunk = Vec::with_capacity(len);
if let Some(data) = self.bundle_cache.get(bundle_id) {
chunk.extend_from_slice(&data[pos..pos+len]);
return Ok(chunk);
}
let data = try!(bundle.load_contents());
chunk.extend_from_slice(&data[pos..pos+len]);
self.bundle_cache.put(bundle_id.clone(), data);
Ok(chunk)
}
#[inline]
pub fn add_bundle(&mut self, bundle: BundleWriter) -> Result<&Bundle, BundleError> {
let bundle = try!(bundle.finish(&self));
let id = bundle.id.clone();
self.bundles.insert(id.clone(), bundle);
Ok(self.get_bundle(&id).unwrap())
}
#[inline]
pub fn get_bundle(&self, bundle: &BundleId) -> Option<&Bundle> {
self.bundles.get(bundle)
}
#[inline]
pub fn list_bundles(&self) -> Vec<&Bundle> {
self.bundles.values().collect()
}
#[inline]
pub fn delete_bundle(&mut self, bundle: &BundleId) -> Result<(), BundleError> {
if let Some(bundle) = self.bundles.remove(bundle) {
fs::remove_file(&bundle.path).map_err(|e| BundleError::Remove(e, bundle.id.clone()))
} else {
Err("No such bundle".into())
}
}
#[inline]
pub fn check(&self, full: bool) -> Result<(), BundleError> {
for bundle in self.bundles.values() {
try!(bundle.check(full))
}
Ok(())
}
}

src/chunker/ae.rs Normal file (+70)

@@ -0,0 +1,70 @@
use super::*;
//use std::f64::consts;
use std::ptr;
// AE Chunker
// Paper: "AE: An Asymmetric Extremum Content Defined Chunking Algorithm for Fast and Bandwidth-Efficient Data Deduplication"
pub struct AeChunker {
buffer: [u8; 4096],
buffered: usize,
avg_size: usize,
window_size: usize
}
impl AeChunker {
pub fn new(avg_size: usize) -> AeChunker {
// Experiments show that this claim from the paper is wrong and results in smaller chunks
//let window_size = (avg_size as f64 / (consts::E - 1.0)) as usize;
let window_size = avg_size - 256;
AeChunker{
buffer: [0; 4096],
buffered: 0,
window_size: window_size,
avg_size: avg_size
}
}
}
impl IChunker for AeChunker {
#[inline]
fn get_type(&self) -> ChunkerType {
ChunkerType::Ae(self.avg_size)
}
#[allow(unknown_lints,explicit_counter_loop)]
fn chunk<R: Read, W: Write>(&mut self, r: &mut R, mut w: &mut W) -> Result<ChunkerStatus, ChunkerError> {
let mut max;
let mut pos = 0;
let mut max_pos = 0;
let mut max_val = 0;
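// Track the running maximum byte value; a chunk boundary is placed once
// window_size bytes pass without a new maximum (the asymmetric extremum).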
loop {
// Fill the buffer, there might be some bytes still in there from last chunk
max = try!(r.read(&mut self.buffer[self.buffered..]).map_err(ChunkerError::Read)) + self.buffered;
// If nothing to do, finish
if max == 0 {
return Ok(ChunkerStatus::Finished)
}
for i in 0..max {
let val = self.buffer[i];
if val <= max_val {
if pos == max_pos + self.window_size {
// Write all bytes from this chunk out to sink and store rest for next chunk
try!(w.write_all(&self.buffer[..i+1]).map_err(ChunkerError::Write));
unsafe { ptr::copy(self.buffer[i+1..].as_ptr(), self.buffer.as_mut_ptr(), max-i-1) };
self.buffered = max-i-1;
return Ok(ChunkerStatus::Continue);
}
} else {
max_val = val;
max_pos = pos;
}
pos += 1;
}
try!(w.write_all(&self.buffer[..max]).map_err(ChunkerError::Write));
self.buffered = 0;
}
}
}

src/chunker/fastcdc.rs Normal file (+120)

@@ -0,0 +1,120 @@
use super::*;
use std::ptr;
// FastCDC
// Paper: "FastCDC: a Fast and Efficient Content-Defined Chunking Approach for Data Deduplication"
// Paper-URL: https://www.usenix.org/system/files/conference/atc16/atc16-paper-xia.pdf
// Presentation: https://www.usenix.org/sites/default/files/conference/protected-files/atc16_slides_xia.pdf
// Creating 256 pseudo-random values (based on Knuth's MMIX)
fn create_gear(seed: u64) -> [u64; 256] {
let mut table = [0u64; 256];
let a = 6364136223846793005;
let c = 1442695040888963407;
let mut v = seed;
for t in table.iter_mut() {
v = v.wrapping_mul(a).wrapping_add(c);
*t = v;
}
table
}
fn get_masks(avg_size: usize, nc_level: usize, seed: u64) -> (u64, u64) {
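// Normalized chunking: derive two masks around the target boundary probability.
// The mask used below the average size has nc_level more bits set (boundaries
// rarer), the one used above has nc_level fewer (boundaries more likely).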
let bits = (avg_size.next_power_of_two() - 1).count_ones();
if bits == 13 {
// From the paper
return (0x0003590703530000, 0x0000d90003530000);
}
let mut mask = 0u64;
let mut v = seed;
let a = 6364136223846793005;
let c = 1442695040888963407;
while mask.count_ones() < bits - nc_level as u32 {
v = v.wrapping_mul(a).wrapping_add(c);
mask = (mask | 1).rotate_left(v as u32 & 0x3f);
}
let mask_long = mask;
while mask.count_ones() < bits + nc_level as u32 {
v = v.wrapping_mul(a).wrapping_add(c);
mask = (mask | 1).rotate_left(v as u32 & 0x3f);
}
let mask_short = mask;
(mask_short, mask_long)
}
pub struct FastCdcChunker {
buffer: [u8; 4096],
buffered: usize,
gear: [u64; 256],
min_size: usize,
max_size: usize,
avg_size: usize,
mask_long: u64,
mask_short: u64,
seed: u64
}
impl FastCdcChunker {
pub fn new(avg_size: usize, seed: u64) -> Self {
let (mask_short, mask_long) = get_masks(avg_size, 2, seed);
FastCdcChunker {
buffer: [0; 4096],
buffered: 0,
gear: create_gear(seed),
min_size: avg_size/4,
max_size: avg_size*8,
avg_size: avg_size,
mask_long: mask_long,
mask_short: mask_short,
seed: seed
}
}
}
impl IChunker for FastCdcChunker {
#[inline]
fn get_type(&self) -> ChunkerType {
ChunkerType::FastCdc((self.avg_size, self.seed))
}
#[allow(unknown_lints,explicit_counter_loop)]
fn chunk<R: Read, W: Write>(&mut self, r: &mut R, mut w: &mut W) -> Result<ChunkerStatus, ChunkerError> {
let mut max;
let mut hash = 0u64;
let mut pos = 0;
loop {
// Fill the buffer, there might be some bytes still in there from last chunk
max = try!(r.read(&mut self.buffer[self.buffered..]).map_err(ChunkerError::Read)) + self.buffered;
// If nothing to do, finish
if max == 0 {
return Ok(ChunkerStatus::Finished)
}
for i in 0..max {
if pos >= self.min_size {
// Hash update
hash = (hash << 1).wrapping_add(self.gear[self.buffer[i] as usize]);
// 3 options for break point
// 1) mask_short matches and chunk is smaller than average
// 2) mask_long matches and chunk is longer or equal to average
// 3) chunk reached max_size
if pos < self.avg_size && hash & self.mask_short == 0
|| pos >= self.avg_size && hash & self.mask_long == 0
|| pos >= self.max_size {
// Write all bytes from this chunk out to sink and store rest for next chunk
try!(w.write_all(&self.buffer[..i+1]).map_err(ChunkerError::Write));
unsafe { ptr::copy(self.buffer[i+1..].as_ptr(), self.buffer.as_mut_ptr(), max-i-1) };
self.buffered = max-i-1;
return Ok(ChunkerStatus::Continue);
}
}
pos += 1;
}
try!(w.write_all(&self.buffer[..max]).map_err(ChunkerError::Write));
self.buffered = 0;
}
}
}

src/chunker/mod.rs Normal file (+118)

@@ -0,0 +1,118 @@
use std::io::{Write, Read};
use super::errors::ChunkerError;
mod ae;
mod rabin;
mod fastcdc;
pub use self::ae::AeChunker;
pub use self::rabin::RabinChunker;
pub use self::fastcdc::FastCdcChunker;
// https://moinakg.wordpress.com/2013/06/22/high-performance-content-defined-chunking/
// Paper: "A Comprehensive Study of the Past, Present, and Future of Data Deduplication"
// Paper-URL: http://wxia.hustbackup.cn/IEEE-Survey-final.pdf
// https://borgbackup.readthedocs.io/en/stable/internals.html#chunks
// https://github.com/bup/bup/blob/master/lib/bup/bupsplit.c
#[derive(Debug, Eq, PartialEq)]
pub enum ChunkerStatus {
Continue,
Finished
}
pub trait IChunker: Sized {
fn chunk<R: Read, W: Write>(&mut self, r: &mut R, w: &mut W) -> Result<ChunkerStatus, ChunkerError>;
fn get_type(&self) -> ChunkerType;
}
pub enum Chunker {
Ae(Box<AeChunker>),
Rabin(Box<RabinChunker>),
FastCdc(Box<FastCdcChunker>)
}
impl IChunker for Chunker {
#[inline]
fn get_type(&self) -> ChunkerType {
match *self {
Chunker::Ae(ref c) => c.get_type(),
Chunker::Rabin(ref c) => c.get_type(),
Chunker::FastCdc(ref c) => c.get_type()
}
}
#[inline]
fn chunk<R: Read, W: Write>(&mut self, r: &mut R, w: &mut W) -> Result<ChunkerStatus, ChunkerError> {
match *self {
Chunker::Ae(ref mut c) => c.chunk(r, w),
Chunker::Rabin(ref mut c) => c.chunk(r, w),
Chunker::FastCdc(ref mut c) => c.chunk(r, w)
}
}
}
#[derive(Debug)]
pub enum ChunkerType {
Ae(usize),
Rabin((usize, u32)),
FastCdc((usize, u64))
}
serde_impl!(ChunkerType(u64) {
Ae(usize) => 1,
Rabin((usize, u32)) => 2,
FastCdc((usize, u64)) => 3
});
impl ChunkerType {
#[inline]
pub fn from(name: &str, avg_size: usize, seed: u64) -> Result<Self, &'static str> {
match name {
"ae" => Ok(ChunkerType::Ae(avg_size)),
"rabin" => Ok(ChunkerType::Rabin((avg_size, seed as u32))),
"fastcdc" => Ok(ChunkerType::FastCdc((avg_size, seed))),
_ => Err("Unsupported chunker type")
}
}
#[inline]
pub fn create(&self) -> Chunker {
match *self {
ChunkerType::Ae(size) => Chunker::Ae(Box::new(AeChunker::new(size))),
ChunkerType::Rabin((size, seed)) => Chunker::Rabin(Box::new(RabinChunker::new(size, seed))),
ChunkerType::FastCdc((size, seed)) => Chunker::FastCdc(Box::new(FastCdcChunker::new(size, seed)))
}
}
#[inline]
pub fn name(&self) -> &'static str {
match *self {
ChunkerType::Ae(_size) => "ae",
ChunkerType::Rabin((_size, _seed)) => "rabin",
ChunkerType::FastCdc((_size, _seed)) => "fastcdc"
}
}
#[inline]
pub fn avg_size(&self) -> usize {
match *self {
ChunkerType::Ae(size) => size,
ChunkerType::Rabin((size, _seed)) => size,
ChunkerType::FastCdc((size, _seed)) => size
}
}
#[inline]
pub fn seed(&self) -> u64 {
match *self {
ChunkerType::Ae(_size) => 0,
ChunkerType::Rabin((_size, seed)) => seed as u64,
ChunkerType::FastCdc((_size, seed)) => seed
}
}
}

src/chunker/rabin.rs Normal file (+116)

@@ -0,0 +1,116 @@
use std::collections::VecDeque;
use std::ptr;
use super::*;
// Rabin Chunker
// Paper: "Fingerprinting by Random Polynomials"
// Paper-URL: http://www.xmailserver.org/rabin.pdf
// Paper: "Redundancy Elimination Within Large Collections of Files"
// Paper-URL: https://www.usenix.org/legacy/event/usenix04/tech/general/full_papers/kulkarni/kulkarni_html/paper.html
// Wikipedia: https://en.wikipedia.org/wiki/Rabin_fingerprint
fn wrapping_pow(mut base: u32, mut exp: u32) -> u32 {
let mut acc: u32 = 1;
while exp > 0 {
if exp % 2 == 1 {
acc = acc.wrapping_mul(base)
}
base = base.wrapping_mul(base);
exp /= 2;
}
acc
}
fn create_table(alpha: u32, window_size: usize) -> [u32; 256] {
let mut table = [0u32; 256];
let a = wrapping_pow(alpha, window_size as u32);
for i in 0..table.len() as u32 {
table[i as usize] = i.wrapping_mul(a);
}
table
}
pub struct RabinChunker {
buffer: [u8; 4096],
buffered: usize,
seed: u32,
alpha: u32,
table: [u32; 256],
min_size: usize,
max_size: usize,
window_size: usize,
chunk_mask: u32,
avg_size: usize
}
impl RabinChunker {
pub fn new(avg_size: usize, seed: u32) -> Self {
let chunk_mask = (avg_size as u32).next_power_of_two() - 1;
let window_size = avg_size/4-1;
let alpha = 1664525;//153191;
RabinChunker {
buffer: [0; 4096],
buffered: 0,
table: create_table(alpha, window_size),
alpha: alpha,
seed: seed,
min_size: avg_size/4,
max_size: avg_size*4,
window_size: window_size,
chunk_mask: chunk_mask,
avg_size: avg_size
}
}
}
impl IChunker for RabinChunker {
#[inline]
fn get_type(&self) -> ChunkerType {
ChunkerType::Rabin((self.avg_size, self.seed))
}
#[allow(unknown_lints,explicit_counter_loop)]
fn chunk<R: Read, W: Write>(&mut self, r: &mut R, mut w: &mut W) -> Result<ChunkerStatus, ChunkerError> {
let mut max;
let mut hash = 0u32;
let mut pos = 0;
let mut window = VecDeque::with_capacity(self.window_size);
loop {
// Fill the buffer, there might be some bytes still in there from last chunk
max = try!(r.read(&mut self.buffer[self.buffered..]).map_err(ChunkerError::Read)) + self.buffered;
// If nothing to do, finish
if max == 0 {
return Ok(ChunkerStatus::Finished)
}
for i in 0..max {
let val = self.buffer[i];
if pos >= self.max_size {
try!(w.write_all(&self.buffer[..i+1]).map_err(ChunkerError::Write));
unsafe { ptr::copy(self.buffer[i+1..].as_ptr(), self.buffer.as_mut_ptr(), max-i-1) };
self.buffered = max-i-1;
return Ok(ChunkerStatus::Continue);
}
// Hash update
hash = hash.wrapping_mul(self.alpha).wrapping_add(val as u32);
if pos >= self.window_size {
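// Slide the window: subtract the outgoing byte's contribution, precomputed
// as byte * alpha^window_size in the lookup table.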
let take = window.pop_front().unwrap();
hash = hash.wrapping_sub(self.table[take as usize]);
if pos >= self.min_size && ((hash ^ self.seed) & self.chunk_mask) == 0 {
try!(w.write_all(&self.buffer[..i+1]).map_err(ChunkerError::Write));
unsafe { ptr::copy(self.buffer[i+1..].as_ptr(), self.buffer.as_mut_ptr(), max-i-1) };
self.buffered = max-i-1;
return Ok(ChunkerStatus::Continue);
}
}
pos += 1;
window.push_back(val);
}
try!(w.write_all(&self.buffer[..max]).map_err(ChunkerError::Write));
self.buffered = 0;
}
}
}

src/errors.rs Normal file (+72)

@@ -0,0 +1,72 @@
use std::io;
use std::path::PathBuf;
use rmp_serde::decode::Error as MsgpackDecode;
use rmp_serde::encode::Error as MsgpackEncode;
use super::bundle::BundleId;
quick_error!{
#[derive(Debug)]
pub enum BundleError {
List(err: io::Error) {
cause(err)
description("Failed to list bundles")
}
Read(err: io::Error, path: PathBuf, reason: &'static str) {
cause(err)
description("Failed to read bundle")
display("Failed to read bundle {:?}: {}", path, reason)
}
Decode(err: MsgpackDecode, path: PathBuf) {
cause(err)
description("Failed to decode bundle header")
}
Write(err: io::Error, path: PathBuf, reason: &'static str) {
cause(err)
description("Failed to write bundle")
display("Failed to write bundle {:?}: {}", path, reason)
}
Encode(err: MsgpackEncode, path: PathBuf) {
cause(err)
description("Failed to encode bundle header")
}
Format(path: PathBuf, reason: &'static str) {
description("Failed to decode bundle")
display("Failed to decode bundle {:?}: {}", path, reason)
}
Integrity(bundle: BundleId, reason: &'static str) {
description("Bundle has an integrity error")
display("Bundle {:?} has an integrity error: {}", bundle, reason)
}
Remove(err: io::Error, bundle: BundleId) {
cause(err)
description("Failed to remove bundle")
display("Failed to remove bundle {}", bundle)
}
Custom {
from(&'static str)
description("Custom error")
}
}
}
quick_error!{
#[derive(Debug)]
pub enum ChunkerError {
Read(err: io::Error) {
from(err)
cause(err)
description("Failed to read")
}
Write(err: io::Error) {
from(err)
cause(err)
description("Failed to write")
}
Custom {
from(&'static str)
description("Custom error")
}
}
}

src/index.rs Normal file (+495)

@@ -0,0 +1,495 @@
use super::util::Hash;
use std::path::Path;
use std::fs::{File, OpenOptions};
use std::mem;
use std::io;
use std::slice;
use std::os::unix::io::AsRawFd;
use mmap::{MemoryMap, MapOption, MapError};
const MAGIC: [u8; 7] = *b"zcindex";
const VERSION: u8 = 1;
pub const MAX_USAGE: f64 = 0.9;
pub const MIN_USAGE: f64 = 0.25;
pub const INITIAL_SIZE: usize = 1024;
#[repr(packed)]
pub struct Header {
magic: [u8; 7],
version: u8,
entries: u64,
capacity: u64,
}
#[repr(packed)]
#[derive(Clone, Copy, PartialEq, Debug)]
pub struct Location {
pub bundle: u32,
pub chunk: u32
}
impl Location {
pub fn new(bundle: u32, chunk: u32) -> Self {
Location{ bundle: bundle, chunk: chunk }
}
}
#[repr(packed)]
#[derive(Clone)]
pub struct Entry {
pub key: Hash,
pub data: Location
}
impl Entry {
#[inline]
fn is_used(&self) -> bool {
self.key.low != 0 || self.key.high != 0
}
fn clear(&mut self) {
self.key.low = 0;
self.key.high = 0;
}
}
pub struct Index {
capacity: usize,
entries: usize,
max_entries: usize,
min_entries: usize,
fd: File,
mmap: MemoryMap,
data: &'static mut [Entry]
}
#[derive(Debug)]
pub enum Error {
IOError(io::Error),
MapError(MapError),
NoHeader,
MagicError,
VersionError,
}
#[derive(Debug)]
enum LocateResult {
Found(usize), // Found the key at this position
Hole(usize), // Found a hole at this position while searching for a key
Steal(usize) // Found a spot to steal at this position while searching for a key
}
impl Index {
pub fn new(path: &Path, create: bool) -> Result<Index, Error> {
let fd = try!(OpenOptions::new().read(true).write(true).create(create).open(path).map_err(|e| { Error::IOError(e) }));
if create {
try!(Index::resize_fd(&fd, INITIAL_SIZE));
}
let mmap = try!(Index::map_fd(&fd));
if mmap.len() < mem::size_of::<Header>() {
return Err(Error::NoHeader);
}
let data = Index::mmap_as_slice(&mmap, INITIAL_SIZE as usize);
let mut index = Index{capacity: 0, max_entries: 0, min_entries: 0, entries: 0, fd: fd, mmap: mmap, data: data};
{
let capacity;
let entries;
{
let header = index.header();
if create {
header.magic = MAGIC;
header.version = VERSION;
header.entries = 0;
header.capacity = INITIAL_SIZE as u64;
} else {
if header.magic != MAGIC {
return Err(Error::MagicError);
}
if header.version != VERSION {
return Err(Error::VersionError);
}
}
capacity = header.capacity;
entries = header.entries;
}
index.data = Index::mmap_as_slice(&index.mmap, capacity as usize);
index.set_capacity(capacity as usize);
index.entries = entries as usize;
}
debug_assert!(index.is_consistent(), "Inconsistent after creation");
Ok(index)
}
#[inline]
pub fn open(path: &Path) -> Result<Index, Error> {
Index::new(path, false)
}
#[inline]
pub fn create(path: &Path) -> Result<Index, Error> {
Index::new(path, true)
}
#[inline]
fn map_fd(fd: &File) -> Result<MemoryMap, Error> {
MemoryMap::new(
try!(fd.metadata().map_err(Error::IOError)).len() as usize,
&[MapOption::MapReadable,
MapOption::MapWritable,
MapOption::MapFd(fd.as_raw_fd()),
MapOption::MapNonStandardFlags(0x0001) //libc::consts::os::posix88::MAP_SHARED
]).map_err(|e| { Error::MapError(e) })
}
#[inline]
fn resize_fd(fd: &File, capacity: usize) -> Result<(), Error> {
fd.set_len((mem::size_of::<Header>() + capacity * mem::size_of::<Entry>()) as u64).map_err( Error::IOError)
}
#[inline]
fn mmap_as_slice(mmap: &MemoryMap, len: usize) -> &'static mut [Entry] {
if mmap.len() < mem::size_of::<Header>() + len * mem::size_of::<Entry>() {
panic!("Memory map too small");
}
let ptr = unsafe { mmap.data().offset(mem::size_of::<Header>() as isize) as *mut Entry };
unsafe { slice::from_raw_parts_mut(ptr, len) }
}
#[inline]
fn header(&mut self) -> &mut Header {
if self.mmap.len() < mem::size_of::<Header>() {
panic!("Failed to read beyond end");
}
unsafe { &mut *(self.mmap.data() as *mut Header) }
}
#[inline]
fn set_capacity(&mut self, capacity: usize) {
self.capacity = capacity;
self.min_entries = (capacity as f64 * MIN_USAGE) as usize;
self.max_entries = (capacity as f64 * MAX_USAGE) as usize;
}
fn reinsert(&mut self, start: usize, end: usize) -> Result<(), Error> {
for pos in start..end {
let key;
let data;
{
let entry = &mut self.data[pos];
if !entry.is_used() {
continue;
}
key = entry.key;
data = entry.data;
entry.clear();
}
self.entries -= 1;
try!(self.set(&key, &data));
}
Ok(())
}
fn shrink(&mut self) -> Result<bool, Error> {
if self.entries >= self.min_entries || self.capacity <= INITIAL_SIZE {
return Ok(false)
}
let old_capacity = self.capacity;
let new_capacity = self.capacity / 2;
self.set_capacity(new_capacity);
try!(self.reinsert(new_capacity, old_capacity));
try!(Index::resize_fd(&self.fd, new_capacity));
self.mmap = try!(Index::map_fd(&self.fd));
self.data = Index::mmap_as_slice(&self.mmap, new_capacity);
assert_eq!(self.data.len(), self.capacity);
Ok(true)
}
fn extend(&mut self) -> Result<bool, Error> {
if self.entries <= self.max_entries {
return Ok(false)
}
let new_capacity = 2 * self.capacity;
try!(Index::resize_fd(&self.fd, new_capacity));
self.mmap = try!(Index::map_fd(&self.fd));
self.data = Index::mmap_as_slice(&self.mmap, new_capacity);
self.set_capacity(new_capacity);
assert_eq!(self.data.len(), self.capacity);
try!(self.reinsert(0, new_capacity));
Ok(true)
}
#[allow(dead_code)]
pub fn is_consistent(&self) -> bool {
let mut entries = 0;
for pos in 0..self.capacity {
let entry = &self.data[pos];
if !entry.is_used() {
continue;
}
entries += 1;
match self.locate(&entry.key) {
LocateResult::Found(p) if p == pos => true,
found => {
println!("Inconsistency found: Key {:?} should be at {} but is at {:?}", entry.key, pos, found);
return false
}
};
}
if entries != self.entries {
println!("Inconsistency found: Index contains {} entries, should contain {}", entries, self.entries);
return false
}
true
}
pub fn check(&self) -> Result<(), &'static str> {
//TODO: proper errors instead of string
if self.is_consistent() {
Ok(())
} else {
Err("Inconsistent")
}
}
#[inline]
fn increase_count(&mut self) -> Result<(), Error> {
self.entries += 1;
try!(self.extend());
self.write_header();
Ok(())
}
#[inline]
fn decrease_count(&mut self) -> Result<(), Error> {
self.entries -= 1;
try!(self.shrink());
self.write_header();
Ok(())
}
#[inline]
fn write_header(&mut self) {
let entries = self.entries;
let capacity = self.capacity;
let header = self.header();
header.entries = entries as u64;
header.capacity = capacity as u64;
}
/// Finds the position for this key
/// If the key is in the table, it will be the position of the key,
/// otherwise it will be the position where this key should be inserted
fn locate(&self, key: &Hash) -> LocateResult {
let mut pos = key.hash() as usize % self.capacity;
let mut dist = 0;
loop {
let entry = &self.data[pos];
if !entry.is_used() {
return LocateResult::Hole(pos);
}
if entry.key == *key {
return LocateResult::Found(pos);
}
let odist = (pos + self.capacity - entry.key.hash() as usize % self.capacity) % self.capacity;
if dist > odist {
return LocateResult::Steal(pos);
}
pos = (pos + 1) % self.capacity;
dist += 1;
}
}
/// Shifts all following entries towards the left if they can get closer to their ideal position.
/// The entry at the given position will be lost.
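/// Example with capacity 8, writing X@i for a key X whose ideal slot is i:
/// deleting A from [_, A@1, B@1, C@2, _, ...] calls backshift(1), which
/// moves B to slot 1 and C to slot 2, then clears slot 3. This keeps probe
/// chains contiguous, so no tombstones are needed.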
fn backshift(&mut self, start: usize) {
let mut pos = start;
let mut last_pos;
loop {
last_pos = pos;
pos = (pos + 1) % self.capacity;
{
let entry = &self.data[pos];
if !entry.is_used() {
// we found a hole, stop shifting here
break;
}
if entry.key.hash() as usize % self.capacity == pos {
// we found an entry at the right position, stop shifting here
break;
}
}
self.data[last_pos] = self.data[pos].clone();
}
self.data[last_pos].clear();
}
/// Adds the key, data pair into the table.
/// If the key existed in the table before, it is overwritten and false is returned.
/// Otherwise it will be added to the table and true is returned.
pub fn set(&mut self, key: &Hash, data: &Location) -> Result<bool, Error> {
match self.locate(key) {
LocateResult::Found(pos) => {
self.data[pos].data = *data;
Ok(false)
},
LocateResult::Hole(pos) => {
{
let entry = &mut self.data[pos];
entry.key = *key;
entry.data = *data;
}
try!(self.increase_count());
Ok(true)
},
LocateResult::Steal(pos) => {
let mut stolen_key;
let mut stolen_data;
let mut cur_pos = pos;
{
let entry = &mut self.data[pos];
stolen_key = entry.key;
stolen_data = entry.data;
entry.key = *key;
entry.data = *data;
}
loop {
cur_pos = (cur_pos + 1) % self.capacity;
let entry = &mut self.data[cur_pos];
if entry.is_used() {
mem::swap(&mut stolen_key, &mut entry.key);
mem::swap(&mut stolen_data, &mut entry.data);
} else {
entry.key = stolen_key;
entry.data = stolen_data;
break;
}
}
try!(self.increase_count());
Ok(true)
}
}
}
#[inline]
pub fn contains(&self, key: &Hash) -> bool {
debug_assert!(self.is_consistent(), "Inconsistent before get");
match self.locate(key) {
LocateResult::Found(_) => true,
_ => false
}
}
#[inline]
pub fn get(&self, key: &Hash) -> Option<Location> {
debug_assert!(self.is_consistent(), "Inconsistent before get");
match self.locate(key) {
LocateResult::Found(pos) => Some(self.data[pos].data),
_ => None
}
}
#[inline]
pub fn modify<F>(&mut self, key: &Hash, mut f: F) -> bool where F: FnMut(&mut Location) {
debug_assert!(self.is_consistent(), "Inconsistent before get");
match self.locate(key) {
LocateResult::Found(pos) => {
f(&mut self.data[pos].data);
true
},
_ => false
}
}
#[inline]
pub fn delete(&mut self, key: &Hash) -> Result<bool, Error> {
match self.locate(key) {
LocateResult::Found(pos) => {
self.backshift(pos);
try!(self.decrease_count());
Ok(true)
},
_ => Ok(false)
}
}
pub fn filter<F>(&mut self, mut f: F) -> Result<usize, Error> where F: FnMut(&Hash, &Location) -> bool {
//TODO: is it faster to walk in reverse direction?
let mut deleted = 0;
let mut pos = 0;
while pos < self.capacity {
{
let entry = &mut self.data[pos];
if !entry.is_used() || f(&entry.key, &entry.data) {
pos += 1;
continue;
}
}
self.backshift(pos);
deleted += 1;
}
self.entries -= deleted;
while try!(self.shrink()) {}
self.write_header();
Ok(deleted)
}
#[inline]
pub fn walk<F>(&self, mut f: F) where F: FnMut(&Hash, &Location) {
for pos in 0..self.capacity {
let entry = &self.data[pos];
if entry.is_used() {
f(&entry.key, &entry.data);
}
}
}
#[inline]
pub fn walk_mut<F>(&mut self, mut f: F) where F: FnMut(&Hash, &mut Location) {
for pos in 0..self.capacity {
let entry = &mut self.data[pos];
if entry.is_used() {
f(&entry.key, &mut entry.data);
}
}
}
#[inline]
pub fn next_entry(&self, index: usize) -> Option<usize> {
let mut i = index;
while i < self.capacity && !self.data[i].is_used() {
i += 1;
}
if i == self.capacity {
None
} else {
Some(i)
}
}
#[inline]
pub fn get_entry(&self, index: usize) -> Option<&Entry> {
let entry = &self.data[index];
if entry.is_used() {
Some(entry)
} else {
None
}
}
#[inline]
pub fn len(&self) -> usize {
self.entries
}
#[inline]
pub fn is_empty(&self) -> bool {
self.entries == 0
}
#[inline]
pub fn capacity(&self) -> usize {
self.capacity
}
}
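
A minimal usage sketch of the Index API above. It assumes the crate-level module paths (::index, ::util) used in src/main.rs below and that the index module exports its Error type; illustration only, not part of the commit.

use std::path::Path;
use ::index::{Index, Location, Error};
use ::util::Hash;

fn index_demo() -> Result<(), Error> {
    let path: &Path = "demo.idx".as_ref();
    let mut index = try!(Index::create(path));
    let key = Hash { high: 1, low: 2 };
    // New key: set() returns true and the entry count grows.
    assert_eq!(try!(index.set(&key, &Location::new(0, 7))), true);
    assert!(index.contains(&key));
    assert_eq!(index.len(), 1);
    // Removing it leaves the table empty again.
    assert_eq!(try!(index.delete(&key)), true);
    assert!(index.is_empty());
    Ok(())
}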

79
src/main.rs Normal file

@ -0,0 +1,79 @@
extern crate serde;
extern crate rmp_serde;
#[macro_use] extern crate serde_utils;
extern crate crypto;
extern crate squash_sys as squash;
extern crate mmap;
extern crate blake2_rfc as blake2;
extern crate murmurhash3;
extern crate serde_yaml;
#[macro_use] extern crate quick_error;
mod errors;
mod util;
mod bundle;
mod index;
mod chunker;
mod repository;
mod algotest;
use chunker::ChunkerType;
use repository::{Repository, Config, Mode};
use util::{ChecksumType, Compression, HashMethod};
use std::path::Path;
use std::fs::File;
use std::env;
use std::io::Read;
use std::time;
fn main() {
let path: &Path = "test_data".as_ref();
let mut repo = if path.exists() {
Repository::open(path).unwrap()
} else {
Repository::create(path, Config {
bundle_size: 1024*1024,
checksum: ChecksumType::Sha3_256,
chunker: ChunkerType::FastCdc((8*1024, 0)),
compression: Some(Compression::Brotli(5)),
hash: HashMethod::Blake2
}).unwrap()
};
print!("Integrity check before...");
repo.check(true).unwrap();
println!(" done.");
let file_path = env::args().nth(1).expect("Need file as argument");
print!("Reading file {}...", file_path);
let mut data = Vec::new();
let mut file = File::open(file_path).unwrap();
file.read_to_end(&mut data).unwrap();
println!(" done. {} bytes", data.len());
print!("Adding data to repository...");
let start = time::Instant::now();
let chunks = repo.put_data(Mode::Content, &data).unwrap();
repo.flush().unwrap();
let elapsed = start.elapsed();
let duration = elapsed.as_secs() as f64 + elapsed.subsec_nanos() as f64 / 1_000_000_000.0;
let write_speed = data.len() as f64 / duration;
println!(" done. {} chunks, {:.1} MB/s", chunks.len(), write_speed / 1_000_000.0);
println!("Integrity check after...");
repo.check(true).unwrap();
println!(" done.");
print!("Reading data from repository...");
let start = time::Instant::now();
let data2 = repo.get_data(&chunks).unwrap();
let elapsed = start.elapsed();
let duration = elapsed.as_secs() as f64 + elapsed.subsec_nanos() as f64 / 1_000_000_000.0;
let read_speed = data.len() as f64 / duration;
assert_eq!(data.len(), data2.len());
println!(" done. {:.1} MB/s", read_speed / 1_000_000.0);
//algotest::run("test.tar");
}

124
src/repository/basic_io.rs Normal file

@ -0,0 +1,124 @@
use std::mem;
use std::io::{Read, Write, Cursor};
use super::{Repository, Mode};
use super::bundle_map::BundleInfo;
use ::index::Location;
use ::util::Hash;
use ::chunker::{IChunker, ChunkerStatus};
impl Repository {
pub fn get_chunk(&mut self, hash: Hash) -> Result<Option<Vec<u8>>, &'static str> {
// Find bundle and chunk id in index
let found = if let Some(found) = self.index.get(&hash) {
found
} else {
return Ok(None)
};
// Lookup bundle id from map
let bundle_id = if let Some(bundle_info) = self.bundle_map.get(found.bundle) {
bundle_info.id.clone()
} else {
return Err("Bundle id not found in map")
};
// Get chunk from bundle
if let Ok(chunk) = self.bundles.get_chunk(&bundle_id, found.chunk as usize) {
Ok(Some(chunk))
} else {
Err("Failed to load chunk from bundle")
}
}
pub fn put_chunk(&mut self, mode: Mode, hash: Hash, data: &[u8]) -> Result<(), &'static str> {
// If this chunk is in the index, ignore it
if self.index.contains(&hash) {
return Ok(())
}
// Calculate the next free bundle id now (the borrow checker prevents doing this later)
let next_free_bundle_id = self.next_free_bundle_id();
// Select a bundle writer according to the mode and...
let writer = match mode {
Mode::Content => &mut self.content_bundle,
Mode::Meta => &mut self.meta_bundle
};
// ...allocate one if needed
if writer.is_none() {
*writer = Some(try!(self.bundles.create_bundle().map_err(|_| "Failed to create new bundle")));
}
debug_assert!(writer.is_some());
let chunk_id;
let size;
{
// Add chunk to bundle writer and determine the size of the bundle
let writer_obj = writer.as_mut().unwrap();
chunk_id = try!(writer_obj.add(data).map_err(|_| "Failed to write chunk"));
size = writer_obj.size();
}
let bundle_id = match mode {
Mode::Content => self.next_content_bundle,
Mode::Meta => self.next_meta_bundle
};
// Finish bundle if over maximum size
if size >= self.config.bundle_size {
let mut finished = None;
mem::swap(writer, &mut finished);
let bundle = try!(self.bundles.add_bundle(finished.unwrap()).map_err(|_| "Failed to write finished bundle"));
let bundle_info = BundleInfo{id: bundle.id.clone()};
self.bundle_map.set(bundle_id, bundle_info);
if self.next_meta_bundle == bundle_id {
self.next_meta_bundle = next_free_bundle_id
}
if self.next_content_bundle == bundle_id {
self.next_content_bundle = next_free_bundle_id
}
// Not saving the bundle map, this will be done by flush
}
// Add location to the index
try!(self.index.set(&hash, &Location::new(bundle_id, chunk_id as u32)).map_err(|_| "Failed to add chunk location to index"));
Ok(())
}
#[inline]
pub fn put_data(&mut self, mode: Mode, data: &[u8]) -> Result<Vec<(Hash, usize)>, &'static str> {
let mut input = Cursor::new(data);
self.put_stream(mode, &mut input)
}
pub fn put_stream<R: Read>(&mut self, mode: Mode, data: &mut R) -> Result<Vec<(Hash, usize)>, &'static str> {
let avg_size = self.config.chunker.avg_size();
let mut chunks = Vec::new();
let mut chunk = Vec::with_capacity(avg_size * 2);
loop {
chunk.clear();
let mut output = Cursor::new(chunk);
let res = try!(self.chunker.chunk(data, &mut output).map_err(|_| "Failed to chunk"));
chunk = output.into_inner();
let hash = self.config.hash.hash(&chunk);
try!(self.put_chunk(mode, hash, &chunk).map_err(|_| "Failed to store chunk"));
chunks.push((hash, chunk.len()));
if res == ChunkerStatus::Finished {
break
}
}
Ok(chunks)
}
#[inline]
pub fn get_data(&mut self, chunks: &[(Hash, usize)]) -> Result<Vec<u8>, &'static str> {
let mut data = Vec::with_capacity(chunks.iter().map(|&(_, size)| size).sum());
try!(self.get_stream(chunks, &mut data));
Ok(data)
}
#[inline]
pub fn get_stream<W: Write>(&mut self, chunks: &[(Hash, usize)], w: &mut W) -> Result<(), &'static str> {
for &(ref hash, len) in chunks {
let data = try!(try!(self.get_chunk(*hash).map_err(|_| "Failed to load chunk")).ok_or("Chunk missing"));
debug_assert_eq!(data.len(), len);
try!(w.write_all(&data).map_err(|_| "Failed to write to sink"));
}
Ok(())
}
}
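
put_stream above drives the chunker until it reports the end of its input. The same driving pattern in isolation, assuming only the IChunker trait and ChunkerStatus enum from the chunker module (declared in src/main.rs but not part of this diff); the helper itself is hypothetical:

use std::io::Cursor;
use ::chunker::{IChunker, ChunkerStatus};

// Split `data` into content-defined chunks, returning their sizes.
fn chunk_sizes<C: IChunker>(chunker: &mut C, data: &[u8]) -> Result<Vec<usize>, &'static str> {
    let mut input = Cursor::new(data);
    let mut sizes = Vec::new();
    loop {
        let mut output = Cursor::new(Vec::new());
        let status = try!(chunker.chunk(&mut input, &mut output).map_err(|_| "Failed to chunk"));
        sizes.push(output.into_inner().len());
        if status == ChunkerStatus::Finished {
            break
        }
    }
    Ok(sizes)
}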

74
src/repository/bundle_map.rs Normal file

@ -0,0 +1,74 @@
use std::collections::HashMap;
use std::path::Path;
use std::io::{BufReader, Read, Write, BufWriter};
use std::fs::File;
use rmp_serde;
use serde::Deserialize;
use serde::Serialize;
use ::bundle::BundleId;
static HEADER_STRING: [u8; 7] = *b"zbunmap";
static HEADER_VERSION: u8 = 1;
#[derive(Default)]
pub struct BundleInfo {
pub id: BundleId
}
serde_impl!(BundleInfo(u64) {
id: BundleId => 0
});
pub struct BundleMap(HashMap<u32, BundleInfo>);
impl BundleMap {
pub fn create() -> Self {
BundleMap(Default::default())
}
pub fn load<P: AsRef<Path>>(path: P) -> Result<Self, &'static str> {
let mut file = BufReader::new(try!(File::open(path.as_ref())
.map_err(|_| "Failed to open bundle map file")));
let mut header = [0u8; 8];
try!(file.read_exact(&mut header)
.map_err(|_| "Failed to read bundle map header"));
if header[..HEADER_STRING.len()] != HEADER_STRING {
return Err("Wrong header string")
}
let version = header[HEADER_STRING.len()];
if version != HEADER_VERSION {
return Err("Unsupported bundle map file version")
}
let mut reader = rmp_serde::Deserializer::new(file);
let map = try!(HashMap::deserialize(&mut reader)
.map_err(|_| "Failed to read bundle map data"));
Ok(BundleMap(map))
}
pub fn save<P: AsRef<Path>>(&self, path: P) -> Result<(), &'static str> {
let mut file = BufWriter::new(try!(File::create(path)
.map_err(|_| "Failed to create bundle file")));
try!(file.write_all(&HEADER_STRING)
.map_err(|_| "Failed to write bundle header"));
try!(file.write_all(&[HEADER_VERSION])
.map_err(|_| "Failed to write bundle header"));
let mut writer = rmp_serde::Serializer::new(&mut file);
self.0.serialize(&mut writer)
.map_err(|_| "Failed to write bundle map data")
}
#[inline]
pub fn get(&self, id: u32) -> Option<&BundleInfo> {
self.0.get(&id)
}
#[inline]
pub fn set(&mut self, id: u32, info: BundleInfo) {
self.0.insert(id, info);
}
}
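
A round-trip sketch for the file format above (7-byte magic, one version byte, then a MessagePack-encoded map). BundleInfo::default() is available because #[derive(Default)] on BundleInfo requires BundleId to implement Default; illustration only:

fn bundle_map_roundtrip() -> Result<(), &'static str> {
    let mut map = BundleMap::create();
    map.set(42, BundleInfo::default());
    try!(map.save("bundles.map"));
    let loaded = try!(BundleMap::load("bundles.map"));
    assert!(loaded.get(42).is_some());
    Ok(())
}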

182
src/repository/config.rs Normal file

@ -0,0 +1,182 @@
use serde_yaml;
use std::fs::File;
use std::path::Path;
use ::util::*;
use ::chunker::ChunkerType;
impl HashMethod {
fn from_yaml(yaml: String) -> Result<Self, &'static str> {
HashMethod::from(&yaml)
}
fn to_yaml(&self) -> String {
self.name().to_string()
}
}
impl ChecksumType {
fn from_yaml(yaml: String) -> Result<Self, &'static str> {
ChecksumType::from(&yaml)
}
fn to_yaml(&self) -> String {
self.name().to_string()
}
}
struct ChunkerYaml {
method: String,
avg_size: usize,
seed: u64
}
impl Default for ChunkerYaml {
fn default() -> Self {
ChunkerYaml {
method: "fastcdc".to_string(),
avg_size: 16*1024,
seed: 0
}
}
}
serde_impl!(ChunkerYaml(String) {
method: String => "method",
avg_size: usize => "avg_size",
seed: u64 => "seed"
});
impl ChunkerType {
fn from_yaml(yaml: ChunkerYaml) -> Result<Self, &'static str> {
ChunkerType::from(&yaml.method, yaml.avg_size, yaml.seed)
}
fn to_yaml(&self) -> ChunkerYaml {
ChunkerYaml {
method: self.name().to_string(),
avg_size: self.avg_size(),
seed: self.seed()
}
}
}
struct CompressionYaml {
codec: String,
level: Option<u8>
}
impl Default for CompressionYaml {
fn default() -> Self {
CompressionYaml {
codec: "brotli".to_string(),
level: None
}
}
}
serde_impl!(CompressionYaml(String) {
codec: String => "codec",
level: Option<u8> => "level"
});
impl Compression {
fn from_yaml(yaml: CompressionYaml) -> Result<Self, &'static str> {
match &yaml.codec as &str {
"snappy" => Ok(Compression::Snappy(())),
"zstd" => Ok(Compression::ZStd(yaml.level.unwrap_or(5))),
"deflate" | "zlib" | "gzip" => Ok(Compression::Deflate(yaml.level.unwrap_or(5))),
"brotli" => Ok(Compression::Brotli(yaml.level.unwrap_or(5))),
"lzma2" => Ok(Compression::Lzma2(yaml.level.unwrap_or(5))),
_ => Err("Unsupported codec")
}
}
fn to_yaml(&self) -> CompressionYaml {
CompressionYaml {
codec: self.name().to_string(),
level: self.level()
}
}
}
struct ConfigYaml {
compression: Option<CompressionYaml>,
bundle_size: usize,
chunker: ChunkerYaml,
checksum: String,
hash: String,
}
impl Default for ConfigYaml {
fn default() -> Self {
ConfigYaml {
compression: Some(CompressionYaml { codec: "brotli".to_string(), level: Some(5) }),
bundle_size: 25*1024*1024,
chunker: ChunkerYaml::default(),
checksum: "sha3-256".to_string(),
hash: "blake2".to_string()
}
}
}
serde_impl!(ConfigYaml(String) {
compression: Option<CompressionYaml> => "compression",
bundle_size: usize => "bundle_size",
chunker: ChunkerYaml => "chunker",
checksum: String => "checksum",
hash: String => "hash"
});
#[derive(Debug)]
pub struct Config {
pub compression: Option<Compression>,
pub bundle_size: usize,
pub chunker: ChunkerType,
pub checksum: ChecksumType,
pub hash: HashMethod
}
impl Config {
fn from_yaml(yaml: ConfigYaml) -> Result<Self, &'static str> {
let compression = if let Some(c) = yaml.compression {
Some(try!(Compression::from_yaml(c)))
} else {
None
};
Ok(Config{
compression: compression,
bundle_size: yaml.bundle_size,
chunker: try!(ChunkerType::from_yaml(yaml.chunker)),
checksum: try!(ChecksumType::from_yaml(yaml.checksum)),
hash: try!(HashMethod::from_yaml(yaml.hash))
})
}
fn to_yaml(&self) -> ConfigYaml {
ConfigYaml {
compression: self.compression.as_ref().map(|c| c.to_yaml()),
bundle_size: self.bundle_size,
chunker: self.chunker.to_yaml(),
checksum: self.checksum.to_yaml(),
hash: self.hash.to_yaml()
}
}
pub fn load<P: AsRef<Path>>(path: P) -> Result<Self, &'static str> {
let f = try!(File::open(path).map_err(|_| "Failed to open config"));
let config = try!(serde_yaml::from_reader(f).map_err(|_| "Failed to parse config"));
Config::from_yaml(config)
}
pub fn save<P: AsRef<Path>>(&self, path: P) -> Result<(), &'static str> {
let mut f = try!(File::create(path).map_err(|_| "Failed to open config"));
try!(serde_yaml::to_writer(&mut f, &self.to_yaml()).map_err(|_| "Failed to write config"));
Ok(())
}
}
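
For reference, a config.yaml matching ConfigYaml::default() would plausibly look as follows (field names come from the serde_impl! mappings above; the exact layout is up to serde_yaml):

compression:
  codec: brotli
  level: 5
bundle_size: 26214400
chunker:
  method: fastcdc
  avg_size: 16384
  seed: 0
checksum: sha3-256
hash: blake2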

61
src/repository/integrity.rs Normal file

@ -0,0 +1,61 @@
use super::Repository;
use ::util::Hash;
impl Repository {
fn check_chunk(&self, hash: Hash) -> Result<(), &'static str> {
// Find bundle and chunk id in index
let found = if let Some(found) = self.index.get(&hash) {
found
} else {
return Err("Chunk not in index");
};
// Lookup bundle id from map
let bundle_id = if let Some(bundle_info) = self.bundle_map.get(found.bundle) {
bundle_info.id.clone()
} else {
return Err("Bundle id not found in map")
};
// Get bundle object from bundledb
let bundle = if let Some(bundle) = self.bundles.get_bundle(&bundle_id) {
bundle
} else {
return Err("Bundle not found in bundledb")
};
// Get chunk from bundle
if bundle.chunk_count > found.chunk as usize {
Ok(())
} else {
Err("Bundle does not contain that chunk")
}
//TODO: check that contents match their hash
}
pub fn check(&mut self, full: bool) -> Result<(), &'static str> {
try!(self.flush());
try!(self.bundles.check(full).map_err(|_| "Bundles inconsistent"));
try!(self.index.check().map_err(|_| "Index inconsistent"));
let mut pos = 0;
loop {
pos = if let Some(pos) = self.index.next_entry(pos) {
pos
} else {
break
};
let entry = self.index.get_entry(pos).unwrap();
try!(self.check_chunk(entry.key));
pos += 1;
}
if self.next_content_bundle == self.next_meta_bundle {
return Err("Next bundle ids for meta and content as the same")
}
if self.bundle_map.get(self.next_content_bundle).is_some() {
return Err("Bundle map already contains next bundle bundle id")
}
if self.bundle_map.get(self.next_meta_bundle).is_some() {
return Err("Bundle map already contains next meta bundle id")
}
Ok(())
}
}
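
The loop in check() above is the intended scan pattern for next_entry/get_entry. As a standalone sketch against the Index API shown earlier (illustration only):

// Count used entries by scanning; the result equals index.len().
fn count_entries(index: &Index) -> usize {
    let mut count = 0;
    let mut pos = 0;
    while let Some(p) = index.next_entry(pos) {
        count += 1;
        pos = p + 1;
    }
    count
}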

138
src/repository/mod.rs Normal file

@ -0,0 +1,138 @@
mod config;
mod bundle_map;
mod integrity;
mod basic_io;
use std::mem;
use std::cmp::max;
use std::path::{PathBuf, Path};
use std::fs;
use super::index::Index;
use super::bundle::{BundleDb, BundleWriter};
use super::chunker::Chunker;
pub use self::config::Config;
use self::bundle_map::{BundleMap, BundleInfo};
#[derive(Eq, Debug, PartialEq, Clone, Copy)]
pub enum Mode {
Content, Meta
}
pub struct Repository {
path: PathBuf,
config: Config,
index: Index,
bundle_map: BundleMap,
next_content_bundle: u32,
next_meta_bundle: u32,
bundles: BundleDb,
content_bundle: Option<BundleWriter>,
meta_bundle: Option<BundleWriter>,
chunker: Chunker
}
impl Repository {
pub fn create<P: AsRef<Path>>(path: P, config: Config) -> Result<Self, &'static str> {
let path = path.as_ref().to_owned();
try!(fs::create_dir(&path).map_err(|_| "Failed to create repository directory"));
let bundles = try!(BundleDb::create(
path.join("bundles"),
config.compression.clone(),
None, //FIXME: store encryption in config
config.checksum
).map_err(|_| "Failed to create bundle db"));
let index = try!(Index::create(&path.join("index")).map_err(|_| "Failed to create index"));
try!(config.save(path.join("config.yaml")).map_err(|_| "Failed to save config"));
let bundle_map = BundleMap::create();
try!(bundle_map.save(path.join("bundles.map")).map_err(|_| "Failed to save bundle map"));
Ok(Repository{
path: path,
chunker: config.chunker.create(),
config: config,
index: index,
bundle_map: bundle_map,
next_content_bundle: 1,
next_meta_bundle: 0,
bundles: bundles,
content_bundle: None,
meta_bundle: None,
})
}
pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, &'static str> {
let path = path.as_ref().to_owned();
let config = try!(Config::load(path.join("config.yaml")).map_err(|_| "Failed to load config"));
let bundles = try!(BundleDb::open(
path.join("bundles"),
config.compression.clone(),
None, //FIXME: load encryption from config
config.checksum
).map_err(|_| "Failed to open bundle db"));
let index = try!(Index::open(&path.join("index")).map_err(|_| "Failed to open index"));
let bundle_map = try!(BundleMap::load(path.join("bundles.map")).map_err(|_| "Failed to load bundle map"));
let mut repo = Repository {
path: path,
chunker: config.chunker.create(),
config: config,
index: index,
bundle_map: bundle_map,
next_content_bundle: 0,
next_meta_bundle: 0,
bundles: bundles,
content_bundle: None,
meta_bundle: None,
};
repo.next_meta_bundle = repo.next_free_bundle_id();
repo.next_content_bundle = repo.next_free_bundle_id();
Ok(repo)
}
#[inline]
fn save_bundle_map(&self) -> Result<(), &'static str> {
self.bundle_map.save(self.path.join("bundles.map"))
}
#[inline]
fn next_free_bundle_id(&self) -> u32 {
let mut id = max(self.next_content_bundle, self.next_meta_bundle) + 1;
while self.bundle_map.get(id).is_some() {
id += 1;
}
id
}
pub fn flush(&mut self) -> Result<(), &'static str> {
if self.content_bundle.is_some() {
let mut finished = None;
mem::swap(&mut self.content_bundle, &mut finished);
{
let bundle = try!(self.bundles.add_bundle(finished.unwrap()).map_err(|_| "Failed to write finished bundle"));
let bundle_info = BundleInfo{id: bundle.id.clone()};
self.bundle_map.set(self.next_content_bundle, bundle_info);
}
self.next_content_bundle = self.next_free_bundle_id()
}
if self.meta_bundle.is_some() {
let mut finished = None;
mem::swap(&mut self.meta_bundle, &mut finished);
{
let bundle = try!(self.bundles.add_bundle(finished.unwrap()).map_err(|_| "Failed to write finished bundle"));
let bundle_info = BundleInfo{id: bundle.id.clone()};
self.bundle_map.set(self.next_meta_bundle, bundle_info);
}
self.next_meta_bundle = self.next_free_bundle_id()
}
try!(self.save_bundle_map().map_err(|_| "Failed to save bundle map"));
Ok(())
}
}
impl Drop for Repository {
fn drop(&mut self) {
self.flush().expect("Failed to write last bundles")
}
}

66
src/util/checksum.rs Normal file

@ -0,0 +1,66 @@
use serde::bytes::ByteBuf;
use crypto::sha3;
use crypto::digest::Digest;
#[derive(Clone, Debug, Copy)]
#[allow(non_camel_case_types)]
pub enum ChecksumType {
Sha3_256
}
serde_impl!(ChecksumType(u64) {
Sha3_256 => 1
});
impl ChecksumType {
#[inline]
pub fn from(name: &str) -> Result<Self, &'static str> {
match name {
"sha3-256" => Ok(ChecksumType::Sha3_256),
_ => Err("Unsupported checksum type")
}
}
#[inline]
pub fn name(&self) -> &'static str {
match *self {
ChecksumType::Sha3_256 => "sha3-256",
}
}
}
pub type Checksum = (ChecksumType, ByteBuf);
#[allow(non_camel_case_types, unknown_lints, large_enum_variant)]
pub enum ChecksumCreator {
Sha3_256(sha3::Sha3)
}
impl ChecksumCreator {
#[inline]
pub fn new(type_: ChecksumType) -> Self {
match type_ {
ChecksumType::Sha3_256 => ChecksumCreator::Sha3_256(sha3::Sha3::sha3_256())
}
}
#[inline]
pub fn update(&mut self, data: &[u8]) {
match *self {
ChecksumCreator::Sha3_256(ref mut state) => state.input(data)
}
}
#[inline]
pub fn finish(self) -> Checksum {
match self {
ChecksumCreator::Sha3_256(mut state) => {
let mut buf = Vec::with_capacity(state.output_bytes());
buf.resize(state.output_bytes(), 0);
state.result(&mut buf);
(ChecksumType::Sha3_256, buf.into())
}
}
}
}
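
A small usage sketch of the checksum API above:

fn checksum_of(data: &[u8]) -> Checksum {
    let mut creator = ChecksumCreator::new(ChecksumType::Sha3_256);
    creator.update(data);
    creator.finish()
}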

216
src/util/compression.rs Normal file

@ -0,0 +1,216 @@
use std::ptr;
use std::ffi::{CStr, CString};
use std::io::Write;
use squash::*;
#[derive(Clone, Debug)]
pub enum Compression {
Snappy(()),
Deflate(u8),
Brotli(u8),
Lzma2(u8),
ZStd(u8)
}
serde_impl!(Compression(u64) {
Snappy(()) => 0,
Deflate(u8) => 1,
Brotli(u8) => 2,
Lzma2(u8) => 3,
ZStd(u8) => 4
});
impl Compression {
#[inline]
pub fn name(&self) -> &'static str {
match *self {
Compression::Snappy(_) => "snappy",
Compression::Deflate(_) => "deflate",
Compression::Brotli(_) => "brotli",
Compression::Lzma2(_) => "lzma2",
Compression::ZStd(_) => "zstd",
}
}
#[inline]
fn codec(&self) -> Result<*mut SquashCodec, &'static str> {
let name = CString::new(self.name().as_bytes()).unwrap();
let codec = unsafe { squash_get_codec(name.as_ptr()) };
if codec.is_null() {
return Err("Unsupported algorithm")
}
Ok(codec)
}
#[inline]
pub fn level(&self) -> Option<u8> {
match *self {
Compression::Snappy(_) => None,
Compression::Deflate(lvl) |
Compression::Brotli(lvl) |
Compression::ZStd(lvl) |
Compression::Lzma2(lvl) => Some(lvl),
}
}
fn options(&self) -> Result<*mut SquashOptions, &'static str> {
let codec = try!(self.codec());
let options = unsafe { squash_options_new(codec, ptr::null::<()>()) };
if let Some(level) = self.level() {
if options.is_null() {
return Err("Algorithm does not support a level")
}
let option = CString::new("level");
let value = CString::new(format!("{}", level));
let res = unsafe { squash_options_parse_option(
options,
option.unwrap().as_ptr(),
value.unwrap().as_ptr()
)};
if res != SQUASH_OK {
//panic!(unsafe { CStr::from_ptr(squash_status_to_string(res)).to_str().unwrap() });
return Err("Failed to set compression level")
}
}
Ok(options)
}
#[inline]
fn error(code: SquashStatus) -> &'static str {
unsafe { CStr::from_ptr(squash_status_to_string(code)).to_str().unwrap() }
}
pub fn compress(&self, data: &[u8]) -> Result<Vec<u8>, &'static str> {
let codec = try!(self.codec());
let options = try!(self.options());
let mut size = data.len() * 2 + 500;
// The following does not work for all codecs
/*unsafe { squash_codec_get_max_compressed_size(
codec,
data.len() as usize
)};*/
let mut buf = Vec::with_capacity(size as usize);
let res = unsafe { squash_codec_compress_with_options(
codec,
&mut size,
buf.as_mut_ptr(),
data.len(),
data.as_ptr(),
options)
};
if res != SQUASH_OK {
println!("{:?}", data);
println!("{}, {}", data.len(), size);
return Err(Self::error(res))
}
unsafe { buf.set_len(size) };
Ok(buf)
}
pub fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, &'static str> {
let codec = try!(self.codec());
let mut size = unsafe { squash_codec_get_uncompressed_size(
codec,
data.len(),
data.as_ptr()
)};
if size == 0 {
size = 100 * data.len();
}
let mut buf = Vec::with_capacity(size);
let res = unsafe { squash_codec_decompress(
codec,
&mut size,
buf.as_mut_ptr(),
data.len(),
data.as_ptr(),
ptr::null_mut::<()>())
};
if res != SQUASH_OK {
return Err(Self::error(res))
}
unsafe { buf.set_len(size) };
Ok(buf)
}
#[inline]
pub fn compress_stream(&self) -> Result<CompressionStream, &'static str> {
let codec = try!(self.codec());
let options = try!(self.options());
let stream = unsafe { squash_stream_new_with_options(
codec, SQUASH_STREAM_COMPRESS, options
) };
if stream.is_null() {
return Err("Failed to create stream");
}
Ok(CompressionStream::new(unsafe { Box::from_raw(stream) }))
}
#[inline]
pub fn decompress_stream(&self) -> Result<CompressionStream, &'static str> {
let codec = try!(self.codec());
let stream = unsafe { squash_stream_new(
codec, SQUASH_STREAM_DECOMPRESS, ptr::null::<()>()
) };
if stream.is_null() {
return Err("Failed to create stream");
}
Ok(CompressionStream::new(unsafe { Box::from_raw(stream) }))
}
}
pub struct CompressionStream {
stream: Box<SquashStream>,
buffer: [u8; 16*1024]
}
impl CompressionStream {
#[inline]
fn new(stream: Box<SquashStream>) -> Self {
CompressionStream {
stream: stream,
buffer: [0; 16*1024]
}
}
pub fn process<W: Write>(&mut self, input: &[u8], output: &mut W) -> Result<(), &'static str> {
let stream = &mut *self.stream;
stream.next_in = input.as_ptr();
stream.avail_in = input.len();
loop {
stream.next_out = self.buffer.as_mut_ptr();
stream.avail_out = self.buffer.len();
let res = unsafe { squash_stream_process(stream) };
if res < 0 {
return Err(Compression::error(res))
}
let output_size = self.buffer.len() - stream.avail_out;
try!(output.write_all(&self.buffer[..output_size]).map_err(|_| "Failed to write to output"));
if res != SQUASH_PROCESSING {
break
}
}
Ok(())
}
pub fn finish<W: Write>(mut self, output: &mut W) -> Result<(), &'static str> {
let stream = &mut *self.stream;
loop {
stream.next_out = self.buffer.as_mut_ptr();
stream.avail_out = self.buffer.len();
let res = unsafe { squash_stream_finish(stream) };
if res < 0 {
return Err(Compression::error(res))
}
let output_size = self.buffer.len() - stream.avail_out;
try!(output.write_all(&self.buffer[..output_size]).map_err(|_| "Failed to write to output"));
if res != SQUASH_PROCESSING {
break
}
}
Ok(())
}
}
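
A usage sketch of the streaming interface above; for one-shot buffers, compress() is the simpler path:

fn compress_all(data: &[u8]) -> Result<Vec<u8>, &'static str> {
    let mut out = Vec::new();
    let mut stream = try!(Compression::Brotli(5).compress_stream());
    try!(stream.process(data, &mut out));
    // finish() consumes the stream and flushes any buffered output.
    try!(stream.finish(&mut out));
    Ok(out)
}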

54
src/util/encryption.rs Normal file

@ -0,0 +1,54 @@
use std::collections::HashMap;
#[derive(Clone)]
pub enum EncryptionMethod {
Dummy
}
serde_impl!(EncryptionMethod(u64) {
Dummy => 0
});
pub type EncryptionKey = Vec<u8>;
pub type EncryptionKeyId = u64;
pub type Encryption = (EncryptionMethod, EncryptionKeyId);
#[derive(Clone)]
pub struct Crypto {
keys: HashMap<EncryptionKeyId, EncryptionKey>
}
impl Crypto {
#[inline]
pub fn new() -> Self {
Crypto { keys: Default::default() }
}
#[inline]
pub fn register_key(&mut self, key: EncryptionKey, id: EncryptionKeyId) {
self.keys.insert(id, key);
}
#[inline]
pub fn contains_key(&mut self, id: EncryptionKeyId) -> bool {
self.keys.contains_key(&id)
}
#[inline]
pub fn encrypt(&self, _enc: Encryption, _data: &[u8]) -> Result<Vec<u8>, &'static str> {
unimplemented!()
}
#[inline]
pub fn decrypt(&self, _enc: Encryption, _data: &[u8]) -> Result<Vec<u8>, &'static str> {
unimplemented!()
}
}
impl Default for Crypto {
#[inline]
fn default() -> Self {
Crypto::new()
}
}

104
src/util/hash.rs Normal file

@ -0,0 +1,104 @@
use serde::{self, Serialize, Deserialize};
use serde::de::Error;
use serde::bytes::{ByteBuf, Bytes};
use murmurhash3::murmurhash3_x64_128;
use blake2::blake2b::blake2b;
use std::mem;
use std::fmt;
use std::u64;
#[repr(packed)]
#[derive(Clone, Copy, PartialEq, Debug, Hash, Eq)]
pub struct Hash {
pub high: u64,
pub low: u64
}
impl Hash {
#[inline]
pub fn hash(&self) -> u64 {
self.low
}
#[inline]
pub fn empty() -> Self {
Hash{high: 0, low: 0}
}
}
impl fmt::Display for Hash {
#[inline]
fn fmt(&self, fmt: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(fmt, "{:016x}{:016x}", self.high, self.low)
}
}
impl Serialize for Hash {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> where S: serde::Serializer {
let hash = Hash{high: u64::to_le(self.high), low: u64::to_le(self.low)};
let dat: [u8; 16] = unsafe { mem::transmute(hash) };
Bytes::from(&dat as &[u8]).serialize(serializer)
}
}
impl Deserialize for Hash {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> where D: serde::Deserializer {
let dat: Vec<u8> = try!(ByteBuf::deserialize(deserializer)).into();
if dat.len() != 16 {
return Err(D::Error::custom("Invalid key length"));
}
let hash = unsafe { &*(dat.as_ptr() as *const Hash) };
Ok(Hash{high: u64::from_le(hash.high), low: u64::from_le(hash.low)})
}
}
#[derive(Debug, Clone, Copy)]
pub enum HashMethod {
Blake2,
Murmur3
}
serde_impl!(HashMethod(u64) {
Blake2 => 1,
Murmur3 => 2
});
impl HashMethod {
#[inline]
pub fn hash(&self, data: &[u8]) -> Hash {
match *self {
HashMethod::Blake2 => {
let hash = blake2b(16, &[], data);
let hash = unsafe { &*mem::transmute::<_, *mut (u64, u64)>(hash.as_bytes().as_ptr()) };
Hash { high: u64::from_be(hash.0), low: u64::from_be(hash.1) }
},
HashMethod::Murmur3 => {
let (a, b) = murmurhash3_x64_128(data, 0);
Hash { high: a, low: b }
}
}
}
#[inline]
pub fn from(name: &str) -> Result<Self, &'static str> {
match name {
"blake2" => Ok(HashMethod::Blake2),
"murmur3" => Ok(HashMethod::Murmur3),
_ => Err("Unsupported hash method")
}
}
#[inline]
pub fn name(&self) -> &'static str {
match *self {
HashMethod::Blake2 => "blake2",
HashMethod::Murmur3 => "murmur3"
}
}
}
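
A usage sketch of the hashing API above; name() and from() round-trip, which is what the YAML config relies on:

fn hash_demo() -> Result<(), &'static str> {
    let method = try!(HashMethod::from("blake2"));
    let hash = method.hash(b"some data");
    // Prints the method name and the 128-bit hash as 32 hex digits.
    println!("{}: {}", method.name(), hash);
    Ok(())
}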

53
src/util/lru_cache.rs Normal file

@ -0,0 +1,53 @@
use std::hash::Hash;
use std::collections::HashMap;
pub struct LruCache<K, V> {
items: HashMap<K, (V, u64)>,
min_size: usize,
max_size: usize,
next: u64
}
impl<K: Eq+Hash, V> LruCache<K, V> {
#[inline]
pub fn new(min_size: usize, max_size: usize) -> Self {
LruCache {
items: HashMap::default(),
min_size: min_size,
max_size: max_size,
next: 0
}
}
#[inline]
pub fn put(&mut self, key: K, value: V) {
self.items.insert(key, (value, self.next));
self.next += 1;
if self.items.len() > self.max_size {
self.shrink()
}
}
#[inline]
pub fn get(&mut self, key: &K) -> Option<&V> {
if let Some(&mut (ref item, ref mut n)) = self.items.get_mut(key) {
*n = self.next;
self.next += 1;
Some(item)
} else {
None
}
}
#[inline]
fn shrink(&mut self) {
let mut tags: Vec<u64> = self.items.values().map(|&(_, n)| n).collect();
tags.sort();
let bar = tags[tags.len()-self.min_size];
let mut new = HashMap::with_capacity(self.min_size);
new.extend(self.items.drain().filter(|&(_,(_, n))| n>=bar));
self.items = new;
}
}
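
A usage sketch: the cache grows up to max_size entries and, once exceeded, shrink() keeps only the min_size most recently used ones:

fn cache_demo() {
    let mut cache: LruCache<u32, &'static str> = LruCache::new(2, 3);
    cache.put(1, "one");
    cache.put(2, "two");
    cache.put(3, "three");
    assert!(cache.get(&1).is_some()); // refreshes key 1
    cache.put(4, "four"); // exceeds max_size: only keys 1 and 4 survive
    assert!(cache.get(&3).is_none());
    assert!(cache.get(&1).is_some());
}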

11
src/util/mod.rs Normal file

@ -0,0 +1,11 @@
mod checksum;
mod compression;
mod encryption;
mod hash;
mod lru_cache;
pub use self::checksum::*;
pub use self::compression::*;
pub use self::encryption::*;
pub use self::hash::*;
pub use self::lru_cache::*;