/*
 * Source: Subversion repository "Code-Repo", revision 141, author Kevin.
 */
#include "threadpool.h"
#include "list.h"
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdlib.h>
struct future {
8
	sem_t finished;
9
	struct list_elem elem;
10
	thread_pool_callable_func_t callable;
11
	void * callable_data;
12
	void * result;
13
};
struct thread_pool {
16
	pthread_mutex_t mutex_lock;
17
	pthread_cond_t condition_var;
18
	struct list future_list;
19
	bool shutting_down;
20
	int thread_list_length;
21
	pthread_t * thread_list;
22
};
static void * thread_func(void * arg) {
25
	struct thread_pool *t_pool = arg;
26
	pthread_mutex_lock(&t_pool->mutex_lock);
27
	while(1) {
28
		// Get lock and wait for signal while there are no jobs queued
29
		while (list_size(&t_pool->future_list) == 0) {
30
			pthread_cond_wait(&t_pool->condition_var,&t_pool->mutex_lock);
31
			if (t_pool->shutting_down) {
32
				pthread_mutex_unlock(&t_pool->mutex_lock);
33
				return NULL;
34
			}
35
		}
36
		// Retreive and remove the first future in the list
37
		struct list_elem *e = list_pop_front(&t_pool->future_list);
38
		pthread_mutex_unlock(&t_pool->mutex_lock);
39
		struct future *f_entry = list_entry(e, struct future, elem);
40
 
41
		// Store results of the function
42
		f_entry->result = f_entry->callable(f_entry->callable_data);
43
		// Signal completion of the future
44
		sem_post(&f_entry->finished);
45
 
46
		pthread_mutex_lock(&t_pool->mutex_lock);
47
		if (t_pool->shutting_down)
48
			return NULL;
49
	}
50
}
/* Create a new thread pool with n threads. */
53
struct thread_pool * thread_pool_new(int nthreads) {
54
	// Allocate a new thread pool structure
55
	struct thread_pool *t_pool = malloc(sizeof(struct thread_pool));
56
 
57
	// Initialize the thread pool variables
58
	pthread_mutex_init(&t_pool->mutex_lock,NULL);
59
	pthread_cond_init(&t_pool->condition_var,NULL);
60
	list_init(&t_pool->future_list);
61
	t_pool->shutting_down = false;
62
	t_pool->thread_list_length = nthreads;
63
 
64
	// Allocate and start each thread in the thread pool
65
	t_pool->thread_list = malloc(nthreads * sizeof(pthread_t));
66
	int i;
67
	for (i = 0; i < nthreads; i++)
68
		pthread_create(t_pool->thread_list + i, NULL, thread_func, t_pool);
69
 
70
	return t_pool;
71
}
/* Shutdown this thread pool.  May or may not execute already queued tasks. */
74
void thread_pool_shutdown(struct thread_pool * t_pool) {
75
	// Set the shutdown flag and notify all worker threads
76
	pthread_mutex_lock(&t_pool->mutex_lock);
77
	t_pool->shutting_down = true;
78
	pthread_cond_broadcast(&t_pool->condition_var);
79
	pthread_mutex_unlock(&t_pool->mutex_lock);
80
 
81
	// Wait for all worker threads to join before returning
82
	int i;
83
	for (i = 0; i < t_pool->thread_list_length; i++)
84
		pthread_join(t_pool->thread_list[i],NULL);
85
 
86
	free(t_pool->thread_list);
87
	free(t_pool);
88
}
/* Submit a callable to thread pool and return future.
91
 * The returned future can be used in future_get() and future_free() */
92
struct future * thread_pool_submit(struct thread_pool * t_pool, 
93
	thread_pool_callable_func_t callable, void * callable_data) {
94
	// Allocate and initialize the new future
95
	struct future *f_entry = malloc(sizeof(struct future));
96
	sem_init(&f_entry->finished,0,0);
97
	f_entry->callable = callable;
98
	f_entry->callable_data = callable_data;
99
 
100
	// Get the lock on the thread pool and enqueue the future to the end of the work queue
101
	pthread_mutex_lock(&t_pool->mutex_lock);
102
	list_push_back(&t_pool->future_list,&f_entry->elem);
103
	// Notify one worker thread to process the future
104
	pthread_cond_signal(&t_pool->condition_var);
105
	pthread_mutex_unlock(&t_pool->mutex_lock);
106
	return f_entry;
107
}
/* Make sure that thread pool has completed executing this callable, then return result. */
110
void * future_get(struct future * f_entry) {
111
	sem_wait(&f_entry->finished);
112
	return f_entry->result;
113
}
/* Deallocate this future.  Must be called after future_get() */
116
void future_free(struct future * f_entry) {
117
	free(f_entry);
118
}