2011-01-31 40 views
43

Cố gắng sử dụng TThreadedQueue (Generics.Collections) trong một chương trình dành cho nhiều nhà sản xuất duy nhất. (Delphi-XE). Ý tưởng là để đẩy các đối tượng vào một hàng đợi và để cho một số luồng công nhân thoát khỏi hàng đợi.TThreadedQueue không có khả năng của nhiều người tiêu dùng?

Nó không hoạt động như mong đợi. Khi hai hoặc nhiều chuỗi công nhân đang gọi PopItem, các vi phạm truy cập được ném từ TThreadedQueue.

Nếu cuộc gọi tới PopItem được đăng với một phần quan trọng, tất cả đều ổn.

Chắc chắn TThreadedQueue sẽ có thể xử lý nhiều người tiêu dùng, vậy tôi có thiếu cái gì đó hoặc đây có phải là lỗi thuần túy trong TThreadedQueue không?

Đây là ví dụ đơn giản để tạo lỗi.

program TestThreadedQueue; 

{$APPTYPE CONSOLE} 

uses 
// FastMM4 in '..\..\..\FastMM4\FastMM4.pas', 
    Windows, 
    Messages, 
    Classes, 
    SysUtils, 
    SyncObjs, 
    Generics.Collections; 

type TThreadTaskMsg = 
     class(TObject) 
     private 
      threadID : integer; 
      threadMsg : string; 
     public 
      Constructor Create(ID : integer; const msg : string); 
     end; 

type TThreadReader = 
     class(TThread) 
     private 
      fPopQueue : TThreadedQueue<TObject>; 
      fSync  : TCriticalSection; 
      fMsg  : TThreadTaskMsg; 
      fException : Exception; 
      procedure DoSync; 
      procedure DoHandleException; 
     public 
      Constructor Create(popQueue : TThreadedQueue<TObject>; 
           sync  : TCriticalSection); 
      procedure Execute; override; 
     end; 

Constructor TThreadReader.Create(popQueue : TThreadedQueue<TObject>; 
            sync  : TCriticalSection); 
begin 
    fPopQueue:=   popQueue; 
    fMsg:=     nil; 
    fSync:=    sync; 
    Self.FreeOnTerminate:= FALSE; 
    fException:=   nil; 

    Inherited Create(FALSE); 
end; 

procedure TThreadReader.DoSync ; 
begin 
    WriteLn(fMsg.threadMsg + ' ' + IntToStr(fMsg.threadId)); 
end; 

procedure TThreadReader.DoHandleException; 
begin 
    WriteLn('Exception ->' + fException.Message); 
end; 

