zstd cli can now compress using multi-threading

added : command -T#
added : ZSTD_resetCStream() (zstdmt_compress)
added : FIO_setNbThreads()  (fileio)
This commit is contained in:
Yann Collet 2017-01-19 16:59:56 -08:00
parent 19d670ba9d
commit 500014af49
8 changed files with 119 additions and 52 deletions

View File

@ -443,6 +443,13 @@ size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* zcs, const void* dict, size_t di
return 0;
}
/* ZSTDMT_resetCStream() :
* pledgedSrcSize is optional and can be zero == unknown */
size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize)
{
return ZSTDMT_initCStream_advanced(zcs, zcs->dict, zcs->dictSize, zcs->params, pledgedSrcSize);
}
size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) {
ZSTD_parameters const params = ZSTD_getParams(compressionLevel, 0, 0);
return ZSTDMT_initCStream_advanced(zcs, NULL, 0, params, 0);

View File

@ -20,6 +20,7 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* cctx,
/* === Streaming functions === */
size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel);
size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize); /**< pledgedSrcSize is optional and can be zero == unknown */
size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* zcs, const void* dict, size_t dictSize,
ZSTD_parameters params, unsigned long long pledgedSrcSize); /**< pledgedSrcSize is optional and can be zero == unknown ; current limitation : no checksum */

View File

@ -22,7 +22,7 @@ else
ALIGN_LOOP =
endif
CPPFLAGS+= -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(ZSTDDIR)/dictBuilder
CPPFLAGS+= -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(ZSTDDIR)/compress -I$(ZSTDDIR)/dictBuilder
CFLAGS ?= -O3
CFLAGS += -Wall -Wextra -Wcast-qual -Wcast-align -Wshadow -Wstrict-aliasing=1 \
-Wswitch-enum -Wdeclaration-after-statement -Wstrict-prototypes -Wundef \

View File

@ -9,6 +9,14 @@
/* **************************************
* Tuning parameters
****************************************/
#ifndef BMK_TIMETEST_DEFAULT_S /* default minimum time per test */
#define BMK_TIMETEST_DEFAULT_S 3
#endif
/* **************************************
* Compiler Warnings
****************************************/
@ -43,7 +51,6 @@
# define ZSTD_GIT_COMMIT_STRING ZSTD_EXPAND_AND_QUOTE(ZSTD_GIT_COMMIT)
#endif
#define NBSECONDS 3
#define TIMELOOP_MICROSEC 1*1000000ULL /* 1 second */
#define ACTIVEPERIOD_MICROSEC 70*1000000ULL /* 70 seconds */
#define COOLPERIOD_SEC 10
@ -109,31 +116,36 @@ static clock_us_t BMK_clockMicroSec(void)
/* *************************************
* Benchmark Parameters
***************************************/
static U32 g_nbSeconds = NBSECONDS;
static size_t g_blockSize = 0;
static int g_additionalParam = 0;
static U32 g_decodeOnly = 0;
static U32 g_nbThreads = 1;
void BMK_setNotificationLevel(unsigned level) { g_displayLevel=level; }
void BMK_setAdditionalParam(int additionalParam) { g_additionalParam=additionalParam; }
void BMK_SetNbSeconds(unsigned nbSeconds)
static U32 g_nbSeconds = BMK_TIMETEST_DEFAULT_S;
void BMK_setNbSeconds(unsigned nbSeconds)
{
g_nbSeconds = nbSeconds;
DISPLAYLEVEL(3, "- test >= %u seconds per compression / decompression -\n", g_nbSeconds);
DISPLAYLEVEL(3, "- test >= %u seconds per compression / decompression - \n", g_nbSeconds);
}
void BMK_SetBlockSize(size_t blockSize)
static size_t g_blockSize = 0;
void BMK_setBlockSize(size_t blockSize)
{
g_blockSize = blockSize;
DISPLAYLEVEL(2, "using blocks of size %u KB \n", (U32)(blockSize>>10));
if (g_blockSize) DISPLAYLEVEL(2, "using blocks of size %u KB \n", (U32)(blockSize>>10));
}
void BMK_setDecodeOnlyMode(unsigned decodeFlag) { g_decodeOnly = (decodeFlag>0); }
void BMK_SetNbThreads(unsigned nbThreads) { g_nbThreads = nbThreads; }
static U32 g_nbThreads = 1;
void BMK_setNbThreads(unsigned nbThreads) {
#ifndef ZSTD_MULTITHREAD
if (nbThreads > 1) DISPLAYLEVEL(2, "Note : multi-threading is disabled \n");
#endif
g_nbThreads = nbThreads;
}
/* ********************************************************

View File

@ -19,9 +19,9 @@ int BMK_benchFiles(const char** fileNamesTable, unsigned nbFiles,const char* dic
int cLevel, int cLevelLast, ZSTD_compressionParameters* compressionParams);
/* Set Parameters */
void BMK_SetNbSeconds(unsigned nbLoops);
void BMK_SetBlockSize(size_t blockSize);
void BMK_SetNbThreads(unsigned nbThreads);
void BMK_setNbSeconds(unsigned nbLoops);
void BMK_setBlockSize(size_t blockSize);
void BMK_setNbThreads(unsigned nbThreads);
void BMK_setNotificationLevel(unsigned level);
void BMK_setAdditionalParam(int additionalParam);
void BMK_setDecodeOnlyMode(unsigned decodeFlag);

