99 #ifndef PARALLEL_BLOCK_COMPRESSOR_HPP
100 #define PARALLEL_BLOCK_COMPRESSOR_HPP
102 #include "../my_config.h"
110 #include <libthreadar/libthreadar.hpp>
// Forward declarations: both classes are defined further down in this file.
// parallel_block_compressor only refers to them through std::unique_ptr
// members, so the names must be known before that class is declared.
126 class zip_below_read;
127 class zip_below_write;
/// Block compressor that publicly derives from proto_compressor and overrides
/// its compression-control, skip/position and inherited_* read/write hooks.
///
/// The object holds a compress_module, a scatter ratelier, a gather ratelier,
/// a heap of crypto_segment, one zip_below_read, one zip_below_write and a
/// deque of zip_worker objects (see the members below).
/// \note this view of the class lacks its braces and access specifiers, and
/// at least one member used below (suspended) is declared outside of it.
137 class parallel_block_compressor:
public proto_compressor
/// \param[in] num_workers number of workers; also the argument expected by
/// get_ratelier_size() and get_heap_size() below
/// \param[in] block_zipper compression module, ownership is handed over to
/// the constructed object (passed as std::unique_ptr by value)
/// \param[in] compressed_side generic_file passed by reference
/// (presumably the layer holding the compressed data, going by its name -- confirm)
/// \param[in] uncompressed_bs block size, default_uncompressed_block_size
/// when not specified
146 parallel_block_compressor(U_I num_workers,
147 std::unique_ptr<compress_module> block_zipper,
148 generic_file & compressed_side,
149 U_I uncompressed_bs = default_uncompressed_block_size);
// objects of this class can neither be copied nor moved: the four
// copy/move operations below are all explicitly deleted
153 parallel_block_compressor(
const parallel_block_compressor & ref) =
delete;
154 parallel_block_compressor(parallel_block_compressor && ref) noexcept =
delete;
155 parallel_block_compressor & operator = (
const parallel_block_compressor & ref) =
delete;
156 parallel_block_compressor & operator = (parallel_block_compressor && ref) noexcept =
delete;
/// destructor, defined outside this header section
157 ~parallel_block_compressor();
// compression control overrides; suspend/resume are only declared here
162 virtual void suspend_compression()
override;
163 virtual void resume_compression()
override;
/// \return the current value of the "suspended" field
/// (its declaration is not visible in this section)
164 virtual bool is_compression_suspended()
const override {
return suspended; };
// positioning overrides, only declared here (bodies are elsewhere)
168 virtual bool skippable(skippability direction,
const infinint & amount)
override;
169 virtual bool skip(
const infinint & pos)
override;
170 virtual bool skip_to_eof()
override;
171 virtual bool skip_relative(S_I x)
override;
172 virtual bool truncatable(
const infinint & pos)
const override;
173 virtual infinint get_position()
const override;
/// read-ahead hook: calls run_read_threads() unless compression is
/// suspended; the "amount" argument is not used by this inline body
176 virtual void inherited_read_ahead(
const infinint & amount)
override {
if(!suspended) run_read_threads(); };
// data path overrides, only declared here (bodies are elsewhere)
177 virtual U_I inherited_read(
char *a, U_I size)
override;
178 virtual void inherited_write(
const char *a, U_I size)
override;
179 virtual void inherited_truncate(
const infinint & pos)
override;
180 virtual void inherited_sync_write()
override;
/// flush-read hook: simply calls stop_read_threads()
181 virtual void inherited_flush_read()
override { stop_read_threads(); };
182 virtual void inherited_terminate()
override;
// owned compression module
189 std::unique_ptr<compress_module> zipper;
// presumably the value received as "uncompressed_bs" by the constructor -- confirm
191 U_I uncompressed_block_size;
// presumably tells whether the helper threads have been started -- confirm
193 bool running_threads;
// owned segment; the name suggests the one currently being written to -- confirm
194 std::unique_ptr<crypto_segment> curwrite;
// two parallel queues: owned segments and signed-int flags
// ("lus" is French for "read"; presumably entries already fetched -- confirm)
195 std::deque<std::unique_ptr<crypto_segment> > lus_data;
196 std::deque<signed int> lus_flags;
// shared structures, named in French after their types:
// disperse = scatter ratelier, rassemble = gather ratelier, tas = heap
202 std::shared_ptr<libthreadar::ratelier_scatter<crypto_segment> > disperse;
203 std::shared_ptr<libthreadar::ratelier_gather<crypto_segment> > rassemble;
204 std::shared_ptr<heap<crypto_segment> > tas;
// owned thread objects (the three classes derive from libthreadar::thread,
// see their definitions below); "travailleurs" is French for "workers"
209 std::unique_ptr<zip_below_read> reader;
210 std::unique_ptr<zip_below_write> writer;
211 std::deque<std::unique_ptr<zip_worker> > travailleurs;
// start/stop helpers for the read side and the write side, only declared
// here; the read pair is called from inherited_read_ahead() and
// inherited_flush_read() above
218 void stop_read_threads();
219 void stop_write_threads();
221 void run_read_threads();
222 void run_write_threads();
/// \return num_workers plus half of num_workers; the half is an integer
/// division, so it is rounded down (3 workers give 4)
228 static U_I get_ratelier_size(U_I num_workers) {
return num_workers + num_workers/2; };
/// heap sizing for a given number of workers, defined elsewhere
229 static U_I get_heap_size(U_I num_workers);
/// Thread class (derives from libthreadar::thread) fed from a gather ratelier
/// of crypto_segment and giving processed segments back to a shared heap.
/// \note this view of the class lacks its braces and access specifiers, the
/// end of the constructor declaration and the declaration of "error".
240 class zip_below_write:
public libthreadar::thread
/// \param[in] source gather ratelier, stored by copy in "src"
/// (presumably -- the constructor body is not visible here)
/// \param[in] xtas heap of segments, stored by copy in "tas" (same caveat)
/// \note the remaining parameters of this constructor are not visible here
243 zip_below_write(
const std::shared_ptr<libthreadar::ratelier_gather<crypto_segment> > & source,
245 const std::shared_ptr<heap<crypto_segment> > & xtas,
/// the destructor calls kill() then join() before the members are destroyed
248 ~zip_below_write() { kill(); join(); };
/// \return the current value of the "error" field
/// (its declaration is not visible in this section)
254 bool exception_pending()
const {
return error; };
/// thread body, defined elsewhere
260 virtual void inherited_run()
override;
// both shared pointers are held by value: this object shares ownership
// of the ratelier and of the heap
263 std::shared_ptr<libthreadar::ratelier_gather<crypto_segment> > src;
265 std::shared_ptr<heap<crypto_segment> > tas;
// two parallel queues (segments and their flags); pop_front() below always
// removes one entry from each
269 std::deque<std::unique_ptr<crypto_segment> > data;
270 std::deque<signed int> flags;
// presumably "get_pos" protects "current_position" -- confirm in the implementation
271 libthreadar::mutex get_pos;
272 infinint current_position;
/// gives the first segment of "data" back to the heap, then drops the first
/// entry of both "data" and "flags"; there is no emptiness check here, so
/// both queues must be non-empty when this is called
275 void pop_front() { tas->put(std::move(data.front())); data.pop_front(); flags.pop_front(); };
/// Thread class (derives from libthreadar::thread) built from a generic_file
/// pointer, a scatter ratelier of crypto_segment and a heap of crypto_segment.
/// \note this view of the class lacks its braces and access specifiers, the
/// end of the constructor declaration and the declaration of "should_i_stop".
289 class zip_below_read:
public libthreadar::thread
/// \param[in] source generic_file given as a raw pointer
/// \param[in] dest scatter ratelier, received by const reference
/// \param[in] xtas heap of segments, received by const reference
/// \note the remaining parameters of this constructor are not visible here
292 zip_below_read(generic_file *source,
293 const std::shared_ptr<libthreadar::ratelier_scatter<crypto_segment> > & dest,
294 const std::shared_ptr<heap<crypto_segment> > & xtas,
/// the destructor calls kill() then join() before the members are destroyed
297 ~zip_below_read() { kill(); join(); };
/// sets the "should_i_stop" field to true
/// NOTE(review): presumably read by the thread running inherited_run(); the
/// type of that field is not visible here -- confirm the cross-thread access is safe
300 void do_stop() { should_i_stop =
true; };
/// thread body, defined elsewhere
306 virtual void inherited_run()
override;
// NOTE(review): these two members are *references* to shared_ptr objects,
// not copies (zip_below_write stores copies for its own src/tas): no
// ownership is taken here, so the referenced shared_ptr objects must
// outlive this object -- confirm against the constructor's callers
310 const std::shared_ptr<libthreadar::ratelier_scatter<crypto_segment> > & dst;
311 const std::shared_ptr<heap<crypto_segment> > & tas;
// owned segment (its role is defined by the implementation, not visible here)
313 std::unique_ptr<crypto_segment> ptr;
/// Thread class (derives from libthreadar::thread) bound to a scatter
/// ratelier, a gather ratelier and owning its own compress_module.
/// \note this view of the class lacks its braces and access specifiers and
/// the end of the constructor declaration.
332 class zip_worker:
public libthreadar::thread
/// \param[in] read_side scatter ratelier, received by non-const reference
/// \param[in] write_size gather ratelier, received by non-const reference
/// (the name looks like a typo for "write_side"; kept unchanged here)
/// \param[in] ptr compression module received as an rvalue reference
/// (presumably moved into "compr" -- the constructor body is not visible here)
/// \note the remaining parameters of this constructor are not visible here
335 zip_worker(std::shared_ptr<libthreadar::ratelier_scatter <crypto_segment> > & read_side,
336 std::shared_ptr<libthreadar::ratelier_gather <crypto_segment> > & write_size,
337 std::unique_ptr<compress_module> && ptr,
/// the destructor calls kill() then join() before the members are destroyed
340 ~zip_worker() { kill(); join(); };
/// thread body, defined elsewhere
343 virtual void inherited_run()
override;
// NOTE(review): both members are *references* to shared_ptr objects, not
// copies: no ownership is taken, so the referenced shared_ptr objects must
// outlive this worker -- confirm against the constructor's callers
346 std::shared_ptr<libthreadar::ratelier_scatter <crypto_segment> > & reader;
347 std::shared_ptr<libthreadar::ratelier_gather <crypto_segment> > & writer;
// compression module owned by this worker
348 std::unique_ptr<compress_module> compr;
// owned segment plus an unsigned slot number; going by the names, the
// segment in transit between the two rateliers and its slot -- confirm
351 std::unique_ptr<crypto_segment> transit;
352 unsigned int transit_slot;
generic_file(gf_mode m)
main constructor
provides abstracted interface of per-block compression/decompression
defines the unit block of information ciphered at once
compression
the different compression algorithms available
compressor_block_flags
the different flags used to communicate between the threads held by the parallel_block_compressor class
heap data structure (relying on FIFO)
switch module to limitint (32 or 64 bits integers) or infinint
libdar namespace encapsulates all libdar symbols
abstracted ancestor class for compressor and parallel_compressor classes