diff --git a/programs/threadpool.c b/programs/threadpool.c index 48729913..15330928 100644 --- a/programs/threadpool.c +++ b/programs/threadpool.c @@ -76,7 +76,7 @@ void TPOOL_completeJobs(TPOOL_ctx* ctx) { typedef struct TPOOL_ctx_s { HANDLE completionPort; HANDLE workerThreads[LZ4_NBWORKERS_MAX]; - int numWorkers; + int nbWorkers; int queueSize; __LONG32 numPendingJobs; HANDLE jobSemaphore; // For queue size control @@ -86,15 +86,15 @@ void TPOOL_free(TPOOL_ctx* ctx) { if (!ctx) return; // Signal workers to exit by posting NULL completions - for (int i = 0; i < ctx->numWorkers; i++) { + for (int i = 0; i < ctx->nbWorkers; i++) { PostQueuedCompletionStatus(ctx->completionPort, 0, 0, NULL); } // Wait for worker threads to finish - WaitForMultipleObjects(ctx->numWorkers, ctx->workerThreads, TRUE, INFINITE); + WaitForMultipleObjects(ctx->nbWorkers, ctx->workerThreads, TRUE, INFINITE); // Close thread handles and completion port - for (int i = 0; i < ctx->numWorkers; i++) { + for (int i = 0; i < ctx->nbWorkers; i++) { CloseHandle(ctx->workerThreads[i]); } CloseHandle(ctx->completionPort); @@ -166,7 +166,7 @@ TPOOL_ctx* TPOOL_create(int nbWorkers, int queueSize) } // Create worker threads - ctx->numWorkers = nbWorkers; + ctx->nbWorkers = nbWorkers; for (int i = 0; i < nbWorkers; i++) { ctx->workerThreads[i] = CreateThread(NULL, 0, WorkerThread, ctx, 0, NULL); if (!ctx->workerThreads[i]) { @@ -178,7 +178,7 @@ TPOOL_ctx* TPOOL_create(int nbWorkers, int queueSize) // Initialize other members (no changes here) ctx->queueSize = queueSize; ctx->numPendingJobs = 0; - ctx->jobSemaphore = CreateSemaphore(NULL, 0, queueSize, NULL); + ctx->jobSemaphore = CreateSemaphore(NULL, queueSize, queueSize, NULL); if (!ctx->jobSemaphore) { TPOOL_free(ctx); return NULL; @@ -193,7 +193,7 @@ void TPOOL_submitJob(TPOOL_ctx* ctx, void (*job_function)(void*), void* arg) if (!ctx || !job_function) return; // Atomically increment pending jobs and check for overflow - if (InterlockedIncrement(&ctx->numPendingJobs) > ctx->numWorkers + ctx->queueSize) { + if (InterlockedIncrement(&ctx->numPendingJobs) > ctx->nbWorkers + ctx->queueSize) { InterlockedDecrement(&ctx->numPendingJobs); WaitForSingleObject(ctx->jobSemaphore, INFINITE); } @@ -202,7 +202,7 @@ void TPOOL_submitJob(TPOOL_ctx* ctx, void (*job_function)(void*), void* arg) PostQueuedCompletionStatus(ctx->completionPort, 0, // Bytes transferred not used (ULONG_PTR)job_function, // Store function pointer in completionKey - (LPOVERLAPPED)arg); // Store argument in overlapped + (LPOVERLAPPED)arg); // Store argument in overlapped } void TPOOL_completeJobs(TPOOL_ctx* ctx)