Hazelcast Jet Pipeline详解

2022-05-23 00:00:00 支持 配置 事件 数据源 序列化

前言
pipeline只有两种stage:stream和batch,主要看它的数据源是哪种,如果是StreamSource那就用StreamStage,如果是BatchSource那就用BatchStage。也可以通过 addTimestamps来把batch模拟成无边界流。

1. 避免事件乱序
为了避免乱序可以如下配置:

Pipeline p = Pipeline.create();
p.setPreserveOrder(true);
1
2
这样配置后,Jet会给这些消息打上同样的partitioning key。会影响性能。如果你无意中改了partitioning key就会影响事件的顺序。比如你从带有分区的kafka消费,然后用了groupingKey(这个key不是kafka的分区key),那么你就潜在的打乱了事件的顺序。

2.多数据源
如下:

Pipeline p = Pipeline.create();

BatchSource<String> leftSource = TestSources.items("the", "quick", "brown", "fox");
BatchSource<String> rightSource = TestSources.items("jumps", "over", "the", "lazy", "dog");

BatchStage<String> left = p.readFrom(leftSource);
BatchStage<String> right = p.readFrom(rightSource);

left.merge(right)
.writeTo(Sinks.logger());

3. 转换Transforms
Jet在数据源和目标数据中间有一层转换层,分为两类:
无状态转换:map,filter,flatMap,hashJoin
有状态转换:aggregate,rollingAggregate,distinct,window

4. 分布式内存数据结构
IMap: Hazelcast内置的,支持索引、查询、持久化,可以用在Jet的跑批和流式。
可以通过配置event journal来支持流式处理(支持Exactly-Once):

hazelcast:
map:
name_of_map:
event-journal:
enabled: true
capacity: 100000
time-to-live-seconds: 10
1
然后代码如下:

IMap<String, User> userCache = jet.getMap("usersCache")
Pipeline p = Pipeline.create();
p.readFrom(Sources.mapJournal(userCache, START_FROM_OLDEST))
.withIngestionTimestamps()
.writeTo(Sinks.logger()));

默认数据源只会输出ADDED和UPDATED事件。
详细的例子: IMap Change Stream
注意,IMap是一个分布式对象,下面的写法是有潜在bug的:

IMap<String, User> userCache = jet.getMap("users");
User user = userCache.get("user-id");
user.setAccessCount(user.getAccessCount() + 1);
userCache.put("user-id", user);

为了保证一致性,下面的写法才对:

static class IncrementEntryProcessor implements EntryProcessor<String, User, User> {
@Override
public User process(Entry<String, User> entry) {
User value = entry.getValue();
value.setAccessCount(value.getAccessCount() + 1);
return entry.setValue(value);
}
}

IMap<String, User> userCache = jet.getMap("users");
userCache.executeOnEntry("user-id", new IncrementEntryProcessor());

5. CDC
如下配置监听Mysql的binlog:

Pipeline pipeline = Pipeline.create();
pipeline.readFrom(
MySqlCdcSources.mysql("customers")
.setDatabaseAddress("127.0.0.1")
.setDatabasePort(3306)
.setDatabaseUser("debezium")
.setDatabasePassword("dbz")
.setClusterName("dbserver1")
.setDatabaseWhitelist("inventory")
.setTableWhitelist("inventory.customers")
.build())
.withNativeTimestamps(0)
.writeTo(Sinks.logger());

CDC只能支持At-Least-Once。数据源部分间歇性保存保存WAL offset,当发生故障或者重启的时候,offset之后的所有事件都会被重放。所以不是Exactly-Once。
因为数据库配置binlog的大小的原因,如果想回放很久以前的CDC可能发生数据丢失。所以配置合理的话,还是可以做到At-Least-Once的。

6. Pipeline的序列化
标准的Pipeline写法是用lambda,因为Pipeline是要序列化传到计算集群的,所以这些表达式都要支持序列化,又因为java.util.function是没有继承Serializable接口,因此Hazelcast的工程师实现了等同的ExXXX系列的接口,比如java.util.function.Function等同于com.hazelcast.function.FunctionEx。
除了表达式要支持Serializable,引用到的参数类也要支持Serializable。或者在buildPipeline方法内部声明内部变量。
还有一种情况,像DateTimeFormatter这种不可序列化的对象也是不能用的,但是我们可以用JDK预置的DateTimeFormatter.ISO_LOCAL_TIME在集群的目标机器上直接生成jvm对象。这个原理是,Jet集群支持java的static final,所以本质上你也可以自己创建static final变量。
后一种情况是mapUsingService():

Pipeline p = Pipeline.create();
BatchStage<Long> src = p.readFrom(Sources.list("input"));
ServiceFactory<?, DateTimeFormatter> serviceFactory = nonSharedService(
pctx -> DateTimeFormatter.ofPattern("HH:mm:ss.SSS")
.withZone(ZoneId.systemDefault()));
// 这里
src.mapUsingService(serviceFactory,
(formatter, tstamp) -> formatter.format(Instant.ofEpochMilli(tstamp)));

但是默认的Java序列化Serializable性能并不高,看下面的benchmark:

Strategy Number of Bytes Overhead %
java.io.Serializable 162 523
java.io.Externalizable 87 234
com.hazelcast.nio.serialization.Portable 104 300
com.hazelcast.nio.serialization.StreamSerializer 26 0

7. spring集成
Hazelcast增加了一个新的注解@SpringAware,它的作用是:
可以操作bean属性
可以监听callback,例如ApplicationContextAware, BeanNameAware
可以操作bean的post-processing注解,例如InitializingBean, @PostConstruct
例如:

@SpringAware
public class SourceContext {

@Resource(name = "my-source-map")
IMap<String, String> sourceMap;
}

@SpringAware
public class SinkContext {

@Resource(name = "my-sink-map")
IMap<String, String> sinkMap;
}

还有一种方式是用JetSpringServiceFactories,但是需要配置xml:

<hz:spring-aware/>
1
代码如下:

Pipeline pipeline = Pipeline.create();
pipeline.readFrom(Sources.list("list"))
.mapUsingService(JetSpringServiceFactories.bean("my-bean"), (myBean, item) -> myBean.enrich(item))
.writeTo(Sinks.logger());
1
2
3
4
8. springboot集成
Hazelcast提供的springboot starter 暂时找不到了。
————————————————
版权声明:本文为CSDN博主「ouyangshixiong」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/ouyangshixiong/article/details/122251278

相关文章