2015-07-26 12 views
5

Trong this câu hỏi tôi đã mô tả boost :: ASIOboost :: coroutine mô hình sử dụng gây ra tai nạn ngẫu nhiên của các ứng dụng của tôi và tôi công bố chiết xuất từ ​​mã của tôi và valgrindGDB đầu ra.Có gì sai với sự tăng này :: asio và boost :: mô hình sử dụng coroutine?

Để điều tra thêm sự cố, tôi đã tạo ra bằng chứng chứng minh khái niệm ứng dụng áp dụng cùng một mẫu. Tôi thấy rằng cùng một vấn đề nảy sinh trong chương trình nhỏ hơn mà tôi xuất bản ở đây.

Mã bắt đầu một vài chuỗi và tạo một nhóm kết nối với một vài kết nối giả (số do người dùng cung cấp). Đối số bổ sung là số nguyên không dấu có vai trò của các yêu cầu giả mạo. Việc triển khai giả của hàm sendRequest chỉ khởi động bộ đếm thời gian không đồng bộ cho số giây chờ đợi bằng số đầu vào và yileds từ hàm.

Ai đó có thể thấy sự cố với mã này và anh ấy có thể đề xuất một số sửa chữa cho nó không?

#include "asiocoroutineutils.h" 
#include "concurrentqueue.h" 

#include <iostream> 
#include <thread> 

#include <boost/lexical_cast.hpp> 

using namespace std; 
using namespace boost; 
using namespace utils; 

#define id this_thread::get_id() << ": " 

// --------------------------------------------------------------------------- 

/*! 
* \brief This is a fake Connection class 
*/ 
class Connection 
{ 
public: 
    Connection(unsigned connectionId) 
     : _id(connectionId) 
    { 
    } 

    unsigned getId() const 
    { 
     return _id; 
    } 

    void sendRequest(asio::io_service& ioService, 
        unsigned seconds, 
        AsioCoroutineJoinerProxy, 
        asio::yield_context yield) 
    { 
     cout << id << "Connection " << getId() 
      << " Start sending: " << seconds << endl; 

     // waiting on this timer is palceholder for any asynchronous operation 
     asio::steady_timer timer(ioService); 
     timer.expires_from_now(chrono::seconds(seconds)); 
     coroutineAsyncWait(timer, yield); 

     cout << id << "Connection " << getId() 
      << " Received response: " << seconds << endl; 
    } 

private: 
    unsigned _id; 
}; 

typedef std::unique_ptr<Connection> ConnectionPtr; 
typedef std::shared_ptr<asio::steady_timer> TimerPtr; 

// --------------------------------------------------------------------------- 

class ConnectionPool 
{ 
public: 
    ConnectionPool(size_t connectionsCount) 
    { 
     for(size_t i = 0; i < connectionsCount; ++i) 
     { 
      cout << "Creating connection: " << i << endl; 
      _connections.emplace_back(new Connection(i)); 
     } 
    } 

    ConnectionPtr getConnection(TimerPtr timer, 
           asio::yield_context& yield) 
    { 
     lock_guard<mutex> lock(_mutex); 

     while(_connections.empty()) 
     { 
      cout << id << "There is no free connection." << endl; 

      _timers.emplace_back(timer); 
      timer->expires_from_now(
       asio::steady_timer::clock_type::duration::max()); 

      _mutex.unlock(); 
      coroutineAsyncWait(*timer, yield); 
      _mutex.lock(); 

      cout << id << "Connection was freed." << endl; 
     } 

     cout << id << "Getting connection: " 
      << _connections.front()->getId() << endl; 

     ConnectionPtr connection = std::move(_connections.front()); 
     _connections.pop_front(); 
     return connection; 
    } 

    void addConnection(ConnectionPtr connection) 
    { 
     lock_guard<mutex> lock(_mutex); 

     cout << id << "Returning connection " << connection->getId() 
      << " to the pool." << endl; 

     _connections.emplace_back(std::move(connection)); 

     if(_timers.empty()) 
      return; 

     auto timer = _timers.back(); 
     _timers.pop_back(); 
     auto& ioService = timer->get_io_service(); 

     ioService.post([timer]() 
     { 
      cout << id << "Wake up waiting getConnection." << endl; 
      timer->cancel(); 
     }); 
    } 

private: 
    mutex _mutex; 
    deque<ConnectionPtr> _connections; 
    deque<TimerPtr> _timers; 
}; 

typedef unique_ptr<ConnectionPool> ConnectionPoolPtr; 

// --------------------------------------------------------------------------- 

class ScopedConnection 
{ 
public: 
    ScopedConnection(ConnectionPool& pool, 
        asio::io_service& ioService, 
        asio::yield_context& yield) 
     : _pool(pool) 
    { 
     auto timer = make_shared<asio::steady_timer>(ioService); 
     _connection = _pool.getConnection(timer, yield); 
    } 

    Connection& get() 
    { 
     return *_connection; 
    } 

    ~ScopedConnection() 
    { 
     _pool.addConnection(std::move(_connection)); 
    } 

private: 
    ConnectionPool& _pool; 
    ConnectionPtr _connection; 
}; 

// --------------------------------------------------------------------------- 

void sendRequest(asio::io_service& ioService, 
       ConnectionPool& pool, 
       unsigned seconds, 
       asio::yield_context yield) 
{ 
    cout << id << "Constructing request ..." << endl; 

    AsioCoroutineJoiner joiner(ioService); 

    ScopedConnection connection(pool, ioService, yield); 

    asio::spawn(ioService, bind(&Connection::sendRequest, 
           connection.get(), 
           std::ref(ioService), 
           seconds, 
           AsioCoroutineJoinerProxy(joiner), 
           placeholders::_1)); 

    joiner.join(yield); 

    cout << id << "Processing response ..." << endl; 
} 

