2012-03-22 39 views
6

Tôi đã viết một ứng dụng Scala (2.9.1-1) cần xử lý vài triệu hàng từ một truy vấn cơ sở dữ liệu. Tôi đang chuyển đổi ResultSet đến một Stream sử dụng kỹ thuật thể hiện trong câu trả lời cho một trong những previous questions tôi:Mức tiêu thụ bộ nhớ của một Scala Stream song song

class Record(...) 

val resultSet = statement.executeQuery(...) 

new Iterator[Record] { 
    def hasNext = resultSet.next() 
    def next = new Record(resultSet.getString(1), resultSet.getInt(2), ...) 
}.toStream.foreach { record => ... } 

và điều này đã làm việc rất tốt.

Kể từ khi cơ thể của foreach đóng cửa là rất CPU chuyên sâu, và như là một minh chứng cho tính thực tiễn của lập trình chức năng, nếu tôi thêm một .par trước foreach, việc đóng cửa được chạy song song với không có nỗ lực khác, ngoại trừ việc làm chắc chắn rằng phần thân của bao đóng là chỉ an toàn (nó được viết theo một kiểu chức năng không có dữ liệu có thể thay đổi ngoại trừ việc in tới nhật ký an toàn chỉ).

Tuy nhiên, tôi lo lắng về mức tiêu thụ bộ nhớ. Là .par gây ra toàn bộ kết quả được đặt để tải trong RAM, hoặc không hoạt động song song tải chỉ như nhiều hàng vì nó có chủ đề hoạt động? Tôi đã phân bổ 4G cho JVM (64-bit với -Xmx4g) nhưng trong tương lai tôi sẽ chạy nó trên nhiều hàng hơn và lo lắng rằng cuối cùng tôi sẽ có được một bộ nhớ ngoài.

Có mô hình nào tốt hơn để thực hiện loại xử lý song song này theo cách chức năng không? Tôi đã hiển thị ứng dụng này cho đồng nghiệp của tôi như là một ví dụ về giá trị của lập trình chức năng và máy đa lõi.

+0

Chỉ courious. DBMS gì bạn đang sử dụng, và những gì Scala DB API để truy vấn nó? – santiagobasulto

+0

Tôi đang truy cập cơ sở dữ liệu Microsoft SQL Server 2012 chạy trên Windows Server 2008 R2 bằng trình điều khiển JDBC của Microsoft (http://msdn.microsoft.com/en-us/sqlserver/aa937724). – Ralph

Trả lời

4

Nếu bạn nhìn vào scaladoc of Stream, bạn sẽ nhận thấy rằng các lớp định nghĩa của parParallelizable đặc điểm ... và, nếu bạn nhìn vào source code of this trait, bạn sẽ nhận thấy rằng phải mất mỗi phần tử từ bộ sưu tập độc đáo và đặt chúng thành một bộ kết hợp, do đó, bạn sẽ được tải mỗi hàng vào một ParSeq:

def par: ParRepr = { 
    val cb = parCombiner 
    for (x <- seq) cb += x 
    cb.result 
    } 

    /** The default `par` implementation uses the combiner provided by this method 
    * to create a new parallel collection. 
    * 
    * @return a combiner for the parallel collection of type `ParRepr` 
    */ 
    protected[this] def parCombiner: Combiner[A, ParRepr] 

một giải pháp khả thi là để parallelize một cách rõ ràng tính toán của bạn, nhờ diễn viên chẳng hạn. Bạn có thể xem this example từ tài liệu akka chẳng hạn, điều đó có thể hữu ích trong ngữ cảnh của bạn.

+0

Tôi sợ điều đó. Tôi nghĩ đến việc bắn lên một tập hợp các luồng và sau đó có mỗi hàng kéo từ tập hợp kết quả (đồng bộ), nhưng nó không giống như một giải pháp rất chức năng. – Ralph

+0

Yêu cầu một diễn viên bọc truy vấn và sinh ra một Router với một Resizer mà bạn hướng dẫn để kéo khối. –

-1

Các akka stream thư viện mới là việc sửa chữa bạn đang tìm kiếm:

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{Source, Sink} 

def iterFromQuery() : Iterator[Record] = { 
    val resultSet = statement.executeQuery(...) 
    new Iterator[Record] { 
    def hasNext = resultSet.next() 
    def next = new Record(...) 
    } 
} 

def cpuIntensiveFunction(record : Record) = { 
... 
} 

implicit val actorSystem = ActorSystem() 
implicit val materializer = ActorMaterializer() 
implicit val execContext = actorSystem.dispatcher 

val poolSize = 10 //number of Records in memory at once 

val stream = 
    Source(iterFromQuery).runWith(Sink.foreachParallel(poolSize)(cpuIntensiveFunction)) 

stream onComplete {_ => actorSystem.shutdown()} 
Các vấn đề liên quan