2013-04-17 21 views
15

Tôi đang tìm một ví dụ đang sử dụng API mới để đọc và ghi các tệp chuỗi.Đọc và Viết Sequencefile bằng cách sử dụng Hadoop 2.0 Apis

hiệu quả Tôi cần phải biết làm thế nào để sử dụng các chức năng này

createWriter(Configuration conf, org.apache.hadoop.io.SequenceFile.Writer.Option... opts) 

Định nghĩa Cũ không làm việc cho tôi:

SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass()); 

Tương tự như vậy tôi cần phải biết những gì sẽ là mã cho đọc Tệp trình tự, vì phần đuôi không được chấp nhận:

SequenceFile.Reader(fs, path, conf); 

Dưới đây là cách sử dụng giống nhau -

String uri = args[0]; 
    Configuration conf = new Configuration(); 
    Path path = new Path(uri); 

    IntWritable key = new IntWritable(); 
    Text value = new Text(); 

    CompressionCodec Codec = new GzipCodec(); 
    SequenceFile.Writer writer = null; 
    Option optPath = SequenceFile.Writer.file(path); 
    Option optKey = SequenceFile.Writer.keyClass(key.getClass()); 
    Option optVal = SequenceFile.Writer.valueClass(value.getClass()); 
    Option optCom = SequenceFile.Writer.compression(CompressionType.RECORD, Codec); 

     writer = SequenceFile.createWriter(conf, optPath, optKey, optVal, optCom); 

Trả lời

12
public class SequenceFilesTest { 
    @Test 
    public void testSeqFileReadWrite() throws IOException { 
    Configuration conf = new Configuration(); 
    FileSystem fs = FileSystem.getLocal(conf); 
    Path seqFilePath = new Path("file.seq"); 
    SequenceFile.Writer writer = SequenceFile.createWriter(conf, 
      Writer.file(seqFilePath), Writer.keyClass(Text.class), 
      Writer.valueClass(IntWritable.class)); 

    writer.append(new Text("key1"), new IntWritable(1)); 
    writer.append(new Text("key2"), new IntWritable(2)); 

    writer.close(); 

    SequenceFile.Reader reader = new SequenceFile.Reader(conf, 
      Reader.file(seqFilePath)); 

    Text key = new Text(); 
    IntWritable val = new IntWritable(); 

    while (reader.next(key, val)) { 
     System.err.println(key + "\t" + val); 
    } 

    reader.close(); 
    } 
} 
+1

này vẫn còn sử dụng các API cũ 'SequenceFile.Reader đọc = đọc mới (fs, seqFilePath, conf);?' –

+0

Cám ơn nỗ lực của bạn –

+0

Ý anh là gì bởi API cũ (mapred vs MapReduce này? mã không sử dụng bất kỳ thứ gì từ các gói đó) –

-4

Bạn cần phải thiết lập SequenceFile như là định dạng đầu vào

job.setInputFormatClass(SequenceFileInputFormat.class); 

Bạn sẽ tìm thấy một ví dụ về đọc SeequnceFile dạng HDFS here.

7

tôi đến muộn hơn một năm để trả lời nhưng chỉ đã bắt đầu với Hadoop 2.4.1 :)

Dưới đây là mã, ai đó có thể tìm thấy nó hữu ích.

