DBMS级别的管道和过滤器:拆分合并输出流
方案
我们有一个非常标准的数据导入过程,在该过程中我们加载staging
表,然后MERGE
将其放入target
表。
新需求(绿色)涉及捕获导入数据的子集
放到单独的queue
表中以进行完全无关的处理。
"挑战"
(1)子集由以下记录的选择组成: 仅新插入target
表。
(2)子集是一些插入列的投影,但也
至少有一列仅存在于源中(staging
表)。
(3)MERGE
语句已使用OUTPUT..INTO
子句
严格记录MERGE
拍摄的$action
,以便
PIVOT
结果和COUNT
插入、更新和
出于统计目的的删除。我们真的不喜欢缓冲
对整个数据集执行类似的操作,并且更倾向于聚合
飞翔上的金额。不用说,我们不想将更多数据添加到
此OUTPUT
表。
(4)我们不想做MERGE
无论出于何种原因,即使是部分原因,都会执行第二次。这个
target
表非常大,我们不能索引所有内容,并且
操作通常相当昂贵(分钟,而不是秒)。
(5)我们不考虑将MERGE
的任何输出往返到
客户端只是为了使客户端可以通过以下方式将其路由到queue
马上把它送回去。数据必须保留在服务器上。
(6)我们希望避免在临时存储中缓冲整个数据集
在staging
和queue
之间。
做这件事的最佳方式是什么?
故障
(A)仅将插入的记录入队的要求阻止我们 的OUTPUT..INTO
子句中直接以queue
表为目标
MERGE
,因为它不允许任何WHERE
子句。我们可以用一些
CASE
标记不需要的记录以供后续删除的诡计
来自queue
而不进行处理,但这似乎很疯狂。
(B)因为queue
的某些列没有出现在
target
表中,我们不能简单地在目标上添加插入触发器
表来加载queue
。"数据流拆分"必须更早发生。
(C)由于我们已经在MERGE
中使用了OUTPUT..INTO
子句,因此我们
无法添加第二个OUTPUT
子句和将MERGE
嵌套到
INSERT..SELECT
加载队列。这太可惜了,因为它
感觉对一些可行的东西来说,这是一个完全武断的限制
否则很好;SELECT
只过滤具有
$action
我们希望(INSERT
)和INSERT
在queue
中
声明。因此,从理论上讲,DBMS可以避免缓冲整个
数据集,只需将其流式传输到queue
。(注:我们没有追查
而且很可能它实际上并没有以这种方式优化计划。)
情况
我们觉得我们已经用尽了我们的选择,但决定求助于Hivemind 可以肯定的是。我们能想到的只有:
(S1)创建target
表的VIEW
,该表也包含可为空的
仅用于queue
的数据的列,并具有
SELECT
语句将它们定义为NULL
。然后,设置INSTEAD OF
同时填充target
表和queue
的触发器
恰如其分。最后,将MERGE
连接到目标视图。这
是可行的,但是我们并不喜欢这个结构--它绝对是
看起来棘手。
(S2)放弃,使用以下命令在临时表中缓冲整个数据集
另一个MERGE..OUTPUT
。在MERGE
之后,立即复制数据
(再说一遍!)从临时表导入queue
。解决方案
我的理解是,主要障碍是SQL Server中OUTPUT
子句的限制。它允许一个OUTPUT INTO table
和/或一个OUTPUT
将结果集返回给调用方。
您希望以两种不同的方式保存MERGE
语句的结果:
- 受
MERGE
影响的所有行,用于收集统计信息 - 仅为
queue
插入行
简单变量
我会使用您的S2解决方案。至少一开始是这样。它很容易理解和维护,而且应该非常高效,因为最耗费资源的操作(MERGE
intoTarget
本身只执行一次)。下面还有第二个变体,比较它们在真实数据上的性能会很有趣。
所以:
- 在
MERGE
中使用 - 插入前
INSERT
从@TempTable
到Stats
的所有行或聚合。如果您只需要聚合统计数据,则聚合此批次的结果并将其合并到最终Stats
而不是复制所有行是有意义的。 INSERT
到Queue
中仅"插入"了@TempTable
中的行。
OUTPUT INTO @TempTable
我将从@i-one的答案中提取样本数据。
架构
-- I'll return to commented lines later
CREATE TABLE [dbo].[TestTarget](
-- [ID] [int] IDENTITY(1,1) NOT NULL,
[foo] [varchar](10) NULL,
[bar] [varchar](10) NULL
);
CREATE TABLE [dbo].[TestStaging](
[foo] [varchar](10) NULL,
[bar] [varchar](10) NULL,
[baz] [varchar](10) NULL
);
CREATE TABLE [dbo].[TestStats](
[MergeAction] [nvarchar](10) NOT NULL
);
CREATE TABLE [dbo].[TestQueue](
-- [TargetID] [int] NOT NULL,
[foo] [varchar](10) NULL,
[baz] [varchar](10) NULL
);
样本数据
TRUNCATE TABLE [dbo].[TestTarget];
TRUNCATE TABLE [dbo].[TestStaging];
TRUNCATE TABLE [dbo].[TestStats];
TRUNCATE TABLE [dbo].[TestQueue];
INSERT INTO [dbo].[TestStaging]
([foo]
,[bar]
,[baz])
VALUES
('A', 'AA', 'AAA'),
('B', 'BB', 'BBB'),
('C', 'CC', 'CCC');
INSERT INTO [dbo].[TestTarget]
([foo]
,[bar])
VALUES
('A', 'A_'),
('B', 'B?');
合并
DECLARE @TempTable TABLE (
MergeAction nvarchar(10) NOT NULL,
foo varchar(10) NULL,
baz varchar(10) NULL);
MERGE INTO TestTarget AS Dst
USING TestStaging AS Src
ON Dst.foo = Src.foo
WHEN MATCHED THEN
UPDATE SET
Dst.bar = Src.bar
WHEN NOT MATCHED BY TARGET THEN
INSERT (foo, bar)
VALUES (Src.foo, Src.bar)
OUTPUT $action AS MergeAction, inserted.foo, Src.baz
INTO @TempTable(MergeAction, foo, baz)
;
INSERT INTO [dbo].[TestStats] (MergeAction)
SELECT T.MergeAction
FROM @TempTable AS T;
INSERT INTO [dbo].[TestQueue]
([foo]
,[baz])
SELECT
T.foo
,T.baz
FROM @TempTable AS T
WHERE T.MergeAction = 'INSERT'
;
SELECT * FROM [dbo].[TestTarget];
SELECT * FROM [dbo].[TestStats];
SELECT * FROM [dbo].[TestQueue];
结果
TestTarget
+-----+-----+
| foo | bar |
+-----+-----+
| A | AA |
| B | BB |
| C | CC |
+-----+-----+
TestStats
+-------------+
| MergeAction |
+-------------+
| INSERT |
| UPDATE |
| UPDATE |
+-------------+
TestQueue
+-----+-----+
| foo | baz |
+-----+-----+
| C | CCC |
+-----+-----+
第二个变量
在SQL Server 2014 Express上测试。
OUTPUT
子句可以将其结果集发送到表和调用方。因此,OUTPUT INTO
可以直接进入Stats
,如果我们将MERGE
语句包装到存储过程中,则可以使用INSERT ... EXEC
进入Queue
。
INSERT ... EXEC
无论如何都会在后台创建一个临时表(另请参阅The Hidden Costs of INSERT EXECby
Adam Machanic),所以我预计当您显式创建临时表时,总体性能将类似于第一个变体。
还有一个问题需要解决:Queue
表应该只有"插入"的行,而不是所有受影响的行。要实现这一点,您可以在Queue
表上使用触发器来丢弃"INSERT"之外的行。另一种可能性是使用IGNORE_DUP_KEY = ON
定义唯一索引,并以这样的方式准备数据:"未插入"的行将违反唯一索引,并且不会插入到表中。
因此,我将向Target
表中添加一个ID IDENTITY
列,并向Queue
表中添加一个TargetID
列。(在上面的脚本中取消对它们的注释)。
另外,我将向Queue
表添加索引:
CREATE UNIQUE NONCLUSTERED INDEX [IX_TargetID] ON [dbo].[TestQueue]
(
[TargetID] ASC
) WITH (
PAD_INDEX = OFF,
STATISTICS_NORECOMPUTE = OFF,
SORT_IN_TEMPDB = OFF,
IGNORE_DUP_KEY = ON,
DROP_EXISTING = OFF,
ONLINE = OFF,
ALLOW_ROW_LOCKS = ON,
ALLOW_PAGE_LOCKS = ON)
重要部分是UNIQUE
和IGNORE_DUP_KEY = ON
。
以下是MERGE
的存储过程:
CREATE PROCEDURE [dbo].[TestMerge]
AS
BEGIN
SET NOCOUNT ON;
SET XACT_ABORT ON;
MERGE INTO dbo.TestTarget AS Dst
USING dbo.TestStaging AS Src
ON Dst.foo = Src.foo
WHEN MATCHED THEN
UPDATE SET
Dst.bar = Src.bar
WHEN NOT MATCHED BY TARGET THEN
INSERT (foo, bar)
VALUES (Src.foo, Src.bar)
OUTPUT $action INTO dbo.TestStats(MergeAction)
OUTPUT CASE WHEN $action = 'INSERT' THEN inserted.ID ELSE 0 END AS TargetID,
inserted.foo,
Src.baz
;
END
用法
TRUNCATE TABLE [dbo].[TestTarget];
TRUNCATE TABLE [dbo].[TestStaging];
TRUNCATE TABLE [dbo].[TestStats];
TRUNCATE TABLE [dbo].[TestQueue];
-- Make sure that `Queue` has one special row with TargetID=0 in advance.
INSERT INTO [dbo].[TestQueue]
([TargetID]
,[foo]
,[baz])
VALUES
(0
,NULL
,NULL);
INSERT INTO [dbo].[TestStaging]
([foo]
,[bar]
,[baz])
VALUES
('A', 'AA', 'AAA'),
('B', 'BB', 'BBB'),
('C', 'CC', 'CCC');
INSERT INTO [dbo].[TestTarget]
([foo]
,[bar])
VALUES
('A', 'A_'),
('B', 'B?');
INSERT INTO [dbo].[TestQueue]
EXEC [dbo].[TestMerge];
SELECT * FROM [dbo].[TestTarget];
SELECT * FROM [dbo].[TestStats];
SELECT * FROM [dbo].[TestQueue];
结果
TestTarget
+----+-----+-----+
| ID | foo | bar |
+----+-----+-----+
| 1 | A | AA |
| 2 | B | BB |
| 3 | C | CC |
+----+-----+-----+
TestStats
+-------------+
| MergeAction |
+-------------+
| INSERT |
| UPDATE |
| UPDATE |
+-------------+
TestQueue
+----------+------+------+
| TargetID | foo | baz |
+----------+------+------+
| 0 | NULL | NULL |
| 3 | C | CCC |
+----------+------+------+
在INSERT ... EXEC
期间将有额外的消息:
Duplicate key was ignored.
IFMERGE
更新了一些行。当唯一索引由于IGNORE_DUP_KEY = ON
而在INSERT
期间丢弃某些行时发送此警告消息。
插入重复键值时将出现警告消息 转换为唯一索引。只有违反唯一性约束的行 将失败。
相关文章