2014-06-24 24 views
18

Tôi đã phải viết một ứng dụng giao diện điều khiển được gọi là dịch vụ web Microsoft Dynamics CRM để thực hiện một hành động trên hơn tám nghìn đối tượng CRM. Chi tiết về cuộc gọi dịch vụ web không liên quan và không được hiển thị ở đây nhưng tôi cần một máy khách đa luồng để tôi có thể thực hiện cuộc gọi song song. Tôi muốn để có thể kiểm soát số lượng các chủ đề được sử dụng từ một thiết lập cấu hình và cũng cho các ứng dụng để hủy bỏ toàn bộ hoạt động nếu số lượng các lỗi dịch vụ đạt đến một ngưỡng cấu hình xác định.Tại sao mã này không đồng bộ/chờ đợi TAP chậm hơn phiên bản TPL?

Tôi đã viết nó bằng cách sử dụng Task Parallel Library Task.Run và ContinueWith, theo dõi số lượng cuộc gọi (chủ đề) đang diễn ra, số lượng lỗi chúng tôi nhận được và liệu người dùng đã hủy từ bàn phím hay chưa. Tất cả mọi thứ đã làm việc tốt và tôi đã đăng nhập rộng rãi để đảm bảo bản thân rằng các chủ đề đã được hoàn thành sạch sẽ và tất cả mọi thứ đã được gọn gàng vào cuối chạy. Tôi có thể thấy rằng chương trình đang sử dụng số lượng chủ đề tối đa song song và, nếu đạt đến giới hạn tối đa của chúng tôi, hãy chờ cho đến khi hoàn thành nhiệm vụ đang chạy trước khi bắt đầu một chuỗi khác.

Trong khi xem xét mã của tôi, đồng nghiệp của tôi đề nghị tốt hơn nên làm điều đó với async/await thay vì nhiệm vụ và tiếp tục, vì vậy tôi đã tạo một nhánh và viết lại theo cách đó. Kết quả thật thú vị - phiên bản async/await chậm gần như gấp đôi, và nó không bao giờ đạt đến số lượng tối đa các phép/luồng song song cho phép. TPL luôn luôn có đến 10 chủ đề song song trong khi phiên bản async/await không bao giờ vượt quá 5.

Câu hỏi của tôi là: tôi đã nhầm lẫn theo cách tôi đã viết mã async/await (hoặc mã TPL) cũng)? Nếu tôi đã không mã hóa nó sai, bạn có thể giải thích lý do tại sao các async/await là kém hiệu quả, và không có nghĩa là nó là tốt hơn để thực hiện bằng cách sử dụng TPL cho mã đa luồng. Lưu ý rằng mã tôi đã thử nghiệm không thực sự gọi CRM - lớp CrmClient chỉ đơn giản là luồng ngủ trong một khoảng thời gian được chỉ định trong cấu hình (năm giây) và sau đó ném một ngoại lệ. Điều này có nghĩa rằng không có biến bên ngoài nào có thể ảnh hưởng đến hiệu suất.

Vì mục đích của câu hỏi này, tôi đã tạo một chương trình rút gọn kết hợp cả hai phiên bản; cái nào được gọi là được xác định bởi một thiết lập cấu hình. Mỗi người trong số họ bắt đầu với một Á hậu bootstrap thiết lập môi trường, tạo lớp hàng đợi, sau đó sử dụng một TaskCompletionSource để chờ hoàn thành. Một CancellationTokenSource được sử dụng để báo hiệu việc hủy bỏ từ người dùng. Danh sách các id cần xử lý được đọc từ một tệp được nhúng và được đẩy lên một ConcurrentQueue. Cả hai đều bắt đầu gọi StartCrmRequest nhiều lần như max-threads; sau đó, mỗi lần một kết quả được xử lý, phương thức ProcessResult gọi hàm StartCrmRequest một lần nữa, tiếp tục cho đến khi tất cả các id của chúng ta được xử lý.

