2009-06-17 30 views
8

in my own answer to my own question, tôi có tình huống theo đó tôi đang xử lý một số lượng lớn sự kiện đến trên hàng đợi. Mỗi sự kiện được xử lý chính xác theo cùng một cách và mỗi sự kiện có thể được xử lý độc lập với tất cả các sự kiện khác.Xử lý đồng thời trong Scala

Chương trình của tôi tận dụng khung công tác đồng thời Scala và nhiều quy trình liên quan được mô hình hóa là Actor s. Vì Actor xử lý thông điệp của chúng theo tuần tự, chúng không phù hợp với vấn đề cụ thể này (mặc dù các diễn viên khác đang thực hiện các hành động mà tuần tự). Như tôi đã muốn Scala để "kiểm soát" mọi tạo vật chủ đề (mà tôi giả định là điểm của nó có một hệ thống đồng thời ở nơi đầu tiên) có vẻ như tôi có 2 lựa chọn:

  1. Gửi các sự kiện vào một hồ bơi của sự kiện bộ vi xử lý, mà tôi kiểm soát
  2. được Actor tôi để xử lý chúng đồng thời bởi một số cơ chế khác

tôi đã có thể nghĩ rằng # 1 phủ nhận quan điểm của việc sử dụng các hệ thống phụ diễn viên: bao nhiêu diễn viên xử lý nên tôi tạo không? là một câu hỏi rõ ràng. Những điều này được cho là ẩn từ tôi và được giải quyết bởi hệ thống con.

câu trả lời của tôi là phải làm như sau:

val eventProcessor = actor { 
    loop { 
    react { 
     case MyEvent(x) => 
     //I want to be able to handle multiple events at the same time 
     //create a new actor to handle it 
     actor { 
      //processing code here 
      process(x) 
     } 
    } 
    } 
} 

Có một cách tiếp cận tốt hơn? Điều này có đúng không?

chỉnh sửa: Một cách tiếp cận có thể tốt hơn là:

val eventProcessor = actor { 
    loop { 
    react { 
     case MyEvent(x) => 
     //Pass processing to the underlying ForkJoin framework 
     Scheduler.execute(process(e)) 
    } 
    } 
} 
+0

Mặc dù các diễn viên dường như không hỗ trợ trực tiếp một nhóm công nhân, nhưng điều này rất hữu ích trong việc vạch trần sự thiếu hụt này. Tất cả tài liệu có sẵn cho tôi, không đề cập đến điều này một cách rõ ràng. – ePharaoh

Trả lời

8

Điều này có vẻ giống như một bản sao của một câu hỏi khác. Vì vậy, tôi sẽ sao chép câu trả lời của tôi

Các diễn viên xử lý một thông báo cùng một lúc. Các mô hình cổ điển để xử lý nhiều thông điệp là có một diễn viên điều phối viên phía trước cho một nhóm các diễn viên tiêu dùng. Nếu bạn sử dụng phản ứng thì hồ bơi tiêu dùng có thể lớn nhưng sẽ chỉ sử dụng một số lượng nhỏ các luồng JVM. Đây là một ví dụ mà tôi tạo ra một nhóm gồm 10 người tiêu dùng và một điều phối viên ở phía trước cho họ.

import scala.actors.Actor 
import scala.actors.Actor._ 

case class Request(sender : Actor, payload : String) 
case class Ready(sender : Actor) 
case class Result(result : String) 
case object Stop 

def consumer(n : Int) = actor { 
    loop { 
    react { 
     case Ready(sender) => 
     sender ! Ready(self) 
     case Request(sender, payload) => 
     println("request to consumer " + n + " with " + payload) 
     // some silly computation so the process takes awhile 
     val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString 
     sender ! Result(result) 
     println("consumer " + n + " is done processing " + result) 
     case Stop => exit 
    } 
    } 
} 

// a pool of 10 consumers 
val consumers = for (n <- 0 to 10) yield consumer(n) 

