EMQ持久化数据库Oracle,EMQ源码分析(五):Mnesia数据库
前言
基于EMQ2.3.11。Mnesia是分布式电信数据库管理系统,有以下特性:DBMS查询语言
数据持久性:表是持久化到存储的
集群复制:表可以在多个节点上复制
原子事务:支持事务
透明:对编程来说是透明的
实时数据搜索:查询速度很快
一、Mnesia表的创建参数
1、type 表类型
【取值】set:键值,1:1,一个键一条记录
ordered_set:键值,1:1,带排序
bag:1:n,一个键可以映射很多条记录
2、record_name 记录名称
【取值】记录名
所有表中的记录都必须是同一个记录的实例
3、ram_copies 内存复制
【取值】节点列表
可以指定要备份到哪些节点的内存中去,不保证事务更新。可以定时刷盘。
4、disc_copies 磁盘复制
【取值】节点列表
指定要备份到哪些节点的内存和磁盘中去,内存操作表格,磁盘追加操作日志。
5、disc_only_copies 强制磁盘复制
【取值】节点列表
指定备份到哪些节点的磁盘中去,操作都是基于磁盘操作,速度慢。
6、index 索引
【取值】属性名或整数列表
指定额外维护的索引表的元组位置
7、local_content 本地内容
【取值】true/false,默认false
表名对其他节点是已知的,但数据只在自己节点上。
8、majority
【取值】true/false,默认falsetrue:大多数表的副本都必须同步成功更新(保证数据一致性)
false:不需要立刻同步,副本数据可能某段时间不一致
9、snmp
【取值】SNMP键类型
将基于集合的表自动变为简单网络管理协议(SNMP)有序表。
10、attributes
【取值】Record的属性列表
指定要插入哪些属性到表中。
EMQ的表都是内存复制,而且是用的自行开发的ekka_mnesia库。
EMQ路由表是bag类型,因为一个Topic可能有多个Node都会订阅,是一个类似Map的表:-record(mqtt_route,
{ topic :: binary(),
node :: node()
}).
ok = ekka_mnesia:create_table(mqtt_route, [
{type, bag},
{ram_copies, [node()]},
{record_name, mqtt_route},
{attributes, record_info(fields, mqtt_route)}]);
EMQ的session是set类型,因为必须要保证clientId性:-record(mqtt_session,
{ client_id :: binary(),
sess_pid :: pid(),
clean_sess :: boolean()
}).
%% Global Session Table
ok = ekka_mnesia:create_table(mqtt_session, [
{type, set},
{ram_copies, [node()]},
{record_name, mqtt_session},
{attributes, record_info(fields, mqtt_session)}]);
二、Mnesia表操作
这里只简单的记录EMQ用到过的表格操作是什么含义。
1、select 按条件读取表格
【原型】
select(Tab, MatchSpec [, Lock]) -> transaction abort | [Object]Tab:表名
MatchSpec:匹配参数
Lock:是否加锁读取
【示例】mnesia:select(mqtt_session, [{#mqtt_session{client_id = '$1', sess_pid = '$2', _ = '_'},[{'==', {node, '$2'}, Node}], ['$1']}])
2、write 写记录
【原型】write(Record) -> transaction abort | ok
write(Tab, Record, LockKind) -> transaction abort | okTab:表名
Record:记录值
LockKind:write写入锁,sticky_write粘滞写入锁
默认的write/1是本表+记录值+写入锁。sticky_write粘滞写入锁是一种用于优化锁获取的机制。如果您使用复制表的目的主要是为了容错(而不是为了快速读取访问),则粘滞锁可能是佳选择。当获取粘滞写入锁时,将通知所有节点哪个节点被锁定。然后,来自同一节点的粘滞锁定请求将作为本地操作执行,而不与其他节点进行任何通信。即使在事务结束后,粘滞锁仍会留在节点上。
【示例】mnesia:write(Route).
3、delete 删除记录
【原型】delete({Tab, Key}) -> transaction abort | ok
delete(Tab, Key, LockKind) -> transaction abort | ok
和前面一致,默认用write写入锁,删除一行数据。
【示例】mnesia:delete({mqtt_trie_node, Topic})
4、delete_object 删除对象
【原型】delete_object(Record) -> transaction abort | ok
delete_object(Tab, Record, LockKind) -> transaction abort | ok
如果表是bag类型,一对多,可以用这个来删除所有数据,默认写入锁。
【示例】mnesia:delete_object(mqtt_route, R, write)
5、wread
【原型】wread({Tab, Key}) -> transaction abort | RecordList
read(Tab, Key, LockKind) -> transaction abort | RecordList
wread调用的read,使用写入锁(不可写)。
read的锁可以是:read、write、sticky_write。
【示例】mnesia:wread({mqtt_route, Topic})
6、ets
【原型】ets(Fun, [, Args]) -> ResultOfFun | exit(Reason)
调用一个非事务的函数,会去内存取本地ets表,速度很快,相当于取缓存。
【示例】EMQ只有一处用到了,在订阅树匹配的时候,因为只是读操作,但是操作次数太多,可以容忍脏读,所以用这个直接读内存:Matched = mnesia:ets(fun emqttd_trie:match/1, [Topic])
7、transaction 事务
【原型】transaction(Fun [[, Args], Retries]) -> {aborted, Reason} | {atomic, ResultOfFun}
以事务的方式执行Fun函数。
【示例】mnesia:transaction(Clean)
8、abort 中止事务
【原型】abort(Reason) -> transaction abort
主动表明事务执行失败,数据库恢复到事务之前的状态。
【示例】在进行delete_path的时候,递归到后发现找不到指定节点,需要主动表明自己执行失败(因为是业务逻辑错误),所以使用了abort:mnesia:abort({node_not_found, NodeId})
9、dirty_read 脏读
【原型】dirty_read(Tab,Key)->ValueList | exit({aborted,Reason})
读取当前的数据,不需要事务。
【示例】mnesia:dirty_read(mqtt_session, ClientId)
10、dirty_delete_object 脏删
【原型】dirty_delete_object(Tab, Record)
删除当前数据,不需要事务。
【示例】remove_session(Session) ->
mnesia:dirty_delete_object(Session).
11、async_dirty 异步执行
【原型】async_dirty(Fun, [, Args]) -> ResultOfFun | exit(Reason)
不要事务地执行Fun函数。
【示例】只在路由表相关操作中使用到,因为增加路由表无需加锁:add_direct_route(Route) ->
mnesia:async_dirty(fun mnesia:write/1, [Route]).
12、dirty_all_keys
【原型】dirty_all_keys(Tab) -> KeyList | exit({aborted, Reason})
注意这里的dirty不是动词,而是只all_keys的脏操作版本,获取表的所有键值。
【示例】只在一处用到,路由表:topics() ->
mnesia:dirty_all_keys(mqtt_route).
参考资料
2、《腾讯云开发者文档:mnesia》(相当于手册的中文翻译,基本是机器翻译,很多错误)
相关文章