FlatMap a通量未执行
我有一个包含240个项目的列表,使用for
完全发送此列表需要1个多小时。
List<Map<String, Object>> conventions = mapConventions(objects, referentialService);
for(Map<String, Object> item : conventions) {
webClient.post()
.uri(configProperties.getUrl().getConvention() + CONVENTION)
.bodyValue(objectMapper.convertValue(item, JsonNode.class))
.retrieve()
.bodyToMono(String.class);
}
所以我按照这个article同时发送,以最小化响应时间,但flatmap
中的代码从未执行过:
Flux.fromIterable(conventions).flatMap(item -> {
System.out.print(item);
return webClient.post()
.uri(configProperties.getUrl().getConvention() + CONVENTION)
.bodyValue(objectMapper.convertValue(item, JsonNode.class))
.retrieve()
.bodyToMono(String.class);
});
解决方案
在反应式节目中,有生产者和订阅者。虽然制作人可以输出结果,但如果没有人听取这些结果--这就是订阅者发挥作用的地方--它不会有任何好处。订阅者处理生产者的输出,并对结果做一些有意义的事情。对于反应式编程来说,这是非常重要的,如果订阅者没有监听结果,则生产者将不会执行任何代码。
因此,在本例中flatmap()
是生产者。并且它不会执行任何代码,除非有订阅服务器来处理输出。
subscribe()
调用。看起来像这样。
Flux.fromIterable(conventions).flatMap(item -> {
System.out.print(item);
return webClient.post()
.uri(configProperties.getUrl().getConvention() + CONVENTION)
.bodyValue(objectMapper.convertValue(item, JsonNode.class))
.retrieve()
.bodyToMono(String.class);
}).subscribe();
已经写了很多关于这方面的教程。 例如: https://spring.io/blog/2016/06/13/notes-on-reactive-programming-part-ii-writing-some-code https://medium.com/@olehdokuka/mastering-own-reactive-streams-implementation-part-1-publisher-e8eaf928a78c https://projectreactor.io/docs/core/release/reference/
相关文章