2012-09-05 16 views
9

Tôi quen thuộc với đa luồng và tôi đã phát triển nhiều chương trình đa luồng trong Java và Objective-C thành công. Nhưng tôi không thể đạt được sau trong C sử dụng pthreads mà không sử dụng tham gia từ các chủ đề chính:Làm thế nào để đồng bộ hóa người quản lý/nhân viên pthreads mà không cần tham gia?

#include <stdio.h> 
#include <stdlib.h> 
#include <pthread.h> 

#define NUM_OF_THREADS 2 

struct thread_data { 
    int start; 
    int end; 
    int *arr; 
}; 

void print(int *ints, int n); 
void *processArray(void *args); 

int main(int argc, const char * argv[]) 
{ 
    int numOfInts = 10; 
    int *ints = malloc(numOfInts * sizeof(int)); 
    for (int i = 0; i < numOfInts; i++) { 
     ints[i] = i; 
    } 
    print(ints, numOfInts); // prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] 

    pthread_t threads[NUM_OF_THREADS]; 
    struct thread_data thread_data[NUM_OF_THREADS]; 

    // these vars are used to calculate the index ranges for each thread 
    int remainingWork = numOfInts, amountOfWork; 
    int startRange, endRange = -1; 

    for (int i = 0; i < NUM_OF_THREADS; i++) { 

     amountOfWork = remainingWork/(NUM_OF_THREADS - i); 
     startRange = endRange + 1; 
     endRange = startRange + amountOfWork - 1; 

     thread_data[i].arr = ints; 
     thread_data[i].start = startRange; 
     thread_data[i].end = endRange; 

     pthread_create(&threads[i], NULL, processArray, (void *)&thread_data[i]); 

     remainingWork -= amountOfWork;  
    } 

    // 1. Signal to the threads to start working 


    // 2. Wait for them to finish 


    print(ints, numOfInts); // should print [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] 

    free(ints); 
    return 0; 
} 

void *processArray(void *args) 
{ 
    struct thread_data *data = (struct thread_data *)args; 
    int *arr = data->arr; 
    int start = data->start; 
    int end = data->end; 

    // 1. Wait for a signal to start from the main thread 


    for (int i = start; i <= end; i++) { 
     arr[i] = arr[i] + 1; 
    } 

    // 2. Signal to the main thread that you're done 

    pthread_exit(NULL); 
} 

void print(int *ints, int n) 
{ 
    printf("["); 
    for (int i = 0; i < n; i++) { 
     printf("%d", ints[i]); 
     if (i+1 != n) 
      printf(", "); 
    } 
    printf("]\n"); 
} 

Tôi muốn đạt được những điều sau trong đoạn code trên:

Trong main():

  1. Tín hiệu cho chủ đề bắt đầu hoạt động.
  2. Đợi các chủ đề nền hoàn thành.

Trong processArray():

  1. Chờ cho một tín hiệu để bắt đầu từ các chủ đề chính
  2. tín hiệu đến các chủ đề chính mà bạn đã hoàn tất

tôi không muốn sử dụng phép nối trong chuỗi chính bởi vì trong the real application, luồng chính sẽ tạo chuỗi chỉ một lần và sau đó nó sẽ báo hiệu cho chuỗi nền hoạt động nhiều lần và tôi không thể để chủ đề chính tiếp tục trừ khi tất cả nền thr eads đã hoàn thành công việc. Trong processArray chức năng, tôi sẽ đặt một vòng lặp vô hạn như sau:

void *processArray(void *args) 
{ 
    struct thread_data *data = (struct thread_data *)args; 

    while (1) 
    { 
     // 1. Wait for a signal to start from the main thread 

     int *arr = data->arr; 
     int start = data->start; 
     int end = data->end;   

     // Process 
     for (int i = start; i <= end; i++) { 
      arr[i] = arr[i] + 1; 
     } 

     // 2. Signal to the main thread that you're done 

    } 

    pthread_exit(NULL); 
} 

