JanusGraph data write flow
This post gives an overview of how JanusGraph writes data.
Let's go through it step by step.
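The Graph implementation class StandardJanusGraph extends JanusGraphBlueprintsGraph. All vertex and edge operations are implemented in the parent class, which delegates them to the current transaction, an instance of JanusGraphBlueprintsTransaction.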
@Override
public JanusGraphVertex addVertex(Object... keyValues) {
return getAutoStartTx().addVertex(keyValues);
}
@Override
public Iterator<Vertex> vertices(Object... vertexIds) {
return getAutoStartTx().vertices(vertexIds);
}
@Override
public Iterator<Edge> edges(Object... edgeIds) {
return getAutoStartTx().edges(edgeIds);
}
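The delegation pattern above can be illustrated with a small stand-alone sketch. It assumes that the transaction is bound to the calling thread and opened lazily on first use, which is what the name getAutoStartTx suggests. The classes SketchGraph and SketchTx are hypothetical stand-ins and are not JanusGraph's real classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a transaction; not the real JanusGraphBlueprintsTransaction.
class SketchTx {
    final List<String> vertices = new ArrayList<>();
    boolean open = true;

    String addVertex(String label) {
        vertices.add(label);
        return label;
    }
}

// Hypothetical stand-in for the graph: every operation is forwarded to the
// transaction bound to the calling thread, which is opened on first use.
class SketchGraph {
    private final ThreadLocal<SketchTx> current = new ThreadLocal<>();
    int startedTransactions = 0; // plain counter, only meaningful in a single-threaded demo

    SketchTx getAutoStartTx() {
        SketchTx tx = current.get();
        if (tx == null || !tx.open) {
            tx = new SketchTx();
            startedTransactions++;
            current.set(tx);
        }
        return tx;
    }

    // The graph itself holds no vertex state; it only delegates.
    String addVertex(String label) {
        return getAutoStartTx().addVertex(label);
    }
}
```

Calling addVertex twice on the same thread reuses one transaction, so both vertices end up in the same SketchTx.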
The addVertex method of JanusGraphBlueprintsTransaction:
...
// The code above (elided) obtains the ID and the label.
// The code below calls the StandardJanusGraphTx method to create the vertex.
final JanusGraphVertex vertex = addVertex(id, label);
org.janusgraph.graphdb.util.ElementHelper.attachProperties(vertex, keyValues);
StandardJanusGraphTx.addVertex() creates a StandardVertex, sets its ID, sets the system property VertexExists to true, sets the VertexLabel, and then adds the vertex to the vertexCache.
@Override
public JanusGraphVertex addVertex(Long vertexId, VertexLabel label) {
.... // checks (elided)
StandardVertex vertex = new StandardVertex(this, IDManager.getTemporaryVertexID(IDManager.VertexIDType.NormalVertex, temporaryIds.nextID()), ElementLifeCycle.New);
if (vertexId != null) {
vertex.setId(vertexId);
} else if (config.hasAssignIDsImmediately() || label.isPartitioned()) {
graph.assignID(vertex,label);
}
addProperty(vertex, BaseKey.VertexExists, Boolean.TRUE);
if (label!=BaseVertexLabel.DEFAULT_VERTEXLABEL) { //Add label
Preconditions.checkArgument(label instanceof VertexLabelVertex);
addEdge(vertex, label, BaseLabel.VertexLabelEdge);
}
vertexCache.add(vertex, vertex.longId());
return vertex;
}
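JanusGraph stores the VertexLabel as an edge. This is because in JanusGraph both properties and edges are relations: addProperty and addEdge both end up calling connectRelation, which puts the new relation into the addedRelations cache.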
A brief summary
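• All graph operations are ultimately delegated to StandardJanusGraphTx.
• StandardJanusGraphTx.addVertex simply creates a StandardVertex and puts it in vertexCache.
• Properties and edges are both relations; addProperty and addEdge simply put the property or edge into the addedRelations cache.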
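To make the buffering concrete, here is a minimal sketch of a transaction that keeps new vertices and relations in memory only. It is a simplified model, not JanusGraph's implementation: the class BufferingTxSketch, the Relation type, and the key names "~exists" and "~vertexlabel" are all hypothetical, and unlike the real code the sketch adds a label edge for every non-null label.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a transaction that only buffers changes.
class BufferingTxSketch {
    // A relation is either a property (key -> value) or an edge (label -> target).
    static final class Relation {
        final long vertexId;
        final String type;
        final Object value;

        Relation(long vertexId, String type, Object value) {
            this.vertexId = vertexId;
            this.type = type;
            this.value = value;
        }
    }

    final Map<Long, String> vertexCache = new HashMap<>();
    final List<Relation> addedRelations = new ArrayList<>();
    private long nextId = 1;

    long addVertex(String label) {
        long id = nextId++;
        vertexCache.put(id, label);
        // System property marking that the vertex exists (key name is made up).
        addProperty(id, "~exists", Boolean.TRUE);
        // The label is stored as an edge, i.e. as just another relation.
        if (label != null) {
            addEdge(id, "~vertexlabel", label);
        }
        return id;
    }

    void addProperty(long vertexId, String key, Object value) {
        connectRelation(new Relation(vertexId, key, value));
    }

    void addEdge(long vertexId, String label, Object target) {
        connectRelation(new Relation(vertexId, label, target));
    }

    // Both properties and edges end up in the same buffer; nothing is written to storage here.
    private void connectRelation(Relation relation) {
        addedRelations.add(relation);
    }
}
```

In this model, adding one labeled vertex leaves one entry in vertexCache and two entries in addedRelations, and nothing reaches storage until a commit step drains those buffers.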
The added vertices and edges all live in caches, so when are they written to storage? The answer is in the commit() method of StandardJanusGraphTx.
StandardJanusGraphTx.commit()
-> StandardJanusGraph.commit()
Main flow of StandardJanusGraph.commit:
1. Finalize transaction: confirm the transaction and generate the transaction ID
2. Assign JanusGraphVertex IDs: assign vertex IDs
3. Commit
3.1 Log transaction (write-ahead log) if enabled
3.2 Commit schema elements and their associated relations in a separate transaction if the backend does not support transactional isolation
3.3 prepareCommit
3.4 Commit storage
3.5 Commit indexes
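The StandardJanusGraph.prepareCommit() method mainly collects index updates and then serializes the vertices, edges, and index entries changed in this transaction into the BackendTransaction's buffer.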
public ModificationSummary prepareCommit(final Collection<InternalRelation> addedRelations,
final Collection<InternalRelation> deletedRelations,
final Predicate<InternalRelation> filter,
final BackendTransaction mutator, final StandardJanusGraphTx tx,
final boolean acquireLocks) throws BackendException {
ListMultimap<Long, InternalRelation> mutations = ArrayListMultimap.create();
ListMultimap<InternalVertex, InternalRelation> mutatedProperties = ArrayListMultimap.create();
List<IndexSerializer.IndexUpdate> indexUpdates = Lists.newArrayList();
//1) Collect deleted edges and their index updates and acquire edge locks
for (InternalRelation del : Iterables.filter(deletedRelations,filter)) {
Preconditions.checkArgument(del.isRemoved());
for (int pos = 0; pos < del.getLen(); pos++) {
InternalVertex vertex = del.getVertex(pos);
if (pos == 0 || !del.isLoop()) {
if (del.isProperty()) mutatedProperties.put(vertex,del);
mutations.put(vertex.longId(), del);
}
if (acquireLock(del,pos,acquireLocks)) {
Entry entry = edgeSerializer.writeRelation(del, pos, tx);
mutator.acquireEdgeLock(idManager.getKey(vertex.longId()), entry);
}
}
indexUpdates.addAll(indexSerializer.getIndexUpdates(del));
}
//2) Collect added edges and their index updates and acquire edge locks
for (InternalRelation add : Iterables.filter(addedRelations,filter)) {
Preconditions.checkArgument(add.isNew());
for (int pos = 0; pos < add.getLen(); pos++) {
InternalVertex vertex = add.getVertex(pos);
if (pos == 0 || !add.isLoop()) {
if (add.isProperty()) mutatedProperties.put(vertex,add);
mutations.put(vertex.longId(), add);
}
if (!vertex.isNew() && acquireLock(add,pos,acquireLocks)) {
Entry entry = edgeSerializer.writeRelation(add, pos, tx);
mutator.acquireEdgeLock(idManager.getKey(vertex.longId()), entry.getColumn());
}
}
indexUpdates.addAll(indexSerializer.getIndexUpdates(add));
}
//3) Collect all index update for vertices
for (InternalVertex v : mutatedProperties.keySet()) {
indexUpdates.addAll(indexSerializer.getIndexUpdates(v,mutatedProperties.get(v)));
}
//4) Acquire index locks (deletions first)
for (IndexSerializer.IndexUpdate update : indexUpdates) {
if (!update.isCompositeIndex() || !update.isDeletion()) continue;
CompositeIndexType iIndex = (CompositeIndexType) update.getIndex();
if (acquireLock(iIndex,acquireLocks)) {
mutator.acquireIndexLock((StaticBuffer)update.getKey(), (Entry)update.getEntry());
}
}
for (IndexSerializer.IndexUpdate update : indexUpdates) {
if (!update.isCompositeIndex() || !update.isAddition()) continue;
CompositeIndexType iIndex = (CompositeIndexType) update.getIndex();
if (acquireLock(iIndex,acquireLocks)) {
mutator.acquireIndexLock((StaticBuffer)update.getKey(), ((Entry)update.getEntry()).getColumn());
}
}
//5) Add relation mutations
for (Long vertexId : mutations.keySet()) {
Preconditions.checkArgument(vertexId > 0, "Vertex has no id: %s", vertexId);
final List<InternalRelation> edges = mutations.get(vertexId);
final List<Entry> additions = new ArrayList<>(edges.size());
final List<Entry> deletions = new ArrayList<>(Math.max(10, edges.size() / 10));
for (final InternalRelation edge : edges) {
final InternalRelationType baseType = (InternalRelationType) edge.getType();
assert baseType.getBaseType()==null;
for (InternalRelationType type : baseType.getRelationIndexes()) {
if (type.getStatus()== SchemaStatus.DISABLED) continue;
for (int pos = 0; pos < edge.getArity(); pos++) {
if (!type.isUnidirected(Direction.BOTH) && !type.isUnidirected(EdgeDirection.fromPosition(pos)))
continue; //Directionality is not covered
if (edge.getVertex(pos).longId()==vertexId) {
StaticArrayEntry entry = edgeSerializer.writeRelation(edge, type, pos, tx);
if (edge.isRemoved()) {
deletions.add(entry);
} else {
Preconditions.checkArgument(edge.isNew());
int ttl = getTTL(edge);
if (ttl > 0) {
entry.setMetaData(EntryMetaData.TTL, ttl);
}
additions.add(entry);
}
}
}
}
}
StaticBuffer vertexKey = idManager.getKey(vertexId);
mutator.mutateEdges(vertexKey, additions, deletions);
}
//6) Add index updates
boolean has2iMods = false;
for (IndexSerializer.IndexUpdate indexUpdate : indexUpdates) {
assert indexUpdate.isAddition() || indexUpdate.isDeletion();
if (indexUpdate.isCompositeIndex()) {
final IndexSerializer.IndexUpdate<StaticBuffer,Entry> update = indexUpdate;
if (update.isAddition())
mutator.mutateIndex(update.getKey(), Lists.newArrayList(update.getEntry()), KCVSCache.NO_DELETIONS);
else
mutator.mutateIndex(update.getKey(), KeyColumnValueStore.NO_ADDITIONS, Lists.newArrayList(update.getEntry()));
} else {
final IndexSerializer.IndexUpdate<String,IndexEntry> update = indexUpdate;
has2iMods = true;
IndexTransaction itx = mutator.getIndexTransaction(update.getIndex().getBackingIndexName());
String indexStore = ((MixedIndexType)update.getIndex()).getStoreName();
if (update.isAddition())
itx.add(indexStore, update.getKey(), update.getEntry(), update.getElement().isNew());
else
itx.delete(indexStore,update.getKey(),update.getEntry().field,update.getEntry().value,update.getElement().isRemoved());
}
}
return new ModificationSummary(!mutations.isEmpty(),has2iMods);
}
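• ListMultimap<Long, InternalRelation> mutations: for each vertex ID, the list of changed properties (StandardVertexProperty) and edges (StandardEdge).
• ListMultimap<InternalVertex, InternalRelation> mutatedProperties: for each vertex, the list of changed properties (StandardVertexProperty).
• List<IndexSerializer.IndexUpdate> indexUpdates: the index updates found by checking every changed vertex property (StandardVertexProperty) and edge (StandardEdge), including each of its properties, for index changes.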
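The grouping done in steps 1 and 2 of prepareCommit can be sketched without any JanusGraph types. The example below mirrors the `pos == 0 || !isLoop()` check, so that a self-loop is recorded only once for its vertex. The class MutationGroupingSketch and its Rel type are hypothetical, and a plain Map of lists stands in for Guava's ListMultimap.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class MutationGroupingSketch {
    // Hypothetical relation: a property has one endpoint, an edge has two.
    static final class Rel {
        final String name;
        final long[] vertexIds;

        Rel(String name, long... vertexIds) {
            this.name = name;
            this.vertexIds = vertexIds;
        }

        boolean isLoop() {
            return vertexIds.length == 2 && vertexIds[0] == vertexIds[1];
        }
    }

    // Groups relation names by the ID of every vertex they touch.
    static Map<Long, List<String>> groupByVertex(List<Rel> relations) {
        Map<Long, List<String>> mutations = new LinkedHashMap<>();
        for (Rel rel : relations) {
            for (int pos = 0; pos < rel.vertexIds.length; pos++) {
                // A self-loop touches the same vertex at both positions; record it only once.
                if (pos == 0 || !rel.isLoop()) {
                    mutations.computeIfAbsent(rel.vertexIds[pos], k -> new ArrayList<>())
                             .add(rel.name);
                }
            }
        }
        return mutations;
    }
}
```

For a property "name" on vertex 1, an edge "knows" from 1 to 2, and a self-loop "self" on vertex 2, vertex 1 gets the entries name and knows, while vertex 2 gets knows and a single self entry.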
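prepareCommit also calls BackendTransaction.mutateEdges to apply the changes. That method calls mutateEntries on the edgeStore, and the edgeStore in turn delegates to the mutate method of the StoreTransaction (a CacheTransaction), which buffers the changes. Index changes are handled the same way: prepareCommit calls BackendTransaction.mutateIndex, and the changes end up buffered in the mutations field of the StoreTransaction (CacheTransaction).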
commitStorage
The commit method then calls BackendTransaction.commitStorage() to commit the data to storage. This method calls storeTx.commit():
@Override
public void commit() throws BackendException {
flushInternal(); // calls the persist method
tx.commit();
}
private int persist(final Map<String, Map<StaticBuffer, KCVMutation>> subMutations) {
BackendOperation.execute(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
manager.mutateMany(subMutations, tx);
return true;
}
@Override
public String toString() {
return "CacheMutation";
}
}, maxWriteTime);
subMutations.clear();
return 0;
}
As shown here, persist simply calls the manager's mutateMany interface, which is the interface that each storage backend implements.
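tx.commit() in turn reaches the commit method of the backend's StoreTransaction, for example CassandraTransaction, which is where the concrete storage backend implementation is invoked.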
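The buffer-then-flush behavior described above can be sketched as follows. This is a simplified model under stated assumptions, not the real CacheTransaction: the interface BackendSketch and the classes CachingTxSketch and RecordingBackendSketch are hypothetical, and plain strings stand in for StaticBuffer keys and entries.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical pluggable backend: one batch write call plus a commit hook.
interface BackendSketch {
    void mutateMany(Map<String, Map<String, List<String>>> mutationsByStore);
    void commit();
}

// Buffers mutations per store and per row key; the backend is only touched on commit.
class CachingTxSketch {
    private final BackendSketch backend;
    private final Map<String, Map<String, List<String>>> buffered = new HashMap<>();

    CachingTxSketch(BackendSketch backend) {
        this.backend = backend;
    }

    void mutate(String store, String key, String entry) {
        buffered.computeIfAbsent(store, s -> new HashMap<>())
                .computeIfAbsent(key, k -> new ArrayList<>())
                .add(entry);
    }

    void commit() {
        if (!buffered.isEmpty()) {
            backend.mutateMany(buffered); // flush everything in one batch
            buffered.clear();
        }
        backend.commit(); // then let the backend finish its own transaction
    }
}

// In-memory backend that only records calls, to keep the sketch runnable.
class RecordingBackendSketch implements BackendSketch {
    final List<String> calls = new ArrayList<>();
    int entriesWritten = 0;

    @Override
    public void mutateMany(Map<String, Map<String, List<String>>> mutationsByStore) {
        for (Map<String, List<String>> rows : mutationsByStore.values()) {
            for (List<String> entries : rows.values()) {
                entriesWritten += entries.size();
            }
        }
        calls.add("mutateMany");
    }

    @Override
    public void commit() {
        calls.add("commit");
    }
}
```

Swapping RecordingBackendSketch for another implementation of the interface changes where the data goes without touching the buffering logic, which is the kind of pluggability the storage abstraction is meant to provide.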
Summary
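• JanusGraph provides an abstract transaction flow: create the transaction -> record operations -> write the log before committing -> prepareCommit -> commit
• To make the underlying storage pluggable, JanusGraph abstracts storage management and transaction management; by implementing these management interfaces, you can plug in custom data storage and index storage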