2014-10-10 14 views
5

Tôi cố gắng triển khai chương trình Người tiêu dùng sản xuất trong scala mà không sử dụng Hàng đợi. Bởi vì tôi nghĩ rằng diễn viên đã thực hiện "hàng đợi thư" hoặc cái gì khác, nó sẽ là dư thừa để viết mã một lần nữa.Cách chính xác để thực hiện Người tiêu dùng sản xuất trong scala

Tôi đã cố gắng viết chương trình trong Actor hoàn toàn. Dưới đây là chương trình nhiều người tiêu dùng nhiều nhà sản xuất. Nhà sản xuất ngủ trong một thời gian để mô phỏng làm điều gì đó. Người tiêu dùng không ngủ chút nào.

Tuy nhiên tôi không biết làm thế nào để tắt chương trình nếu tôi không thêm một diễn viên người giám sát người tiêu dùng, cũng như một đối tượng Promise cho việc sử dụng (lớp Supervisor trong code) "chờ đợi"

Is có anyway để thoát khỏi chúng?

import akka.actor.Actor.Receive 
import akka.actor._ 
import akka.routing._; 
import akka.util._ 

import scala.concurrent.{Await, Promise} 
import scala.concurrent.duration._ 

class Producer(val pool:ActorRef)(val name:String) extends Actor { 

    def receive = { 
    case _ => 
     while (true) { 
     val sleepTime = scala.util.Random.nextInt(1000) 
     Thread.sleep(sleepTime) 
     println("Producer %s send food" format name) 
     pool ! name 
     } 
    } 
} 

class Consumer(supervisor : ActorRef)(val name:String) extends Actor { 

    var counter = 0 

    def receive = { 
    case s => 
     counter += 1 
     println("%s eat food produced by %s" format (name,s)) 

     if (counter >= 10) { 
     println("%s is full" format name) 

     context.stop(self) 
     supervisor ! 1 
     } 
    } 
} 

class Supervisor(p:Promise[String]) extends Actor { 

    var r = 3 

    def receive = { 
    case _ => 
     r -= 1 
     if (0 == r) { 
     println("All consumer stopped") 
     context.stop(self) 
     p success ("Good") 
     } 
    } 

} 

object Try3 { 

    def work(): Unit = { 
    val system = ActorSystem("sys1") 
    val nProducer = 5; 
    val nConsumer = 3; 
    val p = Promise[String] 
    val supervisor = system.actorOf(Props(new Supervisor(p))); 
    val arrConsumer = for (i <- 1 to nConsumer) yield system.actorOf(Props(new Consumer(supervisor)("Consumer %d" format (i)))) 
    val poolConsumer = system.actorOf(Props.empty.withRouter(RoundRobinRouter(arrConsumer))) 
    val arrProducer = for (i <- 1 to nProducer) yield system.actorOf(Props(new Producer(poolConsumer)("Producer %d" format (i)))) 

    arrProducer foreach (_ ! "start") 

    Await.result(p.future,Duration.Inf) 
    println("great!") 
    system.shutdown 
    } 

    def main(args:Array[String]): Unit = { 
    work() 
    } 
} 

Chức năng nhận Lớp sản xuất có vấn đề sẽ không bị tắt vì trong khi không có điều kiện ngắt.

Cách duy nhất tôi có thể nghĩ là "gửi tin nhắn đến chính nhà sản xuất". Tôi tự hỏi đó có phải là cách thông thường để thực hiện loại yêu cầu này không?

Đây là mã chỉnh sửa:

class Producer(val pool:ActorRef)(val name:String) extends Actor { 

    // original implementation: 
    // def receive = { 
    // case _ => 
    // while (true){ 
    //  val sleepTime = scala.util.Random.nextInt(1000) 
    //  Thread.sleep(sleepTime) 
    //  println("Producer %s send food" format name) 
    //  pool ! name 
    // } 
    // } 

    case object Loop; 

    def receive = { 
    case _ => 
     val sleepTime = scala.util.Random.nextInt(1000) 
     Thread.sleep(sleepTime) 
     println("Producer %s send food" format name) 
     pool ! name 
     self ! Loop //send message to itself 
    } 
} 

Bất kể thực hiện của tôi, cách chính xác để thực hiện chương trình tiêu dùng sản xuất trong scala là gì, với diễn viên hoặc tương lai/Promise?

Trả lời

2

Bạn không bao giờ nên chặn (trong trường hợp của bạn Thread.sleep, trong khi vòng lặp) bên trong một diễn viên. Chặn bên trong một diễn viên lồng một chuỗi từ một nhóm luồng được sử dụng trong tất cả các diễn viên. Ngay cả một số lượng nhỏ các nhà sản xuất như của bạn sẽ làm cho tất cả các diễn viên trong ActorSystem bị tước đoạt từ các chủ đề và khiến họ không sử dụng được.

Thay vào đó, hãy sử dụng Scheduler để lên lịch gửi một bản gửi định kỳ trong Nhà sản xuất của bạn.

override def preStart(): Unit = { 
    import scala.concurrent.duration._ 
    import context.dispatcher 
    context.system.scheduler.schedule(
    initialDelay = 0.seconds, 
    interval = 1.second, 
    receiver = pool, 
    message = name 
) 
} 
+1

Cảm ơn bạn @Martynas. Bạn đã giải quyết vấn đề "Vòng lặp" của tôi. Tôi vẫn đang tìm kiếm câu trả lời cho việc thực hiện thanh lịch của Nhà sản xuất-Người tiêu dùng. – worldterminator

0

Bạn nghĩ gì về việc thực hiện Terminator diễn viên :)

object Terminator { 
    case class WatchMe(ref: ActorRef) 
} 
class Terminator extends Actor { 
    var consumers: Map[ActorRef, ActorRef] = Map() 

    def receive = { 
     case WatchMe(ref) => { 
     consumers += ref -> ref 
     context.watch(ref) 
     } 
     case Terminated(ref) => { 
     context.unwatch(ref) 
     consumers.get(ref).foreach { ref -> ref ! PoisonPill } 
     consumers -= ref 
     //If all consumers are dead stop.self and delegate NoConsumers message higher in hierarchy 
     if(consumers.size == 0) { 
      delegate() 
      context.stop(self) 
     } 
     } 
    } 
} 
Các vấn đề liên quan