Skip to content

Commit

Permalink
Fixes early notifications after config reload (#2492)
Browse files Browse the repository at this point in the history
Add an acceptance test that triggers a config reload and verifies
that no early notification is occurring.

One of the challenges is which time to use to check for a previous
notification. The nflog captures about the time all notifications
were sent. That conflicts with the ag.next timer that get's reset
before the ag is being flushed. Delays and retries can make these
two point in time be different enough for the integration tests to
fail.

I considered the following ways to fix it:

  1.) Modify the nflog.proto to capture the flush time in addition
      to the successful notification time.
  2.) In addition to hasFlushed capture the last flush time and pass
      it to the context like we do for Now.
  3.) Set hashFlushed and reset the timer after the flush has been
      done.

I started with #3 as it seemeded to have the fewest downsides with
things like drift.

needsUpdate is based on:
#3074 (comment)

Signed-off-by: Holger Hans Peter Freyther <holger@freyther.de>
  • Loading branch information
zecke committed May 5, 2024
1 parent d7ad5e1 commit e1464ba
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 9 deletions.
9 changes: 5 additions & 4 deletions dispatch/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,23 +441,24 @@ func (ag *aggrGroup) run(nf notifyFunc) {
ctx = notify.WithNow(ctx, now)

// Populate context with information needed along the pipeline.
ctx = notify.WithGroupInterval(ctx, ag.opts.GroupInterval)
ctx = notify.WithGroupKey(ctx, ag.GroupKey())
ctx = notify.WithGroupLabels(ctx, ag.labels)
ctx = notify.WithReceiverName(ctx, ag.opts.Receiver)
ctx = notify.WithRepeatInterval(ctx, ag.opts.RepeatInterval)
ctx = notify.WithMuteTimeIntervals(ctx, ag.opts.MuteTimeIntervals)
ctx = notify.WithActiveTimeIntervals(ctx, ag.opts.ActiveTimeIntervals)

ag.flush(func(alerts ...*types.Alert) bool {
return nf(ctx, alerts...)
})

// Wait the configured interval before calling flush again.
ag.mtx.Lock()
ag.next.Reset(ag.opts.GroupInterval)
ag.hasFlushed = true
ag.mtx.Unlock()

ag.flush(func(alerts ...*types.Alert) bool {
return nf(ctx, alerts...)
})

cancel()

case <-ag.ctx.Done():
Expand Down
31 changes: 27 additions & 4 deletions notify/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ type notifyKey int
const (
keyReceiverName notifyKey = iota
keyRepeatInterval
keyGroupInterval
keyGroupLabels
keyGroupKey
keyFiringAlerts
Expand Down Expand Up @@ -156,6 +157,11 @@ func WithRepeatInterval(ctx context.Context, t time.Duration) context.Context {
return context.WithValue(ctx, keyRepeatInterval, t)
}

// WithGroupInterval populates a context with a repeat interval.
func WithGroupInterval(ctx context.Context, t time.Duration) context.Context {
return context.WithValue(ctx, keyGroupInterval, t)
}

// WithMuteTimeIntervals populates a context with a slice of mute time names.
func WithMuteTimeIntervals(ctx context.Context, mt []string) context.Context {
return context.WithValue(ctx, keyMuteTimeIntervals, mt)
Expand All @@ -172,6 +178,13 @@ func RepeatInterval(ctx context.Context) (time.Duration, bool) {
return v, ok
}

// GroupInterval extracts a group interval from the context. Iff none exists, the
// second argument is false.
func GroupInterval(ctx context.Context) (time.Duration, bool) {
v, ok := ctx.Value(keyGroupInterval).(time.Duration)
return v, ok
}

// ReceiverName extracts a receiver name from the context. Iff none exists, the
// second argument is false.
func ReceiverName(ctx context.Context) (string, bool) {
Expand Down Expand Up @@ -651,14 +664,16 @@ func hashAlert(a *types.Alert) uint64 {
return hash
}

func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint64]struct{}, repeat time.Duration) bool {
func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint64]struct{}, dispatchTime time.Time, repeat, groupInterval time.Duration) bool {
// If we haven't notified about the alert group before, notify right away
// unless we only have resolved alerts.
if entry == nil {
return len(firing) > 0
}

if !entry.IsFiringSubset(firing) {
groupIntervalMuted := len(entry.FiringAlerts) > 0 && entry.Timestamp.After(dispatchTime.Add(-groupInterval))

if !entry.IsFiringSubset(firing) && !groupIntervalMuted {
return true
}

Expand All @@ -673,7 +688,7 @@ func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint
return len(entry.FiringAlerts) > 0
}

if n.rs.SendResolved() && !entry.IsResolvedSubset(resolved) {
if n.rs.SendResolved() && !entry.IsResolvedSubset(resolved) && !groupIntervalMuted {
return true
}

Expand All @@ -692,6 +707,14 @@ func (n *DedupStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Al
if !ok {
return ctx, nil, errors.New("repeat interval missing")
}
groupInterval, ok := GroupInterval(ctx)
if !ok {
return ctx, nil, errors.New("group interval missing")
}
now, ok := Now(ctx)
if !ok {
return ctx, nil, errors.New("dispatch time missing")
}

firingSet := map[uint64]struct{}{}
resolvedSet := map[uint64]struct{}{}
Expand Down Expand Up @@ -727,7 +750,7 @@ func (n *DedupStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Al
return ctx, nil, fmt.Errorf("unexpected entry result size %d", len(entries))
}

if n.needsUpdate(entry, firingSet, resolvedSet, repeatInterval) {
if n.needsUpdate(entry, firingSet, resolvedSet, now, repeatInterval, groupInterval) {
return ctx, alerts, nil
}
return ctx, nil, nil
Expand Down
12 changes: 11 additions & 1 deletion notify/notify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func TestDedupStageNeedsUpdate(t *testing.T) {
now: func() time.Time { return now },
rs: sendResolved(c.resolve),
}
res := s.needsUpdate(c.entry, c.firingAlerts, c.resolvedAlerts, c.repeat)
res := s.needsUpdate(c.entry, c.firingAlerts, c.resolvedAlerts, now, c.repeat, time.Second*0)
require.Equal(t, c.res, res)
}
}
Expand Down Expand Up @@ -241,6 +241,16 @@ func TestDedupStage(t *testing.T) {

ctx = WithRepeatInterval(ctx, time.Hour)

_, _, err = s.Exec(ctx, log.NewNopLogger())
require.EqualError(t, err, "group interval missing")

ctx = WithGroupInterval(ctx, time.Second*0)

_, _, err = s.Exec(ctx, log.NewNopLogger())
require.EqualError(t, err, "dispatch time missing")

ctx = WithNow(ctx, now)

alerts := []*types.Alert{{}, {}, {}}

// Must catch notification log query errors.
Expand Down
43 changes: 43 additions & 0 deletions test/cli/acceptance/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,3 +271,46 @@ receivers:
require.EqualError(t, err, "exit status 1")
require.Equal(t, "amtool: error: Failed to parse labels: unexpected open or close brace: {foo=bar}\n\n", string(out))
}

func TestGroupingOnConfigReload(t *testing.T) {
t.Parallel()

conf := `
route:
receiver: "default"
group_by: [alertname]
group_wait: 2s
group_interval: 1h
repeat_interval: 4h
receivers:
- name: "default"
webhook_configs:
- url: 'http://%s'
send_resolved: true
`

at := NewAcceptanceTest(t, &AcceptanceOpts{
Tolerance: 150 * time.Millisecond,
})
co := at.Collector("webhook")
wh := NewWebhook(co)

amc := at.AlertmanagerCluster(fmt.Sprintf(conf, wh.Address()), 1)
am := amc.Members()[0]

alert1 := Alert("alertname", "test1", "tag", "one").Active(1, 400)
am.AddAlertsAt(false, 0, alert1)
co.Want(Between(1, 3), Alert("alertname", "test1", "tag", "one").Active(1))

alert2 := Alert("alertname", "test1", "tag", "two").Active(4, 402)
am.AddAlertsAt(false, 4, alert2)
co.Want(Between(4, 8))

// Force a config re-load
at.Do(5, func() { amc.Reload() })

at.Run()

t.Log(co.Check())
}

0 comments on commit e1464ba

Please sign in to comment.