2012-02-13 24 views
7

Tôi đang cố gắng để thực hiện những điều sau:Perl Queues và Threading

  1. Có một sợi mà đọc dữ liệu từ một tập tin rất lớn nói về 10GB và đẩy chúng vào hàng đợi. (Tôi không muốn cho hàng đợi để nhận được rất lớn hoặc)

  2. Trong khi thread buildQueue đang đẩy dữ liệu vào hàng đợi cùng một lúc có đề khoảng 5 công nhân bỏ hàng đợi và xử lý dữ liệu.

Tôi đã cố gắng nhưng các chủ đề khác của tôi không thể truy cập được do vòng lặp liên tục trong chuỗi buildQueue của tôi.

Cách tiếp cận của tôi có thể hoàn toàn sai. Cảm ơn bạn đã giúp đỡ, nó được nhiều đánh giá cao.

Dưới đây là các mã cho buildQueue:

sub buildQueue { 
    print "Enter a file name: "; 
    my $dict_path = <STDIN>; 
    chomp($dict_path); 
    open DICT_FILE, $dict_path or die("Sorry, could not open file!"); 
    while (1) { 
     if (<DICT_FILE>) { 
      if ($queue->pending() < 100) { 
       my $query = <DICT_FILE>; 
       chomp($query); 
       $queue->enqueue($query); 
       my $count = $queue->pending(); 
       print "Queue Size: $count Query: $query\n"; 
      } 
     } 
    } 
} 

Và như tôi đã mong đợi khi chủ đề này được thực hiện không có gì khác sau sẽ được thực hiện bởi vì chủ đề này sẽ không hoàn thành.

my $builder = new Thread(&buildQueue); 

Vì chuỗi trình tạo sẽ chạy trong một thời gian dài, tôi không bao giờ tạo chuỗi công việc.

Dưới đây là toàn bộ mã:

#!/usr/bin/perl -w 
use strict; 
use Thread; 
use Thread::Queue; 


my $queue = new Thread::Queue(); 
my @threads; 

sub buildQueue { 
    print "Enter a file name: "; 
    my $dict_path = <STDIN>; 
    chomp($dict_path); 
    open dict_file, $dict_path or die("Sorry, could not open file!"); 
    while (1) { 
     if (<dict_file>) { 
      if ($queue->pending() < 100) { 
       my $query = <dict_file>; 
       chomp($query); 
       $queue->enqueue($query); 
       my $count = $queue->pending(); 
       print "Queue Size: $count Query: $query\n"; 
      } 
     } 
    } 
} 

sub processor { 
    my $query; 
    while (1) { 
     if ($query = $queue->dequeue) { 
      print "$query\n"; 
     } 
    } 
} 

my $builder = new Thread(&buildQueue); 
push @threads, new Thread(&processor) for 1..5; 
+0

