/* Thread pool: a fixed set of worker threads that execute submitted
 * callables; results are retrieved through futures. */
#include "threadpool.h"
#include "list.h"
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdlib.h>
/* One submitted task together with the slot that receives its result. */
struct future {
/* Posted by the worker after `result` is stored; future_get() waits on it. */
sem_t finished;
/* Link that places this future on the pool's future_list. */
struct list_elem elem;
/* Function the worker calls to run the task. */
thread_pool_callable_func_t callable;
/* Argument passed unchanged to `callable`. */
void * callable_data;
/* Value returned by `callable`; written by the worker before `finished` is posted. */
void * result;
};
/* Shared state of a pool: the work queue, its synchronization, and the workers. */
struct thread_pool {
/* Held while accessing future_list and shutting_down. */
pthread_mutex_t mutex_lock;
/* Signalled when a future is queued; broadcast when shutdown is requested. */
pthread_cond_t condition_var;
/* Queue of pending futures: pushed at the back, popped from the front. */
struct list future_list;
/* Set to true by thread_pool_shutdown() to tell the workers to exit. */
bool shutting_down;
/* Number of entries in thread_list. */
int thread_list_length;
/* Heap-allocated array of worker thread ids. */
pthread_t * thread_list;
};
/*
 * Worker thread entry point.
 *
 * arg is the struct thread_pool this worker belongs to.  The worker
 * repeatedly takes the first future off the pool's queue, runs its callable
 * with the pool mutex released, stores the result, and posts the future's
 * semaphore.  It exits (returning NULL) once the pool's shutting_down flag
 * is set; futures still queued at that point are left unexecuted.
 */
static void * thread_func(void * arg) {
    struct thread_pool *t_pool = arg;

    pthread_mutex_lock(&t_pool->mutex_lock);
    for (;;) {
        /*
         * Sleep until there is work or a shutdown request.  The shutdown
         * flag is part of the wait predicate and is tested BEFORE the first
         * wait: a worker that gets here after the shutdown broadcast was
         * already sent would otherwise sleep forever.
         */
        while (!t_pool->shutting_down &&
               list_size(&t_pool->future_list) == 0) {
            pthread_cond_wait(&t_pool->condition_var, &t_pool->mutex_lock);
        }
        if (t_pool->shutting_down) {
            break;
        }

        /* Retrieve and remove the first future in the list. */
        struct list_elem *e = list_pop_front(&t_pool->future_list);
        pthread_mutex_unlock(&t_pool->mutex_lock);

        /* Run the task without holding the lock so other workers can proceed. */
        struct future *f_entry = list_entry(e, struct future, elem);
        f_entry->result = f_entry->callable(f_entry->callable_data);

        /* Signal completion; after this the owner may free the future. */
        sem_post(&f_entry->finished);

        pthread_mutex_lock(&t_pool->mutex_lock);
    }

    /*
     * Single exit path: always release the mutex before returning, otherwise
     * the remaining workers could never reacquire it on wake-up.
     */
    pthread_mutex_unlock(&t_pool->mutex_lock);
    return NULL;
}
/*
 * Create a new thread pool with n threads.
 *
 * Returns the new pool, or NULL if nthreads is negative, if memory cannot be
 * allocated, or if nthreads > 0 and not a single worker could be started.
 * If only some of the requested workers could be started, the pool is
 * returned with the workers that did start, and thread_list_length records
 * that smaller count.
 */
struct thread_pool * thread_pool_new(int nthreads) {
    if (nthreads < 0) {
        return NULL;
    }

    // Allocate a new thread pool structure
    struct thread_pool *t_pool = malloc(sizeof(struct thread_pool));
    if (t_pool == NULL) {
        return NULL;
    }

    // Allocate the array of worker ids (malloc(0) may legitimately be NULL)
    t_pool->thread_list = malloc((size_t)nthreads * sizeof(pthread_t));
    if (t_pool->thread_list == NULL && nthreads > 0) {
        free(t_pool);
        return NULL;
    }

