2012-02-15 29 views
6

Tôi đang cố gắng tìm hiểu cách sử dụng API Python của Yelp cho MapReduce, MRJob. Ví dụ truy cập từ đơn giản của chúng có ý nghĩa, nhưng tôi tò mò về cách mà một người sẽ xử lý một ứng dụng liên quan đến nhiều đầu vào. Ví dụ, thay vì chỉ đếm các từ trong một tài liệu, nhân một véc tơ với một ma trận. Tôi đã đưa ra giải pháp này, mà chức năng, nhưng cảm thấy ngớ ngẩn:Nhiều đầu vào với MRJob

class MatrixVectMultiplyTast(MRJob): 
    def multiply(self,key,line): 
      line = map(float,line.split(" ")) 
      v,col = line[-1],line[:-1] 

      for i in xrange(len(col)): 
        yield i,col[i]*v 

    def sum(self,i,occurrences): 
      yield i,sum(occurrences) 

    def steps(self): 
      return [self.mr (self.multiply,self.sum),] 

if __name__=="__main__": 
    MatrixVectMultiplyTast.run() 

Mã này được chạy ./matrix.py < input.txt và lý do nó hoạt động là ma trận lưu trữ trong INPUT.TXT bởi cột, với giá trị vector tương ứng ở cuối dòng.

Vì vậy, ma trận và vectơ sau:

enter image description here

được biểu diễn dưới dạng INPUT.TXT như:

enter image description here

Nói tóm lại, làm thế nào tôi sẽ đi về lưu trữ ma trận và vector tự nhiên hơn trong các tập tin riêng biệt và đưa chúng vào MRJob?

Trả lời

3

Nếu bạn có nhu cầu xử lý dữ liệu thô của bạn chống lại khác (hoặc cùng row_i, row_j) tập hợp dữ liệu, bạn có thể:

1) Tạo một xô S3 để lưu trữ một bản sao của dữ liệu của bạn. Chuyển vị trí của bản sao này tới lớp nhiệm vụ của bạn, ví dụ: self.options.bucket và self.options.my_datafile_copy_location trong mã bên dưới. Caveat: Thật không may, có vẻ như toàn bộ tập tin phải được "tải xuống" cho các máy tác vụ trước khi được xử lý. Nếu các kết nối bị thay đổi hoặc mất quá nhiều thời gian để tải, công việc này có thể bị lỗi. Đây là một số mã Python/MRJob để làm điều này.

Đặt này trong chức năng ánh xạ của bạn:

d1 = line1.split('\t', 1) 
v1, col1 = d1[0], d1[1] 
conn = boto.connect_s3(aws_access_key_id=<AWS_ACCESS_KEY_ID>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>) 
bucket = conn.get_bucket(self.options.bucket) # bucket = conn.get_bucket(MY_UNIQUE_BUCKET_NAME_AS_STRING) 
data_copy = bucket.get_key(self.options.my_datafile_copy_location).get_contents_as_string().rstrip() 
### CAVEAT: Needs to get the whole file before processing the rest. 
for line2 in data_copy.split('\n'): 
    d2 = line2.split('\t', 1) 
    v2, col2 = d2[0], d2[1] 
    ## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here: 
    yield <your output key, value pairs> 
conn.close() 

2) Tạo một miền SimpleDB, và lưu trữ tất cả dữ liệu của bạn trong đó. đọc ở đây trên boto và SimpleDB: http://code.google.com/p/boto/wiki/SimpleDbIntro

đang mapper của bạn sẽ trông như thế này:

dline = dline.strip() 
d0 = dline.split('\t', 1) 
v1, c1 = d0[0], d0[1] 
sdb = boto.connect_sdb(aws_access_key_id=<AWS_ACCESS_KEY>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>) 
domain = sdb.get_domain(MY_DOMAIN_STRING_NAME) 
for item in domain: 
    v2, c2 = item.name, item['column'] 
    ## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here: 
    yield <your output key, value pairs> 
sdb.close() 

tùy chọn thứ hai này có thể thực hiện tốt hơn nếu bạn có một lượng rất lớn dữ liệu, vì nó có thể làm theo yêu cầu cho mỗi hàng dữ liệu thay vì toàn bộ số tiền cùng một lúc. Hãy nhớ rằng các giá trị SimpleDB chỉ có thể dài tối đa 1024 ký tự, vì vậy bạn có thể cần phải nén/giải nén qua một số phương thức nếu giá trị dữ liệu của bạn dài hơn.

1

Theo hiểu biết của tôi, bạn sẽ không sử dụng MrJob trừ khi bạn muốn tận dụng các dịch vụ Hadoop cluster hoặc Hadoop từ Amazon, ngay cả khi ví dụ sử dụng chạy trên các tệp cục bộ.

MrJob sử dụng chính "Hadoop streaming" để gửi công việc.

