mirror of
https://github.com/facebook/zstd.git
synced 2024-12-01 12:26:46 +08:00
Integrate ldm with zstdmt
Integrate ldm into zstdmt by running it in serial and in order in the first step of each job, in the same place as the hash gets updated. The input buffer is sized to fit the whole LDM window and 2 full buffers of slack. Input buffers cannot be reused until the LDM step is done with them. After the LDM step is finished, the jobs don't actually have access to the full window, only the overlap. Tested on a few different multi-GB files with and without sanitizers, and with different numbers of threads.
This commit is contained in:
parent
aa4dbd09a1
commit
94c77710a9
@ -27,8 +27,14 @@
|
||||
#include "pool.h" /* threadpool */
|
||||
#include "threading.h" /* mutex */
|
||||
#include "zstd_compress_internal.h" /* MIN, ERROR, ZSTD_*, ZSTD_highbit32 */
|
||||
#include "zstd_ldm.h"
|
||||
#include "zstdmt_compress.h"
|
||||
|
||||
/* Guards code to support resizing the SeqPool.
|
||||
* We will want to resize the SeqPool to save memory in the future.
|
||||
* Until then, comment the code out since it is unused.
|
||||
*/
|
||||
#define ZSTD_RESIZE_SEQPOOL 0
|
||||
|
||||
/* ====== Debug ====== */
|
||||
#if defined(ZSTD_DEBUG) && (ZSTD_DEBUG>=2)
|
||||
@ -194,6 +200,32 @@ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool)
|
||||
}
|
||||
}
|
||||
|
||||
#if ZSTD_RESIZE_SEQPOOL
|
||||
/** ZSTDMT_resizeBuffer() :
|
||||
* assumption : bufPool must be valid
|
||||
* @return : a buffer that is at least the buffer pool buffer size.
|
||||
* If a reallocation happens, the data in the input buffer is copied.
|
||||
*/
|
||||
static buffer_t ZSTDMT_resizeBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buffer)
|
||||
{
|
||||
size_t const bSize = bufPool->bufferSize;
|
||||
if (buffer.capacity < bSize) {
|
||||
void* const start = ZSTD_malloc(bSize, bufPool->cMem);
|
||||
buffer_t newBuffer;
|
||||
newBuffer.start = start;
|
||||
newBuffer.capacity = start == NULL ? 0 : bSize;
|
||||
if (start != NULL) {
|
||||
assert(newBuffer.capacity >= buffer.capacity);
|
||||
memcpy(newBuffer.start, buffer.start, buffer.capacity);
|
||||
DEBUGLOG(5, "ZSTDMT_resizeBuffer: created buffer of size %u", (U32)bSize);
|
||||
return newBuffer;
|
||||
}
|
||||
DEBUGLOG(5, "ZSTDMT_resizeBuffer: buffer allocation failure !!");
|
||||
}
|
||||
return buffer;
|
||||
}
|
||||
#endif
|
||||
|
||||
/* store buffer for later re-use, up to pool capacity */
|
||||
static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf)
|
||||
{
|
||||
@ -214,6 +246,72 @@ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf)
|
||||
}
|
||||
|
||||
|
||||
/* ===== Seq Pool Wrapper ====== */
|
||||
|
||||
static rawSeqStore_t kNullRawSeqStore = {NULL, 0, 0, 0};
|
||||
|
||||
typedef ZSTDMT_bufferPool ZSTDMT_seqPool;
|
||||
|
||||
static size_t ZSTDMT_sizeof_seqPool(ZSTDMT_seqPool* seqPool)
|
||||
{
|
||||
return ZSTDMT_sizeof_bufferPool(seqPool);
|
||||
}
|
||||
|
||||
static rawSeqStore_t bufferToSeq(buffer_t buffer)
|
||||
{
|
||||
rawSeqStore_t seq = {NULL, 0, 0, 0};
|
||||
seq.seq = (rawSeq*)buffer.start;
|
||||
seq.capacity = buffer.capacity / sizeof(rawSeq);
|
||||
return seq;
|
||||
}
|
||||
|
||||
static buffer_t seqToBuffer(rawSeqStore_t seq)
|
||||
{
|
||||
buffer_t buffer;
|
||||
buffer.start = seq.seq;
|
||||
buffer.capacity = seq.capacity * sizeof(rawSeq);
|
||||
return buffer;
|
||||
}
|
||||
|
||||
static rawSeqStore_t ZSTDMT_getSeq(ZSTDMT_seqPool* seqPool)
|
||||
{
|
||||
if (seqPool->bufferSize == 0) {
|
||||
return kNullRawSeqStore;
|
||||
}
|
||||
return bufferToSeq(ZSTDMT_getBuffer(seqPool));
|
||||
}
|
||||
|
||||
#if ZSTD_RESIZE_SEQPOOL
|
||||
static rawSeqStore_t ZSTDMT_resizeSeq(ZSTDMT_seqPool* seqPool, rawSeqStore_t seq)
|
||||
{
|
||||
return bufferToSeq(ZSTDMT_resizeBuffer(seqPool, seqToBuffer(seq)));
|
||||
}
|
||||
#endif
|
||||
|
||||
static void ZSTDMT_releaseSeq(ZSTDMT_seqPool* seqPool, rawSeqStore_t seq)
|
||||
{
|
||||
ZSTDMT_releaseBuffer(seqPool, seqToBuffer(seq));
|
||||
}
|
||||
|
||||
static void ZSTDMT_setNbSeq(ZSTDMT_seqPool* const seqPool, size_t const nbSeq)
|
||||
{
|
||||
ZSTDMT_setBufferSize(seqPool, nbSeq * sizeof(rawSeq));
|
||||
}
|
||||
|
||||
static ZSTDMT_seqPool* ZSTDMT_createSeqPool(unsigned nbWorkers, ZSTD_customMem cMem)
|
||||
{
|
||||
ZSTDMT_seqPool* seqPool = ZSTDMT_createBufferPool(nbWorkers, cMem);
|
||||
ZSTDMT_setNbSeq(seqPool, 0);
|
||||
return seqPool;
|
||||
}
|
||||
|
||||
static void ZSTDMT_freeSeqPool(ZSTDMT_seqPool* seqPool)
|
||||
{
|
||||
ZSTDMT_freeBufferPool(seqPool);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/* ===== CCtx Pool ===== */
|
||||
/* a single CCtx Pool can be invoked from multiple threads in parallel */
|
||||
|
||||
@ -312,36 +410,95 @@ typedef struct {
|
||||
} range_t;
|
||||
|
||||
typedef struct {
|
||||
/* All variables in the struct are protected by mutex. */
|
||||
ZSTD_pthread_mutex_t mutex;
|
||||
ZSTD_pthread_cond_t cond;
|
||||
ZSTD_CCtx_params params;
|
||||
ldmState_t ldmState;
|
||||
XXH64_state_t xxhState;
|
||||
unsigned nextJobID;
|
||||
/* Protects ldmWindow.
|
||||
* Must be acquired after the main mutex when acquiring both.
|
||||
*/
|
||||
ZSTD_pthread_mutex_t ldmWindowMutex;
|
||||
ZSTD_pthread_cond_t ldmWindowCond; /* Signaled when ldmWindow is udpated */
|
||||
ZSTD_window_t ldmWindow; /* A thread-safe copy of ldmState.window */
|
||||
} serialState_t;
|
||||
|
||||
static void ZSTDMT_serialState_reset(serialState_t* serialState, ZSTD_CCtx_params params)
|
||||
static int ZSTDMT_serialState_reset(serialState_t* serialState, ZSTDMT_seqPool* seqPool, ZSTD_CCtx_params params)
|
||||
{
|
||||
/* Adjust parameters */
|
||||
if (params.ldmParams.enableLdm) {
|
||||
DEBUGLOG(4, "LDM window size = %u KB", (1U << params.cParams.windowLog) >> 10);
|
||||
params.ldmParams.windowLog = params.cParams.windowLog;
|
||||
ZSTD_ldm_adjustParameters(¶ms.ldmParams, ¶ms.cParams);
|
||||
assert(params.ldmParams.hashLog >= params.ldmParams.bucketSizeLog);
|
||||
assert(params.ldmParams.hashEveryLog < 32);
|
||||
serialState->ldmState.hashPower =
|
||||
ZSTD_ldm_getHashPower(params.ldmParams.minMatchLength);
|
||||
}
|
||||
serialState->nextJobID = 0;
|
||||
if (params.fParams.checksumFlag)
|
||||
XXH64_reset(&serialState->xxhState, 0);
|
||||
if (params.ldmParams.enableLdm) {
|
||||
ZSTD_customMem cMem = params.customMem;
|
||||
unsigned const hashLog = params.ldmParams.hashLog;
|
||||
size_t const hashSize = ((size_t)1 << hashLog) * sizeof(ldmEntry_t);
|
||||
unsigned const bucketLog =
|
||||
params.ldmParams.hashLog - params.ldmParams.bucketSizeLog;
|
||||
size_t const bucketSize = (size_t)1 << bucketLog;
|
||||
unsigned const prevBucketLog =
|
||||
serialState->params.ldmParams.hashLog -
|
||||
serialState->params.ldmParams.bucketSizeLog;
|
||||
/* Size the seq pool tables */
|
||||
ZSTDMT_setNbSeq(seqPool, ZSTD_ldm_getMaxNbSeq(params.ldmParams, params.jobSize));
|
||||
/* Reset the window */
|
||||
ZSTD_window_clear(&serialState->ldmState.window);
|
||||
serialState->ldmWindow = serialState->ldmState.window;
|
||||
/* Resize tables and output space if necessary. */
|
||||
if (serialState->params.ldmParams.hashLog < hashLog) {
|
||||
ZSTD_free(serialState->ldmState.hashTable, cMem);
|
||||
serialState->ldmState.hashTable = (ldmEntry_t*)ZSTD_malloc(hashSize, cMem);
|
||||
}
|
||||
if (prevBucketLog < bucketLog) {
|
||||
ZSTD_free(serialState->ldmState.bucketOffsets, cMem);
|
||||
serialState->ldmState.bucketOffsets = (BYTE*)ZSTD_malloc(bucketSize, cMem);
|
||||
}
|
||||
if (!serialState->ldmState.hashTable || !serialState->ldmState.bucketOffsets)
|
||||
return 1;
|
||||
/* Zero the tables */
|
||||
memset(serialState->ldmState.hashTable, 0, hashSize);
|
||||
memset(serialState->ldmState.bucketOffsets, 0, bucketSize);
|
||||
}
|
||||
serialState->params = params;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int ZSTDMT_serialState_init(serialState_t* serialState)
|
||||
{
|
||||
int initError = 0;
|
||||
memset(serialState, 0, sizeof(*serialState));
|
||||
initError |= ZSTD_pthread_mutex_init(&serialState->mutex, NULL);
|
||||
initError |= ZSTD_pthread_cond_init(&serialState->cond, NULL);
|
||||
initError |= ZSTD_pthread_mutex_init(&serialState->ldmWindowMutex, NULL);
|
||||
initError |= ZSTD_pthread_cond_init(&serialState->ldmWindowCond, NULL);
|
||||
return initError;
|
||||
}
|
||||
|
||||
static void ZSTDMT_serialState_free(serialState_t* serialState)
|
||||
{
|
||||
ZSTD_customMem cMem = serialState->params.customMem;
|
||||
ZSTD_pthread_mutex_destroy(&serialState->mutex);
|
||||
ZSTD_pthread_cond_destroy(&serialState->cond);
|
||||
ZSTD_pthread_mutex_destroy(&serialState->ldmWindowMutex);
|
||||
ZSTD_pthread_cond_destroy(&serialState->ldmWindowCond);
|
||||
ZSTD_free(serialState->ldmState.hashTable, cMem);
|
||||
ZSTD_free(serialState->ldmState.bucketOffsets, cMem);
|
||||
}
|
||||
|
||||
static void ZSTDMT_serialState_update(serialState_t* serialState, range_t src, unsigned jobID)
|
||||
static void ZSTDMT_serialState_update(serialState_t* serialState,
|
||||
ZSTD_CCtx* jobCCtx, rawSeqStore_t seqStore,
|
||||
range_t src, unsigned jobID)
|
||||
{
|
||||
/* Wait for our turn */
|
||||
ZSTD_PTHREAD_MUTEX_LOCK(&serialState->mutex);
|
||||
@ -351,6 +508,24 @@ static void ZSTDMT_serialState_update(serialState_t* serialState, range_t src, u
|
||||
/* A future job may error and skip our job */
|
||||
if (serialState->nextJobID == jobID) {
|
||||
/* It is now our turn, do any processing necessary */
|
||||
if (serialState->params.ldmParams.enableLdm) {
|
||||
size_t error;
|
||||
assert(seqStore.seq != NULL && seqStore.pos == 0 &&
|
||||
seqStore.size == 0 && seqStore.capacity > 0);
|
||||
ZSTD_window_update(&serialState->ldmState.window, src.start, src.size);
|
||||
error = ZSTD_ldm_generateSequences(
|
||||
&serialState->ldmState, &seqStore,
|
||||
&serialState->params.ldmParams, src.start, src.size);
|
||||
/* We provide a large enough buffer to never fail. */
|
||||
assert(!ZSTD_isError(error)); (void)error;
|
||||
/* Update ldmWindow to match the ldmState.window and signal the main
|
||||
* thread if it is waiting for a buffer.
|
||||
*/
|
||||
ZSTD_PTHREAD_MUTEX_LOCK(&serialState->ldmWindowMutex);
|
||||
serialState->ldmWindow = serialState->ldmState.window;
|
||||
ZSTD_pthread_cond_signal(&serialState->ldmWindowCond);
|
||||
ZSTD_pthread_mutex_unlock(&serialState->ldmWindowMutex);
|
||||
}
|
||||
if (serialState->params.fParams.checksumFlag && src.size > 0)
|
||||
XXH64_update(&serialState->xxhState, src.start, src.size);
|
||||
}
|
||||
@ -358,6 +533,14 @@ static void ZSTDMT_serialState_update(serialState_t* serialState, range_t src, u
|
||||
serialState->nextJobID++;
|
||||
ZSTD_pthread_cond_broadcast(&serialState->cond);
|
||||
ZSTD_pthread_mutex_unlock(&serialState->mutex);
|
||||
|
||||
if (seqStore.size > 0) {
|
||||
size_t const err = ZSTD_referenceExternalSequences(
|
||||
jobCCtx, seqStore.seq, seqStore.size);
|
||||
assert(serialState->params.ldmParams.enableLdm);
|
||||
assert(!ZSTD_isError(err));
|
||||
(void)err;
|
||||
}
|
||||
}
|
||||
|
||||
static void ZSTDMT_serialState_ensureFinished(serialState_t* serialState,
|
||||
@ -369,6 +552,11 @@ static void ZSTDMT_serialState_ensureFinished(serialState_t* serialState,
|
||||
DEBUGLOG(5, "Skipping past job %u because of error", jobID);
|
||||
serialState->nextJobID = jobID + 1;
|
||||
ZSTD_pthread_cond_broadcast(&serialState->cond);
|
||||
|
||||
ZSTD_PTHREAD_MUTEX_LOCK(&serialState->ldmWindowMutex);
|
||||
ZSTD_window_clear(&serialState->ldmWindow);
|
||||
ZSTD_pthread_cond_signal(&serialState->ldmWindowCond);
|
||||
ZSTD_pthread_mutex_unlock(&serialState->ldmWindowMutex);
|
||||
}
|
||||
ZSTD_pthread_mutex_unlock(&serialState->mutex);
|
||||
|
||||
@ -388,6 +576,7 @@ typedef struct {
|
||||
ZSTD_pthread_cond_t job_cond; /* Thread-safe - used by mtctx and worker */
|
||||
ZSTDMT_CCtxPool* cctxPool; /* Thread-safe - used by mtctx and (all) workers */
|
||||
ZSTDMT_bufferPool* bufPool; /* Thread-safe - used by mtctx and (all) workers */
|
||||
ZSTDMT_seqPool* seqPool; /* Thread-safe - used by mtctx and (all) workers */
|
||||
serialState_t* serial; /* Thread-safe - used by mtctx and (all) workers */
|
||||
buffer_t dstBuff; /* set by worker (or mtctx), then read by worker & mtctx, then modified by mtctx => no barrier */
|
||||
range_t prefix; /* set by mtctx, then read by worker & mtctx => no barrier */
|
||||
@ -408,10 +597,15 @@ void ZSTDMT_compressionJob(void* jobDescription)
|
||||
ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;
|
||||
ZSTD_CCtx_params jobParams = job->params; /* do not modify job->params ! copy it, modify the copy */
|
||||
ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(job->cctxPool);
|
||||
rawSeqStore_t rawSeqStore = ZSTDMT_getSeq(job->seqPool);
|
||||
buffer_t dstBuff = job->dstBuff;
|
||||
|
||||
/* Don't compute the checksum for chunks, but write it in the header */
|
||||
/* Don't compute the checksum for chunks, since we compute it externally,
|
||||
* but write it in the header.
|
||||
*/
|
||||
if (job->jobID != 0) jobParams.fParams.checksumFlag = 0;
|
||||
/* Don't run LDM for the chunks, since we handle it externally */
|
||||
jobParams.ldmParams.enableLdm = 0;
|
||||
|
||||
/* ressources */
|
||||
if (cctx==NULL) {
|
||||
@ -448,8 +642,8 @@ void ZSTDMT_compressionJob(void* jobDescription)
|
||||
goto _endJob;
|
||||
} } }
|
||||
|
||||
/* Perform serial step as early as possible */
|
||||
ZSTDMT_serialState_update(job->serial, job->src, job->jobID);
|
||||
/* Perform serial step as early as possible, but after CCtx initialization */
|
||||
ZSTDMT_serialState_update(job->serial, cctx, rawSeqStore, job->src, job->jobID);
|
||||
|
||||
if (!job->firstJob) { /* flush and overwrite frame header when it's not first job */
|
||||
size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.capacity, job->src.start, 0);
|
||||
@ -504,6 +698,7 @@ _endJob:
|
||||
DEBUGLOG(5, "Finished with prefix: %zx", (size_t)job->prefix.start);
|
||||
DEBUGLOG(5, "Finished with source: %zx", (size_t)job->src.start);
|
||||
/* release resources */
|
||||
ZSTDMT_releaseSeq(job->seqPool, rawSeqStore);
|
||||
ZSTDMT_releaseCCtx(job->cctxPool, cctx);
|
||||
/* report */
|
||||
ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);
|
||||
@ -544,6 +739,7 @@ struct ZSTDMT_CCtx_s {
|
||||
ZSTDMT_jobDescription* jobs;
|
||||
ZSTDMT_bufferPool* bufPool;
|
||||
ZSTDMT_CCtxPool* cctxPool;
|
||||
ZSTDMT_seqPool* seqPool;
|
||||
ZSTD_CCtx_params params;
|
||||
size_t targetSectionSize;
|
||||
size_t targetPrefixSize;
|
||||
@ -635,9 +831,10 @@ ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbWorkers, ZSTD_customMem cMem)
|
||||
mtctx->jobIDMask = nbJobs - 1;
|
||||
mtctx->bufPool = ZSTDMT_createBufferPool(nbWorkers, cMem);
|
||||
mtctx->cctxPool = ZSTDMT_createCCtxPool(nbWorkers, cMem);
|
||||
mtctx->seqPool = ZSTDMT_createSeqPool(nbWorkers, cMem);
|
||||
initError = ZSTDMT_serialState_init(&mtctx->serial);
|
||||
mtctx->roundBuff = kNullRoundBuff;
|
||||
if (!mtctx->factory | !mtctx->jobs | !mtctx->bufPool | !mtctx->cctxPool | initError) {
|
||||
if (!mtctx->factory | !mtctx->jobs | !mtctx->bufPool | !mtctx->cctxPool | !mtctx->seqPool | initError) {
|
||||
ZSTDMT_freeCCtx(mtctx);
|
||||
return NULL;
|
||||
}
|
||||
@ -692,6 +889,7 @@ size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)
|
||||
ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem);
|
||||
ZSTDMT_freeBufferPool(mtctx->bufPool);
|
||||
ZSTDMT_freeCCtxPool(mtctx->cctxPool);
|
||||
ZSTDMT_freeSeqPool(mtctx->seqPool);
|
||||
ZSTDMT_serialState_free(&mtctx->serial);
|
||||
ZSTD_freeCDict(mtctx->cdictLocal);
|
||||
if (mtctx->roundBuff.buffer)
|
||||
@ -708,6 +906,7 @@ size_t ZSTDMT_sizeof_CCtx(ZSTDMT_CCtx* mtctx)
|
||||
+ ZSTDMT_sizeof_bufferPool(mtctx->bufPool)
|
||||
+ (mtctx->jobIDMask+1) * sizeof(ZSTDMT_jobDescription)
|
||||
+ ZSTDMT_sizeof_CCtxPool(mtctx->cctxPool)
|
||||
+ ZSTDMT_sizeof_seqPool(mtctx->seqPool)
|
||||
+ ZSTD_sizeof_CDict(mtctx->cdictLocal)
|
||||
+ mtctx->roundBuff.capacity;
|
||||
}
|
||||
@ -761,7 +960,6 @@ static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params)
|
||||
jobParams.compressionLevel = params.compressionLevel;
|
||||
jobParams.disableLiteralCompression = params.disableLiteralCompression;
|
||||
|
||||
jobParams.ldmParams = params.ldmParams;
|
||||
return jobParams;
|
||||
}
|
||||
|
||||
@ -827,12 +1025,16 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
|
||||
|
||||
static size_t ZSTDMT_computeTargetJobLog(ZSTD_CCtx_params const params)
|
||||
{
|
||||
if (params.ldmParams.enableLdm)
|
||||
return MAX(21, params.cParams.chainLog + 4);
|
||||
return params.cParams.windowLog + 2;
|
||||
}
|
||||
|
||||
static size_t ZSTDMT_computeOverlapLog(ZSTD_CCtx_params const params)
|
||||
{
|
||||
unsigned const overlapRLog = (params.overlapSizeLog>9) ? 0 : 9-params.overlapSizeLog;
|
||||
if (params.ldmParams.enableLdm)
|
||||
return (MIN(params.cParams.windowLog, ZSTDMT_computeTargetJobLog(params) - 2) - overlapRLog);
|
||||
return overlapRLog >= 9 ? 0 : (params.cParams.windowLog - overlapRLog);
|
||||
}
|
||||
|
||||
@ -856,7 +1058,7 @@ static size_t ZSTDMT_compress_advanced_internal(
|
||||
void* dst, size_t dstCapacity,
|
||||
const void* src, size_t srcSize,
|
||||
const ZSTD_CDict* cdict,
|
||||
ZSTD_CCtx_params const params)
|
||||
ZSTD_CCtx_params params)
|
||||
{
|
||||
ZSTD_CCtx_params const jobParams = ZSTDMT_initJobCCtxParams(params);
|
||||
size_t const overlapSize = (size_t)1 << ZSTDMT_computeOverlapLog(params);
|
||||
@ -870,6 +1072,7 @@ static size_t ZSTDMT_compress_advanced_internal(
|
||||
assert(jobParams.nbWorkers == 0);
|
||||
assert(mtctx->cctxPool->totalCCtx == params.nbWorkers);
|
||||
|
||||
params.jobSize = (U32)avgJobSize;
|
||||
DEBUGLOG(4, "ZSTDMT_compress_advanced_internal: nbJobs=%2u (rawSize=%u bytes; fixedSize=%u) ",
|
||||
nbJobs, (U32)proposedJobSize, (U32)avgJobSize);
|
||||
|
||||
@ -882,7 +1085,8 @@ static size_t ZSTDMT_compress_advanced_internal(
|
||||
|
||||
assert(avgJobSize >= 256 KB); /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), required to compress directly into Dst (no additional buffer) */
|
||||
ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgJobSize) );
|
||||
ZSTDMT_serialState_reset(&mtctx->serial, params);
|
||||
if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params))
|
||||
return ERROR(memory_allocation);
|
||||
|
||||
if (nbJobs > mtctx->jobIDMask+1) { /* enlarge job table */
|
||||
U32 jobsTableSize = nbJobs;
|
||||
@ -915,6 +1119,7 @@ static size_t ZSTDMT_compress_advanced_internal(
|
||||
mtctx->jobs[u].dstBuff = dstBuffer;
|
||||
mtctx->jobs[u].cctxPool = mtctx->cctxPool;
|
||||
mtctx->jobs[u].bufPool = mtctx->bufPool;
|
||||
mtctx->jobs[u].seqPool = mtctx->seqPool;
|
||||
mtctx->jobs[u].serial = &mtctx->serial;
|
||||
mtctx->jobs[u].jobID = u;
|
||||
mtctx->jobs[u].firstJob = (u==0);
|
||||
@ -1023,10 +1228,7 @@ size_t ZSTDMT_initCStream_internal(
|
||||
|
||||
/* init */
|
||||
if (params.jobSize == 0) {
|
||||
if (params.cParams.windowLog >= 29)
|
||||
params.jobSize = ZSTDMT_JOBSIZE_MAX;
|
||||
else
|
||||
params.jobSize = 1U << ZSTDMT_computeTargetJobLog(params);
|
||||
params.jobSize = 1U << ZSTDMT_computeTargetJobLog(params);
|
||||
}
|
||||
if (params.jobSize > ZSTDMT_JOBSIZE_MAX) params.jobSize = ZSTDMT_JOBSIZE_MAX;
|
||||
|
||||
@ -1072,11 +1274,15 @@ size_t ZSTDMT_initCStream_internal(
|
||||
DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->targetSectionSize>>10));
|
||||
ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(mtctx->targetSectionSize));
|
||||
{
|
||||
/* If ldm is enabled we need windowSize space. */
|
||||
size_t const windowSize = mtctx->params.ldmParams.enableLdm ? (1U << mtctx->params.cParams.windowLog) : 0;
|
||||
/* Two buffers of slack, plus extra space for the overlap */
|
||||
size_t const nbWorkers = MAX(mtctx->params.nbWorkers, 1);
|
||||
size_t const nbSlackBuffers = MIN(nbWorkers, 2) + (mtctx->targetPrefixSize > 0);
|
||||
size_t const nbSections = nbWorkers + nbSlackBuffers;
|
||||
size_t const capacity = mtctx->targetSectionSize * nbSections;
|
||||
size_t const slackSize = mtctx->targetSectionSize * nbSlackBuffers;
|
||||
/* Compute the total size, and always have enough slack */
|
||||
size_t const sectionsSize = mtctx->targetSectionSize * nbWorkers;
|
||||
size_t const capacity = MAX(windowSize, sectionsSize) + slackSize;
|
||||
if (mtctx->roundBuff.capacity < capacity) {
|
||||
if (mtctx->roundBuff.buffer)
|
||||
ZSTD_free(mtctx->roundBuff.buffer, mtctx->cMem);
|
||||
@ -1099,7 +1305,8 @@ size_t ZSTDMT_initCStream_internal(
|
||||
mtctx->allJobsCompleted = 0;
|
||||
mtctx->consumed = 0;
|
||||
mtctx->produced = 0;
|
||||
ZSTDMT_serialState_reset(&mtctx->serial, params);
|
||||
if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params))
|
||||
return ERROR(memory_allocation);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -1201,6 +1408,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
|
||||
mtctx->jobs[jobID].dstBuff = g_nullBuffer;
|
||||
mtctx->jobs[jobID].cctxPool = mtctx->cctxPool;
|
||||
mtctx->jobs[jobID].bufPool = mtctx->bufPool;
|
||||
mtctx->jobs[jobID].seqPool = mtctx->seqPool;
|
||||
mtctx->jobs[jobID].serial = &mtctx->serial;
|
||||
mtctx->jobs[jobID].jobID = mtctx->nextJobID;
|
||||
mtctx->jobs[jobID].firstJob = (mtctx->nextJobID==0);
|
||||
@ -1387,6 +1595,44 @@ static int ZSTDMT_isOverlapped(buffer_t buffer, range_t range)
|
||||
return bufferStart < rangeEnd && rangeStart < bufferEnd;
|
||||
}
|
||||
|
||||
static int ZSTDMT_doesOverlapWindow(buffer_t buffer, ZSTD_window_t window)
|
||||
{
|
||||
range_t extDict;
|
||||
range_t prefix;
|
||||
|
||||
extDict.start = window.dictBase + window.lowLimit;
|
||||
extDict.size = window.dictLimit - window.lowLimit;
|
||||
|
||||
prefix.start = window.base + window.dictLimit;
|
||||
prefix.size = window.nextSrc - (window.base + window.dictLimit);
|
||||
DEBUGLOG(5, "extDict [0x%zx, 0x%zx)",
|
||||
(size_t)extDict.start,
|
||||
(size_t)extDict.start + extDict.size);
|
||||
DEBUGLOG(5, "prefix [0x%zx, 0x%zx)",
|
||||
(size_t)prefix.start,
|
||||
(size_t)prefix.start + prefix.size);
|
||||
|
||||
return ZSTDMT_isOverlapped(buffer, extDict)
|
||||
|| ZSTDMT_isOverlapped(buffer, prefix);
|
||||
}
|
||||
|
||||
static void ZSTDMT_waitForLdmComplete(ZSTDMT_CCtx* mtctx, buffer_t buffer)
|
||||
{
|
||||
if (mtctx->params.ldmParams.enableLdm) {
|
||||
ZSTD_pthread_mutex_t* mutex = &mtctx->serial.ldmWindowMutex;
|
||||
DEBUGLOG(5, "source [0x%zx, 0x%zx)",
|
||||
(size_t)buffer.start,
|
||||
(size_t)buffer.start + buffer.capacity);
|
||||
ZSTD_PTHREAD_MUTEX_LOCK(mutex);
|
||||
while (ZSTDMT_doesOverlapWindow(buffer, mtctx->serial.ldmWindow)) {
|
||||
DEBUGLOG(6, "Waiting for LDM to finish...");
|
||||
ZSTD_pthread_cond_wait(&mtctx->serial.ldmWindowCond, mutex);
|
||||
}
|
||||
DEBUGLOG(6, "Done waiting for LDM to finish");
|
||||
ZSTD_pthread_mutex_unlock(mutex);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to set the inBuff to the next section to fill.
|
||||
* If any part of the new section is still in use we give up.
|
||||
@ -1415,6 +1661,7 @@ static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx)
|
||||
DEBUGLOG(6, "Waiting for buffer...");
|
||||
return 0;
|
||||
}
|
||||
ZSTDMT_waitForLdmComplete(mtctx, buffer);
|
||||
memmove(start, mtctx->inBuff.prefix.start, prefixSize);
|
||||
mtctx->inBuff.prefix.start = start;
|
||||
mtctx->roundBuff.pos = prefixSize;
|
||||
@ -1427,6 +1674,9 @@ static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx)
|
||||
return 0;
|
||||
}
|
||||
assert(!ZSTDMT_isOverlapped(buffer, mtctx->inBuff.prefix));
|
||||
|
||||
ZSTDMT_waitForLdmComplete(mtctx, buffer);
|
||||
|
||||
DEBUGLOG(5, "Using prefix range [%zx, %zx)",
|
||||
(size_t)mtctx->inBuff.prefix.start,
|
||||
(size_t)mtctx->inBuff.prefix.start + mtctx->inBuff.prefix.size);
|
||||
|
Loading…
Reference in New Issue
Block a user