[pzstd] Add Logger class

This commit is contained in:
Nick Terrell 2016-10-12 19:02:27 -07:00
parent e9e151ce31
commit baa152e56e
3 changed files with 100 additions and 49 deletions

72
contrib/pzstd/Logging.h Normal file
View File

@ -0,0 +1,72 @@
/**
* Copyright (c) 2016-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/
#pragma once
#include <cstdio>
#include <mutex>
namespace pzstd {
constexpr int ERROR = 1;
constexpr int INFO = 2;
constexpr int DEBUG = 3;
constexpr int VERBOSE = 4;
class Logger {
std::mutex mutex_;
FILE* out_;
const int level_;
using Clock = std::chrono::system_clock;
Clock::time_point lastUpdate_;
std::chrono::milliseconds refreshRate_;
public:
explicit Logger(int level, FILE* out = stderr)
: out_(out), level_(level), lastUpdate_(Clock::now()),
refreshRate_(150) {}
bool logsAt(int level) {
return level <= level_;
}
template <typename... Args>
void operator()(int level, const char *fmt, Args... args) {
if (level > level_) {
return;
}
std::lock_guard<std::mutex> lock(mutex_);
std::fprintf(out_, fmt, args...);
}
template <typename... Args>
void update(int level, const char *fmt, Args... args) {
if (level > level_) {
return;
}
std::lock_guard<std::mutex> lock(mutex_);
auto now = Clock::now();
if (now - lastUpdate_ > refreshRate_) {
lastUpdate_ = now;
std::fprintf(out_, "\r");
std::fprintf(out_, fmt, args...);
}
}
void clear(int level) {
if (level > level_) {
return;
}
std::lock_guard<std::mutex> lock(mutex_);
std::fprintf(out_, "\r%79s\r", "");
}
};
}

View File

@ -85,30 +85,28 @@ static std::uint64_t handleOneInput(const Options &options,
options.determineParameters());
});
// Start writing
bytesWritten = writeFile(state, outs, outputFd, options.decompress,
options.verbosity);
bytesWritten = writeFile(state, outs, outputFd, options.decompress);
} else {
// Add a job that reads the input and starts all the decompression jobs
readExecutor.add([&state, &outs, &executor, inputFd, &bytesRead] {
bytesRead = asyncDecompressFrames(state, outs, executor, inputFd);
});
// Start writing
bytesWritten = writeFile(state, outs, outputFd, options.decompress,
options.verbosity);
bytesWritten = writeFile(state, outs, outputFd, options.decompress);
}
}
if (options.verbosity > 1 && !state.errorHolder.hasError()) {
if (!state.errorHolder.hasError()) {
std::string inputFileName = inputFile == "-" ? "stdin" : inputFile;
std::string outputFileName = outputFile == "-" ? "stdout" : outputFile;
if (!options.decompress) {
double ratio = static_cast<double>(bytesWritten) /
static_cast<double>(bytesRead + !bytesRead);
std::fprintf(stderr, "%-20s :%6.2f%% (%6" PRIu64 " => %6" PRIu64
state.log(INFO, "%-20s :%6.2f%% (%6" PRIu64 " => %6" PRIu64
" bytes, %s)\n",
inputFileName.c_str(), ratio * 100, bytesRead, bytesWritten,
outputFileName.c_str());
} else {
std::fprintf(stderr, "%-20s: %" PRIu64 " bytes \n",
state.log(INFO, "%-20s: %" PRIu64 " bytes \n",
inputFileName.c_str(),bytesWritten);
}
}
@ -138,7 +136,7 @@ static FILE *openInputFile(const std::string &inputFile,
static FILE *openOutputFile(const Options &options,
const std::string &outputFile,
ErrorHolder &errorHolder) {
SharedState& state) {
if (outputFile == "-") {
SET_BINARY_MODE(stdout);
return stdout;
@ -148,41 +146,39 @@ static FILE *openOutputFile(const Options &options,
auto outputFd = std::fopen(outputFile.c_str(), "rb");
if (outputFd != nullptr) {
std::fclose(outputFd);
if (options.verbosity <= 1) {
errorHolder.setError("Output file exists");
if (!state.log.logsAt(INFO)) {
state.errorHolder.setError("Output file exists");
return nullptr;
}
std::fprintf(
stderr,
state.log(
INFO,
"pzstd: %s already exists; do you wish to overwrite (y/n) ? ",
outputFile.c_str());
int c = getchar();
if (c != 'y' && c != 'Y') {
errorHolder.setError("Not overwritten");
state.errorHolder.setError("Not overwritten");
return nullptr;
}
}
}
auto outputFd = std::fopen(outputFile.c_str(), "wb");
if (!errorHolder.check(
if (!state.errorHolder.check(
outputFd != nullptr, "Failed to open output file")) {
return 0;
return nullptr;
}
return outputFd;
}
int pzstdMain(const Options &options) {
int returnCode = 0;
SharedState state(options.decompress, options.determineParameters());
SharedState state(options);
for (const auto& input : options.inputFiles) {
// Setup the shared state
auto printErrorGuard = makeScopeGuard([&] {
if (state.errorHolder.hasError()) {
returnCode = 1;
if (options.verbosity > 0) {
std::fprintf(stderr, "pzstd: %s: %s.\n", input.c_str(),
state.errorHolder.getError().c_str());
}
state.log(ERROR, "pzstd: %s: %s.\n", input.c_str(),
state.errorHolder.getError().c_str());
}
});
// Open the input file
@ -197,7 +193,7 @@ int pzstdMain(const Options &options) {
"Input file does not have extension .zst")) {
continue;
}
auto outputFd = openOutputFile(options, outputFile, state.errorHolder);
auto outputFd = openOutputFile(options, outputFile, state);
if (outputFd == nullptr) {
continue;
}
@ -578,33 +574,14 @@ static bool writeData(ByteRange data, FILE* fd) {
return true;
}
void updateWritten(int verbosity, std::uint64_t bytesWritten) {
if (verbosity <= 1) {
return;
}
using Clock = std::chrono::system_clock;
static Clock::time_point then;
constexpr std::chrono::milliseconds refreshRate{150};
auto now = Clock::now();
if (now - then > refreshRate) {
then = now;
std::fprintf(stderr, "\rWritten: %u MB ",
static_cast<std::uint32_t>(bytesWritten >> 20));
}
}
std::uint64_t writeFile(
SharedState& state,
WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
FILE* outputFd,
bool decompress,
int verbosity) {
bool decompress) {
auto& errorHolder = state.errorHolder;
auto lineClearGuard = makeScopeGuard([verbosity] {
if (verbosity > 1) {
std::fprintf(stderr, "\r%79s\r", "");
}
auto lineClearGuard = makeScopeGuard([&state] {
state.log.clear(INFO);
});
std::uint64_t bytesWritten = 0;
std::shared_ptr<BufferWorkQueue> out;
@ -630,7 +607,8 @@ std::uint64_t writeFile(
return bytesWritten;
}
bytesWritten += buffer.size();
updateWritten(verbosity, bytesWritten);
state.log.update(INFO, "Written: %u MB ",
static_cast<std::uint32_t>(bytesWritten >> 20));
}
}
return bytesWritten;

View File

@ -9,6 +9,7 @@
#pragma once
#include "ErrorHolder.h"
#include "Logging.h"
#include "Options.h"
#include "utils/Buffer.h"
#include "utils/Range.h"
@ -35,8 +36,9 @@ int pzstdMain(const Options& options);
class SharedState {
public:
SharedState(bool decompress, ZSTD_parameters parameters) {
if (!decompress) {
SharedState(const Options& options) : log(options.verbosity) {
if (!options.decompress) {
auto parameters = options.determineParameters();
cStreamPool.reset(new ResourcePool<ZSTD_CStream>{
[parameters]() -> ZSTD_CStream* {
auto zcs = ZSTD_createCStream();
@ -72,6 +74,7 @@ class SharedState {
}
}
Logger log;
ErrorHolder errorHolder;
std::unique_ptr<ResourcePool<ZSTD_CStream>> cStreamPool;
std::unique_ptr<ResourcePool<ZSTD_DStream>> dStreamPool;
@ -129,13 +132,11 @@ std::uint64_t asyncDecompressFrames(
* (de)compression job.
* @param outputFd The file descriptor to write to
* @param decompress Are we decompressing?
* @param verbosity The verbosity level to log at
* @returns The number of bytes written
*/
std::uint64_t writeFile(
SharedState& state,
WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
FILE* outputFd,
bool decompress,
int verbosity);
bool decompress);
}