Skip to content

Commit

Permalink
Ingester: Metadata APIs should honour QueryIngestersWithin when QueryStoreForLabels is enabled (cortexproject#5027)
Browse files Browse the repository at this point in the history

* Ingester Metadata APIs should honour QueryIngestersWithin

Signed-off-by: 🌲 Harry 🌊 John 🏔 <johrry@amazon.com>

* Deprecate query-store-for-labels-enabled flag

Signed-off-by: 🌲 Harry 🌊 John 🏔 <johrry@amazon.com>

Signed-off-by: 🌲 Harry 🌊 John 🏔 <johrry@amazon.com>
Signed-off-by: Alex Le <leqiyue@amazon.com>
  • Loading branch information
harry671003 authored and alexqyle committed May 2, 2023
1 parent 7bf7669 commit e2dc0bf
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 46 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* [ENHANCEMENT] Query-tee: Add `/api/v1/query_exemplars` API endpoint support. #5010
* [ENHANCEMENT] Query Frontend/Query Scheduler: Increase upper bound to 60s for queue duration histogram metric. #5029
* [ENHANCEMENT] Query Frontend: Log Vertical sharding information when `query_stats_enabled` is enabled. #5037
* [ENHANCEMENT] Ingester: The metadata APIs should honour `querier.query-ingesters-within` when `querier.query-store-for-labels-enabled` is true. #5027
* [FEATURE] Querier/Query Frontend: support Prometheus /api/v1/status/buildinfo API. #4978
* [FEATURE] Ingester: Add active series to all_user_stats page. #4972
* [FEATURE] Ingester: Added `-blocks-storage.tsdb.head-chunks-write-queue-size` to configure the size of the in-memory queue used before flushing chunks to disk. #5000
Expand Down
5 changes: 3 additions & 2 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,9 @@ querier:
# CLI flag: -querier.query-ingesters-within
[query_ingesters_within: <duration> | default = 0s]

# Query long-term store for series, label values and label names APIs. Works
# only with blocks engine.
# Deprecated (Querying long-term store for labels will be always enabled in
# the future.): Query long-term store for series, label values and label names
# APIs.
# CLI flag: -querier.query-store-for-labels-enabled
[query_store_for_labels_enabled: <boolean> | default = false]

Expand Down
4 changes: 2 additions & 2 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -810,8 +810,8 @@ The `querier_config` configures the Cortex querier.
# CLI flag: -querier.query-ingesters-within
[query_ingesters_within: <duration> | default = 0s]

# Query long-term store for series, label values and label names APIs. Works
# only with blocks engine.
# Deprecated (Querying long-term store for labels will be always enabled in the
# future.): Query long-term store for series, label values and label names APIs.
# CLI flag: -querier.query-store-for-labels-enabled
[query_store_for_labels_enabled: <boolean> | default = false]

Expand Down
2 changes: 2 additions & 0 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,8 @@ func (t *Cortex) initIngesterService() (serv services.Service, err error) {
t.Cfg.Ingester.DistributorShardingStrategy = t.Cfg.Distributor.ShardingStrategy
t.Cfg.Ingester.DistributorShardByAllLabels = t.Cfg.Distributor.ShardByAllLabels
t.Cfg.Ingester.InstanceLimitsFn = ingesterInstanceLimits(t.RuntimeConfig)
t.Cfg.Ingester.QueryStoreForLabels = t.Cfg.Querier.QueryStoreForLabels
t.Cfg.Ingester.QueryIngestersWithin = t.Cfg.Querier.QueryIngestersWithin
t.tsdbIngesterConfig()

t.Ingester, err = ingester.New(t.Cfg.Ingester, t.Overrides, prometheus.DefaultRegisterer, util_log.Logger)
Expand Down
18 changes: 14 additions & 4 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ type Config struct {
DistributorShardingStrategy string `yaml:"-"`
DistributorShardByAllLabels bool `yaml:"-"`

// Injected at runtime and read from querier config.
QueryStoreForLabels bool `yaml:"-"`
QueryIngestersWithin time.Duration `yaml:"-"`

DefaultLimits InstanceLimits `yaml:"instance_limits"`
InstanceLimitsFn func() *InstanceLimits `yaml:"-"`

Expand Down Expand Up @@ -1303,7 +1307,7 @@ func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesReque
return &client.LabelValuesResponse{}, nil
}

mint, maxt, err := metadataQueryRange(startTimestampMs, endTimestampMs, db)
mint, maxt, err := metadataQueryRange(startTimestampMs, endTimestampMs, db, i.cfg.QueryStoreForLabels, i.cfg.QueryIngestersWithin)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1364,7 +1368,7 @@ func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest
return &client.LabelNamesResponse{}, nil
}

mint, maxt, err := metadataQueryRange(req.StartTimestampMs, req.EndTimestampMs, db)
mint, maxt, err := metadataQueryRange(req.StartTimestampMs, req.EndTimestampMs, db, i.cfg.QueryStoreForLabels, i.cfg.QueryIngestersWithin)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1432,7 +1436,7 @@ func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.Metr
return nil, err
}

