Blame | Last modification | View Log | Download | RSS feed
#include "threadpool_exec.h"#include "list.h"#include <stdlib.h>#include <semaphore.h>#include <pthread.h>struct future {struct list_elem elem;thread_pool_callable_func_t callable;void * callable_data;};struct thread_pool {pthread_mutex_t mutex_lock;pthread_cond_t condition_var;struct list future_list;bool shutting_down;int thread_list_length;pthread_t * thread_list;};static void * thread_func(void * arg) {struct thread_pool *t_pool = arg;pthread_mutex_lock(&t_pool->mutex_lock);while(1) {// Get lock and wait for signal while there are no jobs queuedwhile (list_size(&t_pool->future_list) == 0) {pthread_cond_wait(&t_pool->condition_var,&t_pool->mutex_lock);if (t_pool->shutting_down) {pthread_mutex_unlock(&t_pool->mutex_lock);return NULL;}}// Retreive and remove the first future in the liststruct list_elem *e = list_pop_front(&t_pool->future_list);pthread_mutex_unlock(&t_pool->mutex_lock);struct future *f_entry = list_entry(e, struct future, elem);f_entry->callable(f_entry->callable_data);free(f_entry);pthread_mutex_lock(&t_pool->mutex_lock);if (t_pool->shutting_down)return NULL;}}/* Create a new thread pool with n threads. */struct thread_pool * thread_pool_new(int nthreads) {// Allocate a new thread pool structurestruct thread_pool *t_pool = malloc(sizeof(struct thread_pool));// Initialize the thread pool variablespthread_mutex_init(&t_pool->mutex_lock,NULL);pthread_cond_init(&t_pool->condition_var,NULL);list_init(&t_pool->future_list);t_pool->shutting_down = false;t_pool->thread_list_length = nthreads;// Allocate and start each thread in the thread poolt_pool->thread_list = malloc(nthreads * sizeof(pthread_t));int i;for (i = 0; i < nthreads; i++)pthread_create(t_pool->thread_list + i, NULL, thread_func, t_pool);return t_pool;}/* Shutdown this thread pool. May or may not execute already queued tasks. */void thread_pool_shutdown(struct thread_pool * t_pool) {// Set the shutdown flag and notify all worker threadspthread_mutex_lock(&t_pool->mutex_lock);t_pool->shutting_down = true;pthread_cond_broadcast(&t_pool->condition_var);pthread_mutex_unlock(&t_pool->mutex_lock);// Wait for all worker threads to join before returningint i;for (i = 0; i < t_pool->thread_list_length; i++)pthread_join(t_pool->thread_list[i],NULL);free(t_pool->thread_list);free(t_pool);}/* Submit a callable to thread pool and return future.* The returned future can be used in future_get() and future_free() */void thread_pool_submit(struct thread_pool * t_pool,thread_pool_callable_func_t callable, void * callable_data) {// Allocate and initialize the new futurestruct future *f_entry = malloc(sizeof(struct future));f_entry->callable = callable;f_entry->callable_data = callable_data;// Get the lock on the thread pool and enqueue the future to the end of the work queuepthread_mutex_lock(&t_pool->mutex_lock);list_push_back(&t_pool->future_list,&f_entry->elem);// Notify one worker thread to process the futurepthread_cond_signal(&t_pool->condition_var);pthread_mutex_unlock(&t_pool->mutex_lock);}