VictoriaMetrics源码及相关API解析

2022-03-24 00:00:00 索引 查询 文件 这一 类型

flock.lock

vmStorage在启动时会根据路径以及给定的大保存时间创建Storage对象,然后还会根据cache路径(path + /cache)下是否存在/reset_cache_on_startup这一路径在选择进行cache目录下的清空(reset),然后是创建flock.lock文件,这是一个文件锁文件,调用的底层API是

if err := unix.Flock(int(flockF.Fd()), unix.LOCK_EX|unix.LOCK_NB); err != nil {
	return nil, fmt.Errorf("cannot acquire lock on file %q: %w", flockFile, err)
}

调用unix.Flock(int fd, int operation)对创建的flock.lock文件上文件锁,第二个参数的含义是指上锁的锁类型,有LOCK_SHLOCK_EXLOCK_NBLOCK_UN四种,含义如下

  • LOCK_SH表示共享锁
  • LOCK_EX表示排它锁
  • LOCK_NB让想要尝试给文件上排它锁的进程不进入阻塞状态(默认会进入阻塞状态)而直接返回
  • LOCK_UN用于解除文件的文件锁

而flock()添加的锁是建议性锁,也就是使用flock()对文件上锁后,其他进程仍然可以对该文件进行操作,而只有通过flock()调用去检测才可以知道文件是否处于被锁定的状态,也就是flock()只是起到一个通知的作用,没有强制性的阻止文件修改的动作。

atomic.Value

vmStorage在启动时会需要指定每小时以及每天的大保存进storage的数量,这个数量由一个限制器实现:

type Limiter struct {
	maxItems int
	v        atomic.Value

	wg     sync.WaitGroup
	stopCh chan struct{}
}

其中,atomic.Value类型类似于一个容器,可以将任意类型的读写操作分装成原子性操作(让中间状态对外不可见),atomic.Value类型对外暴露的方法只有两个

  • v.Store(c) - 写操作,将原始的变量c存放到一个atomic.Value类型的v里
  • c = v.Load() - 读操作,从线程安全的v中读取上一步存放的内容

Request.FormValue

Golang中net/http包下Request.FormValue方法可以获取 url 中 ? 后面的请求参数

TSID

vmStorage在启动时还需要获取到分配到的内存大小,然后根据一定的比例创建cache区域,有MetricName->TSIDMetricID->TSIDMetricID->MetricName以供后续插入数据建立索引使用,其中有关与TSID的创建是至关重要的,如下图所示,VictoriaMetrics在接收到写入请求时,会对请求中包含的时序数据做转换处理,首先根据包含metric和labels的MetricName生成一个表示TSID,然后metric + labels + TSID作为索引index, TSID + timestamp + value作为数据data,后索引index和数据data分别进行存储和检索。

TSID的结构

// storage/tsid.go
type TSID struct {
	// metricName, 指标名对应的id
	MetricGroupID uint64

	JobID              uint32
	InstanceID       uint32

        // metricNameRaw, 指标名+tags对应的id
	MetricID          uint64
}

TSID一共有四个字段,其中JobIDInstanceID是为了兼容Prometheus的协议而添加的,而且只有MetricID是必须的,其他三个字段都是可选的。TSID的初始化函数如下,这一函数会在VmStorage启动的main函数里面的go srv.RunVMInsert()go srv.RunVMSelect()中被调用到

func (db *indexDB) generateTSID(dst *TSID, metricName []byte, mn *MetricName) error {
	// Search the TSID in the external storage.
	// This is usually the db from the previous period.
	var err error
	if db.doExtDB(func(extDB *indexDB) {
		err = extDB.getTSIDByNameNoCreate(dst, metricName)
	}) {
		if err == nil {
			// The TSID has been found in the external storage.
			return nil
		}
		if err != io.EOF {
			return fmt.Errorf("external search failed: %w", err)
		}
	}

	// The TSID wasn't found in the external storage.
	// Generate it locally.
	dst.AccountID = mn.AccountID
	dst.ProjectID = mn.ProjectID
	dst.MetricGroupID = xxhash.Sum64(mn.MetricGroup)
	if len(mn.Tags) >  {
		dst.JobID = uint32(xxhash.Sum64(mn.Tags[].Value))
	}
	if len(mn.Tags) > 1 {
		dst.InstanceID = uint32(xxhash.Sum64(mn.Tags[1].Value))
	}
	dst.MetricID = generateUniqueMetricID()
	return nil
}