Một vài câu hỏi: Bạn đề cập đến chuỗi chủ đề xây dựng hàng đợi của bạn sẽ không kết thúc, nhưng nó có làm gì không? Kích thước hàng đợi có giảm dưới 100 hay cao hơn 0 không? Ngoài ra, [Tôi không chắc bạn đang tạo chủ đề của mình một cách chính xác] (http://perldoc.perl.org/perlthrtut.html). Không phải nó là 'my $ builder = threads-> create (\ & buildQueue);'? –

+0

Công cụ xây dựng hàng đợi được xây dựng tốt nhưng do chuỗi công nhân không đạt được nên chúng không thể xóa bất kỳ thứ gì từ hàng đợi sao cho hàng đợi bị kẹt ở mức 100 trong khi hàng đợi xây dựng vẫn chạy vì vòng lặp liên tục. – Sinista

+0

Hmmm, tôi sẽ cần phải xem thêm mã để thiết lập ngữ cảnh, đặc biệt là nơi bạn tạo chủ đề. Bạn không phải là 'join'ing hoặc' detach'ing builder queue trước khi tạo các threads worker, đúng không? –

Trả lời

10

Bạn sẽ cần phải đánh dấu khi bạn muốn chủ đề của bạn để thoát (thông qua một trong hai joinor detach). Thực tế là bạn có các vòng vô hạn không có câu hỏi last để thoát ra khỏi chúng cũng là một vấn đề.

Chỉnh sửa: Tôi cũng quên một phần rất quan trọng! Each worker thread will block, waiting for another item to process off of the queue until they get an undef in the queue. Do đó tại sao chúng tôi đặc biệt enqueue undef một lần cho mỗi thread sau khi xây dựng hàng đợi được thực hiện.

Hãy thử:

#!/usr/bin/perl -w 
use strict; 
use threads; 
use Thread::Queue; 


my $queue = new Thread::Queue(); 
our @threads; #Do you really need our instead of my? 

sub buildQueue 
{ 
    print "Enter a file name: "; 
    my $dict_path = <STDIN>; 
    chomp($dict_path); 

    #Three-argument open, please! 
    open my $dict_file, "<",$dict_path or die("Sorry, could not open file!"); 
    while(my $query=<$dict_file>) 
    { 
     chomp($query); 
     while(1) 
     { #Wait to see if our queue has < 100 items... 
      if ($queue->pending() < 100) 
      { 
       $queue->enqueue($query); 
       print "Queue Size: " . $queue->pending . "\n"; 
       last; #This breaks out of the infinite loop 
      } 
     } 
    } 
    close($dict_file); 
    foreach(1..5) 
    { 
     $queue->enqueue(undef); 
    } 
} 

sub processor 
{ 
    my $query; 
    while ($query = $queue->dequeue) 
    { 
     print "Thread " . threads->tid . " got $query\n"; 
    } 
} 

my $builder=threads->create(\&buildQueue); 
push @threads,threads->create(\&process) for 1..5; 

#Waiting for our threads to finish. 
$builder->join; 
foreach(@threads) 
{ 
    $_->join; 
} 
+1

Có vẻ như vấn đề là mô-đun Chủ đề không được sử dụng, tôi đã chuyển sang mô-đun chủ đề thay vào đó và mã của tôi hoạt động như bây giờ. Cảm ơn bạn Jack Nhiều người đã chỉ cho tôi đúng hướng. – Sinista

1

Nghe có vẻ như trường hợp này có thể làm với các mô-đun Parallel::ForkManager.

+0

Rất thích xem giải pháp ForkManager nếu có thể. – Sinista

0

Một cách tiếp cận khác nhau: Bạn cũng có thể sử dụng user_tasks trong MCE 1.2+ và tạo ra hai đa nhântasks, một nhiệm vụ để đọc (vì nó là một tập tin lớn, bạn cũng có thể được hưởng lợi từ việc đọc song song trong khi vẫn giữ tập tin đọc tìm kiếm) và một nhiệm vụ để xử lý, v.v.

Mã bên dưới vẫn sử dụng Thread::Queue để quản lý hàng đợi bộ đệm của bạn.

Phụ buildQueue có điều khiển kích thước hàng đợi và đẩy dữ liệu trực tiếp đến quy trình quản lý '$ R_QUEUE vì chúng tôi đã sử dụng chuỗi, do đó, nó có quyền truy cập vào không gian bộ nhớ của cha mẹ. Nếu bạn muốn sử dụng dĩa thay thế, bạn vẫn có thể truy cập hàng đợi thông qua chức năng gọi lại. Nhưng ở đây tôi đã chọn chỉ đơn giản là chỉ cần đẩy vào hàng đợi.

Các tiểu đơn vị processQueue sẽ chỉ đơn giản là bỏ qua bất kỳ thứ gì trong hàng đợi cho đến khi không còn chờ xử lý nào nữa.

Các task_end phụ trong mỗi tác vụ chỉ được chạy một lần bởi quy trình quản lý ở cuối mỗi tác vụ, vì vậy chúng tôi sử dụng nó để báo hiệu dừng cho quy trình công nhân của chúng tôi.

Rõ ràng, có rất nhiều tự do trong cách bạn muốn đoạn dữ liệu của bạn cho người lao động, vì vậy bạn có thể quyết định thuộc vào kích thước của đoạn hoặc thậm chí làm thế nào để húp dữ liệu của bạn trong.

#!/usr/bin/env perl 
use strict; 
use warnings; 
use threads; 
use threads::shared; 
use Thread::Queue; 
use MCE; 

my $R_QUEUE = Thread::Queue->new; 
my $queue_workers = 8; 
my $process_workers = 8; 
my $chunk_size = 1; 

print "Enter a file name: "; 
my $input_file = <STDIN>; 
chomp($input_file); 

sub buildQueue { 
    my ($self, $chunk_ref, $chunk_id) = @_; 
    if ($R_QUEUE->pending() < 100) { 
     $R_QUEUE->enqueue($chunk_ref); 
     $self->sendto('stdout', "Queue Size: " . $R_QUEUE->pending ."\n"); 
    } 
} 

sub processQueue { 
    my $self = shift; 
    my $wid = $self->wid; 
    while (my $buff = $R_QUEUE->dequeue) { 
     $self->sendto('stdout', "Thread " . $wid . " got $$buff"); 
    } 
} 

my $mce = MCE->new(
    input_data => $input_file, # this could be a filepath or a file handle or even a scalar to treat like a file, check the documentation for more details. 
    chunk_size => $chunk_size, 
    use_slurpio => 1, 

    user_tasks => [ 
     { # queueing task 
      max_workers => $queue_workers, 
      user_func => \&buildQueue, 
      use_threads => 1, # we'll use threads to have access to the parent's variables in shared memory. 
      task_end => sub { $R_QUEUE->enqueue((undef) x $process_workers) } # signal stop to our process workers when they hit the end of the queue. Thanks > Jack Maney! 
     }, 
     { # process task 
      max_workers => $process_workers, 
      user_func => \&processQueue, 
      use_threads => 1, # we'll use threads to have access to the parent's variables in shared memory 
      task_end => sub { print "Finished processing!\n"; } 
     } 
    ] 
); 

$mce->run(); 

exit; 
3

Các Mô-đun MCE cho Perl yêu các tệp lớn. Với MCE, người ta có thể chunk nhiều dòng cùng một lúc, slurp một đoạn lớn như một chuỗi vô hướng, hoặc đọc 1 dòng tại một thời điểm. Chunking nhiều dòng cùng một lúc làm giảm chi phí cho IPC.

MCE 1.504 hiện đã hết. Nó cung cấp MCE :: Queue với sự hỗ trợ cho các tiến trình con bao gồm các luồng. Ngoài ra, bản phát hành 1.5 đi kèm với 5 mô hình (MCE :: Flow, MCE :: Grep, MCE :: Loop, MCE :: Bản đồ, và MCE :: Stream), đảm bảo việc instantiating MCE instance cũng như auto- điều chỉnh max_workers và chunk_size. Người ta có thể ghi đè các tùy chọn này btw.

Dưới đây, MCE :: Vòng lặp được sử dụng để trình diễn.

use MCE::Loop; 

print "Enter a file name: "; 
my $dict_path = <STDIN>; 
chomp($dict_path); 

mce_loop_f { 
    my ($mce, $chunk_ref, $chunk_id) = @_; 

    foreach my $line (@$chunk_ref) { 
     chomp $line; 
     ## add your code here to process $line 
    } 

} $dict_path; 

Nếu bạn muốn chỉ định số lượng công nhân và/hoặc chunk_size, thì có 2 cách để thực hiện.

use MCE::Loop max_workers => 5, chunk_size => 300000; 

Hoặc ...

use MCE::Loop; 

MCE::Loop::init { 
    max_workers => 5, 
    chunk_size => 300000 
}; 

Mặc dù chunking được ưa thích cho các tập tin lớn, người ta có thể so sánh thời gian với chunking một dòng tại một thời điểm. Người ta có thể bỏ qua dòng đầu tiên bên trong khối (nhận xét). Lưu ý cách không cần vòng lặp bên trong. $ chunk_ref vẫn là một mảng ref chứa 1 dòng. Hàm vô hướng đầu vào $ _ chứa dòng khi chunk_size bằng 1, nếu không trỏ tới $ chunk_ref.

use MCE::Loop; 

MCE::Loop::init { 
    max_workers => 5, 
    chunk_size => 1 
}; 

print "Enter a file name: "; 
my $dict_path = <STDIN>; 
chomp($dict_path); 

mce_loop_f { 
# my ($mce, $chunk_ref, $chunk_id) = @_; 

    my $line = $_; 
    ## add your code here to process $line or $_ 

} $dict_path; 

Tôi hy vọng rằng bản trình diễn này hữu ích cho những người muốn xử lý tệp song song.

:) mario