diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5f5515b --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +target +squash +test.tar +test_data diff --git a/Algotest1.txt b/Algotest1.txt new file mode 100644 index 0000000..336fba3 --- /dev/null +++ b/Algotest1.txt @@ -0,0 +1,68 @@ +~/shared + + +Algorithm comparison on file test.tar + +Reading input file... done. 2175416320 bytes + +Chunker algorithms + Chunk size: 4 KiB + AE: avg chunk size 2756.5 ± 543.6 bytes, 12.1% saved, speed 748.9 MB/s + Rabin: avg chunk size 4902.3 ± 3826.2 bytes, 11.7% saved, speed 336.7 MB/s + FastCdc: avg chunk size 4783.3 ± 1940.5 bytes, 12.1% saved, speed 544.1 MB/s + Chunk size: 8 KiB + AE: avg chunk size 5245.1 ± 890.8 bytes, 10.0% saved, speed 756.3 MB/s + Rabin: avg chunk size 9774.2 ± 7636.0 bytes, 10.3% saved, speed 344.9 MB/s + FastCdc: avg chunk size 9583.2 ± 3933.2 bytes, 10.7% saved, speed 541.6 MB/s + Chunk size: 16 KiB + AE: avg chunk size 10169.5 ± 1485.8 bytes, 7.4% saved, speed 781.5 MB/s + Rabin: avg chunk size 19641.7 ± 15292.5 bytes, 9.0% saved, speed 345.9 MB/s + FastCdc: avg chunk size 19262.9 ± 7697.4 bytes, 9.0% saved, speed 548.1 MB/s + Chunk size: 32 KiB + AE: avg chunk size 20004.6 ± 2705.6 bytes, 5.6% saved, speed 787.0 MB/s + Rabin: avg chunk size 38963.6 ± 30218.2 bytes, 7.6% saved, speed 345.7 MB/s + FastCdc: avg chunk size 39159.3 ± 16834.6 bytes, 7.7% saved, speed 547.1 MB/s + Chunk size: 64 KiB + AE: avg chunk size 39627.2 ± 5310.6 bytes, 3.8% saved, speed 788.2 MB/s + Rabin: avg chunk size 78339.7 ± 60963.7 bytes, 6.4% saved, speed 345.6 MB/s + FastCdc: avg chunk size 76981.4 ± 30784.6 bytes, 6.1% saved, speed 548.4 MB/s + +Hash algorithms + Blake2: 724.2 MB/s + Murmur3: 5358.3 MB/s + +Compression algorithms + Snappy: ratio: 83.6%, compress: 301.7 MB/s, decompress: 876.2 MB/s +fatal runtime error: out of memory + + + ZStd/1: ratio: 77.2%, compress: 493.9 MB/s, decompress: 0.0 MB/s + ZStd/2: ratio: 76.7%, compress: 420.6 MB/s, decompress: 0.0 MB/s + ZStd/3: ratio: 75.4%, compress: 314.6 MB/s, decompress: 0.0 MB/s + ZStd/4: ratio: 75.3%, compress: 273.0 MB/s, decompress: 0.0 MB/s + ZStd/5: ratio: 74.9%, compress: 131.4 MB/s, decompress: 0.0 MB/s + ZStd/6: ratio: 73.6%, compress: 121.4 MB/s, decompress: 0.0 MB/s + ZStd/7: ratio: 73.5%, compress: 88.7 MB/s, decompress: 0.0 MB/s + ZStd/8: ratio: 73.4%, compress: 76.8 MB/s, decompress: 0.0 MB/s + ZStd/9: ratio: 73.3%, compress: 51.8 MB/s, decompress: 0.0 MB/s + Deflate/1: ratio: 78.3%, compress: 95.7 MB/s, decompress: 0.0 MB/s + Deflate/2: ratio: 78.2%, compress: 94.7 MB/s, decompress: 0.0 MB/s + Deflate/3: ratio: 78.1%, compress: 92.5 MB/s, decompress: 0.0 MB/s + Deflate/4: ratio: 78.0%, compress: 87.9 MB/s, decompress: 0.0 MB/s + Deflate/5: ratio: 77.8%, compress: 86.5 MB/s, decompress: 0.0 MB/s + Deflate/6: ratio: 77.7%, compress: 83.8 MB/s, decompress: 0.0 MB/s + Deflate/7: ratio: 77.7%, compress: 73.4 MB/s, decompress: 0.0 MB/s + Deflate/8: ratio: 77.6%, compress: 31.6 MB/s, decompress: 0.0 MB/s + Deflate/9: ratio: 77.4%, compress: 25.8 MB/s, decompress: 0.0 MB/s + Brotli/1: ratio: 77.6%, compress: 433.1 MB/s, decompress: 0.0 MB/s + Brotli/2: ratio: 75.4%, compress: 242.2 MB/s, decompress: 0.0 MB/s + Brotli/3: ratio: 75.3%, compress: 195.5 MB/s, decompress: 0.0 MB/s + Brotli/4: ratio: 72.4%, compress: 81.6 MB/s, decompress: 0.0 MB/s + Brotli/5: ratio: 73.9%, compress: 62.4 MB/s, decompress: 0.0 MB/s + Brotli/6: ratio: 72.9%, compress: 46.6 MB/s, decompress: 0.0 MB/s + Brotli/7: ratio: 71.5%, compress: 23.4 
MB/s, decompress: 0.0 MB/s + Brotli/8: ratio: 71.5%, compress: 20.7 MB/s, decompress: 0.0 MB/s + Brotli/9: ratio: 71.2%, compress: 11.2 MB/s, decompress: 0.0 MB/s + Lzma2/1: ratio: 69.8%, compress: 4.2 MB/s, decompress: 0.0 MB/s + + diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..980a40f --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,246 @@ +[root] +name = "zvault" +version = "0.1.0" +dependencies = [ + "blake2-rfc 0.2.17 (registry+https://github.com/rust-lang/crates.io-index)", + "mmap 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "murmurhash3 0.0.5 (registry+https://github.com/rust-lang/crates.io-index)", + "quick-error 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rmp-serde 0.12.2 (registry+https://github.com/rust-lang/crates.io-index)", + "rust-crypto 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 0.9.11 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_utils 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_yaml 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", + "squash-sys 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "bitflags" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "blake2-rfc" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "constant_time_eq 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "byteorder" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "constant_time_eq" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "gcc" +version = "0.3.43" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "kernel32-sys" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "libc" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "libc" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "linked-hash-map" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "mmap" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", + "tempdir 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "murmurhash3" +version = "0.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "num-traits" +version = "0.1.37" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "pkg-config" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "quick-error" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "rand" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)", +] + 
+[[package]] +name = "redox_syscall" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "rmp" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "num-traits 0.1.37 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rmp-serde" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rmp 0.8.4 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 0.9.11 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rust-crypto" +version = "0.2.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "gcc 0.3.43 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)", + "rustc-serialize 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)", + "time 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rustc-serialize" +version = "0.3.22" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "serde" +version = "0.9.11" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "serde_utils" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "serde 0.9.11 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "serde_yaml" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "linked-hash-map 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 0.9.11 (registry+https://github.com/rust-lang/crates.io-index)", + "yaml-rust 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "squash-sys" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bitflags 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)", + "pkg-config 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "tempdir" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "rand 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "time" +version = "0.1.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)", + "redox_syscall 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "winapi" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "winapi-build" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "yaml-rust" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + 
"linked-hash-map 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[metadata] +"checksum bitflags 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "aad18937a628ec6abcd26d1489012cc0e18c21798210f491af69ded9b881106d" +"checksum blake2-rfc 0.2.17 (registry+https://github.com/rust-lang/crates.io-index)" = "0c6a476f32fef3402f1161f89d0d39822809627754a126f8441ff2a9d45e2d59" +"checksum byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c40977b0ee6b9885c9013cd41d9feffdd22deb3bb4dc3a71d901cc7a77de18c8" +"checksum constant_time_eq 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "07dcb7959f0f6f1cf662f9a7ff389bcb919924d99ac41cf31f10d611d8721323" +"checksum gcc 0.3.43 (registry+https://github.com/rust-lang/crates.io-index)" = "c07c758b972368e703a562686adb39125707cc1ef3399da8c019fc6c2498a75d" +"checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" +"checksum libc 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "e32a70cf75e5846d53a673923498228bbec6a8624708a9ea5645f075d6276122" +"checksum libc 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)" = "88ee81885f9f04bff991e306fea7c1c60a5f0f9e409e99f6b40e3311a3363135" +"checksum linked-hash-map 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6d262045c5b87c0861b3f004610afd0e2c851e2908d08b6c870cbb9d5f494ecd" +"checksum mmap 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "0bc85448a6006dd2ba26a385a564a8a0f1f2c7e78c70f1a70b2e0f4af286b823" +"checksum murmurhash3 0.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "a2983372caf4480544083767bf2d27defafe32af49ab4df3a0b7fc90793a3664" +"checksum num-traits 0.1.37 (registry+https://github.com/rust-lang/crates.io-index)" = "e1cbfa3781f3fe73dc05321bed52a06d2d491eaa764c52335cf4399f046ece99" +"checksum pkg-config 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "3a8b4c6b8165cd1a1cd4b9b120978131389f64bdaf456435caa41e630edba903" +"checksum quick-error 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0aad603e8d7fb67da22dbdf1f4b826ce8829e406124109e73cf1b2454b93a71c" +"checksum rand 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "022e0636ec2519ddae48154b028864bdce4eaf7d35226ab8e65c611be97b189d" +"checksum redox_syscall 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)" = "8dd35cc9a8bdec562c757e3d43c1526b5c6d2653e23e2315065bc25556550753" +"checksum rmp 0.8.4 (registry+https://github.com/rust-lang/crates.io-index)" = "e59917c01f49718a59c644a621a4848aafc6577c4a47d66270d78951a807541a" +"checksum rmp-serde 0.12.2 (registry+https://github.com/rust-lang/crates.io-index)" = "06ec4d0cdea2645de5d0e649f90c3e654205d913e14adefa452257314a24e76e" +"checksum rust-crypto 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)" = "f76d05d3993fd5f4af9434e8e436db163a12a9d40e1a58a726f27a01dfd12a2a" +"checksum rustc-serialize 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)" = "237546c689f20bb44980270c73c3b9edd0891c1be49cc1274406134a66d3957b" +"checksum serde 0.9.11 (registry+https://github.com/rust-lang/crates.io-index)" = "a702319c807c016e51f672e5c77d6f0b46afddd744b5e437d6b8436b888b458f" +"checksum serde_utils 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b34a52969c7fc0254e214b82518c9a95dc88c84fc84cd847add314996a031be6" +"checksum serde_yaml 0.6.2 
(registry+https://github.com/rust-lang/crates.io-index)" = "f8bd3f24ad8c7bcd34a6d70ba676dc11302b96f4f166aa5f947762e01098844d"
+"checksum squash-sys 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "db1f9dde91d819b7746e153bc32489fa19e6a106c3d7f2b92187a4efbdc88b40"
+"checksum tempdir 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "87974a6f5c1dfb344d733055601650059a3363de2a6104819293baff662132d6"
+"checksum time 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)" = "211b63c112206356ef1ff9b19355f43740fc3f85960c598a93d3a3d3ba7beade"
+"checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a"
+"checksum winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc"
+"checksum yaml-rust 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e66366e18dc58b46801afbf2ca7661a9f59cc8c5962c29892b6039b4f86fa992"
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644
index 0000000..736ce2b
--- /dev/null
+++ b/Cargo.toml
@@ -0,0 +1,16 @@
+[package]
+name = "zvault"
+version = "0.1.0"
+authors = ["Dennis Schwerdel "]
+
+[dependencies]
+serde = "0.9"
+rmp-serde = "0.12"
+serde_yaml = "0.6"
+serde_utils = "0.5.1"
+rust-crypto = "0.2"
+squash-sys = "0.9"
+mmap = "*"
+quick-error = "1.1"
+blake2-rfc = "*"
+murmurhash3 = "*"
diff --git a/README.md b/README.md
index b0df650..1cff9fd 100644
--- a/README.md
+++ b/README.md
@@ -1,10 +1,96 @@
 # ZVault Backup solution
-## Goals
+## Goals / Features
+
+
+### Space-efficient storage with deduplication
+The backup data is split into chunks. Fingerprints make sure that each chunk is
+only stored once. The chunking algorithm is designed so that small changes to a
+file only change a few chunks and leave most chunks unchanged.
+
+Multiple backups of the same data set will only take up the space of one copy.
+
+The chunks are combined into bundles. Each bundle holds chunks up to a maximum
+data size and is compressed as a whole to save space ("solid archive").
+
+
+### Independent backups
+All backups share common data in the form of chunks but are independent at a
+higher level. Backups can be deleted, and chunks that are no longer used by any
+backup can be removed.
+
+Other backup solutions use differential backups organized in chains. This makes
+those backups dependent on previous backups in the chain, so that those backups
+cannot be deleted. Also, restoring chained backups is much less efficient.
+
+
+### Fast backup runs
+* Only changed files are added
+* In-memory hash table
+
+
+### Backup verification
+* Bundle verification
+* Index verification
+* File structure verification
+
+
+
+## Configuration options
+There are several configuration options with trade-offs attached, so these are
+exposed to users.
+
+
+### Chunker algorithm
+The chunker algorithm is responsible for splitting files into chunks in a way
+that survives small changes to the file, so that small changes still yield
+many matching chunks. The quality of the algorithm affects the deduplication
+rate, and its speed affects the backup speed.
+
+There are 3 algorithms to choose from:
+
+The **Rabin chunker** is a very common algorithm with good quality but
+mediocre speed (about 350 MB/s).
+The **AE chunker** is a novel approach that can reach very high speeds
+(over 750 MB/s), but at the cost of some quality.
+The **FastCDC** algorithm has a slightly higher quality than the Rabin chunker
+and is quite fast (about 550 MB/s).
+
+The recommendation is **FastCDC**.
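To make the boundary rule concrete, here is a minimal sketch of a mask-based content-defined boundary test in the FastCDC style (illustrative only; `chunk_boundaries` is a hypothetical helper, the real chunkers live in `src/chunker/`):

```rust
/// Returns cut points: a boundary is declared whenever a rolling hash of
/// the recent bytes matches a mask. Because the decision depends only on
/// nearby bytes, an insertion early in the file shifts at most a few
/// chunks instead of re-cutting everything after it.
fn chunk_boundaries(data: &[u8], mask: u64) -> Vec<usize> {
    let mut cuts = Vec::new();
    let mut hash = 0u64;
    for (i, &b) in data.iter().enumerate() {
        hash = (hash << 1).wrapping_add(b as u64); // simplified, gear-less update
        if hash & mask == 0 {
            cuts.push(i + 1);
        }
    }
    cuts
}
```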
+### Chunk size
+The chunk size determines the memory usage during backup runs. For every chunk
+in the backup repository, 24 bytes of memory are needed. That means that for
+every GiB stored in the repository the following amount of memory is needed:
+- 8 KiB chunks => 3 MiB / GiB
+- 16 KiB chunks => 1.5 MiB / GiB
+- 32 KiB chunks => 750 KiB / GiB
+- 64 KiB chunks => 375 KiB / GiB
+
+On the other hand, bigger chunks reduce the deduplication efficiency. Even small
+changes of only one byte will result in at least one complete chunk changing.
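As a worked example (hypothetical helper; the numbers match the table above): 1 GiB split into 8 KiB chunks yields 131,072 chunks, and at 24 bytes per index entry that is exactly 3 MiB.

```rust
/// Estimated index memory for `total_bytes` of data at an average chunk
/// size of `avg_chunk` bytes, at 24 bytes per entry (see "Index" below).
fn index_memory(total_bytes: u64, avg_chunk: u64) -> u64 {
    (total_bytes / avg_chunk) * 24
}

// index_memory(1 << 30, 8 * 1024) == 3 * 1024 * 1024, i.e. 3 MiB per GiB
```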
+### Hash algorithm
+The hash algorithm fingerprints the chunks:
+- **Blake2**: cryptographic, about 720 MB/s in the benchmark above
+- **Murmur3**: not cryptographic, but very fast (over 5000 MB/s)
+
+Recommended: Blake2
+
+
+### Bundle size
+Typical choices:
+- 10 MiB
+- 25 MiB
+- 100 MiB
+
+Larger bundles compress better as a whole but are slower to access for
+individual chunks.
+
+Recommended: 25 MiB
+
+
+### Compression
+See the measurements in `Algotest1.txt`; higher levels trade compression speed
+for a better ratio.
+
+Recommended: Brotli/2-7
+
-- Blazingly fast backup runs
-- Space-efficient storage
-- Independent backups
 ## Design
@@ -43,3 +129,52 @@
 - Remote block writing and compression/encryption
 - Inode data serialization
 - Recursive directory scanning, difference calculation, new entry sorting
+
+
+### ChunkDB
+
+- Stores data in chunks
+- A chunk is a file
+- Per-chunk properties
+  - Format version
+  - Encryption method
+  - Encryption key
+  - Compression method / level
+- Chunk ID is the hash of the contents
+  - No locks needed on shared chunk repository !!!
+  - Chunk ID is calculated after compression and encryption
+- Chunk header
+  - "zvault01"
+  - Chunk size compressed / raw
+  - Content hash method / value
+  - Encryption method / options / key hash
+  - Compression method / options
+- Chunks are write-once read-often
+- Chunks are prepared outside the repository
+- Only one chunk is being prepared at a time
+- Adding data to the chunk returns the starting position in the raw data
+- Operations:
+  - List available chunks
+  - Add data
+  - Flush chunk
+  - Delete chunk
+  - Get data
+  - Check chunk
+- Chunk path is `checksum.chunk` or `chec/ksum.chunk`
+- Data is added to the current chunk and compressed in memory
+- Operations on chunk files are just sequential read/write and delete
+- Ability to recompress chunks
+
+
+### Index
+
+16 bytes per hash key
+8 bytes data per entry (4 bytes bundle id, 4 bytes chunk id)
+=> 24 bytes per entry
+
+Average chunk sizes
+ 8 KiB => 3 MiB / 1 GiB
+16 KiB => 1.5 MiB / 1 GiB
+24 KiB => 1.0 MiB / 1 GiB
+32 KiB => 750 KiB / 1 GiB
+64 KiB => 375 KiB / 1 GiB
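The 24 bytes per entry correspond directly to the `Entry` and `Location` structs that `src/index.rs` defines later in this patch; a condensed sketch (the stubbed `Hash` stands in for the 16-byte key type from `src/util.rs`, which this patch does not show):

```rust
#[derive(Clone, Copy)]
struct Hash { low: u64, high: u64 } // 16 bytes, two u64 halves as used in src/index.rs

#[repr(packed)]
struct Location {
    bundle: u32, // 4 bytes: bundle id
    chunk: u32   // 4 bytes: chunk index within the bundle
}

#[repr(packed)]
struct Entry {
    key: Hash,     // 16 bytes: the chunk fingerprint
    data: Location // 8 bytes => 24 bytes per entry in total
}
```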
diff --git a/src/algotest.rs b/src/algotest.rs
new file mode 100644
index 0000000..dc67a13
--- /dev/null
+++ b/src/algotest.rs
@@ -0,0 +1,122 @@
+use std::io::{Cursor, Read};
+use std::fs::File;
+use std::time;
+
+use super::chunker::*;
+use super::util::*;
+
+fn speed_chunk<C: IChunker>(chunker: &mut C, data: &[u8]) {
+    let mut input = Cursor::new(data);
+    let mut chunk = Vec::with_capacity(1_000_000);
+    loop {
+        chunk.clear();
+        let result = chunker.chunk(&mut input, &mut chunk).unwrap();
+        if result == ChunkerStatus::Finished {
+            return
+        }
+    }
+}
+
+fn chunk<C: IChunker>(chunker: &mut C, data: &[u8]) -> Vec<Vec<u8>> {
+    let mut input = Cursor::new(data);
+    let mut chunks = Vec::with_capacity(100_000);
+    loop {
+        let mut chunk = Vec::with_capacity(100_000);
+        let result = chunker.chunk(&mut input, &mut chunk).unwrap();
+        chunks.push(chunk);
+        if result == ChunkerStatus::Finished {
+            return chunks;
+        }
+    }
+}
+
+fn analyze_chunks(mut chunks: Vec<Vec<u8>>) -> (usize, f64, f64, f64) {
+    let count = chunks.len();
+    let total = chunks.iter().map(|c| c.len()).sum::<usize>();
+    let avg_size = total as f64 / count as f64;
+    let stddev = (chunks.iter().map(|c| (c.len() as f64 - avg_size).powi(2)).sum::<f64>() / (count as f64 - 1.0)).sqrt();
+    chunks.sort();
+    chunks.dedup();
+    let non_dup: usize = chunks.iter().map(|c| c.len()).sum();
+    let saved = 1.0 - non_dup as f64 / total as f64;
+    (count, avg_size, stddev, saved)
+}
+
+fn compare_chunker<C: IChunker>(name: &str, mut chunker: C, data: &[u8]) {
+    let start = time::Instant::now();
+    speed_chunk(&mut chunker, data);
+    let elapsed = start.elapsed();
+    let chunks = chunk(&mut chunker, data);
+    let duration = elapsed.as_secs() as f64 + elapsed.subsec_nanos() as f64 / 1_000_000_000.0;
+    let speed = data.len() as f64 / duration;
+    assert_eq!(chunks.iter().map(|c| c.len()).sum::<usize>(), data.len());
+    let (_count, avg_size, stddev, saved) = analyze_chunks(chunks);
+    println!("{}: \tavg chunk size {:.1}\t± {:.1} bytes, \t{:.1}% saved,\tspeed {:.1} MB/s",
+        name, avg_size, stddev, saved * 100.0, speed / 1_000_000.0);
+}
+
+fn compare_hash(name: &str, hash: HashMethod, data: &[u8]) {
+    let start = time::Instant::now();
+    let _ = hash.hash(data);
+    let elapsed = start.elapsed();
+    let duration = elapsed.as_secs() as f64 + elapsed.subsec_nanos() as f64 / 1_000_000_000.0;
+    let speed = data.len() as f64 / duration;
+    println!("{}: {:.1} MB/s", name, speed / 1_000_000.0);
+}
+
+fn compare_compression(name: &str, method: Compression, data: &[u8]) {
+    let start = time::Instant::now();
+    let compressed = method.compress(data).unwrap();
+    let elapsed = start.elapsed();
+    let duration = elapsed.as_secs() as f64 + elapsed.subsec_nanos() as f64 / 1_000_000_000.0;
+    let cspeed = data.len() as f64 / duration;
+    let ratio = compressed.len() as f64 / data.len() as f64;
+    // Decompression measurement is currently disabled (a run crashed with
+    // out of memory, see Algotest1.txt), so 0.0 MB/s is reported instead:
+    /*let start = time::Instant::now();
+    let uncompressed = method.decompress(&compressed).unwrap();
+    if uncompressed != data {
+        panic!("{} did not uncompress to the same value", name);
+    }
+    let elapsed = start.elapsed();
+    let duration = elapsed.as_secs() as f64 + elapsed.subsec_nanos() as f64 / 1_000_000_000.0;*/
+    let dspeed = 0.0; //data.len() as f64 / duration;
+    println!("{}:\tratio: {:.1}%,\tcompress: {:.1} MB/s,\tdecompress: {:.1} MB/s",
+        name, ratio * 100.0, cspeed / 1_000_000.0, dspeed / 1_000_000.0);
+}
+
+#[allow(dead_code)]
+pub fn run(path: &str) {
+    println!("Algorithm comparison on file {}", path);
+    println!();
+    print!("Reading input file...");
+    let mut file = File::open(path).unwrap();
+    let mut data = Vec::new();
+    file.read_to_end(&mut data).unwrap();
+    println!(" done. {} bytes", data.len());
+    println!();
+    println!("Chunker algorithms");
+    for size in &[4usize, 8, 16, 32, 64] {
+        println!("  Chunk size: {} KiB", size);
+        compare_chunker("    AE", AeChunker::new(size*1024), &data);
+        compare_chunker("    Rabin", RabinChunker::new(size*1024, 0), &data);
+        compare_chunker("    FastCdc", FastCdcChunker::new(size*1024, 0), &data);
+    }
+    println!();
+    println!("Hash algorithms");
+    compare_hash("  Blake2", HashMethod::Blake2, &data);
+    compare_hash("  Murmur3", HashMethod::Murmur3, &data);
+    println!();
+    println!("Compression algorithms");
+    compare_compression("  Snappy", Compression::Snappy(()), &data);
+    for level in 1..10 {
+        compare_compression(&format!("  ZStd/{}", level), Compression::ZStd(level), &data);
+    }
+    for level in 1..10 {
+        compare_compression(&format!("  Deflate/{}", level), Compression::Deflate(level), &data);
+    }
+    for level in 1..10 {
+        compare_compression(&format!("  Brotli/{}", level), Compression::Brotli(level), &data);
+    }
+    for level in 1..7 {
+        compare_compression(&format!("  Lzma2/{}", level), Compression::Lzma2(level), &data);
+    }
+}
diff --git a/src/bundle.rs b/src/bundle.rs
new file mode 100644
index 0000000..16165ff
--- /dev/null
+++ b/src/bundle.rs
@@ -0,0 +1,474 @@
+use std::path::{Path, PathBuf};
+use std::collections::HashMap;
+use std::fs::{self, File};
+use std::io::{Read, Write, Seek, SeekFrom, BufWriter, BufReader};
+use std::cmp::max;
+use std::fmt::{self, Debug, Write as FmtWrite};
+use std::sync::{Arc, Mutex};
+
+use serde::{self, Serialize, Deserialize};
+use serde::bytes::ByteBuf;
+use rmp_serde;
+
+use errors::BundleError;
+use util::*;
+
+static HEADER_STRING: [u8; 7] = *b"zbundle";
+static HEADER_VERSION: u8 = 1;
+
+
+// TODO: Test cases
+// TODO: Benchmarks
+
+
+#[derive(Hash, PartialEq, Eq, Clone, Default)]
+pub struct BundleId(pub Vec<u8>);
+
+impl Serialize for BundleId {
+    fn serialize<S: serde::Serializer>(&self, ser: S) -> Result<S::Ok, S::Error> {
+        ser.serialize_bytes(&self.0)
+    }
+}
+
+impl Deserialize for BundleId {
+    fn deserialize<D: serde::Deserializer>(de: D) -> Result<Self, D::Error> {
+        let bytes = try!(ByteBuf::deserialize(de));
+        Ok(BundleId(bytes.into()))
+    }
+}
+
+impl BundleId {
+    #[inline]
+    fn to_string(&self) -> String {
+        let mut buf = String::with_capacity(self.0.len()*2);
+        for b in &self.0 {
+            // {:02x} zero-pads each byte; {:2x} would space-pad and break the names
+            write!(&mut buf, "{:02x}", b).unwrap()
+        }
+        buf
+    }
+}
+
+impl fmt::Display for BundleId {
+    #[inline]
+    fn fmt(&self, fmt: &mut fmt::Formatter) -> Result<(), fmt::Error> {
+        write!(fmt, "{}", self.to_string())
+    }
+}
+
+impl fmt::Debug for BundleId {
+    #[inline]
+    fn fmt(&self, fmt: &mut fmt::Formatter) -> Result<(), fmt::Error> {
+        write!(fmt, "{}", self.to_string())
+    }
+}
+
+
+
+#[derive(Clone)]
+pub struct BundleHeader {
+    pub id: BundleId,
+    pub compression: Option<Compression>,
+    pub encryption: Option<Encryption>,
+    pub checksum: Checksum,
+    pub raw_size: usize,
+    pub encoded_size: usize,
+    pub chunk_count: usize,
+    pub chunk_sizes: Vec<usize>
+}
+serde_impl!(BundleHeader(u64) {
+    id: BundleId => 0,
+    compression: Option<Compression> => 1,
+    encryption: Option<Encryption> => 2,
+    checksum: Checksum => 3,
+    raw_size: usize => 4,
+    encoded_size: usize => 5,
+    chunk_count: usize => 6,
+    chunk_sizes: Vec<usize> => 7
+});
+
+impl Default for BundleHeader {
+    fn default() -> Self {
+        BundleHeader {
+            id: BundleId(vec![]),
+            compression: None,
+            encryption: None,
+            checksum: (ChecksumType::Sha3_256, ByteBuf::new()),
+            raw_size: 0,
+            encoded_size: 0,
+            chunk_count: 0,
+            chunk_sizes: vec![]
+        }
+    }
+}
+
+
+pub struct Bundle {
+    pub id: BundleId,
+    pub version: u8,
+    pub path: PathBuf,
+    crypto: Arc<Mutex<Crypto>>,
+    pub compression: Option<Compression>,
+    pub encryption: Option<Encryption>,
+
pub raw_size: usize, + pub encoded_size: usize, + pub checksum: Checksum, + pub content_start: usize, + pub chunk_count: usize, + pub chunk_sizes: Vec, + pub chunk_positions: Vec +} + +impl Bundle { + fn new(path: PathBuf, version: u8, content_start: usize, crypto: Arc>, header: BundleHeader) -> Self { + let mut chunk_positions = Vec::with_capacity(header.chunk_sizes.len()); + let mut pos = 0; + for len in &header.chunk_sizes { + chunk_positions.push(pos); + pos += *len; + } + Bundle { + id: header.id, + version: version, + path: path, + crypto: crypto, + compression: header.compression, + encryption: header.encryption, + raw_size: header.raw_size, + encoded_size: header.encoded_size, + chunk_count: header.chunk_count, + checksum: header.checksum, + content_start: content_start, + chunk_sizes: header.chunk_sizes, + chunk_positions: chunk_positions + } + } + + pub fn load(path: PathBuf, crypto: Arc>) -> Result { + let mut file = BufReader::new(try!(File::open(&path) + .map_err(|e| BundleError::Read(e, path.clone(), "Failed to open bundle file")))); + let mut header = [0u8; 8]; + try!(file.read_exact(&mut header) + .map_err(|e| BundleError::Read(e, path.clone(), "Failed to read bundle header"))); + if header[..HEADER_STRING.len()] != HEADER_STRING { + return Err(BundleError::Format(path.clone(), "Wrong header string")) + } + let version = header[HEADER_STRING.len()]; + if version != HEADER_VERSION { + return Err(BundleError::Format(path.clone(), "Unsupported bundle file version")) + } + let mut reader = rmp_serde::Deserializer::new(file); + let header = try!(BundleHeader::deserialize(&mut reader) + .map_err(|e| BundleError::Decode(e, path.clone()))); + file = reader.into_inner(); + let content_start = file.seek(SeekFrom::Current(0)).unwrap() as usize; + Ok(Bundle::new(path, version, content_start, crypto, header)) + } + + #[inline] + fn load_encoded_contents(&self) -> Result, BundleError> { + let mut file = BufReader::new(try!(File::open(&self.path) + .map_err(|e| BundleError::Read(e, self.path.clone(), "Failed to open bundle file")))); + try!(file.seek(SeekFrom::Start(self.content_start as u64)) + .map_err(|e| BundleError::Read(e, self.path.clone(), "Failed to seek to data"))); + let mut data = Vec::with_capacity(max(self.encoded_size, self.raw_size)+1024); + try!(file.read_to_end(&mut data).map_err(|_| "Failed to read data")); + Ok(data) + } + + #[inline] + fn decode_contents(&self, mut data: Vec) -> Result, BundleError> { + if let Some(ref encryption) = self.encryption { + data = try!(self.crypto.lock().unwrap().decrypt(encryption.clone(), &data)); + } + if let Some(ref compression) = self.compression { + data = try!(compression.decompress(&data)); + } + Ok(data) + } + + #[inline] + pub fn load_contents(&self) -> Result, BundleError> { + self.load_encoded_contents().and_then(|data| self.decode_contents(data)) + } + + #[inline] + pub fn get_chunk_position(&self, id: usize) -> Result<(usize, usize), BundleError> { + if id >= self.chunk_count { + return Err("Invalid chunk id".into()) + } + Ok((self.chunk_positions[id], self.chunk_sizes[id])) + } + + pub fn check(&self, full: bool) -> Result<(), BundleError> { + if self.chunk_count != self.chunk_sizes.len() { + return Err(BundleError::Integrity(self.id.clone(), + "Chunk list size does not match chunk count")) + } + if self.chunk_sizes.iter().sum::() != self.raw_size { + return Err(BundleError::Integrity(self.id.clone(), + "Individual chunk sizes do not add up to total size")) + } + if !full { + let size = try!(fs::metadata(&self.path) + 
.map_err(|e| BundleError::Read(e, self.path.clone(), "Failed to get size of file")) + ).len(); + if size as usize != self.encoded_size + self.content_start { + return Err(BundleError::Integrity(self.id.clone(), + "File size does not match size in header, truncated file")) + } + return Ok(()) + } + let encoded_contents = try!(self.load_encoded_contents()); + if self.encoded_size != encoded_contents.len() { + return Err(BundleError::Integrity(self.id.clone(), + "Encoded data size does not match size in header, truncated bundle")) + } + let contents = try!(self.decode_contents(encoded_contents)); + if self.raw_size != contents.len() { + return Err(BundleError::Integrity(self.id.clone(), + "Raw data size does not match size in header, truncated bundle")) + } + //TODO: verify checksum + Ok(()) + } +} + +impl Debug for Bundle { + fn fmt(&self, fmt: &mut fmt::Formatter) -> Result<(), fmt::Error> { + write!(fmt, "Bundle(\n\tid: {}\n\tpath: {:?}\n\tchunks: {}\n\tsize: {}, encoded: {}\n\tcompression: {:?}\n)", + self.id.to_string(), self.path, self.chunk_count, self.raw_size, self.encoded_size, self.compression) + } +} + + + +pub struct BundleWriter { + data: Vec, + compression: Option, + compression_stream: Option, + encryption: Option, + crypto: Arc>, + checksum: ChecksumCreator, + raw_size: usize, + chunk_count: usize, + chunk_sizes: Vec +} + +impl BundleWriter { + fn new(compression: Option, encryption: Option, crypto: Arc>, checksum: ChecksumType) -> Result { + let compression_stream = match compression { + Some(ref compression) => Some(try!(compression.compress_stream())), + None => None + }; + Ok(BundleWriter { + data: vec![], + compression: compression, + compression_stream: compression_stream, + encryption: encryption, + crypto: crypto, + checksum: ChecksumCreator::new(checksum), + raw_size: 0, + chunk_count: 0, + chunk_sizes: vec![] + }) + } + + pub fn add(&mut self, chunk: &[u8]) -> Result { + if let Some(ref mut stream) = self.compression_stream { + try!(stream.process(chunk, &mut self.data)) + } else { + self.data.extend_from_slice(chunk) + } + self.checksum.update(chunk); + self.raw_size += chunk.len(); + self.chunk_count += 1; + self.chunk_sizes.push(chunk.len()); + Ok(self.chunk_count-1) + } + + fn finish(mut self, db: &BundleDb) -> Result { + if let Some(stream) = self.compression_stream { + try!(stream.finish(&mut self.data)) + } + if let Some(ref encryption) = self.encryption { + self.data = try!(self.crypto.lock().unwrap().encrypt(encryption.clone(), &self.data)); + } + let encoded_size = self.data.len(); + let checksum = self.checksum.finish(); + let id = BundleId(checksum.1.to_vec()); + let (folder, file) = db.bundle_path(&id); + let path = folder.join(file); + try!(fs::create_dir_all(&folder) + .map_err(|e| BundleError::Write(e, path.clone(), "Failed to create folder"))); + let mut file = BufWriter::new(try!(File::create(&path) + .map_err(|e| BundleError::Write(e, path.clone(), "Failed to create bundle file")))); + try!(file.write_all(&HEADER_STRING) + .map_err(|e| BundleError::Write(e, path.clone(), "Failed to write bundle header"))); + try!(file.write_all(&[HEADER_VERSION]) + .map_err(|e| BundleError::Write(e, path.clone(), "Failed to write bundle header"))); + let header = BundleHeader { + checksum: checksum, + compression: self.compression, + encryption: self.encryption, + chunk_count: self.chunk_count, + id: id.clone(), + raw_size: self.raw_size, + encoded_size: encoded_size, + chunk_sizes: self.chunk_sizes + }; + { + let mut writer = rmp_serde::Serializer::new(&mut 
file); + try!(header.serialize(&mut writer) + .map_err(|e| BundleError::Encode(e, path.clone()))); + } + let content_start = file.seek(SeekFrom::Current(0)).unwrap() as usize; + try!(file.write_all(&self.data) + .map_err(|e| BundleError::Write(e, path.clone(), "Failed to write bundle data"))); + Ok(Bundle::new(path, HEADER_VERSION, content_start, self.crypto, header)) + } + + #[inline] + pub fn size(&self) -> usize { + self.data.len() + } +} + + +pub struct BundleDb { + path: PathBuf, + compression: Option, + encryption: Option, + crypto: Arc>, + checksum: ChecksumType, + bundles: HashMap, + bundle_cache: LruCache> +} + + +impl BundleDb { + fn new(path: PathBuf, compression: Option, encryption: Option, checksum: ChecksumType) -> Self { + BundleDb { + path: path, + compression: + compression, + crypto: Arc::new(Mutex::new(Crypto::new())), + encryption: encryption, + checksum: checksum, + bundles: HashMap::new(), + bundle_cache: LruCache::new(5, 10) + } + } + + fn bundle_path(&self, bundle: &BundleId) -> (PathBuf, PathBuf) { + let mut folder = self.path.clone(); + let mut file = bundle.to_string() + ".bundle"; + let mut count = self.bundles.len(); + while count >= 1000 { + if file.len() < 10 { + break + } + folder = folder.join(&file[0..3]); + file = file[3..].to_string(); + count /= 1000; + } + (folder, file.into()) + } + + fn load_bundle_list(&mut self) -> Result<(), BundleError> { + self.bundles.clear(); + let mut paths = Vec::new(); + paths.push(self.path.clone()); + while let Some(path) = paths.pop() { + for entry in try!(fs::read_dir(path).map_err(BundleError::List)) { + let entry = try!(entry.map_err(BundleError::List)); + let path = entry.path(); + if path.is_dir() { + paths.push(path); + } else { + let bundle = try!(Bundle::load(path, self.crypto.clone())); + self.bundles.insert(bundle.id.clone(), bundle); + } + } + } + Ok(()) + } + + #[inline] + pub fn open>(path: P, compression: Option, encryption: Option, checksum: ChecksumType) -> Result { + let path = path.as_ref().to_owned(); + let mut self_ = Self::new(path, compression, encryption, checksum); + try!(self_.load_bundle_list()); + Ok(self_) + } + + #[inline] + pub fn create>(path: P, compression: Option, encryption: Option, checksum: ChecksumType) -> Result { + let path = path.as_ref().to_owned(); + try!(fs::create_dir_all(&path) + .map_err(|e| BundleError::Write(e, path.clone(), "Failed to create folder"))); + Ok(Self::new(path, compression, encryption, checksum)) + } + + #[inline] + pub fn open_or_create>(path: P, compression: Option, encryption: Option, checksum: ChecksumType) -> Result { + if path.as_ref().exists() { + Self::open(path, compression, encryption, checksum) + } else { + Self::create(path, compression, encryption, checksum) + } + } + + #[inline] + pub fn create_bundle(&self) -> Result { + BundleWriter::new(self.compression.clone(), self.encryption.clone(), self.crypto.clone(), self.checksum) + } + + pub fn get_chunk(&mut self, bundle_id: &BundleId, id: usize) -> Result, BundleError> { + let bundle = try!(self.bundles.get(bundle_id).ok_or("Bundle not found")); + let (pos, len) = try!(bundle.get_chunk_position(id)); + let mut chunk = Vec::with_capacity(len); + if let Some(data) = self.bundle_cache.get(bundle_id) { + chunk.extend_from_slice(&data[pos..pos+len]); + return Ok(chunk); + } + let data = try!(bundle.load_contents()); + chunk.extend_from_slice(&data[pos..pos+len]); + self.bundle_cache.put(bundle_id.clone(), data); + Ok(chunk) + } + + #[inline] + pub fn add_bundle(&mut self, bundle: BundleWriter) -> 
Result<&Bundle, BundleError> { + let bundle = try!(bundle.finish(&self)); + let id = bundle.id.clone(); + self.bundles.insert(id.clone(), bundle); + Ok(self.get_bundle(&id).unwrap()) + } + + #[inline] + pub fn get_bundle(&self, bundle: &BundleId) -> Option<&Bundle> { + self.bundles.get(bundle) + } + + #[inline] + pub fn list_bundles(&self) -> Vec<&Bundle> { + self.bundles.values().collect() + } + + #[inline] + pub fn delete_bundle(&mut self, bundle: &BundleId) -> Result<(), BundleError> { + if let Some(bundle) = self.bundles.remove(bundle) { + fs::remove_file(&bundle.path).map_err(|e| BundleError::Remove(e, bundle.id.clone())) + } else { + Err("No such bundle".into()) + } + } + + #[inline] + pub fn check(&self, full: bool) -> Result<(), BundleError> { + for bundle in self.bundles.values() { + try!(bundle.check(full)) + } + Ok(()) + } +} diff --git a/src/chunker/ae.rs b/src/chunker/ae.rs new file mode 100644 index 0000000..c837556 --- /dev/null +++ b/src/chunker/ae.rs @@ -0,0 +1,70 @@ +use super::*; + +//use std::f64::consts; +use std::ptr; + +// AE Chunker +// Paper: "AE: An Asymmetric Extremum Content Defined Chunking Algorithm for Fast and Bandwidth-Efficient Data Deduplication" + + +pub struct AeChunker { + buffer: [u8; 4096], + buffered: usize, + avg_size: usize, + window_size: usize +} + +impl AeChunker { + pub fn new(avg_size: usize) -> AeChunker { + // Experiments show that this claim from the paper is wrong and results in smaller chunks + //let window_size = (avg_size as f64 / (consts::E - 1.0)) as usize; + let window_size = avg_size - 256; + AeChunker{ + buffer: [0; 4096], + buffered: 0, + window_size: window_size, + avg_size: avg_size + } + } +} + +impl IChunker for AeChunker { + #[inline] + fn get_type(&self) -> ChunkerType { + ChunkerType::Ae(self.avg_size) + } + + #[allow(unknown_lints,explicit_counter_loop)] + fn chunk(&mut self, r: &mut R, mut w: &mut W) -> Result { + let mut max; + let mut pos = 0; + let mut max_pos = 0; + let mut max_val = 0; + loop { + // Fill the buffer, there might be some bytes still in there from last chunk + max = try!(r.read(&mut self.buffer[self.buffered..]).map_err(ChunkerError::Read)) + self.buffered; + // If nothing to do, finish + if max == 0 { + return Ok(ChunkerStatus::Finished) + } + for i in 0..max { + let val = self.buffer[i]; + if val <= max_val { + if pos == max_pos + self.window_size { + // Write all bytes from this chunk out to sink and store rest for next chunk + try!(w.write_all(&self.buffer[..i+1]).map_err(ChunkerError::Write)); + unsafe { ptr::copy(self.buffer[i+1..].as_ptr(), self.buffer.as_mut_ptr(), max-i-1) }; + self.buffered = max-i-1; + return Ok(ChunkerStatus::Continue); + } + } else { + max_val = val; + max_pos = pos; + } + pos += 1; + } + try!(w.write_all(&self.buffer[..max]).map_err(ChunkerError::Write)); + self.buffered = 0; + } + } +} diff --git a/src/chunker/fastcdc.rs b/src/chunker/fastcdc.rs new file mode 100644 index 0000000..9163b32 --- /dev/null +++ b/src/chunker/fastcdc.rs @@ -0,0 +1,120 @@ +use super::*; + +use std::ptr; + +// FastCDC +// Paper: "FastCDC: a Fast and Efficient Content-Defined Chunking Approach for Data Deduplication" +// Paper-URL: https://www.usenix.org/system/files/conference/atc16/atc16-paper-xia.pdf +// Presentation: https://www.usenix.org/sites/default/files/conference/protected-files/atc16_slides_xia.pdf + + +// Creating 256 pseudo-random values (based on Knuth's MMIX) +fn create_gear(seed: u64) -> [u64; 256] { + let mut table = [0u64; 256]; + let a = 6364136223846793005; + let c = 
1442695040888963407; + let mut v = seed; + for t in &mut table.iter_mut() { + v = v.wrapping_mul(a).wrapping_add(c); + *t = v; + } + table +} + +fn get_masks(avg_size: usize, nc_level: usize, seed: u64) -> (u64, u64) { + let bits = (avg_size.next_power_of_two() - 1).count_ones(); + if bits == 13 { + // From the paper + return (0x0003590703530000, 0x0000d90003530000); + } + let mut mask = 0u64; + let mut v = seed; + let a = 6364136223846793005; + let c = 1442695040888963407; + while mask.count_ones() < bits - nc_level as u32 { + v = v.wrapping_mul(a).wrapping_add(c); + mask = (mask | 1).rotate_left(v as u32 & 0x3f); + } + let mask_long = mask; + while mask.count_ones() < bits + nc_level as u32 { + v = v.wrapping_mul(a).wrapping_add(c); + mask = (mask | 1).rotate_left(v as u32 & 0x3f); + } + let mask_short = mask; + (mask_short, mask_long) +} + +pub struct FastCdcChunker { + buffer: [u8; 4096], + buffered: usize, + gear: [u64; 256], + min_size: usize, + max_size: usize, + avg_size: usize, + mask_long: u64, + mask_short: u64, + seed: u64 +} + + +impl FastCdcChunker { + pub fn new(avg_size: usize, seed: u64) -> Self { + let (mask_short, mask_long) = get_masks(avg_size, 2, seed); + FastCdcChunker { + buffer: [0; 4096], + buffered: 0, + gear: create_gear(seed), + min_size: avg_size/4, + max_size: avg_size*8, + avg_size: avg_size, + mask_long: mask_long, + mask_short: mask_short, + seed: seed + } + } +} + +impl IChunker for FastCdcChunker { + #[inline] + fn get_type(&self) -> ChunkerType { + ChunkerType::FastCdc((self.avg_size, self.seed)) + } + + + #[allow(unknown_lints,explicit_counter_loop)] + fn chunk(&mut self, r: &mut R, mut w: &mut W) -> Result { + let mut max; + let mut hash = 0u64; + let mut pos = 0; + loop { + // Fill the buffer, there might be some bytes still in there from last chunk + max = try!(r.read(&mut self.buffer[self.buffered..]).map_err(ChunkerError::Read)) + self.buffered; + // If nothing to do, finish + if max == 0 { + return Ok(ChunkerStatus::Finished) + } + for i in 0..max { + if pos >= self.min_size { + // Hash update + hash = (hash << 1).wrapping_add(self.gear[self.buffer[i] as usize]); + // 3 options for break point + // 1) mask_short matches and chunk is smaller than average + // 2) mask_long matches and chunk is longer or equal to average + // 3) chunk reached max_size + if pos < self.avg_size && hash & self.mask_short == 0 + || pos >= self.avg_size && hash & self.mask_long == 0 + || pos >= self.max_size { + // Write all bytes from this chunk out to sink and store rest for next chunk + try!(w.write_all(&self.buffer[..i+1]).map_err(ChunkerError::Write)); + unsafe { ptr::copy(self.buffer[i+1..].as_ptr(), self.buffer.as_mut_ptr(), max-i-1) }; + self.buffered = max-i-1; + return Ok(ChunkerStatus::Continue); + } + } + pos += 1; + } + try!(w.write_all(&self.buffer[..max]).map_err(ChunkerError::Write)); + self.buffered = 0; + } + } +} diff --git a/src/chunker/mod.rs b/src/chunker/mod.rs new file mode 100644 index 0000000..f5bc8f2 --- /dev/null +++ b/src/chunker/mod.rs @@ -0,0 +1,118 @@ +use std::io::{Write, Read}; + +use super::errors::ChunkerError; + +mod ae; +mod rabin; +mod fastcdc; + +pub use self::ae::AeChunker; +pub use self::rabin::RabinChunker; +pub use self::fastcdc::FastCdcChunker; + +// https://moinakg.wordpress.com/2013/06/22/high-performance-content-defined-chunking/ + +// Paper: "A Comprehensive Study of the Past, Present, and Future of Data Deduplication" +// Paper-URL: http://wxia.hustbackup.cn/IEEE-Survey-final.pdf + +// 
https://borgbackup.readthedocs.io/en/stable/internals.html#chunks
+// https://github.com/bup/bup/blob/master/lib/bup/bupsplit.c
+
+#[derive(Debug, Eq, PartialEq)]
+pub enum ChunkerStatus {
+    Continue,
+    Finished
+}
+
+pub trait IChunker: Sized {
+    fn chunk<R: Read, W: Write>(&mut self, r: &mut R, w: &mut W) -> Result<ChunkerStatus, ChunkerError>;
+    fn get_type(&self) -> ChunkerType;
+}
+
+pub enum Chunker {
+    Ae(Box<AeChunker>),
+    Rabin(Box<RabinChunker>),
+    FastCdc(Box<FastCdcChunker>)
+}
+
+impl IChunker for Chunker {
+    #[inline]
+    fn get_type(&self) -> ChunkerType {
+        match *self {
+            Chunker::Ae(ref c) => c.get_type(),
+            Chunker::Rabin(ref c) => c.get_type(),
+            Chunker::FastCdc(ref c) => c.get_type()
+        }
+    }
+
+    #[inline]
+    fn chunk<R: Read, W: Write>(&mut self, r: &mut R, w: &mut W) -> Result<ChunkerStatus, ChunkerError> {
+        match *self {
+            Chunker::Ae(ref mut c) => c.chunk(r, w),
+            Chunker::Rabin(ref mut c) => c.chunk(r, w),
+            Chunker::FastCdc(ref mut c) => c.chunk(r, w)
+        }
+    }
+}
+
+
+#[derive(Debug)]
+pub enum ChunkerType {
+    Ae(usize),
+    Rabin((usize, u32)),
+    FastCdc((usize, u64))
+}
+serde_impl!(ChunkerType(u64) {
+    Ae(usize) => 1,
+    Rabin((usize, u32)) => 2,
+    FastCdc((usize, u64)) => 3
+});
+
+
+impl ChunkerType {
+    #[inline]
+    pub fn from(name: &str, avg_size: usize, seed: u64) -> Result<ChunkerType, &'static str> {
+        match name {
+            "ae" => Ok(ChunkerType::Ae(avg_size)),
+            "rabin" => Ok(ChunkerType::Rabin((avg_size, seed as u32))),
+            "fastcdc" => Ok(ChunkerType::FastCdc((avg_size, seed))),
+            _ => Err("Unsupported chunker type")
+        }
+    }
+
+    #[inline]
+    pub fn create(&self) -> Chunker {
+        match *self {
+            ChunkerType::Ae(size) => Chunker::Ae(Box::new(AeChunker::new(size))),
+            ChunkerType::Rabin((size, seed)) => Chunker::Rabin(Box::new(RabinChunker::new(size, seed))),
+            ChunkerType::FastCdc((size, seed)) => Chunker::FastCdc(Box::new(FastCdcChunker::new(size, seed)))
+        }
+    }
+
+    #[inline]
+    pub fn name(&self) -> &'static str {
+        match *self {
+            ChunkerType::Ae(_size) => "ae",
+            ChunkerType::Rabin((_size, _seed)) => "rabin",
+            ChunkerType::FastCdc((_size, _seed)) => "fastcdc"
+        }
+    }
+
+    #[inline]
+    pub fn avg_size(&self) -> usize {
+        match *self {
+            ChunkerType::Ae(size) => size,
+            ChunkerType::Rabin((size, _seed)) => size,
+            ChunkerType::FastCdc((size, _seed)) => size
+        }
+    }
+
+    #[inline]
+    pub fn seed(&self) -> u64 {
+        match *self {
+            ChunkerType::Ae(_size) => 0,
+            ChunkerType::Rabin((_size, seed)) => seed as u64,
+            ChunkerType::FastCdc((_size, seed)) => seed
+        }
+    }
+}
diff --git a/src/chunker/rabin.rs b/src/chunker/rabin.rs
new file mode 100644
index 0000000..d3fa269
--- /dev/null
+++ b/src/chunker/rabin.rs
@@ -0,0 +1,116 @@
+use std::collections::VecDeque;
+use std::ptr;
+
+use super::*;
+
+// Rabin Chunker
+// Paper: "Fingerprinting by Random Polynomials"
+// Paper-URL: http://www.xmailserver.org/rabin.pdf
+// Paper: "Redundancy Elimination Within Large Collections of Files"
+// Paper-URL: https://www.usenix.org/legacy/event/usenix04/tech/general/full_papers/kulkarni/kulkarni_html/paper.html
+// Wikipedia: https://en.wikipedia.org/wiki/Rabin_fingerprint
+
+
+fn wrapping_pow(mut base: u32, mut exp: u32) -> u32 {
+    let mut acc: u32 = 1;
+    while exp > 0 {
+        if exp % 2 == 1 {
+            acc = acc.wrapping_mul(base)
+        }
+        base = base.wrapping_mul(base);
+        exp /= 2;
+    }
+    acc
+}
+
+fn create_table(alpha: u32, window_size: usize) -> [u32; 256] {
+    let mut table = [0u32; 256];
+    let a = wrapping_pow(alpha, window_size as u32);
+    for i in 0..table.len() as u32 {
+        table[i as usize] = i.wrapping_mul(a);
+    }
+    table
+}
+
+
+pub struct RabinChunker {
+    buffer: [u8; 4096],
+    buffered: usize,
+    seed: u32,
+    alpha: u32,
+    table: [u32; 256],
+    min_size:
usize, + max_size: usize, + window_size: usize, + chunk_mask: u32, + avg_size: usize +} + + +impl RabinChunker { + pub fn new(avg_size: usize, seed: u32) -> Self { + let chunk_mask = (avg_size as u32).next_power_of_two() - 1; + let window_size = avg_size/4-1; + let alpha = 1664525;//153191; + RabinChunker { + buffer: [0; 4096], + buffered: 0, + table: create_table(alpha, window_size), + alpha: alpha, + seed: seed, + min_size: avg_size/4, + max_size: avg_size*4, + window_size: window_size, + chunk_mask: chunk_mask, + avg_size: avg_size + } + } +} + +impl IChunker for RabinChunker { + #[inline] + fn get_type(&self) -> ChunkerType { + ChunkerType::Rabin((self.avg_size, self.seed)) + } + + #[allow(unknown_lints,explicit_counter_loop)] + fn chunk(&mut self, r: &mut R, mut w: &mut W) -> Result { + let mut max; + let mut hash = 0u32; + let mut pos = 0; + let mut window = VecDeque::with_capacity(self.window_size); + loop { + // Fill the buffer, there might be some bytes still in there from last chunk + max = try!(r.read(&mut self.buffer[self.buffered..]).map_err(ChunkerError::Read)) + self.buffered; + // If nothing to do, finish + if max == 0 { + return Ok(ChunkerStatus::Finished) + } + for i in 0..max { + let val = self.buffer[i]; + if pos >= self.max_size { + try!(w.write_all(&self.buffer[..i+1]).map_err(ChunkerError::Write)); + unsafe { ptr::copy(self.buffer[i+1..].as_ptr(), self.buffer.as_mut_ptr(), max-i-1) }; + self.buffered = max-i-1; + return Ok(ChunkerStatus::Continue); + } + // Hash update + hash = hash.wrapping_mul(self.alpha).wrapping_add(val as u32); + if pos >= self.window_size { + let take = window.pop_front().unwrap(); + hash = hash.wrapping_sub(self.table[take as usize]); + if pos >= self.min_size && ((hash ^ self.seed) & self.chunk_mask) == 0 { + try!(w.write_all(&self.buffer[..i+1]).map_err(ChunkerError::Write)); + unsafe { ptr::copy(self.buffer[i+1..].as_ptr(), self.buffer.as_mut_ptr(), max-i-1) }; + self.buffered = max-i-1; + return Ok(ChunkerStatus::Continue); + } + } + pos += 1; + window.push_back(val); + } + try!(w.write_all(&self.buffer[..max]).map_err(ChunkerError::Write)); + self.buffered = 0; + } + } +} diff --git a/src/errors.rs b/src/errors.rs new file mode 100644 index 0000000..3c20a96 --- /dev/null +++ b/src/errors.rs @@ -0,0 +1,72 @@ +use std::io; +use std::path::PathBuf; + +use rmp_serde::decode::Error as MsgpackDecode; +use rmp_serde::encode::Error as MsgpackEncode; + +use super::bundle::BundleId; + +quick_error!{ + #[derive(Debug)] + pub enum BundleError { + List(err: io::Error) { + cause(err) + description("Failed to list bundles") + } + Read(err: io::Error, path: PathBuf, reason: &'static str) { + cause(err) + description("Failed to read bundle") + display("Failed to read bundle {:?}: {}", path, reason) + } + Decode(err: MsgpackDecode, path: PathBuf) { + cause(err) + description("Failed to decode bundle header") + } + Write(err: io::Error, path: PathBuf, reason: &'static str) { + cause(err) + description("Failed to write bundle") + display("Failed to write bundle {:?}: {}", path, reason) + } + Encode(err: MsgpackEncode, path: PathBuf) { + cause(err) + description("Failed to encode bundle header") + } + Format(path: PathBuf, reason: &'static str) { + description("Failed to decode bundle") + display("Failed to decode bundle {:?}: {}", path, reason) + } + Integrity(bundle: BundleId, reason: &'static str) { + description("Bundle has an integrity error") + display("Bundle {:?} has an integrity error: {}", bundle, reason) + } + Remove(err: io::Error, bundle: 
BundleId) { + cause(err) + description("Failed to remove bundle") + display("Failed to remove bundle {}", bundle) + } + Custom { + from(&'static str) + description("Custom error") + } + } +} + +quick_error!{ + #[derive(Debug)] + pub enum ChunkerError { + Read(err: io::Error) { + from(err) + cause(err) + description("Failed to read") + } + Write(err: io::Error) { + from(err) + cause(err) + description("Failed to write") + } + Custom { + from(&'static str) + description("Custom error") + } + } +} diff --git a/src/index.rs b/src/index.rs new file mode 100644 index 0000000..5ae4a51 --- /dev/null +++ b/src/index.rs @@ -0,0 +1,495 @@ +use super::util::Hash; + +use std::path::Path; +use std::fs::{File, OpenOptions}; +use std::mem; +use std::io; +use std::slice; +use std::os::unix::io::AsRawFd; + +use mmap::{MemoryMap, MapOption, MapError}; + +const MAGIC: [u8; 7] = *b"zcindex"; +const VERSION: u8 = 1; +pub const MAX_USAGE: f64 = 0.9; +pub const MIN_USAGE: f64 = 0.25; +pub const INITIAL_SIZE: usize = 1024; + +#[repr(packed)] +pub struct Header { + magic: [u8; 7], + version: u8, + entries: u64, + capacity: u64, +} + +#[repr(packed)] +#[derive(Clone, Copy, PartialEq, Debug)] +pub struct Location { + pub bundle: u32, + pub chunk: u32 +} +impl Location { + pub fn new(bundle: u32, chunk: u32) -> Self { + Location{ bundle: bundle, chunk: chunk } + } +} + + +#[repr(packed)] +#[derive(Clone)] +pub struct Entry { + pub key: Hash, + pub data: Location +} + +impl Entry { + #[inline] + fn is_used(&self) -> bool { + self.key.low != 0 || self.key.high != 0 + } + + fn clear(&mut self) { + self.key.low = 0; + self.key.high = 0; + } +} + +pub struct Index { + capacity: usize, + entries: usize, + max_entries: usize, + min_entries: usize, + fd: File, + mmap: MemoryMap, + data: &'static mut [Entry] +} + +#[derive(Debug)] +pub enum Error { + IOError(io::Error), + MapError(MapError), + NoHeader, + MagicError, + VersionError, +} + +#[derive(Debug)] +enum LocateResult { + Found(usize), // Found the key at this position + Hole(usize), // Found a hole at this position while searching for a key + Steal(usize) // Found a spot to steal at this position while searching for a key +} + +impl Index { + pub fn new(path: &Path, create: bool) -> Result { + let fd = try!(OpenOptions::new().read(true).write(true).create(create).open(path).map_err(|e| { Error::IOError(e) })); + if create { + try!(Index::resize_fd(&fd, INITIAL_SIZE)); + } + let mmap = try!(Index::map_fd(&fd)); + if mmap.len() < mem::size_of::
<Header>() {
+            return Err(Error::NoHeader);
+        }
+        let data = Index::mmap_as_slice(&mmap, INITIAL_SIZE as usize);
+        let mut index = Index{capacity: 0, max_entries: 0, min_entries: 0, entries: 0, fd: fd, mmap: mmap, data: data};
+        {
+            let capacity;
+            let entries;
+            {
+                let header = index.header();
+                if create {
+                    header.magic = MAGIC;
+                    header.version = VERSION;
+                    header.entries = 0;
+                    header.capacity = INITIAL_SIZE as u64;
+                } else {
+                    if header.magic != MAGIC {
+                        return Err(Error::MagicError);
+                    }
+                    if header.version != VERSION {
+                        return Err(Error::VersionError);
+                    }
+                }
+                capacity = header.capacity;
+                entries = header.entries;
+            }
+            index.data = Index::mmap_as_slice(&index.mmap, capacity as usize);
+            index.set_capacity(capacity as usize);
+            index.entries = entries as usize;
+        }
+        debug_assert!(index.is_consistent(), "Inconsistent after creation");
+        Ok(index)
+    }
+
+    #[inline]
+    pub fn open(path: &Path) -> Result<Index, Error> {
+        Index::new(path, false)
+    }
+
+    #[inline]
+    pub fn create(path: &Path) -> Result<Index, Error> {
+        Index::new(path, true)
+    }
+
+    #[inline]
+    fn map_fd(fd: &File) -> Result<MemoryMap, Error> {
+        MemoryMap::new(
+            try!(fd.metadata().map_err(Error::IOError)).len() as usize,
+            &[MapOption::MapReadable,
+              MapOption::MapWritable,
+              MapOption::MapFd(fd.as_raw_fd()),
+              MapOption::MapNonStandardFlags(0x0001) // libc::consts::os::posix88::MAP_SHARED
+            ]).map_err(|e| Error::MapError(e))
+    }
+
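// A minimal usage sketch of this index (hypothetical path and key, error
// handling elided); `Location` maps a chunk hash to its bundle and chunk id:
//
//     let mut index = Index::create(Path::new("test_data/index")).unwrap();
//     index.set(&hash, &Location::new(0, 42)).unwrap();
//     assert_eq!(index.get(&hash), Some(Location::new(0, 42)));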
+    #[inline]
+    fn resize_fd(fd: &File, capacity: usize) -> Result<(), Error> {
+        fd.set_len((mem::size_of::<Header>() + capacity * mem::size_of::<Entry>()) as u64).map_err(Error::IOError)
+    }
+
+    #[inline]
+    fn mmap_as_slice(mmap: &MemoryMap, len: usize) -> &'static mut [Entry] {
+        if mmap.len() < mem::size_of::<Header>() + len * mem::size_of::<Entry>() {
+            panic!("Memory map too small");
+        }
+        let ptr = unsafe { mmap.data().offset(mem::size_of::<Header>() as isize) as *mut Entry };
+        unsafe { slice::from_raw_parts_mut(ptr, len) }
+    }
+
+    #[inline]
+    fn header(&mut self) -> &mut Header {
+        if self.mmap.len() < mem::size_of::<Header>
() { + panic!("Failed to read beyond end"); + } + unsafe { &mut *(self.mmap.data() as *mut Header) } + } + + #[inline] + fn set_capacity(&mut self, capacity: usize) { + self.capacity = capacity; + self.min_entries = (capacity as f64 * MIN_USAGE) as usize; + self.max_entries = (capacity as f64 * MAX_USAGE) as usize; + } + + fn reinsert(&mut self, start: usize, end: usize) -> Result<(), Error> { + for pos in start..end { + let key; + let data; + { + let entry = &mut self.data[pos]; + if !entry.is_used() { + continue; + } + key = entry.key; + data = entry.data; + entry.clear(); + } + self.entries -= 1; + try!(self.set(&key, &data)); + } + Ok(()) + } + + fn shrink(&mut self) -> Result { + if self.entries >= self.min_entries || self.capacity <= INITIAL_SIZE { + return Ok(false) + } + let old_capacity = self.capacity; + let new_capacity = self.capacity / 2; + self.set_capacity(new_capacity); + try!(self.reinsert(new_capacity, old_capacity)); + try!(Index::resize_fd(&self.fd, new_capacity)); + self.mmap = try!(Index::map_fd(&self.fd)); + self.data = Index::mmap_as_slice(&self.mmap, new_capacity); + assert_eq!(self.data.len(), self.capacity); + Ok(true) + } + + fn extend(&mut self) -> Result { + if self.entries <= self.max_entries { + return Ok(false) + } + let new_capacity = 2 * self.capacity; + try!(Index::resize_fd(&self.fd, new_capacity)); + self.mmap = try!(Index::map_fd(&self.fd)); + self.data = Index::mmap_as_slice(&self.mmap, new_capacity); + self.set_capacity(new_capacity); + assert_eq!(self.data.len(), self.capacity); + try!(self.reinsert(0, new_capacity)); + Ok(true) + } + + #[allow(dead_code)] + pub fn is_consistent(&self) -> bool { + let mut entries = 0; + for pos in 0..self.capacity { + let entry = &self.data[pos]; + if !entry.is_used() { + continue; + } + entries += 1; + match self.locate(&entry.key) { + LocateResult::Found(p) if p == pos => true, + found => { + println!("Inconsistency found: Key {:?} should be at {} but is at {:?}", entry.key, pos, found); + return false + } + }; + } + if entries != self.entries { + println!("Inconsistency found: Index contains {} entries, should contain {}", entries, self.entries); + return false + } + true + } + + pub fn check(&self) -> Result<(), &'static str> { + //TODO: proper errors instead of string + if self.is_consistent() { + Ok(()) + } else { + Err("Inconsistent") + } + } + + #[inline] + fn increase_count(&mut self) -> Result<(), Error> { + self.entries += 1; + try!(self.extend()); + self.write_header(); + Ok(()) + } + + #[inline] + fn decrease_count(&mut self) -> Result<(), Error> { + self.entries -= 1; + try!(self.shrink()); + self.write_header(); + Ok(()) + } + + #[inline] + fn write_header(&mut self) { + let entries = self.entries; + let capacity = self.capacity; + let header = self.header(); + header.entries = entries as u64; + header.capacity = capacity as u64; + } + + /// Finds the position for this key + /// If the key is in the table, it will be the position of the key, + /// otherwise it will be the position where this key should be inserted + fn locate(&self, key: &Hash) -> LocateResult { + let mut pos = key.hash() as usize % self.capacity; + let mut dist = 0; + loop { + let entry = &self.data[pos]; + if !entry.is_used() { + return LocateResult::Hole(pos); + } + if entry.key == *key { + return LocateResult::Found(pos); + } + let odist = (pos + self.capacity - entry.key.hash() as usize % self.capacity) % self.capacity; + if dist > odist { + return LocateResult::Steal(pos); + } + pos = (pos + 1) % self.capacity ; + dist += 1; + 
} + } + + /// Shifts all following entries towards the left if they can get closer to their ideal position. + /// The entry at the given position will be lost. + fn backshift(&mut self, start: usize) { + let mut pos = start; + let mut last_pos; + loop { + last_pos = pos; + pos = (pos + 1) % self.capacity; + { + let entry = &self.data[pos]; + if !entry.is_used() { + // we found a hole, stop shifting here + break; + } + if entry.key.hash() as usize % self.capacity == pos { + // we found an entry at the right position, stop shifting here + break; + } + } + self.data[last_pos] = self.data[pos].clone(); + } + self.data[last_pos].clear(); + } + + /// Adds the key, data pair into the table. + /// If the key existed in the table before, it is overwritten and false is returned. + /// Otherwise it will be added to the table and true is returned. + pub fn set(&mut self, key: &Hash, data: &Location) -> Result<bool, Error> { + match self.locate(key) { + LocateResult::Found(pos) => { + self.data[pos].data = *data; + Ok(false) + }, + LocateResult::Hole(pos) => { + { + let entry = &mut self.data[pos]; + entry.key = *key; + entry.data = *data; + } + try!(self.increase_count()); + Ok(true) + }, + LocateResult::Steal(pos) => { + let mut stolen_key; + let mut stolen_data; + let mut cur_pos = pos; + { + let entry = &mut self.data[pos]; + stolen_key = entry.key; + stolen_data = entry.data; + entry.key = *key; + entry.data = *data; + } + loop { + cur_pos = (cur_pos + 1) % self.capacity; + let entry = &mut self.data[cur_pos]; + if entry.is_used() { + mem::swap(&mut stolen_key, &mut entry.key); + mem::swap(&mut stolen_data, &mut entry.data); + } else { + entry.key = stolen_key; + entry.data = stolen_data; + break; + } + } + try!(self.increase_count()); + Ok(true) + } + } + } + + #[inline] + pub fn contains(&self, key: &Hash) -> bool { + debug_assert!(self.is_consistent(), "Inconsistent before contains"); + match self.locate(key) { + LocateResult::Found(_) => true, + _ => false + } + } + + #[inline] + pub fn get(&self, key: &Hash) -> Option<Location> { + debug_assert!(self.is_consistent(), "Inconsistent before get"); + match self.locate(key) { + LocateResult::Found(pos) => Some(self.data[pos].data), + _ => None + } + } + + #[inline] + pub fn modify<F>(&mut self, key: &Hash, mut f: F) -> bool where F: FnMut(&mut Location) { + debug_assert!(self.is_consistent(), "Inconsistent before modify"); + match self.locate(key) { + LocateResult::Found(pos) => { + f(&mut self.data[pos].data); + true + }, + _ => false + } + } + + #[inline] + pub fn delete(&mut self, key: &Hash) -> Result<bool, Error> { + match self.locate(key) { + LocateResult::Found(pos) => { + self.backshift(pos); + try!(self.decrease_count()); + Ok(true) + }, + _ => Ok(false) + } + } + + pub fn filter<F>(&mut self, mut f: F) -> Result<usize, Error> where F: FnMut(&Hash, &Location) -> bool { + //TODO: is it faster to walk in reverse direction?
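// Note (editorial): `pos` is only advanced when an entry is kept. After
// backshift(pos) a different entry has been shifted into `pos`, so the same
// position must be examined again before moving on.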
+ let mut deleted = 0; + let mut pos = 0; + while pos < self.capacity { + { + let entry = &mut self.data[pos]; + if !entry.is_used() || f(&entry.key, &entry.data) { + pos += 1; + continue; + } + } + self.backshift(pos); + deleted += 1; + } + self.entries -= deleted; + while try!(self.shrink()) {} + self.write_header(); + Ok(deleted) + } + + #[inline] + pub fn walk<F>(&self, mut f: F) where F: FnMut(&Hash, &Location) { + for pos in 0..self.capacity { + let entry = &self.data[pos]; + if entry.is_used() { + f(&entry.key, &entry.data); + } + } + } + + #[inline] + pub fn walk_mut<F>(&mut self, mut f: F) where F: FnMut(&Hash, &mut Location) { + for pos in 0..self.capacity { + let entry = &mut self.data[pos]; + if entry.is_used() { + f(&entry.key, &mut entry.data); + } + } + } + + #[inline] + pub fn next_entry(&self, index: usize) -> Option<usize> { + let mut i = index; + while i < self.capacity && !self.data[i].is_used() { + i += 1; + } + if i == self.capacity { + None + } else { + Some(i) + } + } + + #[inline] + pub fn get_entry(&self, index: usize) -> Option<&Entry> { + let entry = &self.data[index]; + if entry.is_used() { + Some(entry) + } else { + None + } + } + + #[inline] + pub fn len(&self) -> usize { + self.entries + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.entries == 0 + } + + #[inline] + pub fn capacity(&self) -> usize { + self.capacity + } +} diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..d9c8f58 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,79 @@ +extern crate serde; +extern crate rmp_serde; +#[macro_use] extern crate serde_utils; +extern crate crypto; +extern crate squash_sys as squash; +extern crate mmap; +extern crate blake2_rfc as blake2; +extern crate murmurhash3; +extern crate serde_yaml; +#[macro_use] extern crate quick_error; + +mod errors; +mod util; +mod bundle; +mod index; +mod chunker; +mod repository; +mod algotest; + +use chunker::ChunkerType; +use repository::{Repository, Config, Mode}; +use util::{ChecksumType, Compression, HashMethod}; + +use std::path::Path; +use std::fs::File; +use std::env; +use std::io::Read; +use std::time; + + +fn main() { + let path: &Path = "test_data".as_ref(); + let mut repo = if path.exists() { + Repository::open(path).unwrap() + } else { + Repository::create(path, Config { + bundle_size: 1024*1024, + checksum: ChecksumType::Sha3_256, + chunker: ChunkerType::FastCdc((8*1024, 0)), + compression: Some(Compression::Brotli(5)), + hash: HashMethod::Blake2 + }).unwrap() + }; + print!("Integrity check before..."); + repo.check(true).unwrap(); + println!(" done."); + + let file_path = env::args().nth(1).expect("Need file as argument"); + print!("Reading file {}...", file_path); + let mut data = Vec::new(); + let mut file = File::open(file_path).unwrap(); + file.read_to_end(&mut data).unwrap(); + println!(" done. {} bytes", data.len()); + + print!("Adding data to repository..."); + let start = time::Instant::now(); + let chunks = repo.put_data(Mode::Content, &data).unwrap(); + repo.flush().unwrap(); + let elapsed = start.elapsed(); + let duration = elapsed.as_secs() as f64 + elapsed.subsec_nanos() as f64 / 1_000_000_000.0; + let write_speed = data.len() as f64 / duration; + println!(" done.
{} chunks, {:.1} MB/s", chunks.len(), write_speed / 1_000_000.0); + + print!("Integrity check after..."); + repo.check(true).unwrap(); + println!(" done."); + + print!("Reading data from repository..."); + let start = time::Instant::now(); + let data2 = repo.get_data(&chunks).unwrap(); + let elapsed = start.elapsed(); + let duration = elapsed.as_secs() as f64 + elapsed.subsec_nanos() as f64 / 1_000_000_000.0; + let read_speed = data.len() as f64 / duration; + assert_eq!(data.len(), data2.len()); + println!(" done. {:.1} MB/s", read_speed / 1_000_000.0); + + //algotest::run("test.tar"); + +} diff --git a/src/repository/basic_io.rs b/src/repository/basic_io.rs new file mode 100644 index 0000000..40ac38b --- /dev/null +++ b/src/repository/basic_io.rs @@ -0,0 +1,124 @@ +use std::mem; +use std::io::{Read, Write, Cursor}; + +use super::{Repository, Mode}; +use super::bundle_map::BundleInfo; +use ::index::Location; + +use ::util::Hash; +use ::chunker::{IChunker, ChunkerStatus}; + + +impl Repository { + pub fn get_chunk(&mut self, hash: Hash) -> Result<Option<Vec<u8>>, &'static str> { + // Find bundle and chunk id in index + let found = if let Some(found) = self.index.get(&hash) { + found + } else { + return Ok(None) + }; + // Lookup bundle id from map + let bundle_id = if let Some(bundle_info) = self.bundle_map.get(found.bundle) { + bundle_info.id.clone() + } else { + return Err("Bundle id not found in map") + }; + // Get chunk from bundle + if let Ok(chunk) = self.bundles.get_chunk(&bundle_id, found.chunk as usize) { + Ok(Some(chunk)) + } else { + Err("Failed to load chunk from bundle") + } + } + + pub fn put_chunk(&mut self, mode: Mode, hash: Hash, data: &[u8]) -> Result<(), &'static str> { + // If this chunk is in the index, ignore it + if self.index.contains(&hash) { + return Ok(()) + } + // Calculate the next free bundle id now (the borrow of the writer below prevents doing it later) + let next_free_bundle_id = self.next_free_bundle_id(); + // Select a bundle writer according to the mode and...
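// Note (editorial): content and meta chunks go into separate bundles,
// presumably so that metadata can later be fetched without reading the much
// larger content bundles.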
+ let writer = match mode { + Mode::Content => &mut self.content_bundle, + Mode::Meta => &mut self.meta_bundle + }; + // ...allocate one if needed + if writer.is_none() { + *writer = Some(try!(self.bundles.create_bundle().map_err(|_| "Failed to create new bundle"))); + } + debug_assert!(writer.is_some()); + let chunk_id; + let size; + { + // Add chunk to bundle writer and determine the size of the bundle + let writer_obj = writer.as_mut().unwrap(); + chunk_id = try!(writer_obj.add(data).map_err(|_| "Failed to write chunk")); + size = writer_obj.size(); + } + let bundle_id = match mode { + Mode::Content => self.next_content_bundle, + Mode::Meta => self.next_meta_bundle + }; + // Finish bundle if over maximum size + if size >= self.config.bundle_size { + let mut finished = None; + mem::swap(writer, &mut finished); + let bundle = try!(self.bundles.add_bundle(finished.unwrap()).map_err(|_| "Failed to write finished bundle")); + let bundle_info = BundleInfo{id: bundle.id.clone()}; + self.bundle_map.set(bundle_id, bundle_info); + if self.next_meta_bundle == bundle_id { + self.next_meta_bundle = next_free_bundle_id + } + if self.next_content_bundle == bundle_id { + self.next_content_bundle = next_free_bundle_id + } + // Not saving the bundle map, this will be done by flush + } + // Add location to the index + try!(self.index.set(&hash, &Location::new(bundle_id, chunk_id as u32)).map_err(|_| "Failed to add chunk location to index")); + Ok(()) + } + + #[inline] + pub fn put_data(&mut self, mode: Mode, data: &[u8]) -> Result<Vec<(Hash, usize)>, &'static str> { + let mut input = Cursor::new(data); + self.put_stream(mode, &mut input) + } + + pub fn put_stream<R: Read>(&mut self, mode: Mode, data: &mut R) -> Result<Vec<(Hash, usize)>, &'static str> { + let avg_size = self.config.chunker.avg_size(); + let mut chunks = Vec::new(); + let mut chunk = Vec::with_capacity(avg_size * 2); + loop { + chunk.clear(); + let mut output = Cursor::new(chunk); + let res = try!(self.chunker.chunk(data, &mut output).map_err(|_| "Failed to chunk")); + chunk = output.into_inner(); + let hash = self.config.hash.hash(&chunk); + try!(self.put_chunk(mode, hash, &chunk).map_err(|_| "Failed to store chunk")); + chunks.push((hash, chunk.len())); + if res == ChunkerStatus::Finished { + break + } + } + Ok(chunks) + } + + #[inline] + pub fn get_data(&mut self, chunks: &[(Hash, usize)]) -> Result<Vec<u8>, &'static str> { + let mut data = Vec::with_capacity(chunks.iter().map(|&(_, size)| size).sum()); + try!(self.get_stream(chunks, &mut data)); + Ok(data) + } + + #[inline] + pub fn get_stream<W: Write>(&mut self, chunks: &[(Hash, usize)], w: &mut W) -> Result<(), &'static str> { + for &(ref hash, len) in chunks { + let data = try!(try!(self.get_chunk(*hash).map_err(|_| "Failed to load chunk")).ok_or("Chunk missing")); + debug_assert_eq!(data.len(), len); + try!(w.write_all(&data).map_err(|_| "Failed to write to sink")); + } + Ok(()) + } +} diff --git a/src/repository/bundle_map.rs b/src/repository/bundle_map.rs new file mode 100644 index 0000000..afd103c --- /dev/null +++ b/src/repository/bundle_map.rs @@ -0,0 +1,74 @@ +use std::collections::HashMap; +use std::path::Path; +use std::io::{BufReader, Read, Write, BufWriter}; +use std::fs::File; + +use rmp_serde; +use serde::Deserialize; +use serde::Serialize; + +use ::bundle::BundleId; + + +static HEADER_STRING: [u8; 7] = *b"zbunmap"; +static HEADER_VERSION: u8 = 1; + + +#[derive(Default)] +pub struct BundleInfo { + pub id: BundleId +} +serde_impl!(BundleInfo(u64) { + id: BundleId => 0 +}); + + +pub struct BundleMap(HashMap<u32, BundleInfo>); + +impl BundleMap { + pub
fn create() -> Self { + BundleMap(Default::default()) + } + + pub fn load<P: AsRef<Path>>(path: P) -> Result<Self, &'static str> { + let mut file = BufReader::new(try!(File::open(path.as_ref()) + .map_err(|_| "Failed to open bundle map file"))); + let mut header = [0u8; 8]; + try!(file.read_exact(&mut header) + .map_err(|_| "Failed to read bundle map header")); + if header[..HEADER_STRING.len()] != HEADER_STRING { + return Err("Wrong header string") + } + let version = header[HEADER_STRING.len()]; + if version != HEADER_VERSION { + return Err("Unsupported bundle map file version") + } + let mut reader = rmp_serde::Deserializer::new(file); + let map = try!(HashMap::deserialize(&mut reader) + .map_err(|_| "Failed to read bundle map data")); + Ok(BundleMap(map)) + } + + + pub fn save<P: AsRef<Path>>(&self, path: P) -> Result<(), &'static str> { + let mut file = BufWriter::new(try!(File::create(path) + .map_err(|_| "Failed to create bundle file"))); + try!(file.write_all(&HEADER_STRING) + .map_err(|_| "Failed to write bundle header")); + try!(file.write_all(&[HEADER_VERSION]) + .map_err(|_| "Failed to write bundle header")); + let mut writer = rmp_serde::Serializer::new(&mut file); + self.0.serialize(&mut writer) + .map_err(|_| "Failed to write bundle map data") + } + + #[inline] + pub fn get(&self, id: u32) -> Option<&BundleInfo> { + self.0.get(&id) + } + + #[inline] + pub fn set(&mut self, id: u32, info: BundleInfo) { + self.0.insert(id, info); + } +} diff --git a/src/repository/config.rs b/src/repository/config.rs new file mode 100644 index 0000000..63ae7ce --- /dev/null +++ b/src/repository/config.rs @@ -0,0 +1,182 @@ +use serde_yaml; + +use std::fs::File; +use std::path::Path; + +use ::util::*; +use ::chunker::ChunkerType; + + +impl HashMethod { + fn from_yaml(yaml: String) -> Result<Self, &'static str> { + HashMethod::from(&yaml) + } + + fn to_yaml(&self) -> String { + self.name().to_string() + } +} + + + +impl ChecksumType { + fn from_yaml(yaml: String) -> Result<Self, &'static str> { + ChecksumType::from(&yaml) + } + + fn to_yaml(&self) -> String { + self.name().to_string() + } +} + + + +struct ChunkerYaml { + method: String, + avg_size: usize, + seed: u64 +} +impl Default for ChunkerYaml { + fn default() -> Self { + ChunkerYaml { + method: "fastcdc".to_string(), + avg_size: 16*1024, + seed: 0 + } + } +} +serde_impl!(ChunkerYaml(String) { + method: String => "method", + avg_size: usize => "avg_size", + seed: u64 => "seed" +}); + +impl ChunkerType { + fn from_yaml(yaml: ChunkerYaml) -> Result<Self, &'static str> { + ChunkerType::from(&yaml.method, yaml.avg_size, yaml.seed) + } + + fn to_yaml(&self) -> ChunkerYaml { + ChunkerYaml { + method: self.name().to_string(), + avg_size: self.avg_size(), + seed: self.seed() + } + } +} + + + +struct CompressionYaml { + codec: String, + level: Option<u8> +} +impl Default for CompressionYaml { + fn default() -> Self { + CompressionYaml { + codec: "brotli".to_string(), + level: None + } + } +} +serde_impl!(CompressionYaml(String) { + codec: String => "codec", + level: Option<u8> => "level" +}); + +impl Compression { + fn from_yaml(yaml: CompressionYaml) -> Result<Self, &'static str> { + match &yaml.codec as &str { + "snappy" => Ok(Compression::Snappy(())), + "zstd" => Ok(Compression::ZStd(yaml.level.unwrap_or(5))), + "deflate" | "zlib" | "gzip" => Ok(Compression::Deflate(yaml.level.unwrap_or(5))), + "brotli" => Ok(Compression::Brotli(yaml.level.unwrap_or(5))), + "lzma2" => Ok(Compression::Lzma2(yaml.level.unwrap_or(5))), + _ => Err("Unsupported codec") + } + } + + fn to_yaml(&self) -> CompressionYaml { + CompressionYaml { + codec: self.name().to_string(), + level: self.level() + } + }
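// Note (editorial): with the ConfigYaml definitions below, a saved
// config.yaml should look roughly like this (default values; illustrative
// only):
//
//   compression:
//     codec: brotli
//     level: 5
//   bundle_size: 26214400
//   chunker:
//     method: fastcdc
//     avg_size: 16384
//     seed: 0
//   checksum: sha3-256
//   hash: blake2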
+} + + + +struct ConfigYaml { + compression: Option<CompressionYaml>, + bundle_size: usize, + chunker: ChunkerYaml, + checksum: String, + hash: String, +} +impl Default for ConfigYaml { + fn default() -> Self { + ConfigYaml { + compression: Some(CompressionYaml { codec: "brotli".to_string(), level: Some(5) }), + bundle_size: 25*1024*1024, + chunker: ChunkerYaml::default(), + checksum: "sha3-256".to_string(), + hash: "blake2".to_string() + } + } +} +serde_impl!(ConfigYaml(String) { + compression: Option<CompressionYaml> => "compression", + bundle_size: usize => "bundle_size", + chunker: ChunkerYaml => "chunker", + checksum: String => "checksum", + hash: String => "hash" +}); + + + +#[derive(Debug)] +pub struct Config { + pub compression: Option<Compression>, + pub bundle_size: usize, + pub chunker: ChunkerType, + pub checksum: ChecksumType, + pub hash: HashMethod +} +impl Config { + fn from_yaml(yaml: ConfigYaml) -> Result<Self, &'static str> { + let compression = if let Some(c) = yaml.compression { + Some(try!(Compression::from_yaml(c))) + } else { + None + }; + Ok(Config{ + compression: compression, + bundle_size: yaml.bundle_size, + chunker: try!(ChunkerType::from_yaml(yaml.chunker)), + checksum: try!(ChecksumType::from_yaml(yaml.checksum)), + hash: try!(HashMethod::from_yaml(yaml.hash)) + }) + } + + fn to_yaml(&self) -> ConfigYaml { + ConfigYaml { + compression: self.compression.as_ref().map(|c| c.to_yaml()), + bundle_size: self.bundle_size, + chunker: self.chunker.to_yaml(), + checksum: self.checksum.to_yaml(), + hash: self.hash.to_yaml() + } + } + + pub fn load<P: AsRef<Path>>(path: P) -> Result<Self, &'static str> { + let f = try!(File::open(path).map_err(|_| "Failed to open config")); + let config = try!(serde_yaml::from_reader(f).map_err(|_| "Failed to parse config")); + Config::from_yaml(config) + } + + pub fn save<P: AsRef<Path>>(&self, path: P) -> Result<(), &'static str> { + let mut f = try!(File::create(path).map_err(|_| "Failed to open config")); + try!(serde_yaml::to_writer(&mut f, &self.to_yaml()).map_err(|_| "Failed to write config")); + Ok(()) + } +} diff --git a/src/repository/integrity.rs b/src/repository/integrity.rs new file mode 100644 index 0000000..668c3b0 --- /dev/null +++ b/src/repository/integrity.rs @@ -0,0 +1,61 @@ +use super::Repository; + +use ::util::Hash; + + +impl Repository { + fn check_chunk(&self, hash: Hash) -> Result<(), &'static str> { + // Find bundle and chunk id in index + let found = if let Some(found) = self.index.get(&hash) { + found + } else { + return Err("Chunk not in index"); + }; + // Lookup bundle id from map + let bundle_id = if let Some(bundle_info) = self.bundle_map.get(found.bundle) { + bundle_info.id.clone() + } else { + return Err("Bundle id not found in map") + }; + // Get bundle object from bundledb + let bundle = if let Some(bundle) = self.bundles.get_bundle(&bundle_id) { + bundle + } else { + return Err("Bundle not found in bundledb") + }; + // Get chunk from bundle + if bundle.chunk_count > found.chunk as usize { + Ok(()) + } else { + Err("Bundle does not contain that chunk") + } + //TODO: check that contents match their hash + } + + pub fn check(&mut self, full: bool) -> Result<(), &'static str> { + try!(self.flush()); + try!(self.bundles.check(full).map_err(|_| "Bundles inconsistent")); + try!(self.index.check().map_err(|_| "Index inconsistent")); + let mut pos = 0; + loop { + pos = if let Some(pos) = self.index.next_entry(pos) { + pos + } else { + break + }; + let entry = self.index.get_entry(pos).unwrap(); + try!(self.check_chunk(entry.key)); + pos += 1; + } + if self.next_content_bundle == self.next_meta_bundle { + return Err("Next bundle ids for meta and content are the same") + } + if self.bundle_map.get(self.next_content_bundle).is_some() { + return Err("Bundle map already contains next content bundle id") + } + if self.bundle_map.get(self.next_meta_bundle).is_some() { + return Err("Bundle map already contains next meta bundle id") + } + Ok(()) + } +} diff --git a/src/repository/mod.rs b/src/repository/mod.rs new file mode 100644 index 0000000..8f50284 --- /dev/null +++ b/src/repository/mod.rs @@ -0,0 +1,138 @@ +mod config; +mod bundle_map; +mod integrity; +mod basic_io; + +use std::mem; +use std::cmp::max; +use std::path::{PathBuf, Path}; +use std::fs; + +use super::index::Index; +use super::bundle::{BundleDb, BundleWriter}; +use super::chunker::Chunker; + +pub use self::config::Config; +use self::bundle_map::{BundleMap, BundleInfo}; + + +#[derive(Eq, Debug, PartialEq, Clone, Copy)] +pub enum Mode { + Content, Meta +} + +pub struct Repository { + path: PathBuf, + config: Config, + index: Index, + bundle_map: BundleMap, + next_content_bundle: u32, + next_meta_bundle: u32, + bundles: BundleDb, + content_bundle: Option<BundleWriter>, + meta_bundle: Option<BundleWriter>, + chunker: Chunker +} + + +impl Repository { + pub fn create<P: AsRef<Path>>(path: P, config: Config) -> Result<Self, &'static str> { + let path = path.as_ref().to_owned(); + try!(fs::create_dir(&path).map_err(|_| "Failed to create repository directory")); + let bundles = try!(BundleDb::create( + path.join("bundles"), + config.compression.clone(), + None, //FIXME: store encryption in config + config.checksum + ).map_err(|_| "Failed to create bundle db")); + let index = try!(Index::create(&path.join("index")).map_err(|_| "Failed to create index")); + try!(config.save(path.join("config.yaml")).map_err(|_| "Failed to save config")); + let bundle_map = BundleMap::create(); + try!(bundle_map.save(path.join("bundles.map")).map_err(|_| "Failed to save bundle map")); + Ok(Repository{ + path: path, + chunker: config.chunker.create(), + config: config, + index: index, + bundle_map: bundle_map, + next_content_bundle: 1, + next_meta_bundle: 0, + bundles: bundles, + content_bundle: None, + meta_bundle: None, + }) + } + + pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, &'static str> { + let path = path.as_ref().to_owned(); + let config = try!(Config::load(path.join("config.yaml")).map_err(|_| "Failed to load config")); + let bundles = try!(BundleDb::open( + path.join("bundles"), + config.compression.clone(), + None, //FIXME: load encryption from config + config.checksum + ).map_err(|_| "Failed to open bundle db")); + let index = try!(Index::open(&path.join("index")).map_err(|_| "Failed to open index")); + let bundle_map = try!(BundleMap::load(path.join("bundles.map")).map_err(|_| "Failed to load bundle map")); + let mut repo = Repository { + path: path, + chunker: config.chunker.create(), + config: config, + index: index, + bundle_map: bundle_map, + next_content_bundle: 0, + next_meta_bundle: 0, + bundles: bundles, + content_bundle: None, + meta_bundle: None, + }; + repo.next_meta_bundle = repo.next_free_bundle_id(); + repo.next_content_bundle = repo.next_free_bundle_id(); + Ok(repo) + } + + #[inline] + fn save_bundle_map(&self) -> Result<(), &'static str> { + self.bundle_map.save(self.path.join("bundles.map")) + } + + #[inline] + fn next_free_bundle_id(&self) -> u32 { + let mut id = max(self.next_content_bundle, self.next_meta_bundle) + 1; + while self.bundle_map.get(id).is_some() { + id += 1; + } + id + } + + pub fn flush(&mut self) -> Result<(), &'static str> { + if self.content_bundle.is_some() { + let mut finished = None; + mem::swap(&mut
self.content_bundle, &mut finished); + { + let bundle = try!(self.bundles.add_bundle(finished.unwrap()).map_err(|_| "Failed to write finished bundle")); + let bundle_info = BundleInfo{id: bundle.id.clone()}; + self.bundle_map.set(self.next_content_bundle, bundle_info); + } + self.next_content_bundle = self.next_free_bundle_id() + } + if self.meta_bundle.is_some() { + let mut finished = None; + mem::swap(&mut self.meta_bundle, &mut finished); + { + let bundle = try!(self.bundles.add_bundle(finished.unwrap()).map_err(|_| "Failed to write finished bundle")); + let bundle_info = BundleInfo{id: bundle.id.clone()}; + self.bundle_map.set(self.next_meta_bundle, bundle_info); + } + self.next_meta_bundle = self.next_free_bundle_id() + } + try!(self.save_bundle_map().map_err(|_| "Failed to save bundle map")); + Ok(()) + } +} + +impl Drop for Repository { + fn drop(&mut self) { + self.flush().expect("Failed to write last bundles") + } +} diff --git a/src/util/checksum.rs b/src/util/checksum.rs new file mode 100644 index 0000000..a96529e --- /dev/null +++ b/src/util/checksum.rs @@ -0,0 +1,66 @@ +use serde::bytes::ByteBuf; +use crypto::sha3; +use crypto::digest::Digest; + + +#[derive(Clone, Debug, Copy)] +#[allow(non_camel_case_types)] +pub enum ChecksumType { + Sha3_256 +} +serde_impl!(ChecksumType(u64) { + Sha3_256 => 1 +}); + +impl ChecksumType { + #[inline] + pub fn from(name: &str) -> Result<Self, &'static str> { + match name { + "sha3-256" => Ok(ChecksumType::Sha3_256), + _ => Err("Unsupported checksum type") + } + } + + #[inline] + pub fn name(&self) -> &'static str { + match *self { + ChecksumType::Sha3_256 => "sha3-256", + } + } +} + + +pub type Checksum = (ChecksumType, ByteBuf); + +#[allow(non_camel_case_types, unknown_lints, large_enum_variant)] +pub enum ChecksumCreator { + Sha3_256(sha3::Sha3) +} + +impl ChecksumCreator { + #[inline] + pub fn new(type_: ChecksumType) -> Self { + match type_ { + ChecksumType::Sha3_256 => ChecksumCreator::Sha3_256(sha3::Sha3::sha3_256()) + } + } + + #[inline] + pub fn update(&mut self, data: &[u8]) { + match *self { + ChecksumCreator::Sha3_256(ref mut state) => state.input(data) + } + } + + #[inline] + pub fn finish(self) -> Checksum { + match self { + ChecksumCreator::Sha3_256(mut state) => { + let mut buf = Vec::with_capacity(state.output_bytes()); + buf.resize(state.output_bytes(), 0); + state.result(&mut buf); + (ChecksumType::Sha3_256, buf.into()) + } + } + } +} diff --git a/src/util/compression.rs b/src/util/compression.rs new file mode 100644 index 0000000..966a444 --- /dev/null +++ b/src/util/compression.rs @@ -0,0 +1,216 @@ +use std::ptr; +use std::ffi::{CStr, CString}; +use std::io::Write; + +use squash::*; + + +#[derive(Clone, Debug)] +pub enum Compression { + Snappy(()), + Deflate(u8), + Brotli(u8), + Lzma2(u8), + ZStd(u8) +} +serde_impl!(Compression(u64) { + Snappy(()) => 0, + Deflate(u8) => 1, + Brotli(u8) => 2, + Lzma2(u8) => 3, + ZStd(u8) => 4 +}); + + +impl Compression { + #[inline] + pub fn name(&self) -> &'static str { + match *self { + Compression::Snappy(_) => "snappy", + Compression::Deflate(_) => "deflate", + Compression::Brotli(_) => "brotli", + Compression::Lzma2(_) => "lzma2", + Compression::ZStd(_) => "zstd", + } + } + + #[inline] + fn codec(&self) -> Result<*mut SquashCodec, &'static str> { + let name = CString::new(self.name().as_bytes()).unwrap(); + let codec = unsafe { squash_get_codec(name.as_ptr()) }; + if codec.is_null() { + return Err("Unsupported algorithm") + } + Ok(codec) + } + + #[inline] + pub fn level(&self) -> Option<u8> { + match *self { +
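// Note (editorial): Snappy takes no level parameter, hence None; the other
// codecs are parameterized by a small integer level.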
Compression::Snappy(_) => None, + Compression::Deflate(lvl) | + Compression::Brotli(lvl) | + Compression::ZStd(lvl) | + Compression::Lzma2(lvl) => Some(lvl), + } + } + + fn options(&self) -> Result<*mut SquashOptions, &'static str> { + let codec = try!(self.codec()); + let options = unsafe { squash_options_new(codec, ptr::null::<()>()) }; + if let Some(level) = self.level() { + if options.is_null() { + return Err("Algorithm does not support a level") + } + let option = CString::new("level"); + let value = CString::new(format!("{}", level)); + let res = unsafe { squash_options_parse_option( + options, + option.unwrap().as_ptr(), + value.unwrap().as_ptr() + )}; + if res != SQUASH_OK { + //panic!(unsafe { CStr::from_ptr(squash_status_to_string(res)).to_str().unwrap() }); + return Err("Failed to set compression level") + } + } + Ok(options) + } + + #[inline] + fn error(code: SquashStatus) -> &'static str { + unsafe { CStr::from_ptr(squash_status_to_string(code)).to_str().unwrap() } + } + + pub fn compress(&self, data: &[u8]) -> Result<Vec<u8>, &'static str> { + let codec = try!(self.codec()); + let options = try!(self.options()); + let mut size = data.len() * 2 + 500; + // The following does not work for all codecs + /*unsafe { squash_codec_get_max_compressed_size( + codec, + data.len() as usize + )};*/ + let mut buf = Vec::with_capacity(size as usize); + let res = unsafe { squash_codec_compress_with_options( + codec, + &mut size, + buf.as_mut_ptr(), + data.len(), + data.as_ptr(), + options) + }; + if res != SQUASH_OK { + println!("{:?}", data); + println!("{}, {}", data.len(), size); + return Err(Self::error(res)) + } + unsafe { buf.set_len(size) }; + Ok(buf) + } + + pub fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, &'static str> { + let codec = try!(self.codec()); + let mut size = unsafe { squash_codec_get_uncompressed_size( + codec, + data.len(), + data.as_ptr() + )}; + if size == 0 { + size = 100 * data.len(); + } + let mut buf = Vec::with_capacity(size); + let res = unsafe { squash_codec_decompress( + codec, + &mut size, + buf.as_mut_ptr(), + data.len(), + data.as_ptr(), + ptr::null_mut::<()>()) + }; + if res != SQUASH_OK { + return Err(Self::error(res)) + } + unsafe { buf.set_len(size) }; + Ok(buf) + } + + #[inline] + pub fn compress_stream(&self) -> Result<CompressionStream, &'static str> { + let codec = try!(self.codec()); + let options = try!(self.options()); + let stream = unsafe { squash_stream_new_with_options( + codec, SQUASH_STREAM_COMPRESS, options + ) }; + if stream.is_null() { + return Err("Failed to create stream"); + } + Ok(CompressionStream::new(unsafe { Box::from_raw(stream) })) + } + + #[inline] + pub fn decompress_stream(&self) -> Result<CompressionStream, &'static str> { + let codec = try!(self.codec()); + let stream = unsafe { squash_stream_new( + codec, SQUASH_STREAM_DECOMPRESS, ptr::null::<()>() + ) }; + if stream.is_null() { + return Err("Failed to create stream"); + } + Ok(CompressionStream::new(unsafe { Box::from_raw(stream) })) + } +} + + +pub struct CompressionStream { + stream: Box<SquashStream>, + buffer: [u8; 16*1024] +} + +impl CompressionStream { + #[inline] + fn new(stream: Box<SquashStream>) -> Self { + CompressionStream { + stream: stream, + buffer: [0; 16*1024] + } + } + + pub fn process<W: Write>(&mut self, input: &[u8], output: &mut W) -> Result<(), &'static str> { + let mut stream = &mut *self.stream; + stream.next_in = input.as_ptr(); + stream.avail_in = input.len(); + loop { + stream.next_out = self.buffer.as_mut_ptr(); + stream.avail_out = self.buffer.len(); + let res = unsafe { squash_stream_process(stream) }; + if res < 0 { + return
Err(Compression::error(res)) + } + let output_size = self.buffer.len() - stream.avail_out; + try!(output.write_all(&self.buffer[..output_size]).map_err(|_| "Failed to write to output")); + if res != SQUASH_PROCESSING { + break + } + } + Ok(()) + } + + pub fn finish<W: Write>(mut self, output: &mut W) -> Result<(), &'static str> { + let mut stream = &mut *self.stream; + loop { + stream.next_out = self.buffer.as_mut_ptr(); + stream.avail_out = self.buffer.len(); + let res = unsafe { squash_stream_finish(stream) }; + if res < 0 { + return Err(Compression::error(res)) + } + let output_size = self.buffer.len() - stream.avail_out; + try!(output.write_all(&self.buffer[..output_size]).map_err(|_| "Failed to write to output")); + if res != SQUASH_PROCESSING { + break + } + } + Ok(()) + } +} diff --git a/src/util/encryption.rs b/src/util/encryption.rs new file mode 100644 index 0000000..06a5084 --- /dev/null +++ b/src/util/encryption.rs @@ -0,0 +1,54 @@ +use std::collections::HashMap; + +#[derive(Clone)] +pub enum EncryptionMethod { + Dummy +} +serde_impl!(EncryptionMethod(u64) { + Dummy => 0 +}); + +pub type EncryptionKey = Vec<u8>; + +pub type EncryptionKeyId = u64; + +pub type Encryption = (EncryptionMethod, EncryptionKeyId); + +#[derive(Clone)] +pub struct Crypto { + keys: HashMap<EncryptionKeyId, EncryptionKey> +} + +impl Crypto { + #[inline] + pub fn new() -> Self { + Crypto { keys: Default::default() } + } + + #[inline] + pub fn register_key(&mut self, key: EncryptionKey, id: EncryptionKeyId) { + self.keys.insert(id, key); + } + + #[inline] + pub fn contains_key(&mut self, id: EncryptionKeyId) -> bool { + self.keys.contains_key(&id) + } + + #[inline] + pub fn encrypt(&self, _enc: Encryption, _data: &[u8]) -> Result<Vec<u8>, &'static str> { + unimplemented!() + } + + #[inline] + pub fn decrypt(&self, _enc: Encryption, _data: &[u8]) -> Result<Vec<u8>, &'static str> { + unimplemented!() + } +} + +impl Default for Crypto { + #[inline] + fn default() -> Self { + Crypto::new() + } +} diff --git a/src/util/hash.rs b/src/util/hash.rs new file mode 100644 index 0000000..64fb41e --- /dev/null +++ b/src/util/hash.rs @@ -0,0 +1,104 @@ +use serde::{self, Serialize, Deserialize}; +use serde::de::Error; +use serde::bytes::{ByteBuf, Bytes}; + +use murmurhash3::murmurhash3_x64_128; +use blake2::blake2b::blake2b; + +use std::mem; +use std::fmt; +use std::u64; + + + +#[repr(packed)] +#[derive(Clone, Copy, PartialEq, Debug, Hash, Eq)] +pub struct Hash { + pub high: u64, + pub low: u64 +} + +impl Hash { + #[inline] + pub fn hash(&self) -> u64 { + self.low + } + + #[inline] + pub fn empty() -> Self { + Hash{high: 0, low: 0} + } +} + +impl fmt::Display for Hash { + #[inline] + fn fmt(&self, fmt: &mut fmt::Formatter) -> Result<(), fmt::Error> { + write!(fmt, "{:016x}{:016x}", self.high, self.low) + } +} + +impl Serialize for Hash { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> where S: serde::Serializer { + let hash = Hash{high: u64::to_le(self.high), low: u64::to_le(self.low)}; + let dat: [u8; 16] = unsafe { mem::transmute(hash) }; + Bytes::from(&dat as &[u8]).serialize(serializer) + } +} + +impl Deserialize for Hash { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> where D: serde::Deserializer { + let dat: Vec<u8> = try!(ByteBuf::deserialize(deserializer)).into(); + if dat.len() != 16 { + return Err(D::Error::custom("Invalid key length")); + } + let hash = unsafe { &*(dat.as_ptr() as *const Hash) }; + Ok(Hash{high: u64::from_le(hash.high), low: u64::from_le(hash.low)}) + } +} + + +#[derive(Debug, Clone, Copy)] +pub enum HashMethod { + Blake2, + Murmur3 +} +serde_impl!(HashMethod(u64)
{ + Blake2 => 1, + Murmur3 => 2 +}); + + +impl HashMethod { + #[inline] + pub fn hash(&self, data: &[u8]) -> Hash { + match *self { + HashMethod::Blake2 => { + let hash = blake2b(16, &[], data); + let hash = unsafe { &*mem::transmute::<_, *mut (u64, u64)>(hash.as_bytes().as_ptr()) }; + Hash { high: u64::from_be(hash.0), low: u64::from_be(hash.1) } + }, + HashMethod::Murmur3 => { + let (a, b) = murmurhash3_x64_128(data, 0); + Hash { high: a, low: b } + } + } + } + + #[inline] + pub fn from(name: &str) -> Result<Self, &'static str> { + match name { + "blake2" => Ok(HashMethod::Blake2), + "murmur3" => Ok(HashMethod::Murmur3), + _ => Err("Unsupported hash method") + } + } + + #[inline] + pub fn name(&self) -> &'static str { + match *self { + HashMethod::Blake2 => "blake2", + HashMethod::Murmur3 => "murmur3" + } + } + +} diff --git a/src/util/lru_cache.rs b/src/util/lru_cache.rs new file mode 100644 index 0000000..cd95402 --- /dev/null +++ b/src/util/lru_cache.rs @@ -0,0 +1,53 @@ + +use std::hash::Hash; +use std::collections::HashMap; + +pub struct LruCache<K, V> { + items: HashMap<K, (V, u64)>, + min_size: usize, + max_size: usize, + next: u64 +} + + +impl<K: Eq + Hash, V> LruCache<K, V> { + #[inline] + pub fn new(min_size: usize, max_size: usize) -> Self { + LruCache { + items: HashMap::default(), + min_size: min_size, + max_size: max_size, + next: 0 + } + } + + #[inline] + pub fn put(&mut self, key: K, value: V) { + self.items.insert(key, (value, self.next)); + self.next += 1; + if self.items.len() > self.max_size { + self.shrink() + } + } + + #[inline] + pub fn get(&mut self, key: &K) -> Option<&V> { + if let Some(&mut (ref item, ref mut n)) = self.items.get_mut(key) { + *n = self.next; + self.next += 1; + Some(item) + } else { + None + } + } + + #[inline] + fn shrink(&mut self) { + let mut tags: Vec<u64> = self.items.values().map(|&(_, n)| n).collect(); + tags.sort(); + let bar = tags[tags.len()-self.min_size]; + let mut new = HashMap::with_capacity(self.min_size); + new.extend(self.items.drain().filter(|&(_,(_, n))| n>=bar)); + self.items = new; + } +} diff --git a/src/util/mod.rs b/src/util/mod.rs new file mode 100644 index 0000000..c40881e --- /dev/null +++ b/src/util/mod.rs @@ -0,0 +1,11 @@ +mod checksum; +mod compression; +mod encryption; +mod hash; +mod lru_cache; + +pub use self::checksum::*; +pub use self::compression::*; +pub use self::encryption::*; +pub use self::hash::*; +pub use self::lru_cache::*;
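Editorial note: a quick sanity check of the LruCache eviction semantics above (an illustrative sketch, not part of this commit; it assumes the LruCache from src/util/lru_cache.rs is in scope):

#[test]
fn lru_cache_evicts_down_to_min_size() {
    // keep at least 2 entries, start evicting above 4
    let mut cache = LruCache::new(2, 4);
    for i in 0..5u64 {
        cache.put(i, i * 10);
    }
    // the 5th put triggered shrink(): only the 2 most recently used survive
    assert!(cache.get(&0).is_none());
    assert_eq!(cache.get(&3), Some(&30));
    assert_eq!(cache.get(&4), Some(&40));
}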