Lưu ý rằng Tôi mới đến C và API posix, vì vậy cho tôi xin lỗi nếu tôi là thiếu một cái gì đó rõ ràng. Nhưng tôi thực sự đã thử nhiều thứ, bắt đầu từ việc sử dụng một mutex, và một loạt các semaphores, và một hỗn hợp của cả hai, nhưng không thành công. Tôi nghĩ rằng một biến điều kiện có thể giúp đỡ, nhưng tôi không thể hiểu nó có thể được sử dụng như thế nào.

Cảm ơn thời gian của bạn.

vấn đề giải quyết:

Cảm ơn các bạn rất nhiều! Cuối cùng tôi đã có thể làm điều này để làm việc một cách an toàn và không sử dụng một tham gia bằng cách làm theo lời khuyên của bạn. Mặc dù giải pháp có phần xấu xí, nhưng công việc đã hoàn thành và hiệu suất đạt được rất đáng giá (như bạn sẽ thấy bên dưới). Đối với bất cứ ai quan tâm, đây là một mô phỏng của các ứng dụng thực tế tôi đang làm việc trên, trong đó các chủ đề chính giữ cho công việc liên tục để các chủ đề nền:

#include <stdio.h> 
#include <stdlib.h> 
#include <pthread.h> 

#define NUM_OF_THREADS 5 

struct thread_data { 
    int id; 
    int start; 
    int end; 
    int *arr; 
}; 

pthread_mutex_t currentlyIdleMutex = PTHREAD_MUTEX_INITIALIZER; 
pthread_cond_t currentlyIdleCond = PTHREAD_COND_INITIALIZER; 
int currentlyIdle; 

pthread_mutex_t workReadyMutex = PTHREAD_MUTEX_INITIALIZER; 
pthread_cond_t workReadyCond = PTHREAD_COND_INITIALIZER; 
int workReady; 

pthread_cond_t currentlyWorkingCond = PTHREAD_COND_INITIALIZER; 
pthread_mutex_t currentlyWorkingMutex= PTHREAD_MUTEX_INITIALIZER; 
int currentlyWorking; 

pthread_mutex_t canFinishMutex = PTHREAD_MUTEX_INITIALIZER; 
pthread_cond_t canFinishCond = PTHREAD_COND_INITIALIZER; 
int canFinish; 

void print(int *ints, int n); 
void *processArray(void *args); 
int validateResult(int *ints, int num, int start); 

