2015-11-15 29 views
5

Tôi muốn nhóm theo giá trị và sau đó tìm giá trị tối đa trong mỗi nhóm bằng PySpark. Tôi có mã sau đây nhưng bây giờ tôi đang bit bị mắc kẹt về cách trích xuất giá trị tối đa.pyspark: grouby và sau đó nhận giá trị tối đa của mỗi nhóm

# some file contains tuples ('user', 'item', 'occurrences') 
data_file = sc.textData('file:///some_file.txt') 
# Create the triplet so I index stuff 
data_file = data_file.map(lambda l: l.split()).map(lambda l: (l[0], l[1], float(l[2]))) 
# Group by the user i.e. r[0] 
grouped = data_file.groupBy(lambda r: r[0]) 
# Here is where I am stuck 
group_list = grouped.map(lambda x: (list(x[1]))) #? 

Returns cái gì đó như:

[[(u'u1', u's1', 20), (u'u1', u's2', 5)], [(u'u2', u's3', 5), (u'u2', u's2', 10)]] 

Tôi muốn tìm max 'xảy ra' cho mỗi người dùng bây giờ. Kết quả cuối cùng sau khi thực hiện tối đa sẽ dẫn đến RDD giống như sau:

[[(u'u1', u's1', 20)], [(u'u2', u's2', 10)]] 

Nơi chỉ có tập dữ liệu tối đa sẽ còn lại cho mỗi người dùng trong tệp. Nói cách khác, tôi muốn thay đổi giá trị của RDD để chỉ chứa một bộ ba đơn lẻ cho mỗi lần xuất hiện tối đa của mỗi người dùng.

Trả lời

9

Không cần cho groupBy đây. Đơn giản reduceByKey sẽ làm tốt và hầu hết thời gian sẽ hiệu quả hơn:

data_file = sc.parallelize([ 
    (u'u1', u's1', 20), (u'u1', u's2', 5), 
    (u'u2', u's3', 5), (u'u2', u's2', 10)]) 

max_by_group = (data_file 
    .map(lambda x: (x[0], x)) # Convert to PairwiseRD 
    # Take maximum of the passed arguments by the last element (key) 
    # equivalent to: 
    # lambda x, y: x if x[-1] > y[-1] else y 
    .reduceByKey(lambda x1, x2: max(x1, x2, key=lambda x: x[-1])) 
    .values()) # Drop keys 

max_by_group.collect() 
## [('u2', 's2', 10), ('u1', 's1', 20)] 
+1

bạn có thể giải thích điều này '(lambda x1, x2: max (x1, x2, key = lambda x: x [-1])) 'nếu có thể? – WoodChopper

+1

@WoodChopper 'max' chỉ là một tiêu chuẩn Python' max'. Nó lấy các phần tử và trả về phần tử lớn nhất. Đối số 'khóa' mô tả cách so sánh các phần tử (ở đây là mục cuối cùng). – zero323

2

Tôi nghĩ rằng tôi đã tìm thấy các giải pháp:

from pyspark import SparkContext, SparkConf 

def reduce_by_max(rdd): 
    """ 
    Helper function to find the max value in a list of values i.e. triplets. 
    """ 
    max_val = rdd[0][2] 
    the_index = 0 

    for idx, val in enumerate(rdd): 
     if val[2] > max_val: 
      max_val = val[2] 
      the_index = idx 

    return rdd[the_index] 

conf = SparkConf() \ 
    .setAppName("Collaborative Filter") \ 
    .set("spark.executor.memory", "5g") 
sc = SparkContext(conf=conf) 

# some file contains tuples ('user', 'item', 'occurrences') 
data_file = sc.textData('file:///some_file.txt') 

# Create the triplet so I can index stuff 
data_file = data_file.map(lambda l: l.split()).map(lambda l: (l[0], l[1], float(l[2]))) 

# Group by the user i.e. r[0] 
grouped = data_file.groupBy(lambda r: r[0]) 

# Get the values as a list 
group_list = grouped.map(lambda x: (list(x[1]))) 

# Get the max value for each user. 
max_list = group_list.map(reduce_by_max).collect() 
Các vấn đề liên quan