Skip to content

Commit

Permalink
Merge pull request #59828 from krousey/shared_informer_race
Browse files Browse the repository at this point in the history
Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Fix a race condition in SharedInformer

**What this PR does / why we need it**:

This fixes a race condition that can occur in the `sharedIndexInformer`

**Which issue(s) this PR fixes**:
Fixes #59822

**Release note**:

```release-note
Fixed a race condition in k8s.io/client-go/tools/cache.SharedInformer that could violate the sequential delivery guarantee and cause panics on shutdown.
```
  • Loading branch information
Kubernetes Submit Queue committed Feb 14, 2018
2 parents 603ebe4 + 3c36d9e commit 6590ea6
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 10 deletions.
17 changes: 7 additions & 10 deletions staging/src/k8s.io/client-go/tools/cache/shared_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()

s.processor.addAndStartListener(listener)
s.processor.addListener(listener)
for _, item := range s.indexer.List() {
listener.add(addNotification{newObj: item})
}
Expand Down Expand Up @@ -373,27 +373,23 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
}

type sharedProcessor struct {
listenersStarted bool
listenersLock sync.RWMutex
listeners []*processorListener
syncingListeners []*processorListener
clock clock.Clock
wg wait.Group
}

func (p *sharedProcessor) addAndStartListener(listener *processorListener) {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()

p.addListenerLocked(listener)
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}

func (p *sharedProcessor) addListener(listener *processorListener) {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()

p.addListenerLocked(listener)
if p.listenersStarted {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
}

func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
Expand Down Expand Up @@ -424,6 +420,7 @@ func (p *sharedProcessor) run(stopCh <-chan struct{}) {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
p.listenersStarted = true
}()
<-stopCh
p.listenersLock.RLock()
Expand Down
12 changes: 12 additions & 0 deletions staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,3 +251,15 @@ func TestResyncCheckPeriod(t *testing.T) {
t.Errorf("expected %d, got %d", e, a)
}
}

// verify that https://github.com/kubernetes/kubernetes/issues/59822 is fixed
func TestSharedInformerInitializationRace(t *testing.T) {
source := fcache.NewFakeControllerSource()
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
listener := newTestListener("raceListener", 0)

stop := make(chan struct{})
go informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod)
go informer.Run(stop)
close(stop)
}

0 comments on commit 6590ea6

Please sign in to comment.