Skip to content

Commit

Permalink
Hot Reloading: Kubernetes Component hot reloading (#7260)
Browse files Browse the repository at this point in the history
* Hot Reloading: SelfHosted Componenthot reloading

Part of #1172

Adds hot reloading functionality to Daprd, which is gated behind the HotReload preview feature.

If enabled, Daprd in Selfhosted mode will watch for resource updates
from watching the resource directory(s). If a resource is detected as
changed (spec is different to what is currently loaded), then Dapr will
close/create/update the resource.

Supports Component _except_ HTTP middlware which requires further
changes to the HTTP server handler machinery. A warning is thrown if hot
reloading is enabled and a HTTP middleware is updated.

The hot reloader reconciler is made generic to enable other resource
types to be added in future.

---

When running in standalone mode, the disk loader watches the directories
passed by --resources-path for updates to yaml files containing
Components.

When an event occurs, the reconciler and differ are responsible for
determining whether any resources have been created, updated, or
deleted, by comparing the local store specs with that of the remote. If
any have changed, the resource is closed (if it exists), and then
re-initialized (if it exists). A resource will only be closed if it has
been deleted, only initialized if it has been created, and closed &
initialized if it has been updated.

We consider a resource to have changed generally if anything apart from
its Kubernetes metadata or type meta object meta has changed, and
therefore needs some kind of reloading.

---

Currently, if a reloading component errors and `spec.ignoreErrors=false`
then Daprd will gracefully close, like a component loaded at startup
today. It is intended that in future the component will be re-inited on
a backoff queue in future in the case of errors, even if
`spec.ignoreErros=true`.

HTTP middleware component hot reloading is not supported as it requires
further changes to the HTTP server handler machinery, and I didn't want
to grow this PR further.

To use the HotReloading feature, which is currently only available as an
alpha feature in Selfhosted mode, users need to add the following
Configuration to the target Daprd;

```yaml
apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
  name: hotreloading
spec:
  features:
  - name: HotReload
    enabled: true
```

Signed-off-by: joshvanl <me@joshvanl.dev>

* Adds app ready checks for subscribed input bindings and pubsub

Signed-off-by: joshvanl <me@joshvanl.dev>

* Linting

Signed-off-by: joshvanl <me@joshvanl.dev>

* Linting

Signed-off-by: joshvanl <me@joshvanl.dev>

* Updates reconciler to wait for processor queue to be empty after
each event

Signed-off-by: joshvanl <me@joshvanl.dev>

* Revert processor pending resource queue to 0 channel

Signed-off-by: joshvanl <me@joshvanl.dev>

* Review comments

Signed-off-by: joshvanl <me@joshvanl.dev>

* Linitng

Signed-off-by: joshvanl <me@joshvanl.dev>

* Linting

Signed-off-by: joshvanl <me@joshvanl.dev>

* Adds review comments

Signed-off-by: joshvanl <me@joshvanl.dev>

* Hot Reloading: SelfHosted Componenthot reloading

Part of #1172

Adds hot reloading functionality to Daprd, which is gated behind the HotReload preview feature.

If enabled, Daprd in Selfhosted mode will watch for resource updates
from watching the resource directory(s). If a resource is detected as
changed (spec is different to what is currently loaded), then Dapr will
close/create/update the resource.

Supports Component _except_ HTTP middlware which requires further
changes to the HTTP server handler machinery. A warning is thrown if hot
reloading is enabled and a HTTP middleware is updated.

The hot reloader reconciler is made generic to enable other resource
types to be added in future.

---

When running in standalone mode, the disk loader watches the directories
passed by --resources-path for updates to yaml files containing
Components.

When an event occurs, the reconciler and differ are responsible for
determining whether any resources have been created, updated, or
deleted, by comparing the local store specs with that of the remote. If
any have changed, the resource is closed (if it exists), and then
re-initialized (if it exists). A resource will only be closed if it has
been deleted, only initialized if it has been created, and closed &
initialized if it has been updated.

We consider a resource to have changed generally if anything apart from
its Kubernetes metadata or type meta object meta has changed, and
therefore needs some kind of reloading.

---

Currently, if a reloading component errors and `spec.ignoreErrors=false`
then Daprd will gracefully close, like a component loaded at startup
today. It is intended that in future the component will be re-inited on
a backoff queue in future in the case of errors, even if
`spec.ignoreErros=true`.

HTTP middleware component hot reloading is not supported as it requires
further changes to the HTTP server handler machinery, and I didn't want
to grow this PR further.

To use the HotReloading feature, which is currently only available as an
alpha feature in Selfhosted mode, users need to add the following
Configuration to the target Daprd;

```yaml
apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
  name: hotreloading
spec:
  features:
  - name: HotReload
    enabled: true
```

Signed-off-by: joshvanl <me@joshvanl.dev>

* Adds app ready checks for subscribed input bindings and pubsub

Signed-off-by: joshvanl <me@joshvanl.dev>

* Hot Reloading: Operator Componenthot reloading

Branched from #7239

Part of #1172

Adds hot reloading functionality to Daprd, which is gated behind the HotReload preview feature.

If enabled, Daprd in Kubernetes mode will watch for Component resource
updates from the Operator. If a resource is detected as changed (spec is
different to what is currently loaded), then Dapr will
close/create/update the resource.

Supports Component _except_ HTTP middlware which requires further
changes to the HTTP server handler machinery. A warning is thrown if hot
reloading is enabled and a HTTP middleware is updated.

---

When running in Kubernetes mode, the operator loader streams updates from
the Operator API. When an event occurs, the event type will trigger the
resource to be created, updated, or deleted. Updated components will be
closed, and then re-initialized.

We consider a resource to have changed generally if anything apart from
its Kubernetes metadata or type meta object meta has changed, and
therefore needs some kind of reloading.

---

Currently, if a reloading component errors and `spec.ignoreErrors=false`
then Daprd will gracefully close, like a component loaded at startup
today. It is intended that in future the component will be re-inited on
a backoff queue in future in the case of errors, even if
`spec.ignoreErros=true`.

HTTP middleware component hot reloading is not supported as it requires
further changes to the HTTP server handler machinery, and I didn't want
to grow this PR further.

To use the HotReloading feature, which is currently only available as an
alpha feature in Selfhosted mode, users need to add the following
Configuration to the target Daprd;

```yaml
apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
  name: hotreloading
spec:
  features:
  - name: HotReload
    enabled: true
```

Signed-off-by: joshvanl <me@joshvanl.dev>

* Linting

Signed-off-by: joshvanl <me@joshvanl.dev>

* Adds hotreloading e2e config

Signed-off-by: joshvanl <me@joshvanl.dev>

* Use correct URL endpoints for hotreloading e2e save

Signed-off-by: joshvanl <me@joshvanl.dev>

* Revert integration test case timeout to 180 seconds

Signed-off-by: joshvanl <me@joshvanl.dev>

* Fix hotreloading_test.go

Signed-off-by: joshvanl <me@joshvanl.dev>

* Use correct external URL target

Signed-off-by: joshvanl <me@joshvanl.dev>

* Lint code

Signed-off-by: joshvanl <me@joshvanl.dev>

* Hot Reloading: SelfHosted Componenthot reloading

Part of #1172

Adds hot reloading functionality to Daprd, which is gated behind the HotReload preview feature.

If enabled, Daprd in Selfhosted mode will watch for resource updates
from watching the resource directory(s). If a resource is detected as
changed (spec is different to what is currently loaded), then Dapr will
close/create/update the resource.

Supports Component _except_ HTTP middlware which requires further
changes to the HTTP server handler machinery. A warning is thrown if hot
reloading is enabled and a HTTP middleware is updated.

The hot reloader reconciler is made generic to enable other resource
types to be added in future.

---

When running in standalone mode, the disk loader watches the directories
passed by --resources-path for updates to yaml files containing
Components.

When an event occurs, the reconciler and differ are responsible for
determining whether any resources have been created, updated, or
deleted, by comparing the local store specs with that of the remote. If
any have changed, the resource is closed (if it exists), and then
re-initialized (if it exists). A resource will only be closed if it has
been deleted, only initialized if it has been created, and closed &
initialized if it has been updated.

We consider a resource to have changed generally if anything apart from
its Kubernetes metadata or type meta object meta has changed, and
therefore needs some kind of reloading.

---

Currently, if a reloading component errors and `spec.ignoreErrors=false`
then Daprd will gracefully close, like a component loaded at startup
today. It is intended that in future the component will be re-inited on
a backoff queue in future in the case of errors, even if
`spec.ignoreErros=true`.

HTTP middleware component hot reloading is not supported as it requires
further changes to the HTTP server handler machinery, and I didn't want
to grow this PR further.

To use the HotReloading feature, which is currently only available as an
alpha feature in Selfhosted mode, users need to add the following
Configuration to the target Daprd;

```yaml
apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
  name: hotreloading
spec:
  features:
  - name: HotReload
    enabled: true
```

Signed-off-by: joshvanl <me@joshvanl.dev>

* Adds app ready checks for subscribed input bindings and pubsub

Signed-off-by: joshvanl <me@joshvanl.dev>

* Linitng

Signed-off-by: joshvanl <me@joshvanl.dev>

* Linting

Signed-off-by: joshvanl <me@joshvanl.dev>

* Linting

Signed-off-by: joshvanl <me@joshvanl.dev>

* Linting

Signed-off-by: joshvanl <me@joshvanl.dev>

* Linting

Signed-off-by: joshvanl <me@joshvanl.dev>

* Review comments

Signed-off-by: joshvanl <me@joshvanl.dev>

* Review comments

Signed-off-by: joshvanl <me@joshvanl.dev>

* Set eventually in crypto test to 5 secs

Signed-off-by: joshvanl <me@joshvanl.dev>

---------

Signed-off-by: joshvanl <me@joshvanl.dev>
Co-authored-by: Dapr Bot <56698301+dapr-bot@users.noreply.github.com>
  • Loading branch information
JoshVanL and dapr-bot committed Jan 8, 2024
1 parent 70e3f69 commit bfcff0a
Show file tree
Hide file tree
Showing 59 changed files with 5,523 additions and 248 deletions.
4 changes: 2 additions & 2 deletions pkg/apis/components/v1alpha1/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
// SchemeGroupVersion is group version used to register these objects.
var SchemeGroupVersion = schema.GroupVersion{Group: components.GroupName, Version: "v1alpha1"}

// GroupKind takes an unqualified kind and returns back a Group qualified GroupKind.
func GroupKind(kind string) schema.GroupKind {
// GroupKindFromKind takes an unqualified kind and returns back a Group qualified GroupKind.
func GroupKindFromKind(kind string) schema.GroupKind {
return SchemeGroupVersion.WithKind(kind).GroupKind()
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/apis/httpEndpoint/v1alpha1/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
// SchemeGroupVersion is group version used to register these objects.
var SchemeGroupVersion = schema.GroupVersion{Group: httpendpoint.GroupName, Version: "v1alpha1"}

// GroupKind takes an unqualified kind and returns back a Group
// GroupKindFromKind takes an unqualified kind and returns back a Group
// qualified GroupKind.
func GroupKind(kind string) schema.GroupKind {
func GroupKindFromKind(kind string) schema.GroupKind {
return SchemeGroupVersion.WithKind(kind).GroupKind()
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/config/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@ import (
"strings"
"time"

env "github.com/dapr/dapr/pkg/config/env"

grpcRetry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"github.com/spf13/cast"
yaml "gopkg.in/yaml.v3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"

"github.com/dapr/dapr/pkg/buildinfo"
env "github.com/dapr/dapr/pkg/config/env"
operatorv1pb "github.com/dapr/dapr/pkg/proto/operator/v1"
"github.com/dapr/dapr/utils"
"github.com/dapr/kit/ptr"
Expand Down
49 changes: 38 additions & 11 deletions pkg/operator/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,13 @@ type Options struct {
type Server interface {
Run(context.Context) error
Ready(context.Context) error
OnComponentUpdated(ctx context.Context, component *componentsapi.Component)
OnHTTPEndpointUpdated(ctx context.Context, endpoint *httpendpointsapi.HTTPEndpoint)
OnComponentUpdated(context.Context, operatorv1pb.ResourceEventType, *componentsapi.Component)
OnHTTPEndpointUpdated(context.Context, *httpendpointsapi.HTTPEndpoint)
}

type ComponentUpdateEvent struct {
Component *componentsapi.Component
EventType operatorv1pb.ResourceEventType
}

type apiServer struct {
Expand All @@ -73,7 +78,7 @@ type apiServer struct {
// notify all dapr runtime
connLock sync.Mutex
endpointLock sync.Mutex
allConnUpdateChan map[string]chan *componentsapi.Component
allConnUpdateChan map[string]chan *ComponentUpdateEvent
allEndpointsUpdateChan map[string]chan *httpendpointsapi.HTTPEndpoint
readyCh chan struct{}
running atomic.Bool
Expand All @@ -85,7 +90,7 @@ func NewAPIServer(opts Options) Server {
Client: opts.Client,
sec: opts.Security,
port: strconv.Itoa(opts.Port),
allConnUpdateChan: make(map[string]chan *componentsapi.Component),
allConnUpdateChan: make(map[string]chan *ComponentUpdateEvent),
allEndpointsUpdateChan: make(map[string]chan *httpendpointsapi.HTTPEndpoint),
readyCh: make(chan struct{}),
}
Expand Down Expand Up @@ -125,6 +130,13 @@ func (a *apiServer) Run(ctx context.Context) error {
// Block until context is done
<-ctx.Done()

a.connLock.Lock()
for key, ch := range a.allConnUpdateChan {
close(ch)
delete(a.allConnUpdateChan, key)
}
a.connLock.Unlock()

s.GracefulStop()
err = <-errCh
if err != nil {
Expand All @@ -137,11 +149,23 @@ func (a *apiServer) Run(ctx context.Context) error {
return nil
}

func (a *apiServer) OnComponentUpdated(_ context.Context, component *componentsapi.Component) {
func (a *apiServer) OnComponentUpdated(ctx context.Context, eventType operatorv1pb.ResourceEventType, component *componentsapi.Component) {
a.connLock.Lock()
var wg sync.WaitGroup
wg.Add(len(a.allConnUpdateChan))
for _, connUpdateChan := range a.allConnUpdateChan {
connUpdateChan <- component
go func(connUpdateChan chan *ComponentUpdateEvent) {
defer wg.Done()
select {
case connUpdateChan <- &ComponentUpdateEvent{
Component: component,
EventType: eventType,
}:
case <-ctx.Done():
}
}(connUpdateChan)
}
wg.Wait()
a.connLock.Unlock()
}

Expand Down Expand Up @@ -169,7 +193,7 @@ func (a *apiServer) GetConfiguration(ctx context.Context, in *operatorv1pb.GetCo
key := types.NamespacedName{Namespace: in.GetNamespace(), Name: in.GetName()}
var config configurationapi.Configuration
if err := a.Client.Get(ctx, key, &config); err != nil {
return nil, fmt.Errorf("error getting configuration: %w", err)
return nil, fmt.Errorf("error getting configuration %s/%s: %w", in.GetNamespace(), in.GetName(), err)
}
b, err := json.Marshal(&config)
if err != nil {
Expand Down Expand Up @@ -402,6 +426,8 @@ func (a *apiServer) ListResiliency(ctx context.Context, in *operatorv1pb.ListRes
}

// ComponentUpdate updates Dapr sidecars whenever a component in the cluster is modified.
// TODO: @joshvanl: Authorize pod name and namespace matches the SPIFFE ID of
// the caller.
func (a *apiServer) ComponentUpdate(in *operatorv1pb.ComponentUpdateRequest, srv operatorv1pb.Operator_ComponentUpdateServer) error { //nolint:nosnakecase
log.Info("sidecar connected for component updates")
keyObj, err := uuid.NewRandom()
Expand All @@ -411,7 +437,7 @@ func (a *apiServer) ComponentUpdate(in *operatorv1pb.ComponentUpdateRequest, srv
key := keyObj.String()

a.connLock.Lock()
a.allConnUpdateChan[key] = make(chan *componentsapi.Component, 1)
a.allConnUpdateChan[key] = make(chan *ComponentUpdateEvent)
updateChan := a.allConnUpdateChan[key]
a.connLock.Unlock()

Expand All @@ -421,7 +447,7 @@ func (a *apiServer) ComponentUpdate(in *operatorv1pb.ComponentUpdateRequest, srv
delete(a.allConnUpdateChan, key)
}()

updateComponentFunc := func(ctx context.Context, c *componentsapi.Component) {
updateComponentFunc := func(ctx context.Context, t operatorv1pb.ResourceEventType, c *componentsapi.Component) {
if c.Namespace != in.GetNamespace() {
return
}
Expand All @@ -440,13 +466,14 @@ func (a *apiServer) ComponentUpdate(in *operatorv1pb.ComponentUpdateRequest, srv

err = srv.Send(&operatorv1pb.ComponentUpdateEvent{
Component: b,
Type: t,
})
if err != nil {
log.Warnf("error updating sidecar with component %s (%s) from pod %s/%s: %s", c.GetName(), c.Spec.Type, in.GetNamespace(), in.GetPodName(), err)
return
}

log.Infof("updated sidecar with component %s (%s) from pod %s/%s", c.GetName(), c.Spec.Type, in.GetNamespace(), in.GetPodName())
log.Debugf("updated sidecar with component %s %s (%s) from pod %s/%s", t.String(), c.GetName(), c.Spec.Type, in.GetNamespace(), in.GetPodName())
}

var wg sync.WaitGroup
Expand All @@ -462,7 +489,7 @@ func (a *apiServer) ComponentUpdate(in *operatorv1pb.ComponentUpdateRequest, srv
wg.Add(1)
go func() {
defer wg.Done()
updateComponentFunc(srv.Context(), c)
updateComponentFunc(srv.Context(), c.EventType, c.Component)
}()
}
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/operator/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,9 @@ func TestComponentUpdate(t *testing.T) {
api.connLock.Lock()
defer api.connLock.Unlock()
for key := range api.allConnUpdateChan {
api.allConnUpdateChan[key] <- &c
api.allConnUpdateChan[key] <- &ComponentUpdateEvent{
Component: &c,
}
close(api.allConnUpdateChan[key])
}
}()
Expand Down Expand Up @@ -261,7 +263,7 @@ func TestComponentUpdate(t *testing.T) {
api.connLock.Lock()
defer api.connLock.Unlock()
for key := range api.allConnUpdateChan {
api.allConnUpdateChan[key] <- &c
api.allConnUpdateChan[key] <- &ComponentUpdateEvent{Component: &c}
close(api.allConnUpdateChan[key])
}
}()
Expand Down
20 changes: 14 additions & 6 deletions pkg/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/dapr/dapr/pkg/operator/api"
operatorcache "github.com/dapr/dapr/pkg/operator/cache"
"github.com/dapr/dapr/pkg/operator/handlers"
operatorv1pb "github.com/dapr/dapr/pkg/proto/operator/v1"
"github.com/dapr/dapr/pkg/security"
"github.com/dapr/kit/concurrency"
"github.com/dapr/kit/logger"
Expand Down Expand Up @@ -190,12 +191,18 @@ func NewOperator(ctx context.Context, opts Options) (Operator, error) {
}, nil
}

func (o *operator) syncComponent(ctx context.Context) func(obj interface{}) {
func (o *operator) syncComponent(ctx context.Context, eventType operatorv1pb.ResourceEventType) func(obj interface{}) {
return func(obj interface{}) {
c, ok := obj.(*componentsapi.Component)
if ok {
var c *componentsapi.Component
switch o := obj.(type) {
case *componentsapi.Component:
c = o
case cache.DeletedFinalStateUnknown:
c = o.Obj.(*componentsapi.Component)
}
if c != nil {
log.Debugf("Observed component to be synced: %s/%s", c.Namespace, c.Name)
o.apiServer.OnComponentUpdated(ctx, c)
o.apiServer.OnComponentUpdated(ctx, eventType, c)
}
}
}
Expand Down Expand Up @@ -319,10 +326,11 @@ func (o *operator) Run(ctx context.Context) error {
return fmt.Errorf("unable to get setup components informer: %w", rErr)
}
_, rErr = componentInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: o.syncComponent(ctx),
AddFunc: o.syncComponent(ctx, operatorv1pb.ResourceEventType_CREATED),
UpdateFunc: func(_, newObj interface{}) {
o.syncComponent(ctx)(newObj)
o.syncComponent(ctx, operatorv1pb.ResourceEventType_UPDATED)(newObj)
},
DeleteFunc: o.syncComponent(ctx, operatorv1pb.ResourceEventType_DELETED),
})
if rErr != nil {
return fmt.Errorf("unable to add components informer event handler: %w", rErr)
Expand Down
4 changes: 3 additions & 1 deletion pkg/runtime/compstore/crypto.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ limitations under the License.

package compstore

import "github.com/dapr/components-contrib/crypto"
import (
"github.com/dapr/components-contrib/crypto"
)

func (c *ComponentStore) AddCryptoProvider(name string, provider crypto.SubtleCrypto) {
c.lock.Lock()
Expand Down
31 changes: 31 additions & 0 deletions pkg/runtime/hotreload/hotreload.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ import (

componentsapi "github.com/dapr/dapr/pkg/apis/components/v1alpha1"
"github.com/dapr/dapr/pkg/config"
operatorv1 "github.com/dapr/dapr/pkg/proto/operator/v1"
"github.com/dapr/dapr/pkg/runtime/authorizer"
"github.com/dapr/dapr/pkg/runtime/compstore"
"github.com/dapr/dapr/pkg/runtime/hotreload/loader/disk"
"github.com/dapr/dapr/pkg/runtime/hotreload/loader/operator"
"github.com/dapr/dapr/pkg/runtime/hotreload/reconciler"
"github.com/dapr/dapr/pkg/runtime/processor"
"github.com/dapr/kit/concurrency"
Expand All @@ -38,6 +40,16 @@ type OptionsReloaderDisk struct {
Processor *processor.Processor
}

type OptionsReloaderOperator struct {
PodName string
Namespace string
Client operatorv1.OperatorClient
Config *config.Configuration
ComponentStore *compstore.ComponentStore
Authorizer *authorizer.Authorizer
Processor *processor.Processor
}

type Reloader struct {
isEnabled bool
componentsReconciler *reconciler.Reconciler[componentsapi.Component]
Expand All @@ -63,6 +75,25 @@ func NewDisk(ctx context.Context, opts OptionsReloaderDisk) (*Reloader, error) {
}, nil
}

func NewOperator(opts OptionsReloaderOperator) *Reloader {
loader := operator.New(operator.Options{
PodName: opts.PodName,
Namespace: opts.Namespace,
ComponentStore: opts.ComponentStore,
OperatorClient: opts.Client,
})

return &Reloader{
isEnabled: opts.Config.IsFeatureEnabled(config.HotReload),
componentsReconciler: reconciler.NewComponent(reconciler.Options[componentsapi.Component]{
Loader: loader,
CompStore: opts.ComponentStore,
Processor: opts.Processor,
Authorizer: opts.Authorizer,
}),
}
}

func (r *Reloader) Run(ctx context.Context) error {
if !r.isEnabled {
log.Debug("Hot reloading disabled")
Expand Down
84 changes: 84 additions & 0 deletions pkg/runtime/hotreload/loader/operator/component.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package operator

import (
"context"
"encoding/json"
"fmt"

componentsapi "github.com/dapr/dapr/pkg/apis/components/v1alpha1"
operatorpb "github.com/dapr/dapr/pkg/proto/operator/v1"
"github.com/dapr/dapr/pkg/runtime/hotreload/loader"
)

type component struct {
operatorpb.Operator_ComponentUpdateClient
}

// The go linter does not yet understand that these functions are being used by
// the generic operator.
//
//nolint:unused
func (c *component) list(ctx context.Context, opclient operatorpb.OperatorClient, ns, podName string) ([][]byte, error) {
resp, err := opclient.ListComponents(ctx, &operatorpb.ListComponentsRequest{
Namespace: ns,
PodName: podName,
})
if err != nil {
return nil, err
}

return resp.GetComponents(), nil
}

//nolint:unused
func (c *component) close() error {
if c.Operator_ComponentUpdateClient != nil {
return c.Operator_ComponentUpdateClient.CloseSend()
}
return nil
}

//nolint:unused
func (c *component) recv() (*loader.Event[componentsapi.Component], error) {
event, err := c.Operator_ComponentUpdateClient.Recv()
if err != nil {
return nil, err
}

var component componentsapi.Component
if err := json.Unmarshal(event.GetComponent(), &component); err != nil {
return nil, fmt.Errorf("failed to deserializing component: %w", err)
}

return &loader.Event[componentsapi.Component]{
Resource: component,
Type: event.GetType(),
}, nil
}

//nolint:unused
func (c *component) establish(ctx context.Context, opclient operatorpb.OperatorClient, ns, podName string) error {
stream, err := opclient.ComponentUpdate(ctx, &operatorpb.ComponentUpdateRequest{
Namespace: ns,
PodName: podName,
})
if err != nil {
return err
}

c.Operator_ComponentUpdateClient = stream
return nil
}

0 comments on commit bfcff0a

Please sign in to comment.