Slick 中如何使用反应流来插入数据
在 Slick 的文档中,提供了使用反应流的示例仅用于读取数据作为 DatabasePublisher 的一种方式.但是,当您想根据插入率将数据库用作 Sink 和 backpreasure 时会发生什么?
In Slick's documentation examples for using Reactive Streams are presented just for reading data as a means of a DatabasePublisher. But what happens when you want to use your database as a Sink and backpreasure based on your insertion rate?
我已经寻找了等效的DatabaseSubscriber,但它不存在.所以问题是,如果我有来源,请说:
I've looked for equivalent DatabaseSubscriber but it doesn't exist. So the question is, if I have a Source, say:
<代码>val 源 = 源(0 到 100)
我怎样才能用 Slick 构造一个 Sink 来将这些值写入一个带有模式的表中:
how can I crete a Sink with Slick that writes those values into a table with schema:
<代码>创建表 NumberTable(值 INT)
推荐答案
Serial Inserts
最简单的方法是在一个接收器.foreach
.
The easiest way would be to do inserts within a Sink.foreach
.
假设您已经使用了模式代码生成和进一步假设您的表名为NumberTable"
Assuming you've used the schema code generation and further assuming your table is named "NumberTable"
//Tables file was auto-generated by the schema code generation
import Tables.{Numbertable, NumbertableRow}
val numberTableDB = Database forConfig "NumberTableConfig"
我们可以写一个函数来进行插入
We can write a function that does the insertion
def insertIntoDb(num : Int) =
numberTableDB run (Numbertable += NumbertableRow(num))
那个函数可以放在Sink中
And that function can be placed in the Sink
val insertSink = Sink[Int] foreach insertIntoDb
Source(0 to 100) runWith insertSink
批量插入
您可以通过一次批处理 N 个插入来进一步扩展 Sink 方法:
You could further extend the Sink methodology by batching N inserts at a time:
def batchInsertIntoDb(nums : Seq[Int]) =
numberTableDB run (Numbertable ++= nums.map(NumbertableRow.apply))
val batchInsertSink = Sink[Seq[Int]] foreach batchInsertIntoDb
这个批处理 Sink 可以由一个 Flow
提供,它进行批量分组:
This batched Sink can be fed by a Flow
which does the batch grouping:
val batchSize = 10
Source(0 to 100).via(Flow[Int].grouped(batchSize))
.runWith(batchInsertSink)
相关文章