99#ifndef PARALLEL_BLOCK_COMPRESSOR_HPP
100#define PARALLEL_BLOCK_COMPRESSOR_HPP
102#include "../my_config.h"
110#include <libthreadar/libthreadar.hpp>
// Forward declarations of the two helper classes that the `reader` and
// `writer` members of parallel_block_compressor point to.
126 class zip_below_read;
127 class zip_below_write;
// Subclass of proto_compressor that holds a compress_module, a pointer to a
// generic_file, a scatter/gather pair of libthreadar rateliers carrying
// crypto_segment objects, a heap of crypto_segment, one zip_below_read, one
// zip_below_write and a deque of zip_worker objects.
// NOTE(review): this listing shows neither braces nor access specifiers for
// the class body -- check them against the original header.
137 class parallel_block_compressor:
public proto_compressor
// Constructor.
//   num_workers      count of workers (presumably the number of zip_worker
//                    objects created -- TODO confirm in the implementation)
//   block_zipper     compress_module received as std::unique_ptr by value,
//                    so the caller gives up ownership
//   compressed_side  generic_file received by non-const reference
//   uncompressed_bs  block size; defaults to default_uncompressed_block_size
146 parallel_block_compressor(U_I num_workers,
147 std::unique_ptr<compress_module> block_zipper,
148 generic_file & compressed_side,
149 U_I uncompressed_bs = default_uncompressed_block_size);
// Copy construction, move construction, copy assignment and move assignment
// are all explicitly deleted: objects of this class can be neither copied
// nor moved.
153 parallel_block_compressor(
const parallel_block_compressor & ref) =
delete;
154 parallel_block_compressor(parallel_block_compressor && ref)
noexcept =
delete;
155 parallel_block_compressor & operator = (
const parallel_block_compressor & ref) =
delete;
156 parallel_block_compressor & operator = (parallel_block_compressor && ref)
noexcept =
delete;
// Destructor; only declared here.
157 ~parallel_block_compressor();
// Overrides dealing with suspension of compression. suspend/resume are only
// declared here.
162 virtual void suspend_compression()
override;
163 virtual void resume_compression()
override;
// Defined inline: returns the `suspended` flag (its declaration is not part
// of this listing).
164 virtual bool is_compression_suspended()
const override {
return suspended; };
// Overrides related to positioning (skip*, truncatable, get_position); all
// are only declared here.
168 virtual bool skippable(skippability direction,
const infinint & amount)
override;
169 virtual bool skip(
const infinint & pos)
override;
170 virtual bool skip_to_eof()
override;
171 virtual bool skip_relative(S_I x)
override;
172 virtual bool truncatable(
const infinint & pos)
const override;
173 virtual infinint get_position()
const override;
// Overrides of the inherited_* hooks; all but inherited_flush_read() are only
// declared here.
176 virtual void inherited_read_ahead(
const infinint & amount)
override;
177 virtual U_I inherited_read(
char *a, U_I size)
override;
178 virtual void inherited_write(
const char *a, U_I size)
override;
179 virtual void inherited_truncate(
const infinint & pos)
override;
180 virtual void inherited_sync_write()
override;
// Defined inline: calls stop_read_threads() and then sets `reof` to false
// (the declaration of `reof` is not part of this listing).
181 virtual void inherited_flush_read()
override { stop_read_threads(); reof =
false; };
182 virtual void inherited_terminate()
override;
// Owned compress_module.
189 std::unique_ptr<compress_module> zipper;
// Raw pointer to a generic_file (presumably the constructor's compressed_side,
// not owned -- TODO confirm in the implementation).
190 generic_file *compressed;
191 U_I uncompressed_block_size;
193 bool running_threads;
// Single owned crypto_segment (the name suggests the segment currently being
// filled for writing -- TODO confirm).
194 std::unique_ptr<crypto_segment> curwrite;
// Two deques: owned crypto_segment objects and signed int flags ("lus" is
// French for "read"; presumably segments already read together with their
// flags -- TODO confirm).
195 std::deque<std::unique_ptr<crypto_segment> > lus_data;
196 std::deque<signed int> lus_flags;
// Shared libthreadar structures carrying crypto_segment objects: a
// ratelier_scatter ("disperse"), a ratelier_gather ("rassemble") and a heap
// ("tas" is French for "heap").
202 std::shared_ptr<libthreadar::ratelier_scatter<crypto_segment> > disperse;
203 std::shared_ptr<libthreadar::ratelier_gather<crypto_segment> > rassemble;
204 std::shared_ptr<heap<crypto_segment> > tas;
// Owned helper objects: one zip_below_read, one zip_below_write and a deque
// of zip_worker ("travailleurs" is French for "workers").
209 std::unique_ptr<zip_below_read> reader;
210 std::unique_ptr<zip_below_write> writer;
211 std::deque<std::unique_ptr<zip_worker> > travailleurs;
// Start/stop helpers for the read side and the write side; only declared
// here.
218 void stop_read_threads();
219 void stop_write_threads();
221 void run_read_threads();
222 void run_write_threads();
// Returns num_workers + num_workers/2, i.e. about one and a half times the
// worker count (the division truncates if U_I is an integer type, as its name
// suggests).
228 static U_I get_ratelier_size(U_I num_workers) {
return num_workers + num_workers/2; };
// Only declared here.
229 static U_I get_heap_size(U_I num_workers);
// Class deriving from libthreadar::thread that keeps a shared
// ratelier_gather of crypto_segment (`src`), a shared heap of crypto_segment
// (`tas`), two deques (`data`, `flags`), a mutex and an infinint position.
// NOTE(review): this listing shows neither braces nor access specifiers for
// the class body -- check them against the original header.
240 class zip_below_write:
public libthreadar::thread
// Constructor taking the gather ratelier and the heap by const reference to
// shared_ptr.
// NOTE(review): the parameter list is cut short in this listing (it ends on a
// comma); the remaining parameters are not visible.
243 zip_below_write(
const std::shared_ptr<libthreadar::ratelier_gather<crypto_segment> > & source,
245 const std::shared_ptr<heap<crypto_segment> > & xtas,
// Destructor: calls cancel() and then join().
248 ~zip_below_write() { cancel(); join(); };
// Returns the `error` flag (its declaration is not part of this listing).
254 bool exception_pending()
const {
return error; };
// Override of the thread body; only declared here.
260 virtual void inherited_run()
override;
263 std::shared_ptr<libthreadar::ratelier_gather<crypto_segment> > src;
265 std::shared_ptr<heap<crypto_segment> > tas;
// Two deques that pop_front() below always shortens together.
269 std::deque<std::unique_ptr<crypto_segment> > data;
270 std::deque<signed int> flags;
// Mutex and position value (presumably get_pos guards current_position --
// TODO confirm in the implementation).
271 libthreadar::mutex get_pos;
272 infinint current_position;
// Moves the front element of `data` into tas->put(), then removes the front
// entry of both `data` and `flags`. No emptiness check is made here.
275 void pop_front() { tas->put(std::move(data.front())); data.pop_front(); flags.pop_front(); };
// Class deriving from libthreadar::thread whose constructor receives a
// generic_file pointer, a ratelier_scatter of crypto_segment and a heap of
// crypto_segment.
// NOTE(review): this listing shows neither braces nor access specifiers for
// the class body -- check them against the original header.
289 class zip_below_read:
public libthreadar::thread
// Constructor.
// NOTE(review): the parameter list is cut short in this listing (it ends on a
// comma); the remaining parameters are not visible.
292 zip_below_read(generic_file *source,
293 const std::shared_ptr<libthreadar::ratelier_scatter<crypto_segment> > & dest,
294 const std::shared_ptr<heap<crypto_segment> > & xtas,
// Destructor: calls cancel() and then join().
297 ~zip_below_read() { cancel(); join(); };
// Sets `should_i_stop` to true (its declaration is not part of this listing).
300 void do_stop() { should_i_stop =
true; };
// Override of the thread body; only declared here.
306 virtual void inherited_run()
override;
// NOTE(review): both members are references to shared_ptr, not shared_ptr
// copies, so they take no share in ownership; the shared_ptr objects they are
// bound to (presumably the constructor's dest/xtas arguments -- confirm) must
// outlive this object.
310 const std::shared_ptr<libthreadar::ratelier_scatter<crypto_segment> > & dst;
311 const std::shared_ptr<heap<crypto_segment> > & tas;
// Single owned crypto_segment.
313 std::unique_ptr<crypto_segment> ptr;
// Class deriving from libthreadar::thread that refers to a ratelier_scatter
// and a ratelier_gather of crypto_segment and owns a compress_module.
// NOTE(review): this listing shows neither braces nor access specifiers for
// the class body -- check them against the original header.
332 class zip_worker:
public libthreadar::thread
// Constructor: takes both rateliers by non-const reference to shared_ptr and
// the compress_module as an rvalue reference to unique_ptr.
// NOTE(review): `write_size` looks like a typo for `write_side` (compare
// `read_side`); the parameter list is also cut short in this listing.
335 zip_worker(std::shared_ptr<libthreadar::ratelier_scatter <crypto_segment> > & read_side,
336 std::shared_ptr<libthreadar::ratelier_gather <crypto_segment> > & write_size,
337 std::unique_ptr<compress_module> && ptr,
// Destructor: calls cancel() and then join().
340 ~zip_worker() { cancel(); join(); };
// Override of the thread body; only declared here.
343 virtual void inherited_run()
override;
// NOTE(review): both members are references to shared_ptr, not shared_ptr
// copies, so they take no share in ownership; the shared_ptr objects they are
// bound to must outlive this object.
346 std::shared_ptr<libthreadar::ratelier_scatter <crypto_segment> > & reader;
347 std::shared_ptr<libthreadar::ratelier_gather <crypto_segment> > & writer;
// Owned compress_module.
348 std::unique_ptr<compress_module> compr;
// A single owned crypto_segment and an unsigned slot number (presumably the
// segment in transit between the two rateliers and its slot -- TODO confirm).
351 std::unique_ptr<crypto_segment> transit;
352 unsigned int transit_slot;
provides an abstracted interface for per-block compression/decompression
defines the unit block of information ciphered at once
compression
the different compression algorithms available
compressor_block_flags
the different flags used to communicate between the threads held by the parallel_block_compressor class
heap data structure (relying on FIFO)
switch module to limitint (32 or 64 bit integers) or infinint
@ error
neither big nor little endian! (libdar cannot run on such system)
the libdar namespace encapsulates all libdar symbols
abstracted ancestor class for compressor and parallel_compressor classes