Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hot Reloading: Kubernetes Component hot reloading #7260

Merged
merged 33 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
dcc4c02
Hot Reloading: SelfHosted Componenthot reloading
JoshVanL Nov 18, 2023
c807097
Adds app ready checks for subscribed input bindings and pubsub
JoshVanL Nov 26, 2023
bb6b06d
Linting
JoshVanL Nov 26, 2023
e824506
Linting
JoshVanL Nov 26, 2023
cc2243f
Updates reconciler to wait for processor queue to be empty after
JoshVanL Nov 26, 2023
c3a773c
Revert processor pending resource queue to 0 channel
JoshVanL Nov 26, 2023
b808b4d
Review comments
JoshVanL Nov 27, 2023
c8efb48
Linitng
JoshVanL Dec 4, 2023
2ce1d59
Linting
JoshVanL Dec 6, 2023
81cd03b
Adds review comments
JoshVanL Dec 6, 2023
eabeee6
Hot Reloading: SelfHosted Componenthot reloading
JoshVanL Nov 18, 2023
af2b64e
Adds app ready checks for subscribed input bindings and pubsub
JoshVanL Nov 26, 2023
29d3746
Hot Reloading: Operator Componenthot reloading
JoshVanL Nov 28, 2023
de78797
Linting
JoshVanL Nov 30, 2023
f464bcf
Adds hotreloading e2e config
JoshVanL Nov 30, 2023
be0b495
Use correct URL endpoints for hotreloading e2e save
JoshVanL Nov 30, 2023
68e449b
Revert integration test case timeout to 180 seconds
JoshVanL Nov 30, 2023
56a6bdf
Fix hotreloading_test.go
JoshVanL Dec 1, 2023
e65e961
Use correct external URL target
JoshVanL Dec 1, 2023
54a25b4
Lint code
JoshVanL Dec 1, 2023
d5d7be9
Hot Reloading: SelfHosted Componenthot reloading
JoshVanL Nov 18, 2023
d62e8d9
Adds app ready checks for subscribed input bindings and pubsub
JoshVanL Nov 26, 2023
f824261
Linitng
JoshVanL Dec 4, 2023
ddc528b
Linting
JoshVanL Dec 4, 2023
40fff23
Linting
JoshVanL Dec 6, 2023
4b2cbd8
Linting
JoshVanL Dec 6, 2023
19eb5ea
Linting
JoshVanL Dec 6, 2023
f3e5d60
Review comments
JoshVanL Dec 6, 2023
e483e9e
Merge branch 'master' into hot-reloading-components-operator-tested
JoshVanL Jan 2, 2024
83562fb
Review comments
JoshVanL Jan 2, 2024
f2d34c4
Set eventually in crypto test to 5 secs
JoshVanL Jan 2, 2024
49f7c97
Merge branch 'master' into hot-reloading-components-operator-tested
dapr-bot Jan 4, 2024
b76e6ac
Merge branch 'master' into hot-reloading-components-operator-tested
dapr-bot Jan 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/apis/components/v1alpha1/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
// SchemeGroupVersion is group version used to register these objects.
var SchemeGroupVersion = schema.GroupVersion{Group: components.GroupName, Version: "v1alpha1"}

