2008-12-16 23 views
17

Tôi đang phát triển một chương trình liên tục gửi luồng dữ liệu dưới nền và tôi muốn cho phép người dùng đặt giới hạn cho cả giới hạn tải lên và tải xuống.Điều chỉnh băng thông trong C#

Tôi đã đọc trên token bucketleaky bucket alghorhithms, và dường như sau này có vẻ phù hợp với mô tả vì đây không phải là vấn đề tối đa hóa băng thông mạng mà là không phô trương càng tốt.

Tuy nhiên, tôi không chắc chắn về cách thực hiện điều này. Một cách tiếp cận tự nhiên là mở rộng lớp Stream trừu tượng để làm cho nó đơn giản hơn để mở rộng lưu lượng hiện có, nhưng điều này không đòi hỏi sự tham gia của các chuỗi phụ để gửi dữ liệu trong khi đồng thời nhận (thùng rò rỉ)? Bất kỳ gợi ý nào về các triển khai khác thực hiện tương tự sẽ được đánh giá cao.

Ngoài ra, mặc dù tôi có thể sửa đổi số lượng dữ liệu mà chương trình nhận được, việc điều chỉnh băng thông hoạt động như thế nào ở cấp C#? Liệu máy tính vẫn nhận được dữ liệu và chỉ đơn giản là lưu nó, có hiệu quả hủy bỏ hiệu ứng điều chỉnh hoặc nó sẽ đợi cho đến khi tôi yêu cầu để nhận được nhiều hơn?

CHỈNH SỬA: Tôi quan tâm đến việc điều chỉnh cả dữ liệu đến và đi, nơi tôi không có quyền kiểm soát đối với kết thúc ngược lại của luồng.

Trả lời

1

Tôi đã đưa ra một triển khai khác nhau về ThrottledStream-Class được đề cập bởi arul. Phiên bản của tôi sử dụng WaitHandle và Bộ hẹn giờ có khoảng thời gian 1 giây:

public ThrottledStream(Stream parentStream, int maxBytesPerSecond=int.MaxValue) 
{ 
    MaxBytesPerSecond = maxBytesPerSecond; 
    parent = parentStream; 
    processed = 0; 
    resettimer = new System.Timers.Timer(); 
    resettimer.Interval = 1000; 
    resettimer.Elapsed += resettimer_Elapsed; 
    resettimer.Start();   
} 

protected void Throttle(int bytes) 
{ 
    try 
    { 
     processed += bytes; 
     if (processed >= maxBytesPerSecond) 
      wh.WaitOne(); 
    } 
    catch 
    { 
    } 
} 

private void resettimer_Elapsed(object sender, ElapsedEventArgs e) 
{ 
    processed = 0; 
    wh.Set(); 
} 

Bất cứ khi nào giới hạn băng thông vượt quá Chủ đề sẽ ngủ cho đến khi giây tiếp theo bắt đầu. Không cần tính toán thời gian ngủ tối ưu.

Full Thực hiện:

public class ThrottledStream : Stream 
{ 
    #region Properties 

    private int maxBytesPerSecond; 
    /// <summary> 
    /// Number of Bytes that are allowed per second 
    /// </summary> 
    public int MaxBytesPerSecond 
    { 
     get { return maxBytesPerSecond; } 
     set 
     { 
      if (value < 1) 
       throw new ArgumentException("MaxBytesPerSecond has to be >0"); 

      maxBytesPerSecond = value; 
     } 
    } 

    #endregion 


    #region Private Members 

    private int processed; 
    System.Timers.Timer resettimer; 
    AutoResetEvent wh = new AutoResetEvent(true); 
    private Stream parent; 

    #endregion 

    /// <summary> 
    /// Creates a new Stream with Databandwith cap 
    /// </summary> 
    /// <param name="parentStream"></param> 
    /// <param name="maxBytesPerSecond"></param> 
    public ThrottledStream(Stream parentStream, int maxBytesPerSecond=int.MaxValue) 
    { 
     MaxBytesPerSecond = maxBytesPerSecond; 
     parent = parentStream; 
     processed = 0; 
     resettimer = new System.Timers.Timer(); 
     resettimer.Interval = 1000; 
     resettimer.Elapsed += resettimer_Elapsed; 
     resettimer.Start();   
    } 

    protected void Throttle(int bytes) 
    { 
     try 
     { 
      processed += bytes; 
      if (processed >= maxBytesPerSecond) 
       wh.WaitOne(); 
     } 
     catch 
     { 
     } 
    } 

    private void resettimer_Elapsed(object sender, ElapsedEventArgs e) 
    { 
     processed = 0; 
     wh.Set(); 
    } 

    #region Stream-Overrides 

    public override void Close() 
    { 
     resettimer.Stop(); 
     resettimer.Close(); 
     base.Close(); 
    } 
    protected override void Dispose(bool disposing) 
    { 
     resettimer.Dispose(); 
     base.Dispose(disposing); 
    } 

    public override bool CanRead 
    { 
     get { return parent.CanRead; } 
    } 

    public override bool CanSeek 
    { 
     get { return parent.CanSeek; } 
    } 

    public override bool CanWrite 
    { 
     get { return parent.CanWrite; } 
    } 

    public override void Flush() 
    { 
     parent.Flush(); 
    } 

    public override long Length 
    { 
     get { return parent.Length; } 
    } 

    public override long Position 
    { 
     get 
     { 
      return parent.Position; 
     } 
     set 
     { 
      parent.Position = value; 
     } 
    } 

