2012-08-28 43 views
6

Tôi đã triển khai một nhóm luồng sử dụng boost::asio và một số số boost::thread đối tượng gọi boost::asio::io_service::run(). Tuy nhiên, một yêu cầu mà tôi đã đưa ra là có một cách để theo dõi tất cả các chủ đề cho "sức khỏe". Mục đích của tôi là tạo một đối tượng sentinel đơn giản có thể được truyền qua pool thread - nếu nó làm cho nó thông qua, thì chúng ta có thể giả định rằng thread vẫn đang xử lý công việc.boost :: asio, hồ bơi chủ đề và theo dõi luồng

Tuy nhiên, khi thực hiện, tôi không chắc chắn cách (nếu) tôi có thể theo dõi tất cả các chủ đề trong hồ bơi một cách đáng tin cậy. Tôi đã chỉ đơn giản là giao chức năng thread để boost::asio::io_service::run(), do đó, gửi một đối tượng sentinel vào trường hợp io_service sẽ không đảm bảo thread sẽ thực sự nhận được rằng sentinel và làm công việc.

Một tùy chọn có thể chỉ định kỳ chèn nội dung gửi và hy vọng nó được chọn theo từng chuỗi ít nhất một lần trong một khoảng thời gian hợp lý, nhưng điều đó rõ ràng không lý tưởng.

Lấy ví dụ sau. Do cách mà trình xử lý được mã hóa, trong trường hợp này chúng ta có thể thấy rằng mỗi luồng sẽ thực hiện cùng một lượng công việc, nhưng trong thực tế, tôi sẽ không có quyền kiểm soát việc xử lý trình xử lý, một số có thể chạy dài trong khi một số khác sẽ gần như ngay lập tức.

#include <iostream> 
#include <boost/asio.hpp> 
#include <vector> 
#include <boost/thread.hpp> 
#include <boost/bind.hpp> 

void handler() 
{ 
    std::cout << boost::this_thread::get_id() << "\n"; 
    boost::this_thread::sleep(boost::posix_time::milliseconds(100)); 
} 

int main(int argc, char **argv) 
{ 
    boost::asio::io_service svc(3); 

    std::unique_ptr<boost::asio::io_service::work> work(new boost::asio::io_service::work(svc)); 

    boost::thread one(boost::bind(&boost::asio::io_service::run, &svc)); 
    boost::thread two(boost::bind(&boost::asio::io_service::run, &svc)); 
    boost::thread three(boost::bind(&boost::asio::io_service::run, &svc)); 

    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 

    work.reset(); 

    three.join(); 
    two.join(); 
    one.join(); 

    return 0; 
} 

Trả lời

2

Các giải pháp mà tôi đã sử dụng dựa vào thực tế mà tôi sở hữu thực hiện của các đối tượng lốp bi-a. Tôi đã tạo một loại trình bao bọc sẽ cập nhật số liệu thống kê và sao chép các trình xử lý do người dùng xác định được đăng vào nhóm luồng. Chỉ loại trình bao bọc này mới được đăng lên io_service cơ bản. Phương pháp này cho phép tôi theo dõi các trình xử lý được đăng/thi hành mà không phải xâm nhập vào mã người dùng.

Dưới đây là một tước xuống và đơn giản hóa ví dụ:

#include <iostream> 
#include <memory> 
#include <vector> 
#include <boost/thread.hpp> 
#include <boost/asio.hpp> 

// Supports scheduling anonymous jobs that are 
// executable as returning nothing and taking 
// no arguments 
typedef std::function<void(void)> functor_type; 

// some way to store per-thread statistics 
typedef std::map<boost::thread::id, int> thread_jobcount_map; 

// only this type is actually posted to 
// the asio proactor, this delegates to 
// the user functor in operator() 
struct handler_wrapper 
{ 
    handler_wrapper(const functor_type& user_functor, thread_jobcount_map& statistics) 
     : user_functor_(user_functor) 
     , statistics_(statistics) 
    { 
    } 

    void operator()() 
    { 
     user_functor_(); 

     // just for illustration purposes, assume a long running job 
     boost::this_thread::sleep(boost::posix_time::milliseconds(100)); 

     // increment executed jobs 
     ++statistics_[boost::this_thread::get_id()]; 
    } 

    functor_type   user_functor_; 
    thread_jobcount_map& statistics_; 
}; 

// anonymous thread function, just runs the proactor 
void thread_func(boost::asio::io_service& proactor) 
{ 
    proactor.run(); 
} 

