
Add client side event spam filtering
derekwaynecarr committed Jun 12, 2017
1 parent 695d438 commit de86cfc
Showing 1 changed file with 113 additions and 4 deletions.
117 changes: 113 additions & 4 deletions staging/src/k8s.io/client-go/tools/record/events_cache.go
@@ -19,6 +19,7 @@ package record
import (
	"encoding/json"
	"fmt"
	"math"
	"strings"
	"sync"
	"time"
@@ -39,6 +40,13 @@ const (
	// more than 10 times in a 10 minute period, aggregate the event
	defaultAggregateMaxEvents         = 10
	defaultAggregateIntervalInSeconds = 600

	// if we see the same event (source, object, uid, type, reason, message)
	// more than 10 times in a 120 second window, only sink that event
	// to the server every 60s.
	defaultSpamMaxEvents           = 10
	defaultSpamIntervalInSeconds   = 120
	defaultSpamSyncIntervalSeconds = 60
)
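
// Illustrative note (not part of the original change): with these defaults,
// a component that emits the same event continuously gets its first 10
// occurrences through at full rate; after that, as long as repeats keep the
// 120 second rolling window alive, the event is synced at most once per 60s.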

// getEventKey builds unique event key based on source, involvedObject, reason, message
@@ -66,6 +74,105 @@ func DefaultEventFilterFunc(event *v1.Event) bool {
	return false
}

// EventSpamFilter is responsible for filtering spam events.
type EventSpamFilter struct {
	sync.RWMutex

	// the cache that manages last synced state
	cache *lru.Cache

	// the maximum number of identical events that may occur before we begin filtering
	maxEvents int

	// the amount of time in seconds that must pass between identical events for them to no longer be considered potential spam
	maxIntervalInSeconds int

	// the amount of time in seconds that must pass before a throttled event is allowed to sync again (i.e. to bump the count)
	syncIntervalInSeconds int

	// clock is used to allow for testing over a time interval
	clock clock.Clock
}

// NewEventSpamFilter returns an EventSpamFilter that knows how to perform spam filtering of matching events.
func NewEventSpamFilter(lruCacheSize, maxEvents, maxIntervalInSeconds, syncIntervalSeconds int, clock clock.Clock) *EventSpamFilter {
	return &EventSpamFilter{
		cache:                 lru.New(lruCacheSize),
		maxEvents:             maxEvents,
		maxIntervalInSeconds:  maxIntervalInSeconds,
		syncIntervalInSeconds: syncIntervalSeconds,
		clock:                 clock,
	}
}
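
// Note: NewEventCorrelator further down wires this constructor up with the
// package defaults:
//
//	NewEventSpamFilter(maxLruCacheEntries, defaultSpamMaxEvents,
//		defaultSpamIntervalInSeconds, defaultSpamSyncIntervalSeconds, clock)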

// spamRecord holds data used to perform spam filtering decisions.
type spamRecord struct {
	// the number of times this event has appeared
	numEvents int
	// the last time at which the event was recorded
	lastTimestamp metav1.Time
	// the last time at which the event was synced (to bump the count)
	lastSynced metav1.Time
}
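
// Note (illustrative): the zero value of spamRecord is meaningful. A zero
// lastTimestamp makes a missing record look stale in Filter below, and a
// zero lastSynced marks an event that has never been synced to the server.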

// Filter checks if the same event has been seen multiple times
// in a sample window. If it was seen more than the allowed number of
// times in the rolling window, it will only let the event through
// once every syncInterval.
func (f *EventSpamFilter) Filter(event *v1.Event) bool {
	var record spamRecord

	now := metav1.NewTime(f.clock.Now())

	// event key is the full cache key for this event
	eventKey := getEventKey(event)

	// do we have a record of similar events in our cache?
	f.Lock()
	defer f.Unlock()
	value, found := f.cache.Get(eventKey)
	if found {
		record = value.(spamRecord)
	}

	// Is the previous record too old? If so, make a fresh one. Note: if we didn't
	// find a similar record, its lastTimestamp will be the zero value, so we
	// create a new one in that case.
	maxInterval := time.Duration(f.maxIntervalInSeconds) * time.Second
	interval := now.Time.Sub(record.lastTimestamp.Time)
	if interval > maxInterval {
		// enough time has transpired, we create a new record
		record = spamRecord{}
	}

	// write the new event into the spam record and protect against overflow
	if record.numEvents < math.MaxInt32 {
		record.numEvents++
	}
	record.lastTimestamp = now

	// filter the event if it occurs too frequently
	filter := false
	if !record.lastSynced.Time.IsZero() && record.numEvents > f.maxEvents {
		filter = true

		maxSyncInterval := time.Duration(f.syncIntervalInSeconds) * time.Second
		syncInterval := now.Time.Sub(record.lastSynced.Time)
		if syncInterval > maxSyncInterval {
			filter = false
		}
	}

	// the event was allowed through, so remember when we last synced it;
	// filtered events leave lastSynced untouched so the next sync happens
	// a full syncInterval after the previous one
	if !filter {
		record.lastSynced = now
	}

	f.cache.Add(eventKey, record)

	return filter
}
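
// Illustrative trace (not part of this commit), using the package defaults
// (maxEvents=10, maxInterval=120s, syncInterval=60s) with the same event
// arriving once per second:
//
//	t=0s..9s   events 1-10 pass (numEvents <= maxEvents); lastSynced tracks each
//	t=10s..69s events are filtered: numEvents > 10 and now-lastSynced <= 60s
//	t=70s      now-lastSynced exceeds 60s, so one event passes and lastSynced resets
//	...        from here on, roughly one event per minute gets through
//
// Any gap of more than 120s between occurrences resets the record entirely.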

// EventAggregatorKeyFunc is responsible for grouping events for aggregation
// It returns a tuple of the following:
// aggregateKey - key that identifies the aggregate group to bucket this event
@@ -336,18 +443,20 @@ type EventCorrelateResult struct {
// prior to interacting with the API server to record the event.
//
// The default behavior is as follows:
// * No events are filtered from being recorded
// * Aggregation is performed if a similar event is recorded 10 times
// in a 10 minute rolling interval. A similar event is an event that varies only by
// the Event.Message field. Rather than recording the precise event, aggregation
// will create a new event whose message reports that it has combined events with
// the same reason.
// * Events are incrementally counted if the exact same event is encountered multiple
// times.
// * If the same event occurs more than 10 times in a 120 second rolling interval,
// we only record the event every 60 seconds with the latest observed count.
func NewEventCorrelator(clock clock.Clock) *EventCorrelator {
	cacheSize := maxLruCacheEntries
	spamFilter := NewEventSpamFilter(cacheSize, defaultSpamMaxEvents, defaultSpamIntervalInSeconds, defaultSpamSyncIntervalSeconds, clock)
	return &EventCorrelator{
		filterFunc: DefaultEventFilterFunc,
		filterFunc: spamFilter.Filter,
		aggregator: NewEventAggregator(
			cacheSize,
			EventAggregatorByReasonFunc,
@@ -362,11 +471,11 @@ func NewEventCorrelator(clock clock.Clock) *EventCorrelator {

// EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
	aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
	observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
	if c.filterFunc(newEvent) {
		return &EventCorrelateResult{Skip: true}, nil
	}
	aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
	observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
	return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
}
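
To make the new behavior concrete, here is a minimal sketch (an illustration, not part of the commit) that drives NewEventCorrelator with a fake clock and replays one identical event per second. The import paths follow the current repository layout and have moved over time, so they may differ in the 2017 tree; the fake clock is the same kind the package's own tests use.

package main

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/clock"
	"k8s.io/client-go/tools/record"
)

func main() {
	// a fake clock lets us step time deterministically
	fakeClock := clock.NewFakeClock(time.Now())
	correlator := record.NewEventCorrelator(fakeClock)

	// the same (source, object, type, reason, message) tuple every time
	event := &v1.Event{
		ObjectMeta:     metav1.ObjectMeta{Name: "demo.1", Namespace: "default"},
		InvolvedObject: v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "demo", UID: "some-uid"},
		Type:           v1.EventTypeWarning,
		Reason:         "FailedSync",
		Message:        "error determining status",
		Source:         v1.EventSource{Component: "kubelet"},
	}

	// the first 10 occurrences pass; from the 11th on, the spam filter
	// reports Skip until 60s of (fake) time passes since the last sync
	for i := 1; i <= 15; i++ {
		result, err := correlator.EventCorrelate(event)
		if err != nil {
			panic(err)
		}
		fmt.Printf("event %2d: skip=%v\n", i, result.Skip)
		fakeClock.Step(time.Second)
	}
}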
