2011-01-12 29 views
25

Tôi có một người biết lắng nghe:Multi-threading với Net HttpListener

listener = new HttpListener(); 
listener.Prefixes.Add(@"http://+:8077/"); 
listener.Start(); 
listenerThread = new Thread(HandleRequests); 
listenerThread.Start(); 

Và tôi yêu cầu xử lý:

private void HandleRequests() 
{ 
    while (listener.IsListening) 
    { 
     var context = listener.BeginGetContext(new AsyncCallback(ListenerCallback), listener); 
     context.AsyncWaitHandle.WaitOne(); 
    } 
} 

private void ListenerCallback(IAsyncResult ar) 
{ 
    var listener = ar.AsyncState as HttpListener; 

    var context = listener.EndGetContext(ar); 

    //do some stuff 
} 

Tôi muốn viết void Stop() theo cách như vậy, rằng:

  1. Nó sẽ chặn cho đến khi tất cả các yêu cầu hiện được xử lý sẽ kết thúc (nghĩa là sẽ chờ tất cả các chuỗi "làm một số nội dung").
  2. Trong khi chờ đợi các yêu cầu đã bắt đầu, nó sẽ không cho phép thêm bất kỳ yêu cầu nào (ví dụ: trả lại vào đầu ListenerCallback).
  3. Sau đó, nó sẽ gọi listener.Stop() (listener.IsListening trở thành sai).

Làm cách nào để ghi?

EDIT: Bạn nghĩ gì về giải pháp này? Nó có an toàn không?

public void Stop() 
{ 
    lock (this) 
    { 
     isStopping = true; 
    } 
    resetEvent.WaitOne(); //initially set to true 
    listener.Stop(); 
} 

private void ListenerCallback(IAsyncResult ar) 
{ 
    lock (this) 
    { 
     if (isStopping) 
      return; 

     resetEvent.Reset(); 
     numberOfRequests++; 
    } 

    var listener = ar.AsyncState as HttpListener; 

    var context = listener.EndGetContext(ar); 

    //do some stuff 

    lock (this) 
    { 
     if (--numberOfRequests == 0) 
      resetEvent.Set(); 
    } 
} 

Trả lời

2

Tôi đã tham khảo ý kiến ​​mã của tôi trong EDIT một phần của câu hỏi của tôi và tôi đã quyết định chấp nhận nó với một số sửa đổi:

public void Stop() 
{ 
    lock (locker) 
    { 
     isStopping = true; 
    } 
    resetEvent.WaitOne(); //initially set to true 
    listener.Stop(); 
} 

private void ListenerCallback(IAsyncResult ar) 
{ 
    lock (locker) //locking on this is a bad idea, but I forget about it before 
    { 
     if (isStopping) 
      return; 

     resetEvent.Reset(); 
     numberOfRequests++; 
    } 

    try 
    { 
     var listener = ar.AsyncState as HttpListener; 

     var context = listener.EndGetContext(ar); 

     //do some stuff 
    } 
    finally //to make sure that bellow code will be executed 
    { 
     lock (locker) 
     { 
      if (--numberOfRequests == 0) 
       resetEvent.Set(); 
     } 
    } 
} 
0

Gọi đơn giản là listener.Stop() nên thực hiện thủ thuật. Điều này sẽ không chấm dứt bất kỳ kết nối nào đã được thiết lập nhưng sẽ ngăn chặn bất kỳ kết nối mới nào.

+1

Điều này không đúng. Nếu bạn gọi 'listener.Stop()' trong khi thực hiện 'ListenerCallback', bạn sẽ nhận được một ngoại lệ ví dụ. khi gọi 'EndGetContext' hoặc thậm chí sau, khi sử dụng luồng đầu ra. Tôi có thể bắt được ngoại lệ tất nhiên, nhưng tôi không muốn. – prostynick

+0

Trong mã của tôi, tôi sử dụng một lá cờ và không đề cập đến người nghe nữa sau khi đã gọi dừng lại trên nó, nhưng đóng trình lắng nghe không đóng các kết nối đã được chấp nhận, chỉ là người nghe. –

