2012-12-01 40 views
18

Hiện tại tôi đang làm việc trên ứng dụng khách/máy chủ Delphi XE3 để chuyển tệp (với các thành phần Indy FTP). Phần khách hàng theo dõi một thư mục, nhận danh sách các tệp bên trong, tải chúng lên máy chủ và xóa các tệp gốc. Việc tải lên được thực hiện bằng một chuỗi riêng biệt, xử lý từng tệp một. Các tệp có thể nằm trong khoảng từ 0 đến vài nghìn và kích thước của chúng cũng thay đổi rất nhiều.Đồng bộ hóa tệp tải lên đa luồng

Đây là một ứng dụng Firemonkey được biên dịch cho cả OSX và Windows, vì vậy tôi đã phải sử dụng TThread thay vì OmniThreadLibrary, mà tôi ưa thích. Khách hàng của tôi báo cáo rằng ứng dụng bị đóng băng ngẫu nhiên. Tôi không thể sao chép nó, nhưng vì tôi không có nhiều kinh nghiệm với TThread, tôi có thể đã đặt tình trạng bế tắc ở đâu đó. Tôi đọc khá nhiều ví dụ, nhưng tôi vẫn không chắc chắn về một số chi tiết đa luồng.

Cấu trúc ứng dụng rất đơn giản:
Bộ hẹn giờ trong chuỗi chính sẽ kiểm tra thư mục và nhận thông tin về từng tệp vào một bản ghi, đi vào TList chung. Danh sách này giữ thông tin về tên của các tệp, kích thước, tiến trình, cho dù tệp được tải lên hoàn toàn hay phải được thử lại. Tất cả được hiển thị trong một lưới với thanh tiến trình, vv Danh sách này chỉ được truy cập bởi chuỗi chính. Sau đó các mục từ danh sách được gửi đến luồng bằng cách gọi phương thức AddFile (mã bên dưới). Chuỗi lưu trữ tất cả các tệp trong một hàng đợi an toàn theo chủ đề như thế này http://delphihaven.wordpress.com/2011/05/06/using-tmonitor-2/
Khi tệp được tải lên, chuỗi trình tải lên sẽ thông báo chuỗi chính với cuộc gọi đến Đồng bộ hóa.
Chuỗi chính định kỳ gọi phương thức Uploader.GetProgress để kiểm tra tiến trình của tệp hiện tại và hiển thị nó. Chức năng này không thực sự an toàn với luồng, nhưng nó có thể gây ra bế tắc hay chỉ trả về dữ liệu sai?

Cách an toàn và hiệu quả để thực hiện kiểm tra tiến độ là gì?

Vì vậy, cách tiếp cận này là OK hoặc tôi đã bỏ lỡ điều gì đó? Bạn sẽ làm điều này như thế nào?
Ví dụ: tôi mặc dù tạo chuỗi mới chỉ để đọc nội dung thư mục. Điều này có nghĩa là TList tôi sử dụng phải được tạo luồng an toàn, nhưng nó phải được truy cập mọi lúc để làm mới thông tin được hiển thị trong lưới GUI. Sẽ không phải tất cả việc đồng bộ hóa chỉ làm chậm GUI?

Tôi đã đăng mã đơn giản dưới đây trong trường hợp ai đó muốn xem nó. Nếu không, tôi sẽ rất vui khi nghe một số ý kiến ​​về những gì tôi nên sử dụng nói chung. Các mục tiêu chính là làm việc trên cả OSX và Windows; để có thể hiển thị thông tin về tất cả các tệp và tiến trình của tệp hiện tại; và đáp ứng bất kể số lượng và kích thước của tệp.

Đó là mã của chuỗi trình tải lên. Tôi đã xóa một số của nó cho việc đọc dễ dàng hơn:

