Tôi đang cố gắng sử dụng gói iterate iteratee để xử lý một tệp zip lớn trong không gian cố định. Tôi có một quy trình chạy dài mà tôi cần thực hiện trên mỗi tệp trong tệp zip. Những quy trình có thể (và nên) được chạy song song.Scalaz 7 Iteratee để xử lý tệp zip lớn (OutOfMemoryError)
Tôi đã tạo một EnumeratorT
làm tăng mỗi ZipEntry
thành đối tượng File
. Chữ ký trông giống như:
def enumZipFile(f:File):EnumeratorT[IoExceptionOr[IO[File]], IO]
Tôi muốn đính kèm một IterateeT
sẽ thực hiện quy trình chạy dài trên mỗi tệp. Tôi về cơ bản kết thúc với một cái gì đó như:
type IOE[A] = IoExceptionOr[A]
def action(f:File):IO[List[Promise[IOE[File]]]] = (
consume[Promise[IOE[File]], IO, List] %=
map[IOE[File], Promise[IOE[File]], IO](longRunningProcess) %=
map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &=
enumZipFile(f)
).run
def longRunningProcess:(iof:IOE[File]):Promise[IOE[File]] =
Promise { Thread.sleep(5000); iof }
Khi tôi cố gắng chạy nó:
action(new File("/really/big/file.zip")).unsafePerformIO.sequence.get
tôi nhận được một thông điệp java.lang.OutOfMemoryError: Java heap space
. Điều đó có ý nghĩa với tôi, vì nó đang cố gắng xây dựng một danh sách khổng lồ trong bộ nhớ của tất cả các đối tượng IO
và Promise
này.
Một vài câu hỏi:
- Có ai có bất kỳ ý tưởng về làm thế nào để tránh tình trạng này? Nó cảm thấy như tôi đang tiếp cận vấn đề không chính xác, bởi vì tôi thực sự chỉ quan tâm đến
longRunningProcess
cho các tác dụng phụ của nó. - Phương pháp tiếp cận
Enumerator
có đúng cách tiếp cận sai không?
Tôi khá nhiều ý tưởng, vì vậy mọi thứ sẽ hữu ích.
Cảm ơn!
Update # 1
Đây là stack trace:
[error] java.lang.OutOfMemoryError: Java heap space
[error] at scalaz.Free.flatMap(Free.scala:46)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
Tôi hiện đang lấy lời khuyên của nadavwr để đảm bảo mọi thứ được diễn xuất như tôi nghĩ rằng đó là. Tôi sẽ báo cáo lại mọi cập nhật.
Update # 2
Sử dụng ý tưởng từ cả các câu trả lời dưới đây, tôi tìm thấy một giải pháp phù hợp. Theo đề xuất của huynhjl (và tôi đã xác minh bằng cách sử dụng đề xuất của nadavwr về phân tích kết xuất vùng heap), consume
đã gây ra mỗi ZipEntry
tăng cao được lưu trữ trong bộ nhớ, đó là lý do tại sao quá trình này hết bộ nhớ. Tôi đã thay đổi consume
thành foldM
và cập nhật quy trình chạy dài để chỉ trả lại Promise[IOE[Unit]]
thay vì tham chiếu đến tệp. Bằng cách đó tôi có một bộ sưu tập của tất cả các IoExceptions ở cuối. Đây là giải pháp làm việc:
def action(f:File):IO[List[Promise[IOE[Unit]]]] = (
foldM[Promise[IOE[Unit]], IO, List[Promise[IOE[Unit]]]](List.empty)((acc,x) => IO(x :: acc)) %=
map[IOE[File], Promise[IOE[Unit]], IO](longRunningProcess) %=
map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &=
enumZipFile(f)
).run
def longRunningProcess:(iof:IOE[File]):Promise[IOE[Unit]] =
Promise { Thread.sleep(5000); iof.map(println) }
Giải pháp này thổi phồng mỗi mục nhập khi tải lên không đồng bộ. Cuối cùng, tôi có một danh sách lớn các đối tượng được đáp ứng Promise
có chứa bất kỳ lỗi nào. Tôi vẫn chưa hoàn toàn thuyết phục điều này là việc sử dụng Iteratee chính xác, nhưng bây giờ tôi có một số phần có thể tái sử dụng, có thể ghép lại được mà tôi có thể sử dụng trong các phần khác của hệ thống của mình (đây là một mô hình rất phổ biến đối với chúng tôi).
Cảm ơn sự giúp đỡ của bạn!
Quy trình dài sẽ làm gì? Liệu nó tính toán một cái gì đó từ nội dung zip? – huynhjl
Mỗi tệp trong tệp zip là một hình ảnh. Quá trình tải lên tệp đó lên Rackspace CloudFiles. Khi tôi tìm ra điều này, tôi sẽ cần thêm các quy trình bổ sung để thay đổi kích thước hình ảnh và sau đó tải chúng lên. –
Lặp lại cảm thấy như trừu tượng sai cho công việc này, vì bạn muốn song song khối lượng công việc. Các diễn viên sẽ làm việc tốt hơn tôi nghĩ. – huynhjl