Bạn có thể sao chép/tải chương trình hoàn chỉnh từ đây: https://bitbucket.org/kentrob/pmgfixso/

Dưới đây là cấu hình có liên quan:

<appSettings> 
    <add key="TellUserAfterNCalls" value="5"/> 
    <add key="CrmErrorsBeforeQuitting" value="20"/> 
    <add key="MaxThreads" value="10"/> 
    <add key="CallIntervalMsecs" value="5000"/> 
    <add key="UseAsyncAwait" value="True" /> 
</appSettings> 

Bắt đầu với phiên bản TPL, đây là Á hậu bootstrap mà đá ra khỏi quản lý hàng đợi :

public static class TplRunner 
{ 
    private static readonly CancellationTokenSource CancellationTokenSource = new CancellationTokenSource(); 

    public static void StartQueue(RuntimeParameters parameters, IEnumerable<string> idList) 
    { 
     Console.CancelKeyPress += (s, args) => 
     { 
      CancelCrmClient(); 
      args.Cancel = true; 
     }; 

     var start = DateTime.Now; 
     Program.TellUser("Start: " + start); 

     var taskCompletionSource = new TplQueue(parameters) 
      .Start(CancellationTokenSource.Token, idList); 

     while (!taskCompletionSource.Task.IsCompleted) 
     { 
      if (Console.KeyAvailable) 
      { 
       if (Console.ReadKey().Key != ConsoleKey.Q) continue; 
       Console.WriteLine("When all threads are complete, press any key to continue."); 
       CancelCrmClient(); 
      } 
     } 

     var end = DateTime.Now; 
     Program.TellUser("End: {0}. Elapsed = {1} secs.", end, (end - start).TotalSeconds); 
    } 

    private static void CancelCrmClient() 
    { 
     CancellationTokenSource.Cancel(); 
     Console.WriteLine("Cancelling Crm client. Web service calls in operation will have to run to completion."); 
    } 
} 

Dưới đây là người quản lý hàng đợi TPL bản thân:

0.123.
public class TplQueue 
{ 
    private readonly RuntimeParameters parameters; 
    private readonly object locker = new object(); 
    private ConcurrentQueue<string> idQueue = new ConcurrentQueue<string>(); 
    private readonly CrmClient crmClient; 
    private readonly TaskCompletionSource<bool> taskCompletionSource = new TaskCompletionSource<bool>(); 
    private int threadCount; 
    private int crmErrorCount; 
    private int processedCount; 
    private CancellationToken cancelToken; 

    public TplQueue(RuntimeParameters parameters) 
    { 
     this.parameters = parameters; 
     crmClient = new CrmClient(); 
    } 

    public TaskCompletionSource<bool> Start(CancellationToken cancellationToken, IEnumerable<string> ids) 
    { 
     cancelToken = cancellationToken; 

     foreach (var id in ids) 
     { 
      idQueue.Enqueue(id); 
     } 

     threadCount = 0; 

     // Prime our thread pump with max threads. 
     for (var i = 0; i < parameters.MaxThreads; i++) 
     { 
      Task.Run((Action) StartCrmRequest, cancellationToken); 
     } 

     return taskCompletionSource; 
    } 

    private void StartCrmRequest() 
    { 
     if (taskCompletionSource.Task.IsCompleted) 
     { 
      return; 
     } 

     if (cancelToken.IsCancellationRequested) 
     { 
      Program.TellUser("Crm client cancelling..."); 
      ClearQueue(); 
      return; 
     } 

     var count = GetThreadCount(); 

     if (count >= parameters.MaxThreads) 
     { 
      return; 
     } 

     string id; 
     if (!idQueue.TryDequeue(out id)) return; 

     IncrementThreadCount(); 
     crmClient.CompleteActivityAsync(new Guid(id), parameters.CallIntervalMsecs).ContinueWith(ProcessResult); 

     processedCount += 1; 
     if (parameters.TellUserAfterNCalls > 0 && processedCount%parameters.TellUserAfterNCalls == 0) 
     { 
      ShowProgress(processedCount); 
     } 
    } 