procedure TThreadReader.Execute; 
var signal : TWaitResult; 
begin 
    NameThreadForDebugging('QueuePop worker'); 
    while not Terminated do 
    begin 
    try 
     {- Calling PopItem can return empty without waittime !? Let other threads in by sleeping. } 
     Sleep(20); 
     {- Serializing calls to PopItem works } 
     if Assigned(fSync) then fSync.Enter; 
     try 
     signal:= fPopQueue.PopItem(TObject(fMsg)); 
     finally 
     if Assigned(fSync) then fSync.Release; 
     end; 
     if (signal = wrSignaled) then 
     begin 
     try 
      if Assigned(fMsg) then 
      begin 
      fMsg.threadMsg:= '<Thread id :' +IntToStr(Self.threadId) + '>'; 
      fMsg.Free; // We are just dumping the message in this test 
      //Synchronize(Self.DoSync); 
      //PostMessage(fParentForm.Handle,WM_TestQueue_Message,Cardinal(fMsg),0); 
      end; 
     except 
      on E:Exception do begin 
      end; 
     end; 
     end; 
     except 
     FException:= Exception(ExceptObject); 
     try 
     if not (FException is EAbort) then 
     begin 
      {Synchronize(} DoHandleException; //); 
     end; 
     finally 
     FException:= nil; 
     end; 
    end; 
    end; 
end; 

Constructor TThreadTaskMsg.Create(ID : Integer; Const msg : string); 
begin 
    Inherited Create; 

    threadID:= ID; 
    threadMsg:= msg; 
end; 

var 
    fSync : TCriticalSection; 
    fThreadQueue : TThreadedQueue<TObject>; 
    fReaderArr : array[1..4] of TThreadReader; 
    i : integer; 

begin 
    try 
    IsMultiThread:= TRUE; 

    fSync:=  TCriticalSection.Create; 
    fThreadQueue:= TThreadedQueue<TObject>.Create(1024,1,100); 
    try 
     {- Calling without fSync throws exceptions when two or more threads calls PopItem 
     at the same time } 
     WriteLn('Creating worker threads ...'); 
     for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create(fThreadQueue,Nil); 
     {- Calling with fSync works ! } 
     //for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create(fThreadQueue,fSync); 
     WriteLn('Init done. Pushing items ...'); 

     for i:= 1 to 100 do fThreadQueue.PushItem(TThreadTaskMsg.Create(i,'')); 

     ReadLn; 

    finally 
     for i:= 1 to 4 do fReaderArr[i].Free; 
     fThreadQueue.Free; 
     fSync.Free; 
    end; 

    except 
    on E: Exception do 
     begin 
     Writeln(E.ClassName, ': ', E.Message); 
     ReadLn; 
     end; 
    end; 
end. 

Cập nhật: Các lỗi trong TMonitor khiến TThreadedQueue sụp đổ là cố định trong Delphi XE2.

Cập nhật 2: Kiểm tra trên nhấn mạnh hàng đợi ở trạng thái trống. Darian Miller thấy rằng nhấn mạnh hàng đợi ở trạng thái đầy đủ, vẫn có thể tái tạo lỗi trong XE2. Lỗi một lần nữa là trong TMonitor. Xem câu trả lời bên dưới để biết thêm thông tin. Và cũng là một liên kết đến QC101114.

Cập nhật 3: Với Delphi XE2-cập nhật 4 đã có một sửa chữa công bố cho TMonitor rằng sẽ chữa các vấn đề trong TThreadedQueue. Các thử nghiệm của tôi cho đến nay không thể tái tạo bất kỳ lỗi nào trong TThreadedQueue nữa. Thử nghiệm nhà sản xuất đơn/nhiều chủ đề tiêu dùng khi hàng đợi trống và đầy. Cũng được thử nghiệm nhiều nhà sản xuất/nhiều người tiêu dùng. Tôi thay đổi các chủ đề của người đọc và các chủ đề của người viết từ 1 đến 100 mà không có bất kỳ trục trặc nào. Nhưng biết lịch sử, tôi dám người khác phá vỡ TMonitor.

+4

Hi LU RD! Chào mừng bạn đến với StackOverflow. Đây là một câu hỏi hay mà bạn có, nhưng có thể dễ dàng kiểm tra xem mã đã được đăng một chút khác đi hay không. Bạn đã bao gồm một nửa .pas của một biểu mẫu, không có DFM tương ứng, và điều đó khiến chúng tôi khó sao chép và điều tra hơn. Vấn đề dường như không liên quan đến giao diện người dùng, vì vậy có cách nào bạn có thể giảm điều này xuống ứng dụng giao diện điều khiển không? Cảm ơn. –

+0

Mason, ứng dụng bảng điều khiển được thực hiện. –

+1

Các sự cố vẫn còn trong XE2 ... –

Trả lời

19

Vâng, thật khó để chắc chắn nếu không có nhiều thử nghiệm, nhưng chắc chắn có vẻ như đây là lỗi, hoặc trong TThreadedQueue hoặc trong TMonitor. Dù bằng cách nào nó trong RTL và không phải mã của bạn. Bạn nên gửi báo cáo này dưới dạng báo cáo QC và sử dụng ví dụ ở trên làm mã "cách tạo lại".

+0

Mason, cảm ơn. Tôi sẽ QC vào ngày mai trừ khi có người khác có ý kiến ​​khác. Có vẻ như lỗi trong TMonitor. –

+7

QC# 91246 TThreadedQueue không thành công với nhiều người tiêu dùng. Bình chọn cho nó nếu bạn thích. –

+5

LInk vào QCReport: [http://qc.embarcadero.com/wc/qcmain.aspx?d=91246](http://qc.embarcadero.com/wc/qcmain.aspx?d=91246) – jachguate

10

Tôi khuyên bạn nên sử dụng OmniThreadLibrary http://www.thedelphigeek.com/search/label/OmniThreadLibrary khi làm việc với chủ đề, song song, v.v. Primoz đã thực hiện một công việc rất tốt và trên trang web bạn sẽ tìm thấy nhiều tài liệu hữu ích.

+1

Tôi biết rõ OmniThreadLibrary và AsyncCalls của Andreas Hausladenhttp: //andy.jgknet.de/blog/bugfix-units/asynccalls-29-asynchronous-function-calls/. –

1

Tôi không nghĩ rằng TThreadedQueue là nghĩa vụ phải hỗ trợ nhiều người tiêu dùng. Đó là một FIFO, theo tập tin trợ giúp. Tôi theo ấn tượng rằng có một chủ đề đẩy và một số khác (chỉ một!) Popping.

+8

FIFO chỉ là một cách để nói cách hàng đợi được làm trống. Nó không có nghĩa là có thể chỉ có một chủ đề kéo công việc từ hàng đợi. Đặc biệt không phải khi nó được gọi là * ThreadedQueue *. –

+2

Nó được gọi là ThreadedQueue vì pusher và popper có thể nằm trong các chủ đề khác nhau. Trong thế giới đa luồng không có gì đến miễn phí, do đó tôi nghĩ rằng các tài liệu sẽ đề cập đến nhiều nhà sản xuất và/hoặc hỗ trợ người tiêu dùng nếu nó có sẵn. Nó không được đề cập, vì vậy tôi nghĩ rằng nó không phải là công việc. – Giel

+3

hàng đợi được bảo vệ bởi màn hình. Bản thân màn hình phải an toàn trong môi trường đa luồng. Nếu hàng đợi không an toàn cho nhiều người tiêu dùng, ít nhất nó cũng nên loại bỏ một ngoại lệ có thể bị bắt. –

3

Tôi đã tìm lớp TThreadedQueue nhưng dường như không có trong D2009 của tôi. Tôi không chính xác sẽ giết bản thân mình trên này - Delphi thread hỗ trợ luôn luôn là err .. errm ... 'không tối ưu' và tôi nghi ngờ rằng TThreadedQueue là không khác nhau :)

Tại sao sử dụng Generics cho PC (Nhà sản xuất/người tiêu dùng) đối tượng? Một hậu duệ TObjectQueue đơn giản sẽ làm tốt - đã sử dụng này trong nhiều thập kỷ - hoạt động tốt với nhiều nhà sản xuất/người tiêu dùng:

unit MinimalSemaphorePCqueue; 

{ Absolutely minimal P-C queue based on TobjectQueue and a semaphore. 

The semaphore count reflects the queue count 
'push' will always succeed unless memory runs out, then you're stuft anyway. 
'pop' has a timeout parameter as well as the address of where any received 
object is to be put. 
'pop' returns immediately with 'true' if there is an object on the queue 
available for it. 
'pop' blocks the caller if the queue is empty and the timeout is not 0. 
'pop' returns false if the timeout is exceeded before an object is available 
from the queue. 
'pop' returns true if an object is available from the queue before the timeout 
is exceeded. 
If multiple threads have called 'pop' and are blocked because the queue is 
empty, a single 'push' will make only one of the waiting threads ready. 


Methods to push/pop from the queue 
A 'semaHandle' property that can be used in a 'waitForMultipleObjects' call. 
When the handle is signaled, the 'peek' method will retrieve the queued object. 
} 
interface 

uses 
    Windows, Messages, SysUtils, Classes,syncObjs,contnrs; 


type 

pObject=^Tobject; 


TsemaphoreMailbox=class(TobjectQueue) 
private 
    countSema:Thandle; 
protected 
    access:TcriticalSection; 
public 
    property semaHandle:Thandle read countSema; 
    constructor create; virtual; 
    procedure push(aObject:Tobject); virtual; 
    function pop(pResObject:pObject;timeout:DWORD):boolean; virtual; 
    function peek(pResObject:pObject):boolean; virtual; 
    destructor destroy; override; 
end; 


implementation 

{ TsemaphoreMailbox } 

constructor TsemaphoreMailbox.create; 
begin 
{$IFDEF D2009} 
    inherited Create; 
{$ELSE} 
    inherited create; 
{$ENDIF} 
    access:=TcriticalSection.create; 
    countSema:=createSemaphore(nil,0,maxInt,nil); 
end; 

destructor TsemaphoreMailbox.destroy; 
begin 
    access.free; 
    closeHandle(countSema); 
    inherited; 
end; 

function TsemaphoreMailbox.pop(pResObject: pObject; 
    timeout: DWORD): boolean; 
// dequeues an object, if one is available on the queue. If the queue is empty, 
// the caller is blocked until either an object is pushed on or the timeout 
// period expires 
begin // wait for a unit from the semaphore 
    result:=(WAIT_OBJECT_0=waitForSingleObject(countSema,timeout)); 
    if result then // if a unit was supplied before the timeout, 
    begin 
    access.acquire; 
    try 
     pResObject^:=inherited pop; // get an object from the queue 
    finally 
     access.release; 
    end; 
    end; 
end; 

procedure TsemaphoreMailbox.push(aObject: Tobject); 
// pushes an object onto the queue. If threads are waiting in a 'pop' call, 
// one of them is made ready. 
begin 
    access.acquire; 
    try 
    inherited push(aObject); // shove the object onto the queue 
    finally 
    access.release; 
    end; 
    releaseSemaphore(countSema,1,nil); // release one unit to semaphore 
end; 

function TsemaphoreMailbox.peek(pResObject: pObject): boolean; 
begin 
    access.acquire; 
    try 
    result:=(count>0); 
    if result then pResObject^:=inherited pop; // get an object from the queue 
    finally 
    access.release; 
    end; 
end; 
end. 
+0

Cảm ơn câu trả lời của bạn. Tôi đã thấy lớp TThreadedQueue trong tài liệu cho XE và thực hiện một bài kiểm tra đơn giản cho một ứng dụng thực tế mà tôi có. Đây là phát bắn đầu tiên của tôi về generics và nó không phát triển tốt. Như bạn có thể thấy từ các bình luận khác, lỗi nằm trong lớp TMonitor, nó sẽ có tác động nếu ai đó xây dựng một ứng dụng đa luồng song song. Việc triển khai của tôi đã kết thúc bằng cách sử dụng một hàng đợi đơn giản được bảo vệ với một phần quan trọng để đẩy và popping. –

4

dụ của bạn dường như làm việc tốt dưới XE2, nhưng nếu chúng ta điền vào hàng đợi của bạn nó không thành công với AV trên PushItem.(Tested dưới XE2 Update1)

Để tái sản xuất, chỉ tăng tạo nhiệm vụ của bạn 100-1100 (độ sâu hàng đợi của bạn đã được thiết lập tại 1024)

for i:= 1 to 1100 do fThreadQueue.PushItem(TThreadTaskMsg.Create(i,'')); 

này chết cho tôi mỗi khi trên Windows 7. Tôi ban đầu cố gắng đẩy liên tục để căng thẳng kiểm tra nó, và nó thất bại ở vòng lặp 30 ... sau đó tại vòng 16 ... sau đó tại 65 vì vậy tại các khoảng khác nhau nhưng nó luôn thất bại tại một số điểm.

iLoop := 0; 
    while iLoop < 1000 do 
    begin 
    Inc(iLoop); 
    WriteLn('Loop: ' + IntToStr(iLoop)); 
    for i:= 1 to 100 do fThreadQueue.PushItem(TThreadTaskMsg.Create(i,'')); 
    end; 
+0

Ồ không, tại một số thời điểm, tôi sợ điều này cũng có thể là một điểm phá vỡ, giống như khi hàng đợi trống. Tôi thậm chí đã thực hiện một bình luận về điều này trên một bài đăng trong SO. Silly của tôi không để kiểm tra nó. Tôi sẽ thực hiện thêm một số xét nghiệm để xác nhận nó. –

+0

Yepp, không nhất quán trên Windows 7 64 bit (XE2 Update 2), cả trên 32 và 64 bit exe. Bạn sẽ QC nó hay tôi sẽ làm điều đó? –

+0

Được báo cáo là [QC101114] (http://qc.embarcadero.com/wc/qcmain.aspx?d=101114) –

Các vấn đề liên quan