int main(int argc, const char * argv[]) 
{ 
    int numOfInts = 10; 
    int *ints = malloc(numOfInts * sizeof(int)); 
    for (int i = 0; i < numOfInts; i++) { 
     ints[i] = i; 
    } 
// print(ints, numOfInts); 

    pthread_t threads[NUM_OF_THREADS]; 
    struct thread_data thread_data[NUM_OF_THREADS]; 
    workReady = 0; 
    canFinish = 0; 
    currentlyIdle = 0; 
    currentlyWorking = 0; 

    // these vars are used to calculate the index ranges for each thread 
    int remainingWork = numOfInts, amountOfWork; 
    int startRange, endRange = -1; 
    // Create the threads and give each one its data struct. 
    for (int i = 0; i < NUM_OF_THREADS; i++) { 

     amountOfWork = remainingWork/(NUM_OF_THREADS - i); 
     startRange = endRange + 1; 
     endRange = startRange + amountOfWork - 1; 

     thread_data[i].id = i; 
     thread_data[i].arr = ints; 
     thread_data[i].start = startRange; 
     thread_data[i].end = endRange; 

     pthread_create(&threads[i], NULL, processArray, (void *)&thread_data[i]); 
     remainingWork -= amountOfWork; 
    } 

    int loops = 1111111; 
    int expectedStartingValue = ints[0] + loops; // used to validate the results 
    // The elements in ints[] should be incremented by 1 in each loop 
    while (loops-- != 0) { 

     // Make sure all of them are ready 
     pthread_mutex_lock(&currentlyIdleMutex); 
     while (currentlyIdle != NUM_OF_THREADS) { 
      pthread_cond_wait(&currentlyIdleCond, &currentlyIdleMutex); 
     } 
     pthread_mutex_unlock(&currentlyIdleMutex); 

     // All threads are now blocked; it's safe to not lock the mutex. 
     // Prevent them from finishing before authorized. 
     canFinish = 0; 
     // Reset the number of currentlyWorking threads 
     currentlyWorking = NUM_OF_THREADS; 

     // Signal to the threads to start 
     pthread_mutex_lock(&workReadyMutex); 
     workReady = 1; 
     pthread_cond_broadcast(&workReadyCond); 
     pthread_mutex_unlock(&workReadyMutex);  

     // Wait for them to finish 
     pthread_mutex_lock(&currentlyWorkingMutex); 
     while (currentlyWorking != 0) { 
      pthread_cond_wait(&currentlyWorkingCond, &currentlyWorkingMutex); 
     } 
     pthread_mutex_unlock(&currentlyWorkingMutex); 

     // The threads are now waiting for permission to finish 
     // Prevent them from starting again 
     workReady = 0; 
     currentlyIdle = 0; 

     // Allow them to finish 
     pthread_mutex_lock(&canFinishMutex); 
     canFinish = 1; 
     pthread_cond_broadcast(&canFinishCond); 
     pthread_mutex_unlock(&canFinishMutex); 
    } 

// print(ints, numOfInts); 

    if (validateResult(ints, numOfInts, expectedStartingValue)) { 
     printf("Result correct.\n"); 
    } 
    else { 
     printf("Result invalid.\n");  
    } 

    // clean up 
    for (int i = 0; i < NUM_OF_THREADS; i++) { 
     pthread_cancel(threads[i]); 
    } 
    free(ints); 

    return 0; 
} 

void *processArray(void *args) 
{ 
    struct thread_data *data = (struct thread_data *)args; 
    int *arr = data->arr; 
    int start = data->start; 
    int end = data->end; 

    while (1) { 

     // Set yourself as idle and signal to the main thread, when all threads are idle main will start 
     pthread_mutex_lock(&currentlyIdleMutex); 
     currentlyIdle++; 
     pthread_cond_signal(&currentlyIdleCond); 
     pthread_mutex_unlock(&currentlyIdleMutex); 

     // wait for work from main 
     pthread_mutex_lock(&workReadyMutex); 
     while (!workReady) { 
      pthread_cond_wait(&workReadyCond , &workReadyMutex); 
     } 
     pthread_mutex_unlock(&workReadyMutex); 

     // Do the work 
     for (int i = start; i <= end; i++) { 
      arr[i] = arr[i] + 1; 
     } 

     // mark yourself as finished and signal to main 
     pthread_mutex_lock(&currentlyWorkingMutex); 
     currentlyWorking--; 
     pthread_cond_signal(&currentlyWorkingCond); 
     pthread_mutex_unlock(&currentlyWorkingMutex); 

     // Wait for permission to finish 
     pthread_mutex_lock(&canFinishMutex); 
     while (!canFinish) { 
      pthread_cond_wait(&canFinishCond , &canFinishMutex); 
     } 
     pthread_mutex_unlock(&canFinishMutex); 
    } 

    pthread_exit(NULL); 
} 

int validateResult(int *ints, int n, int start) 
{ 
    int tmp = start; 
    for (int i = 0; i < n; i++, tmp++) { 
     if (ints[i] != tmp) { 
      return 0; 
     } 
    } 
    return 1; 
} 

void print(int *ints, int n) 
{ 
    printf("["); 
    for (int i = 0; i < n; i++) { 
     printf("%d", ints[i]); 
     if (i+1 != n) 
      printf(", "); 
    } 
    printf("]\n"); 
} 

