Skip to content

Commit

Permalink
Fixes early notifications after config reload (#2492)
Browse files Browse the repository at this point in the history
Add an acceptance test that triggers a config reload and verifies
that no early notification is occurring.

One of the challenges is which time to use to check for a previous
notification. The nflog captures about the time all notifications
were sent. That conflicts with the ag.next timer that get's reset
before the ag is being flushed. Delays and retries can make these
two point in time be different enough for the integration tests to
fail.

I considered the following ways to fix it:

  1.) Modify the nflog.proto to capture the flush time in addition
      to the successful notification time.
  2.) In addition to hasFlushed capture the last flush time and pass
      it to the context like we do for Now.
  3.) Set hashFlushed and reset the timer after the flush has been
      done.

I started with #3 as it seemeded to have the fewest downsides with
things like drift. Based on comments this is no #1.

needsUpdate is based on:
#3074 (comment)

Signed-off-by: Holger Hans Peter Freyther <holger@freyther.de>
  • Loading branch information
zecke committed May 10, 2024
1 parent c9f73e3 commit e4c97e3
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 68 deletions.
1 change: 1 addition & 0 deletions dispatch/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ func (ag *aggrGroup) run(nf notifyFunc) {
ctx = notify.WithNow(ctx, now)

// Populate context with information needed along the pipeline.
ctx = notify.WithGroupInterval(ctx, ag.opts.GroupInterval)
ctx = notify.WithGroupKey(ctx, ag.GroupKey())
ctx = notify.WithGroupLabels(ctx, ag.labels)
ctx = notify.WithReceiverName(ctx, ag.opts.Receiver)
Expand Down
3 changes: 2 additions & 1 deletion nflog/nflog.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ func stateKey(k string, r *pb.Receiver) string {
return fmt.Sprintf("%s:%s", k, receiverKey(r))
}

func (l *Log) Log(r *pb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error {
func (l *Log) Log(r *pb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration, dispatchTime time.Time) error {
// Write all st with the same timestamp.
now := l.now()
key := stateKey(gkey, r)
Expand Down Expand Up @@ -405,6 +405,7 @@ func (l *Log) Log(r *pb.Receiver, gkey string, firingAlerts, resolvedAlerts []ui
Timestamp: now,
FiringAlerts: firingAlerts,
ResolvedAlerts: resolvedAlerts,
DispatchTime: dispatchTime,
},
ExpiresAt: expiresAt,
}
Expand Down
2 changes: 1 addition & 1 deletion nflog/nflog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ func TestQuery(t *testing.T) {
firingAlerts := []uint64{1, 2, 3}
resolvedAlerts := []uint64{4, 5}

err = nl.Log(recv, "key", firingAlerts, resolvedAlerts, 0)
err = nl.Log(recv, "key", firingAlerts, resolvedAlerts, 0, time.Time{})
require.NoError(t, err, "logging notification failed")

entries, err := nl.Query(QGroupKey("key"), QReceiver(recv))
Expand Down
162 changes: 104 additions & 58 deletions nflog/nflogpb/nflog.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions nflog/nflogpb/nflog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ message Entry {
repeated uint64 firing_alerts = 6;
// ResolvedAlerts list of hashes of resolved alerts at the last notification time.
repeated uint64 resolved_alerts = 7;
// Timestamp of the last time the notifications started.
google.protobuf.Timestamp dispatch_time = 8 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
}

// MeshEntry is a wrapper message to communicate a notify log
Expand Down
40 changes: 34 additions & 6 deletions notify/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ type notifyKey int
const (
keyReceiverName notifyKey = iota
keyRepeatInterval
keyGroupInterval
keyGroupLabels
keyGroupKey
keyFiringAlerts
Expand Down Expand Up @@ -156,6 +157,11 @@ func WithRepeatInterval(ctx context.Context, t time.Duration) context.Context {
return context.WithValue(ctx, keyRepeatInterval, t)
}

// WithGroupInterval populates a context with a repeat interval.
func WithGroupInterval(ctx context.Context, t time.Duration) context.Context {
return context.WithValue(ctx, keyGroupInterval, t)
}

// WithMuteTimeIntervals populates a context with a slice of mute time names.
func WithMuteTimeIntervals(ctx context.Context, mt []string) context.Context {
return context.WithValue(ctx, keyMuteTimeIntervals, mt)
Expand All @@ -172,6 +178,13 @@ func RepeatInterval(ctx context.Context) (time.Duration, bool) {
return v, ok
}

// GroupInterval extracts a group interval from the context. Iff none exists, the
// second argument is false.
func GroupInterval(ctx context.Context) (time.Duration, bool) {
v, ok := ctx.Value(keyGroupInterval).(time.Duration)
return v, ok
}

// ReceiverName extracts a receiver name from the context. Iff none exists, the
// second argument is false.
func ReceiverName(ctx context.Context) (string, bool) {
Expand Down Expand Up @@ -242,7 +255,7 @@ func (f StageFunc) Exec(ctx context.Context, l log.Logger, alerts ...*types.Aler
}

type NotificationLog interface {
Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error
Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration, dispatchTime time.Time) error
Query(params ...nflog.QueryParam) ([]*nflogpb.Entry, error)
}

Expand Down Expand Up @@ -651,14 +664,16 @@ func hashAlert(a *types.Alert) uint64 {
return hash
}

func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint64]struct{}, repeat time.Duration) bool {
func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint64]struct{}, dispatchTime time.Time, repeat, groupInterval time.Duration) bool {
// If we haven't notified about the alert group before, notify right away
// unless we only have resolved alerts.
if entry == nil {
return len(firing) > 0
}

if !entry.IsFiringSubset(firing) {
groupIntervalMuted := len(entry.FiringAlerts) > 0 && entry.DispatchTime.After(dispatchTime.Add(-groupInterval))

if !entry.IsFiringSubset(firing) && !groupIntervalMuted {
return true
}

Expand All @@ -673,7 +688,7 @@ func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint
return len(entry.FiringAlerts) > 0
}

if n.rs.SendResolved() && !entry.IsResolvedSubset(resolved) {
if n.rs.SendResolved() && !entry.IsResolvedSubset(resolved) && !groupIntervalMuted {
return true
}

Expand All @@ -692,6 +707,14 @@ func (n *DedupStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Al
if !ok {
return ctx, nil, errors.New("repeat interval missing")
}
groupInterval, ok := GroupInterval(ctx)
if !ok {
return ctx, nil, errors.New("group interval missing")
}
now, ok := Now(ctx)
if !ok {
return ctx, nil, errors.New("dispatch time missing")
}

firingSet := map[uint64]struct{}{}
resolvedSet := map[uint64]struct{}{}
Expand Down Expand Up @@ -727,7 +750,7 @@ func (n *DedupStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Al
return ctx, nil, fmt.Errorf("unexpected entry result size %d", len(entries))
}

if n.needsUpdate(entry, firingSet, resolvedSet, repeatInterval) {
if n.needsUpdate(entry, firingSet, resolvedSet, now, repeatInterval, groupInterval) {
return ctx, alerts, nil
}
return ctx, nil, nil
Expand Down Expand Up @@ -918,7 +941,12 @@ func (n SetNotifiesStage) Exec(ctx context.Context, l log.Logger, alerts ...*typ
}
expiry := 2 * repeat

return ctx, alerts, n.nflog.Log(n.recv, gkey, firing, resolved, expiry)
now, ok := Now(ctx)
if !ok {
return ctx, nil, errors.New("now time missing")
}

return ctx, alerts, n.nflog.Log(n.recv, gkey, firing, resolved, expiry, now)
}

type timeStage struct {
Expand Down

0 comments on commit e4c97e3

Please sign in to comment.