    // Initialize the thread pool variables
    pthread_mutex_init(&t_pool->mutex_lock, NULL);
    pthread_cond_init(&t_pool->condition_var, NULL);
    list_init(&t_pool->future_list);
    t_pool->shutting_down = false;
    t_pool->thread_list_length = 0;

    // Start the workers; count only those that were actually created so
    // that shutdown never joins an uninitialized pthread_t.
    for (int i = 0; i < nthreads; i++) {
        if (pthread_create(&t_pool->thread_list[i], NULL, thread_func, t_pool) != 0) {
            break;
        }
        t_pool->thread_list_length = i + 1;
    }

    // A pool that was asked for workers but has none can never run a task.
    if (nthreads > 0 && t_pool->thread_list_length == 0) {
        pthread_cond_destroy(&t_pool->condition_var);
        pthread_mutex_destroy(&t_pool->mutex_lock);
        free(t_pool->thread_list);
        free(t_pool);
        return NULL;
    }

    return t_pool;
}
/*
 * Shutdown this thread pool. May or may not execute already queued tasks.
 *
 * Sets the shutdown flag, wakes every worker, waits for all of them to
 * exit, then releases the pool's synchronization objects and memory.  The
 * pool pointer is invalid after this call.  Futures still sitting in the
 * queue are not freed here.  Passing NULL is a no-op.
 */
void thread_pool_shutdown(struct thread_pool * t_pool) {
    if (t_pool == NULL) {
        return;
    }

    // Set the shutdown flag and notify all worker threads
    pthread_mutex_lock(&t_pool->mutex_lock);
    t_pool->shutting_down = true;
    pthread_cond_broadcast(&t_pool->condition_var);
    pthread_mutex_unlock(&t_pool->mutex_lock);

    // Wait for all worker threads to join before tearing anything down
    for (int i = 0; i < t_pool->thread_list_length; i++) {
        pthread_join(t_pool->thread_list[i], NULL);
    }

    // No worker can touch these any more; release them before the memory
    pthread_cond_destroy(&t_pool->condition_var);
    pthread_mutex_destroy(&t_pool->mutex_lock);

    free(t_pool->thread_list);
    free(t_pool);
}
/*
 * Submit a callable to thread pool and return future.
 * The returned future can be used in future_get() and future_free().
 *
 * callable is invoked by a worker thread with callable_data as its only
 * argument.  Returns NULL (and queues nothing) if the future cannot be
 * allocated or its semaphore cannot be initialized.
 */
struct future * thread_pool_submit(struct thread_pool * t_pool,
        thread_pool_callable_func_t callable, void * callable_data) {
    // Allocate and initialize the new future
    struct future *f_entry = malloc(sizeof(struct future));
    if (f_entry == NULL) {
        return NULL;
    }
    if (sem_init(&f_entry->finished, 0, 0) != 0) {
        free(f_entry);
        return NULL;
    }
    f_entry->callable = callable;
    f_entry->callable_data = callable_data;
    f_entry->result = NULL;  // defined value until the worker stores the real one

    // Enqueue at the end of the work queue and wake one worker, both under
    // the pool lock so the wake-up cannot be missed.
    pthread_mutex_lock(&t_pool->mutex_lock);
    list_push_back(&t_pool->future_list, &f_entry->elem);
    pthread_cond_signal(&t_pool->condition_var);
    pthread_mutex_unlock(&t_pool->mutex_lock);

    return f_entry;
}
/*
 * Make sure that thread pool has completed executing this callable, then
 * return result.
 *
 * Blocks until the worker has posted the future's semaphore.  A wait that is
 * interrupted by a signal (EINTR) is restarted, so the result is never read
 * before the task has finished.  Returns NULL if the wait fails for any
 * other reason.
 */
void * future_get(struct future * f_entry) {
    while (sem_wait(&f_entry->finished) != 0) {
        if (errno != EINTR) {
            return NULL;
        }
    }
    return f_entry->result;
}
/* Deallocate this future. Must be called after future_get() */
void future_free(struct future * f_entry) {
free(f_entry);
}