minor: Semaphore init value

This commit is contained in:
Yann Collet 2024-07-05 17:16:27 -07:00
parent 097f0fba84
commit 3628163d2a

View File

@ -76,7 +76,7 @@ void TPOOL_completeJobs(TPOOL_ctx* ctx) {
typedef struct TPOOL_ctx_s {
HANDLE completionPort;
HANDLE workerThreads[LZ4_NBWORKERS_MAX];
int numWorkers;
int nbWorkers;
int queueSize;
__LONG32 numPendingJobs;
HANDLE jobSemaphore; // For queue size control
@ -86,15 +86,15 @@ void TPOOL_free(TPOOL_ctx* ctx) {
if (!ctx) return;
// Signal workers to exit by posting NULL completions
for (int i = 0; i < ctx->numWorkers; i++) {
for (int i = 0; i < ctx->nbWorkers; i++) {
PostQueuedCompletionStatus(ctx->completionPort, 0, 0, NULL);
}
// Wait for worker threads to finish
WaitForMultipleObjects(ctx->numWorkers, ctx->workerThreads, TRUE, INFINITE);
WaitForMultipleObjects(ctx->nbWorkers, ctx->workerThreads, TRUE, INFINITE);
// Close thread handles and completion port
for (int i = 0; i < ctx->numWorkers; i++) {
for (int i = 0; i < ctx->nbWorkers; i++) {
CloseHandle(ctx->workerThreads[i]);
}
CloseHandle(ctx->completionPort);
@ -166,7 +166,7 @@ TPOOL_ctx* TPOOL_create(int nbWorkers, int queueSize)
}
// Create worker threads
ctx->numWorkers = nbWorkers;
ctx->nbWorkers = nbWorkers;
for (int i = 0; i < nbWorkers; i++) {
ctx->workerThreads[i] = CreateThread(NULL, 0, WorkerThread, ctx, 0, NULL);
if (!ctx->workerThreads[i]) {
@ -178,7 +178,7 @@ TPOOL_ctx* TPOOL_create(int nbWorkers, int queueSize)
// Initialize other members (no changes here)
ctx->queueSize = queueSize;
ctx->numPendingJobs = 0;
ctx->jobSemaphore = CreateSemaphore(NULL, 0, queueSize, NULL);
ctx->jobSemaphore = CreateSemaphore(NULL, queueSize, queueSize, NULL);
if (!ctx->jobSemaphore) {
TPOOL_free(ctx);
return NULL;
@ -193,7 +193,7 @@ void TPOOL_submitJob(TPOOL_ctx* ctx, void (*job_function)(void*), void* arg)
if (!ctx || !job_function) return;
// Atomically increment pending jobs and check for overflow
if (InterlockedIncrement(&ctx->numPendingJobs) > ctx->numWorkers + ctx->queueSize) {
if (InterlockedIncrement(&ctx->numPendingJobs) > ctx->nbWorkers + ctx->queueSize) {
InterlockedDecrement(&ctx->numPendingJobs);
WaitForSingleObject(ctx->jobSemaphore, INFINITE);
}
@ -202,7 +202,7 @@ void TPOOL_submitJob(TPOOL_ctx* ctx, void (*job_function)(void*), void* arg)
PostQueuedCompletionStatus(ctx->completionPort,
0, // Bytes transferred not used
(ULONG_PTR)job_function, // Store function pointer in completionKey
(LPOVERLAPPED)arg); // Store argument in overlapped
(LPOVERLAPPED)arg); // Store argument in overlapped
}
void TPOOL_completeJobs(TPOOL_ctx* ctx)