summaryrefslogtreecommitdiffstats
path: root/contrib/xz/src/liblzma/common/stream_encoder_mt.c
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/xz/src/liblzma/common/stream_encoder_mt.c')
-rw-r--r--contrib/xz/src/liblzma/common/stream_encoder_mt.c124
1 files changed, 68 insertions, 56 deletions
diff --git a/contrib/xz/src/liblzma/common/stream_encoder_mt.c b/contrib/xz/src/liblzma/common/stream_encoder_mt.c
index 9780ed0..2efe44c 100644
--- a/contrib/xz/src/liblzma/common/stream_encoder_mt.c
+++ b/contrib/xz/src/liblzma/common/stream_encoder_mt.c
@@ -44,6 +44,7 @@ typedef enum {
} worker_state;
+typedef struct lzma_stream_coder_s lzma_stream_coder;
typedef struct worker_thread_s worker_thread;
struct worker_thread_s {
@@ -65,7 +66,7 @@ struct worker_thread_s {
/// Pointer to the main structure is needed when putting this
/// thread back to the stack of free threads.
- lzma_coder *coder;
+ lzma_stream_coder *coder;
/// The allocator is set by the main thread. Since a copy of the
/// pointer is kept here, the application must not change the
@@ -96,7 +97,7 @@ struct worker_thread_s {
};
-struct lzma_coder_s {
+struct lzma_stream_coder_s {
enum {
SEQ_STREAM_HEADER,
SEQ_BLOCK,
@@ -417,7 +418,7 @@ worker_start(void *thr_ptr)
/// Make the threads stop but not exit. Optionally wait for them to stop.
static void
-threads_stop(lzma_coder *coder, bool wait_for_threads)
+threads_stop(lzma_stream_coder *coder, bool wait_for_threads)
{
// Tell the threads to stop.
for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
@@ -446,7 +447,7 @@ threads_stop(lzma_coder *coder, bool wait_for_threads)
/// Stop the threads and free the resources associated with them.
/// Wait until the threads have exited.
static void
-threads_end(lzma_coder *coder, const lzma_allocator *allocator)
+threads_end(lzma_stream_coder *coder, const lzma_allocator *allocator)
{
for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
mythread_sync(coder->threads[i].mutex) {
@@ -468,7 +469,8 @@ threads_end(lzma_coder *coder, const lzma_allocator *allocator)
/// Initialize a new worker_thread structure and create a new thread.
static lzma_ret
-initialize_new_thread(lzma_coder *coder, const lzma_allocator *allocator)
+initialize_new_thread(lzma_stream_coder *coder,
+ const lzma_allocator *allocator)
{
worker_thread *thr = &coder->threads[coder->threads_initialized];
@@ -510,7 +512,7 @@ error_mutex:
static lzma_ret
-get_thread(lzma_coder *coder, const lzma_allocator *allocator)
+get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
{
// If there are no free output subqueues, there is no
// point to try getting a thread.
@@ -548,7 +550,7 @@ get_thread(lzma_coder *coder, const lzma_allocator *allocator)
static lzma_ret
-stream_encode_in(lzma_coder *coder, const lzma_allocator *allocator,
+stream_encode_in(lzma_stream_coder *coder, const lzma_allocator *allocator,
const uint8_t *restrict in, size_t *restrict in_pos,
size_t in_size, lzma_action action)
{
@@ -616,7 +618,7 @@ stream_encode_in(lzma_coder *coder, const lzma_allocator *allocator,
/// Wait until more input can be consumed, more output can be read, or
/// an optional timeout is reached.
static bool
-wait_for_work(lzma_coder *coder, mythread_condtime *wait_abs,
+wait_for_work(lzma_stream_coder *coder, mythread_condtime *wait_abs,
bool *has_blocked, bool has_input)
{
if (coder->timeout != 0 && !*has_blocked) {
@@ -662,11 +664,13 @@ wait_for_work(lzma_coder *coder, mythread_condtime *wait_abs,
static lzma_ret
-stream_encode_mt(lzma_coder *coder, const lzma_allocator *allocator,
+stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator,
const uint8_t *restrict in, size_t *restrict in_pos,
size_t in_size, uint8_t *restrict out,
size_t *restrict out_pos, size_t out_size, lzma_action action)
{
+ lzma_stream_coder *coder = coder_ptr;
+
switch (coder->sequence) {
case SEQ_STREAM_HEADER:
lzma_bufcpy(coder->header, &coder->header_pos,
@@ -834,8 +838,10 @@ stream_encode_mt(lzma_coder *coder, const lzma_allocator *allocator,
static void
-stream_encoder_mt_end(lzma_coder *coder, const lzma_allocator *allocator)
+stream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator)
{
+ lzma_stream_coder *coder = coder_ptr;
+
// Threads must be killed before the output queue can be freed.
threads_end(coder, allocator);
lzma_outq_end(&coder->outq, allocator);
@@ -907,10 +913,12 @@ get_options(const lzma_mt *options, lzma_options_easy *opt_easy,
static void
-get_progress(lzma_coder *coder, uint64_t *progress_in, uint64_t *progress_out)
+get_progress(void *coder_ptr, uint64_t *progress_in, uint64_t *progress_out)
{
+ lzma_stream_coder *coder = coder_ptr;
+
// Lock coder->mutex to prevent finishing threads from moving their
- // progress info from the worker_thread structure to lzma_coder.
+ // progress info from the worker_thread structure to lzma_stream_coder.
mythread_sync(coder->mutex) {
*progress_in = coder->progress_in;
*progress_out = coder->progress_out;
@@ -962,24 +970,27 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
return LZMA_UNSUPPORTED_CHECK;
// Allocate and initialize the base structure if needed.
- if (next->coder == NULL) {
- next->coder = lzma_alloc(sizeof(lzma_coder), allocator);
- if (next->coder == NULL)
+ lzma_stream_coder *coder = next->coder;
+ if (coder == NULL) {
+ coder = lzma_alloc(sizeof(lzma_stream_coder), allocator);
+ if (coder == NULL)
return LZMA_MEM_ERROR;
+ next->coder = coder;
+
// For the mutex and condition variable initializations
// the error handling has to be done here because
// stream_encoder_mt_end() doesn't know if they have
// already been initialized or not.
- if (mythread_mutex_init(&next->coder->mutex)) {
- lzma_free(next->coder, allocator);
+ if (mythread_mutex_init(&coder->mutex)) {
+ lzma_free(coder, allocator);
next->coder = NULL;
return LZMA_MEM_ERROR;
}
- if (mythread_cond_init(&next->coder->cond)) {
- mythread_mutex_destroy(&next->coder->mutex);
- lzma_free(next->coder, allocator);
+ if (mythread_cond_init(&coder->cond)) {
+ mythread_mutex_destroy(&coder->mutex);
+ lzma_free(coder, allocator);
next->coder = NULL;
return LZMA_MEM_ERROR;
}
@@ -989,76 +1000,76 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
next->get_progress = &get_progress;
// next->update = &stream_encoder_mt_update;
- next->coder->filters[0].id = LZMA_VLI_UNKNOWN;
- next->coder->index_encoder = LZMA_NEXT_CODER_INIT;
- next->coder->index = NULL;
- memzero(&next->coder->outq, sizeof(next->coder->outq));
- next->coder->threads = NULL;
- next->coder->threads_max = 0;
- next->coder->threads_initialized = 0;
+ coder->filters[0].id = LZMA_VLI_UNKNOWN;
+ coder->index_encoder = LZMA_NEXT_CODER_INIT;
+ coder->index = NULL;
+ memzero(&coder->outq, sizeof(coder->outq));
+ coder->threads = NULL;
+ coder->threads_max = 0;
+ coder->threads_initialized = 0;
}
// Basic initializations
- next->coder->sequence = SEQ_STREAM_HEADER;
- next->coder->block_size = (size_t)(block_size);
- next->coder->thread_error = LZMA_OK;
- next->coder->thr = NULL;
+ coder->sequence = SEQ_STREAM_HEADER;
+ coder->block_size = (size_t)(block_size);
+ coder->thread_error = LZMA_OK;
+ coder->thr = NULL;
// Allocate the thread-specific base structures.
assert(options->threads > 0);
- if (next->coder->threads_max != options->threads) {
- threads_end(next->coder, allocator);
+ if (coder->threads_max != options->threads) {
+ threads_end(coder, allocator);
- next->coder->threads = NULL;
- next->coder->threads_max = 0;
+ coder->threads = NULL;
+ coder->threads_max = 0;
- next->coder->threads_initialized = 0;
- next->coder->threads_free = NULL;
+ coder->threads_initialized = 0;
+ coder->threads_free = NULL;
- next->coder->threads = lzma_alloc(
+ coder->threads = lzma_alloc(
options->threads * sizeof(worker_thread),
allocator);
- if (next->coder->threads == NULL)
+ if (coder->threads == NULL)
return LZMA_MEM_ERROR;
- next->coder->threads_max = options->threads;
+ coder->threads_max = options->threads;
} else {
// Reuse the old structures and threads. Tell the running
// threads to stop and wait until they have stopped.
- threads_stop(next->coder, true);
+ threads_stop(coder, true);
}
// Output queue
- return_if_error(lzma_outq_init(&next->coder->outq, allocator,
+ return_if_error(lzma_outq_init(&coder->outq, allocator,
outbuf_size_max, options->threads));
// Timeout
- next->coder->timeout = options->timeout;
+ coder->timeout = options->timeout;
// Free the old filter chain and copy the new one.
- for (size_t i = 0; next->coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
- lzma_free(next->coder->filters[i].options, allocator);
+ for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
+ lzma_free(coder->filters[i].options, allocator);
return_if_error(lzma_filters_copy(
- filters, next->coder->filters, allocator));
+ filters, coder->filters, allocator));
// Index
- lzma_index_end(next->coder->index, allocator);
- next->coder->index = lzma_index_init(allocator);
- if (next->coder->index == NULL)
+ lzma_index_end(coder->index, allocator);
+ coder->index = lzma_index_init(allocator);
+ if (coder->index == NULL)
return LZMA_MEM_ERROR;
// Stream Header
- next->coder->stream_flags.version = 0;
- next->coder->stream_flags.check = options->check;
+ coder->stream_flags.version = 0;
+ coder->stream_flags.check = options->check;
return_if_error(lzma_stream_header_encode(
- &next->coder->stream_flags, next->coder->header));
+ &coder->stream_flags, coder->header));
- next->coder->header_pos = 0;
+ coder->header_pos = 0;
// Progress info
- next->coder->progress_in = 0;
- next->coder->progress_out = LZMA_STREAM_HEADER_SIZE;
+ coder->progress_in = 0;
+ coder->progress_out = LZMA_STREAM_HEADER_SIZE;
return LZMA_OK;
}
@@ -1111,7 +1122,8 @@ lzma_stream_encoder_mt_memusage(const lzma_mt *options)
return UINT64_MAX;
// Sum them with overflow checking.
- uint64_t total_memusage = LZMA_MEMUSAGE_BASE + sizeof(lzma_coder)
+ uint64_t total_memusage = LZMA_MEMUSAGE_BASE
+ + sizeof(lzma_stream_coder)
+ options->threads * sizeof(worker_thread);
if (UINT64_MAX - total_memusage < inbuf_memusage)
OpenPOWER on IntegriCloud