+0

Tôi không biết ý bạn là gì khi nói "Tôi sử dụng cờ". Vấn đề là, trong 'ListenerCallback' tôi đang sử dụng listener và nếu một thread khác đóng nó, trong khi tôi đang sử dụng nó, tôi sẽ kết thúc với các ngoại lệ, mà tôi đã đề cập. – prostynick

4

Cũng có một số cách để giải quyết vấn đề này ... Đây là ví dụ đơn giản sử dụng semaphore để theo dõi công việc đang diễn ra và tín hiệu được nâng lên khi tất cả công nhân kết thúc. Điều này sẽ cung cấp cho bạn một ý tưởng cơ bản để làm việc.

Giải pháp dưới đây không phải là lý tưởng, lý tưởng nhất là chúng ta nên có được semaphore trước khi gọi BeginGetContext. Điều đó làm cho việc tắt máy trở nên khó khăn hơn, vì vậy tôi đã chọn sử dụng cách tiếp cận đơn giản hơn này. Nếu tôi đã làm điều này cho 'thực' tôi có thể viết quản lý thread của riêng tôi hơn là dựa vào ThreadPool. Điều này sẽ cho phép tắt máy đáng tin cậy hơn.

Dù sao đây là ví dụ hoàn chỉnh:

class TestHttp 
{ 
    static void Main() 
    { 
     using (HttpServer srvr = new HttpServer(5)) 
     { 
      srvr.Start(8085); 
      Console.WriteLine("Press [Enter] to quit."); 
      Console.ReadLine(); 
     } 
    } 
} 


class HttpServer : IDisposable 
{ 
    private readonly int _maxThreads; 
    private readonly HttpListener _listener; 
    private readonly Thread _listenerThread; 
    private readonly ManualResetEvent _stop, _idle; 
    private readonly Semaphore _busy; 

    public HttpServer(int maxThreads) 
    { 
     _maxThreads = maxThreads; 
     _stop = new ManualResetEvent(false); 
     _idle = new ManualResetEvent(false); 
     _busy = new Semaphore(maxThreads, maxThreads); 
     _listener = new HttpListener(); 
     _listenerThread = new Thread(HandleRequests); 
    } 

    public void Start(int port) 
    { 
     _listener.Prefixes.Add(String.Format(@"http://+:{0}/", port)); 
     _listener.Start(); 
     _listenerThread.Start(); 
    } 

    public void Dispose() 
    { Stop(); } 

    public void Stop() 
    { 
     _stop.Set(); 
     _listenerThread.Join(); 
     _idle.Reset(); 

     //aquire and release the semaphore to see if anyone is running, wait for idle if they are. 
     _busy.WaitOne(); 
     if(_maxThreads != 1 + _busy.Release()) 
      _idle.WaitOne(); 

     _listener.Stop(); 
    } 

    private void HandleRequests() 
    { 
     while (_listener.IsListening) 
     { 
      var context = _listener.BeginGetContext(ListenerCallback, null); 

      if (0 == WaitHandle.WaitAny(new[] { _stop, context.AsyncWaitHandle })) 
       return; 
     } 
    } 

    private void ListenerCallback(IAsyncResult ar) 
    { 
     _busy.WaitOne(); 
     try 
     { 
      HttpListenerContext context; 
      try 
      { context = _listener.EndGetContext(ar); } 
      catch (HttpListenerException) 
      { return; } 

      if (_stop.WaitOne(0, false)) 
       return; 

      Console.WriteLine("{0} {1}", context.Request.HttpMethod, context.Request.RawUrl); 
      context.Response.SendChunked = true; 
      using (TextWriter tw = new StreamWriter(context.Response.OutputStream)) 
      { 
       tw.WriteLine("<html><body><h1>Hello World</h1>"); 
       for (int i = 0; i < 5; i++) 
       { 
        tw.WriteLine("<p>{0} @ {1}</p>", i, DateTime.Now); 
        tw.Flush(); 
        Thread.Sleep(1000); 
       } 
       tw.WriteLine("</body></html>"); 
      } 
     } 
     finally 
     { 
      if (_maxThreads == 1 + _busy.Release()) 
       _idle.Set(); 
     } 
    } 
} 
56

