mnesia存储过程(dump)

2022-04-11 00:00:00 函数 执行 文件 写入 转存

1.表类型为disc_copies和disc_only_copies的操作会记录到latest.log中
disc_copies类型的表在事务提交后,数据并不会直接落地,而是先写入latest.log中
disc_only_copies则直接写进dets落地,同时也会写入latest.log

2.dump操作会将latest.log的数据转存并落地
disc_copies类型会转存到.DCD和.DCL文件(通过disk_log.erl实现)
ram_copies和disc_only_copies类型不在dump中处理
3.dump的触发时机
1. 手动执行mnesia:dump_log/0
2. 通过mnesia_controller定时触发dump
mnesia_controller.erl

init([Parent]) ->
process_flag(trap_exit, true),
...
%% 发起定时器
Ref = next_async_dump_log(),
...
Empty = gb_trees:empty(),
{ok, #state{supervisor = Parent, dump_log_timer_ref = Ref,
loader_queue = Empty,
late_loader_queue = Empty}}.

next_async_dump_log() ->
%% 时间间隔为dump_log_time_threshold, 默认为3分钟
Interval = mnesia_monitor:get_env(dump_log_time_threshold),
Msg = {next_async_dump_log, time_threshold},
%% 向自身发消息
Ref = erlang:send_after(Interval, self(), Msg),
Ref.

%% 默认为3分钟
default_env(dump_log_time_threshold) ->
timer:minutes(3);

%% 收到dump消息的处理
handle_info({next_async_dump_log, InitBy}, State) ->
async_dump_log(InitBy),
Ref = next_async_dump_log(),
noreply(State#state{dump_log_timer_ref=Ref});

3. 执行mnesia_log:log/1时,累计执行次数等于dump_log_write_threshold时,触发dump
mnesia_dumper.erl

incr_log_writes() ->
Left = mnesia_lib:incr_counter(trans_log_writes_left, -1),
if
%% 剩余次数为0
Left =:= 0 ->
adjust_log_writes(true);
true ->
ignore
end.

adjust_log_writes(DoCast) ->
Token = {mnesia_adjust_log_writes, self()},
case global:set_lock(Token, [node()], 1) of
false ->
ignore; %% Somebody else is sending a dump request
true ->
case DoCast of
false ->
ignore;
true ->
%% 通知mnesia_controller执行dump
mnesia_controller:async_dump_log(write_threshold)
end,
%% dump_log_write_threshold默认为1000
Max = mnesia_monitor:get_env(dump_log_write_threshold),
Left = mnesia_lib:read_counter(trans_log_writes_left),
%% 重新设置次数
mnesia_lib:set_counter(trans_log_writes_left, Max),
Diff = Max - Left,
_ = mnesia_lib:incr_counter(trans_log_writes, Diff),
global:del_lock(Token, [node()])
end.

4.dump的流程
dump通过调用函数mnesia_controller:async_dump_log/1执行

async_dump_log(InitBy) ->
%% 向自己发消息
?SERVER_NAME ! {async_dump_log, InitBy},
ok.

handle_info({async_dump_log, InitBy}, State) ->
Worker = #dump_log{initiated_by = InitBy},
State2 = add_worker(Worker, State),
noreply(State2);

add_worker(Worker = #dump_log{}, State) ->
InitBy = Worker#dump_log.initiated_by,
Queue = State#state.dumper_queue,
...
Queue2 = Queue ++ [Worker],
State2 = State#state{dumper_queue = Queue2},
opt_start_worker(State2);

opt_start_worker(State) ->
case State#state.dumper_queue of
[Worker | _Rest] when State#state.dumper_pid == undefined ->
if
is_record(Worker, dump_log) ->
Pid = spawn_link(?MODULE, dump_and_reply, [self()]),
%% dump_and_reply函数会继续跳转到mnesia_dumper:opt_dump_log/1
%% 然后再进一步跳转到mnesia_dumper:perform_dump/2
...
ok.

dump_and_reply函数会继续跳转到mnesia_dumper:opt_dump_log/1,然后再进一步跳转到mnesia_dumper:perform_dump/2

mnesia_dumper.erl

perform_dump(InitBy, Regulator) ->
?eval_debug_fun({?MODULE, perform_dump}, [InitBy]),
%% 这个函数将latest.log文件改名为previous.log
LogState = mnesia_log:prepare_log_dump(InitBy),
adjust_log_writes(false),
case LogState of
{needs_dump, Diff} ->
U = mnesia_monitor:get_env(dump_log_update_in_place),
%% 加载previous.log
Cont = mnesia_log:init_log_dump(),
mnesia_recover:sync(),
try do_perform_dump(Cont, U, InitBy, Regulator, undefined).
%% 之后会进一步调用insert_op -> insert_rec ->
%% do_insert_rec -> insert_ops -> open_files
%% open_file通过open_disc_copies打开日志,再调用disc_insert写入

open_disc_copies/2
此函数通过mnesia_log打开DCD或DCL文件,供之后的disc_insert写入。

注意,DCD或DCL文件通过disk_log.erl实现,并不是dets!

open_disc_copies(Tab, InitBy) ->
%% 判断存储.DCL还是.DCD
DumpEts = needs_dump_ets(Tab),
if
DumpEts == false; InitBy == startup ->
DclF = mnesia_lib:tab2dcl(Tab),
mnesia_log:open_log({?MODULE,Tab},
mnesia_log:dcl_log_header(),
DclF,
mnesia_lib:exists(DclF),
mnesia_monitor:get_env(auto_repair),
read_write),
put({?MODULE, Tab}, {opened_dumper, dcl}),
true;
true ->
mnesia_log:ets2dcd(Tab),
put({?MODULE, Tab}, already_dumped),
false
end.

%% 判断存储.DCL还是.DCD
needs_dump_ets(Tab) ->
DclF = mnesia_lib:tab2dcl(Tab),
case file:read_file_info(DclF) of
{error, enoent} ->
false;
{ok, DclInfo} ->
DcdF = mnesia_lib:tab2dcd(Tab),
case file:read_file_info(DcdF) of
{error, Reason} ->
mnesia_lib:dbg_out("File ~tp info_error ~tp ~n",
[DcdF, Reason]),
true;
{ok, DcdInfo} ->
Mul = case ?catch_val(dc_dump_limit) of
{'EXIT', _} -> ?DumpToEtsMultiplier;
Val -> Val
end,
%% 判断公式
DcdInfo#file_info.size =< (DclInfo#file_info.size * Mul)
end
end.

disc_insert/8
disc_insert(_Tid, Storage, Tab, Key, Val, Op, InPlace, InitBy) ->
case open_files(Tab, Storage, InPlace, InitBy) of
true ->
case Storage of
disc_copies when Tab /= schema ->
%% 写入上一步打开的DCL或DCD文件
mnesia_log:append({?MODULE,Tab}, {{Tab, Key}, Val, Op}),
ok;
_ ->
dets_insert(Op,Tab,Key,Val)
end;
false ->
ignore
end.

5.结语
disc_copies类型会转存到.DCD和.DCL文件(通过disk_log.erl实现),并不是dets。但disk_log并不是实时写磁盘,且terminate函数中并没有做保存处理,所以个人认为有一定丢数据的风险。
ram_copies在事务提交时通过ets存储
disc_only_copies在事务提交时直接通过dets存储,个人也没搞懂为何也要写latest.log

相关文章