DorisDB 流批一体 实时架构

2022-07-27 00:00:00 数据 代码 架构 实时 离线

场景:
在做大数据分析,报表项目的时候,一般会有实时和离线分析两种场景,一般情况下,是实时一套代码,离线一套代码,代码开发量比较大,怎么做到

实时和离线公用一份代码(流批一体)
性能满足实时场景的时效性(实时)
我们这里以统计 每天每个游戏的登录用户数 为例,登录明细表和终结果表如下

CREATE TABLE `dwd_user_login` (
`login_time` datetime NULL COMMENT "登陆时间",
`game_id` bigint(20) NULL COMMENT "游戏ID",
`user_id` bigint(20) NULL COMMENT "账户ID",
`ip` varchar(39) NULL COMMENT "ip",
`updated_at` datetime NULL COMMENT "更新时间"
) ENGINE=OLAP
UNIQUE KEY(`login_time`, `game_id`, `user_id`)
COMMENT "OLAP"
PARTITION BY RANGE(`login_time`)
(
PARTITION p20211129 VALUES [('1000-11-29 00:00:00'), ('2021-11-30 00:00:00')),
PARTITION p20211130 VALUES [('2021-11-30 00:00:00'), ('2021-12-01 00:00:00')),
PARTITION p20211201 VALUES [('2021-12-01 00:00:00'), ('2021-12-02 00:00:00')),
PARTITION p20211202 VALUES [('2021-12-02 00:00:00'), ('2021-12-03 00:00:00')),
PARTITION p20211203 VALUES [('2021-12-03 00:00:00'), ('2021-12-04 00:00:00')))
DISTRIBUTED BY HASH(`user_id`) BUCKETS 3
PROPERTIES (
"replication_num" = "3",
"colocate_with" = "user_id",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-365",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "p",
"dynamic_partition.replication_num" = "3",
"dynamic_partition.buckets" = "3",
"in_memory" = "false",
"storage_format" = "V2"
);

CREATE TABLE `app_day_game_kpi` (
`login_date` date NULL COMMENT "登陆时间",
`game_id` bigint(20) NULL COMMENT "游戏ID",
`login_count` bigint(20) NULL COMMENT "登录次数",
`login_user_count` bigint(20) NULL COMMENT "登录用户数"
) ENGINE=OLAP
UNIQUE KEY(`login_date`,`game_id`)
COMMENT "OLAP"
PARTITION BY RANGE(`login_date`)
(
PARTITION p20211129 VALUES [('1000-11-29'), ('2021-11-30')),
PARTITION p20211130 VALUES [('2021-11-30'), ('2021-12-01')),
PARTITION p20211201 VALUES [('2021-12-01'), ('2021-12-02')),
PARTITION p20211202 VALUES [('2021-12-02'), ('2021-12-03')),
PARTITION p20211203 VALUES [('2021-12-03'), ('2021-12-04')))
DISTRIBUTED BY HASH(`game_id`) BUCKETS 3
PROPERTIES (
"replication_num" = "3",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-365",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "p",
"dynamic_partition.replication_num" = "3",
"dynamic_partition.buckets" = "3",
"in_memory" = "false",
"storage_format" = "V2"
);



hadoop生态圈经典架构



这是我们经常用到的hadoop生态圈,经典的架构
实时分析: 数据->flume->kafka-> spark streaming/flink-> mysql

离线分析: 数据->flume->kafka->hadoop-> spark sql -> mysql

问题
实时一套代码,离线一套代码 ,代码开发量比较大
当实时出现问题的时候,需要用离线任务来修复数据,实时数据的offset 难以确定
DorisDB 常用架构



这是DorisDB 基本的架构:
实时分析: 数据->flume->kafka-> DorisDB-> 直接查明细数据

离线分析: 数据->flume->kafka-> DorisDB->DorisDB统计计算->查统计好的数据

问题:
实时:虽然 DorisDB 是实时数据仓库,性能很优越,但是所有数据都查明细数据,查询时间会很长

离线: 每天的数据离线更新,数据时效性很差

DorisDB流批一体 实时架构


优点:
流批一体,代码简洁: 先实现离线sql,再把离线sql 复用到实时代码
兼容时效和性能: 今天的数据实时查询,所以基本算实时;今天以前的数据查统计好的代码,所以性能比较优越
实现SQL:
-- 离线sql,每天跑一次
insert into app_day_game_kpi
select date(login_time) login_date,game_id,count(1) login_count, count(distinct user_id) login_user_count from dwd_user_login
where login_time < DATE_FORMAT(NOW(),'%Y-%m-%d 00:00:00')
group by date(login_time),game_id

-- 复用离线sql,把今天的统计创建成 view
CREATE view app_day_game_kpi_view as
select date(login_time) login_date,game_id,count(1) login_count, count(distinct user_id) login_user_count from dwd_user_login
where login_time>=DATE_FORMAT(NOW(),'%Y-%m-%d 00:00:00')
group by date(login_time),game_id

-- 离线和实时数据结合,流批一体
select * from
(
SELECT * from app_day_game_kpi
union
SELECT * from app_day_game_kpi_view
) t
结语
sql 很简单,只用于说明思路,思路通了,复杂的业务逻辑一样可以实现
我太聪明了,哈哈哈
————————————————
版权声明:本文为CSDN博主「kequanjade」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/keyuquan/article/details/121640591

相关文章