2010-08-16 17 views
16

Tôi là người mới trong Hadoop. Tôi đang thử chương trình Wordcount.MultipleOutputFormat in hadoop

Bây giờ để thử nhiều tệp đầu ra, tôi sử dụng MultipleOutputFormat. liên kết này đã giúp tôi làm điều đó. http://hadoop.apache.org/common/docs/r0.19.0/api/org/apache/hadoop/mapred/lib/MultipleOutputs.html

trong lớp học lái xe của tôi đã

MultipleOutputs.addNamedOutput(conf, "even", 
      org.apache.hadoop.mapred.TextOutputFormat.class, Text.class, 
      IntWritable.class); 

    MultipleOutputs.addNamedOutput(conf, "odd", 
      org.apache.hadoop.mapred.TextOutputFormat.class, Text.class, 
      IntWritable.class);` 

và tôi giảm lớp trở này

public static class Reduce extends MapReduceBase implements 
     Reducer<Text, IntWritable, Text, IntWritable> { 
    MultipleOutputs mos = null; 

    public void configure(JobConf job) { 
     mos = new MultipleOutputs(job); 
    } 

    public void reduce(Text key, Iterator<IntWritable> values, 
      OutputCollector<Text, IntWritable> output, Reporter reporter) 
      throws IOException { 
     int sum = 0; 
     while (values.hasNext()) { 
      sum += values.next().get(); 
     } 
     if (sum % 2 == 0) { 
      mos.getCollector("even", reporter).collect(key, new IntWritable(sum)); 
     }else { 
      mos.getCollector("odd", reporter).collect(key, new IntWritable(sum)); 
     } 
     //output.collect(key, new IntWritable(sum)); 
    } 
    @Override 
    public void close() throws IOException { 
     // TODO Auto-generated method stub 
    mos.close(); 
    } 
} 

Những điều làm việc, nhưng tôi nhận được rất nhiều các tác phẩm, (một số lẻ và một ngay cả đối với mỗi bản đồ -reduce)

Câu hỏi là: Làm cách nào tôi có thể chỉ có 2 tệp đầu ra (lẻ & thậm chí) sao cho mỗi đầu ra lẻ của mỗi bản đồ giảm được ghi vào số lẻ đó và thậm chí là cả.

+5

Bạn đang sử dụng MultipleOutputs không MultipleOutputFormat. Cả hai đều là các thư viện khác nhau. –

Trả lời

3

Mỗi bộ giảm tốc sử dụng OutputFormat để ghi bản ghi. Vì vậy, đó là lý do tại sao bạn đang nhận được một tập hợp các tập tin lẻ và thậm chí mỗi bộ giảm tốc. Điều này là do thiết kế sao cho mỗi bộ giảm tốc có thể thực hiện viết song song.

Nếu bạn muốn chỉ một tệp lẻ và đơn lẻ, bạn sẽ cần phải đặt mapred.reduce.tasks thành 1. Nhưng hiệu suất sẽ bị ảnh hưởng bởi vì tất cả người vẽ bản đồ sẽ được đưa vào một trình giảm tốc duy nhất.

Một tùy chọn khác là thay đổi quy trình đọc các tệp này để chấp nhận nhiều tệp đầu vào hoặc viết một quá trình riêng biệt kết hợp các tệp này với nhau.

+3

insttead thay đổi bản đồ nhiệm vụ màu đỏ, tôi ghi đè getFilenameForKeyValue() chức năng .. và điều này làm việc ..... cảm ơn. – raj

1

Nhiều tệp đầu ra sẽ được tạo dựa trên số lượng bộ giảm.

Bạn có thể sử dụng hadoop dfs -getmerge cho các kết quả được hợp nhất

+0

cảm ơn :) nhưng tôi cần làm điều này bằng bản đồ chỉ giảm, – raj

3

Tôi đã viết một lớp để thực hiện việc này. Chỉ cần sử dụng nó công việc của bạn:

job.setOutputFormatClass(m_customOutputFormatClass); 

Đây là lớp học của tôi:

import java.io.IOException; 
import java.util.HashMap; 
import java.util.Map; 
import java.util.Map.Entry; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.mapreduce.RecordWriter; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 

/** 
* TextOutputFormat extension which enables writing the mapper/reducer's output in multiple files.<br> 
* <p> 
* <b>WARNING</b>: The number of different folder shuoldn't be large for one mapper since we keep an 
* {@link RecordWriter} instance per folder name. 
* </p> 
* <p> 
* In this class the folder name is defined by the written entry's key.<br> 
* To change this behavior simply extend this class and override the 
* {@link HdMultipleFileOutputFormat#getFolderNameExtractor()} method and create your own 
* {@link FolderNameExtractor} implementation. 
* </p> 
* 
* 
* @author ykesten 
* 
* @param <K> - Keys type 
* @param <V> - Values type 
*/ 
public class HdMultipleFileOutputFormat<K, V> extends TextOutputFormat<K, V> { 

    private String folderName; 

    private class MultipleFilesRecordWriter extends RecordWriter<K, V> { 

     private Map<String, RecordWriter<K, V>> fileNameToWriter; 
     private FolderNameExtractor<K, V> fileNameExtractor; 
     private TaskAttemptContext job; 

     public MultipleFilesRecordWriter(FolderNameExtractor<K, V> fileNameExtractor, TaskAttemptContext job) { 
      fileNameToWriter = new HashMap<String, RecordWriter<K, V>>(); 
      this.fileNameExtractor = fileNameExtractor; 
      this.job = job; 
     } 

     @Override 
     public void write(K key, V value) throws IOException, InterruptedException { 
      String fileName = fileNameExtractor.extractFolderName(key, value); 
      RecordWriter<K, V> writer = fileNameToWriter.get(fileName); 
      if (writer == null) { 
       writer = createNewWriter(fileName, fileNameToWriter, job); 
       if (writer == null) { 
        throw new IOException("Unable to create writer for path: " + fileName); 
       } 
      } 
      writer.write(key, value); 
     } 

     @Override 
     public void close(TaskAttemptContext context) throws IOException, InterruptedException { 
      for (Entry<String, RecordWriter<K, V>> entry : fileNameToWriter.entrySet()) { 
       entry.getValue().close(context); 
      } 
     } 

    } 

    private synchronized RecordWriter<K, V> createNewWriter(String folderName, 
      Map<String, RecordWriter<K, V>> fileNameToWriter, TaskAttemptContext job) { 
     try { 
      this.folderName = folderName; 
      RecordWriter<K, V> writer = super.getRecordWriter(job); 
      this.folderName = null; 
      fileNameToWriter.put(folderName, writer); 
      return writer; 
     } catch (Exception e) { 
      e.printStackTrace(); 
      return null; 
     } 
    } 

    @Override 
    public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException { 
     Path path = super.getDefaultWorkFile(context, extension); 
     if (folderName != null) { 
      String newPath = path.getParent().toString() + "/" + folderName + "/" + path.getName(); 
      path = new Path(newPath); 
     } 
     return path; 
    } 

    @Override 
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { 
     return new MultipleFilesRecordWriter(getFolderNameExtractor(), job); 
    } 

    public FolderNameExtractor<K, V> getFolderNameExtractor() { 
     return new KeyFolderNameExtractor<K, V>(); 
    } 

    public interface FolderNameExtractor<K, V> { 
     public String extractFolderName(K key, V value); 
    } 

    private static class KeyFolderNameExtractor<K, V> implements FolderNameExtractor<K, V> { 
     public String extractFolderName(K key, V value) { 
      return key.toString(); 
     } 
    } 

}