Blame | Last modification | View Log | RSS feed
/*
* Thread pool test program.
*
* This program creates a thread pool with a configurable number
* of threads and then submits a configurable number of tasks.
* Each task reports which thread executed it.
*
* An associated Python script (check.py) parses the output and
* checks that 'nthreads' tasks are executing in parallel.
*
* This program makes idealizing and simplifying assumptions.
*
* First, it assumes that there are enough physical CPUs available
* so that in fact 'nthreads' threads can execute. This may not
* be true on a heavily loaded machine; test results will not be
* reliable under these circumstances.
*
* Second, the specification does not require that all threads
* have reached the point where they wait on the pool's condition
* variable. To increase the likelihood that this is the case,
* the test script pauses for 1 second before submitting tasks.
*
* Written by G. Back for CS3214 Spring 2010.
*/
#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <time.h>
#include "threadpool.h"
// helper function to count number of threads in this process
static int count_number_of_threads(void);
/* Data to be passed to callable. */
struct callable_data {
int number;
};
/*
* A callable.
*
* Returns a string that reports its number, which thread
* executes the task, and the start and end time of the
* task's execution.
*/
static void *
callable_task(struct callable_data * callable)
{
char buf[1024];
struct timeval start, end;
assert(gettimeofday(&start, NULL) == 0);
struct timespec t = { .tv_sec = 0, .tv_nsec = 5000000 };
assert(nanosleep(&t, NULL) == 0);
assert(gettimeofday(&end, NULL) == 0);
snprintf(buf, sizeof buf,
"Future #%d Thread #%p start=%ld.%ld end=%ld.%ld",
callable->number, (void *)pthread_self(),
start.tv_sec, start.tv_usec,
end.tv_sec, end.tv_usec);
return strdup(buf);
}
int
main(int ac, char *av[])
{
assert (ac > 2 || !!!"Usage: threadpool_test <nthreads> <ntasks>");
int nthreads = atoi(av[1]);
int ntasks = atoi(av[2]);
struct thread_pool * ex = thread_pool_new(nthreads);
const int N = ntasks;
struct future * f[N];
// sleep .5 seconds to give threads time to start up
struct timespec sleep_for = { .tv_sec = 0, .tv_nsec = 5*1e8 };
nanosleep(&sleep_for, NULL);
int threadsstarted = count_number_of_threads() - 1;
if (threadsstarted != nthreads) {
printf("The thread pool started %d instead of %d threads.\n",
threadsstarted, nthreads);
return EXIT_FAILURE;
}
// check for busy-waiting implementation
struct rusage usage;
int rc = getrusage(RUSAGE_SELF, &usage);
if (rc == -1)
perror("getrusage"), exit(-1);
if (usage.ru_utime.tv_sec > 0 || usage.ru_utime.tv_usec > 400000) {
printf("Thread pool is consuming excessive CPU time without running any jobs\n");
return EXIT_FAILURE;
}
// submit N tasks and record futures obtained in return
int i;
for (i = 0; i < N; i++) {
struct callable_data * callable_data = malloc(sizeof *callable_data);
callable_data->number = i;
f[i] = thread_pool_submit(ex,
(thread_pool_callable_func_t) callable_task,
callable_data);
}
printf("Main thread: %p\n", (void *)pthread_self());
// wait for each future
for (i = 0; i < N; i++) {
printf("%s\n", (char *) future_get(f[i]));
}
// check that no pool thread shut down prematurely
threadsstarted = count_number_of_threads() - 1;
if (threadsstarted != nthreads) {
printf("Only %d thread pool threads are left, should be %d threads.\n",
threadsstarted, nthreads);
return EXIT_FAILURE;
}
thread_pool_shutdown(ex);
// sleep .3 seconds to give threads time to shut down
// pthread_join() is not atomic with respect to the number
// of tasks reported by /proc/self/status
sleep_for.tv_nsec = 3*1e8;
nanosleep(&sleep_for, NULL);
int threadsleft = count_number_of_threads();
if (threadsleft != 1) {
printf("The thread pool did not correctly shut down"
", there are %d threads left.\n", threadsleft);
return EXIT_FAILURE;
}
printf("Done.\n");
return EXIT_SUCCESS;
}
/**
* Count number of threads by scanning /proc/self/status
* for the Threads: ... line
*/
static int
count_number_of_threads(void)
{
FILE * p = fopen("/proc/self/status", "r");
while (!feof(p)) {
int threadsleft;
char buf[128];
fgets(buf, sizeof buf, p);
if (sscanf(buf, "Threads: %d\n", &threadsleft) != 1)
continue;
fclose(p);
return threadsleft;
}
printf("Internal error, please send email to gback@cs.vt.edu\n");
abort();
}