Để hoàn chỉnh, đây là những gì nó sẽ trông như thế nào nếu bạn quản lý đề người lao động của riêng bạn:

class HttpServer : IDisposable 
{ 
    private readonly HttpListener _listener; 
    private readonly Thread _listenerThread; 
    private readonly Thread[] _workers; 
    private readonly ManualResetEvent _stop, _ready; 
    private Queue<HttpListenerContext> _queue; 

    public HttpServer(int maxThreads) 
    { 
     _workers = new Thread[maxThreads]; 
     _queue = new Queue<HttpListenerContext>(); 
     _stop = new ManualResetEvent(false); 
     _ready = new ManualResetEvent(false); 
     _listener = new HttpListener(); 
     _listenerThread = new Thread(HandleRequests); 
    } 

    public void Start(int port) 
    { 
     _listener.Prefixes.Add(String.Format(@"http://+:{0}/", port)); 
     _listener.Start(); 
     _listenerThread.Start(); 

     for (int i = 0; i < _workers.Length; i++) 
     { 
      _workers[i] = new Thread(Worker); 
      _workers[i].Start(); 
     } 
    } 

    public void Dispose() 
    { Stop(); } 

    public void Stop() 
    { 
     _stop.Set(); 
     _listenerThread.Join(); 
     foreach (Thread worker in _workers) 
      worker.Join(); 
     _listener.Stop(); 
    } 

    private void HandleRequests() 
    { 
     while (_listener.IsListening) 
     { 
      var context = _listener.BeginGetContext(ContextReady, null); 

      if (0 == WaitHandle.WaitAny(new[] { _stop, context.AsyncWaitHandle })) 
       return; 
     } 
    } 

    private void ContextReady(IAsyncResult ar) 
    { 
     try 
     { 
      lock (_queue) 
      { 
       _queue.Enqueue(_listener.EndGetContext(ar)); 
       _ready.Set(); 
      } 
     } 
     catch { return; } 
    } 

    private void Worker() 
    { 
     WaitHandle[] wait = new[] { _ready, _stop }; 
     while (0 == WaitHandle.WaitAny(wait)) 
     { 
      HttpListenerContext context; 
      lock (_queue) 
      { 
       if (_queue.Count > 0) 
        context = _queue.Dequeue(); 
       else 
       { 
        _ready.Reset(); 
        continue; 
       } 
      } 

      try { ProcessRequest(context); } 
      catch (Exception e) { Console.Error.WriteLine(e); } 
     } 
    } 

    public event Action<HttpListenerContext> ProcessRequest; 
} 
+0

Điều này thật tuyệt vời - nó đóng vai trò như một ứng viên tuyệt vời để kiểm tra thông lượng HttpListener chống lại. – Jonno

+0

Cảm ơn bạn rất nhiều vì đoạn mã đó! Có hai vấn đề nhỏ: 1. ProcessRequest có thể là null 2. HttpListenerContext không phải là luồng an toàn trừ khi nó là tĩnh –

+0

@MartinMeeser cảm ơn cho nhận xét. cho 1. thay vì gói nó trong try catch block, chúng ta có thể sử dụng 'ProcessRequest? .Invoke (context);'. Đối với 2. Tuy nhiên, nếu tĩnh không phải là một lựa chọn những gì bạn đề nghị? – JohnTube

0

Điều này sử dụng hàng đợi đã nhập BlockingCollection cho các yêu cầu dịch vụ. Nó có thể sử dụng được. Bạn nên lấy được một lớp từ câu trả lời này và ghi đè lên.

using System; 
using System.Collections.Concurrent; 
using System.Net; 
using System.Text; 
using System.Threading; 

