2010-04-26 45 views
6

Tôi đã cố gắng sử dụng Hadoop để gửi số lượng dòng N tới một ánh xạ đơn. Tôi không yêu cầu các dòng được phân chia rồi.Nhiều dòng văn bản vào một bản đồ đơn

Tôi đã cố gắng sử dụng NLineInputFormat, tuy nhiên sẽ gửi N dòng văn bản từ dữ liệu đến từng trình ánh xạ một dòng tại một thời điểm [từ bỏ sau dòng thứ N].

Tôi đã cố gắng để thiết lập các tùy chọn và nó chỉ mất N dòng đầu vào gửi nó ở 1 dòng tại một thời điểm cho mỗi bản đồ:

job.setInt("mapred.line.input.format.linespermap", 10); 

Tôi đã tìm thấy một danh sách gửi thư giới thiệu tôi để ghi đè LineRecordReader :: tiếp theo, tuy nhiên điều đó không đơn giản như vậy, vì các thành viên dữ liệu nội bộ đều riêng tư.

Tôi vừa kiểm tra nguồn cho NLineInputFormat và mã cứng LineReader, vì vậy việc ghi đè sẽ không giúp ích.

Ngoài ra, btw Tôi đang sử dụng Hadoop 0,18 để tương thích với MapReduce của Amazon EC2.

+0

Tại sao các bạn cố gắng để làm điều này? Do nhiều dòng tạo thành một kỷ lục duy nhất trong một số ý nghĩa? –

+0

Tôi thực sự cần N số dòng ngẫu nhiên [như một tập hợp], tuy nhiên tôi có thể sống với hậu quả. Tôi cần nó để gửi nó đến bộ giảm tốc bên phải. – monksy

+0

Để trả lời câu hỏi của bạn, đúng vậy. – monksy

Trả lời

7

Bạn phải triển khai định dạng nhập của riêng bạn. Bạn cũng có khả năng xác định trình đọc bản ghi của riêng bạn sau đó.

Thật không may bạn phải xác định phương thức getSplits() -. Theo tôi, điều này sẽ khó hơn việc thực hiện đầu đọc ghi: Phương pháp này phải thực hiện một logic để chunk dữ liệu đầu vào.

Xem đoạn trích sau đây từ "Hadoop - Hướng dẫn dứt khoát" (một cuốn sách tuyệt vời, tôi sẽ luôn luôn khuyên!):

Dưới đây là giao diện:

public interface InputFormat<K, V> { 
    InputSplit[] getSplits(JobConf job, int numSplits) throws IOException; 
    RecordReader<K, V> getRecordReader(InputSplit split, 
            JobConf job, 
            Reporter reporter) throws IOException; 
} 

Các JobClient gọi getSplits (phương pháp) , chuyển số lượng tác vụ bản đồ mong muốn làm đối số numSplits. Con số này được coi là một gợi ý, vì InputFormat imple- các đề xuất được tự do trả lại một số lượng chia tách khác nhau cho số được chỉ định trong numSplits. Sau khi tính toán các phân chia, khách hàng gửi chúng đến trình theo dõi công việc, trong đó sử dụng các vị trí lưu trữ của họ để lên lịch các tác vụ bản đồ để xử lý chúng trên các bộ công việc.

Trên công cụ nhiệm vụ, tác vụ bản đồ chuyển phần tách sang phương thức getRecordReader() trên InputFormat để có được một RecordReader cho phần tách đó. Một RecordReader nhỏ hơn một trình lặp trên bản ghi và tác vụ bản đồ sử dụng một để tạo các cặp khóa-giá trị bản ghi, mà nó chuyển đến hàm bản đồ. Một đoạn mã (dựa trên mã trong MapRunner) minh họa ý tưởng:

K key = reader.createKey(); 
V value = reader.createValue(); 
while (reader.next(key, value)) { 
    mapper.map(key, value, output, reporter); 
} 
+0

Loại đó hoạt động. Nhưng điều đó thực sự không trả lời câu hỏi. Đã xảy ra sự cố khi thêm InputFormats mới dưới 18.3. – monksy

+2

Ok Tôi xin lỗi. Thật vậy, không có câu hỏi thực sự, vì tôi không thấy dấu hỏi :-P Vì vậy, bạn cần biết thêm điều gì khác cụ thể hơn? –

1

Tôi nghĩ rằng trong trường hợp của bạn, bạn có thể làm theo mô hình phân cấp và thực hiện một wrapper xung quanh LineRecordReader đó sẽ ghi đè các phương pháp cần thiết tức là tiếp theo() (hoặc nextKeyValue() trong API mới) để đặt giá trị thành một kết nối của N dòng, thay vì một dòng.

