聊聊cortex的ReadRing

2022-05-10 00:00:00 专区 代码 获取 方法 复制

本文主要研究一下cortex的ReadRing

ReadRing

cortex/pkg/ring/ring.go

// ReadRing represents the read interface to the ring.
type ReadRing interface {
	prometheus.Collector

	// Get returns n (or more) ingesters which form the replicas for the given key,
	// where n is the replication factor.
	// bufDescs, bufHosts and bufZones are slices to be overwritten for the return value
	// to avoid memory allocation; can be nil, or created with ring.MakeBuffersForGet().
	Get(key uint32, op Operation, bufDescs []IngesterDesc, bufHosts, bufZones []string) (ReplicationSet, error)

	// GetAllHealthy returns all healthy instances in the ring, for the given operation.
	// This function doesn't check if the quorum is honored, so doesn't fail if the number
	// of unhealthy ingesters is greater than the tolerated max unavailable.
	GetAllHealthy(op Operation) (ReplicationSet, error)

	// GetReplicationSetForOperation returns all instances where the input operation should be executed.
	// The resulting ReplicationSet doesn't necessarily contain all healthy instances
	// in the ring, but could contain the minimum set of instances required to execute
	// the input operation.
	GetReplicationSetForOperation(op Operation) (ReplicationSet, error)

	// ReplicationFactor returns the configured replication factor of the ring.
	ReplicationFactor() int

	// IngesterCount returns the number of ingesters (instances) in the ring.
	IngesterCount() int

	// ShuffleShard returns a subring for the provided identifier (eg. a tenant ID)
	// and size (number of instances). Implementations may return the whole ring
	// when size is not smaller than the ring itself.
	ShuffleShard(identifier string, size int) ReadRing

	// ShuffleShardWithLookback is like ShuffleShard() but the returned subring includes
	// all instances that have been part of the identifier's shard since "now - lookbackPeriod".
	ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing

	// HasInstance returns whether the ring contains an instance matching the provided instanceID.
	HasInstance(instanceID string) bool
}
复制代码

ReadRing内嵌了prometheus.Collector,定义了Get、GetAllHealthy、GetReplicationSetForOperation、ReplicationFactor、IngesterCount、ShuffleShard、ShuffleShardWithLookback、HasInstance方法

Get

cortex/pkg/ring/ring.go

// Get returns the ingesters which form the replica set for the given key.
//
// Starting at the index returned by searchToken(r.ringTokens, key), it walks the
// ring tokens in order, wrapping around at the end, and collects distinct
// instances until n of them have been found. n starts at r.cfg.ReplicationFactor.
// When zone awareness is enabled, at most one instance per non-empty zone is
// taken. Every collected instance whose state satisfies
// op.ShouldExtendReplicaSetOnState raises n by one, so more than
// ReplicationFactor instances may be collected ("n (or more)"). Each token is
// visited at most once, so fewer instances may be collected if the ring does not
// hold enough eligible ones.
//
// The collected instances are then passed through r.strategy.Filter. Its result
// provides the returned Ingesters and MaxErrors.
//
// bufDescs, bufHosts and bufZones are truncated with [:0] and appended to, so
// their backing arrays are reused (and overwritten) to avoid allocations. They
// may be nil. NOTE(review): the returned Ingesters may alias bufDescs depending
// on what r.strategy.Filter does — confirm before reusing the buffer.
//
// Errors: ErrEmptyRing if the ring has no description or no tokens,
// ErrInconsistentTokensInfo if a token has no instance info, or any error
// returned by r.strategy.Filter.
func (r *Ring) Get(key uint32, op Operation, bufDescs []IngesterDesc, bufHosts, bufZones []string) (ReplicationSet, error) {
	r.mtx.RLock()
	defer r.mtx.RUnlock()
	if r.ringDesc == nil || len(r.ringTokens) == 0 {
		return ReplicationSet{}, ErrEmptyRing
	}

	var (
		// n is the target number of distinct instances; it can grow inside the loop.
		n          = r.cfg.ReplicationFactor
		ingesters  = bufDescs[:0]
		start      = searchToken(r.ringTokens, key)
		iterations = 0

		// We use a slice instead of a map because it's faster to search within a
		// slice than lookup a map for a very low number of items.
		distinctHosts = bufHosts[:0]
		distinctZones = bufZones[:0]
	)
	// Stop once n distinct instances are collected, or after every token has
	// been visited exactly once (iterations bounds the walk to one full lap).
	for i := start; len(distinctHosts) < n && iterations < len(r.ringTokens); i++ {
		iterations++
		// Wrap i around in the ring.
		i %= len(r.ringTokens)
		token := r.ringTokens[i]

		info, ok := r.ringInstanceByToken[token]
		if !ok {
			// This should never happen unless there is a bug in the ring code.
			return ReplicationSet{}, ErrInconsistentTokensInfo
		}

		// We want n *distinct* ingesters && distinct zones.
		if util.StringsContain(distinctHosts, info.InstanceID) {
			continue
		}

		// Zone deduplication applies only when zone awareness is enabled and the
		// instance has a zone set; instances without a zone are never skipped
		// because of their zone.
		if r.cfg.ZoneAwarenessEnabled && info.Zone != "" {
			if util.StringsContain(distinctZones, info.Zone) {
				continue
			}
			distinctZones = append(distinctZones, info.Zone)
		}

		distinctHosts = append(distinctHosts, info.InstanceID)
		ingester := r.ringDesc.Ingesters[info.InstanceID]

		// Check whether the replica set should be extended given we're including
		// this instance. Raising n keeps the loop going for one more distinct instance.
		if op.ShouldExtendReplicaSetOnState(ingester.State) {
			n++
		}

		ingesters = append(ingesters, ingester)
	}

	// The strategy decides which collected instances are usable and how many
	// errors the caller may tolerate; it is given the configured replication
	// factor, not the possibly-extended n.
	liveIngesters, maxFailure, err := r.strategy.Filter(ingesters, op, r.cfg.ReplicationFactor, r.cfg.HeartbeatTimeout, r.cfg.ZoneAwarenessEnabled)
	if err != nil {
		return ReplicationSet{}, err
	}

	return ReplicationSet{
		Ingesters: liveIngesters,
		MaxErrors: maxFailure,
	}, nil
}
复制代码

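Get方法先用searchToken定位key在环上的起始token,然后沿环顺序遍历(到末尾后回绕)。对每个token,它先通过r.ringInstanceByToken[token]获取info,并跳过重复的实例(开启zone awareness时还会跳过重复的zone)。接着通过r.ringDesc.Ingesters[info.InstanceID]获取ingester,直到凑够ReplicationFactor个不同实例;若op.ShouldExtendReplicaSetOnState返回true,目标数量会再加一。最后通过r.strategy.Filter过滤出liveIngesters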

GetAllHealthy

cortex/pkg/ring/ring.go

// GetAllHealthy implements ReadRing.
//
// It collects every instance of the ring that r.IsHealthy reports as healthy
// for op at the current time. No quorum check is performed, so the returned
// MaxErrors is always 0. ErrEmptyRing is returned when the ring has no
// description or no instances.
func (r *Ring) GetAllHealthy(op Operation) (ReplicationSet, error) {
	r.mtx.RLock()
	defer r.mtx.RUnlock()

	if r.ringDesc == nil || len(r.ringDesc.Ingesters) == 0 {
		return ReplicationSet{}, ErrEmptyRing
	}

	var (
		now     = time.Now()
		healthy = make([]IngesterDesc, 0, len(r.ringDesc.Ingesters))
	)
	for _, ingester := range r.ringDesc.Ingesters {
		if !r.IsHealthy(&ingester, op, now) {
			continue
		}
		healthy = append(healthy, ingester)
	}

	// MaxErrors is left at its zero value: nothing is tolerated or enforced here.
	return ReplicationSet{Ingesters: healthy}, nil
}
复制代码

GetAllHealthy方法遍历r.ringDesc.Ingesters,然后通过r.IsHealthy(&ingester, op, now)提取healthy的ingester;它不做quorum检查,因此返回的MaxErrors始终为0

GetReplicationSetForOperation

cortex/pkg/ring/ring.go

// GetReplicationSetForOperation implements ReadRing.
//
// It returns the instances on which op should be executed, together with how
// many failures the caller may tolerate:
//   - with zone awareness enabled, MaxUnavailableZones is set and MaxErrors is 0;
//   - otherwise, MaxErrors is set and MaxUnavailableZones is 0.
//
// Instances that r.IsHealthy reports as unhealthy at the current time are always
// excluded. ErrEmptyRing is returned when the ring has no description or no
// tokens. ErrTooManyFailedIngesters is returned when the remaining instances
// cannot satisfy the quorum computed below.
func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, error) {
	r.mtx.RLock()
	defer r.mtx.RUnlock()

	if r.ringDesc == nil || len(r.ringTokens) == 0 {
		return ReplicationSet{}, ErrEmptyRing
	}

	// Build the initial replication set, excluding unhealthy instances.
	// zoneFailures records the zone of every unhealthy instance (an instance
	// without a zone records ""). It is only consulted when zone awareness is enabled.
	healthyInstances := make([]IngesterDesc, 0, len(r.ringDesc.Ingesters))
	zoneFailures := make(map[string]struct{})
	now := time.Now()

	for _, ingester := range r.ringDesc.Ingesters {
		if r.IsHealthy(&ingester, op, now) {
			healthyInstances = append(healthyInstances, ingester)
		} else {
			zoneFailures[ingester.Zone] = struct{}{}
		}
	}

	// Max errors and max unavailable zones are mutually exclusive. We initialise both
	// to 0 and then update one of them depending on whether zone-awareness is enabled.
	maxErrors := 0
	maxUnavailableZones := 0

	if r.cfg.ZoneAwarenessEnabled {
		// Given data is replicated to RF different zones, we can tolerate a number of
		// RF/2 failing zones. However, we need to protect from the case the ring currently
		// contains instances in a number of zones < RF.
		// Example: RF=3 with 3 zones -> minSuccessZones=2, maxUnavailableZones=1.
		numReplicatedZones := util.Min(len(r.ringZones), r.cfg.ReplicationFactor)
		minSuccessZones := (numReplicatedZones / 2) + 1
		maxUnavailableZones = minSuccessZones - 1

		if len(zoneFailures) > maxUnavailableZones {
			return ReplicationSet{}, ErrTooManyFailedIngesters
		}

		if len(zoneFailures) > 0 {
			// We remove all instances (even healthy ones) from zones with at least
			// 1 failing ingester. Due to how replication works when zone-awareness is
			// enabled (data is replicated to RF different zones), there's no benefit in
			// querying healthy instances from "failing zones". A zone is considered
			// failed if it contains at least one unhealthy instance.
			filteredInstances := make([]IngesterDesc, 0, len(r.ringDesc.Ingesters))
			for _, ingester := range healthyInstances {
				if _, ok := zoneFailures[ingester.Zone]; !ok {
					filteredInstances = append(filteredInstances, ingester)
				}
			}

			healthyInstances = filteredInstances
		}

		// Since we removed all instances from zones containing at least 1 failing
		// instance, we have to decrease the max unavailable zones accordingly.
		// The check above guarantees the result is not negative.
		maxUnavailableZones -= len(zoneFailures)
	} else {
		// Calculate the number of required ingesters;
		// ensure we always require at least RF-1 when RF=3.
		// Example: 3 instances, RF=3 -> numRequired = 3 - 3/2 = 2.
		numRequired := len(r.ringDesc.Ingesters)
		if numRequired < r.cfg.ReplicationFactor {
			numRequired = r.cfg.ReplicationFactor
		}
		// We can tolerate this many failures
		numRequired -= r.cfg.ReplicationFactor / 2

		if len(healthyInstances) < numRequired {
			return ReplicationSet{}, ErrTooManyFailedIngesters
		}

		// Healthy instances beyond the required minimum are the error budget.
		maxErrors = len(healthyInstances) - numRequired
	}

	return ReplicationSet{
		Ingesters:           healthyInstances,
		MaxErrors:           maxErrors,
		MaxUnavailableZones: maxUnavailableZones,
	}, nil
}
复制代码

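GetReplicationSetForOperation先提取healthyInstances(同时记录含有unhealthy实例的zone),然后根据r.cfg.ZoneAwarenessEnabled分两种情况处理。开启时,若失败zone数超过可容忍的maxUnavailableZones则返回ErrTooManyFailedIngesters,否则剔除失败zone中的所有实例,并相应减少maxUnavailableZones。未开启时,根据ReplicationFactor计算所需的最少实例数numRequired,健康实例不足则返回错误,否则maxErrors为健康实例数减去numRequired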

ShuffleShard

cortex/pkg/ring/ring.go

// ShuffleShard implements ReadRing.
//
// It returns the subring of size instances assigned to identifier. The ring
// itself is returned when size is non-positive or is not smaller than
// r.IngesterCount(). Computed subrings are cached per (identifier, size) via
// getCachedShuffledSubring / setCachedShuffledSubring.
func (r *Ring) ShuffleShard(identifier string, size int) ReadRing {
	// A shard that is not strictly smaller than the ring would be the ring itself.
	if size <= 0 || size >= r.IngesterCount() {
		return r
	}

	cached := r.getCachedShuffledSubring(identifier, size)
	if cached != nil {
		return cached
	}

	subring := r.shuffleShard(identifier, size, 0, time.Now())
	r.setCachedShuffledSubring(identifier, size, subring)

	return subring
}
复制代码

ShuffleShard方法在size<=0或size不小于IngesterCount()时直接返回整个ring;否则先从r.getCachedShuffledSubring获取缓存,如果为nil则执行r.shuffleShard计算子环,再执行r.setCachedShuffledSubring写入缓存

HasInstance

cortex/pkg/ring/ring.go

// HasInstance reports whether an instance with the given instanceID is present
// in the ring description. The lookup runs while holding the ring's read lock.
func (r *Ring) HasInstance(instanceID string) bool {
	r.mtx.RLock()
	defer r.mtx.RUnlock()

	_, found := r.ringDesc.GetIngesters()[instanceID]
	return found
}
复制代码

HasInstance通过r.ringDesc.GetIngesters()获取instances,再根据instanceID判断是否存在

小结

cortex的ReadRing内嵌了prometheus.Collector,定义了Get、GetAllHealthy、GetReplicationSetForOperation、ReplicationFactor、IngesterCount、ShuffleShard、ShuffleShardWithLookback、HasInstance方法。

相关文章