Lưu ý: Mã này bao gồm mã 1.x nhận xét để đọc và viết tệp chuỗi. Tôi đã tự hỏi nơi nào nó nhặt hệ thống tập tin nhưng khi tôi thực hiện nó trực tiếp trên cụm, nó nhặt nó đúng cách (có lẽ, từ cốt lõi-site.xml như đã đề cập trong Configuration

import java.io.File; 
import java.io.IOException; 
import java.io.RandomAccessFile; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.BytesWritable; 
import org.apache.hadoop.io.IOUtils; 
import org.apache.hadoop.io.SequenceFile; 
import org.apache.hadoop.io.SequenceFile.Reader.Option; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.Writable; 
import org.apache.hadoop.util.ReflectionUtils; 

public class SequenceFileOperator { 

    private Configuration conf = new Configuration(); 
    /*private FileSystem fs; 
    { 
     try { 
      fs = FileSystem.get(URI.create("hdfs://cldx-1336-1202:9000"), conf); 
     } catch (IOException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    }*/ 

    public static void main(String[] args) throws IOException { 
     // TODO Auto-generated method stub 

     if (args == null || args.length < 2) { 

      System.out 
        .println("Following are the possible invocations <operation id> <arg1> <arg2> ..."); 

      System.out 
        .println("1 <absolute path of directory containing documents> <HDFS path of the sequence file"); 

      System.out.println("2 <HDFS path of the sequence file>"); 
      return; 
     } 

     int operation = Integer.valueOf(args[0]); 

     SequenceFileOperator docToSeqFileWriter = new SequenceFileOperator(); 

     switch (operation) { 

     case 1: { 
      String docDirectoryPath = args[1]; 
      String sequenceFilePath = args[2]; 

      System.out.println("Writing files present at " + docDirectoryPath 
        + " to the sequence file " + sequenceFilePath); 

      docToSeqFileWriter.loadDocumentsToSequenceFile(docDirectoryPath, 
        sequenceFilePath); 

      break; 
     } 

     case 2: { 

      String sequenceFilePath = args[1]; 

      System.out.println("Reading the sequence file " + sequenceFilePath); 

      docToSeqFileWriter.readSequenceFile(sequenceFilePath); 

      break; 
     } 

     } 

    } 

    private void readSequenceFile(String sequenceFilePath) throws IOException { 
     // TODO Auto-generated method stub 

     /* 
     * SequenceFile.Reader sequenceFileReader = new SequenceFile.Reader(fs, 
     * new Path(sequenceFilePath), conf); 
     */ 
     Option filePath = SequenceFile.Reader.file(new Path(sequenceFilePath)); 
     SequenceFile.Reader sequenceFileReader = new SequenceFile.Reader(conf, 
       filePath); 

     Writable key = (Writable) ReflectionUtils.newInstance(
       sequenceFileReader.getKeyClass(), conf); 
     Writable value = (Writable) ReflectionUtils.newInstance(
       sequenceFileReader.getValueClass(), conf); 

     try { 

      while (sequenceFileReader.next(key, value)) { 

       System.out 
         .printf("[%s] %s %s \n", 
           sequenceFileReader.getPosition(), key, 
           value.getClass()); 
      } 
     } finally { 
      IOUtils.closeStream(sequenceFileReader); 
     } 

    } 

    private void loadDocumentsToSequenceFile(String docDirectoryPath, 
      String sequenceFilePath) throws IOException { 
     // TODO Auto-generated method stub 

     File docDirectory = new File(docDirectoryPath); 

     if (!docDirectory.isDirectory()) { 
      System.out 
        .println("Please provide an absolute path of a directory that contains the documents to be added to the sequence file"); 
      return; 
     } 

     /* 
     * SequenceFile.Writer sequenceFileWriter = 
     * SequenceFile.createWriter(fs, conf, new Path(sequenceFilePath), 
     * Text.class, BytesWritable.class); 
     */ 
     org.apache.hadoop.io.SequenceFile.Writer.Option filePath = SequenceFile.Writer 
       .file(new Path(sequenceFilePath)); 
     org.apache.hadoop.io.SequenceFile.Writer.Option keyClass = SequenceFile.Writer 
       .keyClass(Text.class); 
     org.apache.hadoop.io.SequenceFile.Writer.Option valueClass = SequenceFile.Writer 
       .valueClass(BytesWritable.class); 

     SequenceFile.Writer sequenceFileWriter = SequenceFile.createWriter(
       conf, filePath, keyClass, valueClass); 

     File[] documents = docDirectory.listFiles(); 

     try { 
      for (File document : documents) { 

       RandomAccessFile raf = new RandomAccessFile(document, "r"); 
       byte[] content = new byte[(int) raf.length()]; 

       raf.readFully(content); 

       sequenceFileWriter.append(new Text(document.getName()), 
         new BytesWritable(content)); 

       raf.close(); 
      } 
     } finally { 
      IOUtils.closeStream(sequenceFileWriter); 
     } 

    } 
} 
0

cho việc đọc, bạn có thể sử dụng

Path path= new Path("/bar"); 
Reader sequenceFileReader = new SequenceFile.Reader(conf,SequenceFile.Reader.file(path)); 
Các vấn đề liên quan