0,0 → 1,118 |
#include "threadpool.h"
#include "list.h"

#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdlib.h>
|
struct future { |
sem_t finished; |
struct list_elem elem; |
thread_pool_callable_func_t callable; |
void * callable_data; |
void * result; |
}; |
|
struct thread_pool { |
pthread_mutex_t mutex_lock; |
pthread_cond_t condition_var; |
struct list future_list; |
bool shutting_down; |
int thread_list_length; |
pthread_t * thread_list; |
}; |
|
static void * thread_func(void * arg) { |
struct thread_pool *t_pool = arg; |
pthread_mutex_lock(&t_pool->mutex_lock); |
while(1) { |
// Get lock and wait for signal while there are no jobs queued |
while (list_size(&t_pool->future_list) == 0) { |
pthread_cond_wait(&t_pool->condition_var,&t_pool->mutex_lock); |
if (t_pool->shutting_down) { |
pthread_mutex_unlock(&t_pool->mutex_lock); |
return NULL; |
} |
} |
// Retreive and remove the first future in the list |
struct list_elem *e = list_pop_front(&t_pool->future_list); |
pthread_mutex_unlock(&t_pool->mutex_lock); |
struct future *f_entry = list_entry(e, struct future, elem); |
|
// Store results of the function |
f_entry->result = f_entry->callable(f_entry->callable_data); |
// Signal completion of the future |
sem_post(&f_entry->finished); |
|
pthread_mutex_lock(&t_pool->mutex_lock); |
if (t_pool->shutting_down) |
return NULL; |
} |
} |
|
/* Create a new thread pool with n threads. */ |
struct thread_pool * thread_pool_new(int nthreads) { |
// Allocate a new thread pool structure |
struct thread_pool *t_pool = malloc(sizeof(struct thread_pool)); |
|
// Initialize the thread pool variables |
pthread_mutex_init(&t_pool->mutex_lock,NULL); |
pthread_cond_init(&t_pool->condition_var,NULL); |
list_init(&t_pool->future_list); |
t_pool->shutting_down = false; |
t_pool->thread_list_length = nthreads; |
|
// Allocate and start each thread in the thread pool |
t_pool->thread_list = malloc(nthreads * sizeof(pthread_t)); |
int i; |
for (i = 0; i < nthreads; i++) |
pthread_create(t_pool->thread_list + i, NULL, thread_func, t_pool); |
|
return t_pool; |
} |
|
/* Shutdown this thread pool. May or may not execute already queued tasks. */ |
void thread_pool_shutdown(struct thread_pool * t_pool) { |
// Set the shutdown flag and notify all worker threads |
pthread_mutex_lock(&t_pool->mutex_lock); |
t_pool->shutting_down = true; |
pthread_cond_broadcast(&t_pool->condition_var); |
pthread_mutex_unlock(&t_pool->mutex_lock); |
|
// Wait for all worker threads to join before returning |
int i; |
for (i = 0; i < t_pool->thread_list_length; i++) |
pthread_join(t_pool->thread_list[i],NULL); |
|
free(t_pool->thread_list); |
free(t_pool); |
} |
|
/* Submit a callable to thread pool and return future. |
* The returned future can be used in future_get() and future_free() */ |
struct future * thread_pool_submit(struct thread_pool * t_pool, |
thread_pool_callable_func_t callable, void * callable_data) { |
// Allocate and initialize the new future |
struct future *f_entry = malloc(sizeof(struct future)); |
sem_init(&f_entry->finished,0,0); |
f_entry->callable = callable; |
f_entry->callable_data = callable_data; |
|
// Get the lock on the thread pool and enqueue the future to the end of the work queue |
pthread_mutex_lock(&t_pool->mutex_lock); |
list_push_back(&t_pool->future_list,&f_entry->elem); |
// Notify one worker thread to process the future |
pthread_cond_signal(&t_pool->condition_var); |
pthread_mutex_unlock(&t_pool->mutex_lock); |
return f_entry; |
} |
|
/* Make sure that thread pool has completed executing this callable, then return result. */ |
void * future_get(struct future * f_entry) { |
sem_wait(&f_entry->finished); |
return f_entry->result; |
} |
|
/* Deallocate this future. Must be called after future_get() */ |
void future_free(struct future * f_entry) { |
free(f_entry); |
} |