Subversion Repositories Code-Repo

Rev

Blame | Last modification | View Log | RSS feed

#include "threadpool.h"
#include "list.h"
#include <stdlib.h>
#include <semaphore.h>
#include <pthread.h>

struct future {
        sem_t finished;
        struct list_elem elem;
        thread_pool_callable_func_t callable;
        void * callable_data;
        void * result;
};

struct thread_pool {
        pthread_mutex_t mutex_lock;
        pthread_cond_t condition_var;
        struct list future_list;
        bool shutting_down;
        int thread_list_length;
        pthread_t * thread_list;
};

static void * thread_func(void * arg) {
        struct thread_pool *t_pool = arg;
        pthread_mutex_lock(&t_pool->mutex_lock);
        while(1) {
                // Get lock and wait for signal while there are no jobs queued
                while (list_size(&t_pool->future_list) == 0) {
                        pthread_cond_wait(&t_pool->condition_var,&t_pool->mutex_lock);
                        if (t_pool->shutting_down) {
                                pthread_mutex_unlock(&t_pool->mutex_lock);
                                return NULL;
                        }
                }
                // Retreive and remove the first future in the list
                struct list_elem *e = list_pop_front(&t_pool->future_list);
                pthread_mutex_unlock(&t_pool->mutex_lock);
                struct future *f_entry = list_entry(e, struct future, elem);

                // Store results of the function
                f_entry->result = f_entry->callable(f_entry->callable_data);
                // Signal completion of the future
                sem_post(&f_entry->finished);

                pthread_mutex_lock(&t_pool->mutex_lock);
                if (t_pool->shutting_down)
                        return NULL;
        }
}

/* Create a new thread pool with n threads. */
struct thread_pool * thread_pool_new(int nthreads) {
        // Allocate a new thread pool structure
        struct thread_pool *t_pool = malloc(sizeof(struct thread_pool));

        // Initialize the thread pool variables
        pthread_mutex_init(&t_pool->mutex_lock,NULL);
        pthread_cond_init(&t_pool->condition_var,NULL);
        list_init(&t_pool->future_list);
        t_pool->shutting_down = false;
        t_pool->thread_list_length = nthreads;

        // Allocate and start each thread in the thread pool
        t_pool->thread_list = malloc(nthreads * sizeof(pthread_t));
        int i;
        for (i = 0; i < nthreads; i++)
                pthread_create(t_pool->thread_list + i, NULL, thread_func, t_pool);

        return t_pool;
}

/* Shutdown this thread pool.  May or may not execute already queued tasks. */
void thread_pool_shutdown(struct thread_pool * t_pool) {
        // Set the shutdown flag and notify all worker threads
        pthread_mutex_lock(&t_pool->mutex_lock);
        t_pool->shutting_down = true;
        pthread_cond_broadcast(&t_pool->condition_var);
        pthread_mutex_unlock(&t_pool->mutex_lock);

        // Wait for all worker threads to join before returning
        int i;
        for (i = 0; i < t_pool->thread_list_length; i++)
                pthread_join(t_pool->thread_list[i],NULL);

        free(t_pool->thread_list);
        free(t_pool);
}

/* Submit a callable to thread pool and return future.
 * The returned future can be used in future_get() and future_free() */
struct future * thread_pool_submit(struct thread_pool * t_pool, 
        thread_pool_callable_func_t callable, void * callable_data) {
        // Allocate and initialize the new future
        struct future *f_entry = malloc(sizeof(struct future));
        sem_init(&f_entry->finished,0,0);
        f_entry->callable = callable;
        f_entry->callable_data = callable_data;

        // Get the lock on the thread pool and enqueue the future to the end of the work queue
        pthread_mutex_lock(&t_pool->mutex_lock);
        list_push_back(&t_pool->future_list,&f_entry->elem);
        // Notify one worker thread to process the future
        pthread_cond_signal(&t_pool->condition_var);
        pthread_mutex_unlock(&t_pool->mutex_lock);
        return f_entry;
}

/* Make sure that thread pool has completed executing this callable, then return result. */
void * future_get(struct future * f_entry) {
        sem_wait(&f_entry->finished);
        return f_entry->result;
}

/* Deallocate this future.  Must be called after future_get() */
void future_free(struct future * f_entry) {
        free(f_entry);
}