Storing graph data in MySQL: an analysis of how HugeGraph stores and reads data

2022-04-25 00:00:00 Keywords: data, read, backend, serialization

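HugeGraph is a graph database open-sourced by Baidu. It supports HBase, MySQL, RocksDB and other systems as its storage backend. This article takes EDGE storage with HBase as the backend and explores how HugeGraph stores and reads data.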

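Storing data

Serialization

The first step is serialization. The HBase backend uses HbaseSerializer, a subclass of BinarySerializer. Going by the super(false, true) call below, and assuming the arguments are passed in the order (keyWithIdPrefix, indexWithIdPrefix), keyWithIdPrefix is false and indexWithIdPrefix is true.

The keyWithIdPrefix flag comes up again later.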

public class HbaseSerializer extends BinarySerializer {

    public HbaseSerializer() {
        super(false, true);
    }
}

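To be stored in the database, data must first be serialized into a BackendEntry, the transfer object between the graph database and the backend store. For HBase the concrete type is BinaryBackendEntry: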

public class BinaryBackendEntry implements BackendEntry {

    private static final byte[] EMPTY_BYTES = new byte[]{};

    private final HugeType type;
    private final BinaryId id;
    private Id subId;
    private final List<BackendColumn> columns;
    private long ttl;

    public BinaryBackendEntry(HugeType type, byte[] bytes) {
        this(type, BytesBuffer.wrap(bytes).parseId(type));
    }

    public BinaryBackendEntry(HugeType type, BinaryId id) {
        this.type = type;
        this.id = id;
        this.subId = null;
        this.columns = new ArrayList<>();
        this.ttl = 0L;
    }

    // ... remaining members omitted
}

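Now for the serialization itself, which amounts to putting the data into the entry's columns.

Because keyWithIdPrefix is false for HBase, the ternary in writeEdge picks EMPTY_BYTES: the column name written here is empty, and the edge id is carried only by the entry id (see the EdgeId fields further down).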

public BackendEntry writeEdge(HugeEdge edge) {
    BinaryBackendEntry entry = newBackendEntry(edge);
    byte[] name = this.keyWithIdPrefix ?
                  this.formatEdgeName(edge) : EMPTY_BYTES;
    byte[] value = this.formatEdgeValue(edge);
    entry.column(name, value);

    if (edge.hasTtl()) {
        entry.ttl(edge.ttl());
    }

    return entry;
}

EdgeId:

private final Id ownerVertexId;
private final Directions direction;
private final Id edgeLabelId;
private final String sortValues;
private final Id otherVertexId;

private final boolean directed;
private String cache;
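To show why the field order matters, here is a minimal sketch of an edge id flattened into bytes in the order owner-vertex, direction, edge-label, sort-values, other-vertex. Everything in it is an assumption made for illustration: the class EdgeIdLayoutSketch, the one-byte length prefix for ids, the one-byte label id, the direction codes and the 0xFF terminator are not HugeGraph's actual encoding. The sketch also assumes ids shorter than 128 bytes.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Simplified, hypothetical byte layout for an edge id:
// owner-vertex + direction + edge-label + sort-values + other-vertex.
final class EdgeIdLayoutSketch {

    static final byte OUT = 0x01; // assumed direction codes
    static final byte IN = 0x02;

    // Ids are written as [1-byte length][UTF-8 bytes] (assumption of this sketch).
    static void writeId(ByteArrayOutputStream out, String id) {
        byte[] b = id.getBytes(StandardCharsets.UTF_8);
        out.write(b.length);
        out.write(b, 0, b.length);
    }

    static byte[] encode(String owner, byte dir, int label,
                         String sortValues, String other) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeId(out, owner);
        out.write(dir);
        out.write(label);              // single-byte label id, for brevity
        byte[] sv = sortValues.getBytes(StandardCharsets.UTF_8);
        out.write(sv, 0, sv.length);
        out.write(0xFF);               // terminator after sort-values
        writeId(out, other);
        return out.toByteArray();
    }

    // Length of the owner-vertex prefix inside an encoded edge id.
    static int ownerPrefixLength(byte[] edgeId) {
        return 1 + edgeId[0];
    }

    public static void main(String[] args) {
        byte[] e1 = encode("marko", OUT, 7, "", "josh");
        byte[] e2 = encode("marko", OUT, 7, "", "vadas");
        int n = ownerPrefixLength(e1);
        // Both edges start with the same owner-vertex bytes.
        System.out.println(
            Arrays.equals(Arrays.copyOf(e1, n), Arrays.copyOf(e2, n)));
    }
}
```

Because the owner vertex comes first, all edges of one vertex share a common byte prefix, which is what makes prefix-based grouping on the read path possible.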

Backend storage

Once the BackendEntry has been built, it is handed to the backend for storage through the store mechanism.

Saving an EDGE is handled by HbaseTables.Edge:

public static class Edge extends HbaseTable {

    @Override
    public void insert(Session session, BackendEntry entry) {
        long ttl = entry.ttl();
        if (ttl == 0L) {
            session.put(this.table(), CF, entry.id().asBytes(),
                        entry.columns());
        } else {
            session.put(this.table(), CF, entry.id().asBytes(),
                        entry.columns(), ttl);
        }
    }
}

CF is fixed to f:

protected static final byte[] CF = "f".getBytes();

The four-argument session.put (the call without a TTL) is implemented as:

@Override
public void put(String table, byte[] family, byte[] rowkey,
                Collection<BackendColumn> columns) {
    Put put = new Put(rowkey);
    for (BackendColumn column : columns) {
        put.addColumn(family, column.name, column.value);
    }
    this.batch(table, put);
}

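As the code shows, the edge id is stored as the rowkey. With keyWithIdPrefix set to false, writeEdge passes EMPTY_BYTES as column.name, so the HBase qualifier is empty at write time; the edge id with ownerVertexId stripped appears as column.name only when the row is read back.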
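The resulting row layout can be sketched with an in-memory stand-in for a single column family. This is not the HBase client API: EdgeRowSketch and its put method are hypothetical, the row keys are made-up bytes, and the sketch needs Java 9 or later for Arrays.compareUnsigned. It only illustrates that each edge becomes one row with an empty qualifier, and that rows sort by unsigned byte order, which keeps the edges of one owner vertex next to each other.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;

// In-memory stand-in for one HBase column family: rowkey -> (qualifier -> value).
// Hypothetical model, not the HBase client API.
final class EdgeRowSketch {

    // Unsigned lexicographic byte order, the way HBase sorts row keys.
    static final Comparator<byte[]> BYTES = Arrays::compareUnsigned;

    final TreeMap<byte[], TreeMap<byte[], byte[]>> rows = new TreeMap<>(BYTES);

    // Mirrors session.put(table, CF, rowkey, columns) for a single column.
    void put(byte[] rowkey, byte[] qualifier, byte[] value) {
        rows.computeIfAbsent(rowkey, k -> new TreeMap<byte[], byte[]>(BYTES))
            .put(qualifier, value);
    }

    public static void main(String[] args) {
        EdgeRowSketch table = new EdgeRowSketch();
        byte[] empty = new byte[0];
        // Row keys are made-up edge ids: owner-vertex byte first, then the rest.
        table.put(new byte[] {2, 1, 9}, empty, new byte[] {42});
        table.put(new byte[] {1, 1, 8}, empty, new byte[] {41});
        table.put(new byte[] {1, 1, 7}, empty, new byte[] {40});
        for (Map.Entry<byte[], TreeMap<byte[], byte[]>> e : table.rows.entrySet()) {
            System.out.println(Arrays.toString(e.getKey()) + " -> "
                               + e.getValue().size() + " column(s)");
        }
    }
}
```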

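Reading an EDGE

Reading a BackendEntry from the backend

Reading means fetching a Result from HBase, converting it into a BinaryBackendEntry, and then converting that into an Edge.

The read itself is a scan: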

/**
 * Inner scan: send scan request to HBase and get iterator
 */
@Override
public RowIterator scan(String table, Scan scan) {
    assert !this.hasChanges();

    try (Table htable = table(table)) {
        return new RowIterator(htable.getScanner(scan));
    } catch (IOException e) {
        throw new BackendException(e);
    }
}

After the scan, a BackendEntryIterator is returned:

protected BackendEntryIterator newEntryIterator(Query query,
                                                RowIterator rows) {
    return new BinaryEntryIterator<>(rows, query, (entry, row) -> {
        E.checkState(!row.isEmpty(), "Can't parse empty HBase result");
        byte[] id = row.getRow();
        if (entry == null || !Bytes.prefixWith(id, entry.id().asBytes())) {
            HugeType type = query.resultType();
            // NOTE: only support BinaryBackendEntry currently
            entry = new BinaryBackendEntry(type, id);
        }
        try {
            this.parseRowColumns(row, entry, query);
        } catch (IOException e) {
            throw new BackendException("Failed to read HBase columns", e);
        }
        return entry;
    });
}

Note that in new BinaryBackendEntry(type, id), the id of the resulting BinaryBackendEntry is not the rowkey itself but the result of processing the rowkey:

public BinaryId parseId(HugeType type) {
    if (type.isIndex()) {
        return this.readIndexId(type);
    }
    // Parse id from bytes
    int start = this.buffer.position();
    /*
     * Since edge id in edges table doesn't prefix with leading 0x7e,
     * so readId() will return the source vertex id instead of edge id,
     * can't call: type.isEdge() ? this.readEdgeId() : this.readId();
     */
    Id id = this.readId();
    int end = this.buffer.position();
    int len = end - start;
    byte[] bytes = new byte[len];
    System.arraycopy(this.array(), start, bytes, 0, len);
    return new BinaryId(bytes, id);
}
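The position bookkeeping in parseId can be reproduced with a plain java.nio.ByteBuffer. The sketch below is only an illustration under stated assumptions: ParseIdSketch is a hypothetical class, and the id encoding (one length byte followed by UTF-8 bytes) is invented here and is not how BytesBuffer.readId() encodes ids.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch of "read the leading id, keep only the bytes it consumed".
final class ParseIdSketch {

    // Assumed encoding: [1-byte length][UTF-8 bytes] for an id.
    static String readId(ByteBuffer buffer) {
        int len = buffer.get() & 0xFF;
        byte[] raw = new byte[len];
        buffer.get(raw);
        return new String(raw, StandardCharsets.UTF_8);
    }

    // Returns the raw bytes consumed by readId(), i.e. the leading id only.
    static byte[] parseIdBytes(byte[] rowkey) {
        ByteBuffer buffer = ByteBuffer.wrap(rowkey);
        int start = buffer.position();
        readId(buffer);
        int end = buffer.position();
        return Arrays.copyOfRange(rowkey, start, end);
    }

    public static void main(String[] args) {
        // Row key: id "v1" followed by three more bytes of edge data.
        byte[] rowkey = {2, 'v', '1', 0x01, 0x07, (byte) 0xFF};
        System.out.println(Arrays.toString(parseIdBytes(rowkey))); // [2, 118, 49]
    }
}
```

The bytes after the leading id stay in the row key but are not part of the returned id, so two row keys with the same leading id yield the same id bytes.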

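Here the owner vertex id is read first as the Id part, and exactly the bytes that readId() consumed are copied into bytes; the two together form the BinaryId. Whatever follows the owner vertex id in the row key is not part of this id. That differs from what happens at serialization time. Why this design? Because vertices and edges alike are read back as a Vertex. For comparison, this is how the entry id is built when an edge is serialized: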

protected final BinaryBackendEntry newBackendEntry(HugeEdge edge) {
    BinaryId id = new BinaryId(formatEdgeName(edge),
                               edge.idWithDirection());
    return newBackendEntry(edge.type(), id);
}

public EdgeId directed(boolean directed) {
    return new EdgeId(this.ownerVertexId, this.direction, this.edgeLabelId,
                      this.sortValues, this.otherVertexId, directed);
}

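At serialization time the id is an EdgeId.

The BackendEntryIterator supports merging results. The check !Bytes.prefixWith(id, entry.id().asBytes()) in the code above tests whether a row belongs to the same owner vertex as the current entry; if it does, the row's columns are added to the same BackendEntry.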

public BinaryEntryIterator(BackendIterator<Elem> results, Query query,
                           BiFunction<BackendEntry, Elem, BackendEntry> m)

@Override
protected final boolean fetch() {
    assert this.current == null;
    if (this.next != null) {
        this.current = this.next;
        this.next = null;
    }

    while (this.results.hasNext()) {
        Elem elem = this.results.next();
        BackendEntry merged = this.merger.apply(this.current, elem);
        E.checkState(merged != null, "Error when merging entry");
        if (this.current == null) {
            // The first time to read
            this.current = merged;
        } else if (merged == this.current) {
            // The next entry belongs to the current entry
            assert this.current != null;
            if (this.sizeOf(this.current) >= INLINE_BATCH_SIZE) {
                break;
            }
        } else {
            // New entry
            assert this.next == null;
            this.next = merged;
            break;
        }

        // When limit exceed, stop fetching
        if (this.reachLimit(this.fetched() - 1)) {
            // Need remove last one because fetched limit + 1 records
            this.removeLastRecord();
            this.results.close();
            break;
        }
    }

    return this.current != null;
}
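The merge rule can be illustrated without any HugeGraph classes. In the sketch below, consecutive row keys that share an owner-vertex prefix are folded into one entry with one column per row. MergeByOwnerSketch, its Entry class and the length-prefixed id encoding are hypothetical. Taking the column name to be the row key minus the owner prefix is an assumption about what parseRowColumns does, since that method is not shown in this article. Batch-size and limit handling from fetch() are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: fold consecutive row keys that share an owner-vertex
// prefix into one entry, one column per row.
final class MergeByOwnerSketch {

    static final class Entry {
        final byte[] ownerId;                       // prefix shared by the rows
        final List<byte[]> columnNames = new ArrayList<>();
        Entry(byte[] ownerId) { this.ownerId = ownerId; }
    }

    static boolean prefixWith(byte[] bytes, byte[] prefix) {
        if (bytes.length < prefix.length) {
            return false;
        }
        return Arrays.equals(Arrays.copyOf(bytes, prefix.length), prefix);
    }

    // Assumed encoding: first byte is the length of the owner id that follows.
    static byte[] ownerPrefix(byte[] rowkey) {
        return Arrays.copyOf(rowkey, 1 + rowkey[0]);
    }

    static List<Entry> merge(List<byte[]> sortedRowkeys) {
        List<Entry> entries = new ArrayList<>();
        Entry current = null;
        for (byte[] rowkey : sortedRowkeys) {
            if (current == null || !prefixWith(rowkey, current.ownerId)) {
                current = new Entry(ownerPrefix(rowkey));   // new owner vertex
                entries.add(current);
            }
            // Column name: the row key without the owner-vertex prefix.
            current.columnNames.add(Arrays.copyOfRange(
                    rowkey, current.ownerId.length, rowkey.length));
        }
        return entries;
    }

    public static void main(String[] args) {
        List<byte[]> rows = Arrays.asList(
                new byte[] {1, 'a', 10}, new byte[] {1, 'a', 11},
                new byte[] {1, 'b', 12});
        for (Entry e : merge(rows)) {
            System.out.println(Arrays.toString(e.ownerId) + " has "
                               + e.columnNames.size() + " edge column(s)");
        }
    }
}
```

The sketch relies on the rows arriving sorted by row key, as a scan returns them, so a change of prefix always means a new owner vertex.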

Converting a BackendEntry into an edge

Next comes readVertex, which reads the data. As noted earlier, even an edge is read as if it were a vertex:

@Override
public HugeVertex readVertex(HugeGraph graph, BackendEntry bytesEntry) {
    if (bytesEntry == null) {
        return null;
    }
    BinaryBackendEntry entry = this.convertEntry(bytesEntry);

    // Parse id
    Id id = entry.id().origin();
    Id vid = id.edge() ? ((EdgeId) id).ownerVertexId() : id;
    HugeVertex vertex = new HugeVertex(graph, vid, VertexLabel.NONE);

    // Parse all properties and edges of a Vertex
    for (BackendColumn col : entry.columns()) {
        if (entry.type().isEdge()) {
            // NOTE: the entry id type is vertex even if entry type is edge
            // Parse vertex edges
            this.parseColumn(col, vertex);
        } else {
            assert entry.type().isVertex();
            // Parse vertex properties
            assert entry.columnsSize() == 1 : entry.columnsSize();
            this.parseVertex(col.value, vertex);
        }
    }

    return vertex;
}

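The logic:

First, the owner vertex id is read and a HugeVertex is created. At this point only the id is known, not the vertex label, so the label is set to VertexLabel.NONE.

Then the BackendColumns are read, one column per edge. The name is the part of the edge id that remains after removing the owner vertex id, and the value is the edge data. Since writeEdge stored an empty name, this name is presumably rebuilt from the row key in parseRowColumns, which is not shown here.

The parsing happens in parseColumn: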

protected void parseColumn(BackendColumn col, HugeVertex vertex) {
    BytesBuffer buffer = BytesBuffer.wrap(col.name);
    Id id = this.keyWithIdPrefix ? buffer.readId() : vertex.id();
    E.checkState(buffer.remaining() > 0, "Missing column type");
    byte type = buffer.read();
    // Parse property
    if (type == HugeType.PROPERTY.code()) {
        Id pkeyId = buffer.readId();
        this.parseProperty(pkeyId, BytesBuffer.wrap(col.value), vertex);
    }
    // Parse edge
    else if (type == HugeType.EDGE_IN.code() ||
             type == HugeType.EDGE_OUT.code()) {
        this.parseEdge(col, vertex, vertex.graph());
    }
    // Parse system property
    else if (type == HugeType.SYS_PROPERTY.code()) {
        // pass
    }
    // Invalid entry
    else {
        E.checkState(false, "Invalid entry(%s) with unknown type(%s): 0x%s",
                     id, type & 0xff, Bytes.toHex(col.name));
    }
}

The type is read from col.name; if it is an edge type (EDGE_IN or EDGE_OUT), parseEdge is called:

protected void parseEdge(BackendColumn col, HugeVertex vertex,
                         HugeGraph graph) {
    // owner-vertex + dir + edge-label + sort-values + other-vertex
    BytesBuffer buffer = BytesBuffer.wrap(col.name);
    if (this.keyWithIdPrefix) {
        // Consume owner-vertex id
        buffer.readId();
    }
    byte type = buffer.read();
    Id labelId = buffer.readId();
    String sortValues = buffer.readStringWithEnding();
    Id otherVertexId = buffer.readId();

    boolean direction = EdgeId.isOutDirectionFromCode(type);
    EdgeLabel edgeLabel = graph.edgeLabelOrNone(labelId);

    // Construct edge
    HugeEdge edge = HugeEdge.constructEdge(vertex, direction, edgeLabel,
                                           sortValues, otherVertexId);

    // Parse edge-id + edge-properties
    buffer = BytesBuffer.wrap(col.value);

    //Id id = buffer.readId();

    // Parse edge properties
    this.parseProperties(buffer, edge);

    // Parse edge expired time if needed
    if (edge.hasTtl()) {
        this.parseExpiredTime(buffer, edge);
    }
}

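parseEdge reads type, labelId, sortValues and otherVertexId from col.name, in that order:

byte type = buffer.read();
Id labelId = buffer.readId();
String sortValues = buffer.readStringWithEnding();
Id otherVertexId = buffer.readId();

It then looks up the label by labelId with EdgeLabel edgeLabel = graph.edgeLabelOrNone(labelId), constructs the edge, and parses the edge properties with parseProperties.

Finally, if the edge has a TTL, the expiration time is read; expired data is filtered out when the results are processed.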
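The read order can be tried out on a toy byte layout. The decoder below is a sketch under stated assumptions, not HugeGraph's BytesBuffer: EdgeNameDecodeSketch, the type codes 0x01 and 0x02, the one-byte label id, the length-prefixed vertex id and the 0xFF string terminator are all invented for illustration. It only demonstrates decoding direction, edge label, sort-values and other-vertex sequentially from one buffer.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical decoder for an edge column name laid out as
// direction + edge-label + sort-values + other-vertex.
final class EdgeNameDecodeSketch {

    static final byte EDGE_OUT = 0x01;   // assumed type codes
    static final byte EDGE_IN = 0x02;
    static final byte ENDING = (byte) 0xFF;

    final boolean out;
    final int labelId;
    final String sortValues;
    final String otherVertexId;

    EdgeNameDecodeSketch(byte[] name) {
        ByteBuffer buffer = ByteBuffer.wrap(name);
        byte type = buffer.get();
        if (type != EDGE_OUT && type != EDGE_IN) {
            throw new IllegalStateException("Unknown type: " + (type & 0xFF));
        }
        this.out = type == EDGE_OUT;
        this.labelId = buffer.get() & 0xFF;          // one-byte label id
        this.sortValues = readStringWithEnding(buffer);
        this.otherVertexId = readId(buffer);
    }

    // Reads UTF-8 bytes up to (and consuming) the ending marker.
    static String readStringWithEnding(ByteBuffer buffer) {
        int start = buffer.position();
        while (buffer.get() != ENDING) {
            // keep scanning
        }
        int len = buffer.position() - 1 - start;
        return new String(buffer.array(), start, len, StandardCharsets.UTF_8);
    }

    // Assumed id encoding: [1-byte length][UTF-8 bytes].
    static String readId(ByteBuffer buffer) {
        byte[] raw = new byte[buffer.get() & 0xFF];
        buffer.get(raw);
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] name = {EDGE_OUT, 7, 's', 'v', ENDING, 2, 'v', '2'};
        EdgeNameDecodeSketch e = new EdgeNameDecodeSketch(name);
        System.out.println(e.out + " " + e.labelId + " "
                           + e.sortValues + " " + e.otherVertexId);
    }
}
```

A terminator works for sort-values in this sketch because valid UTF-8 never contains the byte 0xFF, so the decoder can find the end of the string without a length prefix.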
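----------------
Copyright notice: this is an original article by CSDN blogger "cestZOE", licensed under CC 4.0 BY-SA. When reposting, please include the link to the original source and this notice.
Original link: https://blog.csdn.net/weixin_42144199/article/details/113401894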
