Skip to content

Commit

Permalink
Merge pull request #311 from cschleiden/fix-coroutine-deadlock
Browse files Browse the repository at this point in the history
Ensure coroutines exit cleanly
  • Loading branch information
cschleiden committed Jan 23, 2024
2 parents 418ac25 + eb19a62 commit 1375fc2
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 5 deletions.
11 changes: 8 additions & 3 deletions internal/sync/coroutine.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,16 @@ func (s *coState) yield(markBlocking bool) {

s.logger.Println("yielded")

// Wait for the next Execute() call
<-s.unblock

// Once we're here, another Execute() call has been made. s.blocking is empty

if s.shouldExit.Load() != nil {
s.logger.Println("shouldExit")
s.blocking <- true
s.logger.Println("goexit")
s.logger.Println("exiting")

// Goexit runs all deferred functions, which includes calling finish() in the main
// execution function. That marks the coroutine as finished and blocking.
runtime.Goexit()
}

Expand Down
6 changes: 4 additions & 2 deletions internal/sync/coroutine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sync

import (
"errors"
"runtime"
"testing"
"time"

Expand Down Expand Up @@ -119,15 +120,16 @@ func Test_Coroutine_Exit(t *testing.T) {
return nil
})

r := runtime.NumGoroutine()
c.Exit()

require.True(t, c.Finished())
require.Equal(t, r-1, runtime.NumGoroutine())
}

func Test_Coroutine_ExitIfAlreadyFinished(t *testing.T) {
c := NewCoroutine(Background(), func(ctx Context) error {
// Complete immedeiately

// Complete immediately
return nil
})

Expand Down
63 changes: 63 additions & 0 deletions internal/workflow/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"log"
"log/slog"
"runtime"
"testing"
"time"

Expand Down Expand Up @@ -609,6 +610,68 @@ func Test_Executor(t *testing.T) {
})
},
},
{
name: "Close_removes_any_goroutines",
f: func(t *testing.T, r *registry.Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
wf := func(ctx sync.Context) error {
c := wf.NewSignalChannel[int](ctx, "signal")

// Block workflow
c.Receive(ctx)

return nil
}

r.RegisterWorkflow(wf)

task := startWorkflowTask(i.InstanceID, wf)

goRoutines := runtime.NumGoroutine()

_, err := e.ExecuteTask(context.Background(), task)
require.NoError(t, err)

require.Equal(t, goRoutines+1, runtime.NumGoroutine())

e.Close()

require.Equal(t, goRoutines, runtime.NumGoroutine())
},
},
{
name: "Close_removes_any_goroutines_nested",
f: func(t *testing.T, r *registry.Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
wf := func(ctx sync.Context) error {
c := wf.NewSignalChannel[int](ctx, "signal")

wf.Go(ctx, func(ctx wf.Context) {
c.Receive(ctx)
})

// Block workflow
c.Receive(ctx)

return nil
}

r.RegisterWorkflow(wf)

task := startWorkflowTask(i.InstanceID, wf)

goRoutines := runtime.NumGoroutine()

_, err := e.ExecuteTask(context.Background(), task)
require.NoError(t, err)

// Expect two pending goroutines
require.Equal(t, goRoutines+2, runtime.NumGoroutine())

e.Close()

// Expect them to be removed
require.Equal(t, goRoutines, runtime.NumGoroutine())
},
},
}

for _, tt := range tests {
Expand Down

0 comments on commit 1375fc2

Please sign in to comment.