2014-07-13 16 views
16

làm thế nào để bạn thả các hàng từ RDD trong PySpark? Riêng hàng đầu tiên, vì nó có xu hướng chứa các tên cột trong bộ dữ liệu của tôi. Từ perusing API, tôi dường như không thể tìm thấy một cách dễ dàng để làm điều này. Tất nhiên tôi có thể làm điều này thông qua Bash/HDFS, nhưng tôi chỉ muốn biết nếu điều này có thể được thực hiện từ bên trong PySpark.PySpark Drop Rows

+1

sử dụng 'bộ lọc' để lọc ra hàng xấu – aaronman

+0

Nếu bạn muốn xóa hàng đầu tiên thì sao? Và chúng ta hãy nói vì lợi ích của đối số, chúng ta không thể sử dụng bất kỳ thông tin nào trong vectơ hàng x, tức là chúng ta không thể làm 'lambda x: (một số điều kiện sử dụng x)'. – Jack

+0

Thanh toán câu trả lời của tôi có thể gần hơn với những gì bạn đang tìm kiếm – aaronman

Trả lời

15

AFAIK không có cách nào dễ dàng để thực hiện việc này.

này nên làm như lừa, mặc dù:

val header = data.first 
val rows = data.filter(line => line != header) 
+0

Điều này là hợp lý. Cảm ơn! – Jack

+0

Không phải là dữ liệu. Đầu tiên? data.take (1) sẽ trả về một mảng [T] với chiều dài 1. – Bar

+0

@Bar yes - bạn nói đúng. Tôi sẽ cập nhật câu trả lời. Cảm ơn. – maasg

1

Cá nhân tôi nghĩ rằng chỉ sử dụng một bộ lọc để thoát khỏi công cụ này là cách dễ dàng nhất. Nhưng theo nhận xét của bạn, tôi có một cách tiếp cận khác. Glom RDD để mỗi phân vùng là một mảng (tôi giả sử bạn có 1 tệp cho mỗi phân vùng và mỗi tệp có hàng vi phạm trên đầu) và sau đó chỉ bỏ qua phần tử đầu tiên (đây là với scala api).

data.glom().map(x => for (elem <- x.drop(1){/*do stuff*/}) //x is an array so just skip the 0th index

Hãy ghi nhớ một trong những tính năng lớn của RDD là họ là không thay đổi, vì vậy tự nhiên loại bỏ một hàng là một điều khó khăn để làm

UPDATE: Better giải pháp.
rdd.mapPartions(x => for (elem <- x.drop(1){/*do stuff*/})
Tương tự như các glom nhưng không có chi phí đưa mọi thứ vào một mảng, vì x là một biến lặp trong trường hợp này

13

cụ thể để PySpark:

Theo @maasg, bạn có thể làm điều này:

header = rdd.first() 
rdd.filter(lambda line: line != header) 

nhưng không chính xác về mặt kỹ thuật vì bạn có thể loại trừ các dòng chứa dữ liệu cũng như tiêu đề. Tuy nhiên, điều này dường như làm việc cho tôi:

def remove_header(itr_index, itr): 
    return iter(list(itr)[1:]) if itr_index == 0 else itr 
rdd.mapPartitionsWithIndex(remove_header) 

Tương tự:

rdd.zipWithIndex().filter(lambda tup: tup[1] > 0).map(lambda tup: tup[0]) 

Tôi mới đến Spark, vì vậy không thể thông minh nhận xét về sẽ được nhanh nhất.

+0

Bạn có thể giải thích những gì đang xảy ra ở đó (tôi là một nhà phát triển JS) không? 'return iter (danh sách (itr) [1:] nếu itr_index == 0 else itr)'? 1) - 'iter' nhận một' (object [, sentinel]) '- vì vậy tôi đoán' iter' lấy danh sách 'iterr iterables (rows), sau đó sử dụng toán tử' range' của Python, bắt đầu từ chỉ số thứ 2 (0 dựa), sau đó lặp lại xuống cho đến khi 'itr_index == 0', nếu không, tiếp tục trả về' dòng itr'? Tôi hỏi vì tôi đang sử dụng cùng một thứ nhưng hàng đầu tiên của các trường không xuất hiện, thay vì hàng đầu tiên của dữ liệu sẽ trở thành các trường. – Growler

+1

'Lặp lại' có thể gây nhầm lẫn cho vấn đề. Nếu 'rdd.mapParitionsWithIndex' trả về chỉ mục của phân vùng, cộng với dữ liệu phân vùng như một danh sách, nó sẽ chỉ là' itr [1:] nếu itr_index == 0 else itr'- ie nếu đó là phân vùng đầu tiên (tức là ' itr_index == 0') sau đó loại trừ hàng đầu tiên (tức là tiêu đề), và nó không phải là phân vùng đầu tiên (nghĩa là không có tiêu đề), chỉ trả lại toàn bộ phân vùng. 'Iter' và' list' là bởi vì nó thực sự sử dụng các iterables thay vì các danh sách. Là một sang một bên, tôi khá chắc chắn có một tuyến đường hiệu quả hơn so với 'iter (danh sách (itr) [1:])'. – ianhoolihan

4

Một cách đơn giản để đạt được điều này trong PySpark (Python API):

noHeaderRDD = rawRDD.zipWithIndex().filter(lambda (row,index): index > 0).keys() 
0

tôi đã thử nghiệm với spark2.1. Giả sử bạn muốn xóa 14 hàng đầu tiên mà không biết về số lượng tệp cột có.

sc = spark.sparkContext 
lines = sc.textFile("s3://location_of_csv") 
parts = lines.map(lambda l: l.split(",")) 
parts.zipWithIndex().filter(lambda tup: tup[1] > 14).map(lambda x:x[0]) 

withColumn là hàm df. Vì vậy, dưới đây sẽ không làm việc trong phong cách RDD như được sử dụng trong trường hợp trên.

parts.withColumn("index",monotonically_increasing_id()).filter(index > 14)