Blame | Last modification | View Log | Download | RSS feed
/** Parallel Quicksort.** Demo application that shows how one might use threadpools/futures* in an application.** Requires threadpool.c/threadpool.h** Written by Godmar Back gback@cs.vt.edu for CS3214 Fall 2010.*/#include <stdlib.h>#include <stdbool.h>#include <stdint.h>#include <stdio.h>#include <string.h>#include <unistd.h>#include <sys/time.h>#include <pthread.h>#include <assert.h>#include <getopt.h>#include "threadpool.h"typedef void (*sort_func)(int *, int);/* Return true if array 'a' is sorted. */static boolcheck_sorted(int a[], int n){int i;for (i = 0; i < n-1; i++)if (a[i] > a[i+1])return false;return true;}/* -------------------------------------------------------------* Built-in qsort.*/static int cmp_int(const void *a, const void *b){return *(int *)a - *(int *)b;}static void builtin_qsort(int *a, int N){qsort(a, N, sizeof(int), cmp_int);}/* -------------------------------------------------------------* Quicksort utility routines.*//* Swap two elements */static inline void swap(int *a, int *b){int tmp = *a;*a = *b;*b = tmp;}/* Partitioning using middle element as pivot */static intqsort_partition(int * array, int left, int right){int middle = left + (right-left)/2;// left <=> middleswap(array + left, array + middle);int current, last = left;for (current = left + 1; current <= right; ++current) {if (array[current] < array[left]) {++last;// last <=> currentswap(array + last, array + current);}}// left <=> lastswap(array + left, array + last);return last;}/* -------------------------------------------------------------* Serial implementation.*/static voidqsort_internal_serial(int *array, int left, int right){if (left >= right)return;int split = qsort_partition(array, left, right);qsort_internal_serial(array, left, split - 1);qsort_internal_serial(array, split + 1, right);}static voidqsort_serial(int *array, int N){qsort_internal_serial(array, 0, N - 1);}/* -------------------------------------------------------------* Parallel implementation.*/static struct thread_pool * threadpool;/* qsort_task describes a unit of parallel work */struct qsort_task {int *array;int left, right, depth;struct future *future;struct qsort_task *next;} * tasks; /* list of outstanding tasks, protected by task_lock */static pthread_mutex_t task_lock = PTHREAD_MUTEX_INITIALIZER;/* Create a qsort_task instance */static struct qsort_task *create_qsort_task(int * array, int left, int right, int depth){struct qsort_task * task = calloc(sizeof(struct qsort_task), 1);task->array = array;task->left = left;task->right = right;task->depth = depth;return task;}/* Add a qsort_task instance to list of outstanding tasks */static voidadd_qsort_task(struct qsort_task * task){pthread_mutex_lock(&task_lock);assert(task->future);task->next = tasks;tasks = task;pthread_mutex_unlock(&task_lock);}#define MAX_SEGMENTS 1024static int qsegment_size [MAX_SEGMENTS];static int qsegment_n = 0;/* Wait for all outstanding tasks. */static voidwait_for_tasks (){qsegment_n = 0;for (;;) {struct qsort_task * head = NULL;pthread_mutex_lock(&task_lock);head = tasks;if (head)tasks = head->next;pthread_mutex_unlock(&task_lock);if (!head)break;intptr_t size = (intptr_t) future_get(head->future);future_free(head->future);if (qsegment_n < MAX_SEGMENTS)qsegment_size[qsegment_n++] = size;free(head);}}static voidreport_stats(){builtin_qsort(qsegment_size, qsegment_n);int i;printf("Processed %d segments of sizes: ", qsegment_n);for (i = 0; i < qsegment_n; i++)printf("%d ", qsegment_size[i]);printf("\n");}/* Parallel qsort - returns size of segment sorted */static intqsort_internal_parallel(struct qsort_task * s){int * array = s->array;int left = s->left;int right = s->right;int depth = s->depth;if (left >= right)return 0;int split = qsort_partition(array, left, right);if (depth < 1) {qsort_internal_serial(array, left, split - 1);qsort_internal_serial(array, split + 1, right);} else {struct qsort_task * qleft = create_qsort_task(array, left, split-1, depth-1);qleft->future = thread_pool_submit(threadpool,(thread_pool_callable_func_t) qsort_internal_parallel,qleft);add_qsort_task(qleft);struct qsort_task * qright = create_qsort_task(array, split+1, right, depth-1);qright->future = thread_pool_submit(threadpool,(thread_pool_callable_func_t) qsort_internal_parallel,qright);add_qsort_task(qright);}return right - left;}// maximum depth to which each recursive call is executed in parallelstatic int depth = 3;static voidqsort_parallel(int *array, int N){struct qsort_task * top = create_qsort_task(array, 0, N-1, depth);qsort_internal_parallel(top);wait_for_tasks();free(top);}/** Benchmark one run of sort_func sorter*/static voidbenchmark(const char *benchmark_name, sort_func sorter, int *a0, int N){struct timeval start, end, diff;int *a = malloc(N * sizeof(int));memcpy(a, a0, N * sizeof(int));gettimeofday(&start, NULL);sorter(a, N);gettimeofday(&end, NULL);if (!check_sorted(a, N)) {fprintf(stderr, "Sort failed\n");abort();}timersub(&end, &start, &diff);printf("%-20s took %.3f sec.\n", benchmark_name, diff.tv_sec + diff.tv_usec / 1.0e6);free(a);}static voidusage(char *av0, int depth, int nthreads){fprintf(stderr, "Usage: %s [-d <n>] [-n <n>] [-b] [-q] [-s <n>] <N>\n"" -d parallel recursion depth, default %d\n"" -n number of threads in pool, default %d\n"" -b run built-in qsort\n"" -s specify srand() seed\n"" -q don't run serial qsort\n", av0, depth, nthreads);exit(0);}intmain(int ac, char *av[]){int nthreads = 4;int c;bool run_builtin_qsort = false;bool run_serial_qsort = true;while ((c = getopt(ac, av, "d:n:bhs:q")) != EOF) {switch (c) {case 'd':depth = atoi(optarg);break;case 'n':nthreads = atoi(optarg);break;case 's':srand(atoi(optarg));break;case 'b':run_builtin_qsort = true;break;case 'q':run_serial_qsort = false;break;case 'h':usage(av[0], depth, nthreads);}}if (optind == ac)usage(av[0], depth, nthreads);int N = atoi(av[optind]);int i, * a0 = malloc(N * sizeof(int));for (i = 0; i < N; i++)a0[i] = random();if (run_builtin_qsort)benchmark("Built-in qsort", builtin_qsort, a0, N);if (run_serial_qsort)benchmark("qsort serial", qsort_serial, a0, N);threadpool = thread_pool_new(nthreads);printf("Using %d threads, recursive parallel depth=%d\n", nthreads, depth);sleep(1);benchmark("qsort parallel", qsort_parallel, a0, N);report_stats();thread_pool_shutdown(threadpool);return 0;}