    public override int Read(byte[] buffer, int offset, int count) 
    { 
     Throttle(count); 
     return parent.Read(buffer, offset, count); 
    } 

    public override long Seek(long offset, SeekOrigin origin) 
    { 
     return parent.Seek(offset, origin); 
    } 

    public override void SetLength(long value) 
    { 
     parent.SetLength(value); 
    } 

    public override void Write(byte[] buffer, int offset, int count) 
    { 
     Throttle(count); 
     parent.Write(buffer, offset, count); 
    } 

    #endregion 


} 
+0

Nó sẽ trở nên chính xác hơn nếu bạn không đặt 'đã xử lý' thành' 0' khi bộ đếm thời gian được chọn, nhưng trừ 'maxBytesPerSecond' khỏi nó. –

1

Dựa trên @ giải pháp 0xDEADBEEF của tôi tạo ra như sau (kiểm chứng) giải pháp dựa trên Rx schedulers:

public class ThrottledStream : Stream 
{ 
    private readonly Stream parent; 
    private readonly int maxBytesPerSecond; 
    private readonly IScheduler scheduler; 
    private readonly IStopwatch stopwatch; 

    private long processed; 

    public ThrottledStream(Stream parent, int maxBytesPerSecond, IScheduler scheduler) 
    { 
     this.maxBytesPerSecond = maxBytesPerSecond; 
     this.parent = parent; 
     this.scheduler = scheduler; 
     stopwatch = scheduler.StartStopwatch(); 
     processed = 0; 
    } 

    public ThrottledStream(Stream parent, int maxBytesPerSecond) 
     : this (parent, maxBytesPerSecond, Scheduler.Immediate) 
    { 
    } 

    protected void Throttle(int bytes) 
    { 
     processed += bytes; 
     var targetTime = TimeSpan.FromSeconds((double)processed/maxBytesPerSecond); 
     var actualTime = stopwatch.Elapsed; 
     var sleep = targetTime - actualTime; 
     if (sleep > TimeSpan.Zero) 
     { 
      using (var waitHandle = new AutoResetEvent(initialState: false)) 
      { 
       scheduler.Sleep(sleep).GetAwaiter().OnCompleted(() => waitHandle.Set()); 
       waitHandle.WaitOne(); 
      } 
     } 
    } 

    public override bool CanRead 
    { 
     get { return parent.CanRead; } 
    } 

    public override bool CanSeek 
    { 
     get { return parent.CanSeek; } 
    } 

    public override bool CanWrite 
    { 
     get { return parent.CanWrite; } 
    } 

    public override void Flush() 
    { 
     parent.Flush(); 
    } 

    public override long Length 
    { 
     get { return parent.Length; } 
    } 

    public override long Position 
    { 
     get 
     { 
      return parent.Position; 
     } 
     set 
     { 
      parent.Position = value; 
     } 
    } 

    public override int Read(byte[] buffer, int offset, int count) 
    { 
     var read = parent.Read(buffer, offset, count); 
     Throttle(read); 
     return read; 
    } 

    public override long Seek(long offset, SeekOrigin origin) 
    { 
     return parent.Seek(offset, origin); 
    } 

    public override void SetLength(long value) 
    { 
     parent.SetLength(value); 
    } 

    public override void Write(byte[] buffer, int offset, int count) 
    { 
     Throttle(count); 
     parent.Write(buffer, offset, count); 
    } 
} 

và một số xét nghiệm mà chỉ mất một phần nghìn giây:

[TestMethod] 
public void ShouldThrottleReading() 
{ 
    var content = Enumerable 
     .Range(0, 1024 * 1024) 
     .Select(_ => (byte)'a') 
     .ToArray(); 
    var scheduler = new TestScheduler(); 
    var source = new ThrottledStream(new MemoryStream(content), content.Length/8, scheduler); 
    var target = new MemoryStream(); 

    var t = source.CopyToAsync(target); 

    t.Wait(10).Should().BeFalse(); 
    scheduler.AdvanceTo(TimeSpan.FromSeconds(4).Ticks); 
    t.Wait(10).Should().BeFalse(); 
    scheduler.AdvanceTo(TimeSpan.FromSeconds(8).Ticks - 1); 
    t.Wait(10).Should().BeFalse(); 
    scheduler.AdvanceTo(TimeSpan.FromSeconds(8).Ticks); 
    t.Wait(10).Should().BeTrue(); 
} 

[TestMethod] 
public void ShouldThrottleWriting() 
{ 
    var content = Enumerable 
     .Range(0, 1024 * 1024) 
     .Select(_ => (byte)'a') 
     .ToArray(); 
    var scheduler = new TestScheduler(); 
    var source = new MemoryStream(content); 
    var target = new ThrottledStream(new MemoryStream(), content.Length/8, scheduler); 

    var t = source.CopyToAsync(target); 

    t.Wait(10).Should().BeFalse(); 
    scheduler.AdvanceTo(TimeSpan.FromSeconds(4).Ticks); 
    t.Wait(10).Should().BeFalse(); 
    scheduler.AdvanceTo(TimeSpan.FromSeconds(8).Ticks - 1); 
    t.Wait(10).Should().BeFalse(); 
    scheduler.AdvanceTo(TimeSpan.FromSeconds(8).Ticks); 
    t.Wait(10).Should().BeTrue(); 
} 
Các vấn đề liên quan