聊聊cortex的kv.Client

2022-05-10 00:00:00 专区 代码 接口 方法 复制

本文主要研究一下cortex的kv.Client

kv.Client

github.com/cortexproject/cortex/pkg/ring/kv/client.go

// Client is a high-level client for key-value stores (such as Etcd and
// Consul) that exposes operations such as CAS and Watch which take callbacks.
// It also deals with serialisation by using a Codec and having a instance of
// the the desired type passed in to methods ala json.Unmarshal.
type Client interface {
	// List returns a list of keys under the given prefix. Returned keys will
	// include the prefix.
	List(ctx context.Context, prefix string) ([]string, error)

	// Get a specific key.  Will use a codec to deserialise key to appropriate type.
	// If the key does not exist, Get will return nil and no error.
	Get(ctx context.Context, key string) (interface{}, error)

	// Delete a specific key. Deletions are best-effort and no error will
	// be returned if the key does not exist.
	Delete(ctx context.Context, key string) error

	// CAS stands for Compare-And-Swap.  Will call provided callback f with the
	// current value of the key and allow callback to return a different value.
	// Will then attempt to atomically swap the current value for the new value.
	// If that doesn't succeed will try again - callback will be called again
	// with new value etc.  Guarantees that only a single concurrent CAS
	// succeeds.  Callback can return nil to indicate it is happy with existing
	// value.
	CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error

	// WatchKey calls f whenever the value stored under key changes.
	WatchKey(ctx context.Context, key string, f func(interface{}) bool)

	// WatchPrefix calls f whenever any value stored under prefix changes.
	WatchPrefix(ctx context.Context, prefix string, f func(string, interface{}) bool)
}
复制代码

kv.Client接口定义了List、Get、Delete、CAS、WatchKey、WatchPrefix方法

Client

github.com/cortexproject/cortex/pkg/ring/kv/memberlist/memberlist_client.go

// Client implements kv.Client interface, by using memberlist.KV
type Client struct {
	kv    *KV // reference to singleton memberlist-based KV
	codec codec.Codec
}

// List is part of kv.Client interface.
func (c *Client) List(ctx context.Context, prefix string) ([]string, error) {
	err := c.awaitKVRunningOrStopping(ctx)
	if err != nil {
		return nil, err
	}

	return c.kv.List(prefix), nil
}

// Get is part of kv.Client interface.
func (c *Client) Get(ctx context.Context, key string) (interface{}, error) {
	err := c.awaitKVRunningOrStopping(ctx)
	if err != nil {
		return nil, err
	}

	return c.kv.Get(key, c.codec)
}

// Delete is part of kv.Client interface.
func (c *Client) Delete(ctx context.Context, key string) error {
	return errors.New("memberlist does not support Delete")
}

// CAS is part of kv.Client interface
func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error {
	err := c.awaitKVRunningOrStopping(ctx)
	if err != nil {
		return err
	}

	return c.kv.CAS(ctx, key, c.codec, f)
}

// WatchKey is part of kv.Client interface.
func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) bool) {
	err := c.awaitKVRunningOrStopping(ctx)
	if err != nil {
		return
	}

	c.kv.WatchKey(ctx, key, c.codec, f)
}

// WatchPrefix calls f whenever any value stored under prefix changes.
// Part of kv.Client interface.
func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string, interface{}) bool) {
	err := c.awaitKVRunningOrStopping(ctx)
	if err != nil {
		return
	}

	c.kv.WatchPrefix(ctx, prefix, c.codec, f)
}
复制代码

Client实现了kv.Client接口,其List、Get、CAS、WatchKey、WatchPrefix方法均代理给kv,其Delete方法返回error

KV.List

github.com/cortexproject/cortex/pkg/ring/kv/memberlist/memberlist_client.go

// List returns all known keys under a given prefix.
// No communication with other nodes in the cluster is done here.
func (m *KV) List(prefix string) []string {
	m.storeMu.Lock()
	defer m.storeMu.Unlock()

	var keys []string
	for k := range m.store {
		if strings.HasPrefix(k, prefix) {
			keys = append(keys, k)
		}
	}
	return keys
}
复制代码

KV.List方法遍历m.store,查找是否有指定prefix的key

KV.Get

github.com/cortexproject/cortex/pkg/ring/kv/memberlist/memberlist_client.go

// Get returns current value associated with given key.
// No communication with other nodes in the cluster is done here.
func (m *KV) Get(key string, codec codec.Codec) (interface{}, error) {
	val, _, err := m.get(key, codec)
	return val, err
}

// Returns current value with removed tombstones.
func (m *KV) get(key string, codec codec.Codec) (out interface{}, version uint, err error) {
	m.storeMu.Lock()
	v := m.store[key]
	m.storeMu.Unlock()

	out = nil
	if v.value != nil {
		out, err = codec.Decode(v.value)
		if err != nil {
			return nil, 0, err
		}

		if mr, ok := out.(Mergeable); ok {
			// remove ALL tombstones before returning to client.
			// No need for clients to see them.
			mr.RemoveTombstones(time.Time{})
		}
	}

	return out, v.version, nil
}
复制代码

