Connecting to SQLite in Apache Spark
I want to run a custom function on all tables in a SQLite database. The function is more or less the same, but depends on the schema of the individual table. Also, the tables and their schemata are only known at runtime (the program is called with an argument that specifies the path of the database).
This is what I have so far:
val conf = new SparkConf().setAppName("MyApp")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// somehow bind sqlContext to DB
val allTables = sqlContext.tableNames
for (t <- allTables) {
  val df = sqlContext.table(t)
  val schema = df.columns
  sqlContext.sql("SELECT * FROM " + t + "...").map(x => myFunc(x, schema))
}
The only hint I found so far needs to know the table in advance, which is not the case in my scenario:
val tableData =
  sqlContext.read.format("jdbc")
    .options(Map("url" -> "jdbc:sqlite:/path/to/file.db", "dbtable" -> t))
    .load()
I am using the xerial SQLite JDBC driver. So how can I connect to the database as a whole rather than to a single table?
Using Beryllium's answer as a start, I updated my code to this:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val metaData = sqlContext.read.format("jdbc")
  .options(Map("url" -> "jdbc:sqlite:/path/to/file.db",
    "dbtable" -> "(SELECT * FROM sqlite_master) AS t")).load()
val myTableNames = metaData.select("tbl_name").distinct()
for (t <- myTableNames) {
  println(t.toString)
  val tableData = sqlContext.table(t.toString)
  for (record <- tableData.select("*")) {
    println(record)
  }
}
At least I can now read the table names at runtime, which is a huge step forward for me. But I can't read the tables themselves. I tried both
val tableData = sqlContext.table(t.toString)
and
val tableData = sqlContext.read.format("jdbc")
  .options(Map("url" -> "jdbc:sqlite:/path/to/file.db",
    "dbtable" -> t.toString)).load()
in the loop, but in both cases I get a NullPointerException. Although I can print the table names, it seems I cannot connect to them.
Last but not least, I always get an SQLITE_ERROR: Connection is closed error. It looks like the same issue described in this question: SQLITE_ERROR: Connection is closed when connecting from Spark via JDBC to SQLite database
Recommended answer
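There are two options you can try. The first is to use plain JDBC directly:
- Open a separate, plain JDBC connection in your Spark job
- Get the table names from the JDBC metadata
- Feed these into your for comprehension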
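The plain-JDBC option above can be sketched with nothing but the standard java.sql API. This is a minimal sketch, assuming the xerial sqlite-jdbc driver is on the classpath and that DatabaseMetaData.getTables with the "TABLE" type lists the user tables; the names SqliteTables, listTables and isUserTable are hypothetical:

```scala
import java.sql.{Connection, DriverManager}
import scala.collection.mutable.ListBuffer

// Hypothetical helper object: lists table names through a plain JDBC
// connection that is opened and closed on the driver, independent of Spark.
object SqliteTables {

  // SQLite reserves names starting with "sqlite_" for its internal tables
  // (e.g. sqlite_sequence), which are usually not wanted here.
  def isUserTable(name: String): Boolean = !name.startsWith("sqlite_")

  // url is e.g. "jdbc:sqlite:/path/to/file.db" (assumed xerial URL format)
  def listTables(url: String): List[String] = {
    val conn: Connection = DriverManager.getConnection(url)
    try {
      // getTables(catalog, schemaPattern, tableNamePattern, types)
      val rs = conn.getMetaData.getTables(null, null, "%", Array("TABLE"))
      val names = ListBuffer.empty[String]
      try {
        while (rs.next()) names += rs.getString("TABLE_NAME")
      } finally rs.close()
      names.toList.filter(isUserTable)
    } finally conn.close()
  }
}
```

The result is an ordinary local List[String], so it can drive the for comprehension from the question directly, passing each name as the dbtable option of a separate sqlContext.read.format("jdbc") call.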
Alternatively, you can specify a query as the value for the dbtable argument. Syntactically, this query must "look" like a table, so it has to be wrapped in a subquery.
In that query, get the meta data from the database:
val df = sqlContext.read.format("jdbc").options(
  Map(
    "url" -> "jdbc:postgresql:xxx",
    "user" -> "x",
    "password" -> "x",
    "dbtable" -> "(select * from pg_tables) as t")).load()
This example is for PostgreSQL; you have to adapt it for SQLite.
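For SQLite, the catalog lives in the sqlite_master table, which has one row per table, index, view and trigger. The following is a minimal sketch of the adaptation, assuming the Spark 1.x SQLContext API used in the question and the xerial driver; SqliteMetadata, tablesQuery and tableNames are hypothetical names:

```scala
import org.apache.spark.sql.SQLContext

object SqliteMetadata {
  // Keep only real tables and skip SQLite's internal "sqlite_" tables.
  // substr is used instead of LIKE because "_" is a LIKE wildcard.
  // The "AS t" alias makes the subquery look like a table to Spark.
  val tablesQuery: String =
    "(SELECT name FROM sqlite_master " +
      "WHERE type = 'table' AND substr(name, 1, 7) <> 'sqlite_') AS t"

  // Returns the table names as a local array on the driver.
  def tableNames(sqlContext: SQLContext, url: String): Array[String] =
    sqlContext.read.format("jdbc")
      .options(Map("url" -> url, "dbtable" -> tablesQuery))
      .load()
      .collect()           // materialize the small result on the driver
      .map(_.getString(0)) // single column: name
}
```

Filtering on type = 'table' in SQL avoids the duplicate tbl_name values that index and trigger rows would otherwise contribute.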
Update
It seems that the JDBC driver only supports iterating over one result set. In any case, if you materialize the list of table names using collect(), the following snippet should work:
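// collect() pulls the (small) list of names to the driver, so the loop below
// is a plain local iteration rather than a Spark job over a DataFrame.
// distinct() drops repeats: sqlite_master also has rows for indexes and
// triggers, whose tbl_name is the table they belong to.
val myTableNames = metaData.select("tbl_name").distinct().collect().map(_.getString(0))
for (t <- myTableNames) {
  println(t)
  // one separate JDBC read per table, with the table name as dbtable
  val tableData = sqlContext.read.format("jdbc")
    .options(
      Map(
        "url" -> "jdbc:sqlite:/x.db",
        "dbtable" -> t)).load()
  tableData.show()
}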
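To get back to the original goal of running a schema-dependent function on every table, a local list of names (such as the collected array above) can drive one read per table. This is a sketch under the same assumptions (Spark 1.x-era SQLContext, xerial driver); PerTable and processAll are hypothetical, and myFunc is only a placeholder for the question's function that renders each row as column=value pairs:

```scala
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.StructType

object PerTable {
  // Hypothetical stand-in for the question's myFunc: uses the table's own
  // schema to label each value of the row.
  def myFunc(row: Row, schema: StructType): String =
    schema.fieldNames.zipWithIndex
      .map { case (col, i) => s"$col=${row.get(i)}" }
      .mkString(", ")

  def processAll(sqlContext: SQLContext, url: String, tables: Seq[String]): Unit =
    for (t <- tables) { // tables is a local collection on the driver
      val df = sqlContext.read.format("jdbc")
        .options(Map("url" -> url, "dbtable" -> t))
        .load()
      val schema = df.schema // only known at runtime
      // myFunc runs on the executors; the serializable schema is captured
      // in the closure. take(5) prints a small sample on the driver.
      df.rdd.map(row => myFunc(row, schema)).take(5).foreach(println)
    }
}
```

Keeping the loop over a local collection, and only the per-row work inside Spark, keeps sqlContext usage on the driver.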