Disk ARchive 2.7.16
Full featured and portable backup and archiving tool
parallel_block_compressor.hpp
Go to the documentation of this file.
1/*********************************************************************/
2// dar - disk archive - a backup/restoration program
3// Copyright (C) 2002-2024 Denis Corbin
4//
5// This program is free software; you can redistribute it and/or
6// modify it under the terms of the GNU General Public License
7// as published by the Free Software Foundation; either version 2
8// of the License, or (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU General Public License
16// along with this program; if not, write to the Free Software
17// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18//
19// to contact the author, see the AUTHOR file
20/*********************************************************************/
21
25
98
99#ifndef PARALLEL_BLOCK_COMPRESSOR_HPP
100#define PARALLEL_BLOCK_COMPRESSOR_HPP
101
102#include "../my_config.h"
103
104#include "infinint.hpp"
105#include "crypto_segment.hpp"
106#include "heap.hpp"
107#include "compress_module.hpp"
108#include "proto_compressor.hpp"
109
110#include <libthreadar/libthreadar.hpp>
111
112namespace libdar
113{
114
117
118
120
// Flags exchanged between the threads owned by class parallel_block_compressor.
// Each enumerator is given an explicit numeric value (0 to 3).
// NOTE(review): the flag queues declared further down in this file are
// std::deque<signed int>, so these values are presumably stored as plain
// integers -- confirm in the implementation before renumbering them.
121 enum class compressor_block_flags { data = 0, eof_die = 1, error = 2, worker_error = 3 };
122
123 // Forward declarations of the classes implementing the sub-threads of class
124 // parallel_block_compressor. Their full definitions come after that class in this
    // file; declaring them here lets parallel_block_compressor hold std::unique_ptr
    // members pointing to them.
125
126 class zip_below_read;
127 class zip_below_write;
128 class zip_worker;
129
130
132 //
133 // parallel_block_compressor class, which holds the sub-threads
134 //
135 //
136
// proto_compressor implementation that owns a set of sub-threads (a zip_below_read,
// a zip_below_write and a collection of zip_worker objects) together with the
// structures shared with them (a ratelier_scatter, a ratelier_gather and a heap of
// crypto_segment). Objects of this class can neither be copied nor moved.
137 class parallel_block_compressor: public proto_compressor
138 {
139 public:
145
    // num_workers     : number of workers (stored type is U_I)
    // block_zipper    : compress_module received by value as a std::unique_ptr,
    //                   so ownership is transferred by the caller
    // compressed_side : generic_file received by reference (see the note below)
    // uncompressed_bs : block size, defaults to default_uncompressed_block_size
146 parallel_block_compressor(U_I num_workers,
147 std::unique_ptr<compress_module> block_zipper,
148 generic_file & compressed_side,
149 U_I uncompressed_bs = default_uncompressed_block_size);
150 // compressed_side is not owned by the object and will remain
151 // after the object destruction
152
    // copy and move operations are explicitly deleted
153 parallel_block_compressor(const parallel_block_compressor & ref) = delete;
154 parallel_block_compressor(parallel_block_compressor && ref) noexcept = delete;
155 parallel_block_compressor & operator = (const parallel_block_compressor & ref) = delete;
156 parallel_block_compressor & operator = (parallel_block_compressor && ref) noexcept = delete;
157 ~parallel_block_compressor();
158
159 // inherited from proto_compressor
160
    // reports compression::none while suspended, else the algorithm reported by the zipper module
161 virtual compression get_algo() const override { return suspended? compression::none : zipper->get_algo(); };
162 virtual void suspend_compression() override;
163 virtual void resume_compression() override;
    // returns the current value of the `suspended` field
164 virtual bool is_compression_suspended() const override { return suspended; };
165
166 // inherited from generic file
167
168 virtual bool skippable(skippability direction, const infinint & amount) override;
169 virtual bool skip(const infinint & pos) override;
170 virtual bool skip_to_eof() override;
171 virtual bool skip_relative(S_I x) override;
172 virtual bool truncatable(const infinint & pos) const override;
173 virtual infinint get_position() const override;
174
175 protected :
    // the `amount` argument is not used: run_read_threads() is called unless compression is suspended
176 virtual void inherited_read_ahead(const infinint & amount) override { if(!suspended) run_read_threads(); };
177 virtual U_I inherited_read(char *a, U_I size) override;
178 virtual void inherited_write(const char *a, U_I size) override;
179 virtual void inherited_truncate(const infinint & pos) override;
180 virtual void inherited_sync_write() override;
    // flushing the read side only calls stop_read_threads()
181 virtual void inherited_flush_read() override { stop_read_threads(); };
182 virtual void inherited_terminate() override;
183
184 private:
185
186 // the local fields
187
188 U_I num_w;
189 std::unique_ptr<compress_module> zipper;
    // raw pointer: per the constructor note, the pointed-to generic_file is not owned by this object
190 generic_file *compressed;
191 U_I uncompressed_block_size;
192 bool suspended;
193 bool running_threads;
194 std::unique_ptr<crypto_segment> curwrite;
195 std::deque<std::unique_ptr<crypto_segment> > lus_data;
196 std::deque<signed int> lus_flags;
197 bool reof;
198
199
200 // inter-thread data structure
    // NOTE(review): zip_below_read and zip_worker (defined below in this file) store
    // references to std::shared_ptr objects received at construction. If those are
    // bound to the three fields below, these fields must stay declared before the
    // sub-thread fields so that they are destroyed after them -- confirm in the
    // implementation before reordering anything here.
201
202 std::shared_ptr<libthreadar::ratelier_scatter<crypto_segment> > disperse;
203 std::shared_ptr<libthreadar::ratelier_gather<crypto_segment> > rassemble;
204 std::shared_ptr<heap<crypto_segment> > tas;
205
206
207 // the subthreads
208
209 std::unique_ptr<zip_below_read> reader;
210 std::unique_ptr<zip_below_write> writer;
211 std::deque<std::unique_ptr<zip_worker> > travailleurs;
212
213
214 // private methods
215
216 void send_flag_to_workers(compressor_block_flags flag);
217 void stop_threads();
218 void stop_read_threads();
219 void stop_write_threads();
220 void run_threads();
221 void run_read_threads();
222 void run_write_threads();
223 compressor_block_flags purge_ratelier_up_to_non_data();
224
225
226 // static methods
227
    // one and a half times the number of workers (integer division, so rounded down)
228 static U_I get_ratelier_size(U_I num_workers) { return num_workers + num_workers/2; };
229 static U_I get_heap_size(U_I num_workers);
230 };
231
232
233
235 //
236 // zip_below_write class/sub-thread
237 //
238 //
239
// Sub-thread class derived from libthreadar::thread. It is built from a
// ratelier_gather of crypto_segment (source), a generic_file pointer (dest),
// a heap of crypto_segment (xtas) and a number of workers.
// NOTE(review): from its name and constructor arguments it presumably moves the
// blocks gathered from the workers to *dest; the actual logic is in
// inherited_run()/work(), which are not defined in this header -- confirm there.
240 class zip_below_write: public libthreadar::thread
241 {
242 public:
243 zip_below_write(const std::shared_ptr<libthreadar::ratelier_gather<crypto_segment> > & source,
244 generic_file *dest,
245 const std::shared_ptr<heap<crypto_segment> > & xtas,
246 U_I num_workers);
247
    // kill() then join() are called before the fields are destroyed, presumably so
    // that the thread no longer runs when they go away (see libthreadar::thread)
248 ~zip_below_write() { kill(); join(); };
249
250
    // returns the current value of the `error` field
254 bool exception_pending() const { return error; };
255
257 void reset();
258
259 protected:
260 virtual void inherited_run() override;
261
262 private:
    // src and tas are std::shared_ptr held by value: this object shares ownership
    // of the pointed-to structures
263 std::shared_ptr<libthreadar::ratelier_gather<crypto_segment> > src;
    // raw, non-smart pointer to the destination generic_file
264 generic_file *dst;
265 std::shared_ptr<heap<crypto_segment> > tas;
266 U_I num_w;
267 bool error;
268 U_I ending;
    // data and flags are popped together by pop_front() below
269 std::deque<std::unique_ptr<crypto_segment> > data;
270 std::deque<signed int> flags;
271 libthreadar::mutex get_pos;
272 infinint current_position;
273
274 void work();
    // gives the front crypto_segment back to `tas` through put(), then removes the
    // front entry of both `data` and `flags`; both deques must be non-empty, as
    // front()/pop_front() on an empty std::deque is undefined behavior
275 void pop_front() { tas->put(std::move(data.front())); data.pop_front(); flags.pop_front(); };
276 };
277
278
279
280
282 //
283 // zip_below_read class/sub-thread
284 //
285 //
286
287
288
// Sub-thread class derived from libthreadar::thread. It is built from a
// generic_file pointer (source), a ratelier_scatter of crypto_segment (dest),
// a heap of crypto_segment (xtas) and a number of workers.
// NOTE(review): from its name and constructor arguments it presumably reads
// blocks from *source and scatters them to the workers; the actual logic is in
// inherited_run()/work(), which are not defined in this header -- confirm there.
289 class zip_below_read: public libthreadar::thread
290 {
291 public:
292 zip_below_read(generic_file *source,
293 const std::shared_ptr<libthreadar::ratelier_scatter<crypto_segment> > & dest,
294 const std::shared_ptr<heap<crypto_segment> > & xtas,
295 U_I num_workers);
296
    // kill() then join() are called before the fields are destroyed, presumably so
    // that the thread no longer runs when they go away (see libthreadar::thread)
297 ~zip_below_read() { kill(); join(); };
298
    // only sets the should_i_stop field to true.
    // NOTE(review): should_i_stop is a plain bool; if do_stop() is called from a
    // thread other than the one executing inherited_run(), this access is not
    // synchronized by anything visible in this header -- confirm in the implementation
300 void do_stop() { should_i_stop = true; };
301
303 void reset();
304
305 protected:
306 virtual void inherited_run() override;
307
308 private:
    // raw, non-smart pointer to the source generic_file
309 generic_file *src;
    // dst and tas are *references* to std::shared_ptr objects (unlike zip_below_write,
    // which stores copies): this object takes no ownership through them.
    // NOTE(review): the referenced shared_ptr objects presumably have to outlive
    // this object -- confirm what the constructor binds them to
310 const std::shared_ptr<libthreadar::ratelier_scatter<crypto_segment> > & dst;
311 const std::shared_ptr<heap<crypto_segment> > & tas;
312 U_I num_w;
313 std::unique_ptr<crypto_segment> ptr;
314 bool should_i_stop;
315
316
317 void work();
318 void push_flag_to_all_workers(compressor_block_flags flag);
319 };
320
321
322
323
325 //
326 // zip_worker class/sub-thread
327 //
328 //
329
330
331
// Sub-thread class derived from libthreadar::thread. It is built from a
// ratelier_scatter and a ratelier_gather of crypto_segment, a compress_module
// and a boolean named `compress`.
// NOTE(review): it presumably takes blocks from the scatter side, compresses or
// decompresses them according to the boolean and hands them to the gather side;
// the actual logic is in inherited_run()/work(), which are not defined in this
// header -- confirm there.
332 class zip_worker: public libthreadar::thread
333 {
334 public:
    // both ratelier arguments are non-const references to std::shared_ptr; the
    // compress_module is received as an rvalue reference to a std::unique_ptr.
    // NOTE(review): the parameter name `write_size` looks like a typo for `write_side`
335 zip_worker(std::shared_ptr<libthreadar::ratelier_scatter <crypto_segment> > & read_side,
336 std::shared_ptr<libthreadar::ratelier_gather <crypto_segment> > & write_size,
337 std::unique_ptr<compress_module> && ptr,
338 bool compress);
339
    // kill() then join() are called before the fields are destroyed, presumably so
    // that the thread no longer runs when they go away (see libthreadar::thread)
340 ~zip_worker() { kill(); join(); };
341
342 protected:
343 virtual void inherited_run() override;
344
345 private:
    // reader and writer are *references* to std::shared_ptr objects: this object
    // takes no ownership through them.
    // NOTE(review): the referenced shared_ptr objects presumably have to outlive
    // this object -- confirm what the constructor binds them to
346 std::shared_ptr<libthreadar::ratelier_scatter <crypto_segment> > & reader;
347 std::shared_ptr<libthreadar::ratelier_gather <crypto_segment> > & writer;
    // compress_module owned by this worker through a std::unique_ptr
348 std::unique_ptr<compress_module> compr;
349 bool do_compress;
350 bool error;
351 std::unique_ptr<crypto_segment> transit;
352 unsigned int transit_slot;
353
354 void work();
355 };
356
357
359
360} // end of namespace
361
362
363#endif
364
provides abstracted interface of per-block compression/decompression
defines unit block of information ciphered at once
compression
the different compression algorithm available
Definition: compression.hpp:46
@ none
no compression
compressor_block_flags
the different flags used to communicate between the threads held by the parallel_block_compressor class
heap data structure (relying on FIFO)
switch module to limitint (32 or 64 bit integers) or infinint
libdar namespace encapsulates all libdar symbols
Definition: archive.hpp:47
abstracted ancestor class for compressor and parallel_compressor classes