KV.Get方法主要是从m.store[key]获取数据

KV.CAS

github.com/cortexproject/cortex/pkg/ring/kv/memberlist/memberlist_client.go

func (m *KV) CAS(ctx context.Context, key string, codec codec.Codec, f func(in interface{}) (out interface{}, retry bool, err error)) error {
	var lastError error = nil

outer:
	for retries := m.maxCasRetries; retries > 0; retries-- {
		m.casAttempts.Inc()

		if lastError == errNoChangeDetected {
			// We only get here, if 'f' reports some change, but Merge function reports no change. This can happen
			// with Ring's merge function, which depends on timestamps (and not the tokens) with 1-second resolution.
			// By waiting for one second, we hope that Merge will be able to detect change from 'f' function.

			select {
			case <-time.After(noChangeDetectedRetrySleep):
				// ok
			case <-ctx.Done():
				lastError = ctx.Err()
				break outer
			}
		}

		change, newver, retry, err := m.trySingleCas(key, codec, f)
		if err != nil {
			level.Debug(m.logger).Log("msg", "CAS attempt failed", "err", err, "retry", retry)

			lastError = err
			if !retry {
				break
			}
			continue
		}

		if change != nil {
			m.casSuccesses.Inc()
			m.notifyWatchers(key)

			if m.State() == services.Running {
				m.broadcastNewValue(key, change, newver, codec)
			} else {
				level.Warn(m.logger).Log("msg", "skipped broadcasting CAS update because memberlist KV is shutting down", "key", key)
			}
		}

		return nil
	}

	if lastError == errVersionMismatch {
		// this is more likely error than version mismatch.
		lastError = errTooManyRetries
	}

	m.casFailures.Inc()
	return fmt.Errorf("failed to CAS-update key %s: %v", key, lastError)
}
复制代码

KV.CAS通过for循环m.maxCasRetries执行m.trySingleCas操作

KV.WatchKey

github.com/cortexproject/cortex/pkg/ring/kv/memberlist/memberlist_client.go

func (m *KV) WatchKey(ctx context.Context, key string, codec codec.Codec, f func(interface{}) bool) {
	// keep one extra notification, to avoid missing notification if we're busy running the function
	w := make(chan string, 1)

	// register watcher
	m.watchersMu.Lock()
	m.watchers[key] = append(m.watchers[key], w)
	m.watchersMu.Unlock()

	defer func() {
		// unregister watcher on exit
		m.watchersMu.Lock()
		defer m.watchersMu.Unlock()

		removeWatcherChannel(key, w, m.watchers)
	}()

	for {
		select {
		case <-w:
			// value changed
			val, _, err := m.get(key, codec)
			if err != nil {
				level.Warn(m.logger).Log("msg", "failed to decode value while watching for changes", "key", key, "err", err)
				continue
			}

			if !f(val) {
				return
			}

		case <-m.shutdown:
			// stop watching on shutdown
			return

		case <-ctx.Done():
			return
		}
	}
}
复制代码

KV.WatchKey方法会往m.watchers[key]追加channel,然后for循环select等待channel的写入

KV.WatchPrefix

github.com/cortexproject/cortex/pkg/ring/kv/memberlist/memberlist_client.go

func (m *KV) WatchPrefix(ctx context.Context, prefix string, codec codec.Codec, f func(string, interface{}) bool) {
	// we use bigger buffer here, since keys are interesting and we don't want to lose them.
	w := make(chan string, 16)

	// register watcher
	m.watchersMu.Lock()
	m.prefixWatchers[prefix] = append(m.prefixWatchers[prefix], w)
	m.watchersMu.Unlock()

	defer func() {
		// unregister watcher on exit
		m.watchersMu.Lock()
		defer m.watchersMu.Unlock()

		removeWatcherChannel(prefix, w, m.prefixWatchers)
	}()

	for {
		select {
		case key := <-w:
			val, _, err := m.get(key, codec)
			if err != nil {
				level.Warn(m.logger).Log("msg", "failed to decode value while watching for changes", "key", key, "err", err)
				continue
			}

			if !f(key, val) {
				return
			}

		case <-m.shutdown:
			// stop watching on shutdown
			return

		case <-ctx.Done():
			return
		}
	}
}
复制代码

KV.WatchPrefix方法与WatchKey类型,不过它channel的长度为16,追加到的是m.prefixWatchers[prefix]

小结

cortex的kv.Client接口定义了List、Get、Delete、CAS、WatchKey、WatchPrefix方法;Client实现了kv.Client接口,其List、Get、CAS、WatchKey、WatchPrefix方法均代理给kv,其Delete方法返回error。

相关文章