如何使用最新的Java SDK 3.1.2在Couchbase中执行批量插入

2022-07-16 00:00:00 couchbase java

我能够使用2.9.7等较旧版本的Java SDK执行批插入,代码如下。

公共作废插入全部(收款单){

Observable.from(documents).flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
    @Override
    public Observable<JsonDocument> call(final JsonDocument docToInsert) {
        return couchbaseConfig.catalogBucket().async().insert(docToInsert)
                .doOnError((Throwable throwable) -> log.error(
                        "Exception {} occured while inerting document {} to cb", throwable.getMessage(),
                        docToInsert));
    }
}).last().toBlocking().single();

}

我们要求升级到最新版本的Java SDK 3.1.2。在Couchbase文档方面,我无法获得太多帮助。非常感谢下面的任何链接


解决方案

Batching上的Couchbase Java SDK 3.x文档假定您熟悉反应式编程,尤其是Project Reactor。

要更新SDK 3的代码,您需要:

  1. 提供您自己的JsonDocument类。
  2. 写入Collection而不是Bucket
  3. 从RxJava迁移到反应器(或使用适配器库)。

提供您自己的JsonDocument

SDK 3不再需要或提供像SDK 2的JsonDocument这样的文档类。您可以随心所欲地为文档ID和内容建模。下面是一个可以帮助您过渡到SDK 3的类:

public class JsonDocument {
  private final String id;
  private final JsonObject content;

  public JsonDocument(String id, JsonDocument content) {
    this.id = id;
    this.content = content;
  }

  public String getId() { return id;}

  public JsonObject getContent() { return content;}

  @Override
  public String toString() {
    return "JsonDocument{id='" + id + "', content=" + content + "}";
  }
} 

写入Collection而不是Bucket

在SDK 3中,每个存储桶都有一个或多个作用域。每个作用域包含一个或多个集合。作用域有助于多租户;可以为每个客户或部署分配其自己的作用域。集合可帮助您组织文档;例如,您可以将widgetsinvoices放在单独的集合中。

每个存储桶都有一个默认作用域,并且该作用域有一个默认集合。

Couchbase Server7将支持作用域和集合,但SDK 3要求您现在就考虑它们。以前属于Bucket类的所有Get/Insert/Remove/等方法都已移至Collection类。

幸运的是,有一种方便的方法可以访问存储桶的默认集合。

这对您的代码意味着什么?在SDK 2中,您有:

AsyncBucket catalog = couchbaseConfig.catalogBucket().async();

在SDK 3中,您可以编写:

ReactiveCollection catalog = couchbaseConfig.catalogBucket()
    .defaultCollection()
    .reactive();

从RxJava迁移到反应器

您可能已经知道,Couchbase Java SDK 2.x使用RxJava作为其反应性模型。SDK 3改用了反应器。概念基本相同,但反应原语的名称不同:

Obseravble<T>-&>Flux<T>

Single<T>-&>Mono<T>

Completable-&>Mono<Void>

Reactor reference documentation是您的朋友。

将所有这些放在一起

假设documents是上面提到的JsonDocument类的列表,则SDK 3中的代码可能如下所示:

ReactiveCollection destCollection = couchbaseConfig.catalogBucket()
    .defaultCollection()
    .reactive();

Flux.fromIterable(documents)
    .flatMap(docToInsert ->
        destCollection.insert(docToInsert.getId(), docToInsert.getContent())
            .doOnError(throwable -> log.error(
                "Exception {} occurred while inserting document {} to cb",
                throwable.getMessage(), docToInsert)))
    .blockLast();

此代码(以及SDK 2版本)的一个问题是,如果任何插入失败,则不会插入剩余的文档。如果要继续插入其他文档,可以使用onErrorResume()而不是doOnError()

Flux.fromIterable(documents)
    .flatMap(docToInsert ->
        destCollection.insert(docToInsert.getId(), docToInsert.getContent())
            .onErrorResume(throwable -> {
              log.error("Exception {} occurred while inserting document {} to cb",
                  throwable.getMessage(), docToInsert);
              return Mono.empty();
            }))
    .blockLast();

相关文章