聊聊cortex的Ingester

2022-05-10 00:00:00 执行 代码 接口 方法 复制

本文主要研究一下cortex的Ingester

Ingester

cortex/pkg/api/api.go

// Ingester describes what the API layer needs from an ingester. It is an
// interface, rather than a concrete type, so that alternative ingester
// implementations can be handed to the API.RegisterIngester() method.
type Ingester interface {
	// The complete server API of the Ingester service.
	client.IngesterServer

	// HTTP entry points; both have the standard http handler shape.
	FlushHandler(w http.ResponseWriter, r *http.Request)
	ShutdownHandler(w http.ResponseWriter, r *http.Request)

	// Push accepts a write request and yields a write response or an error.
	Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error)
}
复制代码

Ingester接口内嵌了client.IngesterServer,定义了FlushHandler、ShutdownHandler、Push方法

client.IngesterServer

cortex/pkg/ingester/client/cortex.pb.go

// IngesterServer is the server API for Ingester service.
//
// NOTE(review): this declaration is attributed to cortex.pb.go, which looks
// like generated protobuf output — presumably hand edits here would be
// overwritten on regeneration; confirm before changing anything but comments.
type IngesterServer interface {
	// Context-taking calls: each one receives a context plus a request message
	// and returns the corresponding response message or an error.
	Push(context.Context, *WriteRequest) (*WriteResponse, error)
	Query(context.Context, *QueryRequest) (*QueryResponse, error)
	// QueryStream receives the request together with a stream handle (no
	// context argument) and returns only an error.
	QueryStream(*QueryRequest, Ingester_QueryStreamServer) error
	LabelValues(context.Context, *LabelValuesRequest) (*LabelValuesResponse, error)
	LabelNames(context.Context, *LabelNamesRequest) (*LabelNamesResponse, error)
	UserStats(context.Context, *UserStatsRequest) (*UserStatsResponse, error)
	// AllUserStats takes the same request type as UserStats but returns a
	// UsersStatsResponse.
	AllUserStats(context.Context, *UserStatsRequest) (*UsersStatsResponse, error)
	MetricsForLabelMatchers(context.Context, *MetricsForLabelMatchersRequest) (*MetricsForLabelMatchersResponse, error)
	MetricsMetadata(context.Context, *MetricsMetadataRequest) (*MetricsMetadataResponse, error)
	// TransferChunks allows leaving ingester (client) to stream chunks directly to joining ingesters (server).
	TransferChunks(Ingester_TransferChunksServer) error
}
复制代码

client.IngesterServer接口定义了Push、Query、QueryStream、LabelValues、LabelNames、UserStats、AllUserStats、MetricsForLabelMatchers、MetricsMetadata、TransferChunks方法

Push

cortex/pkg/ingester/ingester.go

