2012-06-16 19 views
8

Tôi có một công việc haddop rằng đầu ra của nó phải được ghi vào HBase. Tôi không thực sự cần giảm tốc, loại hàng tôi muốn chèn được xác định trong Mapper.Hadoop - Viết cho HBase trực tiếp từ Người lập bản đồ

Làm cách nào để sử dụng TableOutputFormat để đạt được điều này? Từ tất cả các ví dụ tôi đã thấy giả định là bộ giảm tốc là một trong những tạo ra các Đặt, và rằng TableMapper chỉ là để đọc từ bảng HBase.

Trong trường hợp của tôi đầu vào là HDFS đầu ra là Đặt vào bảng cụ thể, tôi không thể tìm thấy bất kỳ điều gì trong TableMapReduceUtil có thể giúp tôi với điều đó.

Có ví dụ nào ở đó có thể giúp tôi với điều đó không?

BTW, Tôi đang sử dụng API Hadoop mới

+0

bạn đang cố chèn bao nhiêu bản ghi? – Gevorg

Trả lời

1

Bạn chỉ cần thực hiện kết quả đầu ra của trình ánh xạ. OutputFormat chỉ chỉ định cách duy trì khóa-giá trị đầu ra. Nó không nhất thiết có nghĩa là các giá trị chính đến từ bộ giảm tốc. Bạn sẽ cần phải làm một cái gì đó như thế này trong mapper:

... extends TableMapper<ImmutableBytesWritable, Put>() { 
    ... 
    ... 
    context.write(<some key>, <some Put or Delete object>); 
} 
7

Đây là ví dụ về đọc từ tập tin và đặt tất cả các dòng vào HBase. Ví dụ này là từ "Hbase: Hướng dẫn dứt khoát" và bạn có thể tìm thấy nó trên kho lưu trữ. Để làm cho nó chỉ sao chép repo trên máy tính của bạn:

git clone git://github.com/larsgeorge/hbase-book.git 

Trong cuốn sách này bạn cũng có thể tìm thấy tất cả các giải thích về mã. Nhưng nếu một cái gì đó không thể hiểu được cho bạn, hãy hỏi.

` public class ImportFromFile { 
    public static final String NAME = "ImportFromFile"; 
    public enum Counters { LINES } 

    static class ImportMapper 
    extends Mapper<LongWritable, Text, ImmutableBytesWritable, Writable> { 
     private byte[] family = null; 
     private byte[] qualifier = null; 

     @Override 
     protected void setup(Context context) 
     throws IOException, InterruptedException { 
     String column = context.getConfiguration().get("conf.column"); 
     byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column)); 
     family = colkey[0]; 
     if (colkey.length > 1) { 
      qualifier = colkey[1]; 
     } 
     } 

     @Override 
     public void map(LongWritable offset, Text line, Context context) 
     throws IOException { 
      try { 
      String lineString = line.toString(); 
      byte[] rowkey = DigestUtils.md5(lineString); 
      Put put = new Put(rowkey); 
      put.add(family, qualifier, Bytes.toBytes(lineString)); 
      context.write(new ImmutableBytesWritable(rowkey), put); 
      context.getCounter(Counters.LINES).increment(1); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
     } 
    } 

    private static CommandLine parseArgs(String[] args) throws ParseException { 
     Options options = new Options(); 
     Option o = new Option("t", "table", true, 
     "table to import into (must exist)"); 
     o.setArgName("table-name"); 
     o.setRequired(true); 
     options.addOption(o); 
     o = new Option("c", "column", true, 
     "column to store row data into (must exist)"); 
     o.setArgName("family:qualifier"); 
     o.setRequired(true); 
     options.addOption(o); 
     o = new Option("i", "input", true, 
     "the directory or file to read from"); 
     o.setArgName("path-in-HDFS"); 
     o.setRequired(true); 
     options.addOption(o); 
     options.addOption("d", "debug", false, "switch on DEBUG log level"); 
     CommandLineParser parser = new PosixParser(); 
     CommandLine cmd = null; 
     try { 
     cmd = parser.parse(options, args); 
     } catch (Exception e) { 
     System.err.println("ERROR: " + e.getMessage() + "\n"); 
     HelpFormatter formatter = new HelpFormatter(); 
     formatter.printHelp(NAME + " ", options, true); 
     System.exit(-1); 
     } 
     return cmd; 
    } 

    public static void main(String[] args) throws Exception { 
     Configuration conf = HBaseConfiguration.create(); 
     String[] otherArgs = 
     new GenericOptionsParser(conf, args).getRemainingArgs(); 
     CommandLine cmd = parseArgs(otherArgs); 
     String table = cmd.getOptionValue("t"); 
     String input = cmd.getOptionValue("i"); 
     String column = cmd.getOptionValue("c"); 
     conf.set("conf.column", column); 
     Job job = new Job(conf, "Import from file " + input + " into table " + table); 

      job.setJarByClass(ImportFromFile.class); 
     job.setMapperClass(ImportMapper.class); 
     job.setOutputFormatClass(TableOutputFormat.class); 
     job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table); 
     job.setOutputKeyClass(ImmutableBytesWritable.class); 
     job.setOutputValueClass(Writable.class); 
     job.setNumReduceTasks(0); 
     FileInputFormat.addInputPath(job, new Path(input)); 
     System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } 
    }` 
+1

Tôi nhận được những điều sau đây: 'Ngoại lệ từ container-launch: org.apache.hadoop.util.Shell $ ExitCodeException' Bạn có gặp phải vấn đề này với mã bên trên không? Tôi đang sử dụng Hadoop2.4 và Hbase0.94.18 – Gevorg

Các vấn đề liên quan