val coordinator = actor { 
    loop { 
    react { 
     case msg @ Request(sender, payload) => 
      consumers foreach {_ ! Ready(self)} 
      react { 
       // send the request to the first available consumer 
       case Ready(consumer) => consumer ! msg 
      } 
     case Stop => 
      consumers foreach {_ ! Stop} 
      exit 
    } 
    } 
} 

// a little test loop - note that it's not doing anything with the results or telling the coordinator to stop 
for (i <- 0 to 1000) coordinator ! Request(self, i.toString) 

Mã này kiểm tra xem người tiêu dùng nào có sẵn và gửi yêu cầu đến người tiêu dùng đó. Các giải pháp thay thế chỉ được gán ngẫu nhiên cho người tiêu dùng hoặc sử dụng một bộ lập lịch vòng tròn.

Tùy thuộc vào những gì bạn đang làm, bạn có thể được phục vụ tốt hơn với tương lai của Scala. Ví dụ: nếu bạn không thực sự cần diễn viên thì tất cả các máy móc trên có thể được viết là

import scala.actors.Futures._ 

def transform(payload : String) = {  
    val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString 
    println("transformed " + payload + " to " + result) 
    result 
} 

val results = for (i <- 0 to 1000) yield future(transform(i.toString)) 
+0

Cảm ơn - Tôi không biết rằng bạn có thể gọi các nhiệm vụ trên cùng một lịch trình mà khung công tác diễn viên sử dụng. Tôi nghĩ rằng cách tiếp cận tốt nhất là do đó sử dụng Scheduler.execute (process (e)) –

+0

Ngoài ra - có; đó là một câu hỏi rất giống nhau (mà tôi liên kết đến) nhưng không hoàn toàn giống nhau. Câu hỏi đầu tiên là "là diễn viên tuần tự?" trong khi câu hỏi thứ hai là "Là diễn viên là tuần tự, làm thế nào để làm X" –

+0

Ngẫu nhiên: '0 đến 10' chứa 11 phần tử, không phải 10. –

3

Nếu các sự kiện đều có thể được xử lý một cách độc lập, tại sao họ vào một hàng đợi? Không biết gì về thiết kế của bạn, điều này có vẻ như một bước không cần thiết. Nếu bạn có thể soạn hàm process với bất kỳ thứ gì đang kích hoạt các sự kiện đó, bạn có thể có khả năng làm giảm hàng đợi.

Tác nhân cơ bản là hiệu ứng đồng thời được trang bị hàng đợi. Nếu bạn muốn xử lý đồng thời nhiều thư, bạn không thực sự muốn một diễn viên. Bạn chỉ muốn một hàm (Any =>()) được lên lịch để thực hiện vào một thời điểm thuận tiện nào đó.

Có nói rằng, cách tiếp cận của bạn là hợp lý nếu bạn muốn ở trong thư viện tác nhân và nếu hàng đợi sự kiện không nằm trong tầm kiểm soát của bạn.

Scalaz tạo sự khác biệt giữa Diễn viên và Hiệu ứng đồng thời. Trong khi Actor của nó rất nhẹ, scalaz.concurrent.Effect vẫn sáng hơn. Đây là mã của bạn gần như được dịch sang thư viện Scalaz:

val eventProcessor = effect (x => process x) 

Đây là đầu thân mới nhất chưa được tiết lộ.

+0

Cảm ơn! Họ đang ở trên một "hàng đợi" hoàn toàn bởi vì tôi đang gửi chúng đến một diễn viên và một diễn viên có một hàng đợi, mà nó xử lý tuần tự. Vì thư viện tác nhân là cách tôi đang _supposed_ xử lý đồng thời (*) ở Scala, tôi đang cố gắng sử dụng nó. Nếu không thì tôi chỉ sử dụng ExecutorService.invokeAll. –

+0

Xem thêm nhận xét của tôi về jschen ở trên. Tôi đã viết mã đồng thời trong Java trong một thời gian dài và cố gắng tìm ranh giới chính xác giữa việc sử dụng các diễn viên và, um, không sử dụng các diễn viên trong một chương trình scala được dự kiến ​​là đồng thời. –

+1

