Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds Daprd option --dapr-block-shutdown-duration #7268

Merged
merged 8 commits into from
Dec 6, 2023
2 changes: 2 additions & 0 deletions cmd/daprd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func main() {
UnixDomainSocket: opts.UnixDomainSocket,
DaprHTTPReadBufferSize: opts.DaprHTTPReadBufferSize,
DaprGracefulShutdownSeconds: opts.DaprGracefulShutdownSeconds,
DaprBlockShutdownSeconds: opts.DaprBlockShutdownSeconds,
DisableBuiltinK8sSecretStore: opts.DisableBuiltinK8sSecretStore,
EnableAppHealthCheck: opts.EnableAppHealthCheck,
AppHealthCheckPath: opts.AppHealthCheckPath,
Expand All @@ -175,4 +176,5 @@ func main() {
if err != nil {
log.Fatalf("Fatal error from runtime: %s", err)
}
log.Info("Daprd shutdown gracefully")
}
8 changes: 8 additions & 0 deletions cmd/daprd/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
DaprPublicPort string
AppPort string
DaprGracefulShutdownSeconds int
DaprBlockShutdownSeconds *int
PlacementServiceHostAddr string
DaprAPIListenAddresses string
AppHealthProbeInterval int
Expand All @@ -79,6 +80,8 @@
EnableAPILogging: new(bool),
}

var blockShutdownSeconds int

flag.StringVar(&opts.Mode, "mode", string(modes.StandaloneMode), "Runtime mode for Dapr")
flag.StringVar(&opts.DaprHTTPPort, "dapr-http-port", strconv.Itoa(runtime.DefaultDaprHTTPPort), "HTTP port for Dapr API to listen on")
flag.StringVar(&opts.DaprAPIListenAddresses, "dapr-listen-addresses", runtime.DefaultAPIListenAddress, "One or more addresses for the Dapr API to listen on, CSV limited")
Expand Down Expand Up @@ -109,6 +112,7 @@
flag.StringVar(&opts.UnixDomainSocket, "unix-domain-socket", "", "Path to a unix domain socket dir mount. If specified, Dapr API servers will use Unix Domain Sockets")
flag.IntVar(&opts.DaprHTTPReadBufferSize, "dapr-http-read-buffer-size", runtime.DefaultReadBufferSize, "Increasing max size of read buffer in KB to handle sending multi-KB headers")
flag.IntVar(&opts.DaprGracefulShutdownSeconds, "dapr-graceful-shutdown-seconds", int(runtime.DefaultGracefulShutdownDuration/time.Second), "Graceful shutdown time in seconds")
flag.IntVar(&blockShutdownSeconds, "dapr-block-shutdown-seconds", 0, "If enabled, will block graceful shutdown after terminate signal is received until either the number of seconds has been reached or the app reports unhealthy. Disabled by default")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't use "seconds" or other things with complex units. We should standardize on Go durations, which allow more flexibility (including fractions of seconds), at least for new flags. See #7028

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the other hand, it might be more familiar for K8s users with terminationGracePeriodSeconds.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm happy to use a Go duration instead for the reasons in the issue. Should we move forward with that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated this to use a Go duration; it's easy enough to revert the commit if we want to in this PR.

flag.BoolVar(opts.EnableAPILogging, "enable-api-logging", false, "Enable API logging for API calls")
flag.BoolVar(&opts.DisableBuiltinK8sSecretStore, "disable-builtin-k8s-secret-store", false, "Disable the built-in Kubernetes Secret Store")
flag.BoolVar(&opts.EnableAppHealthCheck, "enable-app-health-check", false, "Enable health checks for the application using the protocol defined with app-protocol")
Expand Down Expand Up @@ -150,6 +154,10 @@
}
}

if isFlagPassed("dapr-block-shutdown-seconds") {
opts.DaprBlockShutdownSeconds = &blockShutdownSeconds

Check warning on line 158 in cmd/daprd/options/options.go

View check run for this annotation

Codecov / codecov/patch

cmd/daprd/options/options.go#L158

Added line #L158 was not covered by tests
}

return &opts
}

