zstdmt: there is now one mutex/cond per job

Yann Collet 2018-01-26 17:48:33 -08:00
parent 77e36273de
commit 9c40ae7ff1
2 changed files with 55 additions and 52 deletions
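The change in a nutshell: instead of one context-wide mutex/condition pair shared by every worker, each job descriptor now owns its own pair, so the mtctx and a worker synchronize only on the job they actually share, and a signal wakes only the thread waiting on that particular job. A minimal sketch of the pattern in plain pthreads (`job_t` and the function names are illustrative, not taken from the zstd sources):

#include <pthread.h>
#include <stddef.h>

/* Illustrative job descriptor : each job owns its own mutex/cond pair. */
typedef struct {
    pthread_mutex_t mutex;   /* protects consumed/srcSize of THIS job only */
    pthread_cond_t cond;     /* signaled when THIS job makes progress */
    size_t consumed;         /* bytes consumed so far, written by the worker */
    size_t srcSize;          /* total input for this job, set before launch */
} job_t;

static void job_init(job_t* job, size_t srcSize)
{
    pthread_mutex_init(&job->mutex, NULL);   /* real code must check the result */
    pthread_cond_init(&job->cond, NULL);
    job->consumed = 0;
    job->srcSize = srcSize;
}

/* worker side : publish progress, then signal this job's waiter only */
static void job_report(job_t* job, size_t newConsumed)
{
    pthread_mutex_lock(&job->mutex);
    job->consumed = newConsumed;
    pthread_cond_signal(&job->cond);
    pthread_mutex_unlock(&job->mutex);
}

/* mtctx side : wait on this job alone; signals on other jobs cause no wakeup */
static void job_wait_completed(job_t* job)
{
    pthread_mutex_lock(&job->mutex);
    while (job->consumed < job->srcSize)   /* re-check after every wakeup */
        pthread_cond_wait(&job->cond, &job->mutex);
    pthread_mutex_unlock(&job->mutex);
}

With a single shared condition variable, every signal can wake waiters whose own predicate is still false, forcing them to re-test and go back to sleep; per-job pairs turn each signal into one targeted wakeup. The cost is that a mutex/cond pair must now be initialized and destroyed per table slot, which is what ZSTDMT_createJobsTable() and ZSTDMT_freeJobsTable() in the diff below take care of.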

File 1 of 2

@@ -310,8 +310,8 @@ static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)
 typedef struct {
     size_t consumed;   /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */
     size_t cSize;   /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx, then set0 by mtctx */
-    ZSTD_pthread_mutex_t* mtctx_mutex;   /* Thread-safe - used by mtctx and (all) workers */
-    ZSTD_pthread_cond_t* mtctx_cond;   /* Thread-safe - used by mtctx and (all) workers */
+    ZSTD_pthread_mutex_t job_mutex;   /* Thread-safe - used by mtctx and worker */
+    ZSTD_pthread_cond_t job_cond;   /* Thread-safe - used by mtctx and worker */
     ZSTDMT_CCtxPool* cctxPool;   /* Thread-safe - used by mtctx and (all) workers */
     ZSTDMT_bufferPool* bufPool;   /* Thread-safe - used by mtctx and (all) workers */
     buffer_t dstBuff;   /* set by worker (or mtctx), then read by worker & mtctx, then modified by mtctx => no barrier */
@@ -395,13 +395,13 @@ void ZSTDMT_compressChunk(void* jobDescription)
             ip += blockSize;
             op += cSize; assert(op < oend);
             /* stats */
-            ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);   /* note : it's a mtctx mutex */
+            ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);   /* note : it's the job's own mutex */
             job->cSize += cSize;
             job->consumed = blockSize * blockNb;
             DEBUGLOG(5, "ZSTDMT_compressChunk: compress new block : cSize==%u bytes (total: %u)",
                         (U32)cSize, (U32)job->cSize);
-            ZSTD_pthread_cond_signal(job->mtctx_cond);   /* warns some more data is ready to be flushed */
-            ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
+            ZSTD_pthread_cond_signal(&job->job_cond);   /* signals that more data is ready to be flushed */
+            ZSTD_pthread_mutex_unlock(&job->job_mutex);
         }
         /* last block */
         assert(blockSize > 0); assert((blockSize & (blockSize - 1)) == 0);   /* blockSize must be power of 2 for mask==(blockSize-1) to work */
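A quick sanity check of the power-of-two assert above: with blockSize = 8, mask = blockSize-1 = 7 (binary 0111), and 13 & 7 = 5 = 13 % 8, so `x & (blockSize-1)` is a cheap substitute for `x % blockSize`. With a non-power-of-two size such as 6, mask = 5 (0101) and 13 & 5 = 5 while 13 % 6 = 1, so the identity breaks; that is exactly the case the assert rules out.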
@@ -413,9 +413,9 @@ void ZSTDMT_compressChunk(void* jobDescription)
                 ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize);
         if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
         /* stats */
-        ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);
+        ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);
         job->cSize += cSize;
-        ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
+        ZSTD_pthread_mutex_unlock(&job->job_mutex);
     }   }

_endJob:
@@ -424,10 +424,10 @@ _endJob:
     ZSTDMT_releaseBuffer(job->bufPool, job->srcBuff);
     job->srcBuff = g_nullBuffer; job->prefixStart = NULL;
     /* report */
-    ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);
+    ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);
     job->consumed = job->srcSize;
-    ZSTD_pthread_cond_signal(job->mtctx_cond);
-    ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
+    ZSTD_pthread_cond_signal(&job->job_cond);
+    ZSTD_pthread_mutex_unlock(&job->job_mutex);
 }
@@ -447,8 +447,6 @@ struct ZSTDMT_CCtx_s {
     ZSTDMT_jobDescription* jobs;
     ZSTDMT_bufferPool* bufPool;
     ZSTDMT_CCtxPool* cctxPool;
-    ZSTD_pthread_mutex_t mtctx_mutex;
-    ZSTD_pthread_cond_t mtctx_cond;
     ZSTD_CCtx_params params;
     size_t targetSectionSize;
     size_t targetPrefixSize;
@@ -470,16 +468,34 @@ struct ZSTDMT_CCtx_s {
 };

 /* ZSTDMT_allocJobsTable()
- * allocate, and just init to zero, a job table.
+ * allocate and init a job table.
  * update *nbJobsPtr to next power of 2 value, as size of table
  * No reverse free() function is provided : just use ZSTD_free() */
-static ZSTDMT_jobDescription* ZSTDMT_allocJobsTable(U32* nbJobsPtr, ZSTD_customMem cMem)
+static ZSTDMT_jobDescription* ZSTDMT_createJobsTable(U32* nbJobsPtr, ZSTD_customMem cMem)
 {
     U32 const nbJobsLog2 = ZSTD_highbit32(*nbJobsPtr) + 1;
     U32 const nbJobs = 1 << nbJobsLog2;
-    *nbJobsPtr = nbJobs;
-    return (ZSTDMT_jobDescription*) ZSTD_calloc(
+    ZSTDMT_jobDescription* const jobTable = ZSTD_calloc(
                 nbJobs * sizeof(ZSTDMT_jobDescription), cMem);
+    U32 jobNb;
+    if (jobTable==NULL) return NULL;
+    *nbJobsPtr = nbJobs;
+    for (jobNb=0; jobNb<nbJobs; jobNb++) {
+        ZSTD_pthread_mutex_init(&jobTable[jobNb].job_mutex, NULL);   /* <======== should check init result */
+        ZSTD_pthread_cond_init(&jobTable[jobNb].job_cond, NULL);     /* <======== should check init result */
+    }
+    return jobTable;
+}
+
+static void ZSTDMT_freeJobsTable(ZSTDMT_jobDescription* jobTable, U32 nbJobs, ZSTD_customMem cMem)
+{
+    U32 jobNb;
+    if (jobTable == NULL) return;
+    for (jobNb=0; jobNb<nbJobs; jobNb++) {
+        ZSTD_pthread_mutex_destroy(&jobTable[jobNb].job_mutex);
+        ZSTD_pthread_cond_destroy(&jobTable[jobNb].job_cond);
+    }
+    ZSTD_free(jobTable, cMem);
+}

 /* ZSTDMT_CCtxParam_setNbThreads():
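The `<======== should check init result` markers in ZSTDMT_createJobsTable() are fair: ZSTD_pthread_mutex_init() and ZSTD_pthread_cond_init() can fail, and a failure would currently go unnoticed, leaving a partially initialized table. One possible shape for a checked version, sketched as an illustration only (the helper name is hypothetical, and this is not necessarily how the project later fixed it):

/* Sketch : same allocation logic as ZSTDMT_createJobsTable(), but unwinds
 * cleanly if any mutex/cond init fails. Passing `jobNb` (the number of
 * fully initialized slots) to ZSTDMT_freeJobsTable() destroys exactly
 * the pairs that were created, then frees the table itself. */
static ZSTDMT_jobDescription* ZSTDMT_createJobsTable_checked(U32* nbJobsPtr, ZSTD_customMem cMem)
{
    U32 const nbJobsLog2 = ZSTD_highbit32(*nbJobsPtr) + 1;
    U32 const nbJobs = 1 << nbJobsLog2;
    U32 jobNb;
    ZSTDMT_jobDescription* const jobTable = ZSTD_calloc(
                nbJobs * sizeof(ZSTDMT_jobDescription), cMem);
    if (jobTable==NULL) return NULL;
    *nbJobsPtr = nbJobs;
    for (jobNb=0; jobNb<nbJobs; jobNb++) {
        if (ZSTD_pthread_mutex_init(&jobTable[jobNb].job_mutex, NULL)) break;
        if (ZSTD_pthread_cond_init(&jobTable[jobNb].job_cond, NULL)) {
            ZSTD_pthread_mutex_destroy(&jobTable[jobNb].job_mutex);
            break;
        }
    }
    if (jobNb < nbJobs) {   /* some init failed : undo the completed slots */
        ZSTDMT_freeJobsTable(jobTable, jobNb, cMem);
        return NULL;
    }
    return jobTable;
}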
@@ -512,7 +528,7 @@ ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem)
     mtctx->cMem = cMem;
     mtctx->allJobsCompleted = 1;
     mtctx->factory = POOL_create_advanced(nbThreads, 0, cMem);
-    mtctx->jobs = ZSTDMT_allocJobsTable(&nbJobs, cMem);
+    mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, cMem);
     assert(nbJobs > 0); assert((nbJobs & (nbJobs - 1)) == 0);   /* ensure nbJobs is a power of 2 */
     mtctx->jobIDMask = nbJobs - 1;
     mtctx->bufPool = ZSTDMT_createBufferPool(nbThreads, cMem);
@@ -521,14 +537,6 @@ ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem)
         ZSTDMT_freeCCtx(mtctx);
         return NULL;
     }
-    if (ZSTD_pthread_mutex_init(&mtctx->mtctx_mutex, NULL)) {
-        ZSTDMT_freeCCtx(mtctx);
-        return NULL;
-    }
-    if (ZSTD_pthread_cond_init(&mtctx->mtctx_cond, NULL)) {
-        ZSTDMT_freeCCtx(mtctx);
-        return NULL;
-    }
     DEBUGLOG(3, "mt_cctx created, for %u threads", nbThreads);
     return mtctx;
 }
@@ -566,12 +574,12 @@ static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* mtctx)
     DEBUGLOG(4, "ZSTDMT_waitForAllJobsCompleted");
     while (mtctx->doneJobID < mtctx->nextJobID) {
         unsigned const jobID = mtctx->doneJobID & mtctx->jobIDMask;
-        ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex);
+        ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[jobID].job_mutex);
         while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].srcSize) {
             DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", mtctx->doneJobID);   /* we want to block when waiting for data to flush */
-            ZSTD_pthread_cond_wait(&mtctx->mtctx_cond, &mtctx->mtctx_mutex);
+            ZSTD_pthread_cond_wait(&mtctx->jobs[jobID].job_cond, &mtctx->jobs[jobID].job_mutex);
         }
-        ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex);
+        ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex);
         mtctx->doneJobID++;
     }
 }
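Note that the predicate loop around ZSTD_pthread_cond_wait() survives the change. It was mandatory with the shared condition, where any job's signal could wake any waiter, and it stays mandatory with per-job conditions, because POSIX permits spurious wakeups even when only one thread ever waits; the refactor only reduces how often the loop wakes for nothing.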
@@ -581,12 +589,10 @@ size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)
     if (mtctx==NULL) return 0;   /* compatible with free on NULL */
     POOL_free(mtctx->factory);   /* stop and free worker threads */
     ZSTDMT_releaseAllJobResources(mtctx);   /* release job resources into pools first */
-    ZSTD_free(mtctx->jobs, mtctx->cMem);
+    ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem);
     ZSTDMT_freeBufferPool(mtctx->bufPool);
     ZSTDMT_freeCCtxPool(mtctx->cctxPool);
     ZSTD_freeCDict(mtctx->cdictLocal);
-    ZSTD_pthread_mutex_destroy(&mtctx->mtctx_mutex);
-    ZSTD_pthread_cond_destroy(&mtctx->mtctx_cond);
     ZSTD_free(mtctx, mtctx->cMem);
     return 0;
 }
@@ -671,7 +677,6 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
 {
     ZSTD_frameProgression fps;
     DEBUGLOG(6, "ZSTDMT_getFrameProgression");
-    ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex);
     fps.consumed = mtctx->consumed;
     fps.produced = mtctx->produced;
     assert(mtctx->inBuff.filled >= mtctx->inBuff.prefixSize);
@@ -682,14 +687,16 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
                     mtctx->doneJobID, lastJobNb, mtctx->jobReady)
         for (jobNb = mtctx->doneJobID ; jobNb < lastJobNb ; jobNb++) {
             unsigned const wJobID = jobNb & mtctx->jobIDMask;
-            size_t const cResult = mtctx->jobs[wJobID].cSize;
-            size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
-            fps.consumed += mtctx->jobs[wJobID].consumed;
-            fps.ingested += mtctx->jobs[wJobID].srcSize;
-            fps.produced += produced;
+            ZSTD_pthread_mutex_lock(&mtctx->jobs[wJobID].job_mutex);
+            {   size_t const cResult = mtctx->jobs[wJobID].cSize;
+                size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
+                fps.consumed += mtctx->jobs[wJobID].consumed;
+                fps.ingested += mtctx->jobs[wJobID].srcSize;
+                fps.produced += produced;
+            }
+            ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
         }
     }
-    ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex);
     return fps;
 }
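A subtle consequence in ZSTDMT_getFrameProgression(): each job's counters are still read consistently, now under that job's own mutex, but the totals aggregated across jobs are no longer one atomic snapshot taken under a single lock. For a progress report that is an acceptable trade; the per-job lock is what matters, since it prevents torn reads of cSize and consumed while a worker updates them.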
@@ -749,9 +756,9 @@ static size_t ZSTDMT_compress_advanced_internal(

     if (nbChunks > mtctx->jobIDMask+1) {   /* enlarge job table */
         U32 nbJobs = nbChunks;
-        ZSTD_free(mtctx->jobs, mtctx->cMem);
+        ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem);
         mtctx->jobIDMask = 0;
-        mtctx->jobs = ZSTDMT_allocJobsTable(&nbJobs, mtctx->cMem);
+        mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, mtctx->cMem);
         if (mtctx->jobs==NULL) return ERROR(memory_allocation);
         assert((nbJobs != 0) && ((nbJobs & (nbJobs - 1)) == 0));   /* ensure nbJobs is a power of 2 */
         mtctx->jobIDMask = nbJobs - 1;
@@ -781,8 +788,6 @@ static size_t ZSTDMT_compress_advanced_internal(
             mtctx->jobs[u].bufPool = mtctx->bufPool;
             mtctx->jobs[u].firstChunk = (u==0);
             mtctx->jobs[u].lastChunk = (u==nbChunks-1);
-            mtctx->jobs[u].mtctx_mutex = &mtctx->mtctx_mutex;
-            mtctx->jobs[u].mtctx_cond = &mtctx->mtctx_cond;

             if (params.fParams.checksumFlag) {
                 XXH64_update(&xxh64, srcStart + frameStartPos, chunkSize);
@@ -802,12 +807,12 @@ static size_t ZSTDMT_compress_advanced_internal(
         unsigned chunkID;
         for (chunkID=0; chunkID<nbChunks; chunkID++) {
             DEBUGLOG(5, "waiting for chunk %u ", chunkID);
-            ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex);
+            ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[chunkID].job_mutex);
             while (mtctx->jobs[chunkID].consumed < mtctx->jobs[chunkID].srcSize) {
                 DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", chunkID);
-                ZSTD_pthread_cond_wait(&mtctx->mtctx_cond, &mtctx->mtctx_mutex);
+                ZSTD_pthread_cond_wait(&mtctx->jobs[chunkID].job_cond, &mtctx->jobs[chunkID].job_mutex);
             }
-            ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex);
+            ZSTD_pthread_mutex_unlock(&mtctx->jobs[chunkID].job_mutex);
             DEBUGLOG(5, "ready to write chunk %u ", chunkID);

             mtctx->jobs[chunkID].prefixStart = NULL;
@@ -1058,8 +1063,6 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
     mtctx->jobs[jobID].lastChunk = endFrame;
     mtctx->jobs[jobID].frameChecksumNeeded = endFrame && (mtctx->nextJobID>0) && mtctx->params.fParams.checksumFlag;
     mtctx->jobs[jobID].dstFlushed = 0;
-    mtctx->jobs[jobID].mtctx_mutex = &mtctx->mtctx_mutex;
-    mtctx->jobs[jobID].mtctx_cond = &mtctx->mtctx_cond;

     if (mtctx->params.fParams.checksumFlag)
         XXH64_update(&mtctx->xxhState, (const char*)mtctx->inBuff.buffer.start + mtctx->inBuff.prefixSize, srcSize);
@@ -1128,7 +1131,7 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u
                 blockToFlush, mtctx->doneJobID, mtctx->nextJobID);
     assert(output->size >= output->pos);

-    ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex);
+    ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[wJobID].job_mutex);
     if (  blockToFlush
       && (mtctx->doneJobID < mtctx->nextJobID) ) {
         assert(mtctx->jobs[wJobID].dstFlushed <= mtctx->jobs[wJobID].cSize);
@@ -1140,14 +1143,14 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u
             }
             DEBUGLOG(5, "waiting for something to flush from job %u (currently flushed: %u bytes)",
                         mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed);
-            ZSTD_pthread_cond_wait(&mtctx->mtctx_cond, &mtctx->mtctx_mutex);   /* block when nothing available to flush but more to come */
+            ZSTD_pthread_cond_wait(&mtctx->jobs[wJobID].job_cond, &mtctx->jobs[wJobID].job_mutex);   /* block when nothing to flush but some to come */
     }   }

     /* try to flush something */
     {   size_t cSize = mtctx->jobs[wJobID].cSize;   /* shared */
         size_t const srcConsumed = mtctx->jobs[wJobID].consumed;   /* shared */
         size_t const srcSize = mtctx->jobs[wJobID].srcSize;   /* read-only, could be done after mutex lock, but no-declaration-after-statement */
-        ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex);
+        ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
         if (ZSTD_isError(cSize)) {
             DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s",
                         mtctx->doneJobID, ZSTD_getErrorName(cSize));

File 2 of 2

@@ -969,9 +969,9 @@ static int fuzzerTests(U32 seed, U32 nbTests, unsigned startTest, double compres
         FUZ_rand(&coreSeed);
         lseed = coreSeed ^ prime32;
         if (nbTests >= testNb) {
-            DISPLAYUPDATE(2, "\r%6u/%6u (%08X) ", testNb, nbTests, lseed);
+            DISPLAYUPDATE(2, "\r%6u/%6u ", testNb, nbTests);
         } else {
-            DISPLAYUPDATE(2, "\r%6u (%08X) ", testNb, lseed);
+            DISPLAYUPDATE(2, "\r%6u ", testNb);
         }

         /* states full reset (deliberately not synchronized) */