// --------------------------------------------------------------------------- 

void threadFunc(ConnectionPool& pool, 
       ConcurrentQueue<unsigned>& requests) 
{ 
    try 
    { 
     asio::io_service ioService; 

     while(true) 
     { 
      unsigned request; 
      if(!requests.tryPop(request)) 
       break; 

      cout << id << "Scheduling request: " << request << endl; 

      asio::spawn(ioService, bind(sendRequest, 
             std::ref(ioService), 
             std::ref(pool), 
             request, 
             placeholders::_1)); 
     } 

     ioService.run(); 
    } 
    catch(const std::exception& e) 
    { 
     cerr << id << "Error: " << e.what() << endl; 
    } 
} 

// --------------------------------------------------------------------------- 

int main(int argc, char* argv[]) 
{ 
    if(argc < 3) 
    { 
     cout << "Usage: ./async_request poolSize threadsCount r0 r1 ..." 
      << endl; 
     return -1; 
    } 

    try 
    { 
     auto poolSize = lexical_cast<size_t>(argv[1]); 
     auto threadsCount = lexical_cast<size_t>(argv[2]); 

     ConcurrentQueue<unsigned> requests; 
     for(int i = 3; i < argc; ++i) 
     { 
      auto request = lexical_cast<unsigned>(argv[i]); 
      requests.tryPush(request); 
     } 

     ConnectionPoolPtr pool(new ConnectionPool(poolSize)); 

     vector<unique_ptr<thread>> threads; 
     for(size_t i = 0; i < threadsCount; ++i) 
     { 
      threads.emplace_back(
       new thread(threadFunc, std::ref(*pool), std::ref(requests))); 
     } 

     for_each(threads.begin(), threads.end(), mem_fn(&thread::join)); 
    } 
    catch(const std::exception& e) 
    { 
     cerr << "Error: " << e.what() << endl; 
    } 

    return 0; 
} 

Dưới đây là một số tiện ích helper sử dụng bởi các mã trên:

#pragma once 

#include <boost/asio/steady_timer.hpp> 
#include <boost/asio/spawn.hpp> 

namespace utils 
{ 

inline void coroutineAsyncWait(boost::asio::steady_timer& timer, 
           boost::asio::yield_context& yield) 
{ 
    boost::system::error_code ec; 
    timer.async_wait(yield[ec]); 
    if(ec && ec != boost::asio::error::operation_aborted) 
     throw std::runtime_error(ec.message()); 
} 

class AsioCoroutineJoiner 
{ 
public: 
    explicit AsioCoroutineJoiner(boost::asio::io_service& io) 
     : _timer(io), _count(0) {} 

    void join(boost::asio::yield_context yield) 
    { 
     assert(_count > 0); 
     _timer.expires_from_now(
      boost::asio::steady_timer::clock_type::duration::max()); 
     coroutineAsyncWait(_timer, yield); 
    } 

    void inc() 
    { 
     ++_count; 
    } 

    void dec() 
    { 
     assert(_count > 0); 
     --_count; 
     if(0 == _count) 
      _timer.cancel(); 
    } 

private: 
    boost::asio::steady_timer _timer; 
    std::size_t _count; 

}; // AsioCoroutineJoiner class 

class AsioCoroutineJoinerProxy 
{ 
public: 
    AsioCoroutineJoinerProxy(AsioCoroutineJoiner& joiner) 
     : _joiner(joiner) 
    { 
     _joiner.inc(); 
    } 

    AsioCoroutineJoinerProxy(const AsioCoroutineJoinerProxy& joinerProxy) 
     : _joiner(joinerProxy._joiner) 
    { 
     _joiner.inc(); 
    } 

    ~AsioCoroutineJoinerProxy() 
    { 
     _joiner.dec(); 
    } 

private: 
    AsioCoroutineJoiner& _joiner; 

}; // AsioCoroutineJoinerProxy class 

} // utils namespace 

Đối với đầy đủ các mã phần còn thiếu cuối cùng là ConcurrentQueue lớp. Nó quá dài để dán nó ở đây, nhưng nếu bạn muốn bạn có thể tìm thấy nó here.

Ví dụ sử dụng các ứng dụng là:

./connectionpooltest 3 3 5 7 8 1 0 9 2 4 3 6

nơi số đầu tiên 3 là kết nối giả đếm và lần thứ hai số 3 là số lượng các chủ đề đã sử dụng. Các số sau đó là các yêu cầu giả mạo.

Kết quả của valgrindGDB cũng giống như trong đề cập ở trên question.

Đã sử dụng phiên bản tăng1.57. Trình biên dịch là GCC 4.8.3. Hệ điều hành là CentOS Linux phát hành 7.1.1503

+5

'#define id this_thread :: get_id() <<": "' Bạn có nghiêm túc không? – erenon

+0

có thể trùng lặp của [Nguyên nhân gây ra sự cố ngẫu nhiên trong tăng :: coroutine?] (Http: // stackoverflow.com/questions/31610415/what-cause-a-random-crash-in-boostcoroutine) – PSIAlt

+0

Nhưng mã bây giờ đã hoàn thành (có vẻ như) @PSIAlt Tôi muốn cho nó một cơ hội như thế này – sehe

Trả lời

1

Dường như tất cả valgrind lỗi gây ra do BOOST_USE_VALGRIND vĩ mô không được định nghĩa là Tanner Sansbury điểm trong bình luận liên quan đến this câu hỏi. Dường như ngoại trừ chương trình này là chính xác.

Các vấn đề liên quan