Tôi không chắc chắn mặc dù nếu pthread_cancel là đủ cho dọn dẹp! Đối với rào cản, nó sẽ là một trợ giúp lớn nếu nó không bị giới hạn bởi một số hệ điều hành như đã đề cập bởi @Jeremy.

Benchmarks:

tôi muốn chắc chắn rằng những điều kiện này nhiều người đang không thực sự làm chậm lại các thuật toán, vì vậy tôi đã thiết lập tiêu chuẩn này để so sánh hai giải pháp:

#include <stdio.h> 
#include <stdlib.h> 
#include <pthread.h> 
#include <unistd.h> 
#include <sys/time.h> 
#include <sys/resource.h> 

#define NUM_OF_THREADS 5 
struct thread_data { 
    int start; 
    int end; 
    int *arr; 
}; 
pthread_mutex_t currentlyIdleMutex = PTHREAD_MUTEX_INITIALIZER; 
pthread_cond_t currentlyIdleCond = PTHREAD_COND_INITIALIZER; 
int currentlyIdle; 
pthread_mutex_t workReadyMutex = PTHREAD_MUTEX_INITIALIZER; 
pthread_cond_t workReadyCond = PTHREAD_COND_INITIALIZER; 
int workReady; 
pthread_cond_t currentlyWorkingCond = PTHREAD_COND_INITIALIZER; 
pthread_mutex_t currentlyWorkingMutex= PTHREAD_MUTEX_INITIALIZER; 
int currentlyWorking; 
pthread_mutex_t canFinishMutex = PTHREAD_MUTEX_INITIALIZER; 
pthread_cond_t canFinishCond = PTHREAD_COND_INITIALIZER; 
int canFinish; 

void *processArrayMutex(void *args); 
void *processArrayJoin(void *args); 
double doItWithMutex(pthread_t *threads, struct thread_data *data, int loops); 
double doItWithJoin(pthread_t *threads, struct thread_data *data, int loops); 

int main(int argc, const char * argv[]) 
{ 
    int numOfInts = 10; 
    int *join_ints = malloc(numOfInts * sizeof(int)); 
    int *mutex_ints = malloc(numOfInts * sizeof(int)); 
    for (int i = 0; i < numOfInts; i++) { 
     join_ints[i] = i; 
     mutex_ints[i] = i; 
    } 

    pthread_t join_threads[NUM_OF_THREADS]; 
    pthread_t mutex_threads[NUM_OF_THREADS]; 
    struct thread_data join_thread_data[NUM_OF_THREADS]; 
    struct thread_data mutex_thread_data[NUM_OF_THREADS]; 
    workReady = 0; 
    canFinish = 0; 
    currentlyIdle = 0; 
    currentlyWorking = 0; 

    int remainingWork = numOfInts, amountOfWork; 
    int startRange, endRange = -1; 
    for (int i = 0; i < NUM_OF_THREADS; i++) { 
     amountOfWork = remainingWork/(NUM_OF_THREADS - i); 
     startRange = endRange + 1; 
     endRange = startRange + amountOfWork - 1; 

     join_thread_data[i].arr = join_ints; 
     join_thread_data[i].start = startRange; 
     join_thread_data[i].end = endRange; 
     mutex_thread_data[i].arr = mutex_ints; 
     mutex_thread_data[i].start = startRange; 
     mutex_thread_data[i].end = endRange; 

     pthread_create(&mutex_threads[i], NULL, processArrayMutex, (void *)&mutex_thread_data[i]); 
     remainingWork -= amountOfWork; 
    } 

    int numOfBenchmarkTests = 100; 
    int numberOfLoopsPerTest= 1000; 

    double join_sum = 0.0, mutex_sum = 0.0; 
    for (int i = 0; i < numOfBenchmarkTests; i++) 
    { 
     double joinTime = doItWithJoin(join_threads, join_thread_data, numberOfLoopsPerTest); 
     double mutexTime= doItWithMutex(mutex_threads, mutex_thread_data, numberOfLoopsPerTest); 

     join_sum += joinTime; 
     mutex_sum+= mutexTime;  
    } 

    double join_avg = join_sum/numOfBenchmarkTests; 
    double mutex_avg= mutex_sum/numOfBenchmarkTests; 

    printf("Join average : %f\n", join_avg); 
    printf("Mutex average: %f\n", mutex_avg); 

    double diff = join_avg - mutex_avg; 
    if (diff > 0.0) 
     printf("Mutex is %.0f%% faster.\n", 100 * diff/join_avg); 
    else if (diff < 0.0) 
     printf("Join is %.0f%% faster.\n", 100 * diff/mutex_avg); 
    else 
     printf("Both have the same performance."); 

    free(join_ints); 
    free(mutex_ints); 

    return 0; 
} 