Diễn viên không phải là thuốc chữa bách bệnh, và không có gì nói rằng bạn phải sử dụng diễn viên nếu bạn muốn đồng thời trong Scala. Nó chỉ là một thư viện và, theo ý kiến ​​của tôi, một thư viện quá phức tạp. – Apocalisp

1

Điều này nghe giống như một vấn đề người tiêu dùng/nhà sản xuất đơn giản. Tôi muốn sử dụng hàng đợi với một nhóm người tiêu dùng. Bạn có lẽ có thể viết điều này với một vài dòng mã bằng cách sử dụng java.util.concurrent.

+0

Toàn bộ điểm sử dụng thư viện diễn viên scala là nó có thể ánh xạ mã của bạn tốt hơn (được viết bằng cách sử dụng các diễn viên) lên đồng thời có sẵn trong môi trường hoạt động hiện tại. Vì vậy, nếu Scala nghĩ rằng nó có 4 bộ vi xử lý, có lẽ nó sẽ tạo ra một hồ bơi chủ đề sao lưu cho các diễn viên của nó với 4 công nhân. Tôi không có gì bằng cách tạo ra hồ bơi thread riêng của riêng tôi để thực thi công việc này - tất cả những gì tôi sẽ kết thúc là một tải ngữ cảnh không cần thiết. Tôi hoàn toàn nhận thức được cách giải quyết vấn đề này bằng Java - Tôi đang hỏi về cách giải quyết nó bằng thư viện diễn viên Scala, do đó các thẻ. –

+0

Ít nhất tôi cho rằng đó là toàn bộ vấn đề khi sử dụng nó: -/ –

+0

Xin lỗi, tôi đã không nhận ra đây là một bài tập học thuật với các diễn viên. Tôi nghĩ bạn muốn có một giải pháp tốt cho vấn đề này. "Vì vậy, nếu Scala nghĩ rằng nó có 4 bộ vi xử lý, có lẽ nó sẽ tạo ra một hồ bơi chủ đề sao lưu cho các diễn viên của mình với 4 công nhân." Đây có thể là hai dòng mã sử dụng java.util.concurrent mà bạn có thể dễ dàng sử dụng từ scala. Tôi sử dụng nó từ jruby tất cả các thời gian. – jshen

1

Mục đích của một diễn viên (tốt, một trong số họ) là để đảm bảo rằng trạng thái trong diễn viên chỉ có thể được truy cập bởi một chuỗi đơn tại một thời điểm. Nếu việc xử lý thư không phụ thuộc vào bất kỳ trạng thái có thể thay đổi nào trong diễn viên, thì có thể sẽ thích hợp hơn khi chỉ gửi một tác vụ lên lịch biểu hoặc một nhóm luồng để xử lý. Sự trừu tượng thêm mà nam diễn viên cung cấp thực sự đang cản trở bạn.

Có các phương thức thuận tiện trong scala.actors.Scheduler cho điều này, hoặc bạn có thể sử dụng một Executor từ java.util.concurrent.

1

Tác nhân nhẹ hơn chủ đề và tùy chọn khác là sử dụng đối tượng diễn viên như đối tượng Runnable bạn được sử dụng để gửi đến một Thread Pool. Sự khác biệt chính là bạn không cần phải lo lắng về ThreadPool - nhóm thread được quản lý cho bạn bởi khung công tác diễn viên và chủ yếu là một mối quan tâm về cấu hình.

def submit(e: MyEvent) = actor { 
    // no loop - the actor exits immediately after processing the first message 
    react { 
    case MyEvent(x) => 
     process(x) 
    } 
} ! e // immediately send the new actor a message 

Sau đó nộp một tin nhắn, nói điều này:

submit(new MyEvent(x)) 

, tương ứng với

eventProcessor ! new MyEvent(x) 

từ câu hỏi của bạn.

Đã thử nghiệm mẫu này thành công với 1 triệu tin nhắn được gửi và nhận trong khoảng 10 giây trên máy tính xách tay i7 lõi ​​tứ.

Hy vọng điều này sẽ hữu ích.

Các vấn đề liên quan