    private void ProcessResult(Task<CrmResultMessage> response) 
    { 
     if (response.Result.CrmResult == CrmResult.Failed && ++crmErrorCount == parameters.CrmErrorsBeforeQuitting) 
     { 
      Program.TellUser(
       "Quitting because CRM error count is equal to {0}. Already queued web service calls will have to run to completion.", 
       crmErrorCount); 
      ClearQueue(); 
     } 

     var count = DecrementThreadCount(); 

     if (idQueue.Count == 0 && count == 0) 
     { 
      taskCompletionSource.SetResult(true); 
     } 
     else 
     { 
      StartCrmRequest(); 
     } 
    } 

    private int GetThreadCount() 
    { 
     lock (locker) 
     { 
      return threadCount; 
     } 
    } 

    private void IncrementThreadCount() 
    { 
     lock (locker) 
     { 
      threadCount = threadCount + 1; 
     } 
    } 

    private int DecrementThreadCount() 
    { 
     lock (locker) 
     { 
      threadCount = threadCount - 1; 
      return threadCount; 
     } 
    } 

    private void ClearQueue() 
    { 
     idQueue = new ConcurrentQueue<string>(); 
    } 

    private static void ShowProgress(int processedCount) 
    { 
     Program.TellUser("{0} activities processed.", processedCount); 
    } 
} 

Lưu ý rằng tôi biết rằng một vài bộ đếm không phải là chủ đề an toàn nhưng chúng không quan trọng; biến ThreadCount là biến quan trọng duy nhất.

Dưới đây là hình nộm phương pháp CRM client:

public Task<CrmResultMessage> CompleteActivityAsync(Guid activityId, int callIntervalMsecs) 
{ 
    // Here we would normally call a CRM web service. 
    return Task.Run(() => 
    { 
     try 
     { 
      if (callIntervalMsecs > 0) 
      { 
       Thread.Sleep(callIntervalMsecs); 
      } 
      throw new ApplicationException("Crm web service not available at the moment."); 
     } 
     catch 
     { 
      return new CrmResultMessage(activityId, CrmResult.Failed); 
     } 
    }); 
} 

Và đây là những async cùng/chờ lớp (với phương pháp phổ biến bị loại bỏ vì lợi ích của ngắn gọn):

public static class AsyncRunner 
{ 
    private static readonly CancellationTokenSource CancellationTokenSource = new CancellationTokenSource(); 

    public static void StartQueue(RuntimeParameters parameters, IEnumerable<string> idList) 
    { 
     var start = DateTime.Now; 
     Program.TellUser("Start: " + start); 

     var taskCompletionSource = new AsyncQueue(parameters) 
      .StartAsync(CancellationTokenSource.Token, idList).Result; 

     while (!taskCompletionSource.Task.IsCompleted) 
     { 
      ... 
     } 

     var end = DateTime.Now; 
     Program.TellUser("End: {0}. Elapsed = {1} secs.", end, (end - start).TotalSeconds); 
    } 
} 

Các async/await queue manager:

public class AsyncQueue 
{ 
    private readonly RuntimeParameters parameters; 
    private readonly object locker = new object(); 
    private readonly CrmClient crmClient; 
    private readonly TaskCompletionSource<bool> taskCompletionSource = new TaskCompletionSource<bool>(); 
    private CancellationToken cancelToken; 
    private ConcurrentQueue<string> idQueue = new ConcurrentQueue<string>(); 
    private int threadCount; 
    private int crmErrorCount; 
    private int processedCount; 

    public AsyncQueue(RuntimeParameters parameters) 
    { 
     this.parameters = parameters; 
     crmClient = new CrmClient(); 
    } 