Tôi đã googled việc thực hiện mẫu của ParagraphRecordReader sử dụng LineRecordReader để đọc dòng dữ liệu đầu vào theo dòng (và nối nó) cho đến khi gặp EOF hoặc dòng trống. Sau đó, nó trả về cặp, trong đó giá trị là một đoạn (thay vì một dòng). Hơn nữa, ParagraphInputFormat cho ParagraphRecordReader này đơn giản như TextInputFormat chuẩn.

Bạn có thể tìm thấy các liên kết cần thiết để triển khai này và một vài từ về bài đăng sau: http://hadoop-mapreduce.blogspot.com/2011/03/little-more-complicated-recordreaders.html.

nhất

2

tôi giải quyết vấn đề này thời gian gần đây bằng cách đơn giản tạo InputFormat riêng tôi đó sẽ ghi đè NLineInputFormat và thực hiện một MultiLineRecordReader tùy chỉnh thay vì LineReader mặc định.

Tôi đã chọn để mở rộng NLineInputFormat vì tôi muốn có cùng một đảm bảo có chính xác N dòng trên mỗi phân chia.

đọc kỷ lục này được lấy gần như là từ http://bigdatacircus.com/2012/08/01/wordcount-with-custom-record-reader-of-textinputformat/

Những điều duy nhất tôi sửa đổi là tài sản cho maxLineLength rằng bây giờ sử dụng API mới, và các giá trị cho NLINESTOPROCESS đó được đọc từ setNumLinesPerSplit() INSEAD bị hardcoded NLineInputFormat của (để linh hoạt hơn).

Dưới đây là kết quả:

public class MultiLineInputFormat extends NLineInputFormat{ 
    @Override 
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit genericSplit, TaskAttemptContext context) { 
     context.setStatus(genericSplit.toString()); 
     return new MultiLineRecordReader(); 
    } 

    public static class MultiLineRecordReader extends RecordReader<LongWritable, Text>{ 
     private int NLINESTOPROCESS; 
     private LineReader in; 
     private LongWritable key; 
     private Text value = new Text(); 
     private long start =0; 
     private long end =0; 
     private long pos =0; 
     private int maxLineLength; 

     @Override 
     public void close() throws IOException { 
      if (in != null) { 
       in.close(); 
      } 
     } 

     @Override 
     public LongWritable getCurrentKey() throws IOException,InterruptedException { 
      return key; 
     } 

     @Override 
     public Text getCurrentValue() throws IOException, InterruptedException { 
      return value; 
     } 

     @Override 
     public float getProgress() throws IOException, InterruptedException { 
      if (start == end) { 
       return 0.0f; 
      } 
      else { 
       return Math.min(1.0f, (pos - start)/(float)(end - start)); 
      } 
     } 

     @Override 
     public void initialize(InputSplit genericSplit, TaskAttemptContext context)throws IOException, InterruptedException { 
      NLINESTOPROCESS = getNumLinesPerSplit(context); 
      FileSplit split = (FileSplit) genericSplit; 
      final Path file = split.getPath(); 
      Configuration conf = context.getConfiguration(); 
      this.maxLineLength = conf.getInt("mapreduce.input.linerecordreader.line.maxlength",Integer.MAX_VALUE); 
      FileSystem fs = file.getFileSystem(conf); 
      start = split.getStart(); 
      end= start + split.getLength(); 
      boolean skipFirstLine = false; 
      FSDataInputStream filein = fs.open(split.getPath()); 

      if (start != 0){ 
       skipFirstLine = true; 
       --start; 
       filein.seek(start); 
      } 
      in = new LineReader(filein,conf); 
      if(skipFirstLine){ 
       start += in.readLine(new Text(),0,(int)Math.min((long)Integer.MAX_VALUE, end - start)); 
      } 
      this.pos = start; 
     } 

     @Override 
     public boolean nextKeyValue() throws IOException, InterruptedException { 
      if (key == null) { 
       key = new LongWritable(); 
      } 
      key.set(pos); 
      if (value == null) { 
       value = new Text(); 
      } 
      value.clear(); 
      final Text endline = new Text("\n"); 
      int newSize = 0; 
      for(int i=0;i<NLINESTOPROCESS;i++){ 
       Text v = new Text(); 
       while (pos < end) { 
        newSize = in.readLine(v, maxLineLength,Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),maxLineLength)); 
        value.append(v.getBytes(),0, v.getLength()); 
        value.append(endline.getBytes(),0, endline.getLength()); 
        if (newSize == 0) { 
         break; 
        } 
        pos += newSize; 
        if (newSize < maxLineLength) { 
         break; 
        } 
       } 
      } 
      if (newSize == 0) { 
       key = null; 
       value = null; 
       return false; 
      } else { 
       return true; 
      } 
     } 
    } 

} 
Các vấn đề liên quan