Mọi người đều biết rằng Pig đã ủng hộ DBStorage, nhưng họ chỉ được hỗ trợ kết quả tải từ lợn với mysql như thếMột cách để đọc dữ liệu bảng từ Mysql để Pig
STORE data INTO DBStorage('com.mysql.jdbc.Driver', 'dbc:mysql://host/db', 'INSERT ...');
Nhưng Xin chỉ cho tôi cách để đọc bảng từ mysql như thế
data = LOAD 'my_table' AS DBStorage('com.mysql.jdbc.Driver', 'dbc:mysql://host/db', 'SELECT * FROM my_table');
đây là mã của tôi
public class DBLoader extends LoadFunc {
private final Log log = LogFactory.getLog(getClass());
private ArrayList mProtoTuple = null;
private Connection con;
private String jdbcURL;
private String user;
private String pass;
private int batchSize;
private int count = 0;
private String query;
ResultSet result;
protected TupleFactory mTupleFactory = TupleFactory.getInstance();
public DBLoader() {
}
public DBLoader(String driver, String jdbcURL, String user, String pass,
String query) {
try {
Class.forName(driver);
} catch (ClassNotFoundException e) {
log.error("can't load DB driver:" + driver, e);
throw new RuntimeException("Can't load DB Driver", e);
}
this.jdbcURL = jdbcURL;
this.user = user;
this.pass = pass;
this.query = query;
}
@Override
public InputFormat getInputFormat() throws IOException {
// TODO Auto-generated method stub
return new TextInputFormat();
}
@Override
public Tuple getNext() throws IOException {
// TODO Auto-generated method stub
boolean next = false;
try {
next = result.next();
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if (!next)
return null;
int numColumns = 0;
// Get result set meta data
ResultSetMetaData rsmd;
try {
rsmd = result.getMetaData();
numColumns = rsmd.getColumnCount();
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
for (int i = 0; i < numColumns; i++) {
try {
Object field = result.getObject(i);
switch (DataType.findType(field)) {
case DataType.NULL:
mProtoTuple.add(null);
break;
case DataType.BOOLEAN:
mProtoTuple.add((Boolean) field);
break;
case DataType.INTEGER:
mProtoTuple.add((Integer) field);
break;
case DataType.LONG:
mProtoTuple.add((Long) field);
break;
case DataType.FLOAT:
mProtoTuple.add((Float) field);
break;
case DataType.DOUBLE:
mProtoTuple.add((Double) field);
break;
case DataType.BYTEARRAY:
byte[] b = ((DataByteArray) field).get();
mProtoTuple.add(b);
break;
case DataType.CHARARRAY:
mProtoTuple.add((String) field);
break;
case DataType.BYTE:
mProtoTuple.add((Byte) field);
break;
case DataType.MAP:
case DataType.TUPLE:
case DataType.BAG:
throw new RuntimeException("Cannot store a non-flat tuple "
+ "using DbStorage");
default:
throw new RuntimeException("Unknown datatype "
+ DataType.findType(field));
}
} catch (Exception ee) {
throw new RuntimeException(ee);
}
}
Tuple t = mTupleFactory.newTuple(mProtoTuple);
mProtoTuple.clear();
return t;
}
@Override
public void prepareToRead(RecordReader arg0, PigSplit arg1)
throws IOException {
con = null;
if (query == null) {
throw new IOException("SQL Insert command not specified");
}
try {
if (user == null || pass == null) {
con = DriverManager.getConnection(jdbcURL);
} else {
con = DriverManager.getConnection(jdbcURL, user, pass);
}
con.setAutoCommit(false);
result = con.createStatement().executeQuery(query);
} catch (SQLException e) {
log.error("Unable to connect to JDBC @" + jdbcURL);
throw new IOException("JDBC Error", e);
}
count = 0;
}
@Override
public void setLocation(String location, Job job) throws IOException {
// TODO Auto-generated method stub
//TextInputFormat.setInputPaths(job, location);
}
class MyDBInputFormat extends InputFormat<NullWritable, NullWritable>{
@Override
public RecordReader<NullWritable, NullWritable> createRecordReader(
InputSplit arg0, TaskAttemptContext arg1) throws IOException,
InterruptedException {
// TODO Auto-generated method stub
return null;
}
@Override
public List<InputSplit> getSplits(JobContext arg0) throws IOException,
InterruptedException {
// TODO Auto-generated method stub
return null;
}
}
}
Tôi cố gắng nhiều lần để viết UDF nhưng không thành công .....
Xin chào, cảm ơn bạn. Nhưng như tôi đã đề cập trước đây, tôi chỉ muốn tải trực tiếp được sử dụng từ bên trong Pig. Tôi đăng mã của tôi mở rộng từ LoadFunc như bạn nói. Khi tôi chạy mã của tôi từ lớn, nó luôn luôn ném ngoại lệ. – phuongdo
Sau đó, hãy thêm ngoại lệ bạn đang thấy –
@phuongdo Bạn có thành công trong việc viết Pig LoadFunc để tải dữ liệu từ mysql không? – Shri