2015-06-23 18 views
10

Tôi đang viết một thành phần cần có tệp nhị phân mới trong một đường dẫn HDFS cụ thể, để tôi có thể thực hiện một số việc học trực tuyến dựa trên dữ liệu này. Vì vậy, tôi muốn đọc tập tin nhị phân được tạo ra bởi Flume từ HDFS trong dòng. Tôi tìm thấy một số chức năng được cung cấp bởi tia lửa API, nhưLàm thế nào để sử dụng API Java Spark để đọc luồng tệp nhị phân từ HDFS?

public JavaDStream<byte[]> binaryRecordsStream(String directory,int recordLength) 

public <K,V,F extends org.apache.hadoop.mapreduce.InputFormat<K,V>> 
    JavaPairInputDStream<K,V> fileStream(String directory, Class<K> kClass, Class<V> vClass, Class<F> fClass) 

Nhưng, tôi thực sự không biết làm thế nào để sử dụng các chức năng này. Tôi đã thử binaryRecordStream, nhưng nó xác định độ dài tệp cụ thể, vì vậy nó không tốt.

Đối fileStream chức năng, tôi đã sử dụng:

SparkConf sparkConf = new SparkConf().setAppName("SparkFileStreamTest").setMaster("local[2]"); 

// Create the context with the specified batch size 
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(durationInMillis)); 

JavaPairInputDStream<LongWritable, BytesWritable> inputDStream = jssc.fileStream(hdfsPath, LongWritable.class, BytesWritable.class, CustomInputFormat.class); 

//********************************************************************** 
JavaPairInputDStream<LongWritable, BytesWritable> inputDStream = jssc.fileStream(
      hdfsPath, LongWritable.class, BytesWritable.class, CustomInputFormat.class); 

JavaDStream<byte[]> content = inputDStream.map(new Function<Tuple2<LongWritable, BytesWritable>, byte[]>() { 
    @Override 
    public byte[] call(Tuple2<LongWritable, BytesWritable> tuple2) { 
     System.out.println("----------------[testReadFileStreamFromHDFS] ENTER ......"); 
     if (tuple2 == null) { 
      System.out.println("----------------[testReadFileStreamFromHDFS] TUPLE = NULL"); 
      System.out.println("----------------[testReadFileStreamFromHDFS] END."); 
      return null; 
     } 
     else { 
      System.out.println("----------------[testReadFileStreamFromHDFS] KEY = [" + tuple2._1().toString() + "]"); 
      System.out.println("----------------[testReadFileStreamFromHDFS] VAL-LENGTH = [" + tuple2._2().getBytes().length + "]"); 
      System.out.println("----------------[testReadFileStreamFromHDFS] END."); 
      return tuple2._2().getBytes(); 
     } 
    } 
}); 

/***********************************************************************/ 
if (content == null) { 
     System.out.println("----------------[testReadFileStreamFromHDFS] CONTENT = NULL"); 
} 
else { 
    System.out.println("----------------[testReadFileStreamFromHDFS] CONTENT-length = [" + content.count()); 
    content.print(); 
} 

System.out.println("----------------[testReadFileStreamFromHDFS] END-111."); 

jssc.start(); 
jssc.awaitTermination(); 
System.out.println("----------------[testReadFileStreamFromHDFS] END-222."); 

Đối CustomInputFormat, tôi đã tạo

public class CustomInputFormat extends FileInputFormat<LongWritable, BytesWritable> { 

private CustomInputSplit mInputSplit; 

public CustomInputFormat() { 
    mInputSplit = new CustomInputSplit(); 
} 

@Override 
public List<InputSplit> getSplits(JobContext context) 
     throws IOException { 

    System.out.println("----------------[CustomInputFormat] 1111 ......"); 
    final ArrayList<InputSplit> result = new ArrayList<InputSplit>(); 
    result.add(mInputSplit); 

    System.out.println("----------------[CustomInputFormat] 2222 ......"); 
    return result; 
} 

@Override 
public RecordReader<LongWritable, BytesWritable> createRecordReader(
     InputSplit inputSplit, TaskAttemptContext taskAttemptContext) 
     throws IOException, InterruptedException { 

    System.out.println("----------------[CustomInputFormat] 3333 ......"); 
    System.out.println("----------------[CustomInputFormat] ENTER createRecordReader, inputSplit-length = [" 
      + inputSplit.getLength() + "]"); 

    mInputSplit.init(inputSplit); 

    System.out.println("----------------[CustomInputFormat] 4444 ......"); 
    return new CustomRecordReader(); 
} 

@Override 
protected boolean isSplitable(JobContext context, Path filename) { 
    System.out.println("----------------[CustomInputFormat] 5555 ......"); 
    return false; 
} 

public class CustomRecordReader extends RecordReader<LongWritable, BytesWritable> { 

private BytesWritable mValues; 
private int mCursor; 

public CustomRecordReader() { 
    System.out.println("----------------[CustomRecordReader] 1111 ......"); 
    mValues = null; 
    mCursor = 0; 
    System.out.println("----------------[CustomRecordReader] 2222 ......"); 
} 

@Override 
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) 
     throws IOException, InterruptedException { 
    System.out.println("----------------[CustomRecordReader] 3333 ......"); 
    CustomInputSplit customInputSplit = (CustomInputSplit) inputSplit; 
    mValues = customInputSplit.getValues(); 
    System.out.println("----------------[CustomRecordReader] 4444 ......"); 
} 

@Override 
public boolean nextKeyValue() throws IOException, InterruptedException { 
    System.out.println("----------------[CustomRecordReader] 5555 ......"); 
    boolean existNext = (mCursor == 0); 
    mCursor++; 
    System.out.println("----------------[CustomRecordReader] 6666 ......"); 
    return existNext; 
} 

@Override 
public LongWritable getCurrentKey() throws IOException, InterruptedException { 
    System.out.println("----------------[CustomRecordReader] 7777 ......"); 
    return new LongWritable(0); 
} 

@Override 
public BytesWritable getCurrentValue() throws IOException, InterruptedException { 
    System.out.println("----------------[CustomRecordReader] 8888 ......"); 
    return mValues; 
} 

@Override 
public float getProgress() throws IOException, InterruptedException { 
    System.out.println("----------------[CustomRecordReader] 9999 ......"); 
    return 0; 
} 

@Override 
public void close() throws IOException { 
    System.out.println("----------------[CustomRecordReader] AAAA ......"); 
    mValues = null; 
} 
} 

