2013-04-26 36 views
5

Tôi đang cố gắng sử dụng gói iterate iteratee để xử lý một tệp zip lớn trong không gian cố định. Tôi có một quy trình chạy dài mà tôi cần thực hiện trên mỗi tệp trong tệp zip. Những quy trình có thể (và nên) được chạy song song.Scalaz 7 Iteratee để xử lý tệp zip lớn (OutOfMemoryError)

Tôi đã tạo một EnumeratorT làm tăng mỗi ZipEntry thành đối tượng File. Chữ ký trông giống như:

def enumZipFile(f:File):EnumeratorT[IoExceptionOr[IO[File]], IO] 

Tôi muốn đính kèm một IterateeT sẽ thực hiện quy trình chạy dài trên mỗi tệp. Tôi về cơ bản kết thúc với một cái gì đó như:

type IOE[A] = IoExceptionOr[A] 

def action(f:File):IO[List[Promise[IOE[File]]]] = (
    consume[Promise[IOE[File]], IO, List] %= 
    map[IOE[File], Promise[IOE[File]], IO](longRunningProcess) %= 
    map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &= 
    enumZipFile(f) 
).run 

def longRunningProcess:(iof:IOE[File]):Promise[IOE[File]] = 
    Promise { Thread.sleep(5000); iof } 

Khi tôi cố gắng chạy nó:

action(new File("/really/big/file.zip")).unsafePerformIO.sequence.get 

tôi nhận được một thông điệp java.lang.OutOfMemoryError: Java heap space. Điều đó có ý nghĩa với tôi, vì nó đang cố gắng xây dựng một danh sách khổng lồ trong bộ nhớ của tất cả các đối tượng IOPromise này.

Một vài câu hỏi:

  • Có ai có bất kỳ ý tưởng về làm thế nào để tránh tình trạng này? Nó cảm thấy như tôi đang tiếp cận vấn đề không chính xác, bởi vì tôi thực sự chỉ quan tâm đến longRunningProcess cho các tác dụng phụ của nó.
  • Phương pháp tiếp cận Enumerator có đúng cách tiếp cận sai không?

Tôi khá nhiều ý tưởng, vì vậy mọi thứ sẽ hữu ích.

Cảm ơn!

Update # 1

Đây là stack trace:

[error] java.lang.OutOfMemoryError: Java heap space 
[error]   at scalaz.Free.flatMap(Free.scala:46) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 

Tôi hiện đang lấy lời khuyên của nadavwr để đảm bảo mọi thứ được diễn xuất như tôi nghĩ rằng đó là. Tôi sẽ báo cáo lại mọi cập nhật.

Update # 2

Sử dụng ý tưởng từ cả các câu trả lời dưới đây, tôi tìm thấy một giải pháp phù hợp. Theo đề xuất của huynhjl (và tôi đã xác minh bằng cách sử dụng đề xuất của nadavwr về phân tích kết xuất vùng heap), consume đã gây ra mỗi ZipEntry tăng cao được lưu trữ trong bộ nhớ, đó là lý do tại sao quá trình này hết bộ nhớ. Tôi đã thay đổi consume thành foldM và cập nhật quy trình chạy dài để chỉ trả lại Promise[IOE[Unit]] thay vì tham chiếu đến tệp. Bằng cách đó tôi có một bộ sưu tập của tất cả các IoExceptions ở cuối. Đây là giải pháp làm việc:

def action(f:File):IO[List[Promise[IOE[Unit]]]] = (
    foldM[Promise[IOE[Unit]], IO, List[Promise[IOE[Unit]]]](List.empty)((acc,x) => IO(x :: acc)) %= 
    map[IOE[File], Promise[IOE[Unit]], IO](longRunningProcess) %= 
    map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &= 
    enumZipFile(f) 
).run 

def longRunningProcess:(iof:IOE[File]):Promise[IOE[Unit]] = 
    Promise { Thread.sleep(5000); iof.map(println) } 

Giải pháp này thổi phồng mỗi mục nhập khi tải lên không đồng bộ. Cuối cùng, tôi có một danh sách lớn các đối tượng được đáp ứng Promise có chứa bất kỳ lỗi nào. Tôi vẫn chưa hoàn toàn thuyết phục điều này là việc sử dụng Iteratee chính xác, nhưng bây giờ tôi có một số phần có thể tái sử dụng, có thể ghép lại được mà tôi có thể sử dụng trong các phần khác của hệ thống của mình (đây là một mô hình rất phổ biến đối với chúng tôi).

Cảm ơn sự giúp đỡ của bạn!

+0

Quy trình dài sẽ làm gì? Liệu nó tính toán một cái gì đó từ nội dung zip? – huynhjl

+0

Mỗi tệp trong tệp zip là một hình ảnh. Quá trình tải lên tệp đó lên Rackspace CloudFiles. Khi tôi tìm ra điều này, tôi sẽ cần thêm các quy trình bổ sung để thay đổi kích thước hình ảnh và sau đó tải chúng lên. –

+0

