Subversion Repositories Code-Repo

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
141 Kevin 1
/*
 * Thread pool test program.
 *
 * This program creates a thread pool with a configurable number
 * of threads and then submits a configurable number of tasks.
 * Each task reports which thread executed it.
 *
 * An associated Python script (check.py) parses the output and
 * checks that 'nthreads' tasks are executing in parallel.
 *
 * This program makes idealizing and simplifying assumptions.
 *
 * First, it assumes that there are enough physical CPUs available
 * so that in fact 'nthreads' threads can execute. This may not
 * be true on a heavily loaded machine; test results will not be
 * reliable under these circumstances.
 *
 * Second, the specification does not require that all threads
 * have reached the point where they wait on the pool's condition
 * variable.  To increase the likelihood that this is the case,
 * the test script pauses for 1 second before submitting tasks.
 *
 * Written by G. Back for CS3214 Spring 2010.
 */
25
#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <sys/resource.h>
#include <sys/time.h>

#include "threadpool.h"
36
 
37
// helper function to count number of threads in this process
// (forward declaration; the definition follows main)
static int count_number_of_threads(void);
39
 
40
/* Data to be passed to callable. */
struct callable_data {
    int number;     /* task index; main sets it to the submission loop counter */
};
44
 
45
/* 
46
 * A callable. 
47
 *
48
 * Returns a string that reports its number, which thread
49
 * executes the task, and the start and end time of the 
50
 * task's execution.
51
 */
52
static void *
53
callable_task(struct callable_data * callable)
54
{
55
    char buf[1024];
56
 
57
    struct timeval start, end;
58
    assert(gettimeofday(&start, NULL) == 0);
59
 
60
    struct timespec t = { .tv_sec = 0, .tv_nsec = 5000000 };
61
    assert(nanosleep(&t, NULL) == 0);
62
 
63
    assert(gettimeofday(&end, NULL) == 0);
64
 
65
    snprintf(buf, sizeof buf, 
66
            "Future #%d Thread #%p start=%ld.%ld end=%ld.%ld", 
67
             callable->number, (void *)pthread_self(),
68
            start.tv_sec, start.tv_usec,
69
            end.tv_sec, end.tv_usec);
70
    return strdup(buf);
71
}
72
 
73
int
74
main(int ac, char *av[])
75
{
76
    assert (ac > 2 || !!!"Usage: threadpool_test <nthreads> <ntasks>");
77
 
78
    int nthreads = atoi(av[1]);
79
    int ntasks = atoi(av[2]);
80
    struct thread_pool * ex = thread_pool_new(nthreads);
81
    const int N = ntasks;
82
    struct future * f[N];
83
 
84
    // sleep .5 seconds to give threads time to start up
85
    struct timespec sleep_for = { .tv_sec = 0, .tv_nsec = 5*1e8 };
86
    nanosleep(&sleep_for, NULL);
87
 
88
    int threadsstarted = count_number_of_threads() - 1;
89
    if (threadsstarted != nthreads) {
90
        printf("The thread pool started %d instead of %d threads.\n",
91
                threadsstarted, nthreads);
92
        return EXIT_FAILURE;
93
    }
94
 
95
    // check for busy-waiting implementation
96
    struct rusage usage;
97
    int rc = getrusage(RUSAGE_SELF, &usage);
98
    if (rc == -1)
99
        perror("getrusage"), exit(-1);
100
 
101
    if (usage.ru_utime.tv_sec > 0 || usage.ru_utime.tv_usec > 400000) {
102
        printf("Thread pool is consuming excessive CPU time without running any jobs\n");
103
        return EXIT_FAILURE;
104
    }
105
 
106
    // submit N tasks and record futures obtained in return
107
    int i;
108
    for (i = 0; i < N; i++) {
109
        struct callable_data * callable_data = malloc(sizeof *callable_data);
110
        callable_data->number = i;
111
        f[i] = thread_pool_submit(ex, 
112
                               (thread_pool_callable_func_t) callable_task, 
113
                               callable_data);
114
    }
115
    printf("Main thread: %p\n", (void *)pthread_self());
116
 
117
    // wait for each future
118
    for (i = 0; i < N; i++) {
119
        printf("%s\n", (char *) future_get(f[i]));
120
    }
121
 
122
    // check that no pool thread shut down prematurely
123
    threadsstarted = count_number_of_threads() - 1;
124
    if (threadsstarted != nthreads) {
125
        printf("Only %d thread pool threads are left, should be %d threads.\n",
126
                threadsstarted, nthreads);
127
        return EXIT_FAILURE;
128
    }
129
    thread_pool_shutdown(ex);
130
 
131
    // sleep .3 seconds to give threads time to shut down
132
    // pthread_join() is not atomic with respect to the number
133
    // of tasks reported by /proc/self/status
134
    sleep_for.tv_nsec = 3*1e8;
135
    nanosleep(&sleep_for, NULL);
136
 
137
    int threadsleft = count_number_of_threads();
138
    if (threadsleft != 1) {
139
        printf("The thread pool did not correctly shut down"
140
               ", there are %d threads left.\n", threadsleft);
141
        return EXIT_FAILURE;
142
    }
143
 
144
    printf("Done.\n");
145
    return EXIT_SUCCESS;
146
}
147
 
148
/**
 * Count number of threads by scanning /proc/self/status
 * for the Threads: ... line
 *
 * Returns the number found on that line.  If the file cannot be
 * opened or contains no such line, prints an internal-error message
 * and aborts the process.
 */
static int
count_number_of_threads(void)
{
    FILE * p = fopen("/proc/self/status", "r");
    if (p != NULL) {
        char buf[128];

        /*
         * Loop on fgets() itself rather than on feof(): feof() only turns
         * true after a read has already failed, which would make the final
         * iteration parse a stale (or never-filled) buffer.
         */
        while (fgets(buf, sizeof buf, p) != NULL) {
            int threadsleft;
            if (sscanf(buf, "Threads: %d", &threadsleft) == 1) {
                fclose(p);
                return threadsleft;
            }
        }
        fclose(p);
    }

    printf("Internal error, please send email to gback@cs.vt.edu\n");
    abort();
}