2013-05-01 38 views
9

Tôi đã triển khai một số mô hình dựa trên mô hình Đối tượng Hoạt động. Nó thực hiện rất đơn giản. Tôi có Scheduler, ActivationList, Requests và Futures để nhận phản hồi. Yêu cầu của tôi là như thế:boost :: asio và Active Object

  • Tiếp cận đối tượng đang làm việc phải được tuần tự bằng cách thực hiện các phương pháp của nó trong thread riêng của mình (req chính và giả định của Active mẫu thiết kế Object )
  • Caller sẽ có thể xác định ưu tiên thực hiện yêu cầu. Nó có nghĩa là nếu có nhiều hơn 0 yêu cầu chờ đợi để thực hiện, chúng sẽ được sắp xếp theo mức ưu tiên được gán cho mỗi yêu cầu. Các yêu cầu có mức độ ưu tiên cao hơn sẽ được thực thi trước tiên, nếu có một số yêu cầu đang chờ trên ActivationList luôn và chúng sẽ có mức độ ưu tiên cao hơn yêu cầu đã cho, yêu cầu này sẽ không bao giờ được thực thi. chỉ định số lượng yêu cầu tối đa đang chờ xử lý trong danh sách (giới hạn mức sử dụng bộ nhớ)
  • Có thể vô hiệu tất cả các yêu cầu đang chờ xử lý
  • Yêu cầu có thể trả về giá trị (chặn người gọi) HOẶC chỉ được thực hiện mà không có giá trị trả lại nhưng người gọi sẽ bị chặn cho đến khi yêu cầu được xử lý HOẶC người gọi sẽ không bị chặn và không quan trọng nếu yêu cầu đã được xử lý hoặc không g
  • Ngay trước khi thực hiện yêu cầu, một số phương pháp bảo vệ sẽ được thực hiện để kiểm tra xem yêu cầu đã cho có được thực thi hay không. Nếu không - nó sẽ trả lại một số giá trị không xác định cho người gọi (trong thực hiện hiện tại của tôi nó là tăng :: không, bởi vì mỗi kiểu yêu cầu trả về là tăng :: tùy chọn)

OK bây giờ câu hỏi: Có thể sử dụng boost :: asio và đáp ứng tất cả các yêu cầu của tôi? Việc triển khai của tôi đang hoạt động nhưng tôi muốn sử dụng cái gì đó có thể được triển khai theo cách tốt hơn nhiều so với tôi đã làm điều này. Ngoài ra tôi muốn biết nó cho tương lai và không "tái tạo lại bánh xe" một lần nữa.

+0

tăng ASIO sẽ không chặn. Phần cuối của phần thứ hai của bạn cuối cùng được bao gồm trong câu lệnh cuối cùng. tất cả mọi thứ khác là hoàn toàn có thể trong thường xuyên c + + với đẩy ra, mặc dù, thừa nhận, dễ dàng hơn với nó. Có thể muốn kiểm tra vào serialization tăng là tốt, nếu bạn chưa sử dụng nó. – johnathon

+0

Tôi đã triển khai nó bằng cách sử dụng đồng bằng C++. Trên thực tế với một sự trợ giúp lớn của thúc đẩy thread và tăng chỉ số đa conatiner. Nhưng mục đích là không sử dụng ipmplementation của tôi và thay vì nó, hãy sử dụng boost :: asio. – user2301299

Trả lời

28

Boost.Asio có thể được sử dụng để bao gồm ý định của Active Object: thực hiện phương thức decouple từ lời gọi phương thức. Yêu cầu bổ sung sẽ cần phải được xử lý ở cấp độ cao hơn, nhưng nó không quá phức tạp khi sử dụng Boost.Asio kết hợp với các thư viện Boost khác.

Scheduler có thể sử dụng:

ActivationList có thể được thực hiện như:

  • Một Boost.MultiIndex để đạt được yêu cầu phương pháp ưu tiên cao nhất. Với vị trí được gợi ý insert(), thứ tự chèn được giữ nguyên cho yêu cầu có cùng mức độ ưu tiên.
  • std::multiset hoặc std::multimap có thể được sử dụng. Tuy nhiên, nó không được chỉ định trong C++ 03 theo thứ tự yêu cầu với cùng một khóa (ưu tiên).
  • Nếu Request không cần phương thức bảo vệ, thì std::priority_queue có thể được sử dụng.

