zstdmt: renamed mutex and cond to underline they are context-global

Yann Collet 2018-01-25 14:52:34 -08:00
parent 4f7c896113
commit 1272d8e760


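The rename concerns the mutex and condition variable owned by the ZSTDMT_CCtx and shared, by pointer, with every job description. As a rough orientation before the diff, here is a minimal standalone sketch of that ownership pattern; it uses plain pthreads rather than the ZSTD_pthread wrappers, and the demo_* names are illustrative, not zstd identifiers.

#include <pthread.h>
#include <stddef.h>

/* Sketch of a job descriptor: it owns no synchronization of its own,
 * it only points at the mutex/cond owned by the compression context. */
typedef struct {
    size_t consumed;             /* input bytes processed so far */
    size_t cSize;                /* compressed bytes produced so far */
    unsigned jobCompleted;       /* set to 1 by the worker when done */
    pthread_mutex_t* ctx_mutex;  /* -> context-global mutex */
    pthread_cond_t*  ctx_cond;   /* -> context-global condition variable */
} demo_job_t;

/* Sketch of the context: one mutex/cond pair shared by all jobs. */
typedef struct {
    pthread_mutex_t ctx_mutex;
    pthread_cond_t  ctx_cond;
    demo_job_t      jobs[4];
} demo_ctx_t;
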
@@ -316,8 +316,8 @@ typedef struct {
unsigned lastChunk;
unsigned jobCompleted;
unsigned frameChecksumNeeded;
-ZSTD_pthread_mutex_t* jobCompleted_mutex;
-ZSTD_pthread_cond_t* jobCompleted_cond;
+ZSTD_pthread_mutex_t* mtctx_mutex;
+ZSTD_pthread_cond_t* mtctx_cond;
ZSTD_CCtx_params params;
const ZSTD_CDict* cdict;
ZSTDMT_CCtxPool* cctxPool;
@@ -344,9 +344,9 @@ void ZSTDMT_compressChunk(void* jobDescription)
job->cSize = ERROR(memory_allocation);
goto _endJob;
}
-ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); /* note : it's a mtctx mutex */
+ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);
job->dstBuff = dstBuff;
-ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex);
+ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
}
/* init */
@@ -399,13 +399,13 @@ void ZSTDMT_compressChunk(void* jobDescription)
ip += ZSTD_BLOCKSIZE_MAX;
op += cSize; assert(op < oend);
/* stats */
-ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); /* note : it's a mtctx mutex */
+ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex); /* note : it's a mtctx mutex */
job->cSize += cSize;
job->consumed = ZSTD_BLOCKSIZE_MAX * blockNb;
DEBUGLOG(5, "ZSTDMT_compressChunk: compress new block : cSize==%u bytes (total: %u)",
(U32)cSize, (U32)job->cSize);
-ZSTD_pthread_cond_signal(job->jobCompleted_cond);
-ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex);
+ZSTD_pthread_cond_signal(job->mtctx_cond);
+ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
}
/* last block */
if ((nbBlocks > 0) | job->lastChunk /*must output a "last block" flag*/ ) {
@@ -416,10 +416,10 @@ void ZSTDMT_compressChunk(void* jobDescription)
ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize);
if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
/* stats */
-ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); /* note : it's a mtctx mutex */
+ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex); /* note : it's a mtctx mutex */
job->cSize += cSize;
job->consumed = job->srcSize;
-ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex);
+ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
}
}
#endif
@@ -430,11 +430,11 @@ _endJob:
ZSTDMT_releaseBuffer(job->bufPool, job->src);
job->src = g_nullBuffer; job->srcStart = NULL;
/* report */
-ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);
+ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);
job->consumed = job->srcSize;
job->jobCompleted = 1;
-ZSTD_pthread_cond_signal(job->jobCompleted_cond);
-ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex);
+ZSTD_pthread_cond_signal(job->mtctx_cond);
+ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
}
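The _endJob hunk above is the worker-side half of the handshake: the job publishes its results and sets jobCompleted while holding the shared mutex, then signals the single context-global condition variable. A hedged sketch of that pattern, reusing the hypothetical demo_* types introduced after the commit header:

/* Worker-side completion (sketch, not the zstd implementation):
 * publish results under the shared lock, then wake any waiter. */
static void demo_job_finish(demo_job_t* job, size_t produced)
{
    pthread_mutex_lock(job->ctx_mutex);   /* same mutex for every job */
    job->cSize += produced;
    job->jobCompleted = 1;
    pthread_cond_signal(job->ctx_cond);   /* wake the waiting/flushing thread */
    pthread_mutex_unlock(job->ctx_mutex);
}
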
@@ -452,8 +452,8 @@ struct ZSTDMT_CCtx_s {
ZSTDMT_jobDescription* jobs;
ZSTDMT_bufferPool* bufPool;
ZSTDMT_CCtxPool* cctxPool;
-ZSTD_pthread_mutex_t jobCompleted_mutex;
-ZSTD_pthread_cond_t jobCompleted_cond;
+ZSTD_pthread_mutex_t mtctx_mutex;
+ZSTD_pthread_cond_t mtctx_cond;
ZSTD_CCtx_params params;
size_t targetSectionSize;
size_t inBuffSize;
@@ -538,11 +538,11 @@ ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem)
ZSTDMT_freeCCtx(mtctx);
return NULL;
}
-if (ZSTD_pthread_mutex_init(&mtctx->jobCompleted_mutex, NULL)) {
+if (ZSTD_pthread_mutex_init(&mtctx->mtctx_mutex, NULL)) {
ZSTDMT_freeCCtx(mtctx);
return NULL;
}
-if (ZSTD_pthread_cond_init(&mtctx->jobCompleted_cond, NULL)) {
+if (ZSTD_pthread_cond_init(&mtctx->mtctx_cond, NULL)) {
ZSTDMT_freeCCtx(mtctx);
return NULL;
}
@@ -582,12 +582,12 @@ static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* zcs)
DEBUGLOG(4, "ZSTDMT_waitForAllJobsCompleted");
while (zcs->doneJobID < zcs->nextJobID) {
unsigned const jobID = zcs->doneJobID & zcs->jobIDMask;
-ZSTD_PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
+ZSTD_PTHREAD_MUTEX_LOCK(&zcs->mtctx_mutex);
while (zcs->jobs[jobID].jobCompleted==0) {
DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", zcs->doneJobID); /* we want to block when waiting for data to flush */
-ZSTD_pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);
+ZSTD_pthread_cond_wait(&zcs->mtctx_cond, &zcs->mtctx_mutex);
}
-ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex);
+ZSTD_pthread_mutex_unlock(&zcs->mtctx_mutex);
zcs->doneJobID++;
}
}
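ZSTDMT_waitForAllJobsCompleted above is the consumer-side half: a predicate loop around a condition wait, now written against the context-global names. The same loop in sketch form, again with the hypothetical demo_* types:

/* Caller-side wait (sketch): re-check the predicate in a loop because
 * pthread_cond_wait may wake spuriously; the wait atomically releases
 * the mutex while sleeping and re-acquires it before returning. */
static void demo_wait_job(demo_ctx_t* ctx, unsigned jobID)
{
    pthread_mutex_lock(&ctx->ctx_mutex);
    while (ctx->jobs[jobID].jobCompleted == 0)
        pthread_cond_wait(&ctx->ctx_cond, &ctx->ctx_mutex);
    pthread_mutex_unlock(&ctx->ctx_mutex);
}
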
@@ -601,8 +601,8 @@ size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)
ZSTDMT_freeBufferPool(mtctx->bufPool);
ZSTDMT_freeCCtxPool(mtctx->cctxPool);
ZSTD_freeCDict(mtctx->cdictLocal);
-ZSTD_pthread_mutex_destroy(&mtctx->jobCompleted_mutex);
-ZSTD_pthread_cond_destroy(&mtctx->jobCompleted_cond);
+ZSTD_pthread_mutex_destroy(&mtctx->mtctx_mutex);
+ZSTD_pthread_cond_destroy(&mtctx->mtctx_cond);
ZSTD_free(mtctx, mtctx->cMem);
return 0;
}
@@ -672,7 +672,7 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
{
ZSTD_frameProgression fs;
DEBUGLOG(6, "ZSTDMT_getFrameProgression");
-ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobCompleted_mutex);
+ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex);
fs.consumed = mtctx->consumed;
fs.produced = mtctx->produced;
assert(mtctx->inBuff.filled >= mtctx->prefixSize);
@@ -690,7 +690,7 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
fs.produced += produced;
}
}
-ZSTD_pthread_mutex_unlock(&mtctx->jobCompleted_mutex);
+ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex);
return fs;
}
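ZSTDMT_getFrameProgression reads counters that worker threads update concurrently, so it holds the same context-global mutex for the whole snapshot. A simplified sketch of that read-under-lock pattern (the demo_* types remain hypothetical):

/* Progress snapshot (sketch): read worker-updated fields only while
 * holding the context-global mutex, so the totals stay consistent. */
typedef struct { size_t consumed; size_t produced; } demo_progress_t;

static demo_progress_t demo_get_progress(demo_ctx_t* ctx, unsigned nbJobs)
{
    demo_progress_t p = { 0, 0 };
    unsigned u;
    pthread_mutex_lock(&ctx->ctx_mutex);
    for (u = 0; u < nbJobs; u++) {   /* nbJobs <= size of the demo jobs array */
        p.consumed += ctx->jobs[u].consumed;
        p.produced += ctx->jobs[u].cSize;
    }
    pthread_mutex_unlock(&ctx->ctx_mutex);
    return p;
}
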
@@ -783,8 +783,8 @@ static size_t ZSTDMT_compress_advanced_internal(
mtctx->jobs[u].firstChunk = (u==0);
mtctx->jobs[u].lastChunk = (u==nbChunks-1);
mtctx->jobs[u].jobCompleted = 0;
-mtctx->jobs[u].jobCompleted_mutex = &mtctx->jobCompleted_mutex;
-mtctx->jobs[u].jobCompleted_cond = &mtctx->jobCompleted_cond;
+mtctx->jobs[u].mtctx_mutex = &mtctx->mtctx_mutex;
+mtctx->jobs[u].mtctx_cond = &mtctx->mtctx_cond;
if (params.fParams.checksumFlag) {
XXH64_update(&xxh64, srcStart + frameStartPos, chunkSize);
@@ -804,12 +804,12 @@ static size_t ZSTDMT_compress_advanced_internal(
unsigned chunkID;
for (chunkID=0; chunkID<nbChunks; chunkID++) {
DEBUGLOG(5, "waiting for chunk %u ", chunkID);
-ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobCompleted_mutex);
+ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex);
while (mtctx->jobs[chunkID].jobCompleted==0) {
DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", chunkID);
-ZSTD_pthread_cond_wait(&mtctx->jobCompleted_cond, &mtctx->jobCompleted_mutex);
+ZSTD_pthread_cond_wait(&mtctx->mtctx_cond, &mtctx->mtctx_mutex);
}
-ZSTD_pthread_mutex_unlock(&mtctx->jobCompleted_mutex);
+ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex);
DEBUGLOG(5, "ready to write chunk %u ", chunkID);
mtctx->jobs[chunkID].srcStart = NULL;
@@ -1035,8 +1035,8 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, ZSTD
zcs->jobs[jobID].jobCompleted = 0;
zcs->jobs[jobID].frameChecksumNeeded = endFrame && (zcs->nextJobID>0) && zcs->params.fParams.checksumFlag;
zcs->jobs[jobID].dstFlushed = 0;
-zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex;
-zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond;
+zcs->jobs[jobID].mtctx_mutex = &zcs->mtctx_mutex;
+zcs->jobs[jobID].mtctx_cond = &zcs->mtctx_cond;
if (zcs->params.fParams.checksumFlag)
XXH64_update(&zcs->xxhState, (const char*)zcs->inBuff.buffer.start + zcs->prefixSize, srcSize);
@@ -1067,7 +1067,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, ZSTD
zcs->params.fParams.checksumFlag = 0;
} } }
DEBUGLOG(5, "ZSTDMT_createCompressionJob: posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)",
DEBUGLOG(2, "ZSTDMT_createCompressionJob: posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)",
zcs->nextJobID,
(U32)zcs->jobs[jobID].srcSize,
zcs->jobs[jobID].lastChunk,
@@ -1094,18 +1094,18 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, uns
DEBUGLOG(5, "ZSTDMT_flushProduced (blocking:%u)", blockToFlush);
assert(output->size >= output->pos);
-ZSTD_PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
+ZSTD_PTHREAD_MUTEX_LOCK(&zcs->mtctx_mutex);
if (blockToFlush && (zcs->doneJobID < zcs->nextJobID)) {
while (zcs->jobs[wJobID].dstFlushed == zcs->jobs[wJobID].cSize) {
if (zcs->jobs[wJobID].jobCompleted==1) break;
DEBUGLOG(5, "waiting for something to flush from job %u (currently flushed: %u bytes)",
zcs->doneJobID, (U32)zcs->jobs[wJobID].dstFlushed);
-ZSTD_pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); /* block when nothing available to flush but more to come */
+ZSTD_pthread_cond_wait(&zcs->mtctx_cond, &zcs->mtctx_mutex); /* block when nothing available to flush but more to come */
} }
/* some output is available to be flushed */
{ ZSTDMT_jobDescription job = zcs->jobs[wJobID];
-ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex);
+ZSTD_pthread_mutex_unlock(&zcs->mtctx_mutex);
if (ZSTD_isError(job.cSize)) {
DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s",
zcs->doneJobID, ZSTD_getErrorName(job.cSize));
@@ -1186,8 +1186,9 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
/* single-pass shortcut (note : synchronous-mode) */
if ( (mtctx->nextJobID == 0) /* just started */
&& (mtctx->inBuff.filled == 0) /* nothing buffered */
+&& (!mtctx->jobReady) /* no job already created */
&& (endOp == ZSTD_e_end) /* end order */
-&& (output->size - output->pos >= ZSTD_compressBound(input->size - input->pos)) ) { /* enough room */
+&& (output->size - output->pos >= ZSTD_compressBound(input->size - input->pos)) ) { /* enough space in dst */
size_t const cSize = ZSTDMT_compress_advanced_internal(mtctx,
(char*)output->dst + output->pos, output->size - output->pos,
(const char*)input->src + input->pos, input->size - input->pos,
@@ -1234,7 +1235,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
/* check for potential compressed data ready to be flushed */
{ size_t const remainingToFlush = ZSTDMT_flushProduced(mtctx, output, !forwardInputProgress, endOp); /* block if there was no forward input progress */
-if (input->pos < input->size) return MAX(remainingToFlush, 1); /* input not consumed : do not flush yet */
+if (input->pos < input->size) return MAX(remainingToFlush, 1); /* input not consumed : do not end flush yet */
return remainingToFlush;
}
}