type 
    TFileStatus = (fsToBeQueued, fsUploaded, fsQueued); 
    TFileInfo = record 
    ID: Integer; 
    Path: String; 
    Size: Int64; 
    UploadedSize: Int64; 
    Status: TFileStatus; 
    end; 

    TUploader = class(TThread) 
    private 
    FTP: TIdFTP; 
    fQueue: TThreadedQueue<TFileInfo>; 
    fCurrentFile: TFileInfo; 
    FUploading: Boolean; 
    procedure ConnectFTP; 
    function UploadFile(aFileInfo: TFileInfo): String; 
    procedure OnFTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64); 
    procedure SignalComplete; 
    procedure SignalError(aError: String); 
    protected 
    procedure Execute; override; 
    public 
    property Uploading: Boolean read FUploading; 
    constructor Create; 
    destructor Destroy; override; 
    procedure Terminate; 
    procedure AddFile(const aFileInfo: TFileInfo); 
    function GetProgress: TFileInfo; 
    end; 

procedure TUploader.AddFile(const aFileInfo: TFileInfo); 
begin 
    fQueue.Enqueue(aFileInfo); 
end; 

procedure TUploader.ConnectFTP; 
begin 
    ... 
    FTP.Connect; 
end; 

constructor TUploader.Create; 
begin 
    inherited Create(false); 
    FreeOnTerminate := false; 
    fQueue := TThreadedQueue<TFileInfo>.Create; 
    // Create the TIdFTP and set ports and other params 
    ... 
end; 

destructor TUploader.Destroy; 
begin 
    fQueue.Close; 
    fQueue.Free; 
    FTP.Free; 
    inherited; 
end; 

// Process the whole queue and inform the main thread of the progress 
procedure TUploader.Execute; 
var 
    Temp: TFileInfo; 
begin 
    try 
    ConnectFTP; 
    except 
    on E: Exception do 
     SignalError(E.Message); 
    end; 

    // Use Peek instead of Dequeue, because the item should not be removed from the queue if it fails 
    while fQueue.Peek(fCurrentFile) = wrSignaled do 
    try 
     if UploadFile(fCurrentFile) = '' then 
     begin 
     fQueue.Dequeue(Temp); // Delete the item from the queue if succesful 
     SignalComplete; 
     end; 
    except 
     on E: Exception do 
     SignalError(E.Message); 
    end; 
end; 

// Return the current file's info to the main thread. Used to update the progress indicators 
function TUploader.GetProgress: TFileInfo; 
begin 
    Result := fCurrentFile; 
end; 

// Update the uploaded size for the current file. This information is retrieved by a timer from the main thread to update the progress bar 
procedure TUploader.OnFTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64); 
begin 
    fCurrentFile.UploadedSize := AWorkCount; 
end; 

procedure TUploader.SignalComplete; 
begin 
    Synchronize(
    procedure 
    begin 
     frmClientMain.OnCompleteFile(fCurrentFile); 
    end); 
end; 

procedure TUploader.SignalError(aError: String); 
begin 
    try 
    FTP.Disconnect; 
    except 
    end; 
    if fQueue.Closed then 
    Exit; 

    Synchronize(
    procedure 
    begin 
     frmClientMain.OnUploadError(aError); 
    end); 
end; 

// Clear the queue and terminate the thread 
procedure TUploader.Terminate; 
begin 
    fQueue.Close; 
    inherited; 
end; 

function TUploader.UploadFile(aFileInfo: TFileInfo): String; 
begin 
    Result := 'Error'; 
    try 
    if not FTP.Connected then 
     ConnectFTP; 
    FUploading := true; 
    FTP.Put(aFileInfo.Path, ExtractFileName(aFileInfo.Path));  
    Result := ''; 
    finally 
    FUploading := false; 
    end; 
end; 

Và các bộ phận của các chủ đề chính mà tương tác với người tải lên:

