Skip to content

Commit

Permalink
Adds Daprd option --block-shutdown-seconds
Browse files Browse the repository at this point in the history
Closes dapr#4313
Docs: dapr/docs#3893

PR adds the `--dapr-block-shutdown-seconds` CLI flag and corresponding
`dapr.io/block-shutdown-seconds` Kubernetes annotation, which configure
Daprd to block the graceful shutdown procedure until _either_ the
block shutdown seconds have elapsed _or_ the application has become
unhealthy, according to the normal app health status.

By default, this option is unset, and therefore has no effect on
the current behaviour of graceful shutdown. When set, Daprd will
block the interrupt signal from cascading into the runtime until the
above requirements have been met.

The framework process `Cleanup` order has been reversed to mimic
`t.Cleanup` and allow the `logline` process to function correctly.

Signed-off-by: joshvanl <me@joshvanl.dev>
  • Loading branch information
JoshVanL committed Dec 4, 2023
1 parent c11465f commit 4396bf2
Show file tree
Hide file tree
Showing 27 changed files with 974 additions and 33 deletions.
2 changes: 2 additions & 0 deletions cmd/daprd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func main() {
UnixDomainSocket: opts.UnixDomainSocket,
DaprHTTPReadBufferSize: opts.DaprHTTPReadBufferSize,
DaprGracefulShutdownSeconds: opts.DaprGracefulShutdownSeconds,
DaprBlockShutdownSeconds: opts.DaprBlockShutdownSeconds,
DisableBuiltinK8sSecretStore: opts.DisableBuiltinK8sSecretStore,
EnableAppHealthCheck: opts.EnableAppHealthCheck,
AppHealthCheckPath: opts.AppHealthCheckPath,
Expand All @@ -175,4 +176,5 @@ func main() {
if err != nil {
log.Fatalf("Fatal error from runtime: %s", err)
}
log.Info("Daprd shutdown gracefully")
}
8 changes: 8 additions & 0 deletions cmd/daprd/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type Options struct {
DaprPublicPort string
AppPort string
DaprGracefulShutdownSeconds int
DaprBlockShutdownSeconds *int
PlacementServiceHostAddr string
DaprAPIListenAddresses string
AppHealthProbeInterval int
Expand All @@ -79,6 +80,8 @@ func New(args []string) *Options {
EnableAPILogging: new(bool),
}

var blockShutdownSeconds int

flag.StringVar(&opts.Mode, "mode", string(modes.StandaloneMode), "Runtime mode for Dapr")
flag.StringVar(&opts.DaprHTTPPort, "dapr-http-port", strconv.Itoa(runtime.DefaultDaprHTTPPort), "HTTP port for Dapr API to listen on")
flag.StringVar(&opts.DaprAPIListenAddresses, "dapr-listen-addresses", runtime.DefaultAPIListenAddress, "One or more addresses for the Dapr API to listen on, CSV limited")
Expand Down Expand Up @@ -109,6 +112,7 @@ func New(args []string) *Options {
flag.StringVar(&opts.UnixDomainSocket, "unix-domain-socket", "", "Path to a unix domain socket dir mount. If specified, Dapr API servers will use Unix Domain Sockets")
flag.IntVar(&opts.DaprHTTPReadBufferSize, "dapr-http-read-buffer-size", runtime.DefaultReadBufferSize, "Increasing max size of read buffer in KB to handle sending multi-KB headers")
flag.IntVar(&opts.DaprGracefulShutdownSeconds, "dapr-graceful-shutdown-seconds", int(runtime.DefaultGracefulShutdownDuration/time.Second), "Graceful shutdown time in seconds")
flag.IntVar(&blockShutdownSeconds, "dapr-block-shutdown-seconds", 0, "If enabled, will block graceful shutdown after terminate signal is received until either the number of seconds has been reached or the app reports unhealthy. Disabled by default")
flag.BoolVar(opts.EnableAPILogging, "enable-api-logging", false, "Enable API logging for API calls")
flag.BoolVar(&opts.DisableBuiltinK8sSecretStore, "disable-builtin-k8s-secret-store", false, "Disable the built-in Kubernetes Secret Store")
flag.BoolVar(&opts.EnableAppHealthCheck, "enable-app-health-check", false, "Enable health checks for the application using the protocol defined with app-protocol")
Expand Down Expand Up @@ -150,6 +154,10 @@ func New(args []string) *Options {
}
}

if isFlagPassed("dapr-block-shutdown-seconds") {
opts.DaprBlockShutdownSeconds = &blockShutdownSeconds
}

return &opts
}

Expand Down
1 change: 1 addition & 0 deletions pkg/injector/annotations/annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ const (
KeyHTTPMaxRequestSize = "dapr.io/http-max-request-size"
KeyHTTPReadBufferSize = "dapr.io/http-read-buffer-size"
KeyGracefulShutdownSeconds = "dapr.io/graceful-shutdown-seconds"
KeyBlockShutdownSeconds = "dapr.io/block-shutdown-seconds"
KeyEnableAPILogging = "dapr.io/enable-api-logging"
KeyUnixDomainSocketPath = "dapr.io/unix-domain-socket-path"
KeyVolumeMountsReadOnly = "dapr.io/volume-mounts"
Expand Down
1 change: 1 addition & 0 deletions pkg/injector/patcher/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type SidecarConfig struct {
HTTPMaxRequestSize *int `annotation:"dapr.io/http-max-request-size"`
HTTPReadBufferSize *int `annotation:"dapr.io/http-read-buffer-size"`
GracefulShutdownSeconds int `annotation:"dapr.io/graceful-shutdown-seconds" default:"-1"`
BlockShutdownSeconds *int `annotation:"dapr.io/block-shutdown-seconds"`
EnableAPILogging *bool `annotation:"dapr.io/enable-api-logging"`
UnixDomainSocketPath string `annotation:"dapr.io/unix-domain-socket-path"`
VolumeMounts string `annotation:"dapr.io/volume-mounts"`
Expand Down
4 changes: 4 additions & 0 deletions pkg/injector/patcher/sidecar_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ func (c *SidecarConfig) getSidecarContainer(opts getSidecarContainerOpts) (*core
args = append(args, "--unix-domain-socket", injectorConsts.UnixDomainSocketDaprdPath)
}

if c.BlockShutdownSeconds != nil {
args = append(args, "--dapr-block-shutdown-seconds", strconv.Itoa(*c.BlockShutdownSeconds))
}

// When debugging is enabled, we need to override the command and the flags
if c.EnableDebug {
ports = append(ports, corev1.ContainerPort{
Expand Down
31 changes: 31 additions & 0 deletions pkg/injector/patcher/sidecar_container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,37 @@ func TestGetSidecarContainer(t *testing.T) {
},
}))

t.Run("block shutdown seconds", testSuiteGenerator([]testCase{
{
name: "default to empty",
annotations: map[string]string{},
assertFn: func(t *testing.T, container *corev1.Container) {
args := strings.Join(container.Args, " ")
assert.NotContains(t, args, "--dapr-block-shutdown-seconds")
},
},
{
name: "add a block shutdown seconds",
annotations: map[string]string{
annotations.KeyBlockShutdownSeconds: "3",
},
assertFn: func(t *testing.T, container *corev1.Container) {
args := strings.Join(container.Args, " ")
assert.Contains(t, args, "--dapr-block-shutdown-seconds 3")
},
},
{
name: "empty when value is invalid",
annotations: map[string]string{
annotations.KeyBlockShutdownSeconds: "invalid",
},
assertFn: func(t *testing.T, container *corev1.Container) {
args := strings.Join(container.Args, " ")
assert.NotContains(t, args, "--dapr-block-shutdown-seconds")
},
},
}))

t.Run("sidecar image", testSuiteGenerator([]testCase{
{
name: "no annotation",
Expand Down
9 changes: 9 additions & 0 deletions pkg/runtime/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ type Config struct {
DaprPublicPort string
ApplicationPort string
DaprGracefulShutdownSeconds int
DaprBlockShutdownSeconds *int
PlacementServiceHostAddr string
DaprAPIListenAddresses string
AppHealthProbeInterval int
Expand Down Expand Up @@ -129,6 +130,7 @@ type internalConfig struct {
unixDomainSocket string
readBufferSize int
gracefulShutdownDuration time.Duration
blockShutdownDuration *time.Duration
enableAPILogging *bool
disableBuiltinK8sSecretStore bool
config []string
Expand Down Expand Up @@ -362,6 +364,13 @@ func (c *Config) toInternal() (*internalConfig, error) {
intc.readBufferSize = DefaultReadBufferSize
}

// Block-shutdown is optional: a nil pointer means the flag was not passed and
// blockShutdownDuration stays nil. When set, any non-negative number of
// seconds is accepted (0 included); only negative values are rejected.
if c.DaprBlockShutdownSeconds != nil {
	if *c.DaprBlockShutdownSeconds < 0 {
		return nil, fmt.Errorf("the '-dapr-block-shutdown-seconds' argument value %d must be greater than or equal to 0", *c.DaprBlockShutdownSeconds)
	}
	intc.blockShutdownDuration = ptr.Of(time.Duration(*c.DaprBlockShutdownSeconds) * time.Second)
}

if c.DaprGracefulShutdownSeconds < 0 {
intc.gracefulShutdownDuration = DefaultGracefulShutdownDuration
} else {
Expand Down
3 changes: 3 additions & 0 deletions pkg/runtime/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ func TestParsePlacementAddr(t *testing.T) {
func Test_toInternal(t *testing.T) {
cfg := defaultTestConfig()

var nilDuration *time.Duration

intc, err := cfg.toInternal()
require.NoError(t, err)

Expand All @@ -81,6 +83,7 @@ func Test_toInternal(t *testing.T) {
assert.Equal(t, "", intc.unixDomainSocket)
assert.Equal(t, 4, intc.readBufferSize)
assert.Equal(t, time.Second, intc.gracefulShutdownDuration)
assert.Equal(t, nilDuration, intc.blockShutdownDuration)
assert.Equal(t, ptr.Of(true), intc.enableAPILogging)
assert.True(t, intc.disableBuiltinK8sSecretStore)
assert.Equal(t, "1.1.1.1", intc.appConnectionConfig.ChannelAddress)
Expand Down
46 changes: 44 additions & 2 deletions pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/clock"

nr "github.com/dapr/components-contrib/nameresolution"
"github.com/dapr/components-contrib/state"
Expand Down Expand Up @@ -98,6 +99,7 @@ type DaprRuntime struct {
daprHTTPAPI http.API
daprGRPCAPI grpc.API
operatorClient operatorv1pb.OperatorClient
isAppHealthy chan struct{}
appHealth *apphealth.AppHealth
appHealthReady func(context.Context) error // Invoked the first time the app health becomes ready
appHealthLock sync.Mutex
Expand All @@ -107,6 +109,7 @@ type DaprRuntime struct {
authz *authorizer.Authorizer
sec security.Handler
runnerCloser *concurrency.RunnerCloserManager
clock clock.Clock
// Used for testing.
initComplete chan struct{}

Expand Down Expand Up @@ -206,7 +209,10 @@ func newDaprRuntime(ctx context.Context,
namespace: namespace,
podName: podName,
initComplete: make(chan struct{}),
isAppHealthy: make(chan struct{}),
clock: new(clock.RealClock),
}
close(rt.isAppHealthy)

var gracePeriod *time.Duration
if duration := runtimeConfig.gracefulShutdownDuration; duration > 0 {
Expand Down Expand Up @@ -274,7 +280,31 @@ func newDaprRuntime(ctx context.Context,
}

// Run performs initialization of the runtime with the runtime and global configurations.
func (a *DaprRuntime) Run(ctx context.Context) error {
func (a *DaprRuntime) Run(parentCtx context.Context) error {
	// Without a block-shutdown duration the runners are driven directly by the
	// caller's context, so cancelling it starts shutdown straight away.
	if a.runtimeConfig.blockShutdownDuration == nil {
		return a.runnerCloser.Run(parentCtx)
	}

	// With a block-shutdown duration the runners are started on a Background
	// context instead, and this extra runner is the only one observing
	// parentCtx. The runner context is cancelled once it returns.
	a.runnerCloser.Add(func(runnerCtx context.Context) error {
		select {
		case <-runnerCtx.Done():
			// Another routine has returned; this is not an interrupt, so there is
			// nothing to block on.
			return nil
		case <-parentCtx.Done():
			// Interrupt received: fall through and hold off shutdown.
		}

		window := *a.runtimeConfig.blockShutdownDuration
		log.Infof("Blocking graceful shutdown for %s or until app reports unhealthy...", window)

		// Wait for whichever comes first: the block window elapsing on the
		// runtime clock, or isAppHealthy being closed (appHealthChanged closes it
		// when the app is reported unhealthy).
		select {
		case <-a.clock.After(window):
			log.Info("Block shutdown period expired, entering shutdown...")
		case <-a.isAppHealthy:
			log.Info("App reported unhealthy, entering shutdown...")
		}

		return nil
	})

	return a.runnerCloser.Run(context.Background())
}

Expand Down Expand Up @@ -586,6 +616,12 @@ func (a *DaprRuntime) appHealthChanged(ctx context.Context, status uint8) {

switch status {
case apphealth.AppStatusHealthy:
select {
case <-a.isAppHealthy:
a.isAppHealthy = make(chan struct{})
default:
}

// First time the app becomes healthy, complete the init process
if a.appHealthReady != nil {
if err := a.appHealthReady(ctx); err != nil {
Expand All @@ -608,6 +644,12 @@ func (a *DaprRuntime) appHealthChanged(ctx context.Context, status uint8) {
log.Warnf("failed to subscribe to outbox topics: %s", err)
}
case apphealth.AppStatusUnhealthy:
select {
case <-a.isAppHealthy:
default:
close(a.isAppHealthy)
}

// Stop topic subscriptions and input bindings
a.processor.PubSub().StopSubscriptions()
a.processor.Binding().StopReadingFromBindings()
Expand Down Expand Up @@ -1025,7 +1067,7 @@ func (a *DaprRuntime) blockUntilAppIsReady(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()
// prevents overwhelming the OS with open connections
case <-time.After(time.Millisecond * 100):
case <-a.clock.After(time.Millisecond * 100):
}
}

Expand Down
121 changes: 121 additions & 0 deletions pkg/runtime/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"google.golang.org/grpc/metadata"
v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clocktesting "k8s.io/utils/clock/testing"

"github.com/dapr/components-contrib/bindings"
"github.com/dapr/components-contrib/lock"
Expand Down Expand Up @@ -1855,6 +1856,126 @@ func TestGracefulShutdownBindings(t *testing.T) {
}
}

// TestBlockShutdownBindings verifies that, with blockShutdownDuration set,
// cancelling the context passed to Run does not make Run return until the
// block is released, either by the block duration elapsing on the runtime
// clock or by the app being reported unhealthy.
func TestBlockShutdownBindings(t *testing.T) {
	tests := []struct {
		name string
		// appUnhealthy selects how the block is released: true reports the app
		// as unhealthy, false steps the fake clock past the block duration.
		appUnhealthy bool
		// earlyReturnMsg is the failure reported if Run returns while it should
		// still be blocking.
		earlyReturnMsg string
	}{
		{
			name:           "block timeout",
			appUnhealthy:   false,
			earlyReturnMsg: "expected not to return until block timeout is reached",
		},
		{
			name:           "block app unhealthy",
			appUnhealthy:   true,
			earlyReturnMsg: "expected not to return until app reports unhealthy",
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			rt, err := NewTestDaprRuntime(t, modes.StandaloneMode)
			require.NoError(t, err)

			// The fake clock keeps the block timer from firing until the test
			// steps it explicitly.
			fakeClock := clocktesting.NewFakeClock(time.Now())
			rt.clock = fakeClock
			rt.appHealthChanged(context.Background(), apphealth.AppStatusHealthy)

			rt.runtimeConfig.blockShutdownDuration = ptr.Of(time.Millisecond * 100)
			rt.runtimeConfig.gracefulShutdownDuration = 3 * time.Second
			rt.runtimeConfig.registry.Bindings().RegisterInputBinding(
				func(_ logger.Logger) bindings.InputBinding {
					return &daprt.MockBinding{}
				},
				"testInputBinding",
			)

			ctx, cancel := context.WithCancel(context.Background())
			// Make sure Run is always interrupted, even if a require below aborts
			// the subtest before the explicit cancel.
			t.Cleanup(cancel)

			// Buffered so the goroutine can deliver Run's result and exit even
			// when the test has already given up waiting.
			errCh := make(chan error, 1)
			go func() {
				errCh <- rt.Run(ctx)
			}()

			cin := componentsV1alpha1.Component{}
			cin.ObjectMeta.Name = "testInputBinding"
			cin.Spec.Type = "bindings.testInputBinding"

			rt.runtimeConfig.registry.Bindings().RegisterOutputBinding(
				func(_ logger.Logger) bindings.OutputBinding {
					return &daprt.MockBinding{}
				},
				"testOutputBinding",
			)
			cout := componentsV1alpha1.Component{}
			cout.ObjectMeta.Name = "testOutputBinding"
			cout.Spec.Type = "bindings.testOutputBinding"
			require.NoError(t, rt.processor.Init(context.Background(), cin))
			require.NoError(t, rt.processor.Init(context.Background(), cout))
			assert.Len(t, rt.compStore.ListInputBindings(), 1)
			assert.Len(t, rt.compStore.ListOutputBindings(), 1)

			// Interrupt the runtime; Run must keep blocking.
			cancel()

			select {
			case <-time.After(time.Second):
			case <-errCh:
				// Run's result has been consumed, so there is nothing left to wait
				// for: stop here rather than timing out on the second select.
				require.Fail(t, tc.earlyReturnMsg)
			}

			// Release the block.
			if tc.appUnhealthy {
				rt.appHealthChanged(context.Background(), apphealth.AppStatusUnhealthy)
			} else {
				fakeClock.Step(time.Millisecond * 200)
			}

			select {
			case <-time.After(rt.runtimeConfig.gracefulShutdownDuration + 2*time.Second):
				assert.Fail(t, "input bindings shutdown timed out")
			case err := <-errCh:
				require.NoError(t, err)
			}
		})
	}
}

func TestGracefulShutdownPubSub(t *testing.T) {
rt, err := NewTestDaprRuntime(t, modes.StandaloneMode)
require.NoError(t, err)
Expand Down

0 comments on commit 4396bf2

Please sign in to comment.