View File

@ -34,11 +34,14 @@
#include "fileio.h"
#define ZSTD_STATIC_LINKING_ONLY /* ZSTD_magicNumber, ZSTD_frameHeaderSize_max */
#include "zstd.h"
#ifdef ZSTD_GZDECOMPRESS
#include "zlib.h"
#if !defined(z_const)
#define z_const
#ifdef ZSTD_MULTITHREAD
# include "zstdmt_compress.h"
#endif
#ifdef ZSTD_GZDECOMPRESS
# include "zlib.h"
# if !defined(z_const)
# define z_const
# endif
#endif
@ -103,7 +106,13 @@ static U32 g_removeSrcFile = 0;
void FIO_setRemoveSrcFile(unsigned flag) { g_removeSrcFile = (flag>0); }
static U32 g_memLimit = 0;
void FIO_setMemLimit(unsigned memLimit) { g_memLimit = memLimit; }
static U32 g_nbThreads = 1;
void FIO_setNbThreads(unsigned nbThreads) {
#ifndef ZSTD_MULTITHREAD
if (nbThreads > 1) DISPLAYLEVEL(2, "Note : multi-threading is disabled \n");
#endif
g_nbThreads = nbThreads;
}
/*-*************************************
@ -226,22 +235,30 @@ static size_t FIO_loadFile(void** bufferPtr, const char* fileName)
* Compression
************************************************************************/
typedef struct {
FILE* srcFile;
FILE* dstFile;
void* srcBuffer;
size_t srcBufferSize;
void* dstBuffer;
size_t dstBufferSize;
#ifdef ZSTD_MULTITHREAD
ZSTDMT_CCtx* cctx;
#else
ZSTD_CStream* cctx;
FILE* dstFile;
FILE* srcFile;
#endif
} cRess_t;
static cRess_t FIO_createCResources(const char* dictFileName, int cLevel,
static cRess_t FIO_createCResources(const char* dictFileName, int cLevel,
U64 srcSize, ZSTD_compressionParameters* comprParams)
{
cRess_t ress;
memset(&ress, 0, sizeof(ress));
#ifdef ZSTD_MULTITHREAD
ress.cctx = ZSTDMT_createCCtx(g_nbThreads);
#else
ress.cctx = ZSTD_createCStream();
#endif
if (ress.cctx == NULL) EXM_THROW(30, "zstd: allocation error : can't create ZSTD_CStream");
ress.srcBufferSize = ZSTD_CStreamInSize();
ress.srcBuffer = malloc(ress.srcBufferSize);
@ -264,7 +281,11 @@ static cRess_t FIO_createCResources(const char* dictFileName, int cLevel,
if (comprParams->searchLength) params.cParams.searchLength = comprParams->searchLength;
if (comprParams->targetLength) params.cParams.targetLength = comprParams->targetLength;
if (comprParams->strategy) params.cParams.strategy = (ZSTD_strategy)(comprParams->strategy - 1);
#ifdef ZSTD_MULTITHREAD
{ size_t const errorCode = ZSTDMT_initCStream_advanced(ress.cctx, dictBuffer, dictBuffSize, params, srcSize);
#else
{ size_t const errorCode = ZSTD_initCStream_advanced(ress.cctx, dictBuffer, dictBuffSize, params, srcSize);
#endif
if (ZSTD_isError(errorCode)) EXM_THROW(33, "Error initializing CStream : %s", ZSTD_getErrorName(errorCode));
} }
free(dictBuffer);
@ -277,7 +298,11 @@ static void FIO_freeCResources(cRess_t ress)
{
free(ress.srcBuffer);
free(ress.dstBuffer);
#ifdef ZSTD_MULTITHREAD
ZSTDMT_freeCCtx(ress.cctx);
#else
ZSTD_freeCStream(ress.cctx); /* never fails */
#endif
}
@ -296,7 +321,11 @@ static int FIO_compressFilename_internal(cRess_t ress,
U64 const fileSize = UTIL_getFileSize(srcFileName);
/* init */
#ifdef ZSTD_MULTITHREAD
{ size_t const resetError = ZSTDMT_resetCStream(ress.cctx, fileSize);
#else
{ size_t const resetError = ZSTD_resetCStream(ress.cctx, fileSize);
#endif
if (ZSTD_isError(resetError)) EXM_THROW(21, "Error initializing compression : %s", ZSTD_getErrorName(resetError));
}
@ -311,11 +340,14 @@ static int FIO_compressFilename_internal(cRess_t ress,
/* Compress using buffered streaming */
{ ZSTD_inBuffer inBuff = { ress.srcBuffer, inSize, 0 };
ZSTD_outBuffer outBuff= { ress.dstBuffer, ress.dstBufferSize, 0 };
{ size_t const result = ZSTD_compressStream(ress.cctx, &outBuff, &inBuff);
if (ZSTD_isError(result)) EXM_THROW(23, "Compression error : %s ", ZSTD_getErrorName(result)); }
if (inBuff.pos != inBuff.size)
/* inBuff should be entirely consumed since buffer sizes are recommended ones */
EXM_THROW(24, "Compression error : input block not fully consumed");
while (inBuff.pos != inBuff.size) { /* note : is there any possibility of endless loop ? for example, if outBuff is not large enough ? */
#ifdef ZSTD_MULTITHREAD
size_t const result = ZSTDMT_compressStream(ress.cctx, &outBuff, &inBuff);
#else
size_t const result = ZSTD_compressStream(ress.cctx, &outBuff, &inBuff);
#endif
if (ZSTD_isError(result)) EXM_THROW(23, "Compression error : %s ", ZSTD_getErrorName(result));
}
/* Write cBlock */
{ size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
@ -326,13 +358,19 @@ static int FIO_compressFilename_internal(cRess_t ress,
}
/* End of Frame */
{ ZSTD_outBuffer outBuff = { ress.dstBuffer, ress.dstBufferSize, 0 };
size_t const result = ZSTD_endStream(ress.cctx, &outBuff);
if (result!=0) EXM_THROW(26, "Compression error : cannot create frame end");
{ size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
if (sizeCheck!=outBuff.pos) EXM_THROW(27, "Write error : cannot write frame end into %s", dstFileName); }
compressedfilesize += outBuff.pos;
{ size_t result = 1;
while (result!=0) { /* note : is there any possibility of endless loop ? */
ZSTD_outBuffer outBuff = { ress.dstBuffer, ress.dstBufferSize, 0 };
#ifdef ZSTD_MULTITHREAD
result = ZSTDMT_endStream(ress.cctx, &outBuff);
#else
result = ZSTD_endStream(ress.cctx, &outBuff);
#endif
if (ZSTD_isError(result)) EXM_THROW(26, "Compression error during frame end : %s", ZSTD_getErrorName(result));
{ size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
if (sizeCheck!=outBuff.pos) EXM_THROW(27, "Write error : cannot write frame end into %s", dstFileName); }
compressedfilesize += outBuff.pos;
}
}
/* Status */
@ -632,7 +670,7 @@ unsigned long long FIO_decompressFrame(dRess_t* ress,
if (ZSTD_isError(readSizeHint)) EXM_THROW(36, "Decoding error : %s", ZSTD_getErrorName(readSizeHint));
/* Write block */
storedSkips = FIO_fwriteSparse(ress->dstFile, ress->dstBuffer, outBuff.pos, storedSkips);
storedSkips = FIO_fwriteSparse(ress->dstFile, ress->dstBuffer, outBuff.pos, storedSkips);
frameSize += outBuff.pos;
DISPLAYUPDATE(2, "\rDecoded : %u MB... ", (U32)((alreadyDecoded+frameSize)>>20) );

View File

@ -40,6 +40,7 @@ void FIO_setDictIDFlag(unsigned dictIDFlag);
void FIO_setChecksumFlag(unsigned checksumFlag);
void FIO_setRemoveSrcFile(unsigned flag);
void FIO_setMemLimit(unsigned memLimit);
void FIO_setNbThreads(unsigned nbThreads);
/*-*************************************

View File

@ -110,12 +110,15 @@ static int usage_advanced(const char* programName)
DISPLAY( " -q : suppress warnings; specify twice to suppress errors too\n");
DISPLAY( " -c : force write to standard output, even if it is the console\n");
#ifdef UTIL_HAS_CREATEFILELIST
DISPLAY( " -r : operate recursively on directories\n");
DISPLAY( " -r : operate recursively on directories \n");
#endif
#ifndef ZSTD_NOCOMPRESS
DISPLAY( "--ultra : enable levels beyond %i, up to %i (requires more memory)\n", ZSTDCLI_CLEVEL_MAX, ZSTD_maxCLevel());
DISPLAY( "--no-dictID : don't write dictID into header (dictionary compression)\n");
DISPLAY( "--[no-]check : integrity check (default:enabled)\n");
DISPLAY( "--[no-]check : integrity check (default:enabled) \n");
#ifdef ZSTD_MULTITHREAD
DISPLAY( " -T# : use # threads for compression (default:1) \n");
#endif
#endif
#ifndef ZSTD_NODECOMPRESS
DISPLAY( "--test : test compressed file integrity \n");
@ -233,7 +236,10 @@ int main(int argCount, const char* argv[])
nextArgumentIsDictID=0,
nextArgumentsAreFiles=0,
ultra=0,
lastCommand = 0;
lastCommand = 0,
nbThreads = 1;
unsigned bench_nbSeconds = 3; /* would be better if this value was synchronized from bench */
size_t blockSize = 0;
zstd_operation_mode operation = zom_compress;
ZSTD_compressionParameters compressionParams;
int cLevel = ZSTDCLI_CLEVEL_DEFAULT;
@ -396,39 +402,37 @@ int main(int argCount, const char* argv[])
#ifndef ZSTD_NOBENCH
/* Benchmark */
case 'b': operation=zom_bench; argument++; break;
case 'b':
operation=zom_bench;
argument++;
break;
/* range bench (benchmark only) */
case 'e':
/* compression Level */
argument++;
cLevelLast = readU32FromChar(&argument);
break;
/* compression Level */
argument++;
cLevelLast = readU32FromChar(&argument);
break;
/* Modify Nb Iterations (benchmark only) */
case 'i':
argument++;
{ U32 const iters = readU32FromChar(&argument);
BMK_setNotificationLevel(displayLevel);
BMK_SetNbSeconds(iters);
}
bench_nbSeconds = readU32FromChar(&argument);
break;
/* cut input into blocks (benchmark only) */
case 'B':
argument++;
{ size_t const bSize = readU32FromChar(&argument);
BMK_setNotificationLevel(displayLevel);
BMK_SetBlockSize(bSize);
}
blockSize = readU32FromChar(&argument);
break;
#endif /* ZSTD_NOBENCH */
/* nb of threads (hidden option) */
case 'T':
argument++;
BMK_SetNbThreads(readU32FromChar(&argument));
nbThreads = readU32FromChar(&argument);
break;
#endif /* ZSTD_NOBENCH */
/* Dictionary Selection level */
case 's':
@ -518,6 +522,9 @@ int main(int argCount, const char* argv[])
if (operation==zom_bench) {
#ifndef ZSTD_NOBENCH
BMK_setNotificationLevel(displayLevel);
BMK_setBlockSize(blockSize);
BMK_setNbThreads(nbThreads);
BMK_setNbSeconds(bench_nbSeconds);
BMK_benchFiles(filenameTable, filenameIdx, dictFileName, cLevel, cLevelLast, &compressionParams);
#endif
goto _end;
@ -569,6 +576,7 @@ int main(int argCount, const char* argv[])
FIO_setNotificationLevel(displayLevel);
if (operation==zom_compress) {
#ifndef ZSTD_NOCOMPRESS
FIO_setNbThreads(nbThreads);
if ((filenameIdx==1) && outFileName)
operationResult = FIO_compressFilename(outFileName, filenameTable[0], dictFileName, cLevel, &compressionParams);
else