Skip to content

Commit

Permalink
Shutdowner: Support calling from fx.Invoke (#1076)
Browse files Browse the repository at this point in the history
Stacked on top of:

- #1081
- #1082

However, since I can't push branches directly to this repository,
this PR shows commits from all PRs.

---

App.Start nils out the "last" signal recorded by signalReceivers,
which it otherwise broadcasts to waiters if it was already received.
This is unnecessary especially because there's a discrepancy in behavior
of using App.Start vs App.Run when shutting down from fx.Invoke.

Given a program that calls Shutdown from fx.Invoke,
when we do:

    app := fx.New(...)

The shutdowner has already sent the signal, and signalReceivers has
already recorded it.
At that point, whether we call App.Start or App.Run changes behavior:

- If we call App.Run, that calls App.Done (or App.Wait after #1075),
  which gives it back a channel that already has the signal filled in.
  It then calls App.Start, waits on the channel--which returns
  immediately--and then calls App.Stop.
- If we call App.Start and App.Wait, on the other hand,
  Start will clear the signal recorded in signalReceivers,
  and then App.Wait will build a channel that will block indefinitely
  because Shutdowner.Shutdown will not be called again.

So even though App.Run() and App.Start()+App.Wait() are meant to be
equivalent, this causes a serious discrepancy in behavior.
It makes sense to resolve this by supporting Shutdown from Invoke.

Refs #1074

---------

Co-authored-by: Sung Yoon Whang <sungyoon@uber.com>
  • Loading branch information
abhinav and sywhang committed May 8, 2023
1 parent 57e421a commit 7174fde
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 7 deletions.
6 changes: 3 additions & 3 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,20 +574,20 @@ func (app *App) Run() {
// Historically, we do not os.Exit(0) even though most applications
// cede control to Fx with they call app.Run. To avoid a breaking
// change, never os.Exit for success.
if code := app.run(app.Wait()); code != 0 {
if code := app.run(app.Wait); code != 0 {
app.exit(code)
}
}

func (app *App) run(done <-chan ShutdownSignal) (exitCode int) {
func (app *App) run(done func() <-chan ShutdownSignal) (exitCode int) {
startCtx, cancel := app.clock.WithTimeout(context.Background(), app.StartTimeout())
defer cancel()

if err := app.Start(startCtx); err != nil {
return 1
}

sig := <-done
sig := <-done()
app.log().LogEvent(&fxevent.Stopping{Signal: sig.Signal})
exitCode = sig.ExitCode

Expand Down
2 changes: 1 addition & 1 deletion app_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestAppRun(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
app.run(done)
app.run(func() <-chan ShutdownSignal { return done })
}()

done <- ShutdownSignal{Signal: _sigINT}
Expand Down
54 changes: 54 additions & 0 deletions internal/e2e/shutdowner_wait_exitcode/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright (c) 2023 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package main

import (
"context"
"log"
"os"
"time"

"go.uber.org/fx"
)

func main() {
app := fx.New(
fx.Invoke(func(shutdowner fx.Shutdowner) error {
shutdowner.Shutdown(fx.ExitCode(20))
return nil
}),
)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

if err := app.Start(ctx); err != nil {
log.Fatal(err)
}

sig := <-app.Wait()

if err := app.Stop(ctx); err != nil {
log.Fatal(err)
}

os.Exit(sig.ExitCode)
}
72 changes: 72 additions & 0 deletions internal/e2e/shutdowner_wait_exitcode/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright (c) 2023 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package main

import (
"os"
"os/exec"
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/fx/internal/testutil"
)

// Hijacks the test binary so that the test can run main() as a subprocess
// instead of trying to compile the program and run it directly.
func TestMain(m *testing.M) {
// If the test binary is named "app", then we're running as a subprocess.
// Otherwise, run the tests.
switch filepath.Base(os.Args[0]) {
case "app":
main()
os.Exit(0)
default:
os.Exit(m.Run())
}
}

// Verifies that an Fx program running with Run
// exits with the exit code passed to Shutdowner.
//
// Regression test for https://github.com/uber-go/fx/issues/1074.
func TestShutdownExitCode(t *testing.T) {
exe, err := os.Executable()
require.NoError(t, err)

out := testutil.WriteSyncer{T: t}

// Run the test binary with the name 'app' so that it runs main().
cmd := exec.Command(exe)
cmd.Args[0] = "app"
cmd.Stdout = &out
cmd.Stderr = &out

// The program should exit with code 20.
err = cmd.Run()
require.Error(t, err)

var exitErr *exec.ExitError
require.ErrorAs(t, err, &exitErr)

assert.Equal(t, 20, exitErr.ExitCode())
}
2 changes: 0 additions & 2 deletions shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,6 @@ type shutdowner struct {
// Shutdown broadcasts a signal to all of the application's Done channels
// and begins the Stop process. Applications can be shut down only after they
// have finished starting up.
// In practice this means Shutdowner.Shutdown should not be called from an
// fx.Invoke, but from a fx.Lifecycle.OnStart hook.
func (s *shutdowner) Shutdown(opts ...ShutdownOption) error {
for _, opt := range opts {
opt.apply(s)
Expand Down
40 changes: 40 additions & 0 deletions shutdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,46 @@ func TestShutdown(t *testing.T) {

assert.NoError(t, s.Shutdown(fx.ExitCode(2), fx.ShutdownTimeout(time.Second)))
})

t.Run("from invoke", func(t *testing.T) {
t.Parallel()

app := fxtest.New(
t,
fx.Invoke(func(s fx.Shutdowner) {
s.Shutdown()
}),
)

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

require.NoError(t, app.Start(ctx), "error starting app")
defer app.Stop(ctx)
select {
case <-ctx.Done():
assert.Fail(t, "app did not shutdown in time")
case <-app.Wait():
// success
}
})

t.Run("many times", func(t *testing.T) {
t.Parallel()

var shutdowner fx.Shutdowner
app := fxtest.New(
t,
fx.Populate(&shutdowner),
)

for i := 0; i < 10; i++ {
app.RequireStart()
shutdowner.Shutdown(fx.ExitCode(i))
assert.Equal(t, i, (<-app.Wait()).ExitCode, "run %d", i)
app.RequireStop()
}
})
}

func TestDataRace(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion signal.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ func (recv *signalReceivers) Start(ctx context.Context) {
return
}

recv.last = nil
recv.finished = make(chan struct{}, 1)
recv.shutdown = make(chan struct{}, 1)
recv.notify(recv.signals, os.Interrupt, _sigINT, _sigTERM)
Expand All @@ -135,6 +134,7 @@ func (recv *signalReceivers) Stop(ctx context.Context) error {
close(recv.finished)
recv.shutdown = nil
recv.finished = nil
recv.last = nil
return nil
}
}
Expand Down

0 comments on commit 7174fde

Please sign in to comment.