FlatMap a通量未执行

2022-07-12 00:00:00 java spring-webflux project-reactor

我有一个包含240个项目的列表,使用for完全发送此列表需要1个多小时。

List<Map<String, Object>> conventions = mapConventions(objects, referentialService);

for(Map<String, Object> item : conventions) {
        webClient.post()
        .uri(configProperties.getUrl().getConvention() + CONVENTION)
        .bodyValue(objectMapper.convertValue(item, JsonNode.class))
        .retrieve()
        .bodyToMono(String.class);
}

所以我按照这个article同时发送,以最小化响应时间,但flatmap中的代码从未执行过:

Flux.fromIterable(conventions).flatMap(item ->  {
    System.out.print(item);
    return webClient.post()
            .uri(configProperties.getUrl().getConvention() + CONVENTION)
            .bodyValue(objectMapper.convertValue(item, JsonNode.class))
            .retrieve()
            .bodyToMono(String.class);
});

解决方案

在反应式节目中,有生产者和订阅者。虽然制作人可以输出结果,但如果没有人听取这些结果--这就是订阅者发挥作用的地方--它不会有任何好处。订阅者处理生产者的输出,并对结果做一些有意义的事情。对于反应式编程来说,这是非常重要的,如果订阅者没有监听结果,则生产者将不会执行任何代码。

因此,在本例中flatmap()是生产者。并且它不会执行任何代码,除非有订阅服务器来处理输出。

简短的答案是在平面图的末尾添加一个subscribe()调用。看起来像这样。

Flux.fromIterable(conventions).flatMap(item ->  {
    System.out.print(item);
    return webClient.post()
            .uri(configProperties.getUrl().getConvention() + CONVENTION)
            .bodyValue(objectMapper.convertValue(item, JsonNode.class))
            .retrieve()
            .bodyToMono(String.class);
}).subscribe();

已经写了很多关于这方面的教程。 例如: https://spring.io/blog/2016/06/13/notes-on-reactive-programming-part-ii-writing-some-code https://medium.com/@olehdokuka/mastering-own-reactive-streams-implementation-part-1-publisher-e8eaf928a78c https://projectreactor.io/docs/core/release/reference/

相关文章