// From https://stackoverflow.com/a/2349941/408286 
double get_time() 
{ 
    struct timeval t; 
    struct timezone tzp; 
    gettimeofday(&t, &tzp); 
    return t.tv_sec + t.tv_usec*1e-6; 
} 

double doItWithMutex(pthread_t *threads, struct thread_data *data, int num_loops) 
{ 
    double start = get_time(); 

    int loops = num_loops; 
    while (loops-- != 0) { 
     // Make sure all of them are ready 
     pthread_mutex_lock(&currentlyIdleMutex); 
     while (currentlyIdle != NUM_OF_THREADS) { 
      pthread_cond_wait(&currentlyIdleCond, &currentlyIdleMutex); 
     } 
     pthread_mutex_unlock(&currentlyIdleMutex); 

     // All threads are now blocked; it's safe to not lock the mutex. 
     // Prevent them from finishing before authorized. 
     canFinish = 0; 
     // Reset the number of currentlyWorking threads 
     currentlyWorking = NUM_OF_THREADS; 

     // Signal to the threads to start 
     pthread_mutex_lock(&workReadyMutex); 
     workReady = 1; 
     pthread_cond_broadcast(&workReadyCond); 
     pthread_mutex_unlock(&workReadyMutex); 

     // Wait for them to finish 
     pthread_mutex_lock(&currentlyWorkingMutex); 
     while (currentlyWorking != 0) { 
      pthread_cond_wait(&currentlyWorkingCond, &currentlyWorkingMutex); 
     } 
     pthread_mutex_unlock(&currentlyWorkingMutex); 

     // The threads are now waiting for permission to finish 
     // Prevent them from starting again 
     workReady = 0; 
     currentlyIdle = 0; 

     // Allow them to finish 
     pthread_mutex_lock(&canFinishMutex); 
     canFinish = 1; 
     pthread_cond_broadcast(&canFinishCond); 
     pthread_mutex_unlock(&canFinishMutex); 
    } 

    return get_time() - start; 
} 

double doItWithJoin(pthread_t *threads, struct thread_data *data, int num_loops) 
{ 
    double start = get_time(); 

    int loops = num_loops; 
    while (loops-- != 0) { 
     // create them 
     for (int i = 0; i < NUM_OF_THREADS; i++) { 
      pthread_create(&threads[i], NULL, processArrayJoin, (void *)&data[i]); 
     } 
     // wait 
     for (int i = 0; i < NUM_OF_THREADS; i++) { 
      pthread_join(threads[i], NULL); 
     } 
    } 

    return get_time() - start; 
} 