其中,JobID和InstanceID是为了兼容Prometheus协议而添加的,并且是根据tag字段的个跟第二个进行生成,因为VictoriaMetrics会将接收到的这两个ID放在前两位。MetricGroupID是对指标名称做hash算法;而MetricID是进程当前启动的纳秒时间戳,并且在随后的每次新的TSID的生成都会在此基础上自增1

倒排索引

VictoriaMetrics在创建完TSID后就会建立一系列的索引以供在查找时使用,并且由于TSID中除了MetricID外,其他字段都是可供选择的,所以可供使用的有效信息只有MetricID,因为VictoriaMetrics在构建tags到TSID的字典的过程中,就是在构建tag->MetricID的字典。
对于接收到的一个时序指标,VictoriaMetrics会生成以下几种索引,分别是

  • MetricName -> TSID
  • MetricID -> MetricName
  • MetricID -> TSID
  • Tags -> MetricID

例如以http_requests_total{status="200", method="GET"}为例,则MetricName为http_requests_total{status="200", method="GET"}, 假设生成的TSID为{metricGroupID=0, jobID=0, instanceID=0, metricID=51106185174286},那么生成的索引如下:

  • metricName -> TSID, 即http_requests_total{status="200", method="GET"} -> {metricGroupID=0, jobID=0, instanceID=0, metricID=51106185174286}
  • metricID -> metricName,即51106185174286 -> http_requests_total{status="200", method="GET"}
  • metricID -> TSID,即51106185174286 -> {metricGroupID=0, jobID=0, instanceID=0, metricID=51106185174286}
  • tag -> metricID,即 status="200" -> 51106185174286, method="GET" -> 51106185174286, "" = http_requests_total -> 51106185174286

有了上面这些索引的item之后,就可以进行基于tag的多维查询,VictoriaMetrics会首先根据tag寻找到所对应的MetricID列表,然后求出所有tag的MetricID列表的交集,然后再根据这一MetricID检索出TSID去到数据文件查询数据以及根据MetricID到索引文件检索原始的MetricName

VmSelect

VmSelect在接受到查询请求后,会将查询任务拆分成多个任务分发给VMStorage,再将返回的结果聚合返回,接受的url格式为/{prefix}/{authToken}/{suddix},含义分别如下:

  • prefix:操作类型,VmSelect支持的是select和delete
  • authToken:账号类型,格式为accountID[:projectID],用于租户隔离,projectID可选
  • suffix:prometheus的查询api

从VMSelect的api接收到查询的query到查询VmStorage,中间主要是一些解析查询语句的过程,然后会通过下面这一函数对所有的VmStorage进行遍历查询(无差别对待)

func startStorageNodesRequest(denyPartialResponse bool, f func(idx int, sn *storageNode) interface{}) *storageNodesRequest {
	resultsCh := make(chan interface{}, len(storageNodes))
	for idx, sn := range storageNodes {
		go func(idx int, sn *storageNode) {
			result := f(idx, sn)
			resultsCh <- result
		}(idx, sn)
	}
	return &storageNodesRequest{
		denyPartialResponse: denyPartialResponse,
		resultsCh:           resultsCh,
	}
}

其中,storageNodes是在VmSelect初始化的时候通过func InitStorageNodes(addrs []string)便初始化好的一个节点,这就意味着当我们使用增加storageNodes的方式扩容VictoriaMetrics的时候,就需要修改VmSelect和VmInsert的参数并重新启动,并且如果有一个VmStorage宕机的时候,查询出来的数据也是不完整的。

来源 https://www.cnblogs.com/keke-coding/p/15387056.html

相关文章