diff --git a/data/binding/queue.go b/data/binding/queue.go index 7587a88906..cafaed58d2 100644 --- a/data/binding/queue.go +++ b/data/binding/queue.go @@ -1,6 +1,6 @@ package binding -var itemQueue = make(chan itemData, 1024) +var itemQueueIn, itemQueueOut = makeInfiniteQueue() type itemData struct { fn func() @@ -8,16 +8,50 @@ type itemData struct { } func queueItem(f func()) { - itemQueue <- itemData{fn: f} + itemQueueIn <- &itemData{fn: f} } func init() { go processItems() } +func makeInfiniteQueue() (chan<- *itemData, <-chan *itemData) { + in := make(chan *itemData) + out := make(chan *itemData) + go func() { + queued := make([]*itemData, 0, 1024) + pending := func() chan *itemData { + if len(queued) == 0 { + return nil + } + return out + } + next := func() *itemData { + if len(queued) == 0 { + return nil + } + return queued[0] + } + for len(queued) > 0 || in != nil { + select { + case val, ok := <-in: + if !ok { + in = nil + } else { + queued = append(queued, val) + } + case pending() <- next(): + queued = queued[1:] + } + } + close(out) + }() + return in, out +} + func processItems() { for { - i := <-itemQueue + i := <-itemQueueOut if i.fn != nil { i.fn() } @@ -29,6 +63,6 @@ func processItems() { func waitForItems() { done := make(chan interface{}) - itemQueue <- itemData{done: done} + itemQueueIn <- &itemData{done: done} <-done } diff --git a/data/binding/queue_test.go b/data/binding/queue_test.go index 41e213cce3..71e71dc38e 100644 --- a/data/binding/queue_test.go +++ b/data/binding/queue_test.go @@ -1,6 +1,7 @@ package binding import ( + "sync" "testing" "github.com/stretchr/testify/assert" @@ -18,3 +19,25 @@ func TestQueueItem(t *testing.T) { waitForItems() assert.Equal(t, 2, called) } + +func TestMakeInfiniteQueue(t *testing.T) { + var wg sync.WaitGroup + in, out := makeInfiniteQueue() + + wg.Add(1) + c := 0 + go func() { + for range out { + c++ + } + wg.Done() + }() + + for i := 0; i < 2048; i++ { + in <- &itemData{} + } + close(in) + + wg.Wait() + assert.Equal(t, 2048, c) +}