Disk ARchive  2.7.15
Full featured and portable backup and archiving tool
parallel_tronconneuse.hpp
Go to the documentation of this file.
1 /*********************************************************************/
2 // dar - disk archive - a backup/restoration program
3 // Copyright (C) 2002-2024 Denis Corbin
4 //
5 // This program is free software; you can redistribute it and/or
6 // modify it under the terms of the GNU General Public License
7 // as published by the Free Software Foundation; either version 2
8 // of the License, or (at your option) any later version.
9 //
10 // This program is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU General Public License
16 // along with this program; if not, write to the Free Software
17 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 //
19 // to contact the author, see the AUTHOR file
20 /*********************************************************************/
21 
25 
34 
35 #ifndef PARALLEL_TRONCONNEUSE_HPP
36 #define PARALLEL_TRONCONNEUSE_HPP
37 
38 #include "../my_config.h"
39 #include <string>
40 
41 #include "infinint.hpp"
42 #include "archive_version.hpp"
43 #include "crypto_segment.hpp"
44 #include "heap.hpp"
45 #include "crypto_module.hpp"
46 #include "proto_tronco.hpp"
47 
48 #include <libthreadar/libthreadar.hpp>
49 
50 namespace libdar
51 {
52 
55 
56  // those class are used by the parallel_tronconneuse class to wrap the different
57  // type of threads. They are defined just after the parallel_tronconneuse definition
58  class read_below;
59  class write_below;
60  class crypto_worker;
61 
63 
64  enum class tronco_flags { normal = 0, stop = 1, eof = 2, die = 3, data_error = 4, exception_below = 5, exception_worker = 6, exception_error = 7 };
65 
66 
68  //
69  // the parallel_tronconneuse class that orchestrate all that
70  //
71  //
72 
73 
75 
87 
88  class parallel_tronconneuse : public proto_tronco
89  {
90  public:
92 
101  U_32 block_size,
102  generic_file & encrypted_side,
104  std::unique_ptr<crypto_module> & ptr);
105 
108 
111 
114 
117 
120 
122  virtual bool skippable(skippability direction, const infinint & amount) override;
124  virtual bool skip(const infinint & pos) override;
126  virtual bool skip_to_eof() override;
128  virtual bool skip_relative(S_I x) override;
130  virtual bool truncatable(const infinint & pos) const override { return false; };
132  virtual infinint get_position() const override { if(is_terminated()) throw SRC_BUG; return current_position; };
133 
135 
140  virtual void write_end_of_file() override { if(is_terminated()) throw SRC_BUG; sync_write(); };
141 
142 
144 
145  virtual void set_initial_shift(const infinint & x) override;
146 
151 
153  virtual U_32 get_clear_block_size() const override { return clear_block_size; };
154 
155  private:
156 
157  // inherited from generic_file
158 
160  virtual void inherited_read_ahead(const infinint & amount) override;
161 
163  virtual U_I inherited_read(char *a, U_I size) override;
164 
166 
168  virtual void inherited_write(const char *a, U_I size) override;
169 
171 
174  virtual void inherited_truncate(const infinint & pos) override { throw SRC_BUG; };
175 
177  virtual void inherited_sync_write() override;
178 
179 
181  virtual void inherited_flush_read() override;
182 
184  virtual void inherited_terminate() override;
185 
186  const archive_version & get_reading_version() const { return reading_ver; };
187 
188  // internal data structure
189  enum class thread_status { running, suspended, dead };
190 
191  // the fields
192 
198  std::unique_ptr<crypto_module> crypto;
199  infinint (*mycallback)(generic_file & below, const archive_version & reading_ver);
200  generic_file* encrypted;
201 
202  // fields used to represent possible status of subthreads and communication channel (the pipe)
203 
205  thread_status t_status;
206 
207 
208  // the following stores data from the ratelier_gather to be provided for read() operation
209  // the lus_data/lus_flags is what is extracted from the ratelier_gather, both constitute
210  // the feedback channel from sub-threads to provide order acks and normal data
211 
212  std::deque<std::unique_ptr<crypto_segment> > lus_data;
213  std::deque<signed int> lus_flags;
214  bool lus_eof;
216 
217  // the following stores data going to ratelier_scatter for the write() operation
218 
219  std::unique_ptr<crypto_segment> tempo_write;
220  infinint block_num;
221 
222  // the datastructures shared among threads
223 
224  std::shared_ptr<libthreadar::ratelier_scatter<crypto_segment> > scatter;
225  std::shared_ptr<libthreadar::ratelier_gather<crypto_segment> > gather;
226  std::shared_ptr<libthreadar::barrier> waiter;
227  std::shared_ptr<heap<crypto_segment> > tas;
228 
229  // the child threads
230 
231  std::deque<std::unique_ptr<crypto_worker> > travailleur;
232  std::unique_ptr<read_below> crypto_reader;
233  std::unique_ptr<write_below> crypto_writer;
234 
235 
236 
238 
249  bool send_read_order(tronco_flags order, const infinint & for_offset = 0);
250 
253 
255  void go_read();
256 
258  void read_refill();
259 
261 
268 
270 
279  bool purge_unack_stop_order(const infinint & pos = 0);
280 
282 
292 
294  void run_threads();
295 
297  void stop_threads();
298 
301 
303  void join_threads();
304 
305 
306  static U_I get_ratelier_size(U_I num_worker) { return num_worker + num_worker/2; };
307  static U_I get_heap_size(U_I num_worker);
308  };
309 
310 
312  //
313  // read_below subthread used by parallel_tronconneuse
314  // to dispatch chunk of encrypted data to the workers
315  //
316 
317  class read_below: public libthreadar::thread
318  {
319  public:
320  read_below(const std::shared_ptr<libthreadar::ratelier_scatter<crypto_segment> > & to_workers,
321  const std::shared_ptr<libthreadar::barrier> & waiter,
322  U_I num_workers,
323  U_I clear_block_size,
324  generic_file* encrypted_side,
325  const std::shared_ptr<heap<crypto_segment> > xtas,
326  infinint init_shift):
327  workers(to_workers),
328  waiting(waiter),
329  num_w(num_workers),
330  clear_buf_size(clear_block_size),
331  encrypted(encrypted_side),
332  tas(xtas),
333  initial_shift(init_shift),
334  reof(false),
335  trailing_clear_data(nullptr)
336  { flag = tronco_flags::normal; };
337 
338  ~read_below() { if(ptr) tas->put(move(ptr)); kill(); join(); };
339 
343  void set_callback_trailing_clear_data(trailing_clear_data_callback call_back) { trailing_clear_data = call_back; };
344 
345  // *** //
346  // *** the method above should not be used anymore once the thread is running *** //
347  // *** //
348 
350  void set_initial_shift(const infinint & x) { initial_shift = x; };
351 
353 
358  void set_pos(const infinint & pos) { skip_to = pos; };
359 
361 
370  void set_flag(tronco_flags val) { flag = val; };
371 
373 
378  const infinint & get_clear_flow_start() const { return clear_flow_start; };
379 
381 
386  const infinint & get_pos_in_flow() const { return pos_in_flow; };
387 
388 
389  protected:
390  virtual void inherited_run() override;
391 
392  private:
393  std::shared_ptr<libthreadar::ratelier_scatter <crypto_segment> > workers;
394  std::shared_ptr<libthreadar::barrier> waiting;
395  U_I num_w;
396  U_I clear_buf_size;
397  generic_file* encrypted;
398  archive_version version;
399  std::shared_ptr<heap<crypto_segment> > tas;
400  infinint initial_shift;
401  bool reof;
402  trailing_clear_data_callback trailing_clear_data;
403  std::unique_ptr<crypto_segment> ptr;
404  infinint index_num;
405 
406 
407  // initialized by inherited_run() / get_ready_for_new_offset()
408 
409  infinint crypt_offset;
410  U_I encrypted_buf_size;
411 
412  // fields accessible by both the caller and the read_below thread
413 
414  infinint skip_to;
415  tronco_flags flag;
416  infinint clear_flow_start;
417  infinint pos_in_flow;
418 
419  void work();
420  infinint get_ready_for_new_offset();
421  void send_flag_to_workers(tronco_flags theflag);
422 
423  // same function as the tronconneuse::position_clear2crypt
424  void position_clear2crypt(const infinint & pos,
425  infinint & file_buf_start,
426  infinint & clear_buf_start,
427  infinint & pos_in_buf,
428  infinint & block_num);
429 
430  };
431 
432 
434  //
435  // write_below subthread used by parallel_tronconneuse
436  // to gather and write down encrypted data work from workers
437  //
438 
439  class write_below: public libthreadar::thread
440  {
441  public:
442  write_below(const std::shared_ptr<libthreadar::ratelier_gather<crypto_segment> > & from_workers,
443  const std::shared_ptr<libthreadar::barrier> & waiter,
444  U_I num_workers,
445  generic_file* encrypted_side,
446  const std::shared_ptr<heap<crypto_segment> > xtas):
447  workers(from_workers),
448  waiting(waiter),
449  num_w(num_workers),
450  cur_num_w(0),
451  encrypted(encrypted_side),
452  tas(xtas),
453  error(false),
454  error_block(0)
455  { if(encrypted == nullptr) throw SRC_BUG; };
456 
457  ~write_below() { kill(); join(); };
458 
459  bool exception_pending() const { return error; };
460  const infinint & get_error_block() const { return error_block; };
461 
462  protected:
463  virtual void inherited_run() override;
464 
465  private:
466  std::shared_ptr<libthreadar::ratelier_gather<crypto_segment> > workers;
467  std::shared_ptr<libthreadar::barrier> waiting;
468  U_I num_w;
469  U_I cur_num_w;
470  generic_file* encrypted;
471  std::shared_ptr<heap<crypto_segment> > tas;
472  bool error;
473  infinint error_block; // last crypto block before error
474  std::deque<std::unique_ptr<crypto_segment> >ones;
475  std::deque<signed int> flags;
476 
477  void work();
478  };
479 
480 
482  //
483  // the crypto_worker threads performing ciphering/deciphering
484  // of many data blocks in parallel
485  //
486 
487 
488  class crypto_worker: public libthreadar::thread
489  {
490  public:
491  crypto_worker(std::shared_ptr<libthreadar::ratelier_scatter <crypto_segment> > & read_side,
492  std::shared_ptr<libthreadar::ratelier_gather <crypto_segment> > & write_side,
493  std::shared_ptr<libthreadar::barrier> waiter,
494  std::unique_ptr<crypto_module> && ptr,
495  bool encrypt):
496  reader(read_side),
497  writer(write_side),
498  waiting(waiter),
499  crypto(move(ptr)),
500  do_encrypt(encrypt),
501  abort(status::fine)
502  { if(!reader || !writer || !waiting || !crypto) throw SRC_BUG; };
503 
504  virtual ~crypto_worker() { kill(); join(); };
505 
506  protected:
507  virtual void inherited_run() override;
508 
509  private:
510  enum class status { fine, inform, sent };
511 
512  std::shared_ptr<libthreadar::ratelier_scatter <crypto_segment> > & reader;
513  std::shared_ptr<libthreadar::ratelier_gather <crypto_segment> > & writer;
514  std::shared_ptr<libthreadar::barrier> waiting;
515  std::unique_ptr<crypto_module> crypto;
516  bool do_encrypt; // if false do decrypt
517  std::unique_ptr<crypto_segment> ptr;
518  unsigned int slot;
519  status abort;
520 
521  void work();
522  };
523 
524 
526 
527 } // end of namespace
528 
529 #endif
class archive_version that rules which archive format to follow
class archive_version manages the version of the archive format
this is the interface class from which all other data transfer classes inherit
bool is_terminated() const
void sync_write()
write any pending data
the arbitrary large positive integer class
this is a partial implementation of the generic_file interface to cypher/decypher data block by block...
bool send_read_order(tronco_flags order, const infinint &for_offset=0)
send and order to subthreads and gather acks from them
void go_read()
wake up threads in read mode when necessary
virtual void inherited_terminate() override
this protected inherited method is now private for inherited classes of tronconneuse
virtual void inherited_write(const char *a, U_I size) override
inherited from generic_file
virtual U_32 get_clear_block_size() const override
returns the block size given to constructor
void run_threads()
reset the interthread datastructure and launch the threads
virtual void inherited_sync_write() override
this protected inherited method is now private for inherited classes of tronconneuse
virtual infinint get_position() const override
inherited from generic_file
void join_threads()
wait for threads to finish and eventually rethrow their exceptions in current thread
virtual void write_end_of_file() override
in write_only mode indicate that end of file is reached
virtual bool skippable(skippability direction, const infinint &amount) override
inherited from generic_file
bool purge_unack_stop_order(const infinint &pos=0)
removing the ignore_stop_acks pending on the pipe
parallel_tronconneuse & operator=(const parallel_tronconneuse &ref)=delete
assignment operator
virtual void set_callback_trailing_clear_data(trailing_clear_data_callback call_back) override
parallel_tronconneuse(parallel_tronconneuse &&ref)=default
move constructor
std::unique_ptr< crypto_module > crypto
the crypto module use to cipher / uncipher block of data
tronco_flags purge_ratelier_from_next_order(infinint pos=0)
purge the ratelier from the next order which is provided as returned value
virtual bool skip_to_eof() override
inherited from generic_file
archive_version reading_ver
archive format we follow
parallel_tronconneuse(U_I workers, U_32 block_size, generic_file &encrypted_side, const archive_version &reading_ver, std::unique_ptr< crypto_module > &ptr)
This is the constructor.
virtual bool skip(const infinint &pos) override
inherited from generic_file
~parallel_tronconneuse() noexcept
destructor
virtual bool skip_relative(S_I x) override
inherited from generic_file
virtual void inherited_truncate(const infinint &pos) override
this prorected inherited method is now private for inherited classed of tronconneuse
virtual U_I inherited_read(char *a, U_I size) override
this protected inherited method is now private for inherited classes of tronconneuse
bool check_bytes_to_skip
whether to check for bytes to skip
void send_write_order(tronco_flags order)
send order in write mode
parallel_tronconneuse(const parallel_tronconneuse &ref)=delete
copy constructor
U_I ignore_stop_acks
how much stop ack still to be read (aborted stop order context)
infinint initial_shift
the offset in the "encrypted" below layer at which starts the encrypted data
U_32 clear_block_size
size of a clear block
virtual void inherited_flush_read() override
this protected inherited method is now private for inherited classes of tronconneuse
virtual bool truncatable(const infinint &pos) const override
inherited from generic_file
U_I num_workers
number of worker threads
bool find_offset_in_lus_data(const infinint &pos)
flush lus_data/lus_flags up to requested pos offset to be found or all data has been removed
void stop_threads()
end threads taking into account the fact they may be suspended on the barrier
thread_status t_status
wehther child thread are waiting us on the barrier
void read_refill()
fill lus_data/lus_flags from ratelier_gather if these are empty
virtual void set_initial_shift(const infinint &x) override
this method to modify the initial shift. This overrides the constructor "no_initial_shift" of the con...
virtual void inherited_read_ahead(const infinint &amount) override
this protected inherited method is now private for inherited classes of tronconneuse
infinint current_position
current position for the upper layer perspective (modified by skip*, inherited_read/write,...
void join_workers_only()
call by join_threads() below just code simplification around exception handling
per block cryptography implementation
defines unit block of information ciphered as once
infinint(* trailing_clear_data_callback)(generic_file &below, const archive_version &reading_ver)
the trailing_clear_data_callback call back is a mean by which the upper layer cat tell when encrypted...
tronco_flags
status flags used between parallel_tronconneuse and its sub-threads
heap data structure (relying on FIFO)
switch module to limitint (32 ou 64 bits integers) or infinint
libdar namespace encapsulate all libdar symbols
Definition: archive.hpp:47
defines common interface for tronconneuse and parallel_tronconneuse