Request có thể là một loại không xác định:

  • boost::functionboost::bind thể được sử dụng để cung cấp một kiểu tẩy xoá, trong khi ràng buộc với các loại callable mà không giới thiệu một hệ thống phân cấp Request.

Futures có thể sử dụng hỗ trợ của Boost.Thread Futures.

  • future.valid() sẽ trả về true nếu Request đã được thêm vào ActivationList.
  • future.wait() sẽ chặn kết quả chờ để có sẵn.
  • future.get() sẽ chặn kết quả chờ.
  • Nếu người gọi không làm gì với số future, thì người gọi sẽ không bị chặn.
  • Một lợi ích khác khi sử dụng tương lai Boost.Thread là ngoại lệ có nguồn gốc từ bên trong một Request sẽ được chuyển đến Future.

Đây là một ví dụ hoàn chỉnh tận dụng thư viện Boost khác nhau và phải đáp ứng các yêu cầu:

// Standard includes 
#include <algorithm> // std::find_if 
#include <iostream> 
#include <string> 

// 3rd party includes 
#include <boost/asio.hpp> 
#include <boost/bind.hpp> 
#include <boost/function.hpp> 
#include <boost/make_shared.hpp> 
#include <boost/multi_index_container.hpp> 
#include <boost/multi_index/ordered_index.hpp> 
#include <boost/multi_index/member.hpp> 
#include <boost/shared_ptr.hpp> 
#include <boost/thread.hpp> 
#include <boost/utility/result_of.hpp> 

/// @brief scheduler that provides limits with prioritized jobs. 
template <typename Priority, 
      typename Compare = std::less<Priority> > 
class scheduler 
{ 
public: 
    typedef Priority priority_type; 
private: 

    /// @brief method_request is used to couple the guard and call 
    ///  functions for a given method. 
    struct method_request 
    { 
    typedef boost::function<bool()> ready_func_type; 
    typedef boost::function<void()> run_func_type; 

    template <typename ReadyFunctor, 
       typename RunFunctor> 
    method_request(ReadyFunctor ready, 
        RunFunctor run) 
     : ready(ready), 
     run(run) 
    {} 

    ready_func_type ready; 
    run_func_type run; 
    }; 

    /// @brief Pair type used to associate a request with its priority. 
    typedef std::pair<priority_type, 
        boost::shared_ptr<method_request> > pair_type; 

    static bool is_method_ready(const pair_type& pair) 
    { 
    return pair.second->ready(); 
    } 

public: 

    /// @brief Construct scheduler. 
    /// 
    /// @param max_threads Maximum amount of concurrent task. 
    /// @param max_request Maximum amount of request. 
    scheduler(std::size_t max_threads, 
      std::size_t max_request) 
    : work_(io_service_), 
     max_request_(max_request), 
     request_count_(0) 
    { 
    // Spawn threads, dedicating them to the io_service. 
    for (std::size_t i = 0; i < max_threads; ++i) 
     threads_.create_thread(
     boost::bind(&boost::asio::io_service::run, &io_service_)); 
    } 

    /// @brief Destructor. 
    ~scheduler() 
    { 
    // Release threads from the io_service. 
    io_service_.stop(); 
    // Cleanup. 
    threads_.join_all(); 
    } 

    /// @brief Insert a method request into the scheduler. 
    /// 
    /// @param priority Priority of job. 
    /// @param ready_func Invoked to check if method is ready to run. 
    /// @param run_func Invoked when ready to run. 
    /// 
    /// @return future associated with the method. 
    template <typename ReadyFunctor, 
      typename RunFunctor> 
    boost::unique_future<typename boost::result_of<RunFunctor()>::type> 
    insert(priority_type priority, 
     const ReadyFunctor& ready_func, 
     const RunFunctor& run_func) 
    { 
    typedef typename boost::result_of<RunFunctor()>::type result_type; 
    typedef boost::unique_future<result_type> future_type; 

    boost::unique_lock<mutex_type> lock(mutex_); 

    // If max request has been reached, then return an invalid future. 
    if (max_request_ && 
     (request_count_ == max_request_)) 
     return future_type(); 

    ++request_count_; 

    // Use a packaged task to handle populating promise and future. 
    typedef boost::packaged_task<result_type> task_type; 

    // Bind does not work with rvalue, and packaged_task is only moveable, 
    // so allocate a shared pointer. 
    boost::shared_ptr<task_type> task = 
     boost::make_shared<task_type>(run_func); 

    // Create method request. 
    boost::shared_ptr<method_request> request = 
     boost::make_shared<method_request>(
     ready_func, 
     boost::bind(&task_type::operator(), task)); 

    // Insert into priority. Hint to inserting as close to the end as 
    // possible to preserve insertion order for request with same priority. 
    activation_list_.insert(activation_list_.end(), 
          pair_type(priority, request)); 

    // There is now an outstanding request, so post to dispatch. 
    io_service_.post(boost::bind(&scheduler::dispatch, this)); 

    return task->get_future(); 
    } 