class ThreadPool 
{ 
public: 
    ThreadPool(size_t thread_count) 
    { 
     threads_.reserve(thread_count); 

     work_.reset(new boost::asio::io_service::work(proactor_)); 

     for(size_t curr = 0; curr < thread_count; ++curr) 
     { 
     boost::thread th(thread_func, boost::ref(proactor_)); 

     // inserting into this map before any work can be scheduled 
     // on it, means that we don't have to look it for lookups 
     // since we don't dynamically add threads 
     thread_jobcount_.insert(std::make_pair(th.get_id(), 0)); 

     threads_.emplace_back(std::move(th)); 
     } 
    } 

    // the only way for a user to get work into 
    // the pool is to use this function, which ensures 
    // that the handler_wrapper type is used 
    void schedule(const functor_type& user_functor) 
    { 
     handler_wrapper to_execute(user_functor, thread_jobcount_); 
     proactor_.post(to_execute); 
    } 

    void join() 
    { 
     // join all threads in pool: 
     work_.reset(); 
     proactor_.stop(); 

     std::for_each(
     threads_.begin(), 
     threads_.end(), 
     [] (boost::thread& t) 
     { 
     t.join(); 
     }); 
    } 

    // just an example showing statistics 
    void log() 
    { 
     std::for_each(
     thread_jobcount_.begin(), 
     thread_jobcount_.end(), 
     [] (const thread_jobcount_map::value_type& it) 
     { 
     std::cout << "Thread: " << it.first << " executed " << it.second << " jobs\n"; 
     }); 
    } 

private: 
    std::vector<boost::thread> threads_; 
    std::unique_ptr<boost::asio::io_service::work> work_; 
    boost::asio::io_service proactor_; 
    thread_jobcount_map  thread_jobcount_; 
}; 

struct add 
{ 
    add(int lhs, int rhs, int* result) 
     : lhs_(lhs) 
     , rhs_(rhs) 
     , result_(result) 
    { 
    } 

    void operator()() 
    { 
     *result_ = lhs_ + rhs_; 
    } 

    int lhs_,rhs_; 
    int* result_; 
}; 

int main(int argc, char **argv) 
{ 
    // some "state objects" that are 
    // manipulated by the user functors 
    int x = 0, y = 0, z = 0; 

    // pool of three threads 
    ThreadPool pool(3); 

    // schedule some handlers to do some work 
    pool.schedule(add(5, 4, &x)); 
    pool.schedule(add(2, 2, &y)); 
    pool.schedule(add(7, 8, &z)); 

    // give all the handlers time to execute 
    boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); 

    std::cout 
     << "x = " << x << "\n" 
     << "y = " << y << "\n" 
     << "z = " << z << "\n"; 

    pool.join(); 

    pool.log(); 
} 

Output:

x = 9 
y = 4 
z = 15 
Thread: 0000000000B25430 executed 1 jobs 
Thread: 0000000000B274F0 executed 1 jobs 
Thread: 0000000000B27990 executed 1 jobs 
+0

bạn có thể thêm mã vào câu trả lời của mình @Chad không? –

+0

Xong. Hạnh phúc cho bất kỳ thông tin phản hồi về nó. – Chad

6

Bạn có thể sử dụng phiên bản io_service chung giữa tất cả các chủ đề và cá thể io_service riêng cho mỗi chuỗi. Mỗi chủ đề sẽ thực hiện một phương pháp như thế này:

void Mythread::threadLoop() 
{ 
    while(/* termination condition */) 
    { 
     commonIoService.run_one(); 
     privateIoService.run_one(); 

     commonConditionVariable.timed_wait(time); 
    } 
} 

Bằng cách này, nếu bạn muốn đảm bảo rằng một số nhiệm vụ được thực hiện trong một chủ đề, bạn chỉ cần gửi nhiệm vụ này trong io_service thuộc sở hữu của nó.

Để gửi một nhiệm vụ trong hồ bơi thread của bạn, bạn có thể làm:

void MyThreadPool::post(Hander handler) 
{ 
    commonIoService.post(handler); 
    commonConditionVariable.notify_all(); 
} 
+0

Một cách tiếp cận thú vị, nhưng tôi đang tìm kiếm một cái gì đó một chút thẳng hơn về phía trước. Nếu không có gì khác xuất hiện trong vài ngày tới, tôi có thể chấp nhận câu trả lời này. – Chad

+0

Tôi nghĩ rằng không có giải pháp đơn giản nào khi sử dụng asio khởi động. Tôi đã phát triển một giải pháp như thế này với vài mã và nó hoạt động. –

Các vấn đề liên quan