Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes early notifications after config reload (#2492) #3835

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
61 changes: 52 additions & 9 deletions dispatch/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,10 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
d.aggrGroupsNum++
d.metrics.aggrGroups.Inc()

ag.syncFlushInterval(func(ctx context.Context) (time.Time, error) {
return d.stage.LastExecTime(ctx, d.logger)
})

// Insert the 1st alert in the group before starting the group's run()
// function, to make sure that when the run() will be executed the 1st
// alert is already there.
Expand Down Expand Up @@ -429,6 +433,53 @@ func (ag *aggrGroup) String() string {
return ag.GroupKey()
}

func (ag *aggrGroup) syncFlushInterval(n func(ctx context.Context) (time.Time, error)) {
ag.mtx.Lock()
defer ag.mtx.Unlock()

ctx, cancel := context.WithCancel(ag.ctx)
defer cancel()
ctx = ag.populateContext(ctx)

lastTime, err := n(ctx)
if err != nil {
level.Error(ag.logger).Log("msg", "error on determining last exec time", "err", err)
return
}

if lastTime.IsZero() {
return
}

ag.hasFlushed = true
sleepDur := calcDuration(time.Now(), lastTime, ag.opts.GroupInterval)
ag.next.Reset(sleepDur)
}

func calcDuration(now, lastTime time.Time, groupInterval time.Duration) time.Duration {
durationPassed := now.Sub(lastTime)

rest := int64(durationPassed) % int64(groupInterval)
if time.Duration(rest).Milliseconds() <= 1 {
return time.Duration(rest)
}

return time.Duration(groupInterval.Nanoseconds() - rest)
}

func (ag *aggrGroup) populateContext(ctx context.Context) context.Context {
// Populate context with information needed along the pipeline.
ctx = notify.WithGroupInterval(ctx, ag.opts.GroupInterval)
ctx = notify.WithGroupKey(ctx, ag.GroupKey())
ctx = notify.WithGroupLabels(ctx, ag.labels)
ctx = notify.WithReceiverName(ctx, ag.opts.Receiver)
ctx = notify.WithRepeatInterval(ctx, ag.opts.RepeatInterval)
ctx = notify.WithMuteTimeIntervals(ctx, ag.opts.MuteTimeIntervals)
ctx = notify.WithActiveTimeIntervals(ctx, ag.opts.ActiveTimeIntervals)
ctx = notify.WithRouteID(ctx, ag.routeID)
return ctx
}

func (ag *aggrGroup) run(nf notifyFunc) {
defer close(ag.done)
defer ag.next.Stop()
Expand All @@ -445,15 +496,7 @@ func (ag *aggrGroup) run(nf notifyFunc) {
// Calculating the current time directly is prone to flaky behavior,
// which usually only becomes apparent in tests.
ctx = notify.WithNow(ctx, now)

// Populate context with information needed along the pipeline.
ctx = notify.WithGroupKey(ctx, ag.GroupKey())
ctx = notify.WithGroupLabels(ctx, ag.labels)
ctx = notify.WithReceiverName(ctx, ag.opts.Receiver)
ctx = notify.WithRepeatInterval(ctx, ag.opts.RepeatInterval)
ctx = notify.WithMuteTimeIntervals(ctx, ag.opts.MuteTimeIntervals)
ctx = notify.WithActiveTimeIntervals(ctx, ag.opts.ActiveTimeIntervals)
ctx = notify.WithRouteID(ctx, ag.routeID)
ctx = ag.populateContext(ctx)

// Wait the configured interval before calling flush again.
ag.mtx.Lock()
Expand Down
49 changes: 49 additions & 0 deletions dispatch/dispatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,10 @@ func (r *recordStage) Alerts() []*types.Alert {
return alerts
}

func (r *recordStage) LastExecTime(ctx context.Context, l log.Logger) (time.Time, error) {
return time.Time{}, nil
}

func (r *recordStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
r.mtx.Lock()
defer r.mtx.Unlock()
Expand Down Expand Up @@ -760,3 +764,48 @@ func TestDispatcher_DoMaintenance(t *testing.T) {
require.False(t, isMuted)
require.Empty(t, mutedBy)
}

func TestCalcDuration(t *testing.T) {
baseTime := time.Date(2024, 5, 26, 14, 33, 12, 100, time.UTC)

tests := map[string]struct {
lastTime time.Time
now time.Time
groupInterval time.Duration

expected time.Duration
}{
"next_tick": {
lastTime: baseTime,
now: baseTime.Add(time.Second * 10),
groupInterval: time.Second * 27,
expected: time.Second * 17,
},
"exact_tick": {
lastTime: baseTime,
now: baseTime.Add(time.Second * 27),
groupInterval: time.Second * 27,
expected: time.Second * 0,
},
"missed_one_exact_tick": {
lastTime: baseTime,
now: baseTime.Add(time.Second * 27 * 2),
groupInterval: time.Second * 27,
expected: time.Second * 0,
},
"missed_few_ticks": {
lastTime: baseTime,
now: baseTime.Add(time.Second * 27 * 3).Add(time.Second * 10),
groupInterval: time.Second * 27,
expected: time.Second * 17,
},
}

for name, tc := range tests {
tc := tc
t.Run(name, func(t *testing.T) {
d := calcDuration(tc.now, tc.lastTime, tc.groupInterval)
require.Equal(t, tc.expected, d)
})
}
}
3 changes: 2 additions & 1 deletion nflog/nflog.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ func stateKey(k string, r *pb.Receiver) string {
return fmt.Sprintf("%s:%s", k, receiverKey(r))
}

func (l *Log) Log(r *pb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error {
func (l *Log) Log(r *pb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration, dispatchTime time.Time) error {
// Write all st with the same timestamp.
now := l.now()
key := stateKey(gkey, r)
Expand Down Expand Up @@ -405,6 +405,7 @@ func (l *Log) Log(r *pb.Receiver, gkey string, firingAlerts, resolvedAlerts []ui
Timestamp: now,
FiringAlerts: firingAlerts,
ResolvedAlerts: resolvedAlerts,
DispatchTime: dispatchTime,
},
ExpiresAt: expiresAt,
}
Expand Down
2 changes: 1 addition & 1 deletion nflog/nflog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ func TestQuery(t *testing.T) {
firingAlerts := []uint64{1, 2, 3}
resolvedAlerts := []uint64{4, 5}

err = nl.Log(recv, "key", firingAlerts, resolvedAlerts, 0)
err = nl.Log(recv, "key", firingAlerts, resolvedAlerts, 0, time.Time{})
require.NoError(t, err, "logging notification failed")

entries, err := nl.Query(QGroupKey("key"), QReceiver(recv))
Expand Down
162 changes: 104 additions & 58 deletions nflog/nflogpb/nflog.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.