...... 
// Main form 
    fUniqueID: Integer; // This is a unique number given to each file, because there might be several with the same names(after one is uploaded and deleted) 
    fUploader: TUploader;   // The uploader thread 
    fFiles: TList<TFileInfo>; 
    fCurrentFileName: String;  // Used to display the progress 
    function IndexOfFile(aID: Integer): Integer; //Return the index of the record inside the fFiles given the file ID 
    public 
    procedure OnCompleteFile(aFileInfo: TFileInfo); 
    procedure OnUploadError(aError: String); 
    end; 

// This is called by the uploader with Synchronize 
procedure TfrmClientMain.OnUploadError(aError: String); 
begin 
    // show and log the error 
end; 

// This is called by the uploader with Synchronize 
procedure TfrmClientMain.OnCompleteFile(aFileInfo: TFileInfo); 
var 
    I: Integer; 
begin 
    I := IndexOfFile(aFileInfo.ID); 
    if (I >= 0) and (I < fFiles.Count) then 
    begin 
    aFileInfo.Status := fsUploaded; 
    aFileInfo.UploadedSize := aFileInfo.Size; 
    FFiles.Items[I] := aFileInfo; 
    Inc(FFilesUploaded); 
    TFile.Delete(aFileInfo.Path); 
    colProgressImg.UpdateCell(I); 
    end; 
end; 

procedure TfrmClientMain.ProcessFolder; 
var 
    NewFiles: TStringDynArray; 
    I, J: Integer; 
    FileInfo: TFileInfo; 
begin 
    // Remove completed files from the list if it contains more than XX files 
    while FFiles.Count > 1000 do 
     if FFiles[0].Status = fsUploaded then 
     begin 
     Dec(FFilesUploaded); 
     FFiles.Delete(0); 
     end else 
     Break; 

    NewFiles := TDirectory.GetFiles(WatchFolder, '*.*',TSearchOption.soAllDirectories); 
    for I := 0 to Length(NewFiles) - 1 do 
    begin 
      FileInfo.ID := FUniqueID; 
      Inc(FUniqueID); 
      FileInfo.Path := NewFiles[I]; 
      FileInfo.Size := GetFileSizeByName(NewFiles[I]); 
      FileInfo.UploadedSize := 0; 
      FileInfo.Status := fsToBeQueued; 
      FFiles.Add(FileInfo); 

     if (I mod 100) = 0 then 
     begin 
     UpdateStatusLabel; 
     grFiles.RowCount := FFiles.Count; 
     Application.ProcessMessages; 
     if fUploader = nil then 
      break; 
     end; 
    end; 

    // Send the new files and resend failed to the uploader thread 
    for I := 0 to FFiles.Count - 1 do 
     if (FFiles[I].Status = fsToBeQueued) then 
     begin 
     if fUploader = nil then 
      Break; 
     FileInfo := FFiles[I]; 
     FileInfo.Status := fsQueued; 
     FFiles[I] := FileInfo; 
     SaveDebug(1, 'Add: ' + ExtractFileName(FFiles[I].Path)); 
     FUploader.AddFile(FFiles[I]); 
     end; 
end; 

procedure TfrmClientMain.tmrGUITimer(Sender: TObject); 
var 
    FileInfo: TFileInfo; 
    I: Integer; 
begin 
    if (fUploader = nil) or not fUploader.Uploading then 
    Exit; 
    FileInfo := fUploader.GetProgress; 
    I := IndexOfFile(FileInfo.ID); 
    if (I >= 0) and (I < fFiles.Count) then 
    begin 
    fFiles.Items[I] := FileInfo; 
    fCurrentFileName := ExtractFileName(FileInfo.Path); 
    colProgressImg.UpdateCell(I); 
    end; 
end; 

function TfrmClientMain.IndexOfFile(aID: Integer): Integer; 
var 
    I: Integer; 
begin 
    Result := -1; 
    for I := 0 to FFiles.Count - 1 do 
    if FFiles[I].ID = aID then 
     Exit(I); 
end; 
+0

Tôi không chắc chắn và chưa thử nghiệm .. nhưng bạn đã cố gắng thêm TIdAntiFreeze và kiểm tra xem hành vi có giống nhau hay không? (FMX.IdAntiFreeze) – Whiler