public class CustomInputSplit extends InputSplit implements Writable { 

private long mLength; 
private String[] mLocations; 

private final BytesWritable mContent; 

public CustomInputSplit() { 
    System.out.println("----------------[CustomInputSplit] 1111 ......"); 
    mLength = 0; 
    mLocations = null; 

    mContent = new BytesWritable(); 
    System.out.println("----------------[CustomInputSplit] 2222 ......"); 
} 

public void init(InputSplit inputSplit) throws IOException, InterruptedException { 
    System.out.println("----------------[CustomInputSplit] 3333 ......"); 
    mLength = inputSplit.getLength(); 

    String[] locations = inputSplit.getLocations(); 
    if (locations != null) { 
     int numLocations = locations.length; 
     mLocations = new String[numLocations]; 
     for (int i = 0; i < numLocations; i++) { 
      mLocations[i] = locations[i]; 
     } 
    } 
    System.out.println("----------------[CustomInputSplit] 4444 ......"); 
} 

@Override 
public long getLength() throws IOException, InterruptedException { 
    System.out.println("----------------[CustomInputSplit] 5555 ......"); 
    return mLength; 
} 

@Override 
public String[] getLocations() throws IOException, InterruptedException { 
    if (mLocations == null) { 
     System.out.println("----------------[CustomInputSplit] 6666-0001 ...... mLocations = [NULL]"); 
     mLocations = new String[] {"localhost"}; 
    } 
    System.out.println("----------------[CustomInputSplit] 6666-0002 ...... mLocations-length = [" + mLocations.length + "]"); 
    return mLocations; 
} 

@Override 
public void write(DataOutput dataOutput) throws IOException { 
    System.out.println("----------------[CustomInputSplit] 7777 ......"); 
    mContent.write(dataOutput); 
} 

@Override 
public void readFields(DataInput dataInput) throws IOException { 
    System.out.println("----------------[CustomInputSplit] 8888 ......"); 
    mContent.readFields(dataInput); 
} 

public BytesWritable getValues() { 
    System.out.println("----------------[CustomInputSplit] 9999 ......"); 
    return mContent; 
} 
} 

Nhưng khi tôi in:

System.out.println("----------------[testReadFileStreamFromHDFS] VAL-LENGTH = [" + tuple2._2().getBytes().length + "]");

tôi luôn luôn nhận được 0 chiều dài :

----------------[testReadFileStreamFromHDFS] VAL-LENGTH = [0]

Có một số vấn đề với CustomerInputFormat.class? Có ai biết làm thế nào để sử dụng Spark API Java dòng để đọc tập tin nhị phân từ HDFS?

Trả lời

0

thử này

 JavaStreamingContext context 
    JavaSparkContext jContext = context.sparkContext(); 
    JavaPairRDD<String, PortableDataStream> rdd = jContext.binaryFiles(fsURI + directoryPath); 

    JavaRDD<Object> rdd1 = rdd.map(new Function<Tuple2<String, PortableDataStream>, Object>() { 
    private static final long serialVersionUID = -7894402430221488712L; 

    @Override 
    public Object call(Tuple2<String, PortableDataStream> arg0) throws Exception { 
    byte[] imageInByte = arg0._2().toArray(); 
    String base64Encoded = DatatypeConverter.printBase64Binary(imageInByte); 
    return (arg0._1 + Constants.COMMA_DELIMITER + base64Encoded).getBytes(); 
    } 
    }); 
    java.util.Queue<JavaRDD<Object>> queue = new LinkedList(); 
    queue.add(rdd1); 
    JavaDStream<Object> dStream = context.queueStream(queue); 

