Skip to content

Commit

Permalink
notify: Use the DispatchTime and now as base for the repeat
Browse files Browse the repository at this point in the history
Use the dispatch time instead of the last successful notification
for the repeat interval. This avoids a drift and also keeps the
same time during notifications.
  • Loading branch information
zecke committed May 18, 2024
1 parent 146f09e commit 91dfd29
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 25 deletions.
8 changes: 1 addition & 7 deletions notify/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,6 @@ type DedupStage struct {
nflog NotificationLog
recv *nflogpb.Receiver

now func() time.Time
hash func(*types.Alert) uint64
}

Expand All @@ -672,15 +671,10 @@ func NewDedupStage(rs ResolvedSender, l NotificationLog, recv *nflogpb.Receiver)
rs: rs,
nflog: l,
recv: recv,
now: utcNow,
hash: hashAlert,
}
}

func utcNow() time.Time {
return time.Now().UTC()
}

// Wrap a slice in a struct so we can store a pointer in sync.Pool.
type hashBuffer struct {
buf []byte
Expand Down Expand Up @@ -745,7 +739,7 @@ func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint
}

// Nothing changed, only notify if the repeat interval has passed.
return entry.Timestamp.Before(n.now().Add(-repeat))
return entry.DispatchTime.Before(dispatchTime.Add(-repeat))
}

// NextExecTime implements the Stage interface.
Expand Down
33 changes: 15 additions & 18 deletions notify/notify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func TestDedupNextExecTime(t *testing.T) {
}

func TestDedupStageNeedsUpdate(t *testing.T) {
now := utcNow()
now := time.Now().UTC()

cases := []struct {
entry *nflogpb.Entry
Expand Down Expand Up @@ -168,15 +168,15 @@ func TestDedupStageNeedsUpdate(t *testing.T) {
// Zero timestamp in the nflog entry should always update.
entry: &nflogpb.Entry{
FiringAlerts: []uint64{1, 2, 3},
Timestamp: time.Time{},
DispatchTime: time.Time{},
},
firingAlerts: alertHashSet(1, 2, 3),
res: true,
}, {
// Identical sets of alerts shouldn't update before repeat_interval.
entry: &nflogpb.Entry{
FiringAlerts: []uint64{1, 2, 3},
Timestamp: now.Add(-9 * time.Minute),
DispatchTime: now.Add(-9 * time.Minute),
},
repeat: 10 * time.Minute,
firingAlerts: alertHashSet(1, 2, 3),
Expand All @@ -185,7 +185,7 @@ func TestDedupStageNeedsUpdate(t *testing.T) {
// Identical sets of alerts should update after repeat_interval.
entry: &nflogpb.Entry{
FiringAlerts: []uint64{1, 2, 3},
Timestamp: now.Add(-11 * time.Minute),
DispatchTime: now.Add(-11 * time.Minute),
},
repeat: 10 * time.Minute,
firingAlerts: alertHashSet(1, 2, 3),
Expand All @@ -194,7 +194,7 @@ func TestDedupStageNeedsUpdate(t *testing.T) {
// Different sets of resolved alerts without firing alerts shouldn't update after repeat_interval.
entry: &nflogpb.Entry{
ResolvedAlerts: []uint64{1, 2, 3},
Timestamp: now.Add(-11 * time.Minute),
DispatchTime: now.Add(-11 * time.Minute),
},
repeat: 10 * time.Minute,
resolvedAlerts: alertHashSet(3, 4, 5),
Expand All @@ -205,7 +205,7 @@ func TestDedupStageNeedsUpdate(t *testing.T) {
entry: &nflogpb.Entry{
FiringAlerts: []uint64{1, 2},
ResolvedAlerts: []uint64{3},
Timestamp: now.Add(-9 * time.Minute),
DispatchTime: now.Add(-9 * time.Minute),
},
repeat: 10 * time.Minute,
firingAlerts: alertHashSet(1),
Expand All @@ -217,7 +217,7 @@ func TestDedupStageNeedsUpdate(t *testing.T) {
entry: &nflogpb.Entry{
FiringAlerts: []uint64{1, 2},
ResolvedAlerts: []uint64{3},
Timestamp: now.Add(-9 * time.Minute),
DispatchTime: now.Add(-9 * time.Minute),
},
repeat: 10 * time.Minute,
firingAlerts: alertHashSet(1),
Expand All @@ -229,7 +229,7 @@ func TestDedupStageNeedsUpdate(t *testing.T) {
entry: &nflogpb.Entry{
FiringAlerts: []uint64{1, 2},
ResolvedAlerts: []uint64{3},
Timestamp: now.Add(-9 * time.Minute),
DispatchTime: now.Add(-9 * time.Minute),
},
repeat: 10 * time.Minute,
firingAlerts: alertHashSet(),
Expand All @@ -241,7 +241,7 @@ func TestDedupStageNeedsUpdate(t *testing.T) {
entry: &nflogpb.Entry{
FiringAlerts: []uint64{1, 2},
ResolvedAlerts: []uint64{3},
Timestamp: now.Add(-9 * time.Minute),
DispatchTime: now.Add(-9 * time.Minute),
},
repeat: 10 * time.Minute,
firingAlerts: alertHashSet(),
Expand All @@ -254,8 +254,7 @@ func TestDedupStageNeedsUpdate(t *testing.T) {
t.Log("case", i)

s := &DedupStage{
now: func() time.Time { return now },
rs: sendResolved(c.resolve),
rs: sendResolved(c.resolve),
}
res := s.needsUpdate(c.entry, c.firingAlerts, c.resolvedAlerts, now, c.repeat, time.Second*0)
require.Equal(t, c.res, res)
Expand All @@ -264,16 +263,14 @@ func TestDedupStageNeedsUpdate(t *testing.T) {

func TestDedupStage(t *testing.T) {
i := 0
now := utcNow()
now := time.Now().UTC()

s := &DedupStage{
hash: func(a *types.Alert) uint64 {
res := uint64(i)
i++
return res
},
now: func() time.Time {
return now
},
rs: sendResolved(false),
}

Expand Down Expand Up @@ -333,7 +330,7 @@ func TestDedupStage(t *testing.T) {
qres: []*nflogpb.Entry{
{
FiringAlerts: []uint64{0, 1, 2},
Timestamp: now,
DispatchTime: now,
},
},
}
Expand All @@ -348,7 +345,7 @@ func TestDedupStage(t *testing.T) {
qres: []*nflogpb.Entry{
{
FiringAlerts: []uint64{1, 2, 3, 4},
Timestamp: now,
DispatchTime: now,
},
},
}
Expand Down Expand Up @@ -777,7 +774,7 @@ func TestMuteStageWithSilences(t *testing.T) {
t.Fatal(err)
}
silID, err := silences.Set(&silencepb.Silence{
EndsAt: utcNow().Add(time.Hour),
EndsAt: time.Now().UTC().Add(time.Hour),
Matchers: []*silencepb.Matcher{{Name: "mute", Pattern: "me"}},
})
if err != nil {
Expand Down

0 comments on commit 91dfd29

Please sign in to comment.