From f147fccd0c5da08ab05995b85922444f575c9183 Mon Sep 17 00:00:00 2001 From: Nick Terrell Date: Tue, 15 Nov 2016 16:39:09 -0800 Subject: [PATCH 1/2] [pzstd] Fix frame size for small files + add logging --- contrib/pzstd/Pzstd.cpp | 15 ++++++--------- contrib/pzstd/Pzstd.h | 12 ++++++++++-- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/contrib/pzstd/Pzstd.cpp b/contrib/pzstd/Pzstd.cpp index c5b4ce4cb..1778fef25 100644 --- a/contrib/pzstd/Pzstd.cpp +++ b/contrib/pzstd/Pzstd.cpp @@ -341,15 +341,7 @@ static size_t calculateStep( std::uintmax_t size, size_t numThreads, const ZSTD_parameters ¶ms) { - size_t step = size_t{1} << (params.cParams.windowLog + 2); - // If file size is known, see if a smaller step will spread work more evenly - if (size != 0) { - const std::uintmax_t newStep = size / numThreads; - if (newStep != 0 && newStep <= std::numeric_limits::max()) { - step = std::min(step, static_cast(newStep)); - } - } - return step; + return size_t{1} << (params.cParams.windowLog + 2); } namespace { @@ -401,6 +393,7 @@ std::uint64_t asyncCompressChunks( // Break the input up into chunks of size `step` and compress each chunk // independently. size_t step = calculateStep(size, numThreads, params); + state.log(DEBUG, "Chosen frame size: %zu\n", step); auto status = FileStatus::Continue; while (status == FileStatus::Continue && !state.errorHolder.hasError()) { // Make a new input queue that we will put the chunk's input data into. @@ -415,6 +408,7 @@ std::uint64_t asyncCompressChunks( }); // Pass the output queue to the writer thread. chunks.push(std::move(out)); + state.log(VERBOSE, "Starting a new frame\n"); // Fill the input queue for the compression job we just started status = readData(*in, ZSTD_CStreamInSize(), step, fd, &bytesRead); } @@ -551,11 +545,14 @@ std::uint64_t asyncDecompressFrames( if (frameSize == 0) { // We hit a non SkippableFrame ==> not compressed by pzstd or corrupted // Pass the rest of the source to this decompression task + state.log(VERBOSE, + "Input not in pzstd format, falling back to serial decompression\n"); while (status == FileStatus::Continue && !state.errorHolder.hasError()) { status = readData(*in, chunkSize, chunkSize, fd, &totalBytesRead); } break; } + state.log(VERBOSE, "Decompressing a frame of size %zu", frameSize); // Fill the input queue for the decompression job we just started status = readData(*in, chunkSize, frameSize, fd, &totalBytesRead); } diff --git a/contrib/pzstd/Pzstd.h b/contrib/pzstd/Pzstd.h index 9fb2c4884..dc60dd9b8 100644 --- a/contrib/pzstd/Pzstd.h +++ b/contrib/pzstd/Pzstd.h @@ -40,7 +40,8 @@ class SharedState { if (!options.decompress) { auto parameters = options.determineParameters(); cStreamPool.reset(new ResourcePool{ - [parameters]() -> ZSTD_CStream* { + [this, parameters]() -> ZSTD_CStream* { + this->log(VERBOSE, "Creating new ZSTD_CStream\n"); auto zcs = ZSTD_createCStream(); if (zcs) { auto err = ZSTD_initCStream_advanced( @@ -57,7 +58,8 @@ class SharedState { }}); } else { dStreamPool.reset(new ResourcePool{ - []() -> ZSTD_DStream* { + [this]() -> ZSTD_DStream* { + this->log(VERBOSE, "Creating new ZSTD_DStream\n"); auto zds = ZSTD_createDStream(); if (zds) { auto err = ZSTD_initDStream(zds); @@ -74,6 +76,12 @@ class SharedState { } } + ~SharedState() { + // The resource pools have references to this, so destroy them first. + cStreamPool.reset(); + dStreamPool.reset(); + } + Logger log; ErrorHolder errorHolder; std::unique_ptr> cStreamPool; From bcd61586a86dd3efb9b800636f6c7c83b10942cb Mon Sep 17 00:00:00 2001 From: Nick Terrell Date: Tue, 15 Nov 2016 17:46:28 -0800 Subject: [PATCH 2/2] [pzstd] Cast unused parameters to void --- contrib/pzstd/Pzstd.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/contrib/pzstd/Pzstd.cpp b/contrib/pzstd/Pzstd.cpp index 1778fef25..f4cb19d9c 100644 --- a/contrib/pzstd/Pzstd.cpp +++ b/contrib/pzstd/Pzstd.cpp @@ -341,6 +341,8 @@ static size_t calculateStep( std::uintmax_t size, size_t numThreads, const ZSTD_parameters ¶ms) { + (void)size; + (void)numThreads; return size_t{1} << (params.cParams.windowLog + 2); }