    public async Task<TaskCompletionSource<bool>> StartAsync(CancellationToken cancellationToken, 
     IEnumerable<string> ids) 
    { 
     cancelToken = cancellationToken; 

     foreach (var id in ids) 
     { 
      idQueue.Enqueue(id); 
     } 
     threadCount = 0; 

     // Prime our thread pump with max threads. 
     for (var i = 0; i < parameters.MaxThreads; i++) 
     { 
      await StartCrmRequest(); 
     } 

     return taskCompletionSource; 
    } 

    private async Task StartCrmRequest() 
    { 
     if (taskCompletionSource.Task.IsCompleted) 
     { 
      return; 
     } 

     if (cancelToken.IsCancellationRequested) 
     { 
      ... 
      return; 
     } 

     var count = GetThreadCount(); 

     if (count >= parameters.MaxThreads) 
     { 
      return; 
     } 

     string id; 
     if (!idQueue.TryDequeue(out id)) return; 

     IncrementThreadCount(); 
     var crmMessage = await crmClient.CompleteActivityAsync(new Guid(id), parameters.CallIntervalMsecs); 
     ProcessResult(crmMessage); 

     processedCount += 1; 
     if (parameters.TellUserAfterNCalls > 0 && processedCount%parameters.TellUserAfterNCalls == 0) 
     { 
      ShowProgress(processedCount); 
     } 
    } 

    private async void ProcessResult(CrmResultMessage response) 
    { 
     if (response.CrmResult == CrmResult.Failed && ++crmErrorCount == parameters.CrmErrorsBeforeQuitting) 
     { 
      Program.TellUser(
       "Quitting because CRM error count is equal to {0}. Already queued web service calls will have to run to completion.", 
       crmErrorCount); 
      ClearQueue(); 
     } 

     var count = DecrementThreadCount(); 

     if (idQueue.Count == 0 && count == 0) 
     { 
      taskCompletionSource.SetResult(true); 
     } 
     else 
     { 
      await StartCrmRequest(); 
     } 
    } 
} 

Vì vậy, hãy đặt MaxThreads thành 10 và CrmErrorsBeforeQuitting to 20, phiên bản TPL trên máy của tôi hoàn thành trong 19 giây và phiên bản async/await mất 35 giây. Cho rằng tôi có hơn 8000 cuộc gọi để thực hiện điều này là một sự khác biệt đáng kể. Bất kỳ ý tưởng?

+2

Hãy xem xét sử dụng ConfigureAwait (sai) trên tất cả các tác vụ được chờ đợi của bạn mà bạn không cần phải quay lại SynchronizationContext gốc. –

+0

@GregC, bạn hoàn toàn đúng. Tôi đã không nhận thấy rằng đó là một ứng dụng giao diện điều khiển, và thậm chí nếu nó không phải là 8000 đang chờ nơi bối cảnh được chụp không thực sự tạo ra một vấn đề hiệu suất đáng kể. –

+0

Cảm ơn tất cả các ý kiến. Tôi chỉ đọc một bài báo của Jon Skeet có vẻ liên quan: http://msmvps.com/blogs/jon_skeet/archive/2010/11/02/configuring-waiting.aspx –

Trả lời

10

Tôi nghĩ rằng tôi đang gặp vấn đề ở đây, hoặc ít nhất là một phần của nó. Nhìn kỹ vào hai bit mã dưới đây; chúng không tương đương.

// Prime our thread pump with max threads. 
for (var i = 0; i < parameters.MaxThreads; i++) 
{ 
    Task.Run((Action) StartCrmRequest, cancellationToken); 
} 

Và:

// Prime our thread pump with max threads. 
for (var i = 0; i < parameters.MaxThreads; i++) 
{ 
    await StartCrmRequest(); 
} 

Trong đoạn mã gốc (Tôi dùng nó như một cho rằng nó là chức năng âm thanh) có một cuộc gọi duy nhất để ContinueWith. Đó là chính xác có bao nhiêu await báo cáo tôi mong đợi để xem trong một viết lại tầm thường nếu nó có nghĩa là để bảo tồn hành vi ban đầu.

