2016-03-07 13 views
5

Ví dụ: giả sử tôi phân bổ cấu trúc bằng mới và viết con trỏ vào đầu ghi của một đường ống ẩn danh.Việc sử dụng một đường ống ẩn danh có giới thiệu một rào cản bộ nhớ cho truyền thông liên lạc không?

Nếu tôi đọc con trỏ từ đầu đọc tương ứng, tôi có đảm bảo xem nội dung 'đúng' trên cấu trúc không? Ngoài ra, các kết quả của socketpair() trên unix & tự kết nối qua loopback tcp trên các cửa sổ có cùng sự bảo đảm.

Bối cảnh là một thiết kế máy chủ mà centralizes sự kiện văn với select/epoll

+0

Bạn nên viết cấu trúc chứ không phải con trỏ đến nó. – EJP

+0

Bạn đang sử dụng [Windows anonymous pipes] (https://msdn.microsoft.com/en-us/library/windows/desktop/aa365152%28v=vs.85%29.aspx) hoặc [posix ones] (http) : //linux.die.net/man/2/pipe)? Bạn đã bao gồm cả hai thẻ. –

+0

Đường ống ẩn danh trên nền tảng Unix và tcp qua vòng lặp cho Windows. – Bwmat

Trả lời

4

Ví dụ, nói tôi phân bổ một struct với mới và viết con trỏ vào ghi cuối của một ống nặc danh.

Nếu tôi đọc con trỏ từ đầu đọc tương ứng, tôi có đảm bảo xem nội dung 'đúng' trên cấu trúc không?

No. Không có gì đảm bảo rằng CPU viết sẽ xóa sạch ghi bộ nhớ cache của nó và hiển thị cho CPU khác có thể đọc.

Cũng quan tâm là liệu kết quả của socketpair() trên unix & tự kết nối qua vòng lặp tcp trên các cửa sổ có cùng đảm bảo hay không.

số

+0

Để diễn giải David trước đó, bạn đang nói về cơ chế bộ nhớ cache nào và tại thời điểm nào nó sẽ thất bại? –

+1

Tôi không chắc David là ai. Dù sao, tôi không nói một chính sách CPU hoặc bộ nhớ cache cụ thể. OP gắn thẻ câu hỏi của họ với Windows và POSIX, có nghĩa là câu trả lời có lẽ nên xem xét nhiều hơn chỉ là một kiến ​​trúc cụ thể. Nó có thể thất bại nếu một luồng ghi cấu trúc vào bộ nhớ, sau đó ghi vào đường ống, và sau đó luồng khác đọc từ đường ống và sau đó đọc cấu trúc từ bộ nhớ. Vì cấu trúc trong bộ nhớ hoàn toàn tách biệt với dữ liệu đi qua đường ống, có thể nó không được đồng bộ hóa qua các lõi. – Cornstalks

+1

Đồng ý (mặc dù xem lưu ý của tôi về những cân nhắc thực tế). Nhưng lưu ý rằng không trả lời câu hỏi của tôi, vì câu trả lời của bạn cho biết "không đảm bảo rằng CPU viết sẽ xóa sạch bộ nhớ cache của nó và làm cho nó hiển thị với CPU khác" và những gì tôi không hiểu là bộ nhớ cache nào cơ chế bạn đang đề cập đến ở đây và trong kịch bản nào nó sẽ khiến mọi thứ thất bại? –

0

Tôi tin rằng, trường hợp của bạn có thể được giảm xuống còn 2 mô hình chủ đề này:

int data = 0; 
std::atomic<int*> atomicPtr{nullptr}; 
//... 

void thread1() 
{ 
    data = 42; 
    atomicPtr.store(&integer, std::memory_order_release); 
} 

void thread2() 
{ 
    int* ptr = nullptr; 
    while(!ptr) 
     ptr = atomicPtr.load(std::memory_order_consume); 
    assert(*ptr == 42); 
} 

Vì bạn có 2 quá trình bạn không thể sử dụng một biến nguyên tử trên chúng, nhưng kể từ khi bạn đã liệt kê bạn có thể bỏ qua atomicPtr.load(std::memory_order_consume) từ phần tiêu thụ vì, AFAIK, tất cả các kiến ​​trúc Windows đang chạy đảm bảo tải này chính xác mà không có bất kỳ rào cản nào ở phía tải. Trong thực tế, tôi nghĩ rằng không có nhiều kiến ​​trúc trên mạng mà hướng dẫn đó không phải là NO-OP (tôi chỉ nghe nói về DEC Alpha)

2

Thực tế, gọi write(), đó là cuộc gọi hệ thống, sẽ kết thúc khóa một hoặc nhiều cấu trúc dữ liệu trong hạt nhân, nên xử lý vấn đề sắp xếp lại. Ví dụ, POSIX yêu cầu lần đọc tiếp theo để xem dữ liệu được viết trước cuộc gọi của họ, điều này ngụ ý một khóa (hoặc một số loại có được/giải phóng) một mình.

Cho dù đó là một phần của thông số chính thức của các cuộc gọi, có lẽ nó không phải.

+0

1) Mặc dù vậy, một khóa được ngụ ý khác với một rào cản bộ nhớ. Ổ khóa hạt nhân không phải là rào cản bộ nhớ đầy đủ. 2) Sắp xếp lại không phải là vấn đề duy nhất; dữ liệu bộ nhớ cache cũ trong CPU cũng là một vấn đề. – Cornstalks

