From 04bd87475a866c2e8aeea58f8ed9a3647a85e9a5 Mon Sep 17 00:00:00 2001 From: Andy Williams Date: Sun, 21 Feb 2021 17:29:50 +0000 Subject: [PATCH 1/2] move to an infinite queue for binding callbacks Fixes #1838 --- data/binding/queue.go | 42 ++++++++++++++++++++++++++++++++++---- data/binding/queue_test.go | 23 +++++++++++++++++++++ 2 files changed, 61 insertions(+), 4 deletions(-) diff --git a/data/binding/queue.go b/data/binding/queue.go index 7587a88906..0e353ec3a9 100644 --- a/data/binding/queue.go +++ b/data/binding/queue.go @@ -1,6 +1,6 @@ package binding -var itemQueue = make(chan itemData, 1024) +var itemQueueIn, itemQueueOut = makeInfiniteQueue() type itemData struct { fn func() @@ -8,16 +8,50 @@ type itemData struct { } func queueItem(f func()) { - itemQueue <- itemData{fn: f} + itemQueueIn <- &itemData{fn: f} } func init() { go processItems() } +func makeInfiniteQueue() (chan<- *itemData, <-chan *itemData) { + in := make(chan *itemData) + out := make(chan *itemData) + go func() { + var queued []*itemData + pending := func() chan *itemData { + if len(queued) == 0 { + return nil + } + return out + } + next := func() *itemData { + if len(queued) == 0 { + return nil + } + return queued[0] + } + for len(queued) > 0 || in != nil { + select { + case val, ok := <-in: + if !ok { + in = nil + } else { + queued = append(queued, val) + } + case pending() <- next(): + queued = queued[1:] + } + } + close(out) + }() + return in, out +} + func processItems() { for { - i := <-itemQueue + i := <-itemQueueOut if i.fn != nil { i.fn() } @@ -29,6 +63,6 @@ func processItems() { func waitForItems() { done := make(chan interface{}) - itemQueue <- itemData{done: done} + itemQueueIn <- &itemData{done: done} <-done } diff --git a/data/binding/queue_test.go b/data/binding/queue_test.go index 41e213cce3..71e71dc38e 100644 --- a/data/binding/queue_test.go +++ b/data/binding/queue_test.go @@ -1,6 +1,7 @@ package binding import ( + "sync" "testing" "github.com/stretchr/testify/assert" @@ -18,3 +19,25 @@ func TestQueueItem(t *testing.T) { waitForItems() assert.Equal(t, 2, called) } + +func TestMakeInfiniteQueue(t *testing.T) { + var wg sync.WaitGroup + in, out := makeInfiniteQueue() + + wg.Add(1) + c := 0 + go func() { + for range out { + c++ + } + wg.Done() + }() + + for i := 0; i < 2048; i++ { + in <- &itemData{} + } + close(in) + + wg.Wait() + assert.Equal(t, 2048, c) +} From 69431e1eab14a2ee9bf54f45a1f8ddbf8e32d637 Mon Sep 17 00:00:00 2001 From: Andy Williams Date: Fri, 26 Feb 2021 18:08:10 +0000 Subject: [PATCH 2/2] Initialise queue to a certain capacity This should make small apps just as performant as before the change --- data/binding/queue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data/binding/queue.go b/data/binding/queue.go index 0e353ec3a9..cafaed58d2 100644 --- a/data/binding/queue.go +++ b/data/binding/queue.go @@ -19,7 +19,7 @@ func makeInfiniteQueue() (chan<- *itemData, <-chan *itemData) { in := make(chan *itemData) out := make(chan *itemData) go func() { - var queued []*itemData + queued := make([]*itemData, 0, 1024) pending := func() chan *itemData { if len(queued) == 0 { return nil