A way to read table data from MySQL into Pig
Everyone knows that Pig supports DBStorage, but it only supports storing results from Pig into MySQL, like this:
STORE data INTO DBStorage('com.mysql.jdbc.Driver', 'jdbc:mysql://host/db', 'INSERT ...');
But please show me a way to read a table from MySQL, something like this:
data = LOAD 'my_table' USING DBStorage('com.mysql.jdbc.Driver', 'jdbc:mysql://host/db', 'SELECT * FROM my_table');
Here is my code:
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class DBLoader extends LoadFunc {
    private final Log log = LogFactory.getLog(getClass());
    // Reused buffer for the fields of the row currently being converted
    private final List<Object> mProtoTuple = new ArrayList<Object>();
    private Connection con;
    private String jdbcURL;
    private String user;
    private String pass;
    private int batchSize;
    private int count = 0;
    private String query;
    ResultSet result;
    protected TupleFactory mTupleFactory = TupleFactory.getInstance();

    public DBLoader() {
    }

    public DBLoader(String driver, String jdbcURL, String user, String pass,
            String query) {
        try {
            Class.forName(driver);
        } catch (ClassNotFoundException e) {
            log.error("can't load DB driver:" + driver, e);
            throw new RuntimeException("Can't load DB Driver", e);
        }
        this.jdbcURL = jdbcURL;
        this.user = user;
        this.pass = pass;
        this.query = query;
    }

    @Override
    public InputFormat getInputFormat() throws IOException {
        // TODO: TextInputFormat expects file input paths, which this loader
        // never sets; MyDBInputFormat below is still an unfinished stub.
        return new TextInputFormat();
    }

    @Override
    public Tuple getNext() throws IOException {
        try {
            if (!result.next()) {
                return null; // no more rows
            }
            ResultSetMetaData rsmd = result.getMetaData();
            int numColumns = rsmd.getColumnCount();
            // JDBC column indexes are 1-based
            for (int i = 1; i <= numColumns; i++) {
                Object field = result.getObject(i);
                // JDBC returns binary columns as byte[]; Pig expects DataByteArray
                if (field instanceof byte[]) {
                    field = new DataByteArray((byte[]) field);
                }
                switch (DataType.findType(field)) {
                case DataType.NULL:
                case DataType.BOOLEAN:
                case DataType.INTEGER:
                case DataType.LONG:
                case DataType.FLOAT:
                case DataType.DOUBLE:
                case DataType.BYTEARRAY:
                case DataType.CHARARRAY:
                case DataType.BYTE:
                    mProtoTuple.add(field);
                    break;
                case DataType.MAP:
                case DataType.TUPLE:
                case DataType.BAG:
                    throw new RuntimeException("Cannot load a non-flat value "
                            + "using DBLoader");
                default:
                    throw new RuntimeException("Unknown datatype "
                            + DataType.findType(field));
                }
            }
        } catch (SQLException e) {
            // Propagate instead of swallowing, so a failed read is not
            // mistaken for the end of the result set
            throw new IOException("Error reading row from JDBC result set", e);
        }
        Tuple t = mTupleFactory.newTuple(mProtoTuple);
        mProtoTuple.clear();
        return t;
    }

    @Override
    public void prepareToRead(RecordReader arg0, PigSplit arg1)
            throws IOException {
        con = null;
        if (query == null) {
            throw new IOException("SQL query not specified");
        }
        try {
            if (user == null || pass == null) {
                con = DriverManager.getConnection(jdbcURL);
            } else {
                con = DriverManager.getConnection(jdbcURL, user, pass);
            }
            con.setAutoCommit(false);
            result = con.createStatement().executeQuery(query);
        } catch (SQLException e) {
            log.error("Unable to connect to JDBC @" + jdbcURL);
            throw new IOException("JDBC Error", e);
        }
        count = 0;
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        // TODO Auto-generated method stub
        //TextInputFormat.setInputPaths(job, location);
    }

    class MyDBInputFormat extends InputFormat<NullWritable, NullWritable> {
        @Override
        public RecordReader<NullWritable, NullWritable> createRecordReader(
                InputSplit arg0, TaskAttemptContext arg1) throws IOException,
                InterruptedException {
            // TODO Auto-generated method stub
            return null;
        }

        @Override
        public List<InputSplit> getSplits(JobContext arg0) throws IOException,
                InterruptedException {
            // TODO Auto-generated method stub
            return null;
        }
    }
}
I have tried many times to write this UDF, but without success.
Recommended answer
As you say, DBStorage only supports saving results to a database.
To load data from MySQL, you could look into a project called Sqoop (which copies data from a database to HDFS), or you could perform a mysqldump and then copy the file into HDFS. Both approaches require some extra steps and cannot be used directly from inside Pig.
A third option would be to write a Pig LoadFunc (you say you tried to write a UDF). It shouldn't be too difficult: you'll need to pass much the same options as DBStorage (driver, connection credentials, and the SQL query to execute), and you can probably use result set metadata inspection to auto-generate a schema too.
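The metadata inspection mentioned above could look roughly like the following. This is only a sketch under stated assumptions: the class PigSchemaSketch and its methods are hypothetical names, not part of Pig, and the mapping from JDBC type codes to Pig type names is one plausible choice (anything unmapped, such as DECIMAL or date types, falls back to bytearray here). It uses only java.sql, so it can be tried without a Pig or Hadoop installation.

```java
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not part of Pig): builds a Pig schema string from JDBC metadata. */
final class PigSchemaSketch {

    /** Maps a java.sql.Types code to a Pig type name (assumed mapping; unmapped codes become bytearray). */
    public static String pigTypeFor(int sqlType) {
        switch (sqlType) {
            case Types.BIT:
            case Types.BOOLEAN:
                return "boolean";
            case Types.TINYINT:
            case Types.SMALLINT:
            case Types.INTEGER:
                return "int";
            case Types.BIGINT:
                return "long";
            case Types.REAL:
                return "float";
            case Types.FLOAT:   // JDBC FLOAT is double precision
            case Types.DOUBLE:
                return "double";
            case Types.CHAR:
            case Types.VARCHAR:
            case Types.LONGVARCHAR:
                return "chararray";
            default:
                return "bytearray";
        }
    }

    /** Joins column names and JDBC type codes into "name:type, name:type". */
    public static String toPigSchema(List<String> names, List<Integer> sqlTypes) {
        if (names.size() != sqlTypes.size()) {
            throw new IllegalArgumentException("names and sqlTypes must have the same length");
        }
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < names.size(); i++) {
            if (i > 0) {
                sb.append(", ");
            }
            sb.append(names.get(i)).append(':').append(pigTypeFor(sqlTypes.get(i)));
        }
        return sb.toString();
    }

    /** Reads column labels and types from the metadata (JDBC columns are 1-based). */
    public static String toPigSchema(ResultSetMetaData meta) throws SQLException {
        List<String> names = new ArrayList<>();
        List<Integer> types = new ArrayList<>();
        for (int i = 1; i <= meta.getColumnCount(); i++) {
            names.add(meta.getColumnLabel(i));
            types.add(meta.getColumnType(i));
        }
        return toPigSchema(names, types);
    }

    public static void main(String[] args) {
        // Prints: id:int, name:chararray, price:double
        System.out.println(toPigSchema(
                Arrays.asList("id", "name", "price"),
                Arrays.asList(Types.INTEGER, Types.VARCHAR, Types.DOUBLE)));
    }
}
```

Inside a loader, the ResultSetMetaData overload would be called on the result of the executed query, and the resulting string could then be turned into a schema for Pig, for example by also implementing Pig's LoadMetadata interface.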