Proposal: Create a new "Per LabelSet" limit (#5950)
* Creating Limits per LabelSet

Signed-off-by: alanprot <alanprot@gmail.com>

* lint

Signed-off-by: alanprot <alanprot@gmail.com>

* fix test

Signed-off-by: alanprot <alanprot@gmail.com>

* doc

Signed-off-by: alanprot <alanprot@gmail.com>

---------

Signed-off-by: alanprot <alanprot@gmail.com>
alanprot committed May 15, 2024
1 parent 2527f9e commit 2948539
Showing 8 changed files with 557 additions and 46 deletions.
24 changes: 15 additions & 9 deletions docs/configuration/config-file-reference.md
```diff
@@ -3168,6 +3168,10 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
 # CLI flag: -ingester.max-global-series-per-metric
 [max_global_series_per_metric: <int> | default = 0]
 
+# [Experimental] The maximum number of active series per LabelSet, across the
+# cluster before replication. Empty list to disable.
+[max_series_per_label_set: <list of MaxSeriesPerLabelSet> | default = []]
+
 # The maximum number of active metrics with metadata per user, per ingester. 0
 # to disable.
 # CLI flag: -ingester.max-metadata-per-user
```
```diff
@@ -4009,7 +4013,7 @@ The `ruler_config` configures the Cortex ruler.
 [external_url: <url> | default = ]
 
 # Labels to add to all alerts.
-[external_labels: <list of Label> | default = []]
+[external_labels: <map of string (labelName) to string (labelValue)> | default = []]
 
 ruler_client:
   # gRPC client max receive message size (bytes).
```
````diff
@@ -5306,6 +5310,16 @@ otel:
   [tls_insecure_skip_verify: <boolean> | default = false]
 ```
 
+### `MaxSeriesPerLabelSet`
+
+```yaml
+# The maximum number of active series per LabelSet before replication.
+[limit: <int> | default = ]
+
+# LabelSet which the limit should be applied.
+[label_set: <map of string (labelName) to string (labelValue)> | default = []]
+```
+
 ### `PriorityDef`
 
 ```yaml
````
````diff
@@ -5350,11 +5364,3 @@ time_window:
 # name of the rule group
 [name: <string> | default = ""]
 ```
-
-### `Label`
-
-```yaml
-[name: <string> | default = ""]
-
-[value: <string> | default = ""]
-```
````
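Read together, the first hunk adds the list-typed limit and the third defines its element schema. The sketch below shows how the two could combine in a Cortex config file, assuming the usual top-level `limits` block; the label sets and numbers are illustrative only, not recommendations:

```yaml
limits:
  max_series_per_label_set:
    # Cap all series carrying {env="prod"} at 10000 active series.
    - limit: 10000
      label_set:
        env: prod
    # A tighter cap for one noisy service. Series matching no entry
    # are unaffected; series matching several are checked against each.
    - limit: 2000
      label_set:
        env: prod
        service: checkout
```

Note the documentation's phrasing: the cap applies "before replication", so the physical series stored cluster-wide for a matching label set will be roughly the configured limit times the replication factor.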
47 changes: 35 additions & 12 deletions pkg/ingester/ingester.go
```diff
@@ -254,11 +254,12 @@ func (r tsdbCloseCheckResult) shouldClose() bool {
 }
 
 type userTSDB struct {
-	db             *tsdb.DB
-	userID         string
-	activeSeries   *ActiveSeries
-	seriesInMetric *metricCounter
-	limiter        *Limiter
+	db              *tsdb.DB
+	userID          string
+	activeSeries    *ActiveSeries
+	seriesInMetric  *metricCounter
+	labelSetCounter *labelSetCounter
+	limiter         *Limiter
 
 	instanceSeriesCount *atomic.Int64 // Shared across all userTSDB instances created by ingester.
 	instanceLimitsFn    func() *InstanceLimits
```
```diff
@@ -399,6 +400,10 @@ func (u *userTSDB) PreCreation(metric labels.Labels) error {
 		return err
 	}
 
+	if err := u.labelSetCounter.canAddSeriesForLabelSet(context.TODO(), u, metric); err != nil {
+		return err
+	}
+
 	return nil
 }
```

```diff
@@ -412,6 +417,7 @@ func (u *userTSDB) PostCreation(metric labels.Labels) {
 		return
 	}
 	u.seriesInMetric.increaseSeriesForMetric(metricName)
+	u.labelSetCounter.increaseSeriesLabelSet(u, metric)
 }
 
 // PostDeletion implements SeriesLifecycleCallback interface.
```

```diff
@@ -425,6 +431,7 @@ func (u *userTSDB) PostDeletion(metrics map[chunks.HeadSeriesRef]labels.Labels)
 			continue
 		}
 		u.seriesInMetric.decreaseSeriesForMetric(metricName)
+		u.labelSetCounter.decreaseSeriesLabelSet(u, metric)
 	}
 }
```
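These three lifecycle callbacks are the whole accounting story: PreCreation rejects a series while any matching label set is at its cap, PostCreation increments the matching counters, and PostDeletion decrements them. Below is a minimal, self-contained sketch of that bookkeeping; it is not the commit's actual labelSetCounter (which also consults the Limiter, shards its counters, and exports metrics), and plain string maps stand in for Prometheus labels:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync"
)

// labelSetLimit pairs a label set with the maximum number of active series
// allowed to match it.
type labelSetLimit struct {
	labelSet map[string]string
	limit    int
}

type labelSetCounter struct {
	mtx    sync.Mutex
	limits []labelSetLimit
	counts map[string]int // active series per label set, keyed by canonical form
}

func newLabelSetCounter(limits []labelSetLimit) *labelSetCounter {
	return &labelSetCounter{limits: limits, counts: map[string]int{}}
}

// key renders a label set in a canonical (sorted) form, e.g. {env="prod"}.
func key(set map[string]string) string {
	names := make([]string, 0, len(set))
	for n := range set {
		names = append(names, n)
	}
	sort.Strings(names)
	parts := make([]string, 0, len(names))
	for _, n := range names {
		parts = append(parts, fmt.Sprintf("%s=%q", n, set[n]))
	}
	return "{" + strings.Join(parts, ", ") + "}"
}

// matches reports whether every label of the set appears in the series with
// the same value.
func matches(set, series map[string]string) bool {
	for n, v := range set {
		if series[n] != v {
			return false
		}
	}
	return true
}

// canAddSeries mirrors the PreCreation check: fail before the series exists.
func (c *labelSetCounter) canAddSeries(series map[string]string) error {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	for _, ll := range c.limits {
		if matches(ll.labelSet, series) && c.counts[key(ll.labelSet)] >= ll.limit {
			return fmt.Errorf("per-labelset series limit of %d exceeded for %s", ll.limit, key(ll.labelSet))
		}
	}
	return nil
}

// increase and decrease mirror PostCreation and PostDeletion respectively.
func (c *labelSetCounter) increase(series map[string]string) { c.adjust(series, +1) }
func (c *labelSetCounter) decrease(series map[string]string) { c.adjust(series, -1) }

func (c *labelSetCounter) adjust(series map[string]string, delta int) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	for _, ll := range c.limits {
		if matches(ll.labelSet, series) {
			c.counts[key(ll.labelSet)] += delta
		}
	}
}

func main() {
	c := newLabelSetCounter([]labelSetLimit{
		{labelSet: map[string]string{"env": "prod"}, limit: 1},
	})

	s1 := map[string]string{"__name__": "up", "env": "prod"}
	fmt.Println(c.canAddSeries(s1)) // <nil>: still under the cap
	c.increase(s1)

	s2 := map[string]string{"__name__": "up", "env": "prod", "pod": "a"}
	fmt.Println(c.canAddSeries(s2)) // limit of 1 exceeded for {env="prod"}
}
```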

```diff
@@ -713,6 +720,15 @@ func NewForFlusher(cfg Config, limits *validation.Overrides, registerer promethe
 		TSDBState: newTSDBState(bucketClient, registerer),
 		logger:    logger,
 	}
+	i.limiter = NewLimiter(
+		limits,
+		i.lifecycler,
+		cfg.DistributorShardingStrategy,
+		cfg.DistributorShardByAllLabels,
+		cfg.LifecyclerConfig.RingConfig.ReplicationFactor,
+		cfg.LifecyclerConfig.RingConfig.ZoneAwarenessEnabled,
+		cfg.AdminLimitMessage,
+	)
 	i.metrics = newIngesterMetrics(registerer,
 		false,
 		false,
```
```diff
@@ -924,6 +940,7 @@ func (i *Ingester) updateActiveSeries() {
 
 		userDB.activeSeries.Purge(purgeTime)
 		i.metrics.activeSeriesPerUser.WithLabelValues(userID).Set(float64(userDB.activeSeries.Active()))
+		userDB.labelSetCounter.UpdateMetric(userDB, i.metrics.activeSeriesPerLabelSet)
 	}
 }
```
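The new line in updateActiveSeries publishes the counter's totals on each purge cycle: conceptually, it copies the per-labelset counts into a gauge vector so they become scrapable. A self-contained sketch of that pattern with client_golang follows; the metric name and label names here are hypothetical stand-ins, since the real gauge is defined in newIngesterMetrics:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

func main() {
	activeSeriesPerLabelSet := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "ingester_active_series_per_labelset", // hypothetical name
		Help: "Number of currently active series per configured label set.",
	}, []string{"user", "labelset"})

	// Counts as a per-labelset counter might report them after a purge cycle.
	counts := map[string]int{
		`{env="prod"}`:                     41,
		`{env="prod", service="checkout"}`: 7,
	}
	for set, n := range counts {
		activeSeriesPerLabelSet.WithLabelValues("tenant-1", set).Set(float64(n))
	}

	// Read one sample back to show the gauge is populated.
	m := &dto.Metric{}
	_ = activeSeriesPerLabelSet.WithLabelValues("tenant-1", `{env="prod"}`).Write(m)
	fmt.Println(m.GetGauge().GetValue()) // 41
}
```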

```diff
@@ -1100,38 +1117,43 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
 			// of it, so that we can return it back to the distributor, which will return a
 			// 400 error to the client. The client (Prometheus) will not retry on 400, and
 			// we actually ingested all samples which haven't failed.
-			switch cause := errors.Cause(err); cause {
-			case storage.ErrOutOfBounds:
+			switch cause := errors.Cause(err); {
+			case errors.Is(cause, storage.ErrOutOfBounds):
 				sampleOutOfBoundsCount++
 				updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
 				continue
 
-			case storage.ErrOutOfOrderSample:
+			case errors.Is(cause, storage.ErrOutOfOrderSample):
 				sampleOutOfOrderCount++
 				updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
 				continue
 
-			case storage.ErrDuplicateSampleForTimestamp:
+			case errors.Is(cause, storage.ErrDuplicateSampleForTimestamp):
 				newValueForTimestampCount++
 				updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
 				continue
 
-			case storage.ErrTooOldSample:
+			case errors.Is(cause, storage.ErrTooOldSample):
 				sampleTooOldCount++
 				updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
 				continue
 
-			case errMaxSeriesPerUserLimitExceeded:
+			case errors.Is(cause, errMaxSeriesPerUserLimitExceeded):
 				perUserSeriesLimitCount++
 				updateFirstPartial(func() error { return makeLimitError(perUserSeriesLimit, i.limiter.FormatError(userID, cause)) })
 				continue
 
-			case errMaxSeriesPerMetricLimitExceeded:
+			case errors.Is(cause, errMaxSeriesPerMetricLimitExceeded):
 				perMetricSeriesLimitCount++
 				updateFirstPartial(func() error {
 					return makeMetricLimitError(perMetricSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause))
 				})
 				continue
+			case errors.As(cause, &errMaxSeriesPerLabelSetLimitExceeded{}):
+				updateFirstPartial(func() error {
+					return makeMetricLimitError(perLabelsetSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause))
+				})
+				continue
 			}
 
 			// The error looks an issue on our side, so we should rollback
```
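The switch rewrite above is what lets the new limit surface correctly: the per-labelset error is a struct value (so it can carry the offending label set) and errors may arrive wrapped, so neither direct equality in a value switch nor a bare comparison would catch every case. A toy, stdlib-only demonstration of the difference, unrelated to Cortex's actual error types:

```go
package main

import (
	"errors"
	"fmt"
)

var errLimit = errors.New("per-user series limit exceeded")

// A struct-valued error, matched by type with errors.As; unlike a sentinel,
// it can carry data such as which label set was exceeded.
type labelSetLimitErr struct{ set string }

func (e labelSetLimitErr) Error() string {
	return fmt.Sprintf("per-labelset series limit exceeded for %s", e.set)
}

func main() {
	wrapped := fmt.Errorf("push failed: %w", errLimit)

	fmt.Println(wrapped == errLimit)          // false: equality misses wrapped errors
	fmt.Println(errors.Is(wrapped, errLimit)) // true: Is walks the wrap chain

	err := fmt.Errorf("push failed: %w", labelSetLimitErr{set: `{env="prod"}`})
	var lsErr labelSetLimitErr
	if errors.As(err, &lsErr) { // As matches by type and extracts the value
		fmt.Println("exceeded for:", lsErr.set)
	}
}
```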
```diff
@@ -2018,6 +2040,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
 		userID:          userID,
 		activeSeries:    NewActiveSeries(),
 		seriesInMetric:  newMetricCounter(i.limiter, i.cfg.getIgnoreSeriesLimitForMetricNamesMap()),
+		labelSetCounter: newLabelSetCounter(i.limiter),
 		ingestedAPISamples:  util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod),
 		ingestedRuleSamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod),
 
```
