completion ports: minor readability refactor

This commit is contained in:
Yann Collet 2024-07-07 13:35:52 -07:00
parent c688b4bd58
commit cfdb8ac60b

View File

@ -83,40 +83,42 @@ typedef struct TPool_s {
HANDLE allJobsCompleted; /* Event */
} TPool;
void TPool_free(TPool* ctx)
void TPool_free(TPool* pool)
{
if (!ctx) return;
if (!pool) return;
/* Signal workers to exit by posting NULL completions */
for (int i = 0; i < ctx->nbWorkers; i++) {
PostQueuedCompletionStatus(ctx->completionPort, 0, 0, NULL);
for (int i = 0; i < pool->nbWorkers; i++) {
PostQueuedCompletionStatus(pool->completionPort, 0, 0, NULL);
}
/* Wait for worker threads to finish */
WaitForMultipleObjects(ctx->nbWorkers, ctx->workerThreads, TRUE, INFINITE);
WaitForMultipleObjects(pool->nbWorkers, pool->workerThreads, TRUE, INFINITE);
/* Close thread handles and completion port */
for (int i = 0; i < ctx->nbWorkers; i++) {
CloseHandle(ctx->workerThreads[i]);
{ int i;
for (i = 0; i < pool->nbWorkers; i++) {
CloseHandle(pool->workerThreads[i]);
}
}
free(ctx->workerThreads);
CloseHandle(ctx->completionPort);
free(pool->workerThreads);
CloseHandle(pool->completionPort);
/* Clean up synchronization objects */
CloseHandle(ctx->jobSlotAvail);
CloseHandle(ctx->allJobsCompleted);
CloseHandle(pool->jobSlotAvail);
CloseHandle(pool->allJobsCompleted);
free(ctx);
free(pool);
}
static DWORD WINAPI WorkerThread(LPVOID lpParameter)
{
TPool* ctx = (TPool*)lpParameter;
TPool* const pool = (TPool*)lpParameter;
DWORD bytesTransferred;
ULONG_PTR completionKey;
LPOVERLAPPED overlapped;
while (GetQueuedCompletionStatus(ctx->completionPort,
while (GetQueuedCompletionStatus(pool->completionPort,
&bytesTransferred, &completionKey,
&overlapped, INFINITE)) {
@ -127,10 +129,10 @@ static DWORD WINAPI WorkerThread(LPVOID lpParameter)
((void (*)(void*))completionKey)(overlapped);
/* Signal job completion */
if (InterlockedDecrement(&ctx->nbPendingJobs) == 0) {
SetEvent(ctx->allJobsCompleted);
if (InterlockedDecrement(&pool->nbPendingJobs) == 0) {
SetEvent(pool->allJobsCompleted);
}
ReleaseSemaphore(ctx->jobSlotAvail, 1, NULL);
ReleaseSemaphore(pool->jobSlotAvail, 1, NULL);
}
return 0;
@ -138,8 +140,7 @@ static DWORD WINAPI WorkerThread(LPVOID lpParameter)
TPool* TPool_create(int nbWorkers, int queueSize)
{
TPool* const ctx = calloc(1, sizeof(TPool));
if (!ctx) return NULL;
TPool* pool;
/* parameters sanitization */
if (nbWorkers <= 0 || queueSize <= 0) {
@ -148,65 +149,63 @@ TPool* TPool_create(int nbWorkers, int queueSize)
if (nbWorkers>LZ4_NBWORKERS_MAX)
nbWorkers=LZ4_NBWORKERS_MAX;
pool = calloc(1, sizeof(TPool));
if (!pool) return NULL;
/* Create completion port */
ctx->completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, nbWorkers);
if (!ctx->completionPort) {
free(ctx);
return NULL;
}
pool->completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, nbWorkers);
if (!pool->completionPort) { goto _cleanup; }
/* Create worker threads */
ctx->nbWorkers = nbWorkers;
ctx->workerThreads = (HANDLE*)malloc(sizeof(HANDLE) * nbWorkers);
if (ctx->workerThreads == NULL) {
TPool_free(ctx);
return NULL;
}
for (int i = 0; i < nbWorkers; i++) {
ctx->workerThreads[i] = CreateThread(NULL, 0, WorkerThread, ctx, 0, NULL);
if (!ctx->workerThreads[i]) {
TPool_free(ctx);
return NULL;
pool->nbWorkers = nbWorkers;
pool->workerThreads = (HANDLE*)malloc(sizeof(HANDLE) * nbWorkers);
if (pool->workerThreads == NULL) { goto _cleanup; }
{ int i;
for (i = 0; i < nbWorkers; i++) {
pool->workerThreads[i] = CreateThread(NULL, 0, WorkerThread, pool, 0, NULL);
if (!pool->workerThreads[i]) { goto _cleanup; }
}
}
/* Initialize other members (no changes here) */
ctx->queueSize = queueSize;
ctx->nbPendingJobs = 0;
ctx->jobSlotAvail = CreateSemaphore(NULL, queueSize+nbWorkers, queueSize+nbWorkers, NULL);
if (!ctx->jobSlotAvail) {
TPool_free(ctx);
return NULL;
}
ctx->allJobsCompleted = CreateEvent(NULL, FALSE, FALSE, NULL);
if (!ctx->allJobsCompleted) {
TPool_free(ctx);
return NULL;
}
return ctx;
/* Initialize sync objects members */
pool->queueSize = queueSize;
pool->nbPendingJobs = 0;
pool->jobSlotAvail = CreateSemaphore(NULL, queueSize+nbWorkers, queueSize+nbWorkers, NULL);
if (!pool->jobSlotAvail) { goto _cleanup; }
pool->allJobsCompleted = CreateEvent(NULL, FALSE, FALSE, NULL);
if (!pool->allJobsCompleted) { goto _cleanup; }
return pool;
_cleanup:
TPool_free(pool);
return NULL;
}
void TPool_submitJob(TPool* ctx, void (*job_function)(void*), void* arg)
void TPool_submitJob(TPool* pool, void (*job_function)(void*), void* arg)
{
if (!ctx || !job_function) return;
if (!pool || !job_function) return;
/* Atomically increment pending jobs and check for overflow */
WaitForSingleObject(ctx->jobSlotAvail, INFINITE);
ResetEvent(ctx->allJobsCompleted);
InterlockedIncrement(&ctx->nbPendingJobs);
WaitForSingleObject(pool->jobSlotAvail, INFINITE);
ResetEvent(pool->allJobsCompleted);
InterlockedIncrement(&pool->nbPendingJobs);
/* Post the job directly to the completion port */
PostQueuedCompletionStatus(ctx->completionPort,
PostQueuedCompletionStatus(pool->completionPort,
0, /* Bytes transferred not used */
(ULONG_PTR)job_function, /* Store function pointer in completionKey */
(LPOVERLAPPED)arg); /* Store argument in overlapped */
}
void TPool_jobsCompleted(TPool* ctx)
void TPool_jobsCompleted(TPool* pool)
{
if (!ctx) return;
WaitForSingleObject(ctx->allJobsCompleted, INFINITE);
if (!pool) return;
WaitForSingleObject(pool->allJobsCompleted, INFINITE);
}
#else