mirror of
https://github.com/facebook/zstd.git
synced 2024-11-28 21:46:44 +08:00
zstdmt uses POOL_tryAdd() to call a new worker
so that it's no longer a blocking call. This makes it possible to stream out data gradually, while waiting for a worker to become available.
This commit is contained in:
parent
6f7280fb33
commit
70f81d6030
@ -389,7 +389,7 @@ void ZSTDMT_compressChunk(void* jobDescription)
|
||||
BYTE* op = ostart;
|
||||
BYTE* oend = op + dstBuff.size;
|
||||
int blockNb;
|
||||
DEBUGLOG(2, "ZSTDMT_compressChunk: compress %u bytes in %i blocks", (U32)job->srcSize, nbBlocks);
|
||||
DEBUGLOG(5, "ZSTDMT_compressChunk: compress %u bytes in %i blocks", (U32)job->srcSize, nbBlocks);
|
||||
assert(job->cSize == 0);
|
||||
for (blockNb = 1; blockNb < nbBlocks; blockNb++) {
|
||||
size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, ZSTD_BLOCKSIZE_MAX);
|
||||
@ -400,7 +400,7 @@ void ZSTDMT_compressChunk(void* jobDescription)
|
||||
ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); /* note : it's a mtctx mutex */
|
||||
job->cSize += cSize;
|
||||
job->consumed = ZSTD_BLOCKSIZE_MAX * blockNb;
|
||||
DEBUGLOG(2, "ZSTDMT_compressChunk: compress new block : cSize==%u bytes (total: %u)",
|
||||
DEBUGLOG(5, "ZSTDMT_compressChunk: compress new block : cSize==%u bytes (total: %u)",
|
||||
(U32)cSize, (U32)job->cSize);
|
||||
ZSTD_pthread_cond_signal(job->jobCompleted_cond);
|
||||
ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex);
|
||||
@ -458,6 +458,7 @@ struct ZSTDMT_CCtx_s {
|
||||
size_t prefixSize;
|
||||
size_t targetPrefixSize;
|
||||
inBuff_t inBuff;
|
||||
int jobReady; /* 1 => one job is already prepared, but pool has shortage of workers. Don't create another one. */
|
||||
XXH64_state_t xxhState;
|
||||
unsigned singleBlockingThread;
|
||||
unsigned jobIDMask;
|
||||
@ -668,14 +669,17 @@ unsigned ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx)
|
||||
ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
|
||||
{
|
||||
ZSTD_frameProgression fs;
|
||||
DEBUGLOG(5, "ZSTDMT_getFrameProgression");
|
||||
DEBUGLOG(6, "ZSTDMT_getFrameProgression");
|
||||
ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobCompleted_mutex);
|
||||
fs.consumed = mtctx->consumed;
|
||||
fs.produced = mtctx->produced;
|
||||
assert(mtctx->inBuff.filled >= mtctx->prefixSize);
|
||||
fs.ingested = mtctx->consumed + (mtctx->inBuff.filled - mtctx->prefixSize);
|
||||
{ unsigned jobNb;
|
||||
for (jobNb = mtctx->doneJobID ; jobNb < mtctx->nextJobID ; jobNb++) {
|
||||
unsigned lastJobNb = mtctx->nextJobID + mtctx->jobReady; assert(mtctx->jobReady <= 1);
|
||||
DEBUGLOG(6, "ZSTDMT_getFrameProgression: jobs: from %u to <%u (jobReady:%u)",
|
||||
mtctx->doneJobID, lastJobNb, mtctx->jobReady)
|
||||
for (jobNb = mtctx->doneJobID ; jobNb < lastJobNb ; jobNb++) {
|
||||
unsigned const wJobID = jobNb & mtctx->jobIDMask;
|
||||
size_t const cResult = mtctx->jobs[wJobID].cSize;
|
||||
size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
|
||||
@ -999,59 +1003,60 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
|
||||
{
|
||||
unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
|
||||
|
||||
DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ",
|
||||
zcs->nextJobID, (U32)srcSize, (U32)zcs->prefixSize);
|
||||
zcs->jobs[jobID].src = zcs->inBuff.buffer;
|
||||
zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
|
||||
zcs->jobs[jobID].srcSize = srcSize;
|
||||
zcs->jobs[jobID].consumed = 0;
|
||||
zcs->jobs[jobID].cSize = 0;
|
||||
zcs->jobs[jobID].prefixSize = zcs->prefixSize;
|
||||
assert(zcs->inBuff.filled >= srcSize + zcs->prefixSize);
|
||||
zcs->jobs[jobID].params = zcs->params;
|
||||
/* do not calculate checksum within sections, but write it in header for first section */
|
||||
if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0;
|
||||
zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL;
|
||||
zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize;
|
||||
zcs->jobs[jobID].dstBuff = g_nullBuffer;
|
||||
zcs->jobs[jobID].cctxPool = zcs->cctxPool;
|
||||
zcs->jobs[jobID].bufPool = zcs->bufPool;
|
||||
zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0);
|
||||
zcs->jobs[jobID].lastChunk = endFrame;
|
||||
zcs->jobs[jobID].jobCompleted = 0;
|
||||
zcs->jobs[jobID].frameChecksumNeeded = endFrame && (zcs->nextJobID>0) && zcs->params.fParams.checksumFlag;
|
||||
zcs->jobs[jobID].dstFlushed = 0;
|
||||
zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex;
|
||||
zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond;
|
||||
if (!zcs->jobReady) {
|
||||
DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ",
|
||||
zcs->nextJobID, (U32)srcSize, (U32)zcs->prefixSize);
|
||||
zcs->jobs[jobID].src = zcs->inBuff.buffer;
|
||||
zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
|
||||
zcs->jobs[jobID].srcSize = srcSize;
|
||||
zcs->jobs[jobID].consumed = 0;
|
||||
zcs->jobs[jobID].cSize = 0;
|
||||
zcs->jobs[jobID].prefixSize = zcs->prefixSize;
|
||||
assert(zcs->inBuff.filled >= srcSize + zcs->prefixSize);
|
||||
zcs->jobs[jobID].params = zcs->params;
|
||||
/* do not calculate checksum within sections, but write it in header for first section */
|
||||
if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0;
|
||||
zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL;
|
||||
zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize;
|
||||
zcs->jobs[jobID].dstBuff = g_nullBuffer;
|
||||
zcs->jobs[jobID].cctxPool = zcs->cctxPool;
|
||||
zcs->jobs[jobID].bufPool = zcs->bufPool;
|
||||
zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0);
|
||||
zcs->jobs[jobID].lastChunk = endFrame;
|
||||
zcs->jobs[jobID].jobCompleted = 0;
|
||||
zcs->jobs[jobID].frameChecksumNeeded = endFrame && (zcs->nextJobID>0) && zcs->params.fParams.checksumFlag;
|
||||
zcs->jobs[jobID].dstFlushed = 0;
|
||||
zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex;
|
||||
zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond;
|
||||
|
||||
if (zcs->params.fParams.checksumFlag)
|
||||
XXH64_update(&zcs->xxhState, (const char*)zcs->inBuff.buffer.start + zcs->prefixSize, srcSize);
|
||||
if (zcs->params.fParams.checksumFlag)
|
||||
XXH64_update(&zcs->xxhState, (const char*)zcs->inBuff.buffer.start + zcs->prefixSize, srcSize);
|
||||
|
||||
/* get a new buffer for next input */
|
||||
if (!endFrame) {
|
||||
size_t const newPrefixSize = MIN(srcSize + zcs->prefixSize, zcs->targetPrefixSize);
|
||||
zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->bufPool);
|
||||
if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */
|
||||
zcs->jobs[jobID].jobCompleted = 1;
|
||||
zcs->nextJobID++;
|
||||
ZSTDMT_waitForAllJobsCompleted(zcs);
|
||||
ZSTDMT_releaseAllJobResources(zcs);
|
||||
return ERROR(memory_allocation);
|
||||
}
|
||||
zcs->inBuff.filled -= srcSize + zcs->prefixSize - newPrefixSize;
|
||||
memmove(zcs->inBuff.buffer.start,
|
||||
(const char*)zcs->jobs[jobID].srcStart + zcs->prefixSize + srcSize - newPrefixSize,
|
||||
zcs->inBuff.filled);
|
||||
zcs->prefixSize = newPrefixSize;
|
||||
} else { /* if (endFrame==1) */
|
||||
zcs->inBuff.buffer = g_nullBuffer;
|
||||
zcs->inBuff.filled = 0;
|
||||
zcs->prefixSize = 0;
|
||||
zcs->frameEnded = 1;
|
||||
if (zcs->nextJobID == 0) {
|
||||
/* single chunk exception : checksum is calculated directly within worker thread */
|
||||
zcs->params.fParams.checksumFlag = 0;
|
||||
} }
|
||||
/* get a new buffer for next input */
|
||||
if (!endFrame) {
|
||||
size_t const newPrefixSize = MIN(srcSize + zcs->prefixSize, zcs->targetPrefixSize);
|
||||
zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->bufPool);
|
||||
if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */
|
||||
zcs->jobs[jobID].jobCompleted = 1;
|
||||
zcs->nextJobID++;
|
||||
ZSTDMT_waitForAllJobsCompleted(zcs);
|
||||
ZSTDMT_releaseAllJobResources(zcs);
|
||||
return ERROR(memory_allocation);
|
||||
}
|
||||
zcs->inBuff.filled -= srcSize + zcs->prefixSize - newPrefixSize;
|
||||
memmove(zcs->inBuff.buffer.start, /* copy end of current job into next job, as "prefix" */
|
||||
(const char*)zcs->jobs[jobID].srcStart + zcs->prefixSize + srcSize - newPrefixSize,
|
||||
zcs->inBuff.filled);
|
||||
zcs->prefixSize = newPrefixSize;
|
||||
} else { /* endFrame==1 => no need for another input buffer */
|
||||
zcs->inBuff.buffer = g_nullBuffer;
|
||||
zcs->inBuff.filled = 0;
|
||||
zcs->prefixSize = 0;
|
||||
zcs->frameEnded = 1;
|
||||
if (zcs->nextJobID == 0) {
|
||||
/* single chunk exception : checksum is calculated directly within worker thread */
|
||||
zcs->params.fParams.checksumFlag = 0;
|
||||
} } }
|
||||
|
||||
DEBUGLOG(5, "ZSTDMT_createCompressionJob: posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)",
|
||||
zcs->nextJobID,
|
||||
@ -1059,8 +1064,13 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
|
||||
zcs->jobs[jobID].lastChunk,
|
||||
zcs->doneJobID,
|
||||
zcs->doneJobID & zcs->jobIDMask);
|
||||
POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]); /* this call is blocking when thread worker pool is exhausted */
|
||||
zcs->nextJobID++;
|
||||
if (POOL_tryAdd(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID])) {
|
||||
zcs->nextJobID++;
|
||||
zcs->jobReady = 0;
|
||||
} else {
|
||||
DEBUGLOG(5, "ZSTDMT_createCompressionJob: no worker available for job %u", zcs->nextJobID);
|
||||
zcs->jobReady = 1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -1072,16 +1082,17 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
|
||||
static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush)
|
||||
{
|
||||
unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
|
||||
DEBUGLOG(2, "ZSTDMT_flushNextJob (blocking:%u)", blockToFlush);
|
||||
DEBUGLOG(5, "ZSTDMT_flushNextJob (blocking:%u)", blockToFlush);
|
||||
if (zcs->doneJobID == zcs->nextJobID) {
|
||||
DEBUGLOG(2, "ZSTDMT_flushNextJob: doneJobID==nextJobID : nothing to flush !")
|
||||
DEBUGLOG(5, "ZSTDMT_flushNextJob: doneJobID(%u)==(%u)nextJobID : nothing to flush !",
|
||||
zcs->doneJobID, zcs->nextJobID)
|
||||
return 0; /* all flushed ! */
|
||||
}
|
||||
ZSTD_PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
|
||||
while (zcs->jobs[wJobID].dstFlushed == zcs->jobs[wJobID].cSize) {
|
||||
if (!blockToFlush) { ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex); return 0; } /* nothing ready to be flushed => skip */
|
||||
if (zcs->jobs[wJobID].jobCompleted==1) break;
|
||||
DEBUGLOG(2, "waiting for something to flush from job %u (currently flushed: %u bytes)",
|
||||
DEBUGLOG(5, "waiting for something to flush from job %u (currently flushed: %u bytes)",
|
||||
zcs->doneJobID, (U32)zcs->jobs[wJobID].dstFlushed);
|
||||
ZSTD_pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); /* block when nothing available to flush but more to come */
|
||||
}
|
||||
@ -1108,14 +1119,16 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi
|
||||
}
|
||||
assert(job.cSize >= job.dstFlushed);
|
||||
{ size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
|
||||
DEBUGLOG(2, "ZSTDMT_flushNextJob: Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
|
||||
DEBUGLOG(5, "ZSTDMT_flushNextJob: Flushing %u bytes from job %u (completion:%.1f%%)",
|
||||
(U32)toWrite, zcs->doneJobID, (double)job.consumed / job.srcSize * 100);
|
||||
memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
|
||||
output->pos += toWrite;
|
||||
job.dstFlushed += toWrite;
|
||||
}
|
||||
if ( job.jobCompleted
|
||||
&& (job.dstFlushed == job.cSize) ) { /* output buffer fully flushed => move to next one */
|
||||
DEBUGLOG(2, "Job %u completed, moving to next one", zcs->doneJobID);
|
||||
DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one",
|
||||
zcs->doneJobID, (U32)job.dstFlushed);
|
||||
ZSTDMT_releaseBuffer(zcs->bufPool, job.dstBuff);
|
||||
zcs->jobs[wJobID].dstBuff = g_nullBuffer;
|
||||
zcs->jobs[wJobID].jobCompleted = 0;
|
||||
@ -1128,6 +1141,7 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi
|
||||
/* return value : how many bytes left in buffer ; fake it to 1 when unknown but >0 */
|
||||
if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed);
|
||||
if (zcs->doneJobID < zcs->nextJobID) return 1; /* still some more buffer to flush */
|
||||
if (zcs->jobReady) return 1; /* some more work to do ! */
|
||||
zcs->allJobsCompleted = zcs->frameEnded; /* last frame entirely flushed */
|
||||
return 0; /* everything flushed */
|
||||
} }
|
||||
@ -1176,7 +1190,8 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
|
||||
}
|
||||
|
||||
/* fill input buffer */
|
||||
if (input->size > input->pos) { /* support NULL input */
|
||||
if ( (!mtctx->jobReady)
|
||||
&& (input->size > input->pos) ) { /* support NULL input */
|
||||
if (mtctx->inBuff.buffer.start == NULL) {
|
||||
mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool); /* note : allocation can fail, in which case, no forward input progress */
|
||||
mtctx->inBuff.filled = 0;
|
||||
@ -1186,34 +1201,36 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
|
||||
} }
|
||||
if (mtctx->inBuff.buffer.start != NULL) {
|
||||
size_t const toLoad = MIN(input->size - input->pos, mtctx->inBuffSize - mtctx->inBuff.filled);
|
||||
DEBUGLOG(5, "inBuff:%08X; inBuffSize=%u; ToCopy=%u", (U32)(size_t)mtctx->inBuff.buffer.start, (U32)mtctx->inBuffSize, (U32)toLoad);
|
||||
DEBUGLOG(5, "ZSTDMT_compressStream_generic: adding %u bytes on top of %u to buffer of size %u",
|
||||
(U32)toLoad, (U32)mtctx->inBuff.filled, (U32)mtctx->inBuffSize);
|
||||
memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, toLoad);
|
||||
input->pos += toLoad;
|
||||
mtctx->inBuff.filled += toLoad;
|
||||
forwardInputProgress = toLoad>0;
|
||||
} }
|
||||
|
||||
if ( (mtctx->inBuff.filled >= newJobThreshold) /* filled enough : let's compress */
|
||||
&& (mtctx->nextJobID <= mtctx->doneJobID + mtctx->jobIDMask) ) { /* avoid overwriting job round buffer */
|
||||
if ( (mtctx->jobReady)
|
||||
|| ( (mtctx->inBuff.filled >= newJobThreshold) /* filled enough : let's compress */
|
||||
&& (mtctx->nextJobID <= mtctx->doneJobID + mtctx->jobIDMask) ) ) { /* avoid overwriting job round buffer */
|
||||
CHECK_F( ZSTDMT_createCompressionJob(mtctx, mtctx->targetSectionSize, 0 /* endFrame */) );
|
||||
}
|
||||
|
||||
/* check for potential compressed data ready to be flushed */
|
||||
CHECK_F( ZSTDMT_flushNextJob(mtctx, output, !forwardInputProgress /* blockToFlush */) ); /* block if there was no forward input progress */
|
||||
{ size_t const remainingToFlush = ZSTDMT_flushNextJob(mtctx, output, !forwardInputProgress); /* block if there was no forward input progress */
|
||||
if (input->pos < input->size) return MAX(remainingToFlush, 1); /* input not consumed : do not flush yet */
|
||||
if (mtctx->jobReady) return remainingToFlush; /* some more input ready to be compressed */
|
||||
|
||||
if (input->pos < input->size) /* input not consumed : do not flush yet */
|
||||
endOp = ZSTD_e_continue;
|
||||
|
||||
switch(endOp)
|
||||
{
|
||||
case ZSTD_e_flush:
|
||||
return ZSTDMT_flushStream(mtctx, output);
|
||||
case ZSTD_e_end:
|
||||
return ZSTDMT_endStream(mtctx, output);
|
||||
case ZSTD_e_continue:
|
||||
return 1;
|
||||
default:
|
||||
return ERROR(GENERIC); /* invalid endDirective */
|
||||
switch(endOp)
|
||||
{
|
||||
case ZSTD_e_flush:
|
||||
return ZSTDMT_flushStream(mtctx, output);
|
||||
case ZSTD_e_end:
|
||||
return ZSTDMT_endStream(mtctx, output);
|
||||
case ZSTD_e_continue:
|
||||
return 1;
|
||||
default:
|
||||
return ERROR(GENERIC); /* invalid endDirective */
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -792,6 +792,7 @@ static int FIO_compressFilename_internal(cRess_t ress,
|
||||
/* Fill input Buffer */
|
||||
size_t const inSize = fread(ress.srcBuffer, (size_t)1, ress.srcBufferSize, srcFile);
|
||||
ZSTD_inBuffer inBuff = { ress.srcBuffer, inSize, 0 };
|
||||
DISPLAYLEVEL(6, "fread %u bytes from source \n", (U32)inSize);
|
||||
readsize += inSize;
|
||||
|
||||
if (inSize == 0 || (fileSize != UTIL_FILESIZE_UNKNOWN && readsize == fileSize))
|
||||
@ -803,32 +804,23 @@ static int FIO_compressFilename_internal(cRess_t ress,
|
||||
CHECK_V(result, ZSTD_compress_generic(ress.cctx, &outBuff, &inBuff, directive));
|
||||
|
||||
/* Write compressed stream */
|
||||
DISPLAYLEVEL(6, "ZSTD_compress_generic,ZSTD_e_continue: generated %u bytes \n",
|
||||
(U32)outBuff.pos);
|
||||
DISPLAYLEVEL(6, "ZSTD_compress_generic(end:%u) => intput pos(%u)<=(%u)size ; output generated %u bytes \n",
|
||||
(U32)directive, (U32)inBuff.pos, (U32)inBuff.size, (U32)outBuff.pos);
|
||||
if (outBuff.pos) {
|
||||
size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
|
||||
if (sizeCheck!=outBuff.pos)
|
||||
EXM_THROW(25, "Write error : cannot write compressed block into %s", dstFileName);
|
||||
compressedfilesize += outBuff.pos;
|
||||
}
|
||||
if (READY_FOR_UPDATE()) {
|
||||
ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
|
||||
DISPLAYUPDATE(2, "\rRead :%6u MB - Consumed :%6u MB - Compressed :%6u MB => %.2f%%",
|
||||
(U32)(zfp.ingested >> 20),
|
||||
(U32)(zfp.consumed >> 20),
|
||||
(U32)(zfp.produced >> 20),
|
||||
(double)zfp.produced / (zfp.consumed + !zfp.consumed/*avoid div0*/) * 100 );
|
||||
}
|
||||
}
|
||||
#if 1
|
||||
if (READY_FOR_UPDATE()) {
|
||||
ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
|
||||
DISPLAYUPDATE(2, "\rRead :%6u MB - Consumed :%6u MB - Compressed :%6u MB => %.2f%%",
|
||||
(U32)(zfp.ingested >> 20),
|
||||
(U32)(zfp.consumed >> 20),
|
||||
(U32)(zfp.produced >> 20),
|
||||
(double)zfp.produced / (zfp.consumed + !zfp.consumed/*avoid div0*/) * 100 );
|
||||
}
|
||||
#else
|
||||
if (fileSize == UTIL_FILESIZE_UNKNOWN) {
|
||||
DISPLAYUPDATE(2, "\rRead : %u MB", (U32)(readsize>>20));
|
||||
} else {
|
||||
DISPLAYUPDATE(2, "\rRead : %u / %u MB",
|
||||
(U32)(readsize>>20), (U32)(fileSize>>20));
|
||||
}
|
||||
#endif
|
||||
} while (directive != ZSTD_e_end);
|
||||
|
||||
finish:
|
||||
|
Loading…
Reference in New Issue
Block a user