2017-10-16 15 views
13

Tôi cố gắng để thực hiện một container lockless thread-safe, tương tự như std :: vector, theo này https://software.intel.com/en-us/blogs/2008/07/24/tbbconcurrent_vector-secrets-of-memory-organizationThực hiện concurrent_vector theo blog của intel

Từ những gì tôi hiểu, để ngăn chặn tái phân bổ và hủy bỏ hiệu lực tất cả các vòng lặp trên tất cả các chủ đề, thay vì một mảng liền kề duy nhất, chúng thêm các khối tiếp giáp mới.
Mỗi khối họ thêm là có kích thước tăng quyền hạn là 2, vì vậy họ có thể sử dụng nhật ký (chỉ mục) để tìm phân khúc phù hợp, nơi một mục tại [index] được cho là. Từ những gì tôi thu thập, chúng có một mảng con trỏ tĩnh cho các phân đoạn, vì vậy chúng có thể truy cập chúng nhanh chóng, tuy nhiên chúng không biết số lượng phân đoạn người dùng muốn, do đó, chúng tạo ra một phân đoạn ban đầu và nếu số lượng các phân đoạn vượt quá số lượng hiện tại, chúng phân bổ một phân đoạn lớn và chuyển sang sử dụng phân đoạn đó.

Vấn đề là việc thêm phân đoạn mới không thể thực hiện được theo cách an toàn không khóa hoặc ít nhất là tôi chưa tìm ra cách. Tôi có thể tăng kích thước hiện tại một cách nguyên tử, nhưng chỉ vậy thôi.
Và cũng chuyển từ nhỏ sang mảng lớn các con trỏ phân đoạn liên quan đến một bản phân bổ lớn và bản sao bộ nhớ, vì vậy tôi không thể hiểu cách chúng đang thực hiện nó.

Họ có một số mã được đăng trực tuyến, nhưng tất cả các chức năng quan trọng là không có mã nguồn sẵn có, chúng nằm trong DLL khối xây dựng chuỗi của họ. Dưới đây là một số mã đó chứng tỏ vấn đề:

template<typename T> 
class concurrent_vector 
{ 
    private: 
     int size = 0; 
     int lastSegmentIndex = 0; 

     union 
     { 
      T* segmentsSmall[3]; 
      T** segmentsLarge; 
     }; 

     void switch_to_large() 
     { 
      //Bunch of allocations, creates a T* segmentsLarge[32] basically and reassigns all old entries into it 
     } 

    public: 
     concurrent_vector() 
     { 
      //The initial array is contiguous just for the sake of cache optimization 
      T* initialContiguousBlock = new T[2 + 4 + 8]; //2^1 + 2^2 + 2^3 
      segmentsSmall[0] = initialContiguousBlock; 
      segmentsSmall[1] = initialContiguousBlock + 2; 
      segmentsSmall[2] = initialContiguousBlock + 2 + 4; 
     } 

     void push_back(T& item) 
     { 
      if(size > 2 + 4 + 8) 
      { 
       switch_to_large(); //This is the problem part, there is no possible way to make this thread-safe without a mutex lock. I don't understand how Intel does it. It includes a bunch of allocations and memory copies. 
      } 

      InterlockedIncrement(&size); //Ok, so size is atomically increased 

      //afterwards adds the item to the appropriate slot in the appropriate segment 
     } 
}; 
+0

Tôi không thấy bất kỳ nơi nào trong tuyên bố bài viết này chứa vùng chứa này. Tôi tin rằng bạn có thể sử dụng có mutex cho hoạt động tăng trưởng chỉ miễn là bạn giữ dữ liệu khác tại chỗ. – R2RT

Trả lời

2

tôi sẽ không cố gắng để làm cho segmentsLargesegmentsSmall công đoàn. Có điều này lãng phí thêm một con trỏ. Sau đó, con trỏ, cho phép gọi nó chỉ segments ban đầu có thể trỏ đến phân đoạnSmall.

Mặt khác, các phương pháp khác luôn có thể sử dụng cùng một con trỏ làm cho chúng đơn giản hơn.

Và chuyển đổi từ nhỏ thành lớn có thể được thực hiện bằng cách so sánh trao đổi con trỏ.

Tôi không chắc chắn cách thức này có thể được thực hiện một cách an toàn với công đoàn.

Ý tưởng sẽ trông giống như thế này (lưu ý rằng tôi đã sử dụng C++ 11, thư viện Intel có trước, vì vậy họ có thể đã làm nó với nội tại nguyên tử của chúng). Điều này có thể bỏ sót một vài chi tiết mà tôi chắc chắn người Intel đã nghĩ nhiều hơn, vì vậy bạn có thể sẽ phải kiểm tra điều này chống lại việc triển khai tất cả các phương pháp khác.

#include <atomic> 
#include <array> 
#include <cstddef> 
#include <climits> 

template<typename T> 
class concurrent_vector 
{ 
private: 
    std::atomic<size_t> size; 
    std::atomic<T**> segments; 
    std::array<T*, 3> segmentsSmall; 
    unsigned lastSegmentIndex = 0; 

    void switch_to_large() 
    { 
    T** segmentsOld = segments; 
    if(segmentsOld == segmentsSmall.data()) { 
     // not yet switched 
     T** segmentsLarge = new T*[sizeof(size_t) * CHAR_BIT]; 
     // note that we leave the original segment allocations alone and just copy the pointers 
     std::copy(segmentsSmall.begin(), segmentsSmall.end(), segmentsLarge); 
     for(unsigned i = segmentsSmall.size(); i < numSegments; ++i) { 
     segmentsLarge = nullptr; 
     } 
     // now both the old and the new segments array are valid 
     if(segments.compare_exchange_strong(segmentsOld, segmentsLarge)) { 
     // success! 
     return; 
     } else { 
     // already switched, just clean up 
     delete[] segmentsLarge; 
     } 
    } 
    } 

public: 
    concurrent_vector() : size(0), segments(segmentsSmall.data()) 
    { 
    //The initial array is contiguous just for the sake of cache optimization 
    T* initialContiguousBlock = new T[2 + 4 + 8]; //2^1 + 2^2 + 2^3 
    segmentsSmall[0] = initialContiguousBlock; 
    segmentsSmall[1] = initialContiguousBlock + 2; 
    segmentsSmall[2] = initialContiguousBlock + 2 + 4; 
    } 

    void push_back(T& item) 
    { 
    if(size > 2 + 4 + 8) { 
     switch_to_large(); 
    } 
    // here we may have to allocate more segments atomically 
    ++size; 

    //afterwards adds the item to the appropriate slot in the appropriate segment 
    } 
}; 
+0

nhưng điều gì sẽ xảy ra nếu hai chủ đề cố gắng thêm một mục và trong khi một trong số chúng đi qua chức năng này, một số khác cũng đi vào nó và phân bổ lại, không biết rằng luồng đầu tiên đã phân bổ phân đoạnLớn tại thời điểm này –

+0

, luồng chậm hơn sẽ xóa các mục lớn của mình, điều này không có vấn đề gì, bởi vì không có luồng nào khác có thể truy cập nó, vì trao đổi so sánh nguyên tử của nó không thành công. – PaulR

Các vấn đề liên quan