Disk ARchive 2.7.16
Full featured and portable backup and archiving tool
parallel_tronconneuse.hpp
Go to the documentation of this file.
1/*********************************************************************/
2// dar - disk archive - a backup/restoration program
3// Copyright (C) 2002-2024 Denis Corbin
4//
5// This program is free software; you can redistribute it and/or
6// modify it under the terms of the GNU General Public License
7// as published by the Free Software Foundation; either version 2
8// of the License, or (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU General Public License
16// along with this program; if not, write to the Free Software
17// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18//
19// to contact the author, see the AUTHOR file
20/*********************************************************************/
21
25
34
35#ifndef PARALLEL_TRONCONNEUSE_HPP
36#define PARALLEL_TRONCONNEUSE_HPP
37
38#include "../my_config.h"
39#include <string>
40
41#include "infinint.hpp"
42#include "archive_version.hpp"
43#include "crypto_segment.hpp"
44#include "heap.hpp"
45#include "crypto_module.hpp"
46#include "proto_tronco.hpp"
47
48#include <libthreadar/libthreadar.hpp>
49
50namespace libdar
51{
52
55
56 // these classes are used by the parallel_tronconneuse class to wrap the different
57 // types of threads. They are defined just after the parallel_tronconneuse definition
58 class read_below;
59 class write_below;
60 class crypto_worker;
61
63
/// status flags used between parallel_tronconneuse and its sub-threads

/// The numeric value of each enumerator is spelled out explicitly and must stay
/// stable: flags travel next to the data segments in containers of plain
/// `signed int` (see parallel_tronconneuse::lus_flags and write_below::flags),
/// so presumably they are converted to/from int on each side of the pipe.
/// NOTE(review): the per-value meaning below is inferred from the names only,
/// confirm against the implementation file.
enum class tronco_flags
{
    normal = 0,           ///< no order attached (presumably a plain data block)
    stop = 1,             ///< stop order
    eof = 2,              ///< end of file
    die = 3,              ///< die order
    data_error = 4,       ///< error related to the data
    exception_below = 5,  ///< presumably an exception met in a *_below thread
    exception_worker = 6, ///< presumably an exception met in a crypto_worker thread
    exception_error = 7   ///< presumably an error met while handling an exception
};
65
66
68 //
69 // the parallel_tronconneuse class that orchestrate all that
70 //
71 //
72
73
75
87
// parallel_tronconneuse: implementation of the proto_tronco interface that owns a set
// of sub-threads (crypto_worker, read_below, write_below) and the inter-thread
// structures they share (scatter, gather, waiter, tas).
//
// NOTE(review): every line of this listing is prefixed with the line number of the
// original header, and a line made of a number alone is a line that was blank once the
// documentation comments were stripped. Several declarations are not present in this
// listing at all (the first line of the constructor and one of its parameters, the
// copy/move members, the destructor, some fields and some private helpers); they are
// flagged below where the gap is visible. Do not treat this listing as the complete class.
88 class parallel_tronconneuse : public proto_tronco
89 {
90 public:
92
    // constructor (tail of the declaration only).
    // NOTE(review): the opening line of the declaration and the parameter that sits
    // between encrypted_side and ptr are missing from this listing.
    // block_size is presumably what get_clear_block_size() reports -- TODO confirm.
101 U_32 block_size,
102 generic_file & encrypted_side,
104 std::unique_ptr<crypto_module> & ptr);
105
    // NOTE(review): original lines 106-120 are where the copy/move members and the
    // destructor are presumably declared; none of them is visible here.
108
111
114
117
120
    // skip/position interface, inherited from generic_file
122 virtual bool skippable(skippability direction, const infinint & amount) override;
124 virtual bool skip(const infinint & pos) override;
126 virtual bool skip_to_eof() override;
128 virtual bool skip_relative(S_I x) override;
    // truncation is not supported: always answers false (see also inherited_truncate())
130 virtual bool truncatable(const infinint & pos) const override { return false; };
    // returns current_position; throws SRC_BUG if the object has been terminated
132 virtual infinint get_position() const override { if(is_terminated()) throw SRC_BUG; return current_position; };
133
135
    // signals the end of the written data: throws SRC_BUG if the object has been
    // terminated, else delegates to sync_write()
140 virtual void write_end_of_file() override { if(is_terminated()) throw SRC_BUG; sync_write(); };
141
142
144
    // modifies the initial shift after construction
145 virtual void set_initial_shift(const infinint & x) override;
146
151
    // returns the clear_block_size field
153 virtual U_32 get_clear_block_size() const override { return clear_block_size; };
154
155 private:
156
157 // inherited from generic_file
158
160 virtual void inherited_read_ahead(const infinint & amount) override;
161
163 virtual U_I inherited_read(char *a, U_I size) override;
164
166
168 virtual void inherited_write(const char *a, U_I size) override;
169
171
    // must never be reached (truncatable() always answers false): throws SRC_BUG
174 virtual void inherited_truncate(const infinint & pos) override { throw SRC_BUG; };
175
177 virtual void inherited_sync_write() override;
178
179
181 virtual void inherited_flush_read() override;
182
184 virtual void inherited_terminate() override;
185
    // returns the reading_ver field
186 const archive_version & get_reading_version() const { return reading_ver; };
187
188 // internal data structure
    // state of the sub-threads as tracked by this object (type of t_status)
189 enum class thread_status { running, suspended, dead };
190
191 // the fields
192
    // NOTE(review): original lines 193-197 are not visible here; fields used by the
    // inline methods of this class (current_position, clear_block_size, reading_ver)
    // are presumably declared there.
198 std::unique_ptr<crypto_module> crypto;
    // callback receiving the layer below and the archive version, returning an infinint
    // (presumably the one installed through set_callback_trailing_clear_data() -- TODO confirm)
199 infinint (*mycallback)(generic_file & below, const archive_version & reading_ver);
    // raw pointer to the layer holding the encrypted data
    // (presumably the address of the constructor's encrypted_side -- TODO confirm)
200 generic_file* encrypted;
201
202 // fields used to represent possible status of subthreads and communication channel (the pipe)
203
205 thread_status t_status;
206
207
208 // the following stores data from the ratelier_gather to be provided for read() operation
209 // the lus_data/lus_flags is what is extracted from the ratelier_gather, both constitute
210 // the feedback channel from sub-threads to provide order acks and normal data
211
212 std::deque<std::unique_ptr<crypto_segment> > lus_data;
    // one flag per entry of lus_data, stored as plain int (presumably tronco_flags values)
213 std::deque<signed int> lus_flags;
214 bool lus_eof;
216
217 // the following stores data going to ratelier_scatter for the write() operation
218
219 std::unique_ptr<crypto_segment> tempo_write;
220 infinint block_num;
221
222 // the datastructures shared among threads
223
224 std::shared_ptr<libthreadar::ratelier_scatter<crypto_segment> > scatter;
225 std::shared_ptr<libthreadar::ratelier_gather<crypto_segment> > gather;
226 std::shared_ptr<libthreadar::barrier> waiter;
227 std::shared_ptr<heap<crypto_segment> > tas;
228
229 // the child threads
230
    // NOTE(review): crypto_worker keeps references to the shared_ptr objects it is built
    // from; if those are the scatter/gather fields above, moving this object would leave
    // the workers referring to the moved-from instance -- TODO confirm
231 std::deque<std::unique_ptr<crypto_worker> > travailleur;
232 std::unique_ptr<read_below> crypto_reader;
233 std::unique_ptr<write_below> crypto_writer;
234
235
236
238
    // private helpers.
    // NOTE(review): only three of the helper declarations are visible in this listing.
249 bool send_read_order(tronco_flags order, const infinint & for_offset = 0);
250
253
255 void go_read();
256
259
261
268
270
279 bool purge_unack_stop_order(const infinint & pos = 0);
280
282
292
295
298
301
304
305
    // ratelier size for a given number of workers: 1.5 times num_worker, integer division
    // (so 1 worker gives 1, 2 give 3, 3 give 4)
306 static U_I get_ratelier_size(U_I num_worker) { return num_worker + num_worker/2; };
    // heap size for a given number of workers (defined out of line)
307 static U_I get_heap_size(U_I num_worker);
308 };
309
310
312 //
313 // read_below subthread used by parallel_tronconneuse
314 // to dispatch chunk of encrypted data to the workers
315 //
316
317 class read_below: public libthreadar::thread
318 {
319 public:
320 read_below(const std::shared_ptr<libthreadar::ratelier_scatter<crypto_segment> > & to_workers,
321 const std::shared_ptr<libthreadar::barrier> & waiter,
322 U_I num_workers,
323 U_I clear_block_size,
324 generic_file* encrypted_side,
325 const std::shared_ptr<heap<crypto_segment> > xtas,
326 infinint init_shift):
327 workers(to_workers),
328 waiting(waiter),
329 num_w(num_workers),
330 clear_buf_size(clear_block_size),
331 encrypted(encrypted_side),
332 tas(xtas),
333 initial_shift(init_shift),
334 reof(false),
335 trailing_clear_data(nullptr)
336 { flag = tronco_flags::normal; };
337
338 ~read_below() { if(ptr) tas->put(move(ptr)); kill(); join(); };
339
343 void set_callback_trailing_clear_data(trailing_clear_data_callback call_back) { trailing_clear_data = call_back; };
344
345 // *** //
346 // *** the method above should not be used anymore once the thread is running *** //
347 // *** //
348
350 void set_initial_shift(const infinint & x) { initial_shift = x; };
351
353
358 void set_pos(const infinint & pos) { skip_to = pos; };
359
361
370 void set_flag(tronco_flags val) { flag = val; };
371
373
378 const infinint & get_clear_flow_start() const { return clear_flow_start; };
379
381
386 const infinint & get_pos_in_flow() const { return pos_in_flow; };
387
388
389 protected:
390 virtual void inherited_run() override;
391
392 private:
393 std::shared_ptr<libthreadar::ratelier_scatter <crypto_segment> > workers;
394 std::shared_ptr<libthreadar::barrier> waiting;
395 U_I num_w;
396 U_I clear_buf_size;
397 generic_file* encrypted;
398 archive_version version;
399 std::shared_ptr<heap<crypto_segment> > tas;
400 infinint initial_shift;
401 bool reof;
402 trailing_clear_data_callback trailing_clear_data;
403 std::unique_ptr<crypto_segment> ptr;
404 infinint index_num;
405
406
407 // initialized by inherited_run() / get_ready_for_new_offset()
408
409 infinint crypt_offset;
410 U_I encrypted_buf_size;
411
412 // fields accessible by both the caller and the read_below thread
413
414 infinint skip_to;
415 tronco_flags flag;
416 infinint clear_flow_start;
417 infinint pos_in_flow;
418
419 void work();
420 infinint get_ready_for_new_offset();
421 void send_flag_to_workers(tronco_flags theflag);
422
423 // same function as the tronconneuse::position_clear2crypt
424 void position_clear2crypt(const infinint & pos,
425 infinint & file_buf_start,
426 infinint & clear_buf_start,
427 infinint & pos_in_buf,
428 infinint & block_num);
429
430 };
431
432
434 //
435 // write_below subthread used by parallel_tronconneuse
436 // to gather and write down encrypted data work from workers
437 //
438
439 class write_below: public libthreadar::thread
440 {
441 public:
442 write_below(const std::shared_ptr<libthreadar::ratelier_gather<crypto_segment> > & from_workers,
443 const std::shared_ptr<libthreadar::barrier> & waiter,
444 U_I num_workers,
445 generic_file* encrypted_side,
446 const std::shared_ptr<heap<crypto_segment> > xtas):
447 workers(from_workers),
448 waiting(waiter),
449 num_w(num_workers),
450 cur_num_w(0),
451 encrypted(encrypted_side),
452 tas(xtas),
453 error(false),
454 error_block(0)
455 { if(encrypted == nullptr) throw SRC_BUG; };
456
457 ~write_below() { kill(); join(); };
458
459 bool exception_pending() const { return error; };
460 const infinint & get_error_block() const { return error_block; };
461
462 protected:
463 virtual void inherited_run() override;
464
465 private:
466 std::shared_ptr<libthreadar::ratelier_gather<crypto_segment> > workers;
467 std::shared_ptr<libthreadar::barrier> waiting;
468 U_I num_w;
469 U_I cur_num_w;
470 generic_file* encrypted;
471 std::shared_ptr<heap<crypto_segment> > tas;
472 bool error;
473 infinint error_block; // last crypto block before error
474 std::deque<std::unique_ptr<crypto_segment> >ones;
475 std::deque<signed int> flags;
476
477 void work();
478 };
479
480
482 //
483 // the crypto_worker threads performing ciphering/deciphering
484 // of many data blocks in parallel
485 //
486
487
488 class crypto_worker: public libthreadar::thread
489 {
490 public:
491 crypto_worker(std::shared_ptr<libthreadar::ratelier_scatter <crypto_segment> > & read_side,
492 std::shared_ptr<libthreadar::ratelier_gather <crypto_segment> > & write_side,
493 std::shared_ptr<libthreadar::barrier> waiter,
494 std::unique_ptr<crypto_module> && ptr,
495 bool encrypt):
496 reader(read_side),
497 writer(write_side),
498 waiting(waiter),
499 crypto(move(ptr)),
500 do_encrypt(encrypt),
501 abort(status::fine)
502 { if(!reader || !writer || !waiting || !crypto) throw SRC_BUG; };
503
504 virtual ~crypto_worker() { kill(); join(); };
505
506 protected:
507 virtual void inherited_run() override;
508
509 private:
510 enum class status { fine, inform, sent };
511
512 std::shared_ptr<libthreadar::ratelier_scatter <crypto_segment> > & reader;
513 std::shared_ptr<libthreadar::ratelier_gather <crypto_segment> > & writer;
514 std::shared_ptr<libthreadar::barrier> waiting;
515 std::unique_ptr<crypto_module> crypto;
516 bool do_encrypt; // if false do decrypt
517 std::unique_ptr<crypto_segment> ptr;
518 unsigned int slot;
519 status abort;
520
521 void work();
522 };
523
524
526
527} // end of namespace
528
529#endif
class archive_version that rules which archive format to follow
class archive_version manages the version of the archive format
this is the interface class from which all other data transfer classes inherit
bool is_terminated() const
void sync_write()
write any pending data
the arbitrary large positive integer class
this is a partial implementation of the generic_file interface to cypher/decypher data block by block...
bool send_read_order(tronco_flags order, const infinint &for_offset=0)
send an order to subthreads and gather acks from them
void go_read()
wake up threads in read mode when necessary
virtual void inherited_terminate() override
this protected inherited method is now private for inherited classes of tronconneuse
virtual void inherited_write(const char *a, U_I size) override
inherited from generic_file
virtual U_32 get_clear_block_size() const override
returns the block size given to constructor
void run_threads()
reset the interthread datastructure and launch the threads
virtual void inherited_sync_write() override
this protected inherited method is now private for inherited classes of tronconneuse
virtual infinint get_position() const override
inherited from generic_file
void join_threads()
wait for threads to finish and eventually rethrow their exceptions in current thread
virtual void write_end_of_file() override
in write_only mode indicate that end of file is reached
virtual bool skippable(skippability direction, const infinint &amount) override
inherited from generic_file
bool purge_unack_stop_order(const infinint &pos=0)
removing the ignore_stop_acks pending on the pipe
virtual void set_callback_trailing_clear_data(trailing_clear_data_callback call_back) override
parallel_tronconneuse(parallel_tronconneuse &&ref)=default
move constructor
std::unique_ptr< crypto_module > crypto
the crypto module used to cipher / decipher blocks of data
tronco_flags purge_ratelier_from_next_order(infinint pos=0)
purge the ratelier from the next order which is provided as returned value
virtual bool skip_to_eof() override
inherited from generic_file
archive_version reading_ver
archive format we follow
parallel_tronconneuse(U_I workers, U_32 block_size, generic_file &encrypted_side, const archive_version &reading_ver, std::unique_ptr< crypto_module > &ptr)
This is the constructor.
virtual bool skip(const infinint &pos) override
inherited from generic_file
~parallel_tronconneuse() noexcept
destructor
parallel_tronconneuse & operator=(const parallel_tronconneuse &ref)=delete
assignment operator
virtual bool skip_relative(S_I x) override
inherited from generic_file
virtual void inherited_truncate(const infinint &pos) override
this protected inherited method is now private for inherited classes of tronconneuse
virtual U_I inherited_read(char *a, U_I size) override
this protected inherited method is now private for inherited classes of tronconneuse
bool check_bytes_to_skip
whether to check for bytes to skip
void send_write_order(tronco_flags order)
send order in write mode
parallel_tronconneuse(const parallel_tronconneuse &ref)=delete
copy constructor
U_I ignore_stop_acks
how much stop ack still to be read (aborted stop order context)
infinint initial_shift
the offset in the "encrypted" below layer at which starts the encrypted data
U_32 clear_block_size
size of a clear block
virtual void inherited_flush_read() override
this protected inherited method is now private for inherited classes of tronconneuse
virtual bool truncatable(const infinint &pos) const override
inherited from generic_file
U_I num_workers
number of worker threads
bool find_offset_in_lus_data(const infinint &pos)
flush lus_data/lus_flags up to requested pos offset to be found or all data has been removed
void stop_threads()
end threads taking into account the fact they may be suspended on the barrier
thread_status t_status
whether child threads are waiting for us on the barrier
void read_refill()
fill lus_data/lus_flags from ratelier_gather if these are empty
virtual void set_initial_shift(const infinint &x) override
this method to modify the initial shift. This overrides the constructor "no_initial_shift" of the con...
virtual void inherited_read_ahead(const infinint &amount) override
this protected inherited method is now private for inherited classes of tronconneuse
infinint current_position
current position for the upper layer perspective (modified by skip*, inherited_read/write,...
void join_workers_only()
call by join_threads() below just code simplification around exception handling
per block cryptography implementation
defines the unit block of information ciphered at once
infinint(* trailing_clear_data_callback)(generic_file &below, const archive_version &reading_ver)
the trailing_clear_data_callback callback is a means by which the upper layer can tell when encrypted...
tronco_flags
status flags used between parallel_tronconneuse and its sub-threads
heap data structure (relying on FIFO)
switch module to limitint (32 ou 64 bits integers) or infinint
libdar namespace encapsulate all libdar symbols
Definition: archive.hpp:47
defines common interface for tronconneuse and parallel_tronconneuse