聊聊cortex的kv.Client
本文主要研究一下cortex的kv.Client
kv.Client
github.com/cortexproject/cortex/pkg/ring/kv/client.go
// Client is a high-level client for key-value stores (such as Etcd and
// Consul) that exposes operations such as CAS and Watch which take callbacks.
// It also deals with serialisation by using a Codec and having an instance of
// the desired type passed in to methods, à la json.Unmarshal.
type Client interface {
// List returns a list of keys under the given prefix. Returned keys will
// include the prefix.
List(ctx context.Context, prefix string) ([]string, error)
// Get a specific key. Will use a codec to deserialise key to appropriate type.
// If the key does not exist, Get will return nil and no error.
Get(ctx context.Context, key string) (interface{}, error)
// Delete a specific key. Deletions are best-effort and no error will
// be returned if the key does not exist.
Delete(ctx context.Context, key string) error
// CAS stands for Compare-And-Swap. Will call provided callback f with the
// current value of the key and allow callback to return a different value.
// Will then attempt to atomically swap the current value for the new value.
// If that doesn't succeed will try again - callback will be called again
// with new value etc. Guarantees that only a single concurrent CAS
// succeeds. Callback can return nil to indicate it is happy with existing
// value.
//
// NOTE(review): the meaning of f's retry result is not described here;
// presumably it tells the store whether another attempt is worthwhile when
// f returns an error - confirm against the implementations.
CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error
// WatchKey calls f whenever the value stored under key changes.
//
// NOTE(review): the meaning of f's bool result is not stated here; the
// memberlist KV implementation stops watching when f returns false.
WatchKey(ctx context.Context, key string, f func(interface{}) bool)
// WatchPrefix calls f whenever any value stored under prefix changes.
// f receives the key that changed together with its value.
WatchPrefix(ctx context.Context, prefix string, f func(string, interface{}) bool)
}
复制代码
kv.Client接口定义了List、Get、Delete、CAS、WatchKey、WatchPrefix方法
Client
github.com/cortexproject/cortex/pkg/ring/kv/memberlist/memberlist_client.go
// Client implements kv.Client interface, by using memberlist.KV.
// Every method that reads or writes values hands the client's codec to the
// shared KV, so the KV itself is not tied to a single value type.
type Client struct {
kv *KV // reference to singleton memberlist-based KV
// codec is passed to the KV on Get, CAS, WatchKey and WatchPrefix calls.
codec codec.Codec
}
// List is part of kv.Client interface. It first calls
// awaitKVRunningOrStopping and returns that error, if any; otherwise it
// returns whatever the underlying KV lists for prefix.
func (c *Client) List(ctx context.Context, prefix string) ([]string, error) {
	if err := c.awaitKVRunningOrStopping(ctx); err != nil {
		return nil, err
	}

	keys := c.kv.List(prefix)
	return keys, nil
}
// Get is part of kv.Client interface. It first calls
// awaitKVRunningOrStopping and returns that error, if any; otherwise the
// lookup is delegated to the underlying KV using the client's codec.
func (c *Client) Get(ctx context.Context, key string) (interface{}, error) {
	if err := c.awaitKVRunningOrStopping(ctx); err != nil {
		return nil, err
	}

	value, err := c.kv.Get(key, c.codec)
	return value, err
}
// Delete is part of kv.Client interface. The memberlist-backed client has no
// delete operation, so this always returns an error and ignores ctx and key.
func (c *Client) Delete(ctx context.Context, key string) error {
	unsupported := errors.New("memberlist does not support Delete")
	return unsupported
}
// CAS is part of kv.Client interface. It first calls
// awaitKVRunningOrStopping and returns that error, if any; otherwise the
// compare-and-swap is delegated to the underlying KV using the client's codec.
func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error {
	if err := c.awaitKVRunningOrStopping(ctx); err != nil {
		return err
	}

	return c.kv.CAS(ctx, key, c.codec, f)
}
// WatchKey is part of kv.Client interface. The watch is only started when
// awaitKVRunningOrStopping succeeds; its error cannot be reported through
// this signature, so on failure the method simply returns.
func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) bool) {
	if err := c.awaitKVRunningOrStopping(ctx); err == nil {
		c.kv.WatchKey(ctx, key, c.codec, f)
	}
}
// WatchPrefix is part of kv.Client interface: f is called whenever any value
// stored under prefix changes. The watch is only started when
// awaitKVRunningOrStopping succeeds; its error cannot be reported through
// this signature, so on failure the method simply returns.
func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string, interface{}) bool) {
	if err := c.awaitKVRunningOrStopping(ctx); err == nil {
		c.kv.WatchPrefix(ctx, prefix, c.codec, f)
	}
}
复制代码
Client实现了kv.Client接口,其List、Get、CAS、WatchKey、WatchPrefix方法均代理给kv,其Delete方法返回error
KV.List
github.com/cortexproject/cortex/pkg/ring/kv/memberlist/memberlist_client.go
// List returns every key in the local store that starts with prefix.
// Only the local store is consulted; no other cluster node is contacted.
// The store mutex is held for the duration of the scan. The result is nil
// when nothing matches, and its order follows map iteration order.
func (m *KV) List(prefix string) []string {
	m.storeMu.Lock()
	defer m.storeMu.Unlock()

	var matched []string
	for name := range m.store {
		if !strings.HasPrefix(name, prefix) {
			continue
		}
		matched = append(matched, name)
	}
	return matched
}
复制代码
KV.List方法在持有m.storeMu锁的情况下遍历m.store,收集并返回所有以指定prefix开头的key;该方法只读取本地数据,不会与集群中的其他节点通信
KV.Get
github.com/cortexproject/cortex/pkg/ring/kv/memberlist/memberlist_client.go
// Get returns the current value stored under key, decoded with codec.
// Only the local store is consulted; no other cluster node is contacted.
// The version reported by the internal lookup is discarded.
func (m *KV) Get(key string, codec codec.Codec) (interface{}, error) {
	value, _, err := m.get(key, codec)
	return value, err
}
// get returns the decoded value stored under key together with its version.
// The store mutex is held only while the entry is read; decoding happens
// after it is released. When the entry has no value, out is nil and the
// entry's version is still returned. A decode failure yields (nil, 0, err).
func (m *KV) get(key string, codec codec.Codec) (out interface{}, version uint, err error) {
	m.storeMu.Lock()
	entry := m.store[key]
	m.storeMu.Unlock()

	if entry.value == nil {
		return nil, entry.version, nil
	}

	decoded, decodeErr := codec.Decode(entry.value)
	if decodeErr != nil {
		return nil, 0, decodeErr
	}

	// Clients have no use for tombstones, so strip ALL of them (zero time
	// limit) before handing the value out.
	if mergeable, isMergeable := decoded.(Mergeable); isMergeable {
		mergeable.RemoveTombstones(time.Time{})
	}
	return decoded, entry.version, nil
}
复制代码
KV.Get方法委托给内部的get方法:先在加锁的情况下读取m.store[key],若其value不为nil则使用codec解码;如果解码结果实现了Mergeable接口,还会在返回前调用RemoveTombstones移除全部tombstone
KV.CAS
github.com/cortexproject/cortex/pkg/ring/kv/memberlist/memberlist_client.go
// CAS tries to update the value stored under key by running trySingleCas up to
// m.maxCasRetries times. It returns nil as soon as one attempt finishes without
// error (whether or not it produced a change). Otherwise it returns an error
// wrapping the text of the last failure; casFailures is incremented in that case.
//
// NOTE(review): if m.maxCasRetries is <= 0 the loop body never runs and the
// returned error is formatted from a nil lastError.
func (m *KV) CAS(ctx context.Context, key string, codec codec.Codec, f func(in interface{}) (out interface{}, retry bool, err error)) error {
var lastError error = nil
outer:
for retries := m.maxCasRetries; retries > 0; retries-- {
// Every pass through the loop counts as an attempt, including passes that
// are cancelled while waiting below.
m.casAttempts.Inc()
if lastError == errNoChangeDetected {
// We only get here, if 'f' reports some change, but Merge function reports no change. This can happen
// with Ring's merge function, which depends on timestamps (and not the tokens) with 1-second resolution.
// By waiting for one second, we hope that Merge will be able to detect change from 'f' function.
select {
case <-time.After(noChangeDetectedRetrySleep):
// ok
case <-ctx.Done():
lastError = ctx.Err()
// A plain break would only leave the select; the label leaves the retry loop.
break outer
}
}
change, newver, retry, err := m.trySingleCas(key, codec, f)
if err != nil {
level.Debug(m.logger).Log("msg", "CAS attempt failed", "err", err, "retry", retry)
lastError = err
if !retry {
// Non-retryable failure: stop looping and report it below.
break
}
continue
}
// The attempt succeeded. A nil change means there is nothing to publish;
// the call still returns nil but casSuccesses is not incremented.
if change != nil {
m.casSuccesses.Inc()
m.notifyWatchers(key)
// Local watchers are always notified; the broadcast is skipped unless
// the service state is Running.
if m.State() == services.Running {
m.broadcastNewValue(key, change, newver, codec)
} else {
level.Warn(m.logger).Log("msg", "skipped broadcasting CAS update because memberlist KV is shutting down", "key", key)
}
}
return nil
}
if lastError == errVersionMismatch {
// The last attempt failed on a version mismatch; report it as
// errTooManyRetries instead, which is the more likely explanation.
lastError = errTooManyRetries
}
m.casFailures.Inc()
return fmt.Errorf("failed to CAS-update key %s: %v", key, lastError)
}
复制代码
KV.CAS方法最多循环m.maxCasRetries次来执行m.trySingleCas操作;某次尝试成功且产生了变更时,会通知watchers,并在服务处于Running状态时广播新值;若重试次数耗尽或遇到不可重试的错误,则返回error
KV.WatchKey
github.com/cortexproject/cortex/pkg/ring/kv/memberlist/memberlist_client.go
// WatchKey registers a watcher for key and blocks, calling f with the freshly
// read value each time the watcher is notified. It returns when f returns
// false, when m.shutdown fires, or when ctx is done; the watcher is
// unregistered on the way out. Values that fail to decode are logged and
// skipped without calling f.
func (m *KV) WatchKey(ctx context.Context, key string, codec codec.Codec, f func(interface{}) bool) {
	// Capacity of one keeps a single pending notification, so a change that
	// arrives while f is still running is not missed.
	notifications := make(chan string, 1)

	// Register the watcher.
	m.watchersMu.Lock()
	m.watchers[key] = append(m.watchers[key], notifications)
	m.watchersMu.Unlock()

	// Unregister the watcher on every exit path.
	defer func() {
		m.watchersMu.Lock()
		defer m.watchersMu.Unlock()
		removeWatcherChannel(key, notifications, m.watchers)
	}()

	for {
		select {
		case <-ctx.Done():
			return

		case <-m.shutdown:
			// Stop watching on shutdown.
			return

		case <-notifications:
			// The value changed; re-read it and hand it to the callback.
			val, _, err := m.get(key, codec)
			switch {
			case err != nil:
				level.Warn(m.logger).Log("msg", "failed to decode value while watching for changes", "key", key, "err", err)
			case !f(val):
				return
			}
		}
	}
}
复制代码
KV.WatchKey方法会往m.watchers[key]追加channel,然后for循环select等待channel的写入
KV.WatchPrefix
github.com/cortexproject/cortex/pkg/ring/kv/memberlist/memberlist_client.go
// WatchPrefix registers a watcher for prefix and blocks, calling f with the
// changed key and its freshly read value each time a key arrives on the
// watcher channel. It returns when f returns false, when m.shutdown fires, or
// when ctx is done; the watcher is unregistered on the way out. Values that
// fail to decode are logged and skipped without calling f.
func (m *KV) WatchPrefix(ctx context.Context, prefix string, codec codec.Codec, f func(string, interface{}) bool) {
	// A larger buffer than WatchKey uses: each message carries the key that
	// changed, and we do not want to lose those.
	changedKeys := make(chan string, 16)

	// Register the watcher.
	m.watchersMu.Lock()
	m.prefixWatchers[prefix] = append(m.prefixWatchers[prefix], changedKeys)
	m.watchersMu.Unlock()

	// Unregister the watcher on every exit path.
	defer func() {
		m.watchersMu.Lock()
		defer m.watchersMu.Unlock()
		removeWatcherChannel(prefix, changedKeys, m.prefixWatchers)
	}()

	for {
		select {
		case <-ctx.Done():
			return

		case <-m.shutdown:
			// Stop watching on shutdown.
			return

		case key := <-changedKeys:
			val, _, err := m.get(key, codec)
			switch {
			case err != nil:
				level.Warn(m.logger).Log("msg", "failed to decode value while watching for changes", "key", key, "err", err)
			case !f(key, val):
				return
			}
		}
	}
}
复制代码
KV.WatchPrefix方法与WatchKey类似,不过它的channel缓冲长度为16,并且channel被追加到m.prefixWatchers[prefix]中
小结
cortex的kv.Client接口定义了List、Get、Delete、CAS、WatchKey、WatchPrefix方法;Client实现了kv.Client接口,其List、Get、CAS、WatchKey、WatchPrefix方法均代理给kv,其Delete方法返回error。
相关文章