Điều này có nghĩa là tất cả các đầu vào được chỉ định dưới dạng tệp hoặc thư mục từ Hadoop đều được truyền tới người lập bản đồ và kết quả tiếp theo để giảm tốc. Tất cả trình ánh xạ nhận được một lát đầu vào và xem xét tất cả các đầu vào để có sơ đồ giống nhau sao cho nó phân tích cú pháp và xử lý khóa, giá trị cho từng lát dữ liệu một cách thống nhất.

Bắt nguồn từ sự hiểu biết này, các yếu tố đầu vào là sơ đồ giống với trình ánh xạ.Cách duy nhất có thể bao gồm hai dữ liệu sơ đồ khác nhau là xen kẽ chúng trong cùng một tệp theo cách thức mà người lập bản đồ có thể hiểu đó là dữ liệu vectơ và đó là dữ liệu ma trận.

You are actually doing it already. 

Bạn chỉ có thể cải thiện điều đó bằng cách có một số thông số cụ thể nếu một dòng là dữ liệu ma trận hoặc dữ liệu vectơ. Khi bạn thấy dữ liệu vectơ thì dữ liệu ma trận trước đó sẽ được áp dụng cho nó.

matrix, 1, 2, ... 
matrix, 2, 4, ... 
vector, 3, 4, ... 
matrix, 1, 2, ... 
..... 

Nhưng quá trình bạn đã đề cập hoạt động tốt. Bạn phải có tất cả dữ liệu sơ đồ trong một tệp.

Điều này vẫn có vấn đề. Bản đồ K, V giảm hoạt động tốt hơn khi hoàn thành lược đồ trong một dòng và chứa một đơn vị xử lý đơn hoàn chỉnh.

Từ hiểu biết của tôi, bạn đã làm đúng cách nhưng tôi đoán Map-Reduce không phải là cơ chế phù hợp cho loại dữ liệu này. Tôi hy vọng một số người làm rõ điều này thậm chí còn xa hơn tôi có thể.

2

Câu trả lời thực tế cho câu hỏi của bạn là mrjob chưa hỗ trợ mô hình liên kết luồng hadoop, để đọc biến môi trường map_input_file (để lộ thuộc tính map.input.file) để xác định loại tệp nào bạn đang xử lý dựa trên đường dẫn và/hoặc tên của nó.

Bạn vẫn có thể có thể kéo nó ra khỏi, nếu bạn có thể dễ dàng phát hiện từ chỉ đọc dữ liệu chính nó mà gõ nó thuộc về, như được hiển thị trong bài viết này:

http://allthingshadoop.com/2011/12/16/simple-hadoop-streaming-tutorial-using-joins-and-keys-with-python/

Tuy nhiên đó không phải là luôn luôn có thể ...

Nếu không thì myjob trông tuyệt vời và tôi muốn họ có thể thêm hỗ trợ cho điều này trong tương lai. Cho đến lúc đó, đây là một công cụ xử lý thỏa đáng đối với tôi.

1

Đây là cách tôi sử dụng nhiều đầu vào và dựa trên tên tệp thực hiện các thay đổi phù hợp trong giai đoạn người lập bản đồ.

Runner Chương trình:

from mrjob.hadoop import * 


#Define all arguments 

os.environ['HADOOP_HOME'] = '/opt/cloudera/parcels/CDH/lib/hadoop/' 
print "HADOOP HOME is now set to : %s" % (str(os.environ.get('HADOOP_HOME'))) 
job_running_time = datetime.datetime.now().strftime('%Y-%m-%d_%H_%M_%S') 
hadoop_bin = '/usr/bin/hadoop' 
mode = 'hadoop' 
hs = HadoopFilesystem([hadoop_bin]) 

input_file_names = ["hdfs:///app/input_file1/","hdfs:///app/input_file2/"] 

aargs = ['-r',mode,'--jobconf','mapred.job.name=JobName','--jobconf','mapred.reduce.tasks=3','--no-output','--hadoop-bin',hadoop_bin] 
aargs.extend(input_file_names) 
aargs.extend(['-o',output_dir]) 
print aargs 
status_file = True 

mr_job = MRJob(args=aargs) 
with mr_job.make_runner() as runner: 
    runner.run() 
os.environ['HADOOP_HOME'] = '' 
print "HADOOP HOME is now set to : %s" % (str(os.environ.get('HADOOP_HOME'))) 

Các MRJob Class:

class MR_Job(MRJob): 
    DEFAULT_OUTPUT_PROTOCOL = 'repr_value' 
    def mapper(self, _, line): 
    """ 
    This function reads lines from file. 
    """ 
    try: 
     #Need to clean email. 
     input_file_name = get_jobconf_value('map.input.file').split('/')[-2] 
       """ 
       Mapper code 
       """ 
    except Exception, e: 
     print e 

    def reducer(self, email_id,visitor_id__date_time): 
    try: 
     """ 
       Reducer Code 
       """ 
    except: 
     pass 


if __name__ == '__main__': 
    MRV_Email.run() 
Các vấn đề liên quan