35 #ifndef PARALLEL_TRONCONNEUSE_HPP
36 #define PARALLEL_TRONCONNEUSE_HPP
38 #include "../my_config.h"
48 #include <libthreadar/libthreadar.hpp>
// Status flags used between parallel_tronconneuse and its sub-threads.
// Each enumerator has an explicit value, from normal (0) up to exception_error (7).
64 enum class tronco_flags { normal = 0, stop = 1, eof = 2, die = 3, data_error = 4, exception_below = 5, exception_worker = 6, exception_error = 7 };
104 std::unique_ptr<crypto_module> & ptr);
// Three-state status of the sub-threads: running, suspended or dead.
// NOTE(review): presumably "suspended" means the threads are waiting on the barrier — confirm against the full header.
189 enum class thread_status { running, suspended, dead };
// Segments already read and their flags, stored in two parallel deques.
// NOTE(review): presumably one flag per segment, flags being tronco_flags values stored as signed int — confirm.
212 std::deque<std::unique_ptr<crypto_segment> > lus_data;
213 std::deque<signed int> lus_flags;
// Single segment owned by this object.
// NOTE(review): judging by the name, a temporary segment used on the write path — confirm.
219 std::unique_ptr<crypto_segment> tempo_write;
// Inter-thread structures held through shared_ptr; the constructors of read_below,
// write_below and crypto_worker accept shared_ptr of these same types.
224 std::shared_ptr<libthreadar::ratelier_scatter<crypto_segment> > scatter;
225 std::shared_ptr<libthreadar::ratelier_gather<crypto_segment> > gather;
226 std::shared_ptr<libthreadar::barrier> waiter;
227 std::shared_ptr<heap<crypto_segment> > tas;
// Sub-thread objects, each exclusively owned (unique_ptr): the workers, the reader and the writer.
231 std::deque<std::unique_ptr<crypto_worker> > travailleur;
232 std::unique_ptr<read_below> crypto_reader;
233 std::unique_ptr<write_below> crypto_writer;
// Returns the ratelier size for num_worker workers:
// num_worker plus half of num_worker (integer division), e.g. 4 -> 6 and 5 -> 7.
306 static U_I get_ratelier_size(U_I num_worker) {
return num_worker + num_worker/2; };
// Returns a heap size computed from the number of workers.
// Only declared here: the formula is not visible in this excerpt.
307 static U_I get_heap_size(U_I num_worker);
// Thread class deriving from libthreadar::thread.
// NOTE(review): judging by the names, it reads the encrypted layer and feeds the workers through a ratelier_scatter — confirm.
317 class read_below:
public libthreadar::thread
// Constructor.
//   to_workers       shared ratelier_scatter of crypto_segment
//   waiter           shared barrier
//   clear_block_size copied into clear_buf_size
//   encrypted_side   raw pointer copied into "encrypted"; no null check is visible in this constructor
//   xtas             shared heap of crypto_segment
//   init_shift       copied into initial_shift
// trailing_clear_data starts as nullptr and flag is set to tronco_flags::normal in the body.
// Part of the initializer list is not visible in this excerpt.
320 read_below(
const std::shared_ptr<libthreadar::ratelier_scatter<crypto_segment> > & to_workers,
321 const std::shared_ptr<libthreadar::barrier> & waiter,
323 U_I clear_block_size,
324 generic_file* encrypted_side,
// NOTE(review): xtas is passed by const value whereas the other shared_ptr parameters are
// const references; this costs an extra shared_ptr copy — consider a const reference.
325 const std::shared_ptr<heap<crypto_segment> > xtas,
326 infinint init_shift):
330 clear_buf_size(clear_block_size),
331 encrypted(encrypted_side),
333 initial_shift(init_shift),
335 trailing_clear_data(nullptr)
336 { flag = tronco_flags::normal; };
// Destructor: if a segment is still held in ptr it is given back to the heap (tas),
// then kill() and join() are called on the thread.
// NOTE(review): ptr is returned to the heap before kill()/join(); if inherited_run() can
// still access ptr at that time this is a race — confirm the thread is already stopped here.
// NOTE(review): confirm that join() cannot throw out of this destructor.
338 ~read_below() {
if(ptr) tas->put(move(ptr)); kill(); join(); };
// Replaces the stored initial_shift with x.
// NOTE(review): presumably the offset in the encrypted layer where encrypted data starts,
// and presumably only safe to call while the thread is not running — confirm.
350 void set_initial_shift(
const infinint & x) { initial_shift = x; };
// Records pos in the skip_to field (its declaration is not visible in this excerpt).
// NOTE(review): presumably the position the thread should skip to on its next run — confirm.
358 void set_pos(
const infinint & pos) { skip_to = pos; };
// Read-only accessor: returns a const reference to the clear_flow_start field.
378 const infinint & get_clear_flow_start()
const {
return clear_flow_start; };
// Read-only accessor: returns a const reference to the pos_in_flow field.
386 const infinint & get_pos_in_flow()
const {
return pos_in_flow; };
// Override of the base class virtual inherited_run(); only declared here, the body is defined elsewhere.
// NOTE(review): presumably the thread's entry point per libthreadar::thread — confirm.
390 virtual void inherited_run()
override;
// Shared inter-thread structures: the scatter ratelier and the barrier.
393 std::shared_ptr<libthreadar::ratelier_scatter <crypto_segment> > workers;
394 std::shared_ptr<libthreadar::barrier> waiting;
// Raw pointer to the encrypted layer, set from the constructor's encrypted_side.
// NOTE(review): presumably non-owning — confirm.
397 generic_file* encrypted;
398 archive_version version;
// Shared heap of crypto_segment; the destructor gives ptr back to it.
399 std::shared_ptr<heap<crypto_segment> > tas;
// Set by the constructor and by set_initial_shift().
400 infinint initial_shift;
// Segment currently held by this object, if any.
403 std::unique_ptr<crypto_segment> ptr;
409 infinint crypt_offset;
410 U_I encrypted_buf_size;
// Exposed read-only through get_clear_flow_start() and get_pos_in_flow().
416 infinint clear_flow_start;
417 infinint pos_in_flow;
// Helper returning an infinint; only declared here.
// NOTE(review): the name suggests it prepares the object for a new offset and returns a position — confirm against the definition.
420 infinint get_ready_for_new_offset();
// Helper only declared here. pos is read-only (const reference); the four other
// parameters are non-const references.
// NOTE(review): presumably pos is a clear-data position and the four references are
// outputs giving the matching location on the encrypted side — confirm.
424 void position_clear2crypt(
const infinint & pos,
425 infinint & file_buf_start,
426 infinint & clear_buf_start,
427 infinint & pos_in_buf,
428 infinint & block_num);
// Thread class deriving from libthreadar::thread.
// NOTE(review): judging by the names, it collects segments from the workers through a ratelier_gather and writes them to the encrypted layer — confirm.
439 class write_below:
public libthreadar::thread
// Constructor.
//   from_workers   shared ratelier_gather of crypto_segment, copied into "workers"
//   waiter         shared barrier
//   encrypted_side raw pointer copied into "encrypted"; must not be null
//   xtas           shared heap of crypto_segment (passed by const value)
// Throws SRC_BUG when encrypted is nullptr.
// Part of the initializer list is not visible in this excerpt.
442 write_below(
const std::shared_ptr<libthreadar::ratelier_gather<crypto_segment> > & from_workers,
443 const std::shared_ptr<libthreadar::barrier> & waiter,
445 generic_file* encrypted_side,
446 const std::shared_ptr<heap<crypto_segment> > xtas):
447 workers(from_workers),
451 encrypted(encrypted_side),
455 {
if(encrypted ==
nullptr)
throw SRC_BUG; };
// Destructor: calls kill() then join() on the thread.
// NOTE(review): confirm that join() cannot throw out of this destructor.
457 ~write_below() { kill(); join(); };
// Returns the value of the "error" field (its declaration is not visible in this excerpt).
// NOTE(review): presumably set by the thread when it hits an error — confirm, including how reads from another thread are synchronized.
459 bool exception_pending()
const {
return error; };
// Read-only accessor: returns a const reference to the error_block field.
// NOTE(review): presumably only meaningful when exception_pending() is true — confirm.
460 const infinint & get_error_block()
const {
return error_block; };
// Override of the base class virtual inherited_run(); only declared here, the body is defined elsewhere.
// NOTE(review): presumably the thread's entry point per libthreadar::thread — confirm.
463 virtual void inherited_run()
override;
// Shared inter-thread structures: the gather ratelier and the barrier.
466 std::shared_ptr<libthreadar::ratelier_gather<crypto_segment> > workers;
467 std::shared_ptr<libthreadar::barrier> waiting;
// Raw pointer to the encrypted layer; the constructor rejects nullptr.
// NOTE(review): presumably non-owning — confirm.
470 generic_file* encrypted;
// Shared heap of crypto_segment.
471 std::shared_ptr<heap<crypto_segment> > tas;
// Exposed read-only through get_error_block().
473 infinint error_block;
// Segments and flags stored in two parallel deques.
// NOTE(review): presumably one flag per segment, as fetched from the ratelier_gather — confirm.
474 std::deque<std::unique_ptr<crypto_segment> >ones;
475 std::deque<signed int> flags;
// Thread class deriving from libthreadar::thread; it owns a crypto_module.
// NOTE(review): presumably each worker takes segments from the scatter ratelier, ciphers or deciphers them and pushes them to the gather ratelier — confirm.
488 class crypto_worker:
public libthreadar::thread
// Constructor.
//   read_side  non-const reference to a shared ratelier_scatter
//   write_side non-const reference to a shared ratelier_gather
//   waiter     shared barrier, passed by value
//   ptr        crypto_module passed as an rvalue reference to a unique_ptr
// Throws SRC_BUG when any of reader, writer, waiting or crypto is null.
// The end of the parameter list and the initializer list are not visible in this excerpt.
491 crypto_worker(std::shared_ptr<libthreadar::ratelier_scatter <crypto_segment> > & read_side,
492 std::shared_ptr<libthreadar::ratelier_gather <crypto_segment> > & write_side,
493 std::shared_ptr<libthreadar::barrier> waiter,
494 std::unique_ptr<crypto_module> && ptr,
502 {
if(!reader || !writer || !waiting || !crypto)
throw SRC_BUG; };
// Virtual destructor: calls kill() then join() on the thread.
// NOTE(review): confirm that join() cannot throw out of this destructor.
504 virtual ~crypto_worker() { kill(); join(); };
// Override of the base class virtual inherited_run(); only declared here, the body is defined elsewhere.
// NOTE(review): presumably the thread's entry point per libthreadar::thread — confirm.
507 virtual void inherited_run()
override;
// Three-state enumeration local to crypto_worker: fine, inform, sent.
// NOTE(review): the meaning of each state is not visible in this excerpt — presumably it tracks error reporting by the worker; confirm.
510 enum class status { fine, inform, sent };
// NOTE(review): reader and writer are references to shared_ptr objects, not copies, so this
// worker does not share ownership through them. The referenced shared_ptr objects must outlive
// the worker — confirm this is guaranteed by the owner.
512 std::shared_ptr<libthreadar::ratelier_scatter <crypto_segment> > & reader;
513 std::shared_ptr<libthreadar::ratelier_gather <crypto_segment> > & writer;
// Shared barrier, held by copy of the shared_ptr.
514 std::shared_ptr<libthreadar::barrier> waiting;
// Crypto module exclusively owned by this worker.
515 std::unique_ptr<crypto_module> crypto;
// Segment currently held by this worker, if any.
517 std::unique_ptr<crypto_segment> ptr;
class archive_version that rules which archive format to follow
class archive_version manages the version of the archive format
this is the interface class from which all other data transfer classes inherit
bool is_terminated() const
void sync_write()
write any pending data
the arbitrary large positive integer class
this is a partial implementation of the generic_file interface to cypher/decypher data block by block...
bool send_read_order(tronco_flags order, const infinint &for_offset=0)
send an order to subthreads and gather acks from them
void go_read()
wake up threads in read mode when necessary
virtual void inherited_terminate() override
this protected inherited method is now private for inherited classes of tronconneuse
virtual void inherited_write(const char *a, U_I size) override
inherited from generic_file
virtual U_32 get_clear_block_size() const override
returns the block size given to constructor
void run_threads()
reset the interthread datastructure and launch the threads
virtual void inherited_sync_write() override
this protected inherited method is now private for inherited classes of tronconneuse
virtual infinint get_position() const override
inherited from generic_file
void join_threads()
wait for threads to finish and eventually rethrow their exceptions in current thread
virtual void write_end_of_file() override
in write_only mode indicate that end of file is reached
virtual bool skippable(skippability direction, const infinint &amount) override
inherited from generic_file
bool purge_unack_stop_order(const infinint &pos=0)
removing the ignore_stop_acks pending on the pipe
parallel_tronconneuse & operator=(const parallel_tronconneuse &ref)=delete
assignment operator
virtual void set_callback_trailing_clear_data(trailing_clear_data_callback call_back) override
parallel_tronconneuse(parallel_tronconneuse &&ref)=default
move constructor
std::unique_ptr< crypto_module > crypto
the crypto module used to cipher / uncipher blocks of data
tronco_flags purge_ratelier_from_next_order(infinint pos=0)
purge the ratelier from the next order which is provided as returned value
virtual bool skip_to_eof() override
inherited from generic_file
archive_version reading_ver
archive format we follow
parallel_tronconneuse(U_I workers, U_32 block_size, generic_file &encrypted_side, const archive_version &reading_ver, std::unique_ptr< crypto_module > &ptr)
This is the constructor.
virtual bool skip(const infinint &pos) override
inherited from generic_file
~parallel_tronconneuse() noexcept
destructor
virtual bool skip_relative(S_I x) override
inherited from generic_file
virtual void inherited_truncate(const infinint &pos) override
this protected inherited method is now private for inherited classes of tronconneuse
virtual U_I inherited_read(char *a, U_I size) override
this protected inherited method is now private for inherited classes of tronconneuse
bool check_bytes_to_skip
whether to check for bytes to skip
void send_write_order(tronco_flags order)
send order in write mode
parallel_tronconneuse(const parallel_tronconneuse &ref)=delete
copy constructor
U_I ignore_stop_acks
how many stop acks are still to be read (aborted stop order context)
infinint initial_shift
the offset in the "encrypted" below layer at which starts the encrypted data
U_32 clear_block_size
size of a clear block
virtual void inherited_flush_read() override
this protected inherited method is now private for inherited classes of tronconneuse
virtual bool truncatable(const infinint &pos) const override
inherited from generic_file
U_I num_workers
number of worker threads
bool find_offset_in_lus_data(const infinint &pos)
flush lus_data/lus_flags up to requested pos offset to be found or all data has been removed
void stop_threads()
end threads taking into account the fact they may be suspended on the barrier
thread_status t_status
whether child threads are waiting for us on the barrier
void read_refill()
fill lus_data/lus_flags from ratelier_gather if these are empty
virtual void set_initial_shift(const infinint &x) override
this method to modify the initial shift. This overrides the constructor "no_initial_shift" of the con...
virtual void inherited_read_ahead(const infinint &amount) override
this protected inherited method is now private for inherited classes of tronconneuse
infinint current_position
current position for the upper layer perspective (modified by skip*, inherited_read/write,...
void join_workers_only()
called by join_threads() below, just a code simplification around exception handling
per block cryptography implementation
defines a unit block of information ciphered at once
infinint(* trailing_clear_data_callback)(generic_file &below, const archive_version &reading_ver)
the trailing_clear_data_callback call back is a means by which the upper layer can tell when encrypted...
tronco_flags
status flags used between parallel_tronconneuse and its sub-threads
heap data structure (relying on FIFO)
switch module to limitint (32 or 64 bits integers) or infinint
libdar namespace encapsulate all libdar symbols
defines common interface for tronconneuse and parallel_tronconneuse