2012-08-13 20 views
11

Tôi muốn sử dụng Play2 để tải lên tệp CSV rất lớn (hàng triệu dòng) trên elasticsearch. Tôi đã viết đoạn mã sau hoạt động tốt.Chơi 2 Scala - Cách tốt nhất để tải lên tệp CSV lớn với Iteratee để xử lý từng dòng phản ứng

Tôi không hài lòng với cách tôi bỏ qua tiêu đề phản hồi http trong đoạn đầu tiên. Nên có một cách để chuỗi vòng lặp này với một Iteratee đầu tiên bỏ qua tiêu đề http và chuyển trực tiếp sang trạng thái Xong, nhưng tôi đã không tìm thấy cách.

Nếu bất cứ ai có thể giúp

object ReactiveFileUpload extends Controller { 
    def upload = Action(BodyParser(rh => new CsvIteratee(isFirst = true))) { 
    request => 
     Ok("File Processed") 
    } 
} 

case class CsvIteratee(state: Symbol = 'Cont, input: Input[Array[Byte]] = Empty, lastChunk: String = "", isFirst: Boolean = false) extends Iteratee[Array[Byte], Either[Result, String]] { 
    def fold[B](
       done: (Either[Result, String], Input[Array[Byte]]) => Promise[B], 
       cont: (Input[Array[Byte]] => Iteratee[Array[Byte], Either[Result, String]]) => Promise[B], 
       error: (String, Input[Array[Byte]]) => Promise[B] 
       ): Promise[B] = state match { 
    case 'Done => 
     done(Right(lastChunk), Input.Empty) 

    case 'Cont => cont(in => in match { 
     case in: El[Array[Byte]] => { 
     // Retrieve the part that has not been processed in the previous chunk and copy it in front of the current chunk 
     val content = lastChunk + new String(in.e) 
     val csvBody = 
      if (isFirst) 
      // Skip http header if it is the first chunk 
      content.drop(content.indexOf("\r\n\r\n") + 4) 
      else content 
     val csv = new CSVReader(new StringReader(csvBody), ';') 
     val lines = csv.readAll 
     // Process all lines excepted the last one since it is cut by the chunk 
     for (line <- lines.init) 
      processLine(line) 
     // Put forward the part that has not been processed 
     val last = lines.last.toList.mkString(";") 
     copy(input = in, lastChunk = last, isFirst = false) 
     } 
     case Empty => copy(input = in, isFirst = false) 
     case EOF => copy(state = 'Done, input = in, isFirst = false) 
     case _ => copy(state = 'Error, input = in, isFirst = false) 
    }) 

    case _ => 
     error("Unexpected state", input) 

    } 

    def processLine(line: Array[String]) = WS.url("http://localhost:9200/affa/na/").post(
    toJson(
     Map(
     "date" -> toJson(line(0)), 
     "trig" -> toJson(line(1)), 
     "code" -> toJson(line(2)), 
     "nbjours" -> toJson(line(3).toDouble) 
    ) 
    ) 
) 
} 
+1

Bạn có thể quan tâm đến bởi không chính thức play-iteratee-extras, trong đó có một [CSV phân tích cú pháp] (https://github.com/jroper/play -ngoại-extras/blob/master/src/chính/scala/play/extras/iteratees/Csv.scala). –

Trả lời

1

Để chuỗi hai iteratees với nhau, sử dụng flatMap.

val combinedIteratee = firstIteratee.flatMap(firstResult => secondIteratee) 

Hoặc, sử dụng cho sự hiểu biết:

val combinedIteratee = for { 
    firstResult <- firstIteratee 
    secondResult <- secondIteratee 
} yield secondResult 

Bạn có thể sử dụng flatMap trình tự như nhiều iteratees với nhau như bạn muốn.

Bạn có thể muốn làm điều gì đó như:

val headerAndCsvIteratee = for { 
    headerResult <- headerIteratee 
    csvResult <- csvIteratee 
} yield csvResult 
Các vấn đề liên quan