void *processArrayMutex(void *args) 
{ 
    struct thread_data *data = (struct thread_data *)args; 
    int *arr = data->arr; 
    int start = data->start; 
    int end = data->end; 

    while (1) { 

     // Set yourself as idle and signal to the main thread, when all threads are idle main will start 
     pthread_mutex_lock(&currentlyIdleMutex); 
     currentlyIdle++; 
     pthread_cond_signal(&currentlyIdleCond); 
     pthread_mutex_unlock(&currentlyIdleMutex); 

     // wait for work from main 
     pthread_mutex_lock(&workReadyMutex); 
     while (!workReady) { 
      pthread_cond_wait(&workReadyCond , &workReadyMutex); 
     } 
     pthread_mutex_unlock(&workReadyMutex); 

     // Do the work 
     for (int i = start; i <= end; i++) { 
      arr[i] = arr[i] + 1; 
     } 

     // mark yourself as finished and signal to main 
     pthread_mutex_lock(&currentlyWorkingMutex); 
     currentlyWorking--; 
     pthread_cond_signal(&currentlyWorkingCond); 
     pthread_mutex_unlock(&currentlyWorkingMutex); 

     // Wait for permission to finish 
     pthread_mutex_lock(&canFinishMutex); 
     while (!canFinish) { 
      pthread_cond_wait(&canFinishCond , &canFinishMutex); 
     } 
     pthread_mutex_unlock(&canFinishMutex); 
    } 

    pthread_exit(NULL); 
} 

void *processArrayJoin(void *args) 
{ 
    struct thread_data *data = (struct thread_data *)args; 
    int *arr = data->arr; 
    int start = data->start; 
    int end = data->end; 

    // Do the work 
    for (int i = start; i <= end; i++) { 
     arr[i] = arr[i] + 1; 
    } 

    pthread_exit(NULL); 
} 

Và đầu ra là:

Join average : 0.153074 
Mutex average: 0.071588 
Mutex is 53% faster. 

Cảm ơn bạn một lần nữa.Tôi thực sự đánh giá cao sự giúp đỡ của bạn!

+0

Rất tiếc, không đọc kỹ câu hỏi của bạn. Bạn cần phải thực hiện một rào cản tham gia lại tại số bình luận 2. http://pcbec3.ihep.su/~miagkov/code/barrier.c – nhahtdh

+0

@nhahtdh Đừng lo lắng về điều đó. Tôi sẽ xem xét mã. Cảm ơn! – Motasim

+0

Michael Burr đề xuất một chức năng hiện có để thực hiện điều đó. (Tôi nghĩ rằng POSIX đã không xác định điều này, nhưng nó là có tất cả cùng). – nhahtdh

Trả lời

2

Bạn cần sử dụng kỹ thuật đồng bộ hóa khác với join, điều đó rõ ràng.

Rất tiếc, bạn có nhiều tùy chọn. Một là một "rào cản đồng bộ", mà về cơ bản là một điều mà mỗi thread mà đạt đến nó khối cho đến khi họ đã đạt được nó (bạn chỉ định số lượng các chủ đề trước). Nhìn vào pthread_barrier.

Cách khác là sử dụng cặp biến điều kiện/mutex (pthread_cond_*). Khi mỗi thread kết thúc nó sẽ có mutex, tăng số lượng, báo hiệu condvar. Các chủ đề chính chờ đợi trên condvar cho đến khi đếm đạt giá trị nó mong đợi. Mã này trông như thế này:

// thread has finished 
mutex_lock 
++global_count 
// optional optimization: only execute the next line when global_count >= N 
cond_signal 
mutex_unlock 

// main is waiting for N threads to finish 
mutex_lock 
while (global_count < N) { 
    cond_wait 
} 
mutex_unlock 

khác là sử dụng một semaphore mỗi thread - khi thread kết thúc nó đăng tải semaphore riêng của mình, và chờ đợi chủ đề chính trên mỗi semaphore lần lượt thay vì tham gia mỗi chủ đề lần lượt .

Bạn cũng cần đồng bộ hóa để khởi động lại các chủ đề cho công việc tiếp theo - đây có thể là đối tượng đồng bộ thứ hai cùng loại với mục đầu tiên, với chi tiết thay đổi cho thực tế là bạn có 1 poster và N bồi bàn thay vì hơn cách khác. Hoặc bạn có thể (cẩn thận) tái sử dụng cùng một đối tượng cho cả hai mục đích.