Không phải là quy tắc cứng nhắc và nhanh chóng và chỉ áp dụng trong các trường hợp đơn giản, nhưng vẫn là điều tốt để theo dõi.

+0

Chỉ mới bắt đầu Đó là sự hiểu biết của tôi (có thể bị lỗi) rằng cơ thể chính của mã tiếp tục chạy sau khi chờ đợi để vòng lặp sẽ tiếp tục. Chỉ cần xác nhận điều đó ngay bây giờ –

+0

@RobKent, điều này thực sự trông giống như viết lại khá đơn giản - bạn vừa làm quá nhiều trong một số phần thay đổi những thứ không cần phải thay đổi (một lần nữa, tôi giả định rằng mã ban đầu là chính xác để bắt đầu với) .Tôi đã thực hiện chỉnh sửa lại số câu lệnh 'await' –

+0

,es, chỉ cần thử nghiệm nó và vòng lặp với sự chờ đợi trong nó là đồng bộ –

0

Khi được triển khai với async/await, tôi mong đợi thuật toán liên kết I/O sẽ chạy trên một chuỗi đơn lẻ. Không giống như @KirillShlenskiy, tôi tin rằng bit có trách nhiệm "đưa trở lại" vào ngữ cảnh của người gọi không chịu trách nhiệm cho việc làm chậm. Tôi nghĩ rằng bạn vượt quá hồ bơi thread bằng cách cố gắng sử dụng nó cho các hoạt động I/O-ràng buộc. Nó được thiết kế chủ yếu cho ops tính toán.

Hãy xem qua ForEachAsync. Tôi cảm thấy đó là những gì bạn đang tìm kiếm (thảo luận Stephen Toub, bạn sẽ tìm thấy video Wischik của ý nghĩa quá):

http://blogs.msdn.com/b/pfxteam/archive/2012/03/05/10278165.aspx

(Sử dụng mức độ đồng thời để giảm bộ nhớ)

http://vimeo.com/43808831 http://vimeo.com/43808833

+0

Cảm ơn, tôi sẽ xem xét các liên kết này –

+0

Có một nhận xét đặc biệt về bài viết đó tôi đồng ý với: "Từ khóa mới đang chờ đợi và không đồng bộ này thực sự làm hỏng bộ não của tôi . Tại một thời điểm tôi nghĩ rằng tôi hiểu họ và ở một người khác tôi nhận ra rằng tôi không. "Tôi cần nghiên cứu thêm. –

+0

@RobKent Tôi vui mừng khi chia sẻ liên kết đến nội dung tuyệt vời phong phú này. Chúc mừng đọc và xem! – GregC

4

Tôi nghĩ rằng bạn phức tạp hơn giải pháp của mình và cuối cùng không nhận được nơi bạn muốn thực hiện.

Trước hết, kết nối với bất kỳ máy chủ HTTP nào bị giới hạn bởi service point manager. default limit cho môi trường máy khách là 2, nhưng bạn có thể tự tăng nó.

Không có vấn đề bao nhiêu chủ đề bạn sinh ra, sẽ không có yêu cầu hoạt động nhiều hơn những người được allwed.

Sau đó, như ai đó đã chỉ ra, await chặn logic luồng thực thi.

Và cuối cùng, bạn đã dành thời gian tạo AsyncQueue khi bạn nên sử dụng TPL data flows.

+0

Vâng, cảm ơn bạn đã chỉ ra giới hạn ServicePointManager - Tôi không biết điều đó. Mặc dù điều đó chắc chắn có liên quan khi tôi thực hiện cuộc gọi web thực tế, vì mục đích của câu hỏi này, điều đó không liên quan bởi vì chúng tôi chỉ cần biết tại sao async/await hoạt động chậm hơn mà không thực sự thực hiện cuộc gọi web. Cảm ơn bình luận của bạn mặc dù. –

+0

Nếu bạn thực sự muốn chứng minh điều đó, bạn sẽ cần một ví dụ phức tạp hơn rất nhiều. –

Các vấn đề liên quan