+2

TIdAntiFreeze được thiết kế để ngăn chặn sự đóng băng của GUI khi bạn sử dụng một thành phần Indy từ sợi chính (ví dụ: bị bỏ trên biểu mẫu). Tôi sử dụng nó trong một chủ đề riêng biệt vì vậy tôi không thấy nó sẽ giúp ích gì. Ít nhất theo như tôi biết ... – VGeorgiev

+0

Ở giao diện đầu tiên, xử lý lỗi của bạn có vẻ sai với tôi. Ví dụ, trong phương thức Execute, nếu cuộc gọi ConnectFTP thất bại, bạn _eat_ ngoại lệ (sau khi thông báo về lỗi), và bạn vẫn phát hành các cuộc gọi đến UploadFile. IMHO bạn phải _clean_ đó, và để cho thread chết với một FatalException hoặc xử lý đúng ngoại lệ bên trong phương thức Execute, ví dụ, thử lại kết nối một số lần, có thể phụ thuộc vào loại lỗi. Mặt khác, nếu bạn có danh sách trong chuỗi chính, tôi không thấy lý do tại sao bạn cần một hàng đợi trong từng chuỗi riêng lẻ. – jachguate

Trả lời

0

Điều này có thể không phải là vấn đề, nhưng TFileInfo là một kỷ lục.

Điều này có nghĩa là khi được truyền dưới dạng tham số (không const/var), nó sẽ được sao chép. Điều này có thể dẫn đến sự cố với những thứ như chuỗi trong bản ghi không nhận được số lượng tham chiếu được cập nhật khi bản ghi được sao chép.

Một điều cần thử là đặt một lớp và chuyển một thể hiện làm tham số (nghĩa là một Con trỏ tới dữ liệu trên vùng heap).

Điều gì đó khác cần chú ý là được chia sẻ của Int64 (ví dụ: giá trị kích thước của bạn) trên các hệ thống 32 bit luồng.

Cập nhật/đọc chúng không được thực hiện một cách nguyên tử & bạn không có bất kỳ sự bảo vệ cụ thể nào, vì vậy có thể đọc giá trị để nhận được các bit trên và dưới không khớp do luồng. (ví dụ: Đọc trên 32 bit, Viết trên 32 bit, Viết 32 bit thấp hơn, Đọc Thấp hơn 32 bit, với lần đọc & viết trong các chủ đề khác nhau). Điều này có thể không gây ra các vấn đề bạn đang nhìn thấy và trừ khi bạn đang làm việc với các tập tin chuyển giao> 4GB, không bao giờ có thể gây ra cho bạn bất kỳ vấn đề.

0

Điểm dừng chắc chắn khó phát hiện, nhưng điều này có thể là vấn đề. Trong mã của bạn, tôi không thấy rằng bạn đã thêm bất kỳ thời gian chờ nào vào enqueue, peek hoặc dequeue - có nghĩa là nó sẽ mặc định là Infinite.

Dòng enqueue có dòng này - nghĩa là giống như bất kỳ đối tượng đồng bộ nào, nó sẽ chặn cho đến khi Enter hoàn thành (nó khóa màn hình) hoặc Timeout xuất hiện (vì bạn không có thời gian chờ, nó sẽ đợi mãi mãi)

TSimpleThreadedQueue.Enqueue(const Item: T; Timeout: LongWord): TWaitResult; 
...  
if not TMonitor.Enter(FQueue, Timeout) 

Tôi cũng sẽ giả định rằng bạn đã tự thực hiện PEEK dựa trên Dequeue - chỉ bạn mới không thực sự xóa mục đó.

Đó dường như thực hiện thời gian chờ của riêng mình - tuy nhiên, bạn vẫn còn có những điều sau đây:

function TSimpleThreadedQueue.Peek/Dequeue(var Item: T; Timeout: LongWord): TWaitResult; 
... 
if not TMonitor.Enter(FQueue, Timeout) 

