Merge pull request #1436 from lz4/MT_linkedBlocks

Linked Blocks compression (-BD) can employ multiple threads
Yann Collet 2024-07-09 22:10:29 -07:00 committed by GitHub
commit 291c3ae9b2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 61 additions and 59 deletions


@@ -530,21 +530,22 @@ jobs:
# xemu : QEMU static emulator executable.
# os : GitHub Actions YAML workflow label. See https://github.com/actions/virtual-environments#available-environments
{ type: ARM, pkgs: 'qemu-system-arm gcc-arm-linux-gnueabi', xcc: arm-linux-gnueabi-gcc, xemu: qemu-arm-static, os: ubuntu-latest, },
{ type: ARM64, pkgs: 'qemu-system-arm gcc-aarch64-linux-gnu', xcc: aarch64-linux-gnu-gcc, xemu: qemu-aarch64-static, os: ubuntu-latest, },
{ type: PPC, pkgs: 'qemu-system-ppc gcc-powerpc-linux-gnu', xcc: powerpc-linux-gnu-gcc, xemu: qemu-ppc-static, os: ubuntu-latest, },
{ type: PPC64LE, pkgs: 'qemu-system-ppc gcc-powerpc64le-linux-gnu', xcc: powerpc64le-linux-gnu-gcc, xemu: qemu-ppc64le-static, os: ubuntu-latest, },
{ type: S390X, pkgs: 'qemu-system-s390x gcc-s390x-linux-gnu', xcc: s390x-linux-gnu-gcc, xemu: qemu-s390x-static, os: ubuntu-latest, },
{ type: MIPS, pkgs: 'qemu-system-mips gcc-mips-linux-gnu', xcc: mips-linux-gnu-gcc, xemu: qemu-mips-static, os: ubuntu-latest, },
{ type: M68K, pkgs: 'qemu-system-m68k gcc-m68k-linux-gnu', xcc: m68k-linux-gnu-gcc, xemu: qemu-m68k-static, os: ubuntu-latest, },
{ type: RISC-V, pkgs: 'qemu-system-riscv64 gcc-riscv64-linux-gnu', xcc: riscv64-linux-gnu-gcc, xemu: qemu-riscv64-static, os: ubuntu-latest, },
{ type: SPARC, pkgs: 'qemu-system-sparc gcc-sparc64-linux-gnu', xcc: sparc64-linux-gnu-gcc, xemu: qemu-sparc64-static, os: ubuntu-20.04, },
{ type: ARM, pkgs: 'qemu-system-arm gcc-arm-linux-gnueabi', xcc: arm-linux-gnueabi-gcc, xemu: qemu-arm-static, os: ubuntu-latest, makevar: "", },
{ type: ARM64, pkgs: 'qemu-system-arm gcc-aarch64-linux-gnu', xcc: aarch64-linux-gnu-gcc, xemu: qemu-aarch64-static, os: ubuntu-latest, makevar: "", },
{ type: PPC, pkgs: 'qemu-system-ppc gcc-powerpc-linux-gnu', xcc: powerpc-linux-gnu-gcc, xemu: qemu-ppc-static, os: ubuntu-latest, makevar: "", },
{ type: PPC64LE, pkgs: 'qemu-system-ppc gcc-powerpc64le-linux-gnu', xcc: powerpc64le-linux-gnu-gcc, xemu: qemu-ppc64le-static, os: ubuntu-latest, makevar: "", },
{ type: S390X, pkgs: 'qemu-system-s390x gcc-s390x-linux-gnu', xcc: s390x-linux-gnu-gcc, xemu: qemu-s390x-static, os: ubuntu-latest, makevar: "", },
{ type: MIPS, pkgs: 'qemu-system-mips gcc-mips-linux-gnu', xcc: mips-linux-gnu-gcc, xemu: qemu-mips-static, os: ubuntu-latest, makevar: "", },
{ type: M68K, pkgs: 'qemu-system-m68k gcc-m68k-linux-gnu', xcc: m68k-linux-gnu-gcc, xemu: qemu-m68k-static, os: ubuntu-latest, makevar: "HAVE_MULTITHREAD=0", }, # bug in MT mode on m68k
{ type: RISC-V, pkgs: 'qemu-system-riscv64 gcc-riscv64-linux-gnu', xcc: riscv64-linux-gnu-gcc, xemu: qemu-riscv64-static, os: ubuntu-latest, makevar: "", },
{ type: SPARC, pkgs: 'qemu-system-sparc gcc-sparc64-linux-gnu', xcc: sparc64-linux-gnu-gcc, xemu: qemu-sparc64-static, os: ubuntu-20.04, makevar: "", },
]
runs-on: ${{ matrix.os }}
env: # Set environment variables
XCC: ${{ matrix.xcc }}
XEMU: ${{ matrix.xemu }}
MAKEVAR: ${{ matrix.makevar }}
steps:
- uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # https://github.com/actions/checkout v4.1.7
@@ -571,7 +572,7 @@ jobs:
- name: MIPS-M68K-RISCV-SPARC
if: ${{ matrix.type == 'MIPS' || matrix.type == 'M68K' || matrix.type == 'RISC-V' || matrix.type == 'SPARC' }}
run: make platformTest V=1 CC=$XCC QEMU_SYS=$XEMU
run: make platformTest V=1 CC=$XCC QEMU_SYS=$XEMU $MAKEVAR
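
The new `makevar` matrix field is exported as `MAKEVAR` and appended to the `make platformTest` command above, so each emulated target can adjust the build. Only M68K uses it for now: its entry expands to roughly `make platformTest V=1 CC=m68k-linux-gnu-gcc QEMU_SYS=qemu-m68k-static HAVE_MULTITHREAD=0`, building without the multithreaded code path, which (per the inline comment) currently misbehaves under emulated m68k.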


@@ -706,7 +706,8 @@ static void LZ4IO_readAndProcess(void* arg)
if (!buffer)
END_PROCESS(31, "Allocation error : can't allocate buffer to read new chunk");
if (prefixSize) {
memcpy(buffer, rjd->prefix, 64 KB);
assert(prefixSize == 64 KB);
memcpy(buffer, rjd->prefix, prefixSize);
}
{ char* const in_buff = (char*)buffer + prefixSize;
size_t const inSize = fread(in_buff, (size_t)1, chunkSize, rjd->fin);
@@ -744,7 +745,7 @@ static void LZ4IO_readAndProcess(void* arg)
cjd->lastBlock = inSize < chunkSize;
TPool_submitJob(rjd->tPool, LZ4IO_compressAndFreeChunk, cjd);
if (inSize == chunkSize) {
/* probably more ? read another chunk */
/* likely more => read another chunk */
rjd->blockNb++;
TPool_submitJob(rjd->tPool, LZ4IO_readAndProcess, rjd);
} } }
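
The reader keeps the last 64 KB of each chunk in front of the next one, so that the next chunk can be compressed against it as history; this is what preserves block linkage (-BD) even though chunks are handled by independent workers. A simplified sketch of that prefix carry-over, using a hypothetical submitCompressionJob() hook in place of the real TPool job descriptors, with error handling omitted:

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>

    #define PREFIX_SIZE (64 * 1024)   /* history window carried between chunks */

    /* Hypothetical stand-in for TPool_submitJob(LZ4IO_compressAndFreeChunk, ...) */
    void submitCompressionJob(void* buffer, size_t prefixSize, size_t srcSize);

    static void readAllChunks(FILE* fin, size_t chunkSize)
    {
        char* const prefix = (char*)malloc(PREFIX_SIZE);
        int hasPrefix = 0;
        for (;;) {
            /* each chunk buffer reserves room for the 64 KB prefix in front */
            char* const buffer = (char*)malloc(PREFIX_SIZE + chunkSize);
            size_t const prefixSize = hasPrefix ? PREFIX_SIZE : 0;
            size_t readSize;
            if (hasPrefix) memcpy(buffer, prefix, PREFIX_SIZE);
            readSize = fread(buffer + prefixSize, 1, chunkSize, fin);
            if (readSize == 0) { free(buffer); break; }
            /* remember the tail of this chunk before handing it off */
            if (prefixSize + readSize >= PREFIX_SIZE) {
                memcpy(prefix, buffer + prefixSize + readSize - PREFIX_SIZE, PREFIX_SIZE);
                hasPrefix = 1;
            }
            submitCompressionJob(buffer, prefixSize, readSize);  /* worker frees buffer */
            if (readSize < chunkSize) break;   /* short read => last chunk */
        }
        free(prefix);
    }

In the real code the read job re-submits itself to the thread pool instead of looping, so reading overlaps with compression (see the next hunks).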
@@ -886,6 +887,7 @@ static int LZ4IO_compressLegacy_internal(unsigned long long* readSize,
(double)wr.totalCSize / (double)(rjd.totalReadSize + !rjd.totalReadSize) * 100.);
*readSize = rjd.totalReadSize;
}
/* Close & Free */
_cfl_clean:
WR_destroy(&wr);
@@ -1126,18 +1128,23 @@ static size_t LZ4IO_compressFrameChunk(const void* params,
if (cctx==NULL || LZ4F_isError(ccr))
END_PROCESS(51, "unable to create a LZ4F compression context");
}
/* init state, and writes frame header, will be overwritten at next stage.
* Also: no support for dictionary yet, meaning linked blocks are actually independent */
{ size_t const whr = LZ4F_compressBegin_usingCDict(cctx, dst, dstCapacity, cfcp->cdict, cfcp->prefs);
/* init state, and writes frame header, will be overwritten at next stage. */
if (prefixSize) {
size_t const whr = LZ4F_compressBegin_usingDict(cctx, dst, dstCapacity, (const char*)src - prefixSize, prefixSize, cfcp->prefs);
if (LZ4F_isError(whr))
END_PROCESS(52, "error initializing LZ4F compression context");
END_PROCESS(52, "error initializing LZ4F compression context with prefix");
assert(prefixSize == 64 KB);
} else {
size_t const whr = LZ4F_compressBegin_usingCDict(cctx, dst, dstCapacity, cfcp->cdict, cfcp->prefs);
if (LZ4F_isError(whr))
END_PROCESS(53, "error initializing LZ4F compression context");
}
/* let's now compress, overwriting unused header */
{ size_t const cSize = LZ4F_compressUpdate(cctx, dst, dstCapacity, src, srcSize, NULL);
if (LZ4F_isError(cSize))
END_PROCESS(53, "error compressing with LZ4F_compressUpdate");
END_PROCESS(55, "error compressing with LZ4F_compressUpdate");
LZ4F_freeCompressionContext(cctx);
(void)prefixSize;
return (size_t) cSize;
}
}
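
On the worker side, each chunk is compressed with its own LZ4F context: when a 64 KB prefix is present, the context is seeded with it through LZ4F_compressBegin_usingDict(), so the produced blocks may back-reference the previous chunk. A reduced sketch of that idea (not the exact lz4io code): it assumes dstCapacity >= LZ4F_compressBound(srcSize, prefs), assumes srcSize is a multiple of the frame block size so LZ4F_compressUpdate() buffers nothing, and assumes LZ4F_compressBegin_usingDict() is reachable via LZ4F_STATIC_LINKING_ONLY / lz4frame_static.h:

    #define LZ4F_STATIC_LINKING_ONLY
    #include "lz4frame_static.h"

    static size_t compressChunkWithPrefix(void* dst, size_t dstCapacity,
                                          const void* src, size_t srcSize,
                                          size_t prefixSize,               /* 0 or 64 KB */
                                          const LZ4F_preferences_t* prefs)
    {
        LZ4F_cctx* cctx;
        size_t r;
        if (LZ4F_isError(LZ4F_createCompressionContext(&cctx, LZ4F_VERSION)))
            return 0;
        /* Init writes a small frame header into dst; it is overwritten below,
         * because workers only contribute block data: the real frame header
         * is emitted once, by the writer. */
        r = prefixSize
          ? LZ4F_compressBegin_usingDict(cctx, dst, dstCapacity,
                                         (const char*)src - prefixSize, prefixSize, prefs)
          : LZ4F_compressBegin(cctx, dst, dstCapacity, prefs);
        if (!LZ4F_isError(r))
            r = LZ4F_compressUpdate(cctx, dst, dstCapacity, src, srcSize, NULL);
        LZ4F_freeCompressionContext(cctx);
        return LZ4F_isError(r) ? 0 : r;   /* compressed block bytes, or 0 on error */
    }

In lz4io this prefix path serves linked-blocks mode (-BD), where the frame's block mode is LZ4F_blockLinked.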
@@ -1149,7 +1156,7 @@ static size_t LZ4IO_compressFrameChunk(const void* params,
*/
int
LZ4IO_compressFilename_extRess_MT(unsigned long long* inStreamSize,
cRess_t ress,
cRess_t* ress,
const char* srcFileName, const char* dstFileName,
int compressionLevel,
const LZ4IO_prefs_t* const io_prefs)
@@ -1157,12 +1164,12 @@ LZ4IO_compressFilename_extRess_MT(unsigned long long* inStreamSize,
unsigned long long filesize = 0;
unsigned long long compressedfilesize = 0;
FILE* dstFile;
void* const srcBuffer = ress.srcBuffer;
void* const dstBuffer = ress.dstBuffer;
const size_t dstBufferSize = ress.dstBufferSize;
void* const srcBuffer = ress->srcBuffer;
void* const dstBuffer = ress->dstBuffer;
const size_t dstBufferSize = ress->dstBufferSize;
const size_t chunkSize = 4 MB; /* each job should be "sufficiently large" */
size_t readSize;
LZ4F_compressionContext_t ctx = ress.ctx; /* just a pointer */
LZ4F_compressionContext_t ctx = ress->ctx; /* just a pointer */
LZ4F_preferences_t prefs;
/* Init */
@@ -1172,7 +1179,7 @@ LZ4IO_compressFilename_extRess_MT(unsigned long long* inStreamSize,
if (dstFile == NULL) { fclose(srcFile); return 1; }
/* Adjust compression parameters */
prefs = ress.preparedPrefs;
prefs = ress->preparedPrefs;
prefs.compressionLevel = compressionLevel;
if (io_prefs->contentSizeFlag) {
U64 const fileSize = UTIL_getOpenFileSize(srcFile);
@@ -1182,7 +1189,7 @@ LZ4IO_compressFilename_extRess_MT(unsigned long long* inStreamSize,
}
/* read first chunk */
assert(chunkSize <= ress.srcBufferSize);
assert(chunkSize <= ress->srcBufferSize);
readSize = fread(srcBuffer, (size_t)1, chunkSize, srcFile);
if (ferror(srcFile))
END_PROCESS(40, "Error reading first chunk (%u bytes) of '%s' ", (unsigned)chunkSize, srcFileName);
@@ -1191,7 +1198,7 @@ LZ4IO_compressFilename_extRess_MT(unsigned long long* inStreamSize,
/* single-block file */
if (readSize < chunkSize) {
/* Compress in single pass */
size_t const cSize = LZ4F_compressFrame_usingCDict(ctx, dstBuffer, dstBufferSize, srcBuffer, readSize, ress.cdict, &prefs);
size_t const cSize = LZ4F_compressFrame_usingCDict(ctx, dstBuffer, dstBufferSize, srcBuffer, readSize, ress->cdict, &prefs);
if (LZ4F_isError(cSize))
END_PROCESS(41, "Compression failed : %s", LZ4F_getErrorName(cSize));
compressedfilesize = cSize;
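
For an input smaller than one chunk there is nothing to parallelize, so the MT path degenerates to a single one-shot call (LZ4F_compressFrame_usingCDict() above). With the stable public API and no dictionary, the equivalent one-shot pattern looks roughly like this:

    #include <stdlib.h>
    #include "lz4frame.h"

    /* Compress a whole (small) buffer into one complete LZ4 frame.
     * On success, *dstOut is malloc'ed (caller frees) and the compressed
     * size is returned; 0 means error. prefs may be NULL for defaults. */
    static size_t compressSmallBuffer(const void* src, size_t srcSize,
                                      const LZ4F_preferences_t* prefs,
                                      void** dstOut)
    {
        size_t const bound = LZ4F_compressFrameBound(srcSize, prefs);
        void* const dst = malloc(bound);
        size_t cSize;
        if (dst == NULL) return 0;
        cSize = LZ4F_compressFrame(dst, bound, src, srcSize, prefs);
        if (LZ4F_isError(cSize)) { free(dst); return 0; }
        *dstOut = dst;
        return cSize;
    }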
@@ -1215,17 +1222,17 @@ LZ4IO_compressFilename_extRess_MT(unsigned long long* inStreamSize,
LZ4IO_CfcParameters cfcp;
ReadTracker rjd;
if (ress.tPool == NULL) {
ress.tPool = TPool_create(io_prefs->nbWorkers, 4);
assert(ress.wPool == NULL);
ress.wPool = TPool_create(1, 4);
if (ress.tPool == NULL || ress.wPool == NULL)
if (ress->tPool == NULL) {
ress->tPool = TPool_create(io_prefs->nbWorkers, 4);
assert(ress->wPool == NULL);
ress->wPool = TPool_create(1, 4);
if (ress->tPool == NULL || ress->wPool == NULL)
END_PROCESS(43, "can't create threadpools");
}
cfcp.prefs = &prefs;
cfcp.cdict = ress.cdict;
rjd.tPool = ress.tPool;
rjd.wpool = ress.wPool;
cfcp.cdict = ress->cdict;
rjd.tPool = ress->tPool;
rjd.wpool = ress->wPool;
rjd.fin = srcFile;
rjd.chunkSize = chunkSize;
rjd.totalReadSize = 0;
@@ -1271,7 +1278,7 @@ LZ4IO_compressFilename_extRess_MT(unsigned long long* inStreamSize,
/* process first block */
{ CompressJobDesc cjd;
cjd.wpool = ress.wPool;
cjd.wpool = ress->wPool;
cjd.buffer = srcBuffer;
cjd.prefixSize = 0;
cjd.inSize = readSize;
@@ -1282,7 +1289,7 @@ LZ4IO_compressFilename_extRess_MT(unsigned long long* inStreamSize,
cjd.wr = &wr;
cjd.maxCBlockSize = rjd.maxCBlockSize;
cjd.lastBlock = 0;
TPool_submitJob(ress.tPool, LZ4IO_compressChunk, &cjd);
TPool_submitJob(ress->tPool, LZ4IO_compressChunk, &cjd);
rjd.totalReadSize = readSize;
rjd.blockNb = 1;
if (prefixBuffer) {
@@ -1291,11 +1298,11 @@ LZ4IO_compressFilename_extRess_MT(unsigned long long* inStreamSize,
}
/* Start the job chain */
TPool_submitJob(ress.tPool, LZ4IO_readAndProcess, &rjd);
TPool_submitJob(ress->tPool, LZ4IO_readAndProcess, &rjd);
/* Wait for all completion */
TPool_jobsCompleted(ress.tPool);
TPool_jobsCompleted(ress.wPool);
TPool_jobsCompleted(ress->tPool);
TPool_jobsCompleted(ress->wPool);
compressedfilesize += wr.totalCSize;
}
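
Putting the pieces together, the MT pipeline uses one pool of compression workers (tPool), a single-threaded pool that writes results in order (wPool), and a read job that re-submits itself while input remains, so reading, compression and writing overlap. A stripped-down illustration of that self-chaining pattern, reusing the TPool calls visible above (TPool_create / TPool_submitJob / TPool_jobsCompleted) but with a hypothetical, much smaller job payload than the real ReadTracker / CompressJobDesc, and with size bookkeeping omitted:

    #include <stdio.h>
    #include <stdlib.h>
    #include "threadpool.h"        /* TPool API used above (assumed header name) */

    typedef struct {
        TPool* cPool;              /* compression workers */
        TPool* wPool;              /* 1 thread => writes stay ordered */
        FILE*  fin;
        size_t chunkSize;
    } ReadJob;

    /* hypothetical: compresses the chunk, then hands the result to wPool */
    void compressOneChunk(void* arg);

    static void readAndDispatch(void* arg)
    {
        ReadJob* const rj = (ReadJob*)arg;
        void* const chunk = malloc(rj->chunkSize);
        size_t const readSize = fread(chunk, 1, rj->chunkSize, rj->fin);
        if (readSize == 0) { free(chunk); return; }
        TPool_submitJob(rj->cPool, compressOneChunk, chunk);   /* worker frees chunk */
        if (readSize == rj->chunkSize)
            TPool_submitJob(rj->cPool, readAndDispatch, rj);   /* chain the next read */
    }

    /* driver, mirroring the code above:
     *   rj.cPool = TPool_create(nbWorkers, 4);  rj.wPool = TPool_create(1, 4);
     *   TPool_submitJob(rj.cPool, readAndDispatch, &rj);
     *   TPool_jobsCompleted(rj.cPool);  TPool_jobsCompleted(rj.wPool);
     */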
@@ -1357,7 +1364,7 @@ LZ4IO_compressFilename_extRess_MT(unsigned long long* inStreamSize,
*/
int
LZ4IO_compressFilename_extRess_ST(unsigned long long* inStreamSize,
cRess_t ress,
const cRess_t* ress,
const char* srcFileName, const char* dstFileName,
int compressionLevel,
const LZ4IO_prefs_t* const io_prefs)
@@ -1365,12 +1372,12 @@ LZ4IO_compressFilename_extRess_ST(unsigned long long* inStreamSize,
unsigned long long filesize = 0;
unsigned long long compressedfilesize = 0;
FILE* dstFile;
void* const srcBuffer = ress.srcBuffer;
void* const dstBuffer = ress.dstBuffer;
const size_t dstBufferSize = ress.dstBufferSize;
void* const srcBuffer = ress->srcBuffer;
void* const dstBuffer = ress->dstBuffer;
const size_t dstBufferSize = ress->dstBufferSize;
const size_t blockSize = io_prefs->blockSize;
size_t readSize;
LZ4F_compressionContext_t ctx = ress.ctx; /* just a pointer */
LZ4F_compressionContext_t ctx = ress->ctx; /* just a pointer */
LZ4F_preferences_t prefs;
/* Init */
@@ -1381,7 +1388,7 @@ LZ4IO_compressFilename_extRess_ST(unsigned long long* inStreamSize,
memset(&prefs, 0, sizeof(prefs));
/* Adjust compression parameters */
prefs = ress.preparedPrefs;
prefs = ress->preparedPrefs;
prefs.compressionLevel = compressionLevel;
if (io_prefs->contentSizeFlag) {
U64 const fileSize = UTIL_getOpenFileSize(srcFile);
@@ -1398,7 +1405,7 @@ LZ4IO_compressFilename_extRess_ST(unsigned long long* inStreamSize,
/* single-block file */
if (readSize < blockSize) {
/* Compress in single pass */
size_t const cSize = LZ4F_compressFrame_usingCDict(ctx, dstBuffer, dstBufferSize, srcBuffer, readSize, ress.cdict, &prefs);
size_t const cSize = LZ4F_compressFrame_usingCDict(ctx, dstBuffer, dstBufferSize, srcBuffer, readSize, ress->cdict, &prefs);
if (LZ4F_isError(cSize))
END_PROCESS(41, "Compression failed : %s", LZ4F_getErrorName(cSize));
compressedfilesize = cSize;
@@ -1415,7 +1422,7 @@ LZ4IO_compressFilename_extRess_ST(unsigned long long* inStreamSize,
/* multiple-blocks file */
{
/* Write Frame Header */
size_t const headerSize = LZ4F_compressBegin_usingCDict(ctx, dstBuffer, dstBufferSize, ress.cdict, &prefs);
size_t const headerSize = LZ4F_compressBegin_usingCDict(ctx, dstBuffer, dstBufferSize, ress->cdict, &prefs);
if (LZ4F_isError(headerSize))
END_PROCESS(43, "File header generation failed : %s", LZ4F_getErrorName(headerSize));
if (fwrite(dstBuffer, 1, headerSize, dstFile) != headerSize)
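
The single-threaded path follows the classic LZ4F streaming sequence: LZ4F_compressBegin() produces the frame header, each block of input goes through LZ4F_compressUpdate(), and LZ4F_compressEnd() writes the end mark and flushes anything still buffered. A minimal file-to-file sketch of that sequence using only the stable lz4frame.h API (no dictionary; allocation checks and error-path cleanup omitted):

    #include <stdio.h>
    #include <stdlib.h>
    #include "lz4frame.h"

    static int compressFileStream(FILE* fin, FILE* fout, size_t blockSize,
                                  const LZ4F_preferences_t* prefs)
    {
        LZ4F_cctx* cctx;
        size_t const dstCapacity = LZ4F_compressBound(blockSize, prefs);
        char* const srcBuf = (char*)malloc(blockSize);
        char* const dstBuf = (char*)malloc(dstCapacity);
        size_t n;
        if (LZ4F_isError(LZ4F_createCompressionContext(&cctx, LZ4F_VERSION))) return 1;

        /* frame header */
        n = LZ4F_compressBegin(cctx, dstBuf, dstCapacity, prefs);
        if (LZ4F_isError(n)) return 1;
        fwrite(dstBuf, 1, n, fout);

        /* one LZ4F_compressUpdate() per block read from the source */
        for (;;) {
            size_t const readSize = fread(srcBuf, 1, blockSize, fin);
            if (readSize == 0) break;
            n = LZ4F_compressUpdate(cctx, dstBuf, dstCapacity, srcBuf, readSize, NULL);
            if (LZ4F_isError(n)) return 1;
            fwrite(dstBuf, 1, n, fout);
            if (readSize < blockSize) break;   /* last (partial) block */
        }

        /* end mark + flush of any buffered data */
        n = LZ4F_compressEnd(cctx, dstBuf, dstCapacity, NULL);
        if (LZ4F_isError(n)) return 1;
        fwrite(dstBuf, 1, n, fout);

        LZ4F_freeCompressionContext(cctx);
        free(srcBuf); free(dstBuf);
        return 0;
    }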
@@ -1482,31 +1489,25 @@ LZ4IO_compressFilename_extRess_ST(unsigned long long* inStreamSize,
static int
LZ4IO_compressFilename_extRess(unsigned long long* inStreamSize,
cRess_t ress,
cRess_t* ress,
const char* srcFileName, const char* dstFileName,
int compressionLevel,
const LZ4IO_prefs_t* const io_prefs)
{
#if LZ4IO_MULTITHREAD
/* only employ multi-threading in the following scenarios: */
if ( (io_prefs->nbWorkers != 1)
&& (io_prefs->blockIndependence == LZ4F_blockIndependent) /* blocks must be independent */
)
if (LZ4IO_MULTITHREAD)
return LZ4IO_compressFilename_extRess_MT(inStreamSize, ress, srcFileName, dstFileName, compressionLevel, io_prefs);
#endif
/* Only single-thread available */
return LZ4IO_compressFilename_extRess_ST(inStreamSize, ress, srcFileName, dstFileName, compressionLevel, io_prefs);
}
int LZ4IO_compressFilename(const char* srcFileName, const char* dstFileName, int compressionLevel, const LZ4IO_prefs_t* prefs)
{
TIME_t const timeStart = TIME_getTime();
clock_t const cpuStart = clock();
cRess_t const ress = LZ4IO_createCResources(prefs);
cRess_t ress = LZ4IO_createCResources(prefs);
unsigned long long processed;
int const result = LZ4IO_compressFilename_extRess(&processed, ress, srcFileName, dstFileName, compressionLevel, prefs);
int const result = LZ4IO_compressFilename_extRess(&processed, &ress, srcFileName, dstFileName, compressionLevel, prefs);
/* Free resources */
LZ4IO_freeCResources(ress);
@@ -1541,7 +1542,7 @@ int LZ4IO_compressMultipleFilenames(
unsigned long long processed;
size_t const ifnSize = strlen(inFileNamesTable[i]);
if (LZ4IO_isStdout(suffix)) {
missed_files += LZ4IO_compressFilename_extRess(&processed, ress,
missed_files += LZ4IO_compressFilename_extRess(&processed, &ress,
inFileNamesTable[i], stdoutmark,
compressionLevel, prefs);
totalProcessed += processed;
@@ -1559,7 +1560,7 @@ int LZ4IO_compressMultipleFilenames(
strcpy(dstFileName, inFileNamesTable[i]);
strcat(dstFileName, suffix);
missed_files += LZ4IO_compressFilename_extRess(&processed, ress,
missed_files += LZ4IO_compressFilename_extRess(&processed, &ress,
inFileNamesTable[i], dstFileName,
compressionLevel, prefs);
totalProcessed += processed;