tôi thông qua thực hiện của tôi song song/người tiêu dùng dựa trên các mã trong this questionParallel.ForEach bị đình trệ khi tích hợp với BlockingCollection
class ParallelConsumer<T> : IDisposable
{
private readonly int _maxParallel;
private readonly Action<T> _action;
private readonly TaskFactory _factory = new TaskFactory();
private CancellationTokenSource _tokenSource;
private readonly BlockingCollection<T> _entries = new BlockingCollection<T>();
private Task _task;
public ParallelConsumer(int maxParallel, Action<T> action)
{
_maxParallel = maxParallel;
_action = action;
}
public void Start()
{
try
{
_tokenSource = new CancellationTokenSource();
_task = _factory.StartNew(
() =>
{
Parallel.ForEach(
_entries.GetConsumingEnumerable(),
new ParallelOptions { MaxDegreeOfParallelism = _maxParallel, CancellationToken = _tokenSource.Token },
(item, loopState) =>
{
Log("Taking" + item);
if (!_tokenSource.IsCancellationRequested)
{
_action(item);
Log("Finished" + item);
}
else
{
Log("Not Taking" + item);
_entries.CompleteAdding();
loopState.Stop();
}
});
},
_tokenSource.Token);
}
catch (OperationCanceledException oce)
{
System.Diagnostics.Debug.WriteLine(oce);
}
}
private void Log(string message)
{
Console.WriteLine(message);
}
public void Stop()
{
Dispose();
}
public void Enqueue(T entry)
{
Log("Enqueuing" + entry);
_entries.Add(entry);
}
public void Dispose()
{
if (_task == null)
{
return;
}
_tokenSource.Cancel();
while (!_task.IsCanceled)
{
}
_task.Dispose();
_tokenSource.Dispose();
_task = null;
}
}
Và đây là một mã kiểm tra
class Program
{
static void Main(string[] args)
{
TestRepeatedEnqueue(100, 1);
}
private static void TestRepeatedEnqueue(int itemCount, int parallelCount)
{
bool[] flags = new bool[itemCount];
var consumer = new ParallelConsumer<int>(parallelCount,
(i) =>
{
flags[i] = true;
}
);
consumer.Start();
for (int i = 0; i < itemCount; i++)
{
consumer.Enqueue(i);
}
Thread.Sleep(1000);
Debug.Assert(flags.All(b => b == true));
}
}
Bài kiểm tra luôn luôn thất bại - nó luôn luôn bị mắc kẹt ở khoảng 93th-item từ 100 thử nghiệm. Bất kỳ ý tưởng nào một phần của mã của tôi đã gây ra sự cố này và cách khắc phục?
Cảm ơn. Điều này giải quyết được vấn đề của tôi. Dù sao, khi tôi kiểm tra thêm, mã trong OP của tôi không thất bại khi số lượng mục là thành viên của chuỗi này, [A200672] (http://oeis.org/A200672) ví dụ: 1, 2, 3, 5, 7, 9, 13, 17, 21, 29, 37, 45, 61, 77, 93, ... Bất kỳ ý tưởng nào tại sao? chỉ tò mò thôi. – user69715
@ user69715 Đó là loại hành vi kỳ lạ mà tôi tìm thấy khi tôi cố gắng làm một điều tương tự. Tôi đoán nó chỉ là một số tương tác kỳ lạ giữa Parallel.ForEach() và BlockingCollection cơ bản, nhưng tôi không thể thực sự giải thích nó. –