如何包装WebFlux应用程序的阻塞IO操作

我有一个Spring Boot/Webflow应用程序。我需要具有以下功能的异步REST终结点:

  1. 生成随机作业ID。
  2. 通过HTTP调用一个服务。
  3. 通过HTTP调用其他服务。
  4. 组合来自服务的响应并将结果写入文件。
  5. 将作业ID返回给客户端。

终结点应为异步。这就是为什么客户端不应该等待步骤2、3、4的结果。客户端应该立即收到作业ID。

目前我有以下实现:

@Override
  public Mono<String> saveData() {
    String jobId = UUID.randomUUID().toString();
    Mono<ResponseFromService1> response1 = service1.getData();
    Mono<ResponseFromService2> response2 = service2.getData();
    return fileService.saveData(response1, response2)
        .map(filePath -> log.info("File has been stored at {}", filePath))
        .map(jobId);

Service1和Service2是使用Reactive WebClient实现的。 FileService.saveData的实现如下:

  public Mono<Path> saveDataInFile(Mono<ResponseFromService1> response1,Mono<ResponseFromService2> response2) {
return Mono.fromCallable(() ->
    Mono.zip(response1, response2)
        .map(tuple -> blockingIOsaveMethod(tuple.getT1(), tuple.getT2()))
).publishOn(Schedulers.elastic())
    .flatMap(mono -> mono);
  }

问题是此终结点不是异步的。端点的客户端在保存包含数据的文件后获取作业ID。 我应该如何更新saveDataInFile和saveData以立即返回作业ID?


解决方案

客户端应立即收到作业ID。

这似乎表明Mono<String>不是saveData()的正确返回类型,因为您显然不想等待任何异步操作完成:

@Override
public String saveData() {
  String jobId = UUID.randomUUID().toString();
  // ...
  return jobId;
}

...看起来像是一个<2-2]操作,您或许可以手动订阅Mono

@Override
public String saveData() {
  String jobId = UUID.randomUUID().toString();
  fileService.saveData(...).subscribe(...); // look at the different overloads of #subscribe(...)
  return jobId;
}

确保所有内容都被记录在案,这样您就不会忘记返回HTTP响应后发生的事情。

相关文章