Nếu bạn đã thử những điều này và mã của bạn không hoạt động, có thể hỏi một câu hỏi cụ thể mới về mã bạn đã thử. Tất cả đều phù hợp với nhiệm vụ.

+0

Cảm ơn bạn Steve! Lời khuyên của bạn là có giá trị nhất :) – Motasim

1

Để yêu cầu tất cả các chuỗi bắt đầu hoạt động, nó có thể đơn giản như một biến số nguyên chung được khởi tạo bằng 0 và các luồng chỉ đợi cho đến khi nó khác 0. Bằng cách này bạn không cần vòng lặp while (1) trong hàm chuỗi.

Để chờ cho đến khi tất cả được thực hiện, pthread_join là đơn giản nhất vì nó sẽ thực sự chặn cho đến khi chuỗi kết nối được thực hiện. Nó cũng cần thiết để làm sạch công cụ hệ thống sau khi các chủ đề (như nếu không giá trị trả về từ các chủ đề sẽ được lưu trữ cho phần còn lại của chương trình). Vì bạn có một mảng của tất cả các chủ đề pthread_t, chỉ cần lặp lại từng chuỗi một. Như một phần của chương trình của bạn không làm bất cứ điều gì khác, và phải đợi cho đến khi tất cả các chủ đề được thực hiện, chỉ cần chờ đợi cho họ theo thứ tự là okay.

+0

Cảm ơn Joachim, cùng một câu hỏi cho Michael: Chủ đề chính có thể nối chuỗi nền ngay cả khi chuỗi nền không trả về hoặc gọi' pthread_exit'? Bởi vì như tôi đã viết, các chủ đề nền sẽ được trong một vòng lặp vô hạn. – Motasim

+0

@Mota Trong khi các luồng có thể được lặp trong khi chương trình đang chạy, chúng phải dừng lại đôi khi. Hoặc là bạn bảo họ dừng lại (sử dụng biến số nguyên đơn giản cũng đủ ở đây) hoặc bằng cách thoát ra khỏi luồng chính (tức là trở về từ 'main' hoặc gọi' exit'). Các 'pthread_join' chức năng _will_ khối cho đến khi chủ đề nó tham gia thoát. –

+0

Cảm ơn Joachim! Lời khuyên của bạn rất có giá trị. – Motasim

4

Có một số cơ chế đồng bộ hóa bạn có thể sử dụng (ví dụ: biến điều kiện). Tôi nghĩ đơn giản nhất là sử dụng pthread_barrier để đồng bộ hóa phần bắt đầu của chuỗi.

Giả sử rằng bạn muốn tất cả các chủ đề 'đồng bộ hóa' trên mỗi vòng lặp lặp lại, bạn chỉ có thể sử dụng lại rào cản. Nếu bạn cần một cái gì đó linh hoạt hơn, một biến điều kiện có thể thích hợp hơn.

Khi bạn quyết định thời gian cho chuỗi kết thúc (bạn chưa chỉ ra cách các luồng sẽ biết thoát ra khỏi vòng lặp vô hạn - một biến chia sẻ đơn giản có thể được sử dụng cho điều đó; loại nguyên tử hoặc được bảo vệ bằng một mutex), chuỗi main() nên sử dụng pthread_join() để chờ cho tất cả các chuỗi hoàn tất.

+0

Cảm ơn Michael, nhưng chủ đề chính có thể tham gia chuỗi nền ngay cả khi chuỗi nền không trả về hoặc gọi 'pthread_exit'? Bởi vì như tôi đã viết, các chủ đề nền sẽ được trong một vòng lặp vô hạn. – Motasim

+0

