如何使用最新的Java SDK 3.1.2在Couchbase中执行批量插入
我能够使用2.9.7等较旧版本的Java SDK执行批插入,代码如下。
公共作废插入全部(收款单){
Observable.from(documents).flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
@Override
public Observable<JsonDocument> call(final JsonDocument docToInsert) {
return couchbaseConfig.catalogBucket().async().insert(docToInsert)
.doOnError((Throwable throwable) -> log.error(
"Exception {} occured while inerting document {} to cb", throwable.getMessage(),
docToInsert));
}
}).last().toBlocking().single();
}
我们要求升级到最新版本的Java SDK 3.1.2。在Couchbase文档方面,我无法获得太多帮助。非常感谢下面的任何链接
解决方案
Batching上的Couchbase Java SDK 3.x文档假定您熟悉反应式编程,尤其是Project Reactor。
要更新SDK 3的代码,您需要:
- 提供您自己的
JsonDocument
类。 - 写入
Collection
而不是Bucket
。 - 从RxJava迁移到反应器(或使用适配器库)。
提供您自己的JsonDocument
类
SDK 3不再需要或提供像SDK 2的JsonDocument
这样的文档类。您可以随心所欲地为文档ID和内容建模。下面是一个可以帮助您过渡到SDK 3的类:
public class JsonDocument {
private final String id;
private final JsonObject content;
public JsonDocument(String id, JsonDocument content) {
this.id = id;
this.content = content;
}
public String getId() { return id;}
public JsonObject getContent() { return content;}
@Override
public String toString() {
return "JsonDocument{id='" + id + "', content=" + content + "}";
}
}
写入Collection
而不是Bucket
在SDK 3中,每个存储桶都有一个或多个作用域。每个作用域包含一个或多个集合。作用域有助于多租户;可以为每个客户或部署分配其自己的作用域。集合可帮助您组织文档;例如,您可以将widgets
和invoices
放在单独的集合中。
每个存储桶都有一个默认作用域,并且该作用域有一个默认集合。
Couchbase Server7将支持作用域和集合,但SDK 3要求您现在就考虑它们。以前属于Bucket
类的所有Get/Insert/Remove/等方法都已移至Collection
类。
幸运的是,有一种方便的方法可以访问存储桶的默认集合。
这对您的代码意味着什么?在SDK 2中,您有:
AsyncBucket catalog = couchbaseConfig.catalogBucket().async();
在SDK 3中,您可以编写:
ReactiveCollection catalog = couchbaseConfig.catalogBucket()
.defaultCollection()
.reactive();
从RxJava迁移到反应器
您可能已经知道,Couchbase Java SDK 2.x使用RxJava作为其反应性模型。SDK 3改用了反应器。概念基本相同,但反应原语的名称不同:Obseravble<T>
-&>Flux<T>
Single<T>
-&>Mono<T>
Completable
-&>Mono<Void>
Reactor reference documentation是您的朋友。
将所有这些放在一起
假设documents
是上面提到的JsonDocument
类的列表,则SDK 3中的代码可能如下所示:
ReactiveCollection destCollection = couchbaseConfig.catalogBucket()
.defaultCollection()
.reactive();
Flux.fromIterable(documents)
.flatMap(docToInsert ->
destCollection.insert(docToInsert.getId(), docToInsert.getContent())
.doOnError(throwable -> log.error(
"Exception {} occurred while inserting document {} to cb",
throwable.getMessage(), docToInsert)))
.blockLast();
此代码(以及SDK 2版本)的一个问题是,如果任何插入失败,则不会插入剩余的文档。如果要继续插入其他文档,可以使用onErrorResume()
而不是doOnError()
:
Flux.fromIterable(documents)
.flatMap(docToInsert ->
destCollection.insert(docToInsert.getId(), docToInsert.getContent())
.onErrorResume(throwable -> {
log.error("Exception {} occurred while inserting document {} to cb",
throwable.getMessage(), docToInsert);
return Mono.empty();
}))
.blockLast();
相关文章