SQLAlchemy 在一个事务中更新多行

2022-01-17 00:00:00 python sqlalchemy sql-update

问题描述

如何使用将一列的现有值映射到另一列所需的新值的字典来更新数据库中的多个现有行?

How can I update multiple, existing rows in a database, using dictionary that maps existing values for one column, to the required new values for another column?

我有一张桌子:

class MyTable(BaseModel):
    col1 = sa.Column(sa.String(256))
    col2 = sa.Column(sa.String(256))

鉴于 col1 已经有值并且 col2 为空,如果我将数据集作为字典,如何更新 col2:

Given that col1 has already values and col2 is empty, how can I update col2 if I have the set of data as a dictionary:

payload = {'x': 'y', 'a': 'b', 'c': 'd'}

因此,此有效负载将 col1 的值映射到 col2 的 new 值;更新后,您会从数据库中获得 [{'col1': 'x', 'col2': 'y'}, ...].

So this payload maps values for col1, to a new value for col2; after the update you'd get [{'col1': 'x', 'col2': 'y'}, ...] from the database.

我尝试了几种方法,它们确实有效,但我认为它们并不像以前那样最佳:

I tried a couple of ways, which actually work but I think they are not as optimal as it could be ex.:

my_table = MyTable.__table__
for key, value in payload.items():
    stm = my_table.update()
    stm = stm.where(getattr(sales_order_item.c, 'col1') == key)
    stm = stm.values({'col2': value})
    session.execute(stm)

或者像这样

for key, value in payload.items():
    query = session.query(MyTable).filter(MyTable.col1==key)
    query.update({MyTable.col2: value})

现在这两种解决方案都按预期工作,唯一困扰我的是它所花费的时间,例如,对于 100 个元素的有效负载,它最多需要 6 秒,我几乎可以肯定应该有一个更好的方法,不是吗?

Now both of these solutions work as expected the only thing that is bothering me is the time it takes, for example for a payload of 100 elements it takes up to 6 sec, and I'm almost sure that there should be a better way to do that, isn't it?

我在想是否有办法让它与 in_ 函数一起工作:

I was thinking if there is a way of making it work with the in_ function:

query(MyTable).filter(
        MyTable.col1.in_(payload.keys())
    )

但我不知道如何构建更新查询.

but I don't know how to structure the update query.


解决方案

是的,使用单个批量 UPDATE 语句更新大量行将比使用单个 UPDATE 快很多s 在每个对象上.IN 过滤器只会帮助您限制更新哪些行,但您仍然需要告诉数据库要为 col2 更新使用什么值.

Yes, updating a larger number of rows with a single bulk UPDATE statement will be a lot faster than using individual UPDATEs on each and every object. An IN filter would only help you limit what rows are updated, but you still need to tell the database what value to use for the col2 updates.

您可以为此使用 CASE ... WHEN ... THEN 构造,并带有 case() 函数:

You can use a CASE ... WHEN ... THEN construct for that, with the case() function:

from sqlalchemy.sql import case

query(MyTable).filter(
    MyTable.col1.in_(payload)
).update({
    MyTable.col2: case(
        payload,
        value=MyTable.col1,
    )
}, synchronize_session=False)

上面 a) 选择 col1 值是 payload 字典中的键的行,然后 b) 更新 col2 列值使用 CASE 语句从同一个字典中选择值,以根据与键匹配的 col1 更新该列.

The above a) selects rows where the col1 value is a key in the payload dictionary, then b) updates the col2 column value using a CASE statement that picks values from that same dictionary to update that column based on matching col1 against the keys.

payload 设置为 {'x': 'y', 'a': 'b', 'c': 'd'},上面的执行以下查询(在 IN 测试中给出或采用 WHEN 子句和值的确切顺序):

With payload set to {'x': 'y', 'a': 'b', 'c': 'd'}, the above executes the following query (give or take the exact order of WHEN clauses and values in the IN test):

UPDATE mytable
SET
    col2=CASE mytable.col1
        WHEN 'x' THEN 'y'
        WHEN 'a' THEN 'b'
        WHEN 'c' THEN 'd'
    END
WHERE
    mytable.col1 IN ('x', 'a', 'c')

我在那里将 synchronize_session 设置为 False,因为一次更新所有可能的缓存 MyTable 实例可能不是在更新大型行数.您的其他选项是 'evaluate''fetch'.

I set synchronize_session to False there, as updating all possible cached MyTable instances at once is perhaps not the best idea when updating a large number of rows. Your other options are 'evaluate' and 'fetch'.

  • 我们不能使用默认的 'evaluate'(它会在会话中找到与 where 子句匹配的现有对象,来就地更新),因为 SQLAlchemy 目前不知道如何处理 IN 过滤器(您会得到 UnevaluatableError 异常).

  • We can't use the default 'evaluate' (which would find existing objects in the session that match the where clause, to update in-place), because SQLAlchemy currently doesn't know how to process an IN filter (you get an UnevaluatableError exception).

如果您确实使用 'fetch',则缓存在受影响的会话中的所有 MyTable 实例都将使用 col2<的新值进行更新/code>(由它们的主键映射).

If you do use 'fetch' then all instances of MyTable cached in the session that were affected are updated with new values for col2 (as mapped by their primary key).

请注意,提交会使会话过期无论如何,因此如果您需要对更新的行做更多的工作,您只想使用 'fetch'在您提交当前事务之前.

Note that a commit would expire the session anyway, so you'd only want to use 'fetch' if you need to do some more work with the updated rows before you can commit the current transaction.

查看 查询.update() 文档 了解更多关于你有哪些 synchronize_session 选项的信息.

See the Query.update() documentation for more information on what synchronize_session options you have.

相关文章