    /// @brief Insert a method request into the scheduler. 
    /// 
    /// @param ready_func Invoked to check if method is ready to run. 
    /// @param run_func Invoked when ready to run. 
    /// 
    /// @return future associated with the method. 
    template <typename ReadyFunctor, 
      typename RunFunctor> 
    boost::unique_future<typename boost::result_of<RunFunctor()>::type> 
    insert(const ReadyFunctor& ready_func, 
     const RunFunctor& run_func) 
    { 
    return insert(priority_type(), ready_func, run_func); 
    } 

    /// @brief Insert a method request into the scheduler. 
    /// 
    /// @param priority Priority of job. 
    /// @param run_func Invoked when ready to run. 
    /// 
    /// @return future associated with the method. 
    template <typename RunFunctor> 
    boost::unique_future<typename boost::result_of<RunFunctor()>::type> 
    insert(priority_type priority, 
     const RunFunctor& run_func) 
    { 
    return insert(priority, &always_ready, run_func); 
    } 

    /// @brief Insert a method request with default priority into the 
    ///  scheduler. 
    /// 
    /// @param run_func Invoked when ready to run. 
    /// 
    /// @param functor Job to run. 
    /// 
    /// @return future associated with the job. 
    template <typename RunFunc> 
    boost::unique_future<typename boost::result_of<RunFunc()>::type> 
    insert(const RunFunc& run_func) 
    { 
    return insert(&always_ready, run_func); 
    } 

    /// @brief Cancel all outstanding request. 
    void cancel() 
    { 
    boost::unique_lock<mutex_type> lock(mutex_); 
    activation_list_.clear(); 
    request_count_ = 0; 
    } 

private: 

    /// @brief Dispatch a request. 
    void dispatch() 
    { 
    // Get the current highest priority request ready to run from the queue. 
    boost::unique_lock<mutex_type> lock(mutex_); 
    if (activation_list_.empty()) return; 

    // Find the highest priority method ready to run. 
    typedef typename activation_list_type::iterator iterator; 
    iterator end = activation_list_.end(); 
    iterator result = std::find_if(
     activation_list_.begin(), end, &is_method_ready); 

    // If no methods are ready, then post into dispatch, as the 
    // method may have become ready. 
    if (end == result) 
    { 
     io_service_.post(boost::bind(&scheduler::dispatch, this)); 
     return; 
    } 

    // Take ownership of request. 
    boost::shared_ptr<method_request> method = result->second; 
    activation_list_.erase(result); 

    // Run method without mutex. 
    lock.unlock(); 
    method->run();  
    lock.lock(); 

    // Perform bookkeeping. 
    --request_count_; 
    } 

    static bool always_ready() { return true; } 

private: 

    /// @brief List of outstanding request. 
    typedef boost::multi_index_container< 
    pair_type, 
    boost::multi_index::indexed_by< 
     boost::multi_index::ordered_non_unique< 
     boost::multi_index::member<pair_type, 
            typename pair_type::first_type, 
            &pair_type::first>, 
     Compare 
     > 
    > 
    > activation_list_type; 
    activation_list_type activation_list_; 

    /// @brief Thread group managing threads servicing pool. 
    boost::thread_group threads_; 

    /// @brief io_service used to function as a thread pool. 
    boost::asio::io_service io_service_; 

    /// @brief Work is used to keep threads servicing io_service. 
    boost::asio::io_service::work work_; 

    /// @brief Maximum amount of request. 
    const std::size_t max_request_; 

    /// @brief Count of outstanding request. 
    std::size_t request_count_; 

    /// @brief Synchronize access to the activation list. 
    typedef boost::mutex mutex_type; 
    mutex_type mutex_; 
}; 

typedef scheduler<unsigned int, 
        std::greater<unsigned int> > high_priority_scheduler; 

/// @brief adder is a simple proxy that will delegate work to 
///  the scheduler. 
class adder 
{ 
public: 
    adder(high_priority_scheduler& scheduler) 
    : scheduler_(scheduler) 
    {} 