// Push implements client.IngesterServer.
//
// When blocks storage is enabled the request is handed to v2Push unchanged.
// Otherwise every sample of every series in req is passed to i.append, and,
// when the WAL is enabled, the resulting record is written via i.wal.Log.
//
// Return values:
//   - (nil, err) when the running/stopping check fails, no tenant ID can be
//     resolved from ctx, append reports a non-validation error, or the WAL
//     write fails;
//   - (&client.WriteResponse{}, err) when one or more samples failed with a
//     validation error; only the first such error is reported and the
//     remaining samples are still processed;
//   - (&client.WriteResponse{}, nil) otherwise.
func (i *Ingester) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error) {
	if err := i.checkRunningOrStopping(); err != nil {
		return nil, err
	}

	if i.cfg.BlocksStorageEnabled {
		return i.v2Push(ctx, req)
	}

	// NOTE: because we use `unsafe` in deserialisation, we must not
	// retain anything from `req` past the call to ReuseSlice
	defer client.ReuseSlice(req.Timeseries)

	userID, err := tenant.TenantID(ctx)
	if err != nil {
		// NOTE(review): the underlying error is not propagated; the message is
		// kept as is because callers may already depend on it.
		return nil, fmt.Errorf("no user id")
	}

	// Given metadata is a best-effort approach, and we don't halt on errors
	// process it before samples. Otherwise, we risk returning an error before ingestion.
	i.pushMetadata(ctx, userID, req.GetMetadata())

	var firstPartialErr *validationError
	var record *WALRecord
	if i.cfg.WALConfig.WALEnabled {
		record = recordPool.Get().(*WALRecord)
		// Hand the record back to the pool on every exit path, including the
		// early error returns below. Deferred calls run in LIFO order, so this
		// runs before the deferred ReuseSlice above.
		// NOTE(review): assumes neither append nor wal.Log keeps a reference
		// to the record after returning — the success path already relied on
		// that for wal.Log; confirm for the error paths.
		defer recordPool.Put(record)

		record.UserID = userID
		// Assuming there is not much churn in most cases, there is no use
		// keeping the record.Labels slice hanging around.
		record.Series = nil
		if cap(record.Samples) < len(req.Timeseries) {
			record.Samples = make([]tsdb_record.RefSample, 0, len(req.Timeseries))
		} else {
			record.Samples = record.Samples[:0]
		}
	}

	for _, ts := range req.Timeseries {
		seriesSamplesIngested := 0
		for _, s := range ts.Samples {
			// append() copies the memory in `ts.Labels` except on the error path
			err := i.append(ctx, userID, ts.Labels, model.Time(s.TimestampMs), model.SampleValue(s.Value), req.Source, record)
			if err == nil {
				seriesSamplesIngested++
				continue
			}

			i.metrics.ingestedSamplesFail.Inc()
			if ve, ok := err.(*validationError); ok {
				// Validation failures are partial: remember the first one and
				// keep going with the remaining samples.
				if firstPartialErr == nil {
					firstPartialErr = ve
				}
				continue
			}

			// non-validation error: abandon this request
			return nil, grpcForwardableError(userID, http.StatusInternalServerError, err)
		}

		if i.cfg.ActiveSeriesMetricsEnabled && seriesSamplesIngested > 0 {
			// updateActiveSeries will copy labels if necessary.
			i.updateActiveSeries(userID, time.Now(), ts.Labels)
		}
	}

	if record != nil {
		// Log the record only if there was no error in ingestion.
		if err := i.wal.Log(record); err != nil {
			return nil, err
		}
	}

	if firstPartialErr != nil {
		// grpcForwardableError turns the error into a string so it no longer references `req`
		return &client.WriteResponse{}, grpcForwardableError(userID, firstPartialErr.code, firstPartialErr)
	}

	return &client.WriteResponse{}, nil
}
复制代码

Push方法首先执行checkRunningOrStopping,若i.cfg.BlocksStorageEnabled则执行i.v2Push(ctx, req);否则遍历req.Timeseries执行i.append

FlushHandler

cortex/pkg/ingester/flush.go

// FlushHandler is an HTTP handler that requests a flush of every in-memory
// chunk. It is mainly intended for local testing.
//
// With blocks storage enabled the request is passed on to v2FlushHandler.
// Otherwise all users are swept with an immediate flag of true and the
// response status is set to 204 No Content.
func (i *Ingester) FlushHandler(w http.ResponseWriter, r *http.Request) {
	if !i.cfg.BlocksStorageEnabled {
		level.Info(util.Logger).Log("msg", "starting to flush all the chunks")
		i.sweepUsers(true)
		level.Info(util.Logger).Log("msg", "chunks queued for flushing")
		w.WriteHeader(http.StatusNoContent)
		return
	}

	i.v2FlushHandler(w, r)
}
复制代码

FlushHandler方法在i.cfg.BlocksStorageEnabled为true时执行i.v2FlushHandler(w, r);否则执行i.sweepUsers(true)将所有chunk加入flush队列,并返回http.StatusNoContent

ShutdownHandler

cortex/pkg/ingester/ingester.go

// ShutdownHandler is an HTTP handler that stops the ingester. In order, it:
//     * Changes the state of the ring so that writes stop being accepted.
//     * Flushes all the chunks.
//
// The lifecycler's flush-on-shutdown and unregister-on-shutdown flags are
// forced to true for the duration of the stop and put back to their previous
// values afterwards. The response status is always 204 No Content.
func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) {
	// Regardless of the configured flag, chunks must be flushed if the
	// transfer fails.
	flushBefore := i.lifecycler.FlushOnShutdown()
	i.lifecycler.SetFlushOnShutdown(true)

	// A shutdown requested over HTTP always unregisters.
	unregisterBefore := i.lifecycler.ShouldUnregisterOnShutdown()
	i.lifecycler.SetUnregisterOnShutdown(true)

	// The stop error is discarded: the handler answers 204 either way.
	_ = services.StopAndAwaitTerminated(context.Background(), i)

	// Restore both flags to what they were on entry.
	i.lifecycler.SetFlushOnShutdown(flushBefore)
	i.lifecycler.SetUnregisterOnShutdown(unregisterBefore)

	w.WriteHeader(http.StatusNoContent)
}
复制代码

