聊聊cortex的ReadRing
本文主要研究一下cortex的ReadRing
ReadRing
cortex/pkg/ring/ring.go
// ReadRing represents the read interface to the ring.
type ReadRing interface {
// Embedding prometheus.Collector means every ReadRing implementation must
// also satisfy that interface.
prometheus.Collector
// Get returns n (or more) ingesters which form the replicas for the given key.
// bufDescs, bufHosts and bufZones are slices to be overwritten for the return value
// to avoid memory allocation; can be nil, or created with ring.MakeBuffersForGet().
Get(key uint32, op Operation, bufDescs []IngesterDesc, bufHosts, bufZones []string) (ReplicationSet, error)
// GetAllHealthy returns all healthy instances in the ring, for the given operation.
// This function doesn't check if the quorum is honored, so doesn't fail if the number
// of unhealthy ingesters is greater than the tolerated max unavailable.
GetAllHealthy(op Operation) (ReplicationSet, error)
// GetReplicationSetForOperation returns all instances where the input operation should be executed.
// The resulting ReplicationSet doesn't necessarily contain all healthy instances
// in the ring, but could contain the minimum set of instances required to execute
// the input operation.
GetReplicationSetForOperation(op Operation) (ReplicationSet, error)
// ReplicationFactor reports the ring's replication factor.
// NOTE(review): presumably the configured value; the implementation is not
// shown here - confirm.
ReplicationFactor() int
// IngesterCount reports how many ingesters the ring holds.
// NOTE(review): inferred from the name and from its use as the ring size in
// Ring.ShuffleShard; the implementation is not shown here - confirm.
IngesterCount() int
// ShuffleShard returns a subring for the provided identifier (eg. a tenant ID)
// and size (number of instances).
ShuffleShard(identifier string, size int) ReadRing
// ShuffleShardWithLookback is like ShuffleShard() but the returned subring includes
// all instances that have been part of the identifier's shard since "now - lookbackPeriod".
ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing
// HasInstance returns whether the ring contains an instance matching the provided instanceID.
HasInstance(instanceID string) bool
}
复制代码
ReadRing内嵌了prometheus.Collector,定义了Get、GetAllHealthy、GetReplicationSetForOperation、ReplicationFactor、IngesterCount、ShuffleShard、ShuffleShardWithLookback、HasInstance方法
Get
cortex/pkg/ring/ring.go
// Get implements ReadRing. It walks the ring's tokens, starting at the position
// searchToken reports for key, and collects the distinct instances that own
// those tokens until it has gathered enough replicas or has visited every
// token once.
//
// bufDescs, bufHosts and bufZones are resliced to length zero and reused as
// scratch storage for the collected descriptors, instance IDs and zones.
//
// It returns ErrEmptyRing when the ring has no descriptor or no tokens,
// ErrInconsistentTokensInfo when a token has no owning instance, and otherwise
// whatever error the replication strategy's Filter reports. The ring's read
// lock is held for the whole call.
func (r *Ring) Get(key uint32, op Operation, bufDescs []IngesterDesc, bufHosts, bufZones []string) (ReplicationSet, error) {
	r.mtx.RLock()
	defer r.mtx.RUnlock()

	tokenCount := len(r.ringTokens)
	if r.ringDesc == nil || tokenCount == 0 {
		return ReplicationSet{}, ErrEmptyRing
	}

	// Slices rather than maps: for a handful of entries a linear scan is
	// cheaper than a map lookup.
	replicas := bufDescs[:0]
	seenHosts := bufHosts[:0]
	seenZones := bufZones[:0]

	// wanted starts at the replication factor and grows whenever the operation
	// asks for the replica set to be extended past a selected instance.
	wanted := r.cfg.ReplicationFactor

	// visited bounds the walk to one full lap; pos is the unwrapped ring position.
	for visited, pos := 0, searchToken(r.ringTokens, key); visited < tokenCount && len(seenHosts) < wanted; visited, pos = visited+1, pos+1 {
		owner, found := r.ringInstanceByToken[r.ringTokens[pos%tokenCount]]
		if !found {
			// Every token should map to an instance; anything else is a ring bug.
			return ReplicationSet{}, ErrInconsistentTokensInfo
		}

		// Each instance may appear only once in the replica set.
		if util.StringsContain(seenHosts, owner.InstanceID) {
			continue
		}

		// With zone-awareness on, each zone may also appear only once.
		// Instances without a zone are exempt from this check.
		if r.cfg.ZoneAwarenessEnabled && owner.Zone != "" {
			if util.StringsContain(seenZones, owner.Zone) {
				continue
			}
			seenZones = append(seenZones, owner.Zone)
		}
		seenHosts = append(seenHosts, owner.InstanceID)

		desc := r.ringDesc.Ingesters[owner.InstanceID]
		if op.ShouldExtendReplicaSetOnState(desc.State) {
			// Including this instance requires one more replica beyond it.
			wanted++
		}
		replicas = append(replicas, desc)
	}

	healthy, maxErrors, err := r.strategy.Filter(replicas, op, r.cfg.ReplicationFactor, r.cfg.HeartbeatTimeout, r.cfg.ZoneAwarenessEnabled)
	if err != nil {
		return ReplicationSet{}, err
	}
	return ReplicationSet{Ingesters: healthy, MaxErrors: maxErrors}, nil
}
复制代码
Get方法先通过r.ringInstanceByToken[token]获取info,再通过r.ringDesc.Ingesters[info.InstanceID]获取ingester,之后通过r.strategy.Filter过滤出liveIngesters
GetAllHealthy
cortex/pkg/ring/ring.go
// GetAllHealthy implements ReadRing. It returns every instance in the ring
// descriptor that IsHealthy accepts for op, evaluated against a single
// timestamp taken at the start of the scan. The result always carries
// MaxErrors 0. ErrEmptyRing is returned when the ring has no descriptor or no
// instances. The ring's read lock is held for the whole call.
func (r *Ring) GetAllHealthy(op Operation) (ReplicationSet, error) {
	r.mtx.RLock()
	defer r.mtx.RUnlock()

	if r.ringDesc == nil || len(r.ringDesc.Ingesters) == 0 {
		return ReplicationSet{}, ErrEmptyRing
	}

	var (
		checkedAt = time.Now()
		healthy   = make([]IngesterDesc, 0, len(r.ringDesc.Ingesters))
	)
	for _, desc := range r.ringDesc.Ingesters {
		if !r.IsHealthy(&desc, op, checkedAt) {
			continue
		}
		healthy = append(healthy, desc)
	}

	return ReplicationSet{Ingesters: healthy, MaxErrors: 0}, nil
}
复制代码
GetAllHealthy方法遍历r.ringDesc.Ingesters,然后通过r.IsHealthy(&ingester, op, now)提取healthy的ingester
GetReplicationSetForOperation
cortex/pkg/ring/ring.go
// GetReplicationSetForOperation implements ReadRing. It first splits the
// ring's instances into healthy ones and the set of zones that contain at
// least one unhealthy instance, then derives the failure budget:
//
//   - zone-awareness disabled: MaxErrors is the number of healthy instances
//     beyond the required count, MaxUnavailableZones is 0;
//   - zone-awareness enabled: every instance from a failing zone is dropped,
//     MaxErrors is 0 and MaxUnavailableZones is the zone budget left after
//     subtracting the failing zones.
//
// It returns ErrEmptyRing when the ring has no descriptor or no tokens, and
// ErrTooManyFailedIngesters when the failures exceed the budget. The ring's
// read lock is held for the whole call.
func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, error) {
	r.mtx.RLock()
	defer r.mtx.RUnlock()

	if r.ringDesc == nil || len(r.ringTokens) == 0 {
		return ReplicationSet{}, ErrEmptyRing
	}

	// Partition the instances: healthy ones are candidates, unhealthy ones
	// mark their zone as failing.
	var (
		checkedAt   = time.Now()
		candidates  = make([]IngesterDesc, 0, len(r.ringDesc.Ingesters))
		failedZones = make(map[string]struct{})
	)
	for _, desc := range r.ringDesc.Ingesters {
		if !r.IsHealthy(&desc, op, checkedAt) {
			failedZones[desc.Zone] = struct{}{}
			continue
		}
		candidates = append(candidates, desc)
	}

	if !r.cfg.ZoneAwarenessEnabled {
		// Required count: the larger of the ring size and RF, minus the
		// RF/2 failures that can be tolerated.
		required := len(r.ringDesc.Ingesters)
		if required < r.cfg.ReplicationFactor {
			required = r.cfg.ReplicationFactor
		}
		required -= r.cfg.ReplicationFactor / 2

		if len(candidates) < required {
			return ReplicationSet{}, ErrTooManyFailedIngesters
		}
		return ReplicationSet{
			Ingesters:           candidates,
			MaxErrors:           len(candidates) - required,
			MaxUnavailableZones: 0,
		}, nil
	}

	// Data is replicated to RF different zones, so a majority of the
	// replicated zones must succeed. The ring may currently span fewer than RF
	// zones, hence the Min.
	replicatedZones := util.Min(len(r.ringZones), r.cfg.ReplicationFactor)
	minHealthyZones := (replicatedZones / 2) + 1
	zoneBudget := minHealthyZones - 1
	if len(failedZones) > zoneBudget {
		return ReplicationSet{}, ErrTooManyFailedIngesters
	}

	if len(failedZones) > 0 {
		// A single unhealthy instance fails its whole zone: the zone's healthy
		// instances are dropped too, since querying them brings no benefit
		// when data is replicated across zones.
		kept := make([]IngesterDesc, 0, len(r.ringDesc.Ingesters))
		for _, desc := range candidates {
			if _, failed := failedZones[desc.Zone]; failed {
				continue
			}
			kept = append(kept, desc)
		}
		candidates = kept
	}

	// The failing zones were already excluded above, so they consume part of
	// the zone budget handed to the caller.
	return ReplicationSet{
		Ingesters:           candidates,
		MaxErrors:           0,
		MaxUnavailableZones: zoneBudget - len(failedZones),
	}, nil
}
复制代码
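GetReplicationSetForOperation先提取healthyInstances,并记录存在不健康实例的zone(zoneFailures);若开启了r.cfg.ZoneAwarenessEnabled,则剔除这些zone中的全部实例并计算maxUnavailableZones,否则根据ReplicationFactor计算numRequired和maxErrors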
ShuffleShard
cortex/pkg/ring/ring.go
// ShuffleShard implements ReadRing. It returns the subring for identifier and
// size, serving it from the shuffled-subring cache when an entry exists and
// otherwise building it with shuffleShard and storing it in the cache.
// The ring itself is returned when size is not positive or when the ring has
// no more instances than size.
func (r *Ring) ShuffleShard(identifier string, size int) ReadRing {
	// A non-positive size leaves the ring unsharded.
	if size <= 0 {
		return r
	}
	// Nothing to do if the shard size is not smaller than the actual ring.
	if r.IngesterCount() <= size {
		return r
	}

	cached := r.getCachedShuffledSubring(identifier, size)
	if cached != nil {
		return cached
	}

	fresh := r.shuffleShard(identifier, size, 0, time.Now())
	r.setCachedShuffledSubring(identifier, size, fresh)
	return fresh
}
复制代码
ShuffleShard方法先从r.getCachedShuffledSubring获取,如果为nil则执行r.shuffleShard,再执行r.setCachedShuffledSubring
HasInstance
cortex/pkg/ring/ring.go
// HasInstance implements ReadRing. It reports whether instanceID is a key in
// the instance map returned by the ring descriptor's GetIngesters. The ring's
// read lock is held for the lookup.
func (r *Ring) HasInstance(instanceID string) bool {
	r.mtx.RLock()
	defer r.mtx.RUnlock()

	_, found := r.ringDesc.GetIngesters()[instanceID]
	return found
}
复制代码
HasInstance通过r.ringDesc.GetIngesters()获取instances,再根据instanceID判断是否存在
小结
cortex的ReadRing内嵌了prometheus.Collector,定义了Get、GetAllHealthy、GetReplicationSetForOperation、ReplicationFactor、IngesterCount、ShuffleShard、ShuffleShardWithLookback、HasInstance方法。
相关文章