mirror of
https://github.com/facebook/zstd.git
synced 2024-11-29 03:46:44 +08:00
ZSTDMT_free() scrubs potentially unfinished jobs to release their resources
In some complex scenarios (free() without finishing compression), it is possible that some resources are still into jobs and not collected back into pools. In which case, previous version of free() would miss them. This would be equivalent to a leak. New version ensures that it even foes after such resource. It requires job consumers to properly mark resources as released, by replacing entries by NULL after releasing back to the pool. Obviously, it's not recommended to free() zstdmt context mid-term, still that's now a supported scenario. The same methodology is also used to ensure proper resource collection after an error is detected. Still to do : - detect compression errors (not just allocation ones) - properly manage resource when init() is called without finishing previous compression.
This commit is contained in:
parent
d0a1d45582
commit
0d6b8f65a9
@ -52,6 +52,8 @@ typedef struct buffer_s {
|
||||
size_t size;
|
||||
} buffer_t;
|
||||
|
||||
static const buffer_t g_nullBuffer = (buffer_t) { NULL, 0 };
|
||||
|
||||
typedef struct ZSTDMT_bufferPool_s {
|
||||
unsigned totalBuffers;;
|
||||
unsigned nbBuffers;
|
||||
@ -262,10 +264,27 @@ ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads)
|
||||
return cctx;
|
||||
}
|
||||
|
||||
/* ZSTDMT_releaseAllJobResources() :
|
||||
* Ensure all workers are killed first. */
|
||||
static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx)
|
||||
{
|
||||
unsigned jobID;
|
||||
for (jobID=0; jobID <= mtctx->jobIDMask; jobID++) {
|
||||
ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[jobID].dstBuff);
|
||||
mtctx->jobs[jobID].dstBuff = g_nullBuffer;
|
||||
ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[jobID].src);
|
||||
mtctx->jobs[jobID].src = g_nullBuffer;
|
||||
ZSTDMT_releaseCCtx(mtctx->cctxPool, mtctx->jobs[jobID].cctx);
|
||||
mtctx->jobs[jobID].cctx = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)
|
||||
{
|
||||
if (mtctx==NULL) return 0; /* compatible with free on NULL */
|
||||
POOL_free(mtctx->factory);
|
||||
ZSTDMT_freeBufferPool(mtctx->buffPool);
|
||||
ZSTDMT_releaseAllJobResources(mtctx); /* kill workers first */
|
||||
ZSTDMT_freeBufferPool(mtctx->buffPool); /* release job resources first */
|
||||
ZSTDMT_freeCCtxPool(mtctx->cctxPool);
|
||||
pthread_mutex_destroy(&mtctx->jobCompleted_mutex);
|
||||
pthread_cond_destroy(&mtctx->jobCompleted_cond);
|
||||
@ -340,12 +359,15 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
|
||||
pthread_mutex_unlock(&mtctx->jobCompleted_mutex);
|
||||
|
||||
ZSTDMT_releaseCCtx(mtctx->cctxPool, mtctx->jobs[chunkID].cctx);
|
||||
mtctx->jobs[chunkID].cctx = NULL;
|
||||
mtctx->jobs[chunkID].srcStart = NULL;
|
||||
{ size_t const cSize = mtctx->jobs[chunkID].cSize;
|
||||
if (ZSTD_isError(cSize)) error = cSize;
|
||||
if ((!error) && (dstPos + cSize > dstCapacity)) error = ERROR(dstSize_tooSmall);
|
||||
if (chunkID) { /* note : chunk 0 is already written directly into dst */
|
||||
if (!error) memcpy((char*)dst + dstPos, mtctx->jobs[chunkID].dstBuff.start, cSize);
|
||||
ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[chunkID].dstBuff);
|
||||
mtctx->jobs[chunkID].dstBuff = g_nullBuffer;
|
||||
}
|
||||
dstPos += cSize ;
|
||||
}
|
||||
@ -363,6 +385,19 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
|
||||
|
||||
#if 1
|
||||
|
||||
static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* zcs) {
|
||||
while (zcs->doneJobID < zcs->nextJobID) {
|
||||
unsigned const jobID = zcs->doneJobID & zcs->jobIDMask;
|
||||
PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
|
||||
while (zcs->jobs[jobID].jobCompleted==0) {
|
||||
DEBUGLOG(4, "waiting for jobCompleted signal from chunk %u", zcs->doneJobID); /* we want to block when waiting for data to flush */
|
||||
pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);
|
||||
}
|
||||
pthread_mutex_unlock(&zcs->jobCompleted_mutex);
|
||||
zcs->doneJobID++;
|
||||
}
|
||||
}
|
||||
|
||||
size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) {
|
||||
zcs->params = ZSTD_getParams(compressionLevel, 0, 0);
|
||||
zcs->targetSectionSize = (size_t)1 << (zcs->params.cParams.windowLog + 2);
|
||||
@ -393,8 +428,12 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
|
||||
unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
|
||||
|
||||
if ((cctx==NULL) || (dstBuffer.start==NULL)) {
|
||||
zcs->jobs[jobID].cSize = ERROR(memory_allocation); /* job result : how to collect that error ? */
|
||||
zcs->jobs[jobID].cSize = ERROR(memory_allocation);
|
||||
zcs->jobs[jobID].jobCompleted = 1;
|
||||
zcs->nextJobID++;
|
||||
ZSTDMT_waitForAllJobsCompleted(zcs);
|
||||
ZSTDMT_releaseAllJobResources(zcs);
|
||||
return ERROR(memory_allocation);
|
||||
}
|
||||
|
||||
zcs->jobs[jobID].src = zcs->inBuff.buffer;
|
||||
@ -412,7 +451,15 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
|
||||
zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond;
|
||||
|
||||
/* get a new buffer for next input - save remaining into it */
|
||||
zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); /* check for NULL ! */
|
||||
zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize);
|
||||
if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */
|
||||
zcs->jobs[jobID].cSize = ERROR(memory_allocation);
|
||||
zcs->jobs[jobID].jobCompleted = 1;
|
||||
zcs->nextJobID++;
|
||||
ZSTDMT_waitForAllJobsCompleted(zcs);
|
||||
ZSTDMT_releaseAllJobResources(zcs);
|
||||
return ERROR(memory_allocation);
|
||||
}
|
||||
zcs->inBuff.filled = (U32)(zcs->inBuffSize - zcs->targetSectionSize);
|
||||
memcpy(zcs->inBuff.buffer.start, (const char*)zcs->jobs[jobID].srcStart + zcs->targetSectionSize, zcs->inBuff.filled);
|
||||
|
||||
@ -426,13 +473,16 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
|
||||
ZSTDMT_jobDescription job = zcs->jobs[jobID];
|
||||
if (job.jobCompleted) { /* job completed : output can be flushed */
|
||||
size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
|
||||
ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); zcs->jobs[jobID].cctx = NULL; /* release cctx for future task */
|
||||
ZSTDMT_releaseBuffer(zcs->buffPool, job.src); zcs->jobs[jobID].srcStart = NULL; zcs->jobs[jobID].src = (buffer_t) { NULL, 0 };
|
||||
ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx);
|
||||
zcs->jobs[jobID].cctx = NULL;
|
||||
ZSTDMT_releaseBuffer(zcs->buffPool, job.src);
|
||||
zcs->jobs[jobID].srcStart = NULL; zcs->jobs[jobID].src = g_nullBuffer;
|
||||
memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
|
||||
output->pos += toWrite;
|
||||
job.dstFlushed += toWrite;
|
||||
if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => go to next one */
|
||||
ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff); zcs->jobs[jobID].dstBuff = (buffer_t) { NULL, 0 };
|
||||
ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff);
|
||||
zcs->jobs[jobID].dstBuff = g_nullBuffer;
|
||||
zcs->doneJobID++;
|
||||
} else {
|
||||
zcs->jobs[jobID].dstFlushed = job.dstFlushed; /* save flush level into zcs for later retrieval */
|
||||
@ -449,9 +499,19 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
|
||||
|
||||
if ((srcSize > 0) || (endFrame && !zcs->frameEnded)) {
|
||||
size_t const dstBufferCapacity = ZSTD_compressBound(srcSize);
|
||||
buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity); /* should check for NULL */
|
||||
ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool); /* should check for NULL */
|
||||
buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity);
|
||||
ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool);
|
||||
unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
|
||||
|
||||
if ((cctx==NULL) || (dstBuffer.start==NULL)) {
|
||||
zcs->jobs[jobID].cSize = ERROR(memory_allocation);
|
||||
zcs->jobs[jobID].jobCompleted = 1;
|
||||
zcs->nextJobID++;
|
||||
ZSTDMT_waitForAllJobsCompleted(zcs);
|
||||
ZSTDMT_releaseAllJobResources(zcs);
|
||||
return ERROR(memory_allocation);
|
||||
}
|
||||
|
||||
zcs->jobs[jobID].src = zcs->inBuff.buffer;
|
||||
zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
|
||||
zcs->jobs[jobID].srcSize = srcSize;
|
||||
@ -468,8 +528,16 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
|
||||
|
||||
/* get a new buffer for next input */
|
||||
if (!endFrame) {
|
||||
zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); /* check for NULL ! */
|
||||
zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize);
|
||||
zcs->inBuff.filled = 0;
|
||||
if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */
|
||||
zcs->jobs[jobID].cSize = ERROR(memory_allocation);
|
||||
zcs->jobs[jobID].jobCompleted = 1;
|
||||
zcs->nextJobID++;
|
||||
ZSTDMT_waitForAllJobsCompleted(zcs);
|
||||
ZSTDMT_releaseAllJobResources(zcs);
|
||||
return ERROR(memory_allocation);
|
||||
}
|
||||
} else {
|
||||
zcs->frameEnded = 1;
|
||||
}
|
||||
@ -491,12 +559,12 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
|
||||
ZSTDMT_jobDescription job = zcs->jobs[wJobID];
|
||||
size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
|
||||
ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); zcs->jobs[wJobID].cctx = NULL; /* release cctx for future task */
|
||||
ZSTDMT_releaseBuffer(zcs->buffPool, job.src); zcs->jobs[wJobID].srcStart = NULL; zcs->jobs[wJobID].src = (buffer_t) { NULL, 0 };
|
||||
ZSTDMT_releaseBuffer(zcs->buffPool, job.src); zcs->jobs[wJobID].srcStart = NULL; zcs->jobs[wJobID].src = g_nullBuffer;
|
||||
memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
|
||||
output->pos += toWrite;
|
||||
job.dstFlushed += toWrite;
|
||||
if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => next one */
|
||||
ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff); zcs->jobs[wJobID].dstBuff = (buffer_t) { NULL, 0 };
|
||||
ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff); zcs->jobs[wJobID].dstBuff = g_nullBuffer;
|
||||
zcs->doneJobID++;
|
||||
} else {
|
||||
zcs->jobs[wJobID].dstFlushed = job.dstFlushed;
|
||||
|
Loading…
Reference in New Issue
Block a user