Expand Down
1 change: 1 addition & 0 deletions pkg/injector/annotations/annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ const (
KeyHTTPMaxRequestSize = "dapr.io/http-max-request-size"
KeyHTTPReadBufferSize = "dapr.io/http-read-buffer-size"
KeyGracefulShutdownSeconds = "dapr.io/graceful-shutdown-seconds"
KeyBlockShutdownSeconds = "dapr.io/block-shutdown-seconds"
KeyEnableAPILogging = "dapr.io/enable-api-logging"
KeyUnixDomainSocketPath = "dapr.io/unix-domain-socket-path"
KeyVolumeMountsReadOnly = "dapr.io/volume-mounts"
Expand Down
1 change: 1 addition & 0 deletions pkg/injector/patcher/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type SidecarConfig struct {
HTTPMaxRequestSize *int `annotation:"dapr.io/http-max-request-size"`
HTTPReadBufferSize *int `annotation:"dapr.io/http-read-buffer-size"`
GracefulShutdownSeconds int `annotation:"dapr.io/graceful-shutdown-seconds" default:"-1"`
BlockShutdownSeconds *int `annotation:"dapr.io/block-shutdown-seconds"`
EnableAPILogging *bool `annotation:"dapr.io/enable-api-logging"`
UnixDomainSocketPath string `annotation:"dapr.io/unix-domain-socket-path"`
VolumeMounts string `annotation:"dapr.io/volume-mounts"`
Expand Down
4 changes: 4 additions & 0 deletions pkg/injector/patcher/sidecar_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ func (c *SidecarConfig) getSidecarContainer(opts getSidecarContainerOpts) (*core
args = append(args, "--unix-domain-socket", injectorConsts.UnixDomainSocketDaprdPath)
}

if c.BlockShutdownSeconds != nil {
args = append(args, "--dapr-block-shutdown-seconds", strconv.Itoa(*c.BlockShutdownSeconds))
}

// When debugging is enabled, we need to override the command and the flags
if c.EnableDebug {
ports = append(ports, corev1.ContainerPort{
Expand Down
31 changes: 31 additions & 0 deletions pkg/injector/patcher/sidecar_container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,37 @@ func TestGetSidecarContainer(t *testing.T) {
},
}))

t.Run("block shutdown seconds", testSuiteGenerator([]testCase{
{
name: "default to empty",
annotations: map[string]string{},
assertFn: func(t *testing.T, container *corev1.Container) {
args := strings.Join(container.Args, " ")
assert.NotContains(t, args, "--dapr-block-shutdown-seconds")
},
},
{
name: "add a block shutdown seconds",
annotations: map[string]string{
annotations.KeyBlockShutdownSeconds: "3",
},
assertFn: func(t *testing.T, container *corev1.Container) {
args := strings.Join(container.Args, " ")
assert.Contains(t, args, "--dapr-block-shutdown-seconds 3")
},
},
{
name: "empty when value is invalid",
annotations: map[string]string{
annotations.KeyBlockShutdownSeconds: "invalid",
},
assertFn: func(t *testing.T, container *corev1.Container) {
args := strings.Join(container.Args, " ")
assert.NotContains(t, args, "--dapr-block-shutdown-seconds")
},
},
}))

t.Run("sidecar image", testSuiteGenerator([]testCase{
{
name: "no annotation",
Expand Down
9 changes: 9 additions & 0 deletions pkg/runtime/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
DaprPublicPort string
ApplicationPort string
DaprGracefulShutdownSeconds int
DaprBlockShutdownSeconds *int
PlacementServiceHostAddr string
DaprAPIListenAddresses string
AppHealthProbeInterval int
Expand Down Expand Up @@ -129,6 +130,7 @@
unixDomainSocket string
readBufferSize int
gracefulShutdownDuration time.Duration
blockShutdownDuration *time.Duration
enableAPILogging *bool
disableBuiltinK8sSecretStore bool
config []string
Expand Down Expand Up @@ -362,6 +364,13 @@
intc.readBufferSize = DefaultReadBufferSize
}

if c.DaprBlockShutdownSeconds != nil {
if *c.DaprBlockShutdownSeconds < 0 {
return nil, fmt.Errorf("the '-dapr-block-shutdown-seconds' argument value %d must be more then 0", *c.DaprBlockShutdownSeconds)

Check warning on line 369 in pkg/runtime/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/config.go#L368-L369

Added lines #L368 - L369 were not covered by tests
}
intc.blockShutdownDuration = ptr.Of(time.Duration(*c.DaprBlockShutdownSeconds) * time.Second)

Check warning on line 371 in pkg/runtime/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/config.go#L371

Added line #L371 was not covered by tests
}

if c.DaprGracefulShutdownSeconds < 0 {
intc.gracefulShutdownDuration = DefaultGracefulShutdownDuration
} else {
Expand Down
3 changes: 3 additions & 0 deletions pkg/runtime/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ func TestParsePlacementAddr(t *testing.T) {
func Test_toInternal(t *testing.T) {
cfg := defaultTestConfig()

var nilDuration *time.Duration

intc, err := cfg.toInternal()
require.NoError(t, err)

Expand All @@ -81,6 +83,7 @@ func Test_toInternal(t *testing.T) {
assert.Equal(t, "", intc.unixDomainSocket)
assert.Equal(t, 4, intc.readBufferSize)
assert.Equal(t, time.Second, intc.gracefulShutdownDuration)
assert.Equal(t, nilDuration, intc.blockShutdownDuration)
assert.Equal(t, ptr.Of(true), intc.enableAPILogging)
assert.True(t, intc.disableBuiltinK8sSecretStore)
assert.Equal(t, "1.1.1.1", intc.appConnectionConfig.ChannelAddress)
Expand Down
46 changes: 44 additions & 2 deletions pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/clock"

nr "github.com/dapr/components-contrib/nameresolution"
"github.com/dapr/components-contrib/state"
Expand Down Expand Up @@ -98,6 +99,7 @@
daprHTTPAPI http.API
daprGRPCAPI grpc.API
operatorClient operatorv1pb.OperatorClient
isAppHealthy chan struct{}
appHealth *apphealth.AppHealth
appHealthReady func(context.Context) error // Invoked the first time the app health becomes ready
appHealthLock sync.Mutex
Expand All @@ -107,6 +109,7 @@
authz *authorizer.Authorizer
sec security.Handler
runnerCloser *concurrency.RunnerCloserManager
clock clock.Clock
// Used for testing.
initComplete chan struct{}

Expand Down Expand Up @@ -206,7 +209,10 @@
namespace: namespace,
podName: podName,
initComplete: make(chan struct{}),
isAppHealthy: make(chan struct{}),
clock: new(clock.RealClock),
}
close(rt.isAppHealthy)

var gracePeriod *time.Duration
if duration := runtimeConfig.gracefulShutdownDuration; duration > 0 {
Expand Down Expand Up @@ -274,7 +280,31 @@
}

// Run performs initialization of the runtime with the runtime and global configurations.
func (a *DaprRuntime) Run(ctx context.Context) error {
func (a *DaprRuntime) Run(parentCtx context.Context) error {
ctx := parentCtx
if a.runtimeConfig.blockShutdownDuration != nil {
// Override context with Background. Runner context will be cancelled when
// blocking graceful shutdown returns.
ctx = context.Background()
a.runnerCloser.Add(func(ctx context.Context) error {
select {
case <-parentCtx.Done():
case <-ctx.Done():

Check warning on line 292 in pkg/runtime/runtime.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/runtime.go#L292

Added line #L292 was not covered by tests
// Return nil as another routine has returned, not due to an interrupt.
return nil

Check warning on line 294 in pkg/runtime/runtime.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/runtime.go#L294

Added line #L294 was not covered by tests
}

log.Infof("Blocking graceful shutdown for %s or until app reports unhealthy...", *a.runtimeConfig.blockShutdownDuration)
select {
case <-a.clock.After(*a.runtimeConfig.blockShutdownDuration):
log.Info("Block shutdown period expired, entering shutdown...")
case <-a.isAppHealthy:
log.Info("App reported unhealthy, entering shutdown...")
}
return nil
})
}

return a.runnerCloser.Run(ctx)
}

Expand Down Expand Up @@ -586,6 +616,12 @@

switch status {
case apphealth.AppStatusHealthy:
select {
case <-a.isAppHealthy:
a.isAppHealthy = make(chan struct{})
default:

Check warning on line 622 in pkg/runtime/runtime.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/runtime.go#L622

Added line #L622 was not covered by tests
}

// First time the app becomes healthy, complete the init process
if a.appHealthReady != nil {
if err := a.appHealthReady(ctx); err != nil {
Expand All @@ -608,6 +644,12 @@
log.Warnf("failed to subscribe to outbox topics: %s", err)
}
case apphealth.AppStatusUnhealthy:
select {
case <-a.isAppHealthy:

Check warning on line 648 in pkg/runtime/runtime.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/runtime.go#L648

Added line #L648 was not covered by tests
default:
close(a.isAppHealthy)
}

// Stop topic subscriptions and input bindings
a.processor.PubSub().StopSubscriptions()
a.processor.Binding().StopReadingFromBindings()
Expand Down Expand Up @@ -1025,7 +1067,7 @@
case <-ctx.Done():
return ctx.Err()
// prevents overwhelming the OS with open connections
case <-time.After(time.Millisecond * 100):
case <-a.clock.After(time.Millisecond * 100):
}
}

Expand Down
121 changes: 121 additions & 0 deletions pkg/runtime/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"google.golang.org/grpc/metadata"
v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clocktesting "k8s.io/utils/clock/testing"

"github.com/dapr/components-contrib/bindings"
"github.com/dapr/components-contrib/lock"
Expand Down Expand Up @@ -1855,6 +1856,126 @@ func TestGracefulShutdownBindings(t *testing.T) {
}
}

func TestBlockShutdownBindings(t *testing.T) {
t.Run("block timeout", func(t *testing.T) {
rt, err := NewTestDaprRuntime(t, modes.StandaloneMode)
require.NoError(t, err)

fakeClock := clocktesting.NewFakeClock(time.Now())
rt.clock = fakeClock
rt.appHealthChanged(context.Background(), apphealth.AppStatusHealthy)

rt.runtimeConfig.blockShutdownDuration = ptr.Of(time.Millisecond * 100)
rt.runtimeConfig.gracefulShutdownDuration = 3 * time.Second
rt.runtimeConfig.registry.Bindings().RegisterInputBinding(
func(_ logger.Logger) bindings.InputBinding {
return &daprt.MockBinding{}
},
"testInputBinding",
)

ctx, cancel := context.WithCancel(context.Background())
errCh := make(chan error)
go func() {
errCh <- rt.Run(ctx)
}()

cin := componentsV1alpha1.Component{}
cin.ObjectMeta.Name = "testInputBinding"
cin.Spec.Type = "bindings.testInputBinding"

rt.runtimeConfig.registry.Bindings().RegisterOutputBinding(
func(_ logger.Logger) bindings.OutputBinding {
return &daprt.MockBinding{}
},
"testOutputBinding",
)
cout := componentsV1alpha1.Component{}
cout.ObjectMeta.Name = "testOutputBinding"
cout.Spec.Type = "bindings.testOutputBinding"
require.NoError(t, rt.processor.Init(context.Background(), cin))
require.NoError(t, rt.processor.Init(context.Background(), cout))
assert.Len(t, rt.compStore.ListInputBindings(), 1)
assert.Len(t, rt.compStore.ListOutputBindings(), 1)

cancel()

select {
case <-time.After(time.Second):
case <-errCh:
assert.Fail(t, "expected not to return until block timeout is reached")
}

fakeClock.Step(time.Millisecond * 200)

select {
case <-time.After(rt.runtimeConfig.gracefulShutdownDuration + 2*time.Second):
assert.Fail(t, "input bindings shutdown timed out")
case err := <-errCh:
require.NoError(t, err)
}
})

t.Run("block app unhealthy", func(t *testing.T) {
rt, err := NewTestDaprRuntime(t, modes.StandaloneMode)
require.NoError(t, err)

fakeClock := clocktesting.NewFakeClock(time.Now())
rt.clock = fakeClock
rt.appHealthChanged(context.Background(), apphealth.AppStatusHealthy)

rt.runtimeConfig.blockShutdownDuration = ptr.Of(time.Millisecond * 100)
rt.runtimeConfig.gracefulShutdownDuration = 3 * time.Second
rt.runtimeConfig.registry.Bindings().RegisterInputBinding(
func(_ logger.Logger) bindings.InputBinding {
return &daprt.MockBinding{}
},
"testInputBinding",
)

ctx, cancel := context.WithCancel(context.Background())
errCh := make(chan error)
go func() {
errCh <- rt.Run(ctx)
}()

cin := componentsV1alpha1.Component{}
cin.ObjectMeta.Name = "testInputBinding"
cin.Spec.Type = "bindings.testInputBinding"

rt.runtimeConfig.registry.Bindings().RegisterOutputBinding(
func(_ logger.Logger) bindings.OutputBinding {
return &daprt.MockBinding{}
},
"testOutputBinding",
)
cout := componentsV1alpha1.Component{}
cout.ObjectMeta.Name = "testOutputBinding"
cout.Spec.Type = "bindings.testOutputBinding"
require.NoError(t, rt.processor.Init(context.Background(), cin))
require.NoError(t, rt.processor.Init(context.Background(), cout))
assert.Len(t, rt.compStore.ListInputBindings(), 1)
assert.Len(t, rt.compStore.ListOutputBindings(), 1)

cancel()

select {
case <-time.After(time.Second):
case <-errCh:
assert.Fail(t, "expected not to return until block timeout is reached")
}

rt.appHealthChanged(context.Background(), apphealth.AppStatusUnhealthy)

select {
case <-time.After(rt.runtimeConfig.gracefulShutdownDuration + 2*time.Second):
assert.Fail(t, "input bindings shutdown timed out")
case err := <-errCh:
require.NoError(t, err)
}
})
}

func TestGracefulShutdownPubSub(t *testing.T) {
rt, err := NewTestDaprRuntime(t, modes.StandaloneMode)
require.NoError(t, err)
Expand Down