聊聊cortex的Distributor

2022-05-10 00:00:00 专区 代码 获取 方法 复制

本文主要研究一下cortex的Distributor

Distributor

cortex/pkg/distributor/distributor.go

// Distributor is a storage.SampleAppender and a client.Querier which
// forwards appends and queries to individual ingesters.
//
// On the write path (Push) it validates incoming series and metadata against
// the per-tenant limits, applies the ingestion rate limiter, and then fans
// the data out to ingesters selected from ingestersRing.
type Distributor struct {
	// NOTE(review): embedded service, presumably supplying the lifecycle
	// methods for the distributor itself — confirm against the services package.
	services.Service

	// cfg is the distributor configuration; Push reads ShardingStrategy,
	// ExtendWrites and RemoteTimeout from it.
	// ingestersRing is the ring used to select target ingesters (optionally
	// narrowed to a per-tenant shuffle shard in Push).
	// ingesterPool is presumably the pool of ingester clients — it is not used
	// in the code visible here, confirm against send().
	// limits provides the per-tenant overrides queried throughout Push
	// (HA labels, relabel configs, dropped labels, shard size, ...).
	cfg           Config
	ingestersRing ring.ReadRing
	ingesterPool  *ring_client.Pool
	limits        *validation.Overrides

	// The global rate limiter requires a distributors ring to count
	// the number of healthy instances
	distributorsRing *ring.Lifecycler

	// For handling HA replicas.
	HATracker *haTracker

	// Per-user rate limiter. Push consults it via AllowN with the number of
	// validated samples plus validated metadata entries.
	ingestionRateLimiter *limiter.RateLimiter

	// Manager for subservices (HA Tracker, distributor ring and client pool)
	subservices        *services.Manager
	subservicesWatcher *services.FailureWatcher
}
复制代码

Distributor负责把写入(append)请求和查询请求转发给各个ingester

Push

cortex/pkg/distributor/distributor.go

// Push implements client.IngesterServer
//
// It processes a write request for the tenant found in ctx in these steps:
//  1. Count incoming samples/metadata for metrics.
//  2. If HA sample handling is enabled for the tenant, check the cluster and
//     replica labels of the first series. The request is rejected with an
//     httpgrpc error carrying 202 when the replica does not match, or 400
//     when the tenant has too many HA clusters.
//  3. For every series: apply relabelling, strip the HA replica label and any
//     configured drop labels, sort labels, compute the sharding token and
//     validate the series. Validation errors are non-fatal; only the first
//     one is remembered and returned alongside the response.
//  4. Validate metadata in the same non-fatal fashion.
//  5. Apply the per-tenant ingestion rate limit (429 on rejection).
//  6. Send the surviving series/metadata to the ingesters via ring.DoBatch.
//
// Note that a non-nil *client.WriteResponse may be returned together with a
// non-nil error: this happens when part of the request failed validation.
func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error) {
	userID, err := tenant.TenantID(ctx)
	if err != nil {
		return nil, err
	}
	source := util.GetSourceIPsFromOutgoingCtx(ctx)

	var firstPartialErr error
	removeReplica := false

	numSamples := 0
	for _, ts := range req.Timeseries {
		numSamples += len(ts.Samples)
	}
	// Count the total samples in, prior to validation or deduplication, for comparison with other metrics.
	incomingSamples.WithLabelValues(userID).Add(float64(numSamples))
	// Count the total number of metadata in.
	incomingMetadata.WithLabelValues(userID).Add(float64(len(req.Metadata)))

	// A WriteRequest can only contain series or metadata but not both. This might change in the future.
	// For each timeseries or samples, we compute a hash to distribute across ingesters;
	// check each sample/metadata and discard if outside limits.
	validatedTimeseries := make([]client.PreallocTimeseries, 0, len(req.Timeseries))
	validatedMetadata := make([]*client.MetricMetadata, 0, len(req.Metadata))
	metadataKeys := make([]uint32, 0, len(req.Metadata))
	seriesKeys := make([]uint32, 0, len(req.Timeseries))
	validatedSamples := 0

	if d.limits.AcceptHASamples(userID) && len(req.Timeseries) > 0 {
		// Only the first series is inspected for the HA labels.
		cluster, replica := findHALabels(d.limits.HAReplicaLabel(userID), d.limits.HAClusterLabel(userID), req.Timeseries[0].Labels)
		removeReplica, err = d.checkSample(ctx, userID, cluster, replica)
		if err != nil {
			// Ensure the request slice is reused if the series get deduped.
			client.ReuseSlice(req.Timeseries)

			if errors.Is(err, replicasNotMatchError{}) {
				// These samples have been deduped.
				dedupedSamples.WithLabelValues(userID, cluster).Add(float64(numSamples))
				// The error text is passed as an argument, never as the format
				// string: it may contain '%' characters (for example from
				// request-supplied label values), which would otherwise be
				// interpreted as formatting verbs.
				return nil, httpgrpc.Errorf(http.StatusAccepted, "%s", err.Error())
			}

			if errors.Is(err, tooManyClustersError{}) {
				validation.DiscardedSamples.WithLabelValues(validation.TooManyHAClusters, userID).Add(float64(numSamples))
				// See above: keep the error text out of the format string.
				return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
			}

			return nil, err
		}
		// If there wasn't an error but removeReplica is false that means we didn't find both HA labels.
		if !removeReplica {
			nonHASamples.WithLabelValues(userID).Add(float64(numSamples))
		}
	}

	latestSampleTimestampMs := int64(0)
	defer func() {
		// Update this metric even in case of errors.
		if latestSampleTimestampMs > 0 {
			latestSeenSampleTimestampPerUser.WithLabelValues(userID).Set(float64(latestSampleTimestampMs) / 1000)
		}
	}()

	// For each timeseries, compute a hash to distribute across ingesters;
	// check each sample and discard if outside limits.
	for _, ts := range req.Timeseries {
		// Use timestamp of latest sample in the series. If samples for series are not ordered, metric for user may be wrong.
		if len(ts.Samples) > 0 {
			latestSampleTimestampMs = util.Max64(latestSampleTimestampMs, ts.Samples[len(ts.Samples)-1].TimestampMs)
		}

		if mrc := d.limits.MetricRelabelConfigs(userID); len(mrc) > 0 {
			l := relabel.Process(client.FromLabelAdaptersToLabels(ts.Labels), mrc...)
			ts.Labels = client.FromLabelsToLabelAdapters(l)
		}

		// If we found both the cluster and replica labels, we only want to include the cluster label when
		// storing series in Cortex. If we kept the replica label we would end up with another series for the same
		// series we're trying to dedupe when HA tracking moves over to a different replica.
		if removeReplica {
			removeLabel(d.limits.HAReplicaLabel(userID), &ts.Labels)
		}

		for _, labelName := range d.limits.DropLabels(userID) {
			removeLabel(labelName, &ts.Labels)
		}

		// A series left without any label (after relabelling/dropping) is skipped.
		if len(ts.Labels) == 0 {
			continue
		}

		// We rely on sorted labels in different places:
		// 1) When computing token for labels, and sharding by all labels. Here different order of labels returns
		// different tokens, which is bad.
		// 2) In validation code, when checking for duplicate label names. As duplicate label names are rejected
		// later in the validation phase, we ignore them here.
		sortLabelsIfNeeded(ts.Labels)

		// Generate the sharding token based on the series labels without the HA replica
		// label and dropped labels (if any)
		key, err := d.tokenForLabels(userID, ts.Labels)
		if err != nil {
			return nil, err
		}

		validatedSeries, err := d.validateSeries(ts, userID)

		// Errors in validation are considered non-fatal, as one series in a request may contain
		// invalid data but all the remaining series could be perfectly valid.
		if err != nil && firstPartialErr == nil {
			firstPartialErr = err
		}

		// validateSeries would have returned an emptyPreallocSeries if there were no valid samples.
		if validatedSeries == emptyPreallocSeries {
			continue
		}

		seriesKeys = append(seriesKeys, key)
		validatedTimeseries = append(validatedTimeseries, validatedSeries)
		validatedSamples += len(ts.Samples)
	}

	for _, m := range req.Metadata {
		err := validation.ValidateMetadata(d.limits, userID, m)

		if err != nil {
			if firstPartialErr == nil {
				firstPartialErr = err
			}

			continue
		}

		metadataKeys = append(metadataKeys, d.tokenForMetadata(userID, m.MetricFamilyName))
		validatedMetadata = append(validatedMetadata, m)
	}

	receivedSamples.WithLabelValues(userID).Add(float64(validatedSamples))
	receivedMetadata.WithLabelValues(userID).Add(float64(len(validatedMetadata)))

	if len(seriesKeys) == 0 && len(metadataKeys) == 0 {
		// Ensure the request slice is reused if there's no series or metadata passing the validation.
		client.ReuseSlice(req.Timeseries)

		return &client.WriteResponse{}, firstPartialErr
	}

	now := time.Now()
	totalN := validatedSamples + len(validatedMetadata)
	if !d.ingestionRateLimiter.AllowN(now, userID, totalN) {
		// Ensure the request slice is reused if the request is rate limited.
		client.ReuseSlice(req.Timeseries)

		// Return a 4xx here to have the client discard the data and not retry. If a client
		// is sending too much data consistently we will unlikely ever catch up otherwise.
		validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamples))
		validation.DiscardedMetadata.WithLabelValues(validation.RateLimited, userID).Add(float64(len(validatedMetadata)))
		return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), validatedSamples, len(validatedMetadata))
	}

	subRing := d.ingestersRing

	// Obtain a subring if required.
	if d.cfg.ShardingStrategy == util.ShardingStrategyShuffle {
		subRing = d.ingestersRing.ShuffleShard(userID, d.limits.IngestionTenantShardSize(userID))
	}

	// Series keys come first, metadata keys after them; the callback below uses
	// initialMetadataIndex to tell the two kinds of index apart.
	keys := append(seriesKeys, metadataKeys...)
	initialMetadataIndex := len(seriesKeys)

	op := ring.WriteNoExtend
	if d.cfg.ExtendWrites {
		op = ring.Write
	}

	err = ring.DoBatch(ctx, op, subRing, keys, func(ingester ring.IngesterDesc, indexes []int) error {
		timeseries := make([]client.PreallocTimeseries, 0, len(indexes))
		var metadata []*client.MetricMetadata

		for _, i := range indexes {
			if i >= initialMetadataIndex {
				metadata = append(metadata, validatedMetadata[i-initialMetadataIndex])
			} else {
				timeseries = append(timeseries, validatedTimeseries[i])
			}
		}

		// Use a background context to make sure all ingesters get samples even if we return early
		localCtx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout)
		defer cancel()
		localCtx = user.InjectOrgID(localCtx, userID)
		if sp := opentracing.SpanFromContext(ctx); sp != nil {
			localCtx = opentracing.ContextWithSpan(localCtx, sp)
		}

		// Get clientIP(s) from Context and add it to localCtx
		localCtx = util.AddSourceIPsToOutgoingContext(localCtx, source)

		return d.send(localCtx, ingester, timeseries, metadata, req.Source)
	}, func() { client.ReuseSlice(req.Timeseries) })
	if err != nil {
		return nil, err
	}
	return &client.WriteResponse{}, firstPartialErr
}
复制代码

Push方法在d.cfg.ShardingStrategy为util.ShardingStrategyShuffle时,会通过d.ingestersRing.ShuffleShard确定subRing;之后通过ring.DoBatch提交keys,其callback函数执行d.send(localCtx, ingester, timeseries, metadata, req.Source)

DoBatch

cortex/pkg/ring/batch.go

// DoBatch request against a set of keys in the ring, handling replication and
// failures. For example if we want to write N items where they may all
// hit different ingesters, and we want them all replicated R ways with
// quorum writes, we track the relationship between batch RPCs and the items
// within them.
//
// Callback is passed the ingester to target, and the indexes of the keys
// to send to that ingester. It is invoked once per distinct ingester
// address, each invocation in its own goroutine.
//
// cleanup is always invoked exactly once: after every callback has
// returned, or immediately when DoBatch fails before starting any callback
// (empty ring, or a ring lookup error). Note that DoBatch itself may return
// before cleanup runs, because it returns as soon as the batch outcome is
// known or ctx is done.
//
// Not implemented as a method on Ring so we can test separately.
func DoBatch(ctx context.Context, op Operation, r ReadRing, keys []uint32, callback func(IngesterDesc, []int) error, cleanup func()) error {
	if r.IngesterCount() <= 0 {
		// No callback will ever run, so release the caller's resources now.
		cleanup()
		return fmt.Errorf("DoBatch: IngesterCount <= 0")
	}
	// Capacity hint for the per-ingester slices built below.
	expectedTrackers := len(keys) * (r.ReplicationFactor() + 1) / r.IngesterCount()
	itemTrackers := make([]itemTracker, len(keys))
	ingesters := make(map[string]ingester, r.IngesterCount())

	// Scratch buffers handed to r.Get for every key.
	var (
		bufDescs [GetBufferSize]IngesterDesc
		bufHosts [GetBufferSize]string
		bufZones [GetBufferSize]string
	)
	for i, key := range keys {
		replicationSet, err := r.Get(key, op, bufDescs[:0], bufHosts[:0], bufZones[:0])
		if err != nil {
			// No goroutine has been started yet, so cleanup would otherwise
			// never be called on this path.
			cleanup()
			return err
		}
		itemTrackers[i].minSuccess = len(replicationSet.Ingesters) - replicationSet.MaxErrors
		itemTrackers[i].maxFailures = replicationSet.MaxErrors

		// Group key indexes (and their trackers) by ingester address.
		for _, desc := range replicationSet.Ingesters {
			curr, found := ingesters[desc.Addr]
			if !found {
				curr.itemTrackers = make([]*itemTracker, 0, expectedTrackers)
				curr.indexes = make([]int, 0, expectedTrackers)
			}
			ingesters[desc.Addr] = ingester{
				desc:         desc,
				itemTrackers: append(curr.itemTrackers, &itemTrackers[i]),
				indexes:      append(curr.indexes, i),
			}
		}
	}

	tracker := batchTracker{
		done: make(chan struct{}, 1),
		err:  make(chan error, 1),
	}
	tracker.rpcsPending.Store(int32(len(itemTrackers)))

	var wg sync.WaitGroup

	wg.Add(len(ingesters))
	for _, i := range ingesters {
		go func(i ingester) {
			err := callback(i.desc, i.indexes)
			tracker.record(i.itemTrackers, err)
			wg.Done()
		}(i)
	}

	// Perform cleanup at the end.
	go func() {
		wg.Wait()

		cleanup()
	}()

	// Return as soon as the tracker reports an outcome or the caller's
	// context is done; outstanding callbacks keep running in the background.
	select {
	case err := <-tracker.err:
		return err
	case <-tracker.done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
复制代码

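DoBatch方法先通过r.Get为每个key获取replicationSet,并按ingester地址把key的索引(indexes)分组;随后为每个ingester各启动一个goroutine执行callback(传入该ingester及其对应的indexes),并由batchTracker记录执行结果;最后在tracker.err、tracker.done、ctx.Done()三者中任一就绪时返回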

Query

cortex/pkg/distributor/query.go

// Query runs a range query for the given matchers between from and to
// against the ingesters and returns the resulting Matrix of samples.
//
// The work is wrapped in instrument.CollectedRequest under the operation name
// "Distributor.Query". If a tracing span is present in the context, the
// number of returned series is logged on it.
func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) {
	var result model.Matrix

	// run performs the actual query; it writes into result so the value
	// remains available after the instrumented call returns.
	run := func(ctx context.Context) error {
		queryReq, err := ingester_client.ToQueryRequest(from, to, matchers)
		if err != nil {
			return err
		}

		targets, err := d.GetIngestersForQuery(ctx, matchers...)
		if err != nil {
			return err
		}

		if result, err = d.queryIngesters(ctx, targets, queryReq); err != nil {
			return err
		}

		if span := opentracing.SpanFromContext(ctx); span != nil {
			span.LogKV("series", len(result))
		}
		return nil
	}

	err := instrument.CollectedRequest(ctx, "Distributor.Query", queryDuration, instrument.ErrorCode, run)
	return result, err
}
复制代码

Query方法通过d.GetIngestersForQuery获取replicationSet,再通过d.queryIngesters获取matrix

小结

cortex的Distributor提供了Push、Query方法;Push方法先对series和metadata做校验与限流,当d.cfg.ShardingStrategy为util.ShardingStrategyShuffle时通过d.ingestersRing.ShuffleShard确定subRing(否则直接使用d.ingestersRing),之后通过ring.DoBatch提交keys;Query方法通过d.GetIngestersForQuery获取replicationSet,再通过d.queryIngesters获取matrix。

相关文章