2012-06-08 16 views
6

Mọi người đều biết rằng Pig đã ủng hộ DBStorage, nhưng họ chỉ được hỗ trợ kết quả tải từ lợn với mysql như thếMột cách để đọc dữ liệu bảng từ Mysql để Pig

STORE data INTO DBStorage('com.mysql.jdbc.Driver', 'dbc:mysql://host/db', 'INSERT ...'); 

Nhưng Xin chỉ cho tôi cách để đọc bảng từ mysql như thế

data = LOAD 'my_table' AS DBStorage('com.mysql.jdbc.Driver', 'dbc:mysql://host/db', 'SELECT * FROM my_table'); 

đây là mã của tôi

public class DBLoader extends LoadFunc { 
    private final Log log = LogFactory.getLog(getClass()); 
    private ArrayList mProtoTuple = null; 
    private Connection con; 
    private String jdbcURL; 
    private String user; 
    private String pass; 
    private int batchSize; 
    private int count = 0; 
    private String query; 
    ResultSet result; 
    protected TupleFactory mTupleFactory = TupleFactory.getInstance(); 

    public DBLoader() { 
    } 

    public DBLoader(String driver, String jdbcURL, String user, String pass, 
      String query) { 

     try { 
      Class.forName(driver); 
     } catch (ClassNotFoundException e) { 
      log.error("can't load DB driver:" + driver, e); 
      throw new RuntimeException("Can't load DB Driver", e); 
     } 
     this.jdbcURL = jdbcURL; 
     this.user = user; 
     this.pass = pass; 
     this.query = query; 

    } 

    @Override 
    public InputFormat getInputFormat() throws IOException { 
     // TODO Auto-generated method stub 
     return new TextInputFormat(); 
    } 

    @Override 
    public Tuple getNext() throws IOException { 
     // TODO Auto-generated method stub 
     boolean next = false; 

     try { 
      next = result.next(); 
     } catch (SQLException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 

     if (!next) 
      return null; 
     int numColumns = 0; 
     // Get result set meta data 
     ResultSetMetaData rsmd; 
     try { 
      rsmd = result.getMetaData(); 
      numColumns = rsmd.getColumnCount(); 
     } catch (SQLException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 

     for (int i = 0; i < numColumns; i++) { 

      try { 
       Object field = result.getObject(i); 

       switch (DataType.findType(field)) { 
       case DataType.NULL: 

        mProtoTuple.add(null); 

        break; 

       case DataType.BOOLEAN: 
        mProtoTuple.add((Boolean) field); 

        break; 

       case DataType.INTEGER: 
        mProtoTuple.add((Integer) field); 

        break; 

       case DataType.LONG: 
        mProtoTuple.add((Long) field); 

        break; 

       case DataType.FLOAT: 
        mProtoTuple.add((Float) field); 

        break; 

       case DataType.DOUBLE: 
        mProtoTuple.add((Double) field); 

        break; 

       case DataType.BYTEARRAY: 
        byte[] b = ((DataByteArray) field).get(); 
        mProtoTuple.add(b); 

        break; 
       case DataType.CHARARRAY: 
        mProtoTuple.add((String) field); 

        break; 
       case DataType.BYTE: 
        mProtoTuple.add((Byte) field); 

        break; 

       case DataType.MAP: 
       case DataType.TUPLE: 
       case DataType.BAG: 
        throw new RuntimeException("Cannot store a non-flat tuple " 
          + "using DbStorage"); 

       default: 
        throw new RuntimeException("Unknown datatype " 
          + DataType.findType(field)); 

       } 

      } catch (Exception ee) { 
       throw new RuntimeException(ee); 
      } 
     } 

     Tuple t = mTupleFactory.newTuple(mProtoTuple); 
     mProtoTuple.clear(); 
     return t; 

    } 

    @Override 
    public void prepareToRead(RecordReader arg0, PigSplit arg1) 
      throws IOException { 

     con = null; 
     if (query == null) { 
      throw new IOException("SQL Insert command not specified"); 
     } 
     try { 
      if (user == null || pass == null) { 
       con = DriverManager.getConnection(jdbcURL); 
      } else { 
       con = DriverManager.getConnection(jdbcURL, user, pass); 
      } 
      con.setAutoCommit(false); 
      result = con.createStatement().executeQuery(query); 
     } catch (SQLException e) { 
      log.error("Unable to connect to JDBC @" + jdbcURL); 
      throw new IOException("JDBC Error", e); 
     } 
     count = 0; 
    } 

    @Override 
    public void setLocation(String location, Job job) throws IOException { 
     // TODO Auto-generated method stub 

     //TextInputFormat.setInputPaths(job, location); 

    } 

    class MyDBInputFormat extends InputFormat<NullWritable, NullWritable>{ 

     @Override 
     public RecordReader<NullWritable, NullWritable> createRecordReader(
       InputSplit arg0, TaskAttemptContext arg1) throws IOException, 
       InterruptedException { 
      // TODO Auto-generated method stub 
      return null; 
     } 

     @Override 
     public List<InputSplit> getSplits(JobContext arg0) throws IOException, 
       InterruptedException { 
      // TODO Auto-generated method stub 
      return null; 
     } 

    } 

} 

Tôi cố gắng nhiều lần để viết UDF nhưng không thành công .....

Trả lời

2

Như bạn nói, DBStorage chỉ hỗ trợ lưu kết quả vào cơ sở dữ liệu.

Để tải dữ liệu từ MySQL, bạn có thể xem dự án có tên sqoop (sao chép dữ liệu từ cơ sở dữ liệu sang HDFS) hoặc bạn có thể thực hiện kết xuất mysql rồi sao chép tệp vào HDFS. Cả hai cách đều yêu cầu một số tương tác và không thể được sử dụng trực tiếp từ bên trong Pig.

Tùy chọn thứ ba là xem xét viết một tệp LoadFunc Pig (bạn nói bạn đã cố gắng viết UDF). Nó không quá khó, bạn sẽ cần phải vượt qua nhiều tùy chọn giống như DBStorage (trình điều khiển, thông tin kết nối và truy vấn SQL để thực hiện), và bạn có thể sử dụng một số kiểm tra siêu dữ liệu của bộ kết quả để tự động tạo một lược đồ.

+0

Xin chào, cảm ơn bạn. Nhưng như tôi đã đề cập trước đây, tôi chỉ muốn tải trực tiếp được sử dụng từ bên trong Pig. Tôi đăng mã của tôi mở rộng từ LoadFunc như bạn nói. Khi tôi chạy mã của tôi từ lớn, nó luôn luôn ném ngoại lệ. – phuongdo

+0

Sau đó, hãy thêm ngoại lệ bạn đang thấy –

+0

@phuongdo Bạn có thành công trong việc viết Pig LoadFunc để tải dữ liệu từ mysql không? – Shri

Các vấn đề liên quan