    /// @brief Add a and b with a priority. 
    /// 
    /// @return Return future result. 
    template <typename T> 
    boost::unique_future<T> add(
    high_priority_scheduler::priority_type priority, 
    const T& a, const T& b) 
    { 
    // Insert method request 
    return scheduler_.insert(
     priority, 
     boost::bind(&adder::do_add<T>, a, b)); 
    } 

    /// @brief Add a and b. 
    /// 
    /// @return Return future result. 
    template <typename T> 
    boost::unique_future<T> add(const T& a, const T& b) 
    { 
    return add(high_priority_scheduler::priority_type(), a, b); 
    } 

private: 

    /// @brief Actual add a and b. 
    template <typename T> 
    static T do_add(const T& a, const T& b) 
    { 
    std::cout << "Starting addition of '" << a 
       << "' and '" << b << "'" << std::endl; 
    // Mimic busy work. 
    boost::this_thread::sleep_for(boost::chrono::seconds(2)); 
    std::cout << "Finished addition" << std::endl; 
    return a + b; 
    } 

private: 
    high_priority_scheduler& scheduler_; 
}; 

bool get(bool& value) { return value; } 
void guarded_call() 
{ 
    std::cout << "guarded_call" << std::endl; 
} 

int main() 
{ 
    const unsigned int max_threads = 1; 
    const unsigned int max_request = 4; 

    // Sscheduler 
    high_priority_scheduler scheduler(max_threads, max_request); 

    // Proxy 
    adder adder(scheduler); 

    // Client 

    // Add guarded method to scheduler. 
    bool ready = false; 
    std::cout << "Add guarded method." << std::endl; 
    boost::unique_future<void> future1 = scheduler.insert(
    boost::bind(&get, boost::ref(ready)), 
    &guarded_call); 

    // Add 1 + 100 with default priority. 
    boost::unique_future<int> future2 = adder.add(1, 100); 

    // Force sleep to try to get scheduler to run request 2 first. 
    boost::this_thread::sleep_for(boost::chrono::seconds(1)); 

    // Add: 
    // 2 + 200 with low priority (5) 
    // "test" + "this" with high priority (99) 
    boost::unique_future<int> future3 = adder.add(5, 2, 200); 
    boost::unique_future<std::string> future4 = adder.add(99, 
    std::string("test"), std::string("this")); 

    // Max request should have been reached, so add another. 
    boost::unique_future<int> future5 = adder.add(3, 300); 

    // Check if request was added. 
    std::cout << "future1 is valid: " << future1.valid() 
      << "\nfuture2 is valid: " << future2.valid() 
      << "\nfuture3 is valid: " << future3.valid() 
      << "\nfuture4 is valid: " << future4.valid() 
      << "\nfuture5 is valid: " << future5.valid() 
      << std::endl; 

    // Get results for future2 and future3. Do nothing with future4's results. 
    std::cout << "future2 result: " << future2.get() 
      << "\nfuture3 result: " << future3.get() 
      << std::endl; 

    std::cout << "Unguarding method." << std::endl; 
    ready = true; 
    future1.wait(); 
} 

Việc thực hiện sử dụng hồ bơi thread trong tổng số 1 với tối đa là 4 yêu cầu.

  • request1 được bảo vệ cho đến khi kết thúc chương trình và phải được chạy cuối cùng.
  • yêu cầu2 (1 + 100) được chèn với mức độ ưu tiên mặc định và phải được chạy trước tiên.
  • yêu cầu3 (2 + 200) được chèn mức độ ưu tiên thấp và sẽ chạy sau request4.
  • request4 ('test' + 'this') được chèn với mức độ ưu tiên cao và phải chạy trước request3.
  • yêu cầu5 không thể chèn do yêu cầu tối đa và không nên hợp lệ.

Đầu ra là như sau:

Add guarded method. 
Starting addition of '1' and '100' 
future1 is valid: 1 
future2 is valid: 1 
future3 is valid: 1 
future4 is valid: 1 
future5 is valid: 0 
Finished addition 
Starting addition of 'test' and 'this' 
Finished addition 
Starting addition of '2' and '200' 
Finished addition 
future2 result: 101 
future3 result: 202 
Unguarding method. 
guarded_call
+1

Cảm ơn câu trả lời này, ước gì tôi có thể cung cấp cho bạn nhiều hơn 1 upvote. – MrEvil

+0

Bài viết rất hữu ích, những gì tăng thiếu là trường hợp sử dụng mà tôi không phải để cọ rửa github/như vậy cho – arynaq

Các vấn đề liên quan