mint, maxt, err := metadataQueryRange(req.StartTimestampMs, req.EndTimestampMs, db)
mint, maxt, err := metadataQueryRange(req.StartTimestampMs, req.EndTimestampMs, db, i.cfg.QueryStoreForLabels, i.cfg.QueryIngestersWithin)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -2564,7 +2568,13 @@ func (i *Ingester) flushHandler(w http.ResponseWriter, r *http.Request) {
}

// metadataQueryRange returns the best range to query for metadata queries based on the timerange in the ingester.
func metadataQueryRange(queryStart, queryEnd int64, db *userTSDB) (mint, maxt int64, err error) {
func metadataQueryRange(queryStart, queryEnd int64, db *userTSDB, queryStoreForLabels bool, queryIngestersWithin time.Duration) (mint, maxt int64, err error) {
if queryIngestersWithin > 0 && queryStoreForLabels {
// If the feature for querying metadata from store-gateway is enabled,
// then we don't want to manipulate the mint and maxt.
return
}

// Ingesters are run with limited retention and we don't support querying the store-gateway for labels yet.
// This means if someone loads a dashboard that is outside the range of the ingester, and we only return the
// data for the timerange requested (which will be empty), the dashboards will break. To fix this we should
Expand Down
25 changes: 20 additions & 5 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1733,10 +1733,12 @@ func Test_Ingester_MetricsForLabelMatchers(t *testing.T) {
}

tests := map[string]struct {
from int64
to int64
matchers []*client.LabelMatchers
expected []*cortexpb.Metric
from int64
to int64
matchers []*client.LabelMatchers
expected []*cortexpb.Metric
queryStoreForLabels bool
queryIngestersWithin time.Duration
}{
"should return an empty response if no metric match": {
from: math.MinInt64,
Expand Down Expand Up @@ -1794,6 +1796,18 @@ func Test_Ingester_MetricsForLabelMatchers(t *testing.T) {
{Labels: cortexpb.FromLabelsToLabelAdapters(fixtures[1].lbls)},
},
},
"should filter metrics by time range if queryStoreForLabels and queryIngestersWithin is enabled": {
from: 100,
to: 1000,
matchers: []*client.LabelMatchers{{
Matchers: []*client.LabelMatcher{
{Type: client.EQUAL, Name: model.MetricNameLabel, Value: "test_1"},
},
}},
expected: []*cortexpb.Metric{},
queryStoreForLabels: true,
queryIngestersWithin: time.Hour,
},
"should not return duplicated metrics on overlapping matchers": {
from: math.MinInt64,
to: math.MaxInt64,
Expand Down Expand Up @@ -1860,7 +1874,8 @@ func Test_Ingester_MetricsForLabelMatchers(t *testing.T) {
EndTimestampMs: testData.to,
MatchersSet: testData.matchers,
}

i.cfg.QueryStoreForLabels = testData.queryStoreForLabels
i.cfg.QueryIngestersWithin = testData.queryIngestersWithin
res, err := i.MetricsForLabelMatchers(ctx, req)
require.NoError(t, err)
assert.ElementsMatch(t, testData.expected, res.Metric)
Expand Down
53 changes: 30 additions & 23 deletions pkg/querier/distributor_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ type Distributor interface {
MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error)
}

func newDistributorQueryable(distributor Distributor, streaming bool, streamingMetdata bool, iteratorFn chunkIteratorFunc, queryIngestersWithin time.Duration) QueryableWithFilter {
func newDistributorQueryable(distributor Distributor, streaming bool, streamingMetdata bool, iteratorFn chunkIteratorFunc, queryIngestersWithin time.Duration, queryStoreForLabels bool) QueryableWithFilter {
return distributorQueryable{
distributor: distributor,
streaming: streaming,
streamingMetdata: streamingMetdata,
iteratorFn: iteratorFn,
queryIngestersWithin: queryIngestersWithin,
queryStoreForLabels: queryStoreForLabels,
}
}

Expand All @@ -53,6 +54,7 @@ type distributorQueryable struct {
streamingMetdata bool
iteratorFn chunkIteratorFunc
queryIngestersWithin time.Duration
queryStoreForLabels bool
}

func (d distributorQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
Expand All @@ -65,6 +67,7 @@ func (d distributorQueryable) Querier(ctx context.Context, mint, maxt int64) (st
streamingMetadata: d.streamingMetdata,
chunkIterFn: d.iteratorFn,
queryIngestersWithin: d.queryIngestersWithin,
queryStoreForLabels: d.queryStoreForLabels,
}, nil
}

Expand All @@ -81,6 +84,7 @@ type distributorQuerier struct {
streamingMetadata bool
chunkIterFn chunkIteratorFunc
queryIngestersWithin time.Duration
queryStoreForLabels bool
}

// Select implements storage.Querier interface.
Expand All @@ -95,35 +99,18 @@ func (q *distributorQuerier) Select(sortSeries bool, sp *storage.SelectHints, ma
}

// If the querier receives a 'series' query, it means only metadata is needed.
// For this specific case we shouldn't apply the queryIngestersWithin
// time range manipulation, otherwise we'll end up returning no series at all for
// For the specific case where queryStoreForLabels is disabled
// we shouldn't apply the queryIngestersWithin time range manipulation.
// Otherwise we'll end up returning no series at all for
// older time ranges (while in Cortex we do ignore the start/end and always return
// series in ingesters).
// Also, in the recent versions of Prometheus, we pass in the hint but with Func set to "series".
// See: https://github.com/prometheus/prometheus/pull/8050
if sp != nil && sp.Func == "series" {
var (
ms []metric.Metric
err error
)

if q.streamingMetadata {
ms, err = q.distributor.MetricsForLabelMatchersStream(ctx, model.Time(minT), model.Time(maxT), matchers...)
} else {
ms, err = q.distributor.MetricsForLabelMatchers(ctx, model.Time(minT), model.Time(maxT), matchers...)
}

if err != nil {
return storage.ErrSeriesSet(err)
}
return series.MetricsToSeriesSet(sortSeries, ms)
}
shouldNotQueryStoreForMetadata := (sp != nil && sp.Func == "series" && !q.queryStoreForLabels)

// If queryIngestersWithin is enabled, we do manipulate the query mint to query samples up until
// now - queryIngestersWithin, because older time ranges are covered by the storage. This
// optimization is particularly important for the blocks storage where the blocks retention in the
// ingesters could be way higher than queryIngestersWithin.
if q.queryIngestersWithin > 0 {
if q.queryIngestersWithin > 0 && !shouldNotQueryStoreForMetadata {
now := time.Now()
origMinT := minT
minT = math.Max64(minT, util.TimeToMillis(now.Add(-q.queryIngestersWithin)))
Expand All @@ -138,6 +125,26 @@ func (q *distributorQuerier) Select(sortSeries bool, sp *storage.SelectHints, ma
}
}

// In the recent versions of Prometheus, we pass in the hint but with Func set to "series".
// See: https://github.com/prometheus/prometheus/pull/8050
if sp != nil && sp.Func == "series" {
var (
ms []metric.Metric
err error
)

if q.streamingMetadata {
ms, err = q.distributor.MetricsForLabelMatchersStream(ctx, model.Time(minT), model.Time(maxT), matchers...)
} else {
ms, err = q.distributor.MetricsForLabelMatchers(ctx, model.Time(minT), model.Time(maxT), matchers...)
}

if err != nil {
return storage.ErrSeriesSet(err)
}
return series.MetricsToSeriesSet(sortSeries, ms)
}

if q.streaming {
return q.streamingSelect(ctx, sortSeries, minT, maxT, matchers)
}
Expand Down
22 changes: 16 additions & 6 deletions pkg/querier/distributor_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestDistributorQuerier(t *testing.T) {
},
nil)

queryable := newDistributorQueryable(d, false, false, nil, 0)
queryable := newDistributorQueryable(d, false, false, nil, 0, false)
querier, err := queryable.Querier(context.Background(), mint, maxt)
require.NoError(t, err)

Expand All @@ -70,6 +70,7 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T)

tests := map[string]struct {
querySeries bool
queryStoreForLabels bool
queryIngestersWithin time.Duration
queryMinT int64
queryMaxT int64
Expand Down Expand Up @@ -112,6 +113,15 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T)
expectedMinT: util.TimeToMillis(now.Add(-100 * time.Minute)),
expectedMaxT: util.TimeToMillis(now.Add(-90 * time.Minute)),
},
"should manipulate query time range if queryIngestersWithin is enabled and queryStoreForLabels is enabled": {
querySeries: true,
queryStoreForLabels: true,
queryIngestersWithin: time.Hour,
queryMinT: util.TimeToMillis(now.Add(-100 * time.Minute)),
queryMaxT: util.TimeToMillis(now.Add(-30 * time.Minute)),
expectedMinT: util.TimeToMillis(now.Add(-60 * time.Minute)),
expectedMaxT: util.TimeToMillis(now.Add(-30 * time.Minute)),
},
}

for _, streamingEnabled := range []bool{false, true} {
Expand All @@ -124,7 +134,7 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T)
distributor.On("MetricsForLabelMatchersStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]metric.Metric{}, nil)

ctx := user.InjectOrgID(context.Background(), "test")
queryable := newDistributorQueryable(distributor, streamingEnabled, streamingEnabled, nil, testData.queryIngestersWithin)
queryable := newDistributorQueryable(distributor, streamingEnabled, streamingEnabled, nil, testData.queryIngestersWithin, testData.queryStoreForLabels)
querier, err := queryable.Querier(ctx, testData.queryMinT, testData.queryMaxT)
require.NoError(t, err)

Expand Down Expand Up @@ -161,7 +171,7 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T)

func TestDistributorQueryableFilter(t *testing.T) {
d := &MockDistributor{}
dq := newDistributorQueryable(d, false, false, nil, 1*time.Hour)
dq := newDistributorQueryable(d, false, false, nil, 1*time.Hour, true)

now := time.Now()

Expand Down Expand Up @@ -209,7 +219,7 @@ func TestIngesterStreaming(t *testing.T) {
nil)

ctx := user.InjectOrgID(context.Background(), "0")
queryable := newDistributorQueryable(d, true, true, mergeChunks, 0)
queryable := newDistributorQueryable(d, true, true, mergeChunks, 0, true)
querier, err := queryable.Querier(ctx, mint, maxt)
require.NoError(t, err)

Expand Down Expand Up @@ -285,7 +295,7 @@ func TestIngesterStreamingMixedResults(t *testing.T) {
nil)

ctx := user.InjectOrgID(context.Background(), "0")
queryable := newDistributorQueryable(d, true, true, mergeChunks, 0)
queryable := newDistributorQueryable(d, true, true, mergeChunks, 0, true)
querier, err := queryable.Querier(ctx, mint, maxt)
require.NoError(t, err)

Expand Down Expand Up @@ -336,7 +346,7 @@ func TestDistributorQuerier_LabelNames(t *testing.T) {
d.On("MetricsForLabelMatchersStream", mock.Anything, model.Time(mint), model.Time(maxt), someMatchers).
Return(metrics, nil)

queryable := newDistributorQueryable(d, false, streamingEnabled, nil, 0)
queryable := newDistributorQueryable(d, false, streamingEnabled, nil, 0, true)
querier, err := queryable.Querier(context.Background(), mint, maxt)
require.NoError(t, err)

Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.IngesterMetadataStreaming, "querier.ingester-metadata-streaming", false, "Use streaming RPCs for metadata APIs from ingester.")
f.IntVar(&cfg.MaxSamples, "querier.max-samples", 50e6, "Maximum number of samples a single query can load into memory.")
f.DurationVar(&cfg.QueryIngestersWithin, "querier.query-ingesters-within", 0, "Maximum lookback beyond which queries are not sent to ingester. 0 means all queries are sent to ingester.")
f.BoolVar(&cfg.QueryStoreForLabels, "querier.query-store-for-labels-enabled", false, "Query long-term store for series, label values and label names APIs. Works only with blocks engine.")
f.BoolVar(&cfg.QueryStoreForLabels, "querier.query-store-for-labels-enabled", false, "Deprecated (Querying long-term store for labels will be always enabled in the future.): Query long-term store for series, label values and label names APIs.")
f.BoolVar(&cfg.EnablePerStepStats, "querier.per-step-stats-enabled", false, "Enable returning samples stats per steps in query response.")
f.DurationVar(&cfg.MaxQueryIntoFuture, "querier.max-query-into-future", 10*time.Minute, "Maximum duration into the future you can query. 0 to disable.")
f.DurationVar(&cfg.DefaultEvaluationInterval, "querier.default-evaluation-interval", time.Minute, "The default evaluation interval or step size for subqueries.")
Expand Down Expand Up @@ -146,7 +146,7 @@ func getChunksIteratorFunction(cfg Config) chunkIteratorFunc {
func New(cfg Config, limits *validation.Overrides, distributor Distributor, stores []QueryableWithFilter, tombstonesLoader purger.TombstonesLoader, reg prometheus.Registerer, logger log.Logger) (storage.SampleAndChunkQueryable, storage.ExemplarQueryable, v1.QueryEngine) {
iteratorFunc := getChunksIteratorFunction(cfg)

distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterStreaming, cfg.IngesterMetadataStreaming, iteratorFunc, cfg.QueryIngestersWithin)
distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterStreaming, cfg.IngesterMetadataStreaming, iteratorFunc, cfg.QueryIngestersWithin, cfg.QueryStoreForLabels)

ns := make([]QueryableWithFilter, len(stores))
for ix, s := range stores {
Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ func TestShouldSortSeriesIfQueryingMultipleQueryables(t *testing.T) {

distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&unorderedResponse, nil)
distributor.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(unorderedResponseMatrix, nil)
distributorQueryableStreaming := newDistributorQueryable(distributor, true, cfg.IngesterMetadataStreaming, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin)
distributorQueryable := newDistributorQueryable(distributor, false, cfg.IngesterMetadataStreaming, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin)
distributorQueryableStreaming := newDistributorQueryable(distributor, true, cfg.IngesterMetadataStreaming, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin, cfg.QueryStoreForLabels)
distributorQueryable := newDistributorQueryable(distributor, false, cfg.IngesterMetadataStreaming, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin, cfg.QueryStoreForLabels)

tCases := []struct {
name string
Expand Down

0 comments on commit e2dc0bf

Please sign in to comment.