namespace Service 
{ 
    class HttpServer : IDisposable 
    { 
     private HttpListener httpListener; 
     private Thread listenerLoop; 
     private Thread[] requestProcessors; 
     private BlockingCollection<HttpListenerContext> messages; 

     public HttpServer(int threadCount) 
     { 
      requestProcessors = new Thread[threadCount]; 
      messages = new BlockingCollection<HttpListenerContext>(); 
      httpListener = new HttpListener(); 
     } 

     public virtual int Port { get; set; } = 80; 

     public virtual string[] Prefixes 
     { 
      get { return new string[] {string.Format(@"http://+:{0}/", Port)}; } 
     } 

     public void Start(int port) 
     { 
      listenerLoop = new Thread(HandleRequests); 

      foreach(string prefix in Prefixes) httpListener.Prefixes.Add(prefix); 

      listenerLoop.Start(); 

      for (int i = 0; i < requestProcessors.Length; i++) 
      { 
       requestProcessors[i] = StartProcessor(i, messages); 
      } 
     } 

     public void Dispose() { Stop(); } 

     public void Stop() 
     { 
      messages.CompleteAdding(); 

      foreach (Thread worker in requestProcessors) worker.Join(); 

      httpListener.Stop(); 
      listenerLoop.Join(); 
     } 

     private void HandleRequests() 
     { 
      httpListener.Start(); 
      try 
      { 
       while (httpListener.IsListening) 
       { 
        Console.WriteLine("The Linstener Is Listening!"); 
        HttpListenerContext context = httpListener.GetContext(); 

        messages.Add(context); 
        Console.WriteLine("The Linstener has added a message!"); 
       } 
      } 
      catch(Exception e) 
      { 
       Console.WriteLine (e.Message); 
      } 
     } 

     private Thread StartProcessor(int number, BlockingCollection<HttpListenerContext> messages) 
     { 
      Thread thread = new Thread(() => Processor(number, messages)); 
      thread.Start(); 
      return thread; 
     } 

     private void Processor(int number, BlockingCollection<HttpListenerContext> messages) 
     { 
      Console.WriteLine ("Processor {0} started.", number); 
      try 
      { 
       for (;;) 
       { 
        Console.WriteLine ("Processor {0} awoken.", number); 
        HttpListenerContext context = messages.Take(); 
        Console.WriteLine ("Processor {0} dequeued message.", number); 
        Response (context); 
       } 
      } catch { } 

      Console.WriteLine ("Processor {0} terminated.", number); 
     } 

     public virtual void Response(HttpListenerContext context) 
     { 
      SendReply(context, new StringBuilder("<html><head><title>NULL</title></head><body>This site not yet implementd.</body></html>")); 
     } 

     public static void SendReply(HttpListenerContext context, StringBuilder responseString) 
     { 
      byte[] buffer = System.Text.Encoding.UTF8.GetBytes(responseString.ToString()); 
      context.Response.ContentLength64 = buffer.Length; 
      System.IO.Stream output = context.Response.OutputStream; 
      output.Write(buffer, 0, buffer.Length); 
      output.Close(); 
     } 
    } 
} 

Đây là ví dụ về cách sử dụng. Không cần sử dụng các sự kiện hoặc bất kỳ khối khóa nào. BlockingCollection giải quyết tất cả những vấn đề này.

using System; 
using System.Collections.Concurrent; 
using System.IO; 
using System.Net; 
using System.Text; 
using System.Threading; 

namespace Service 
{ 
    class Server 
    { 
    public static void Main (string[] args) 
    { 
     HttpServer Service = new QuizzServer (8); 
     Service.Start (80); 
     for (bool coninute = true; coninute ;) 
     { 
      string input = Console.ReadLine().ToLower(); 
      switch (input) 
      { 
       case "stop": 
        Console.WriteLine ("Stop command accepted."); 
        Service.Stop(); 
        coninute = false; 
        break; 
       default: 
        Console.WriteLine ("Unknown Command: '{0}'.",input); 
        break; 
      } 
     } 
    } 
    } 
} 
Các vấn đề liên quan