Giới hạn duy nhất với apparoach này là nó sẽ không có khả năng đọc tập tin mới từ HDFS tạo sau khi bắt đầu đường ống này.

0

sử dụng phương pháp này: Viết Receiver tùy chỉnh:

import java.io.IOException; 
import java.nio.charset.StandardCharsets; 
import java.util.ArrayList; 
import java.util.List; 
import java.util.Map; 
import java.util.Map.Entry; 

import javax.xml.bind.DatatypeConverter; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FileStatus; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.spark.api.java.JavaPairRDD; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.input.PortableDataStream; 
import org.apache.spark.sql.DataFrame; 
import org.apache.spark.sql.SQLContext; 
import org.apache.spark.storage.StorageLevel; 
import org.apache.spark.streaming.api.java.JavaDStream; 
import org.apache.spark.streaming.api.java.JavaPairInputDStream; 
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.apache.spark.streaming.receiver.Receiver; 
class DFSReceiver extends Receiver<byte[]> { 

    /** The Constant serialVersionUID. */ 
    private static final long serialVersionUID = -1051061769769056605L; 
    Long windowSize = 20000l; 

    /** Instantiates a new RMQ receiver. */ 
    DFSReceiver() { 
     super(StorageLevel.MEMORY_AND_DISK_SER_2()); 
    } 

    @Override 
    public void onStart() { 
     System.out.println("Inside onStart method"); 
     new Thread() { 
      @Override 
      public void run() { 

       try { 
        receive(); 
       } 
       } catch (Exception e) { 
        e.printStackTrace(); 
        LOGGER.error("Exception raised at DFSReceiverHelper , exception : " + e); 
       } 

      } 
     }.start(); 
    } 

    /** Receive. 
    * 
    * @throws Exception 
    *    the exception */ 
    protected void receive() throws Exception { 
     try { 
      ConnectionMetadata connectionMetadata = ConnectionMetadataFactory.getConnectionMetadataObj(ConnectionConstants.HDFS_DATA_STORE); 
      String connectionId = connectionMetadata.getConnectionId(ConnectionConstants.HDFS_DATA_STORE, connectionName); 
      ConnectionMetaDataDTO c = connectionMetadata.getConnectionMetaDataById(connectionId); 

      Map<String, Object> map = connectionMetadata.getConnectionConfigParameters(c); 
      FileSystem fs = HDFSUtils.getFileSystemInstance(map); 

      JavaPairRDD<String, PortableDataStream> rdd = sparkContext.binaryFiles(fsURI + directoryPath); 
      List<Tuple2<String, PortableDataStream>> rddList = rdd.collect(); 
      for (Tuple2<String, PortableDataStream> arg0 : rddList) { 
       byte[] imageInByte = arg0._2().toArray(); 
       String base64Encoded = DatatypeConverter.printBase64Binary(imageInByte); 
       store((arg0._1 + Constants.COMMA_DELIMITER + base64Encoded).getBytes()); 
      } 

      Long time = System.currentTimeMillis(); 
      System.out.println(); 
      Thread.currentThread().sleep(windowSize); 
      while (true) { 
       List<Path> newFiles = checkIfNewFileCreated(fs, new Path(fsURI + directoryPath), time); 
       for (Path p : newFiles) { 
        JavaPairRDD<String, PortableDataStream> rdd11 = sparkContext.binaryFiles(p.toString()); 
        Tuple2<String, PortableDataStream> arg0 = rdd11.first(); 
        byte[] imageInByte = arg0._2().toArray(); 
        String base64Encoded = DatatypeConverter.printBase64Binary(imageInByte); 
        store((arg0._1 + Constants.COMMA_DELIMITER + base64Encoded).getBytes()); 
       } 
       Thread.currentThread().sleep(windowSize); 
       time += windowSize; 
      } 
     } catch (ShutdownSignalException s) { 
      LOGGER.error("ShutdownSignalException raised in receive method of DFSReceiver", s); 
     } 
    } 

    private List<Path> checkIfNewFileCreated(FileSystem fs, Path p, Long timeStamp) throws IOException { 
     List<Path> fileList = new ArrayList<>(); 
     if (fs.isDirectory(p)) { 
      FileStatus[] fStatus = fs.listStatus(p); 
      for (FileStatus status : fStatus) { 
       if (status.isFile() && timeStamp < status.getModificationTime() && timeStamp + windowSize >= status.getModificationTime()) { 
        fileList.add(status.getPath()); 
       } 
      } 
     } 
     return fileList; 
    } 

    @Override 
    public void onStop() { 

    } 
}  

Với thu này, bạn sẽ có thể đọc các tập tin mới được tạo ra cũng mỗi 20 giây.

Các vấn đề liên quan