// GroupKind takes an unqualified kind and returns back a Group qualified GroupKind.
func GroupKind(kind string) schema.GroupKind {
// GroupKindFromKind takes an unqualified kind and returns back a Group qualified GroupKind.
func GroupKindFromKind(kind string) schema.GroupKind {
return SchemeGroupVersion.WithKind(kind).GroupKind()
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/apis/httpEndpoint/v1alpha1/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
// SchemeGroupVersion is group version used to register these objects.
var SchemeGroupVersion = schema.GroupVersion{Group: httpendpoint.GroupName, Version: "v1alpha1"}

// GroupKind takes an unqualified kind and returns back a Group
// GroupKindFromKind takes an unqualified kind and returns back a Group
// qualified GroupKind.
func GroupKind(kind string) schema.GroupKind {
func GroupKindFromKind(kind string) schema.GroupKind {
return SchemeGroupVersion.WithKind(kind).GroupKind()
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/config/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@ import (
"strings"
"time"

env "github.com/dapr/dapr/pkg/config/env"

grpcRetry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"github.com/spf13/cast"
yaml "gopkg.in/yaml.v3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"

"github.com/dapr/dapr/pkg/buildinfo"
env "github.com/dapr/dapr/pkg/config/env"
operatorv1pb "github.com/dapr/dapr/pkg/proto/operator/v1"
"github.com/dapr/dapr/utils"
"github.com/dapr/kit/ptr"
Expand Down
49 changes: 38 additions & 11 deletions pkg/operator/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,13 @@
type Server interface {
Run(context.Context) error
Ready(context.Context) error
OnComponentUpdated(ctx context.Context, component *componentsapi.Component)
OnHTTPEndpointUpdated(ctx context.Context, endpoint *httpendpointsapi.HTTPEndpoint)
OnComponentUpdated(context.Context, operatorv1pb.ResourceEventType, *componentsapi.Component)
OnHTTPEndpointUpdated(context.Context, *httpendpointsapi.HTTPEndpoint)
}

type ComponentUpdateEvent struct {
Component *componentsapi.Component
EventType operatorv1pb.ResourceEventType
}

type apiServer struct {
Expand All @@ -73,7 +78,7 @@
// notify all dapr runtime
connLock sync.Mutex
endpointLock sync.Mutex
allConnUpdateChan map[string]chan *componentsapi.Component
allConnUpdateChan map[string]chan *ComponentUpdateEvent
allEndpointsUpdateChan map[string]chan *httpendpointsapi.HTTPEndpoint
readyCh chan struct{}
running atomic.Bool
Expand All @@ -85,7 +90,7 @@
Client: opts.Client,
sec: opts.Security,
port: strconv.Itoa(opts.Port),
allConnUpdateChan: make(map[string]chan *componentsapi.Component),
allConnUpdateChan: make(map[string]chan *ComponentUpdateEvent),
allEndpointsUpdateChan: make(map[string]chan *httpendpointsapi.HTTPEndpoint),
readyCh: make(chan struct{}),
}
Expand Down Expand Up @@ -125,6 +130,13 @@
// Block until context is done
<-ctx.Done()

a.connLock.Lock()

Check warning on line 133 in pkg/operator/api/api.go

View check run for this annotation

Codecov / codecov/patch

pkg/operator/api/api.go#L133

Added line #L133 was not covered by tests
for key, ch := range a.allConnUpdateChan {
close(ch)
delete(a.allConnUpdateChan, key)

Check warning on line 136 in pkg/operator/api/api.go

View check run for this annotation

Codecov / codecov/patch

pkg/operator/api/api.go#L135-L136

Added lines #L135 - L136 were not covered by tests
}
a.connLock.Unlock()

Check warning on line 138 in pkg/operator/api/api.go

View check run for this annotation

Codecov / codecov/patch

pkg/operator/api/api.go#L138

Added line #L138 was not covered by tests

s.GracefulStop()
err = <-errCh
if err != nil {
Expand All @@ -137,11 +149,23 @@
return nil
}

func (a *apiServer) OnComponentUpdated(_ context.Context, component *componentsapi.Component) {
func (a *apiServer) OnComponentUpdated(ctx context.Context, eventType operatorv1pb.ResourceEventType, component *componentsapi.Component) {
a.connLock.Lock()
var wg sync.WaitGroup
wg.Add(len(a.allConnUpdateChan))

Check warning on line 155 in pkg/operator/api/api.go

View check run for this annotation

Codecov / codecov/patch

pkg/operator/api/api.go#L154-L155

Added lines #L154 - L155 were not covered by tests
for _, connUpdateChan := range a.allConnUpdateChan {
connUpdateChan <- component
go func(connUpdateChan chan *ComponentUpdateEvent) {
defer wg.Done()
select {

Check warning on line 159 in pkg/operator/api/api.go

View check run for this annotation

Codecov / codecov/patch

pkg/operator/api/api.go#L157-L159

Added lines #L157 - L159 were not covered by tests
case connUpdateChan <- &ComponentUpdateEvent{
Component: component,
EventType: eventType,
}:
case <-ctx.Done():

Check warning on line 164 in pkg/operator/api/api.go

View check run for this annotation

Codecov / codecov/patch

pkg/operator/api/api.go#L163-L164

Added lines #L163 - L164 were not covered by tests
}
}(connUpdateChan)
}
wg.Wait()

Check warning on line 168 in pkg/operator/api/api.go

View check run for this annotation

Codecov / codecov/patch

pkg/operator/api/api.go#L168

Added line #L168 was not covered by tests
a.connLock.Unlock()
}

Expand Down Expand Up @@ -169,7 +193,7 @@
key := types.NamespacedName{Namespace: in.GetNamespace(), Name: in.GetName()}
var config configurationapi.Configuration
if err := a.Client.Get(ctx, key, &config); err != nil {
return nil, fmt.Errorf("error getting configuration: %w", err)
return nil, fmt.Errorf("error getting configuration %s/%s: %w", in.GetNamespace(), in.GetName(), err)

Check warning on line 196 in pkg/operator/api/api.go

View check run for this annotation

Codecov / codecov/patch

pkg/operator/api/api.go#L196

Added line #L196 was not covered by tests
}
b, err := json.Marshal(&config)
if err != nil {
Expand Down Expand Up @@ -402,6 +426,8 @@
}

// ComponentUpdate updates Dapr sidecars whenever a component in the cluster is modified.
// TODO: @joshvanl: Authorize pod name and namespace matches the SPIFFE ID of
// the caller.
func (a *apiServer) ComponentUpdate(in *operatorv1pb.ComponentUpdateRequest, srv operatorv1pb.Operator_ComponentUpdateServer) error { //nolint:nosnakecase
log.Info("sidecar connected for component updates")
keyObj, err := uuid.NewRandom()
Expand All @@ -411,7 +437,7 @@
key := keyObj.String()

a.connLock.Lock()
a.allConnUpdateChan[key] = make(chan *componentsapi.Component, 1)
a.allConnUpdateChan[key] = make(chan *ComponentUpdateEvent)
updateChan := a.allConnUpdateChan[key]
a.connLock.Unlock()

Expand All @@ -421,7 +447,7 @@
delete(a.allConnUpdateChan, key)
}()

updateComponentFunc := func(ctx context.Context, c *componentsapi.Component) {
updateComponentFunc := func(ctx context.Context, t operatorv1pb.ResourceEventType, c *componentsapi.Component) {
if c.Namespace != in.GetNamespace() {
return
}
Expand All @@ -440,13 +466,14 @@

err = srv.Send(&operatorv1pb.ComponentUpdateEvent{
Component: b,
Type: t,
})
if err != nil {
log.Warnf("error updating sidecar with component %s (%s) from pod %s/%s: %s", c.GetName(), c.Spec.Type, in.GetNamespace(), in.GetPodName(), err)
return
}

log.Infof("updated sidecar with component %s (%s) from pod %s/%s", c.GetName(), c.Spec.Type, in.GetNamespace(), in.GetPodName())
log.Debugf("updated sidecar with component %s %s (%s) from pod %s/%s", t.String(), c.GetName(), c.Spec.Type, in.GetNamespace(), in.GetPodName())
}

var wg sync.WaitGroup
Expand All @@ -462,7 +489,7 @@
wg.Add(1)
go func() {
defer wg.Done()
updateComponentFunc(srv.Context(), c)
updateComponentFunc(srv.Context(), c.EventType, c.Component)
}()
}
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/operator/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,9 @@ func TestComponentUpdate(t *testing.T) {
api.connLock.Lock()
defer api.connLock.Unlock()
for key := range api.allConnUpdateChan {
api.allConnUpdateChan[key] <- &c
api.allConnUpdateChan[key] <- &ComponentUpdateEvent{
Component: &c,
}
close(api.allConnUpdateChan[key])
}
}()
Expand Down Expand Up @@ -261,7 +263,7 @@ func TestComponentUpdate(t *testing.T) {
api.connLock.Lock()
defer api.connLock.Unlock()
for key := range api.allConnUpdateChan {
api.allConnUpdateChan[key] <- &c
api.allConnUpdateChan[key] <- &ComponentUpdateEvent{Component: &c}
close(api.allConnUpdateChan[key])
}
}()
Expand Down
20 changes: 14 additions & 6 deletions pkg/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
"github.com/dapr/dapr/pkg/operator/api"
operatorcache "github.com/dapr/dapr/pkg/operator/cache"
"github.com/dapr/dapr/pkg/operator/handlers"
operatorv1pb "github.com/dapr/dapr/pkg/proto/operator/v1"
"github.com/dapr/dapr/pkg/security"
"github.com/dapr/kit/concurrency"
"github.com/dapr/kit/logger"
Expand Down Expand Up @@ -190,12 +191,18 @@
}, nil
}

func (o *operator) syncComponent(ctx context.Context) func(obj interface{}) {
func (o *operator) syncComponent(ctx context.Context, eventType operatorv1pb.ResourceEventType) func(obj interface{}) {
return func(obj interface{}) {
c, ok := obj.(*componentsapi.Component)
if ok {
var c *componentsapi.Component
switch o := obj.(type) {
case *componentsapi.Component:
c = o
case cache.DeletedFinalStateUnknown:
c = o.Obj.(*componentsapi.Component)

Check warning on line 201 in pkg/operator/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/operator/operator.go#L196-L201

Added lines #L196 - L201 were not covered by tests
}
if c != nil {

Check warning on line 203 in pkg/operator/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/operator/operator.go#L203

Added line #L203 was not covered by tests
log.Debugf("Observed component to be synced: %s/%s", c.Namespace, c.Name)
o.apiServer.OnComponentUpdated(ctx, c)
o.apiServer.OnComponentUpdated(ctx, eventType, c)

Check warning on line 205 in pkg/operator/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/operator/operator.go#L205

Added line #L205 was not covered by tests
}
}
}
Expand Down Expand Up @@ -319,10 +326,11 @@
return fmt.Errorf("unable to get setup components informer: %w", rErr)
}
_, rErr = componentInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: o.syncComponent(ctx),
AddFunc: o.syncComponent(ctx, operatorv1pb.ResourceEventType_CREATED),

Check warning on line 329 in pkg/operator/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/operator/operator.go#L329

Added line #L329 was not covered by tests
UpdateFunc: func(_, newObj interface{}) {
o.syncComponent(ctx)(newObj)
o.syncComponent(ctx, operatorv1pb.ResourceEventType_UPDATED)(newObj)

Check warning on line 331 in pkg/operator/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/operator/operator.go#L331

Added line #L331 was not covered by tests
},
DeleteFunc: o.syncComponent(ctx, operatorv1pb.ResourceEventType_DELETED),
})
if rErr != nil {
return fmt.Errorf("unable to add components informer event handler: %w", rErr)
Expand Down
4 changes: 3 additions & 1 deletion pkg/runtime/compstore/crypto.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ limitations under the License.

package compstore

import "github.com/dapr/components-contrib/crypto"
import (
"github.com/dapr/components-contrib/crypto"
)

func (c *ComponentStore) AddCryptoProvider(name string, provider crypto.SubtleCrypto) {
c.lock.Lock()
Expand Down
31 changes: 31 additions & 0 deletions pkg/runtime/hotreload/hotreload.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ import (

componentsapi "github.com/dapr/dapr/pkg/apis/components/v1alpha1"
"github.com/dapr/dapr/pkg/config"
operatorv1 "github.com/dapr/dapr/pkg/proto/operator/v1"
"github.com/dapr/dapr/pkg/runtime/authorizer"
"github.com/dapr/dapr/pkg/runtime/compstore"
"github.com/dapr/dapr/pkg/runtime/hotreload/loader/disk"
"github.com/dapr/dapr/pkg/runtime/hotreload/loader/operator"
"github.com/dapr/dapr/pkg/runtime/hotreload/reconciler"
"github.com/dapr/dapr/pkg/runtime/processor"
"github.com/dapr/kit/concurrency"
Expand All @@ -38,6 +40,16 @@ type OptionsReloaderDisk struct {
Processor *processor.Processor
}

type OptionsReloaderOperator struct {
PodName string
Namespace string
Client operatorv1.OperatorClient
Config *config.Configuration
ComponentStore *compstore.ComponentStore
Authorizer *authorizer.Authorizer
Processor *processor.Processor
}

type Reloader struct {
isEnabled bool
componentsReconciler *reconciler.Reconciler[componentsapi.Component]
Expand All @@ -63,6 +75,25 @@ func NewDisk(ctx context.Context, opts OptionsReloaderDisk) (*Reloader, error) {
}, nil
}

func NewOperator(opts OptionsReloaderOperator) *Reloader {
loader := operator.New(operator.Options{
PodName: opts.PodName,
Namespace: opts.Namespace,
ComponentStore: opts.ComponentStore,
OperatorClient: opts.Client,
})

return &Reloader{
isEnabled: opts.Config.IsFeatureEnabled(config.HotReload),
componentsReconciler: reconciler.NewComponent(reconciler.Options[componentsapi.Component]{
Loader: loader,
CompStore: opts.ComponentStore,
Processor: opts.Processor,
Authorizer: opts.Authorizer,
}),
}
}

func (r *Reloader) Run(ctx context.Context) error {
if !r.isEnabled {
log.Debug("Hot reloading disabled")
Expand Down
84 changes: 84 additions & 0 deletions pkg/runtime/hotreload/loader/operator/component.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package operator

import (
"context"
"encoding/json"
"fmt"

componentsapi "github.com/dapr/dapr/pkg/apis/components/v1alpha1"
operatorpb "github.com/dapr/dapr/pkg/proto/operator/v1"
"github.com/dapr/dapr/pkg/runtime/hotreload/loader"
)

type component struct {
operatorpb.Operator_ComponentUpdateClient
}

// The go linter does not yet understand that these functions are being used by
// the generic operator.
//
//nolint:unused
func (c *component) list(ctx context.Context, opclient operatorpb.OperatorClient, ns, podName string) ([][]byte, error) {
resp, err := opclient.ListComponents(ctx, &operatorpb.ListComponentsRequest{
Namespace: ns,
PodName: podName,
})
if err != nil {
return nil, err

Check warning on line 40 in pkg/runtime/hotreload/loader/operator/component.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/hotreload/loader/operator/component.go#L35-L40

Added lines #L35 - L40 were not covered by tests
}

return resp.GetComponents(), nil

Check warning on line 43 in pkg/runtime/hotreload/loader/operator/component.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/hotreload/loader/operator/component.go#L43

Added line #L43 was not covered by tests
}

//nolint:unused
func (c *component) close() error {
if c.Operator_ComponentUpdateClient != nil {
return c.Operator_ComponentUpdateClient.CloseSend()

Check warning on line 49 in pkg/runtime/hotreload/loader/operator/component.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/hotreload/loader/operator/component.go#L48-L49

Added lines #L48 - L49 were not covered by tests
}
return nil

Check warning on line 51 in pkg/runtime/hotreload/loader/operator/component.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/hotreload/loader/operator/component.go#L51

Added line #L51 was not covered by tests
}

//nolint:unused
func (c *component) recv() (*loader.Event[componentsapi.Component], error) {
event, err := c.Operator_ComponentUpdateClient.Recv()
if err != nil {
return nil, err

Check warning on line 58 in pkg/runtime/hotreload/loader/operator/component.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/hotreload/loader/operator/component.go#L56-L58

Added lines #L56 - L58 were not covered by tests
}

var component componentsapi.Component
if err := json.Unmarshal(event.GetComponent(), &component); err != nil {
return nil, fmt.Errorf("failed to deserializing component: %w", err)

Check warning on line 63 in pkg/runtime/hotreload/loader/operator/component.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/hotreload/loader/operator/component.go#L61-L63

Added lines #L61 - L63 were not covered by tests
}

return &loader.Event[componentsapi.Component]{
Resource: component,
Type: event.GetType(),
}, nil

Check warning on line 69 in pkg/runtime/hotreload/loader/operator/component.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/hotreload/loader/operator/component.go#L66-L69

Added lines #L66 - L69 were not covered by tests
}

//nolint:unused
func (c *component) establish(ctx context.Context, opclient operatorpb.OperatorClient, ns, podName string) error {
stream, err := opclient.ComponentUpdate(ctx, &operatorpb.ComponentUpdateRequest{
Namespace: ns,
PodName: podName,
})
if err != nil {
return err

Check warning on line 79 in pkg/runtime/hotreload/loader/operator/component.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/hotreload/loader/operator/component.go#L74-L79

Added lines #L74 - L79 were not covered by tests
}

c.Operator_ComponentUpdateClient = stream
return nil

Check warning on line 83 in pkg/runtime/hotreload/loader/operator/component.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/hotreload/loader/operator/component.go#L82-L83

Added lines #L82 - L83 were not covered by tests
}