Lặp lại cảm thấy như trừu tượng sai cho công việc này, vì bạn muốn song song khối lượng công việc. Các diễn viên sẽ làm việc tốt hơn tôi nghĩ. – huynhjl

Trả lời

4

Không sử dụng consume. Xem câu trả lời gần đây khác của tôi: How to use IO with Scalaz7 Iteratees without overflowing the stack?

foldM có thể là lựa chọn tốt hơn.

Cũng cố gắng ánh xạ tệp tới một thứ khác (như mã trả về thành công) để xem liệu tệp đó có cho phép JVM thu thập các mục zip bị thổi phồng không.

+0

Cảm ơn bạn đã trả lời. Cuối cùng, sử dụng 'foldM' dường như là chìa khóa. –

0

tôi bắt đầu ra câu trả lời sau một cách nhanh chóng đọc qua, và bằng cách nào đó đã 'stack overflow' mắc kẹt trong tâm trí của tôi thay vì 'ra khỏi lỗi bộ nhớ' ... Phải là URL :-)

Tuy nhiên, tính toán chức năng dựa vào việc thu thập lại dễ bị tràn ngăn xếp, vì vậy tôi đã để lại câu trả lời đúng chỗ cho bất kỳ cơ thể nào vấp ngã, và hứa sẽ cố gắng đưa ra câu trả lời phù hợp hơn.

Nếu những gì bạn có là tràn ngăn xếp, bạn sẽ cần một 'tấm bạt lò xo', một cấu trúc giúp tăng tính toán của bạn ra khỏi ngăn xếp giữa các lần thu thập.

Xem phần có tiêu đề "Stackless Scala with Free Monads" trong Learning Scalaz Day 18, một phần của loạt bài đăng tuyệt vời của @ eed3si9n.

Xem thêm this gist bởi @mpilquist, minh họa một iteratee trampolined.

+1

Haha, stackoverflow.com là một tên không may khi bạn đang nói về các quy trình hoạt động lâu dài, chức năng. –

1

Làm thế nào đắt (về bộ nhớ là longRunningProcess của bạn? Làm thế nào về tập tin giảm phát? Có phải họ đang thực hiện số lần bạn mong đợi họ được? (Một bộ đếm đơn giản sẽ là hữu ích)

Một stack trace sẽ Bạn có thể sử dụng đối số JVM -XX:+HeapDumpOnOutOfMemoryError và sau đó phân tích nó với VisualVM, Eclipse, nếu bạn muốn chắc chắn những gì đang chiếm quá nhiều bộ nhớ, bạn có thể sử dụng đối số JVM -XX:+HeapDumpOnOutOfMemoryError và sau đó phân tích nó với VisualVM, Eclipse MAT hoặc các máy phân tích heap khác.

Follo wup

Có vẻ lạ với tôi rằng bạn đang liệt kê các lời hứa. Đó là phản trực giác để bắt đầu một tính toán độc lập của cả điều tra viên và iteratee. Một giải pháp dựa trên iteratee có thể được phục vụ tốt hơn bởi một điều tra viên trả về các phần tử 'trơ' thay vì lời hứa. Thật không may, điều đó sẽ làm cho việc xử lý các tệp riêng lẻ của bạn nối tiếp, nhưng điều đó lặp lại đối với việc xử lý luồng không bị chặn.

Giải pháp dựa trên diễn viên phù hợp với IMHO tốt hơn, nhưng cả hai diễn viên và lặp lại (đặc biệt là sau) dường như quá mức cho những gì bạn đang cố gắng hoàn thành (ít nhất là các phần bạn đang chia sẻ).

Vui lòng xem xét tương lai đơn giản/lời hứa từ gói scala.concurrent của Scala 2.10 và chắc chắn cũng xem bộ sưu tập song song của Scala. Tôi sẽ không giới thiệu các khái niệm bổ sung vào mã trước khi những chứng minh này không đủ. Hãy thử định nghĩa một ExecutionContext có kích thước cố định để hạn chế sự song song của bạn.

+0

Lời khuyên tuyệt vời. Tôi sẽ trải qua từng bước để đảm bảo mọi thứ đang được thực hiện như tôi cho là vậy. Tôi đã cập nhật câu hỏi của mình ở trên bằng dấu vết ngăn xếp. Tôi sẽ cố gắng đổ đống tiếp theo. Cảm ơn! –

+0

Về việc theo dõi của bạn: Tôi đồng ý với những lo ngại của bạn về việc sử dụng Iteratee cho quy trình này. Từ những gì tôi đăng, nó chắc chắn có vẻ như quá mức cần thiết. Tuy nhiên, mô hình tải xuống tệp (hoặc tệp), truyền trực tuyến nội dung, xử lý từng mục nhập, sau đó thực hiện điều gì đó với kết quả được sử dụng khắp nơi trong ứng dụng của chúng tôi. Tôi cảm thấy như Iteratee đã cho tôi một số đoạn mã đẹp, có thể tái sử dụng mà tôi có thể sử dụng để xây dựng những quy trình lớn hơn này. Cảm ơn bạn rất nhiều vì đã dành thời gian và giúp đỡ! –

Các vấn đề liên quan