đâu thời gian chờ là Infinite - vì vậy, nếu bạn đang ở trong phương pháp cái nhìn chờ đợi cho nó để được báo hiệu với một vô hạn thời gian chờ, sau đó bạn không thể Enqueue một cái gì đó từ một chủ đề thứ hai mà không ngăn chặn thread đó chờ đợi cho phương pháp peek để trở thành hoàn thành vào một thời gian chờ vô hạn.

Dưới đây là một đoạn nhận xét từ TMonitor

Enter locks the monitor object with an optional timeout (in ms) value. 
Enter without a timeout will wait until the lock is obtained. 
If the procedure returns it can be assumed that the lock was acquired. 
Enter with a timeout will return a boolean status indicating whether or 
not the lock was obtained (True) or the attempt timed out prior to 
acquire the lock (False). Calling Enter with an INFINITE timeout 
is the same as calling Enter without a timeout. 

Kể từ khi thực hiện sử dụng Infinite theo mặc định, và một giá trị TMonitor.Spinlock không được cung cấp, mà sẽ ngăn chặn các chủ đề cho đến khi nó có thể có được đối tượng FQueue .

Đề nghị của tôi sẽ phải thay đổi mã của bạn như sau:

// Use Peek instead of Dequeue, because the item should not be removed from the queue if it fails 
    while true do 
    case fQueue.Peek(fCurrentFile,10) 
     wrSignaled: 
     try 
      if UploadFile(fCurrentFile) = '' then 
      begin 
      fQueue.Dequeue(Temp); // Delete the item from the queue if succesful 
      SignalComplete; 
      end; 
     except 
      on E: Exception do 
      SignalError(E.Message); 
     end; 
     wrTimeout: sleep(10); 
     wrIOCompletion, 
     wrAbandoned, 
     wrError: break; 
    end; //case 

Bằng cách này, cái nhìn sẽ không giữ khóa trên FQueue vô thời hạn, để lại một cửa sổ cho Enqueue để có được nó và thêm các tập tin từ luồng chính (UI).

+0

Cảm ơn câu trả lời chi tiết. Tôi đồng ý rằng hai dòng TMonitor.Enter() có thể gây ra bế tắc. TMonitor.Enter() trong TSimpleThreadedQueue.Peek/Dequeue được theo sau bởi TMonitor.Wait(). Nếu tôi hiểu chính xác, Wait sẽ tạm thời khóa khóa và cho phép chủ đề khác đặt khóa trong phương pháp Enqueue, do đó nó không gây ra bế tắc. Chờ đợi sau đó cố gắng để đặt một khóa một lần nữa. Bế tắc mà tôi đã xảy ra rất hiếm khi, trong trường hợp bạn mô tả nó sẽ xảy ra gần như mọi lúc bởi vì luồng bắt đầu trước khi có bất kỳ dữ liệu nào trong hàng đợi. – VGeorgiev

+0

Hummm .. Nhìn vào nguồn cho TMonitor.Enter, tôi không nghĩ rằng đó là trường hợp nếu bạn không thiết lập một SpinCount - hầu hết các mã cho spin được bỏ qua nơi SpinCount = 0 Ở đâu cuối cùng bạn nhận được để điều này line: Result: = MonitorSupport.WaitOrSignalObject (nil, GetEvent, Timeout) = WAIT_OBJECT_0; – SilverKnight

+0

Tôi tin rằng đó là trường hợp - tuy nhiên, từ việc đọc và cố gắng hiểu những gì màn hình thực hiện, nó quay trong một khoảng thời gian cụ thể (nghĩa là rất ngắn) - khi nó trở nên dài hơn, thì bạn có khả năng của bế tắc - hãy xem bài viết Wiki này trên SpinLock - http://en.wikipedia.org/wiki/Spinlock – SilverKnight

0