@Mota: xin lỗi - tôi đã bỏ lỡ phần đó của câu hỏi. Tôi đã cập nhật câu trả lời. Nếu nhu cầu của bạn đơn giản, rào cản có thể được tái sử dụng; nếu nhu cầu của bạn phức tạp hơn, bạn có thể muốn sử dụng biến điều kiện 'pthread_cond_t' (bạn có thể cần nhiều hơn một). –

+0

Về nguyên tắc, bạn sẽ không cần * nhiều hơn một biến điều kiện, bởi vì những người khác nhau có thể kiểm tra các điều kiện khác nhau tại các thời điểm khác nhau bằng cách sử dụng cùng một cặp mutex/condvar và bất cứ khi nào bất kỳ điều kiện nào thay đổi bạn phát sóng condvar. Để có hiệu quả và mã rõ ràng, bạn có thể * muốn * nhiều hơn một. –

1

Bạn đang làm việc ở mức trừu tượng sai. Vấn đề này đã được giải quyết rồi. Bạn đang reimplementing một hàng đợi công việc + thread hồ bơi.

OpenMP có vẻ phù hợp với vấn đề của bạn. Nó chuyển đổi các chú thích #pragma thành mã chuỗi. Tôi tin rằng nó sẽ cho phép bạn thể hiện những gì bạn đang cố gắng làm khá trực tiếp.

Sử dụng libdispatch, những gì bạn đang cố gắng thực hiện sẽ được biểu thị dưới dạng một hàng dispatch_apply nhắm mục tiêu một hàng đợi đồng thời. Điều này hoàn toàn chờ đợi cho tất cả các nhiệm vụ con để hoàn thành. Dưới OS X, nó được thực hiện bằng cách sử dụng một giao diện làm việc pthread không di động; dưới FreeBSD, tôi tin rằng nó quản lý một nhóm pthreads trực tiếp.

Nếu đó là tính di động liên quan đến việc thúc đẩy bạn sử dụng pthreads thô, không sử dụng các rào cản pthread. Các rào cản là một phần mở rộng bổ sung trên và trên các chủ đề POSIX cơ bản. OS X chẳng hạn không hỗ trợ nó. Để biết thêm thông tin, hãy xem POSIX. Chặn các sợi chính cho đến khi tất cả các chủ đề con đã hoàn thành có thể được thực hiện bằng cách sử dụng một số được bảo vệ bởi một biến điều kiện hoặc, thậm chí đơn giản hơn, sử dụng một đường ống và chặn đọc nơi số byte để đọc phù hợp với số lượng chủ đề . Mỗi thread ghi một byte vào công việc hoàn thành, sau đó ngủ cho đến khi nó nhận được công việc mới từ chủ đề chính. Các chủ đề chính unblocks một lần mỗi thread đã viết của nó "Tôi đã hoàn tất!" byte.

Việc chuyển công việc tới các chuỗi con có thể được thực hiện bằng cách sử dụng mutex bảo vệ bộ mô tả công việc và điều kiện để báo hiệu công việc mới. Bạn có thể sử dụng một mảng các mô tả công việc mà tất cả các luồng vẽ từ đó. Trên tín hiệu, mỗi người cố gắng lấy mutex. Khi lấy mutex, nó sẽ làm giảm bớt một số công việc, báo hiệu một lần nữa nếu hàng đợi là nonempty, và sau đó xử lý công việc của nó, sau đó nó sẽ báo hiệu hoàn thành chủ đề chính.

Bạn có thể sử dụng lại "hàng đợi công việc" này để bỏ chặn luồng chính bằng cách thêm kết quả, với chuỗi chính chờ cho đến khi độ dài hàng đợi phù hợp với số lượng chủ đề; cách tiếp cận đường ống chỉ sử dụng chặn read để thực hiện việc này cho bạn.

+0

Cảm ơn bạn Jeremy! Lời khuyên của bạn rất có giá trị! – Motasim

Các vấn đề liên quan