ShutdownHandler方法先通过i.lifecycler.FlushOnShutdown()和i.lifecycler.ShouldUnregisterOnShutdown()保存原始设置,并将这两个开关都设置为true,然后执行services.StopAndAwaitTerminated,最后恢复原始设置并返回http.StatusNoContent

Query

cortex/pkg/ingester/ingester.go

// Query implements client.IngesterServer.
//
// With blocks storage enabled the request is delegated to v2Query. Otherwise
// the tenant's in-memory series that match the request's matchers are walked,
// and for each one the samples between the request's from and through bounds
// are copied into the response. Series with no samples in range are skipped.
//
// The walk stops with an http.StatusRequestEntityTooLarge error once the
// running sample total exceeds the tenant's MaxSamplesPerQuery limit. The
// series and sample totals are observed on the ingester metrics whether or not
// the walk returned an error, and the response built so far is returned
// together with that error.
func (i *Ingester) Query(ctx context.Context, req *client.QueryRequest) (*client.QueryResponse, error) {
	if err := i.checkRunningOrStopping(); err != nil {
		return nil, err
	}

	if i.cfg.BlocksStorageEnabled {
		return i.v2Query(ctx, req)
	}

	userID, err := tenant.TenantID(ctx)
	if err != nil {
		return nil, err
	}

	from, through, matchers, err := client.FromQueryRequest(req)
	if err != nil {
		return nil, err
	}

	i.metrics.queries.Inc()

	// Hold the read lock only for the lookup of the tenant's state.
	i.userStatesMtx.RLock()
	state, found, err := i.userStates.getViaContext(ctx)
	i.userStatesMtx.RUnlock()
	switch {
	case err != nil:
		return nil, err
	case !found:
		// No state for this tenant: answer with an empty response.
		return &client.QueryResponse{}, nil
	}

	resp := &client.QueryResponse{}
	seriesCount, sampleCount := 0, 0
	sampleLimit := i.limits.MaxSamplesPerQuery(userID)

	// collect converts one matching series into a client.TimeSeries and
	// appends it to resp, keeping the running totals up to date.
	collect := func(_ context.Context, _ model.Fingerprint, series *memorySeries) error {
		values, err := series.samplesForRange(from, through)
		if err != nil {
			return err
		}
		if len(values) == 0 {
			return nil
		}

		seriesCount++
		sampleCount += len(values)
		if sampleCount > sampleLimit {
			return httpgrpc.Errorf(http.StatusRequestEntityTooLarge, "exceeded maximum number of samples in a query (%d)", sampleLimit)
		}

		samples := make([]client.Sample, len(values))
		for idx, v := range values {
			samples[idx] = client.Sample{
				Value:       float64(v.Value),
				TimestampMs: int64(v.Timestamp),
			}
		}
		resp.Timeseries = append(resp.Timeseries, client.TimeSeries{
			Labels:  client.FromLabelsToLabelAdapters(series.metric),
			Samples: samples,
		})
		return nil
	}

	err = state.forSeriesMatching(ctx, matchers, collect, nil, 0)
	i.metrics.queriedSeries.Observe(float64(seriesCount))
	i.metrics.queriedSamples.Observe(float64(sampleCount))
	return resp, err
}
复制代码

Query方法先判断i.checkRunningOrStopping();若i.cfg.BlocksStorageEnabled则执行i.v2Query(ctx, req);否则通过series.samplesForRange(from, through)获取数据

小结

cortex的Ingester接口(pkg/api)内嵌了client.IngesterServer,定义了FlushHandler、ShutdownHandler、Push方法;pkg/ingester中的Ingester结构体(*Ingester)实现了该Ingester接口。

相关文章