2016-02-25 15 views
9

Khi bạn tạo một kho chặn có dung lượng giới hạn và gọi triggerBatch trong khi (song song với) đăng một mục mới - đăng mục mới sẽ không thành công trong thời gian thực thi lô kích hoạt.Hành vi không mong muốn - Tách dòng các mục trong khi TriggerBatch đang thực thi

Lô kích hoạt cuộc gọi (mỗi lần X) được thực hiện để đảm bảo dữ liệu không bị trì hoãn quá lâu trong khối, trong trường hợp luồng dữ liệu đến bị tạm dừng hoặc bị chậm lại.

Đoạn mã sau sẽ xuất ra một số sự kiện "đăng sự cố". Ví dụ:

public static void Main(string[] args) 
    { 
     var batchBlock = new BatchBlock<int>(10, new GroupingDataflowBlockOptions() { BoundedCapacity = 10000000 }); 
     var actionBlock = new ActionBlock<int[]>(x => ProcessBatch(x), new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 }); 
     batchBlock.LinkTo(actionBlock); 

     var producerTask = Task.Factory.StartNew(() => 
     { 
      //Post 10K Items 
      for (int i = 0; i < 10000; i++) 
      { 
       var postResult = batchBlock.Post(i); 
       if (!postResult) 
        Console.WriteLine("Failed to Post"); 
      } 
     }); 

     var triggerBatchTask = Task.Factory.StartNew(() => 
      {      
       //Trigger Batch.. 
       for (int i = 0; i < 1000000; i++) 
        batchBlock.TriggerBatch(); 
      }); 

     producerTask.Wait(); 
     triggerBatchTask.Wait(); 
    } 

    public static void ProcessBatch(int[] batch) 
    { 
     Console.WriteLine("{0} - {1}", batch.First(), batch.Last()); 
    } 

* Lưu ý rằng trường hợp này chỉ có thể lặp lại khi lô khóa được Bounded.

Tôi có thiếu gì đó hay là vấn đề với batchBlock?

Trả lời

3

BatchBlock không thực sự từ chối mục, nó cố gắng trì hoãn mục đó. Ngoại trừ trường hợp của Post(), việc trì hoãn không phải là một lựa chọn. Một cách đơn giản để khắc phục điều này là sử dụng await batchBlock.SendAsync(i) thay vì batchBlock.Post(i) (điều này cũng có nghĩa là bạn cần thay đổi Task.Factory.StartNew(() => thành Task.Run(async() =>).

Tại sao điều này lại xảy ra? Theo the source code, nếu BatchBlock bị chặn, TriggerBatch() được xử lý không đồng bộ và trong khi được xử lý, sẽ không có mục mới nào được chấp nhận.

Trong mọi trường hợp, bạn không nên mong đợi true trên khối bị chặn, nếu khối đầy, Post() cũng sẽ trả về false.

+0

Điều này rất đáng ngạc nhiên đối với tôi ... – i3arnon

+0

Trong khi tôi đang sử dụng một giải pháp khác, bằng cách giới thiệu một khối khác chấp nhận lỗi, và cuối cùng tôi gọi triggerbatch theo cách nối tiếp trên cả hai khối. Để giải pháp đề xuất của bạn - chờ đợi và không đồng bộ sẽ tạo ra một nhiệm vụ để xử lý từng mục đến, điều này có thể gây ra vấn đề bộ nhớ khi bạn có một loạt các sự kiện nhiều nhiệm vụ sẽ được tạo ra không bị chặn. –

+0

@AlYaros Không, nó sẽ không. Nếu mục được chấp nhận, bạn sẽ nhận được một 'Task' được lưu trong bộ nhớ cache, do đó không có phân bổ ở đó. Và nếu mục bị hoãn, mã bạn đã hiển thị sẽ không thêm mục mới cho đến khi được chấp nhận. Nếu trong mã thực sự của bạn "chờ đợi" sẽ gây ra vấn đề, sau đó IMO hoặc bạn sẽ có thể sửa chữa chúng, hoặc bạn sẽ nhận được các vấn đề ngay cả khi không có nó. – svick

Các vấn đề liên quan