Tôi muốn sử dụng Play2 để tải lên tệp CSV rất lớn (hàng triệu dòng) trên elasticsearch. Tôi đã viết đoạn mã sau hoạt động tốt.Chơi 2 Scala - Cách tốt nhất để tải lên tệp CSV lớn với Iteratee để xử lý từng dòng phản ứng
Tôi không hài lòng với cách tôi bỏ qua tiêu đề phản hồi http trong đoạn đầu tiên. Nên có một cách để chuỗi vòng lặp này với một Iteratee đầu tiên bỏ qua tiêu đề http và chuyển trực tiếp sang trạng thái Xong, nhưng tôi đã không tìm thấy cách.
Nếu bất cứ ai có thể giúp
object ReactiveFileUpload extends Controller {
def upload = Action(BodyParser(rh => new CsvIteratee(isFirst = true))) {
request =>
Ok("File Processed")
}
}
case class CsvIteratee(state: Symbol = 'Cont, input: Input[Array[Byte]] = Empty, lastChunk: String = "", isFirst: Boolean = false) extends Iteratee[Array[Byte], Either[Result, String]] {
def fold[B](
done: (Either[Result, String], Input[Array[Byte]]) => Promise[B],
cont: (Input[Array[Byte]] => Iteratee[Array[Byte], Either[Result, String]]) => Promise[B],
error: (String, Input[Array[Byte]]) => Promise[B]
): Promise[B] = state match {
case 'Done =>
done(Right(lastChunk), Input.Empty)
case 'Cont => cont(in => in match {
case in: El[Array[Byte]] => {
// Retrieve the part that has not been processed in the previous chunk and copy it in front of the current chunk
val content = lastChunk + new String(in.e)
val csvBody =
if (isFirst)
// Skip http header if it is the first chunk
content.drop(content.indexOf("\r\n\r\n") + 4)
else content
val csv = new CSVReader(new StringReader(csvBody), ';')
val lines = csv.readAll
// Process all lines excepted the last one since it is cut by the chunk
for (line <- lines.init)
processLine(line)
// Put forward the part that has not been processed
val last = lines.last.toList.mkString(";")
copy(input = in, lastChunk = last, isFirst = false)
}
case Empty => copy(input = in, isFirst = false)
case EOF => copy(state = 'Done, input = in, isFirst = false)
case _ => copy(state = 'Error, input = in, isFirst = false)
})
case _ =>
error("Unexpected state", input)
}
def processLine(line: Array[String]) = WS.url("http://localhost:9200/affa/na/").post(
toJson(
Map(
"date" -> toJson(line(0)),
"trig" -> toJson(line(1)),
"code" -> toJson(line(2)),
"nbjours" -> toJson(line(3).toDouble)
)
)
)
}
Bạn có thể quan tâm đến bởi không chính thức play-iteratee-extras, trong đó có một [CSV phân tích cú pháp] (https://github.com/jroper/play -ngoại-extras/blob/master/src/chính/scala/play/extras/iteratees/Csv.scala). –