Đây có thể là một cảnh quay dài, nhưng đây là một khả năng khác [câu trả lời trước đây có thể có nhiều khả năng] (cái mà tôi vừa chạy qua), việc sử dụng Synchronize có thể gây ra bế tắc.Dưới đây là một blog về lý do tại sao điều này xảy ra: Delphi-Workaround-for-TThread-SynchronizeWaitFor-.aspx

Điểm thích hợp từ bài viết:

Chủ đề A gọi Synchronize (MethodA)

Chủ đề B gọi Synchronize (MethodB)

Sau đó, trong ngữ cảnh của Chủ đề chính:

Các cuộc gọi chủ đề chính CheckSynchronize() trong khi xử lý tin nhắn

CheckSynchronize được triển khai để xử lý theo lô tất cả cuộc gọi chờ (*). Vì vậy, nó chọn lên hàng đợi của các cuộc gọi chờ (có chứa MethodA và MethodB) và các vòng thông qua từng cái một.

Phương thứcA thực hiện trong ngữ cảnh của chủ đề chính. Giả MethodA gọi ThreadB.WaitFor

WAITFOR gọi CheckSynchronize xử lý bất cứ chờ đợi cuộc gọi đến Đồng bộ hóa

Về lý thuyết, điều này thì nên xử lý ThreadB của Synchronize (MethodB), phép Thread B để hoàn thành. Tuy nhiên, MethodB đã là sở hữu của cuộc gọi CheckSynchronize đầu tiên, vì vậy nó không bao giờ được gọi là được gọi.

DEADLOCK!

Embarcadero QC article mô tả sự cố chi tiết hơn.

Mặc dù tôi không thấy bất kỳ lệnh gọi ProcessMessages nào trong mã trên, hoặc cho vấn đề đó, WaitFor sẽ được gọi trong Đồng bộ hóa, nó vẫn có thể là vấn đề mà đồng bộ được gọi, một chuỗi khác gọi đồng bộ hóa - nhưng chủ đề chính đã được đồng bộ hóa và đang chặn.

Lần đầu tiên tôi không nhấp vào, vì tôi có xu hướng tránh Đồng bộ hóa các cuộc gọi như dịch và cập nhật giao diện người dùng thiết kế từ chủ đề bằng các phương pháp khác như gửi thư và danh sách an toàn chỉ với thông báo tin nhắn thay vì đồng bộ hóa cuộc gọi.

+0

Cảm ơn bạn một lần nữa vì đã nhận được rất nhiều chi tiết về điều này. Và xin lỗi vì những câu trả lời chậm trễ, tôi đi du lịch những ngày này ... Những gì bạn mô tả ở đây cũng xảy ra với tôi và tôi nghĩ Synchronize là vấn đề. Tôi sử dụng nó bởi vì không có SendMessage/PostMessage trên OSX hoặc ít nhất tôi không biết nếu có một thay thế. Vì vậy, Synchronize là một giải pháp dễ dàng tại thời điểm đó. Một số thời gian trước, tôi viết lại rất nhiều mã và tôi không có đóng băng này nữa, nhưng tôi không biết vấn đề ở đâu. Có thể liên quan đến các thành phần Indy TCP mà tôi đã sử dụng, vì chúng không ổn định trên OSX ... – VGeorgiev

+0

Không phải là vấn đề. Tôi đã nhìn vào một cái gì đó khác và đi qua bài đăng này mà không có một câu trả lời. Tôi đã sử dụng nó như là một bài tập học tập để xem những gì lớp màn hình sẽ làm (tôi chưa bao giờ sử dụng nó). Tôi luôn quan tâm đến các kỹ thuật khác nhau có thể cải thiện mã luồng của tôi (chủ yếu là trong việc giảm mức sử dụng CPU, nhưng cũng có trong các phương thức giao tiếp khác nhau). Nó đã là một bổ nhào thú vị trong lớp, và hy vọng một người khác cũng sẽ được hưởng lợi từ cuộc thảo luận. – SilverKnight

Các vấn đề liên quan