| 141 |
Kevin |
1 |
#include "threadpool.h"
|
|
|
2 |
#include "list.h"
|
|
|
3 |
#include <stdlib.h>
|
|
|
4 |
#include <semaphore.h>
|
|
|
5 |
#include <pthread.h>
|
|
|
6 |
|
|
|
7 |
struct future {
|
|
|
8 |
sem_t finished;
|
|
|
9 |
struct list_elem elem;
|
|
|
10 |
thread_pool_callable_func_t callable;
|
|
|
11 |
void * callable_data;
|
|
|
12 |
void * result;
|
|
|
13 |
};
|
|
|
14 |
|
|
|
15 |
struct thread_pool {
|
|
|
16 |
pthread_mutex_t mutex_lock;
|
|
|
17 |
pthread_cond_t condition_var;
|
|
|
18 |
struct list future_list;
|
|
|
19 |
bool shutting_down;
|
|
|
20 |
int thread_list_length;
|
|
|
21 |
pthread_t * thread_list;
|
|
|
22 |
};
|
|
|
23 |
|
|
|
24 |
static void * thread_func(void * arg) {
|
|
|
25 |
struct thread_pool *t_pool = arg;
|
|
|
26 |
pthread_mutex_lock(&t_pool->mutex_lock);
|
|
|
27 |
while(1) {
|
|
|
28 |
// Get lock and wait for signal while there are no jobs queued
|
|
|
29 |
while (list_size(&t_pool->future_list) == 0) {
|
|
|
30 |
pthread_cond_wait(&t_pool->condition_var,&t_pool->mutex_lock);
|
|
|
31 |
if (t_pool->shutting_down) {
|
|
|
32 |
pthread_mutex_unlock(&t_pool->mutex_lock);
|
|
|
33 |
return NULL;
|
|
|
34 |
}
|
|
|
35 |
}
|
|
|
36 |
// Retreive and remove the first future in the list
|
|
|
37 |
struct list_elem *e = list_pop_front(&t_pool->future_list);
|
|
|
38 |
pthread_mutex_unlock(&t_pool->mutex_lock);
|
|
|
39 |
struct future *f_entry = list_entry(e, struct future, elem);
|
|
|
40 |
|
|
|
41 |
// Store results of the function
|
|
|
42 |
f_entry->result = f_entry->callable(f_entry->callable_data);
|
|
|
43 |
// Signal completion of the future
|
|
|
44 |
sem_post(&f_entry->finished);
|
|
|
45 |
|
|
|
46 |
pthread_mutex_lock(&t_pool->mutex_lock);
|
|
|
47 |
if (t_pool->shutting_down)
|
|
|
48 |
return NULL;
|
|
|
49 |
}
|
|
|
50 |
}
|
|
|
51 |
|
|
|
52 |
/* Create a new thread pool with n threads. */
|
|
|
53 |
struct thread_pool * thread_pool_new(int nthreads) {
|
|
|
54 |
// Allocate a new thread pool structure
|
|
|
55 |
struct thread_pool *t_pool = malloc(sizeof(struct thread_pool));
|
|
|
56 |
|
|
|
57 |
// Initialize the thread pool variables
|
|
|
58 |
pthread_mutex_init(&t_pool->mutex_lock,NULL);
|
|
|
59 |
pthread_cond_init(&t_pool->condition_var,NULL);
|
|
|
60 |
list_init(&t_pool->future_list);
|
|
|
61 |
t_pool->shutting_down = false;
|
|
|
62 |
t_pool->thread_list_length = nthreads;
|
|
|
63 |
|
|
|
64 |
// Allocate and start each thread in the thread pool
|
|
|
65 |
t_pool->thread_list = malloc(nthreads * sizeof(pthread_t));
|
|
|
66 |
int i;
|
|
|
67 |
for (i = 0; i < nthreads; i++)
|
|
|
68 |
pthread_create(t_pool->thread_list + i, NULL, thread_func, t_pool);
|
|
|
69 |
|
|
|
70 |
return t_pool;
|
|
|
71 |
}
|
|
|
72 |
|
|
|
73 |
/* Shutdown this thread pool. May or may not execute already queued tasks. */
|
|
|
74 |
void thread_pool_shutdown(struct thread_pool * t_pool) {
|
|
|
75 |
// Set the shutdown flag and notify all worker threads
|
|
|
76 |
pthread_mutex_lock(&t_pool->mutex_lock);
|
|
|
77 |
t_pool->shutting_down = true;
|
|
|
78 |
pthread_cond_broadcast(&t_pool->condition_var);
|
|
|
79 |
pthread_mutex_unlock(&t_pool->mutex_lock);
|
|
|
80 |
|
|
|
81 |
// Wait for all worker threads to join before returning
|
|
|
82 |
int i;
|
|
|
83 |
for (i = 0; i < t_pool->thread_list_length; i++)
|
|
|
84 |
pthread_join(t_pool->thread_list[i],NULL);
|
|
|
85 |
|
|
|
86 |
free(t_pool->thread_list);
|
|
|
87 |
free(t_pool);
|
|
|
88 |
}
|
|
|
89 |
|
|
|
90 |
/* Submit a callable to thread pool and return future.
|
|
|
91 |
* The returned future can be used in future_get() and future_free() */
|
|
|
92 |
struct future * thread_pool_submit(struct thread_pool * t_pool,
|
|
|
93 |
thread_pool_callable_func_t callable, void * callable_data) {
|
|
|
94 |
// Allocate and initialize the new future
|
|
|
95 |
struct future *f_entry = malloc(sizeof(struct future));
|
|
|
96 |
sem_init(&f_entry->finished,0,0);
|
|
|
97 |
f_entry->callable = callable;
|
|
|
98 |
f_entry->callable_data = callable_data;
|
|
|
99 |
|
|
|
100 |
// Get the lock on the thread pool and enqueue the future to the end of the work queue
|
|
|
101 |
pthread_mutex_lock(&t_pool->mutex_lock);
|
|
|
102 |
list_push_back(&t_pool->future_list,&f_entry->elem);
|
|
|
103 |
// Notify one worker thread to process the future
|
|
|
104 |
pthread_cond_signal(&t_pool->condition_var);
|
|
|
105 |
pthread_mutex_unlock(&t_pool->mutex_lock);
|
|
|
106 |
return f_entry;
|
|
|
107 |
}
|
|
|
108 |
|
|
|
109 |
/* Make sure that thread pool has completed executing this callable, then return result. */
|
|
|
110 |
void * future_get(struct future * f_entry) {
|
|
|
111 |
sem_wait(&f_entry->finished);
|
|
|
112 |
return f_entry->result;
|
|
|
113 |
}
|
|
|
114 |
|
|
|
115 |
/* Deallocate this future. Must be called after future_get() */
|
|
|
116 |
void future_free(struct future * f_entry) {
|
|
|
117 |
free(f_entry);
|
|
|
118 |
}
|