+0

@Cornstalks Tôi chỉ suy đoán ở đây, nhưng trong bối cảnh của IO tôi sẽ không nghĩ rằng RCU là những gì sẽ xảy ra, nhưng thay vì một cái gì đó ít nhất là dọc theo dòng mua/phát hành. WRT bộ nhớ cache, xem ghi chú trên POSIX. Nếu bạn đang nói về bộ nhớ cache CPU chính nó, tôi nghĩ rằng điều này được bao phủ bởi sự kết hợp bộ nhớ cache (xe buýt snooping hoặc bất cứ điều gì). –

+0

Có, tôi đang nói về chính bộ nhớ cache CPU. Nếu CPU sử dụng mô hình [tính nhất quán yếu] (https://en.wikipedia.org/wiki/Weak_consistency), thì có thể bộ nhớ của cấu trúc không được đồng bộ hóa giữa các bộ xử lý. Ngoài ra, nếu máy tính có nhiều CPU, thì [mọi thứ trở nên thực sự khó khăn] (https://www.kernel.org/doc/Documentation/memory-barriers.txt) (xem phần CÓ THỂ KHÔNG ĐƯỢC H ASS TRỢ VỀ BỘ NHỚ NHỚ ?) – Cornstalks

2

Một con trỏ chỉ là địa chỉ bộ nhớ, vì vậy miễn là bạn đang trên cùng một quá trình con trỏ sẽ hợp lệ trên chuỗi nhận và sẽ trỏ đến cùng cấu trúc. Nếu bạn đang trên các quy trình khác nhau, lúc tốt nhất bạn sẽ nhận được ngay lập tức một lỗi bộ nhớ, tồi tệ hơn bạn sẽ đọc (hoặc viết) vào một bộ nhớ ngẫu nhiên mà về cơ bản là hành vi không xác định.

Bạn có đọc đúng nội dung không? Không tốt hơn cũng không tồi tệ hơn nếu con trỏ của bạn ở trong một biến tĩnh được chia sẻ bởi cả hai chủ đề: bạn vẫn phải làm một số đồng bộ nếu bạn muốn nhất quán.

Liệu loại địa chỉ truyền có quan trọng giữa bộ nhớ tĩnh (được chia sẻ theo chủ đề), đường ống ẩn danh, cặp socket, vòng lặp tcp, v.v ... không? Không: tất cả các kênh đó chuyển byte, vì vậy nếu bạn vượt qua địa chỉ bộ nhớ, bạn sẽ nhận được địa chỉ bộ nhớ của mình. Những gì còn lại bạn sau đó là đồng bộ hóa, bởi vì ở đây bạn chỉ là chia sẻ một địa chỉ bộ nhớ.

Nếu bạn không sử dụng bất kỳ đồng bộ khác, bất cứ điều gì có thể xảy ra (đã làm tôi đã nói về hành vi undefined?):

  • đọc thread có thể truy cập vào bộ nhớ trước khi nó đã được viết bằng cách viết một dữ liệu cũ cho
  • nếu bạn quên để khai báo các thành viên struct như không ổn định, đọc thread có thể tiếp tục sử dụng các giá trị lưu trữ, đây một lần nữa nhận được dữ liệu cũ
  • dữ liệu chủ đề
  • đọc có thể đọc một phần văn bản có nghĩa là dữ liệu rời rạc
0

Tôi đồng ý với câu trả lời của Serge Ballesta. Trong cùng một quy trình, có thể gửi và nhận địa chỉ đối tượng qua đường ống ẩn danh.

Vì cuộc gọi hệ thống write được đảm bảo là nguyên tử khi kích thước tin nhắn dưới PIPE_BUF (thường là 4096 byte), nên các chủ đề đa nhà sản xuất sẽ không làm lộn xộn địa chỉ đối tượng của nhau (8 byte cho ứng dụng 64 bit).

Nói chuyện rẻ, đây là mã demo cho Linux (mã phòng thủ và trình xử lý lỗi được bỏ qua để đơn giản). Chỉ cần sao chép & dán vào pipe_ipc_demo.cc rồi biên dịch & chạy thử nghiệm.

#include <unistd.h> 
#include <string.h> 
#include <pthread.h> 
#include <string> 
#include <list> 

template<class T> class MPSCQ { // pipe based Multi Producer Single Consumer Queue 
public: 
     MPSCQ(); 
     ~MPSCQ(); 
     int producerPush(const T* t); 
     T* consumerPoll(double timeout = 1.0); 
private: 
     void _consumeFd(); 
     int _selectFdConsumer(double timeout); 
     T* _popFront(); 
private: 
     int _fdProducer; 
     int _fdConsumer; 
     char* _consumerBuf; 
     std::string* _partial; 
     std::list<T*>* _list; 
     static const int _PTR_SIZE; 
     static const int _CONSUMER_BUF_SIZE; 
}; 

template<class T> const int MPSCQ<T>::_PTR_SIZE = sizeof(void*); 
template<class T> const int MPSCQ<T>::_CONSUMER_BUF_SIZE = 1024; 

template<class T> MPSCQ<T>::MPSCQ() : 
     _fdProducer(-1), 
     _fdConsumer(-1) { 
     _consumerBuf = new char[_CONSUMER_BUF_SIZE]; 
     _partial = new std::string;  // for holding partial pointer address 
     _list = new std::list<T*>;  // unconsumed T* cache 
     int fd_[2]; 
     int r = pipe(fd_); 
     _fdConsumer = fd_[0]; 
     _fdProducer = fd_[1]; 
} 


template<class T> MPSCQ<T>::~MPSCQ() { /* omitted */ } 

template<class T> int MPSCQ<T>::producerPush(const T* t) { 
     return t == NULL ? 0 : write(_fdProducer, &t, _PTR_SIZE); 
} 

template<class T> T* MPSCQ<T>::consumerPoll(double timeout) { 
     T* t = _popFront(); 
     if (t != NULL) { 
       return t; 
     } 
     if (_selectFdConsumer(timeout) <= 0) { // timeout or error 
       return NULL; 
     } 
     _consumeFd(); 
     return _popFront(); 
} 

template<class T> void MPSCQ<T>::_consumeFd() { 
     memcpy(_consumerBuf, _partial->data(), _partial->length()); 
     ssize_t r = read(_fdConsumer, _consumerBuf, _CONSUMER_BUF_SIZE - _partial->length()); 
     if (r <= 0) { // EOF or error, error handler omitted 
       return; 
     } 
     const char* p = _consumerBuf; 
     int remaining_len_ = _partial->length() + r; 
     T* t; 
     while (remaining_len_ >= _PTR_SIZE) { 
       memcpy(&t, p, _PTR_SIZE); 
       _list->push_back(t); 
       remaining_len_ -= _PTR_SIZE; 
       p += _PTR_SIZE; 
     } 
     *_partial = std::string(p, remaining_len_); 
} 

template<class T> int MPSCQ<T>::_selectFdConsumer(double timeout) { 
     int r; 
     int nfds_ = _fdConsumer + 1; 
     fd_set readfds_; 
     struct timeval timeout_; 
     int64_t usec_ = timeout * 1000000.0; 
     while (true) { 
       timeout_.tv_sec = usec_/1000000; 
       timeout_.tv_usec = usec_ % 1000000; 
       FD_ZERO(&readfds_); 
       FD_SET(_fdConsumer, &readfds_); 
       r = select(nfds_, &readfds_, NULL, NULL, &timeout_); 
       if (r < 0 && errno == EINTR) { 
         continue; 
       } 
       return r; 
     } 
} 

template<class T> T* MPSCQ<T>::_popFront() { 
     if (!_list->empty()) { 
       T* t = _list->front(); 
       _list->pop_front(); 
       return t; 
     } else { 
       return NULL; 
     } 
} 

// = = = = = test code below = = = = = 

#define _LOOP_CNT 5000000 
#define _ONE_MILLION 1000000 
#define _PRODUCER_THREAD_NUM 2 

struct TestMsg {  // all public 
     int _threadId; 
     int _msgId; 
     int64_t _val; 
     TestMsg(int thread_id, int msg_id, int64_t val) : 
       _threadId(thread_id), 
       _msgId(msg_id), 
       _val(val) { }; 
}; 

static MPSCQ<TestMsg> _QUEUE; 
static int64_t _SUM = 0; 

void* functor_producer(void* arg) { 
     int my_thr_id_ = pthread_self(); 
     TestMsg* msg_; 
     for (int i = 0; i <= _LOOP_CNT; ++ i) { 
       if (i == _LOOP_CNT) { 
         msg_ = new TestMsg(my_thr_id_, i, -1); 
       } else { 
         msg_ = new TestMsg(my_thr_id_, i, i + 1); 
       } 
       _QUEUE.producerPush(msg_); 
     } 
     return NULL; 
} 


void* functor_consumer(void* arg) { 
     int msg_cnt_ = 0; 
     int stop_cnt_ = 0; 
     TestMsg* msg_; 
     while (true) { 
       if ((msg_ = _QUEUE.consumerPoll()) == NULL) { 
         continue; 
       } 
       int64_t val_ = msg_->_val; 
       delete msg_; 
       if (val_ <= 0) { 
         if ((++ stop_cnt_) >= _PRODUCER_THREAD_NUM) { 
           printf("All done, _SUM=%ld\n", _SUM); 
           break; 
         } 
       } else { 
         _SUM += val_; 
         if ((++ msg_cnt_) % _ONE_MILLION == 0) { 
           printf("msg_cnt_=%d, _SUM=%ld\n", msg_cnt_, _SUM); 
         } 
       } 
     } 
     return NULL; 
} 

int main(int argc, char* const* argv) { 
     pthread_t consumer_; 
     pthread_create(&consumer_, NULL, functor_consumer, NULL); 
     pthread_t producers_[_PRODUCER_THREAD_NUM]; 
     for (int i = 0; i < _PRODUCER_THREAD_NUM; ++ i) { 
       pthread_create(&producers_[i], NULL, functor_producer, NULL); 
     } 
     for (int i = 0; i < _PRODUCER_THREAD_NUM; ++ i) { 
       pthread_join(producers_[i], NULL); 
     } 
     pthread_join(consumer_, NULL); 
     return 0; 
} 

Và đây là kết quả xét nghiệm (2 * sum(1..5000000) == (1 + 5000000) * 5000000 == 25000005000000):

$ g++ -o pipe_ipc_demo pipe_ipc_demo.cc -lpthread 
$ ./pipe_ipc_demo ## output may vary except for the final _SUM 
msg_cnt_=1000000, _SUM=251244261289 
msg_cnt_=2000000, _SUM=1000708879236 
msg_cnt_=3000000, _SUM=2250159002500 
msg_cnt_=4000000, _SUM=4000785160225 
msg_cnt_=5000000, _SUM=6251640644676 
msg_cnt_=6000000, _SUM=9003167062500 
msg_cnt_=7000000, _SUM=12252615629881 
msg_cnt_=8000000, _SUM=16002380952516 
msg_cnt_=9000000, _SUM=20252025092401 
msg_cnt_=10000000, _SUM=25000005000000 
All done, _SUM=25000005000000 

Kỹ thuật này cho thấy đây được sử dụng trong các ứng dụng sản xuất của chúng tôi. Một cách sử dụng điển hình là luồng người tiêu dùng đóng vai trò như một người viết nhật ký và các luồng công nhân có thể viết các thông điệp tường trình gần như không đồng bộ. Có, gần như có nghĩa là đôi khi chuỗi nhà văn có thể bị chặn trong write() khi ống đầy, và đây là tính năng kiểm soát tắc nghẽn đáng tin cậy do hệ điều hành cung cấp.

1

Câu hỏi thú vị với, cho đến nay, chỉ có một câu trả lời chính xác từ Bắp chân.

Trong cùng một quy trình (đa luồng) không có đảm bảo nào vì con trỏ và dữ liệu theo các đường dẫn khác nhau để đến đích của chúng. Đảm bảo mua/phát tiềm ẩn không áp dụng vì dữ liệu cấu trúc không thể truy cập con trỏ thông qua bộ nhớ cache và chính thức bạn đang xử lý một cuộc đua dữ liệu. Tuy nhiên, hãy xem cách con trỏ và dữ liệu cấu trúc tự đến được chuỗi thứ hai (thông qua bộ nhớ cache đường ống và bộ nhớ tương ứng), có một cơ hội thực sự rằng cơ chế này sẽ không gây ra bất kỳ tác hại nào. Gửi con trỏ tới chuỗi ngang hàng lấy 3 cuộc gọi hệ thống (write() trong chuỗi gửi, select()read() trong chuỗi nhận) là (tương đối) tốn kém và theo thời gian giá trị con trỏ có sẵn trong chuỗi nhận, cấu trúc dữ liệu có thể đã đến từ lâu.

Lưu ý rằng đây chỉ là quan sát, cơ chế vẫn không chính xác.

Các vấn đề liên quan