LevelDB原理解析:数据的读写与合并是怎样发生的?
一、LevelDB总体架构
Memtable:Memtable可读可写,内部由SkipList实现,用于在内存中缓存写操作。
Immutable Memtable:内部同样由SkipList实现,但是只读,当Memtable的大小达到设定的阈值时,会变成 Immutable Memtable,后续由后台线程通过compaction操作将数据顺序落盘,变成sstable文件。
sstable:sstable是磁盘上的存储文件,它将key有序存放,level0层的sstable由内存中的Immutable Memtable直接持久化生成,因为没有和当前层的其他文件合并过,因此level0层的sstable里的key会发生重叠,其余层的sstable文件均由当前层和前一层的sstable文件归并而来。
Manifest:Manifest文件是sstable的索引信息,用来记录每个sstable对应的key range、文件size等信息。
Log:Log文件主要是用于机器重启而不丢失数据,当向LevelDB写入一条数据时,它首先会向Log文件顺序写入一条操作日志,然后再向内存Memtable写入数据,这样即便机器掉电,也不会出现数据丢失的情况。
Current文件:当机器重启时,LevelDB会重新生成新的Manifest文件,所以Manifest文件可能存在多个,这里会使用Current文件记录当前使用的Manifest文件。
二、写入流程
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
WriteBatch batch;
batch.Put(key, value);
//调用DBImpl::Write方法
return Write(opt, &batch);
}
struct DBImpl::Writer {
Status status;
WriteBatch* batch;
bool sync;
bool done;
port::CondVar cv;
explicit Writer(port::Mutex* mu) : cv(mu) { }
};
其他线程已经帮忙把Writer写入;
抢到锁并且是写队列的首节点。
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
Writer w(&mutex_);
w.batch = updates;
w.sync = options.sync;
w.done = false;
MutexLock l(&mutex_);
writers_.push_back(&w);
//任务放到queue中,如果当前不是queue的头部则等待
//当某个线程将queue中自己对应的Writer写入磁盘时,可能也会将其他线程对应的Writer写入磁盘
while (!w.done && &w != writers_.front()) {
w.cv.Wait();
}
if (w.done) {
return w.status;
}
Status DBImpl::MakeRoomForWrite(bool force) {
//通过改变指针指向,将Memtable转换成Immutable
imm_ = mem_;
has_imm_.store(true, std::memory_order_release);
//生成新的Memtable
mem_ = new MemTable(internal_comparator_);
mem_->Ref();
//触发compaction
MaybeScheduleCompaction();
}
//从队列中批量取任务
WriteBatch* write_batch = BuildBatchGroup(&last_writer);
//将任务写入Log文件
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
//将任务写入Memtable
status = WriteBatchInternal::InsertInto(write_batch, mem_);
//last_writer指向writers_里合并的后一个Writer
//逐个遍历弹出writers_里的元素,并唤醒对应的正在等待的写线程,直到遇到last_writer
while (true) {
Writer* ready = writers_.front();
writers_.pop_front();
if (ready != &w) {
ready->status = status;
ready->done = true;
ready->cv.Signal();
}
if (ready == last_writer) break;
}
// 唤醒队列未写入的个Writer
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}
三、读取流程
{
mutex_.Unlock();
// First look in the memtable, then in the immutable memtable (if any).
LookupKey lkey(key, snapshot);
//先查找memtable
if (mem->Get(lkey, value, &s)) {
//再查找immutable memtable
} else if (imm != nullptr && imm->Get(lkey, value, &s)) {
} else {
//查找sstable
s = current->Get(options, lkey, value, &stats);
have_stat_update = true;
}
mutex_.Lock();
}
四、Compaction流程
将内存中的数据持久化到磁盘;
清理冗余数据,因为LevelDB的更新和删除操作具有延后性,两种操作实际上都会向LevelDB写入一条新记录,所以通过重新compaction整理数据,可以清理冗余数据,节省磁盘空间;
通过compaction使level 0以下的文件层中的数据保持有序,这样便可以通过二分进行数据查找,同时也可以减少待查找的文件数量,提升读效率。
1. minor compaction
将Immutable memtable落盘成SSTable文件
Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
Version* base) {
//生成sstable编号,用于构建文件名
FileMetaData meta;
meta.number = versions_->NewFileNumber();
Status s;
{
mutex_.Unlock();
//更新memtable中的全部数据到xxx.ldb文件
//meta记录key range, file_size等sst信息
s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
mutex_.Lock();
}
int level = ;
if (s.ok() && meta.file_size > ) {
const Slice min_user_key = meta.smallest.user_key();
const Slice max_user_key = meta.largest.user_key();
if (base != nullptr) {
//为新生成的sstable选择合适的level(不一定总是0)
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
}
//level及file meta记录到edit
edit->AddFile(level, meta.number, meta.file_size, meta.smallest,
meta.largest);
}
}
//记录edit信息
versions_->LogAndApply(&edit, &mutex_);
//释放imm_空间
imm_->Unref();
imm_ = nullptr;
has_imm_.Release_Store(nullptr);
//清理无用文件
DeleteObsoleteFiles();
2. major compation
level 0层:sstable文件个数超过指定个数。因为level0是从Immutable直接转储而来,所以用个数限制而不是文件大小。
level i层:第i层的sstable size总大小超过(10^i) MB。level越大,说明数据越冷,读取的几率越小,因此对于level更大的层,给定的size阈值更大,从而减少comaction次数。
对于sstable文件还有seek次数限制,如果文件多次seek但是一直没有查找到数据,那么就应该被合并了,否则会浪费更多的seek。
选择合适的level及sstable文件用于合并
level 0层的score计算规则为:文件数 / 4;
level i层的计算规则为:整个level所有的file size总和/(10^i)。
void VersionSet::Finalize(Version* v) {
// Precomputed best level for next compaction
int best_level = -1;
double best_score = -1
for (int level = ; level < config::kNumLevels - 1; level++) {
double score;
//对于level 0使用文件数/4计算score
if (level == ) {
score = v->files_[level].size() /
static_cast<double>(config::kL0_CompactionTrigger);
} else {
//对于非0层,使用该层文件的总大小
// Compute the ratio of current size to size limit.
const uint64_t level_bytes = TotalFileSize(v->files_[level]);
score =
static_cast<double>(level_bytes) / MaxBytesForLevel(options_, level);
}
if (score > best_score) {
best_level = level;
best_score = score;
}
}
//使用compaction_level_记录需要合并的层,使用compaction_score_记录合并分数
v->compaction_level_ = best_level;
v->compaction_score_ = best_score;
}
bool Version::UpdateStats(const GetStats& stats) {
FileMetaData* f = stats.seek_file;
if (f != nullptr) {
f->allowed_seeks--;
if (f->allowed_seeks <= && file_to_compact_ == nullptr) {
file_to_compact_ = f;
file_to_compact_level_ = stats.seek_file_level;
return true;
}
}
return false;
}
根据key重叠情况扩大输入文件集合
多路合并
Iterator* VersionSet::MakeInputIterator(Compaction* c) {
const int space = (c->level() == ? c->inputs_[].size() + 1 : 2);
// list存储所有Iterator
Iterator** list = new Iterator*[space];
int num = ;
for (int which = ; which < 2; which++) {
if (!c->inputs_[which].empty()) {
//第0层
if (c->level() + which == ) {
const std::vector<FileMetaData*>& files = c->inputs_[which];
// Iterator* Table::NewIterator
for (size_t i = ; i < files.size(); i++) {
list[num++] = table_cache_->NewIterator(
options, files[i]->number, files[i]->file_size);
}
} else {
// Create concatenating iterator for the files from this level
list[num++] = NewTwoLevelIterator(
// 遍历文件列表的iterator
new Version::LevelFileNumIterator(icmp_, &c->inputs_[which]),
&GetFileIterator, table_cache_, options);
}
}
}
参考链接:
相关文章