/* threadpool.h - part of lz4 project Copyright (C) Yann Collet 2023 GPL v2 License This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. You can contact the author at : - LZ4 source repository : https://github.com/lz4/lz4 - LZ4 public forum : https://groups.google.com/forum/#!forum/lz4c */ /* ====== Dependencies ======= */ #include #include "lz4conf.h" /* LZ4IO_MULTITHREAD */ #include "threadpool.h" /* ====== Compiler specifics ====== */ #if defined(_MSC_VER) # pragma warning(disable : 4204) /* disable: C4204: non-constant aggregate initializer */ #endif #if !LZ4IO_MULTITHREAD /* ===================================================== */ /* Backup implementation with no multi-threading support */ /* ===================================================== */ /* Non-zero size, to ensure g_poolCtx != NULL */ struct TPool_s { int dummy; }; static TPool g_poolCtx; TPool* TPool_create(int numThreads, int queueSize) { (void)numThreads; (void)queueSize; return &g_poolCtx; } void TPool_free(TPool* ctx) { assert(!ctx || ctx == &g_poolCtx); (void)ctx; } void TPool_submitJob(TPool* ctx, void (*job_function)(void*), void* arg) { (void)ctx; job_function(arg); } void TPool_jobsCompleted(TPool* ctx) { assert(!ctx || ctx == &g_poolCtx); (void)ctx; } #elif defined(_WIN32) /* Window TPool implementation using Completion Ports */ #include typedef struct TPool_s { HANDLE completionPort; HANDLE* workerThreads; int nbWorkers; int queueSize; LONG nbPendingJobs; HANDLE jobSlotAvail; /* For queue size control */ HANDLE allJobsCompleted; /* Event */ } TPool; void TPool_free(TPool* pool) { if (!pool) return; /* Signal workers to exit by posting NULL completions */ { int i; for (i = 0; i < pool->nbWorkers; i++) { PostQueuedCompletionStatus(pool->completionPort, 0, 0, NULL); } } /* Wait for worker threads to finish */ WaitForMultipleObjects(pool->nbWorkers, pool->workerThreads, TRUE, INFINITE); /* Close thread handles and completion port */ { int i; for (i = 0; i < pool->nbWorkers; i++) { CloseHandle(pool->workerThreads[i]); } } free(pool->workerThreads); CloseHandle(pool->completionPort); /* Clean up synchronization objects */ CloseHandle(pool->jobSlotAvail); CloseHandle(pool->allJobsCompleted); free(pool); } static DWORD WINAPI WorkerThread(LPVOID lpParameter) { TPool* const pool = (TPool*)lpParameter; DWORD bytesTransferred; ULONG_PTR completionKey; LPOVERLAPPED overlapped; while (GetQueuedCompletionStatus(pool->completionPort, &bytesTransferred, &completionKey, &overlapped, INFINITE)) { /* End signal */ if (overlapped == NULL) { break; } /* Execute job */ ((void (*)(void*))completionKey)(overlapped); /* Signal job completion */ if (InterlockedDecrement(&pool->nbPendingJobs) == 0) { SetEvent(pool->allJobsCompleted); } ReleaseSemaphore(pool->jobSlotAvail, 1, NULL); } return 0; } TPool* TPool_create(int nbWorkers, int queueSize) { TPool* pool; /* parameters sanitization */ if (nbWorkers <= 0 || queueSize <= 0) return NULL; if (nbWorkers>LZ4_NBWORKERS_MAX) nbWorkers=LZ4_NBWORKERS_MAX; pool = calloc(1, sizeof(TPool)); if (!pool) return NULL; /* Create completion port */ pool->completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, nbWorkers); if (!pool->completionPort) { goto _cleanup; } /* Create worker threads */ pool->nbWorkers = nbWorkers; pool->workerThreads = (HANDLE*)malloc(sizeof(HANDLE) * nbWorkers); if (pool->workerThreads == NULL) { goto _cleanup; } { int i; for (i = 0; i < nbWorkers; i++) { pool->workerThreads[i] = CreateThread(NULL, 0, WorkerThread, pool, 0, NULL); if (!pool->workerThreads[i]) { goto _cleanup; } } } /* Initialize sync objects members */ pool->queueSize = queueSize; pool->nbPendingJobs = 0; pool->jobSlotAvail = CreateSemaphore(NULL, queueSize+nbWorkers, queueSize+nbWorkers, NULL); if (!pool->jobSlotAvail) { goto _cleanup; } pool->allJobsCompleted = CreateEvent(NULL, FALSE, FALSE, NULL); if (!pool->allJobsCompleted) { goto _cleanup; } return pool; _cleanup: TPool_free(pool); return NULL; } void TPool_submitJob(TPool* pool, void (*job_function)(void*), void* arg) { assert(pool); /* Atomically increment pending jobs and check for overflow */ WaitForSingleObject(pool->jobSlotAvail, INFINITE); ResetEvent(pool->allJobsCompleted); InterlockedIncrement(&pool->nbPendingJobs); /* Post the job directly to the completion port */ PostQueuedCompletionStatus(pool->completionPort, 0, /* Bytes transferred not used */ (ULONG_PTR)job_function, /* Store function pointer in completionKey */ (LPOVERLAPPED)arg); /* Store argument in overlapped */ } void TPool_jobsCompleted(TPool* pool) { assert(pool); WaitForSingleObject(pool->allJobsCompleted, INFINITE); } #else /* pthread availability assumed */ #include /* malloc, free */ #include /* pthread_* */ /* A job is just a function with an opaque argument */ typedef struct TPool_job_s { void (*job_function)(void*); void *arg; } TPool_job; struct TPool_s { pthread_t* threads; size_t threadCapacity; size_t threadLimit; /* The queue is a circular buffer */ TPool_job* queue; size_t queueHead; size_t queueTail; size_t queueSize; /* The number of threads working on jobs */ size_t numThreadsBusy; /* Indicates if the queue is empty */ int queueEmpty; /* The mutex protects the queue */ pthread_mutex_t queueMutex; /* Condition variable for pushers to wait on when the queue is full */ pthread_cond_t queuePushCond; /* Condition variables for poppers to wait on when the queue is empty */ pthread_cond_t queuePopCond; /* Indicates if the queue is shutting down */ int shutdown; }; static void TPool_shutdown(TPool* ctx); void TPool_free(TPool* ctx) { if (!ctx) { return; } TPool_shutdown(ctx); pthread_mutex_destroy(&ctx->queueMutex); pthread_cond_destroy(&ctx->queuePushCond); pthread_cond_destroy(&ctx->queuePopCond); free(ctx->queue); free(ctx->threads); free(ctx); } static void* TPool_thread(void* opaque); TPool* TPool_create(int nbThreads, int queueSize) { TPool* ctx; /* Check parameters */ if (nbThreads<1 || queueSize<1) { return NULL; } /* Allocate the context and zero initialize */ ctx = (TPool*)calloc(1, sizeof(TPool)); if (!ctx) { return NULL; } /* init pthread variables */ { int error = 0; error |= pthread_mutex_init(&ctx->queueMutex, NULL); error |= pthread_cond_init(&ctx->queuePushCond, NULL); error |= pthread_cond_init(&ctx->queuePopCond, NULL); if (error) { TPool_free(ctx); return NULL; } } /* Initialize the job queue. * It needs one extra space since one space is wasted to differentiate * empty and full queues. */ ctx->queueSize = (size_t)queueSize + 1; ctx->queue = (TPool_job*)calloc(1, ctx->queueSize * sizeof(TPool_job)); if (ctx->queue == NULL) { TPool_free(ctx); return NULL; } ctx->queueHead = 0; ctx->queueTail = 0; ctx->numThreadsBusy = 0; ctx->queueEmpty = 1; ctx->shutdown = 0; /* Allocate space for the thread handles */ ctx->threads = (pthread_t*)calloc(1, (size_t)nbThreads * sizeof(pthread_t)); if (ctx->threads == NULL) { TPool_free(ctx); return NULL; } ctx->threadCapacity = 0; /* Initialize the threads */ { int i; for (i = 0; i < nbThreads; ++i) { if (pthread_create(&ctx->threads[i], NULL, &TPool_thread, ctx)) { ctx->threadCapacity = (size_t)i; TPool_free(ctx); return NULL; } } ctx->threadCapacity = (size_t)nbThreads; ctx->threadLimit = (size_t)nbThreads; } return ctx; } /* TPool_thread() : * Work thread for the thread pool. * Waits for jobs and executes them. * @returns : NULL on failure else non-null. */ static void* TPool_thread(void* opaque) { TPool* const ctx = (TPool*)opaque; if (!ctx) { return NULL; } for (;;) { /* Lock the mutex and wait for a non-empty queue or until shutdown */ pthread_mutex_lock(&ctx->queueMutex); while ( ctx->queueEmpty || (ctx->numThreadsBusy >= ctx->threadLimit) ) { if (ctx->shutdown) { /* even if !queueEmpty, (possible if numThreadsBusy >= threadLimit), * a few threads will be shutdown while !queueEmpty, * but enough threads will remain active to finish the queue */ pthread_mutex_unlock(&ctx->queueMutex); return opaque; } pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex); } /* Pop a job off the queue */ { TPool_job const job = ctx->queue[ctx->queueHead]; ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize; ctx->numThreadsBusy++; ctx->queueEmpty = (ctx->queueHead == ctx->queueTail); /* Unlock the mutex, signal a pusher, and run the job */ pthread_cond_signal(&ctx->queuePushCond); pthread_mutex_unlock(&ctx->queueMutex); job.job_function(job.arg); /* If the intended queue size was 0, signal after finishing job */ pthread_mutex_lock(&ctx->queueMutex); ctx->numThreadsBusy--; pthread_cond_signal(&ctx->queuePushCond); pthread_mutex_unlock(&ctx->queueMutex); } } /* for (;;) */ assert(0); /* Unreachable */ } /*! TPool_shutdown() : Shutdown the queue, wake any sleeping threads, and join all of the threads. */ static void TPool_shutdown(TPool* ctx) { /* Shut down the queue */ pthread_mutex_lock(&ctx->queueMutex); ctx->shutdown = 1; pthread_mutex_unlock(&ctx->queueMutex); /* Wake up sleeping threads */ pthread_cond_broadcast(&ctx->queuePushCond); pthread_cond_broadcast(&ctx->queuePopCond); /* Join all of the threads */ { size_t i; for (i = 0; i < ctx->threadCapacity; ++i) { pthread_join(ctx->threads[i], NULL); /* note : could fail */ } } } /*! TPool_jobsCompleted() : * Waits for all queued jobs to finish executing. */ void TPool_jobsCompleted(TPool* ctx){ pthread_mutex_lock(&ctx->queueMutex); while(!ctx->queueEmpty || ctx->numThreadsBusy > 0) { pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex); } pthread_mutex_unlock(&ctx->queueMutex); } /** * Returns 1 if the queue is full and 0 otherwise. * * When queueSize is 1 (pool was created with an intended queueSize of 0), * then a queue is empty if there is a thread free _and_ no job is waiting. */ static int isQueueFull(TPool const* ctx) { if (ctx->queueSize > 1) { return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize); } else { return (ctx->numThreadsBusy == ctx->threadLimit) || !ctx->queueEmpty; } } static void TPool_submitJob_internal(TPool* ctx, void (*job_function)(void*), void *arg) { TPool_job job; job.job_function = job_function; job.arg = arg; assert(ctx != NULL); if (ctx->shutdown) return; ctx->queueEmpty = 0; ctx->queue[ctx->queueTail] = job; ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize; pthread_cond_signal(&ctx->queuePopCond); } void TPool_submitJob(TPool* ctx, void (*job_function)(void*), void* arg) { assert(ctx != NULL); pthread_mutex_lock(&ctx->queueMutex); /* Wait until there is space in the queue for the new job */ while (isQueueFull(ctx) && (!ctx->shutdown)) { pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex); } TPool_submitJob_internal(ctx, job_function, arg); pthread_mutex_unlock(&ctx->queueMutex); } #endif /* LZ4IO_NO_MT */