Disk ARchive  2.7.15
Full featured and portable backup and archiving tool
parallel_block_compressor.hpp
Go to the documentation of this file.
1 /*********************************************************************/
2 // dar - disk archive - a backup/restoration program
3 // Copyright (C) 2002-2024 Denis Corbin
4 //
5 // This program is free software; you can redistribute it and/or
6 // modify it under the terms of the GNU General Public License
7 // as published by the Free Software Foundation; either version 2
8 // of the License, or (at your option) any later version.
9 //
10 // This program is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU General Public License
16 // along with this program; if not, write to the Free Software
17 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 //
19 // to contact the author, see the AUTHOR file
20 /*********************************************************************/
21 
25 
98 
99 #ifndef PARALLEL_BLOCK_COMPRESSOR_HPP
100 #define PARALLEL_BLOCK_COMPRESSOR_HPP
101 
102 #include "../my_config.h"
103 
104 #include "infinint.hpp"
105 #include "crypto_segment.hpp"
106 #include "heap.hpp"
107 #include "compress_module.hpp"
108 #include "proto_compressor.hpp"
109 
110 #include <libthreadar/libthreadar.hpp>
111 
112 namespace libdar
113 {
114 
117 
118 
120 
/// the different flags used to communicate between the threads held by class parallel_block_compressor

/// \note the numerical values are spelled out explicitly
/// NOTE(review): the flag queues of the classes of this file are declared
/// as std::deque<signed int>, so these values are presumably converted
/// to/from plain integers -- confirm before changing any of them
enum class compressor_block_flags
{
    data = 0,
    eof_die = 1,
    error = 2,
    worker_error = 3
};
122 
// forward declarations of the classes holding the sub-threads of class
// parallel_block_compressor; their definitions follow that class, later
// in this file

class zip_worker;
class zip_below_write;
class zip_below_read;
129 
130 
132  //
133  // parallel_block_compressor class, which holds the sub-threads
134  //
135  //
136 
137  class parallel_block_compressor: public proto_compressor
138  {
139  public:
145 
146  parallel_block_compressor(U_I num_workers,
147  std::unique_ptr<compress_module> block_zipper,
148  generic_file & compressed_side,
149  U_I uncompressed_bs = default_uncompressed_block_size);
150  // compressed_side is not owned by the object and will remains
151  // after the objet destruction
152 
153  parallel_block_compressor(const parallel_block_compressor & ref) = delete;
154  parallel_block_compressor(parallel_block_compressor && ref) noexcept = delete;
155  parallel_block_compressor & operator = (const parallel_block_compressor & ref) = delete;
156  parallel_block_compressor & operator = (parallel_block_compressor && ref) noexcept = delete;
157  ~parallel_block_compressor();
158 
159  // inherited from proto_compressor
160 
161  virtual compression get_algo() const override { return suspended? compression::none : zipper->get_algo(); };
162  virtual void suspend_compression() override;
163  virtual void resume_compression() override;
164  virtual bool is_compression_suspended() const override { return suspended; };
165 
166  // inherited from generic file
167 
168  virtual bool skippable(skippability direction, const infinint & amount) override;
169  virtual bool skip(const infinint & pos) override;
170  virtual bool skip_to_eof() override;
171  virtual bool skip_relative(S_I x) override;
172  virtual bool truncatable(const infinint & pos) const override;
173  virtual infinint get_position() const override;
174 
175  protected :
176  virtual void inherited_read_ahead(const infinint & amount) override { if(!suspended) run_read_threads(); };
177  virtual U_I inherited_read(char *a, U_I size) override;
178  virtual void inherited_write(const char *a, U_I size) override;
179  virtual void inherited_truncate(const infinint & pos) override;
180  virtual void inherited_sync_write() override;
181  virtual void inherited_flush_read() override { stop_read_threads(); };
182  virtual void inherited_terminate() override;
183 
184  private:
185 
186  // the local fields
187 
188  U_I num_w;
189  std::unique_ptr<compress_module> zipper;
190  generic_file *compressed;
191  U_I uncompressed_block_size;
192  bool suspended;
193  bool running_threads;
194  std::unique_ptr<crypto_segment> curwrite;
195  std::deque<std::unique_ptr<crypto_segment> > lus_data;
196  std::deque<signed int> lus_flags;
197  bool reof;
198 
199 
200  // inter-thread data structure
201 
202  std::shared_ptr<libthreadar::ratelier_scatter<crypto_segment> > disperse;
203  std::shared_ptr<libthreadar::ratelier_gather<crypto_segment> > rassemble;
204  std::shared_ptr<heap<crypto_segment> > tas;
205 
206 
207  // the subthreads
208 
209  std::unique_ptr<zip_below_read> reader;
210  std::unique_ptr<zip_below_write> writer;
211  std::deque<std::unique_ptr<zip_worker> > travailleurs;
212 
213 
214  // private methods
215 
216  void send_flag_to_workers(compressor_block_flags flag);
217  void stop_threads();
218  void stop_read_threads();
219  void stop_write_threads();
220  void run_threads();
221  void run_read_threads();
222  void run_write_threads();
223  compressor_block_flags purge_ratelier_up_to_non_data();
224 
225 
226  // static methods
227 
228  static U_I get_ratelier_size(U_I num_workers) { return num_workers + num_workers/2; };
229  static U_I get_heap_size(U_I num_workers);
230  };
231 
232 
233 
235  //
236  // zip_below_write class/sub-thread
237  //
238  //
239 
240  class zip_below_write: public libthreadar::thread
241  {
242  public:
243  zip_below_write(const std::shared_ptr<libthreadar::ratelier_gather<crypto_segment> > & source,
244  generic_file *dest,
245  const std::shared_ptr<heap<crypto_segment> > & xtas,
246  U_I num_workers);
247 
248  ~zip_below_write() { kill(); join(); };
249 
250 
254  bool exception_pending() const { return error; };
255 
257  void reset();
258 
259  protected:
260  virtual void inherited_run() override;
261 
262  private:
263  std::shared_ptr<libthreadar::ratelier_gather<crypto_segment> > src;
264  generic_file *dst;
265  std::shared_ptr<heap<crypto_segment> > tas;
266  U_I num_w;
267  bool error;
268  U_I ending;
269  std::deque<std::unique_ptr<crypto_segment> > data;
270  std::deque<signed int> flags;
271  libthreadar::mutex get_pos;
272  infinint current_position;
273 
274  void work();
275  void pop_front() { tas->put(std::move(data.front())); data.pop_front(); flags.pop_front(); };
276  };
277 
278 
279 
280 
282  //
283  // zip_below_read class/sub-thread
284  //
285  //
286 
287 
288 
289  class zip_below_read: public libthreadar::thread
290  {
291  public:
292  zip_below_read(generic_file *source,
293  const std::shared_ptr<libthreadar::ratelier_scatter<crypto_segment> > & dest,
294  const std::shared_ptr<heap<crypto_segment> > & xtas,
295  U_I num_workers);
296 
297  ~zip_below_read() { kill(); join(); };
298 
300  void do_stop() { should_i_stop = true; };
301 
303  void reset();
304 
305  protected:
306  virtual void inherited_run() override;
307 
308  private:
309  generic_file *src;
310  const std::shared_ptr<libthreadar::ratelier_scatter<crypto_segment> > & dst;
311  const std::shared_ptr<heap<crypto_segment> > & tas;
312  U_I num_w;
313  std::unique_ptr<crypto_segment> ptr;
314  bool should_i_stop;
315 
316 
317  void work();
318  void push_flag_to_all_workers(compressor_block_flags flag);
319  };
320 
321 
322 
323 
325  //
326  // zip_worker class/sub-thread
327  //
328  //
329 
330 
331 
332  class zip_worker: public libthreadar::thread
333  {
334  public:
335  zip_worker(std::shared_ptr<libthreadar::ratelier_scatter <crypto_segment> > & read_side,
336  std::shared_ptr<libthreadar::ratelier_gather <crypto_segment> > & write_size,
337  std::unique_ptr<compress_module> && ptr,
338  bool compress);
339 
340  ~zip_worker() { kill(); join(); };
341 
342  protected:
343  virtual void inherited_run() override;
344 
345  private:
346  std::shared_ptr<libthreadar::ratelier_scatter <crypto_segment> > & reader;
347  std::shared_ptr<libthreadar::ratelier_gather <crypto_segment> > & writer;
348  std::unique_ptr<compress_module> compr;
349  bool do_compress;
350  bool error;
351  std::unique_ptr<crypto_segment> transit;
352  unsigned int transit_slot;
353 
354  void work();
355  };
356 
357 
359 
360 } // end of namespace
361 
362 
363 #endif
364 
generic_file(gf_mode m)
main constructor
provides abstracted interface of per-block compression/decompression
defines unit block of information ciphered at once
compression
the different compression algorithm available
Definition: compression.hpp:46
@ none
no compression
compressor_block_flags
the different flags used to communicate between threads held by parallel_block_compressor class
heap data structure (relying on FIFO)
switch module to limitint (32 or 64 bit integers) or infinint
libdar namespace encapsulates all libdar symbols
Definition: archive.hpp:47
abstracted ancestor class for compressor and parallel_compressor classes