zstdmt : fix end condition (ZSTD_e_end)

When ZSTD_e_end directive is provided,
the question is not only "are internal buffers completely flushed",
it is also "is current frame completed".

In some rare cases,
it was possible for internal buffers to be completely flushed,
triggering a @return == 0,
but frame was not completed as it needed a last null-size block to mark the end,
resulting in an unfinished frame.
This commit is contained in:
Yann Collet 2018-01-23 15:19:11 -08:00
parent de5e38a7a6
commit c1cc57f270
3 changed files with 16 additions and 13 deletions

View File

@ -1087,7 +1087,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
* `output` : `pos` will be updated with amount of data flushed .
* `blockToFlush` : if >0, the function will block and wait if there is no data available to flush .
* @return : amount of data remaining within internal buffer, 0 if no more, 1 if unknown but > 0, or an error code */
static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush)
static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush, ZSTD_EndDirective end)
{
unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
DEBUGLOG(5, "ZSTDMT_flushProduced (blocking:%u)", blockToFlush);
@ -1116,7 +1116,7 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, uns
if ( job.jobCompleted
&& job.frameChecksumNeeded ) {
U32 const checksum = (U32)XXH64_digest(&zcs->xxhState);
DEBUGLOG(5, "ZSTDMT_flushProduced: writing checksum : %08X \n", checksum);
DEBUGLOG(4, "ZSTDMT_flushProduced: writing checksum : %08X \n", checksum);
MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum);
job.cSize += 4;
zcs->jobs[wJobID].cSize += 4;
@ -1150,9 +1150,10 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, uns
if (job.srcSize > job.consumed) return 1; /* current job not completely compressed */
}
if (zcs->doneJobID < zcs->nextJobID) return 1; /* some more jobs to flush */
if (zcs->jobReady) return 1; /* at least one more job to do ! */
if (zcs->jobReady) return 1; /* one job is ready and queued! */
if (zcs->inBuff.filled > 0) return 1; /* input not empty */
zcs->allJobsCompleted = zcs->frameEnded; /* last frame entirely flushed */
if (end == ZSTD_e_end) return !zcs->frameEnded; /* for ZSTD_e_end, question becomes : is frame completed ? It may need a last null-block */
return 0; /* everything flushed */
}
@ -1231,7 +1232,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
}
/* check for potential compressed data ready to be flushed */
{ size_t const remainingToFlush = ZSTDMT_flushProduced(mtctx, output, !forwardInputProgress); /* block if there was no forward input progress */
{ size_t const remainingToFlush = ZSTDMT_flushProduced(mtctx, output, !forwardInputProgress, endOp); /* block if there was no forward input progress */
if (input->pos < input->size) return MAX(remainingToFlush, 1); /* input not consumed : do not flush yet */
return remainingToFlush;
}
@ -1247,21 +1248,21 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
}
static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, unsigned endFrame)
static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_EndDirective endFrame)
{
size_t const srcSize = mtctx->inBuff.filled - mtctx->prefixSize;
DEBUGLOG(5, "ZSTDMT_flushStream_internal");
if ( mtctx->jobReady /* one job ready for a worker to pick up */
|| (srcSize > 0) /* still some data within input buffer */
|| (endFrame && !mtctx->frameEnded)) { /* need a last 0-size block to end frame */
|| ((endFrame==ZSTD_e_end) && !mtctx->frameEnded)) { /* need a last 0-size block to end frame */
DEBUGLOG(5, "ZSTDMT_flushStream_internal : create a new job (%u bytes, end:%u)",
(U32)srcSize, endFrame);
(U32)srcSize, (U32)endFrame);
CHECK_F( ZSTDMT_createCompressionJob(mtctx, srcSize, endFrame) );
}
/* check if there is any data available to flush */
return ZSTDMT_flushProduced(mtctx, output, 1 /* blockToFlush */);
return ZSTDMT_flushProduced(mtctx, output, 1 /* blockToFlush */, endFrame);
}
@ -1270,7 +1271,7 @@ size_t ZSTDMT_flushStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output)
DEBUGLOG(5, "ZSTDMT_flushStream");
if (mtctx->singleBlockingThread)
return ZSTD_flushStream(mtctx->cctxPool->cctx[0], output);
return ZSTDMT_flushStream_internal(mtctx, output, 0 /* endFrame */);
return ZSTDMT_flushStream_internal(mtctx, output, ZSTD_e_flush);
}
size_t ZSTDMT_endStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output)
@ -1278,5 +1279,5 @@ size_t ZSTDMT_endStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output)
DEBUGLOG(4, "ZSTDMT_endStream");
if (mtctx->singleBlockingThread)
return ZSTD_endStream(mtctx->cctxPool->cctx[0], output);
return ZSTDMT_flushStream_internal(mtctx, output, 1 /* endFrame */);
return ZSTDMT_flushStream_internal(mtctx, output, ZSTD_e_end);
}

View File

@ -1136,10 +1136,11 @@ typedef enum {
* and then immediately returns, just indicating that there is some data remaining to be flushed.
* The function nonetheless guarantees forward progress : it will return only after it reads or write at least 1+ byte.
* - Exception : in multi-threading mode, if the first call requests a ZSTD_e_end directive, it is blocking : it will complete compression before giving back control to caller.
* - @return provides the minimum amount of data remaining to be flushed from internal buffers
* - @return provides a minimum amount of data remaining to be flushed from internal buffers
* or an error code, which can be tested using ZSTD_isError().
* if @return != 0, flush is not fully completed, there is still some data left within internal buffers.
* This is useful to determine if a ZSTD_e_flush or ZSTD_e_end directive is completed.
* This is useful for ZSTD_e_flush, since in this case more flushes are necessary to empty all buffers.
* For ZSTD_e_end, @return == 0 when internal buffers are fully flushed and frame is completed.
* - after a ZSTD_e_end directive, if internal buffer is not fully flushed (@return != 0),
* only ZSTD_e_end or ZSTD_e_flush operations are allowed.
* Before starting a new compression job, or changing compression parameters,

View File

@ -1695,8 +1695,9 @@ static int fuzzerTests_newAPI(U32 seed, U32 nbTests, unsigned startTest, double
size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog+1);
size_t const adjustedDstSize = MIN(cBufferSize - cSize, randomDstSize);
outBuff.size = outBuff.pos + adjustedDstSize;
DISPLAYLEVEL(6, "End-flush into dst buffer of size %u \n", (U32)adjustedDstSize);
DISPLAYLEVEL(6, "t%u: End-flush into dst buffer of size %u \n", testNb, (U32)adjustedDstSize);
remainingToFlush = ZSTD_compress_generic(zc, &outBuff, &inBuff, ZSTD_e_end);
DISPLAYLEVEL(6, "t%u: Total flushed so far : %u bytes \n", testNb, (U32)outBuff.pos);
CHECK( ZSTD_isError(remainingToFlush),
"ZSTD_compress_generic w/ ZSTD_e_end error : %s",
ZSTD_getErrorName(remainingToFlush) );