Skip to content

Commit

Permalink
Review comments
Browse files Browse the repository at this point in the history
Signed-off-by: joshvanl <me@joshvanl.dev>
  • Loading branch information
JoshVanL committed Dec 27, 2023
1 parent 19eb5ea commit f3e5d60
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 46 deletions.
6 changes: 3 additions & 3 deletions pkg/runtime/hotreload/hotreload.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type OptionsReloaderDisk struct {
Processor *processor.Processor
}

type OptionsOperator struct {
type OptionsReloaderOperator struct {
PodName string
Namespace string
Client operatorv1.OperatorClient
Expand Down Expand Up @@ -75,7 +75,7 @@ func NewDisk(ctx context.Context, opts OptionsReloaderDisk) (*Reloader, error) {
}, nil
}

func NewOperator(opts OptionsOperator) *Reloader {
func NewOperator(opts OptionsReloaderOperator) *Reloader {
loader := operator.New(operator.Options{
PodName: opts.PodName,
Namespace: opts.Namespace,
Expand All @@ -85,7 +85,7 @@ func NewOperator(opts OptionsOperator) *Reloader {

return &Reloader{
isEnabled: opts.Config.IsFeatureEnabled(config.HotReload),
components: reconciler.NewComponent(reconciler.Options[componentsapi.Component]{
componentsReconciler: reconciler.NewComponent(reconciler.Options[componentsapi.Component]{
Loader: loader,
CompStore: opts.ComponentStore,
Processor: opts.Processor,
Expand Down
4 changes: 2 additions & 2 deletions pkg/runtime/hotreload/loader/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ type Options struct {
}

type operator struct {
component *generic[componentsapi.Component]
component *resource[componentsapi.Component]
}

func New(opts Options) loader.Interface {
return &operator{
component: newGeneric[componentsapi.Component](opts, loadercompstore.NewComponent(opts.ComponentStore), new(component)),
component: newResource[componentsapi.Component](opts, loadercompstore.NewComponent(opts.ComponentStore), new(component)),

Check warning on line 42 in pkg/runtime/hotreload/loader/operator/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/hotreload/loader/operator/operator.go#L41-L42

Added lines #L41 - L42 were not covered by tests
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ import (
"github.com/dapr/dapr/pkg/runtime/hotreload/loader/store"
)

type generic[T differ.Resource] struct {
// resource is a generic implementation of an operator resource loader.
// resource will watch and load resources from the operator service.
type resource[T differ.Resource] struct {
opClient operatorpb.OperatorClient
podName string
namespace string
Expand All @@ -42,15 +44,18 @@ type generic[T differ.Resource] struct {
closed atomic.Bool
}

// streamer is a generic interface for streaming resources from the operator.
// We need a generic interface because the gRPC methods used for streaming
// differ between resources.
type streamer[T differ.Resource] interface {
list(ctx context.Context, opclient operatorpb.OperatorClient, ns, podName string) ([][]byte, error)
close() error
recv() (*loader.Event[T], error)
establish(context.Context, operatorpb.OperatorClient, string, string) error
}

func newGeneric[T differ.Resource](opts Options, store store.Store[T], streamer streamer[T]) *generic[T] {
return &generic[T]{
func newResource[T differ.Resource](opts Options, store store.Store[T], streamer streamer[T]) *resource[T] {
return &resource[T]{
opClient: opts.OperatorClient,
podName: opts.PodName,
namespace: opts.Namespace,
Expand All @@ -60,9 +65,9 @@ func newGeneric[T differ.Resource](opts Options, store store.Store[T], streamer
}
}

func (g *generic[T]) List(ctx context.Context) (*differ.LocalRemoteResources[T], error) {
func (r *resource[T]) List(ctx context.Context) (*differ.LocalRemoteResources[T], error) {
resp, err := backoff.RetryWithData(func() ([][]byte, error) {
return g.streamer.list(ctx, g.opClient, g.namespace, g.podName)
return r.streamer.list(ctx, r.opClient, r.namespace, r.podName)
}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
if err != nil {
return nil, err

Check warning on line 73 in pkg/runtime/hotreload/loader/operator/resource.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/hotreload/loader/operator/resource.go#L69-L73

Added lines #L69 - L73 were not covered by tests
Expand All @@ -78,47 +83,47 @@ func (g *generic[T]) List(ctx context.Context) (*differ.LocalRemoteResources[T],
}

return &differ.LocalRemoteResources[T]{
Local: g.store.List(),
Local: r.store.List(),
Remote: remotes,
}, nil

Check warning on line 88 in pkg/runtime/hotreload/loader/operator/resource.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/hotreload/loader/operator/resource.go#L85-L88

Added lines #L85 - L88 were not covered by tests
}

func (g *generic[T]) Stream(ctx context.Context) (<-chan *loader.Event[T], error) {
if g.closed.Load() {
func (r *resource[T]) Stream(ctx context.Context) (<-chan *loader.Event[T], error) {
if r.closed.Load() {
return nil, errors.New("stream is closed")
}

if err := g.streamer.establish(ctx, g.opClient, g.namespace, g.podName); err != nil {
if err := r.streamer.establish(ctx, r.opClient, r.namespace, r.podName); err != nil {
return nil, err
}

log.Debugf("stream established with operator")

eventCh := make(chan *loader.Event[T])
ctx, cancel := context.WithCancel(ctx)
g.wg.Add(2)
r.wg.Add(2)
go func() {
defer g.wg.Done()
defer r.wg.Done()
select {
case <-g.closeCh:
case <-r.closeCh:
case <-ctx.Done():

Check warning on line 109 in pkg/runtime/hotreload/loader/operator/resource.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/hotreload/loader/operator/resource.go#L109

Added line #L109 was not covered by tests
}
cancel()
}()
go func() {
defer g.wg.Done()
g.stream(ctx, eventCh)
defer r.wg.Done()
r.stream(ctx, eventCh)
}()

return eventCh, nil
}

func (g *generic[T]) stream(ctx context.Context, eventCh chan<- *loader.Event[T]) {
func (r *resource[T]) stream(ctx context.Context, eventCh chan<- *loader.Event[T]) {
for {
for {
event, err := g.streamer.recv()
event, err := r.streamer.recv()
if err != nil {
g.streamer.close()
r.streamer.close()
// Retry on stream error.
log.Errorf("error from operator stream: %s", err)
break
Expand All @@ -139,7 +144,7 @@ func (g *generic[T]) stream(ctx context.Context, eventCh chan<- *loader.Event[T]
}

if err := backoff.Retry(func() error {
berr := g.streamer.establish(ctx, g.opClient, g.namespace, g.podName)
berr := r.streamer.establish(ctx, r.opClient, r.namespace, r.podName)
if berr != nil {
log.Errorf("Failed to establish stream: %s", berr)
}
Expand All @@ -152,11 +157,11 @@ func (g *generic[T]) stream(ctx context.Context, eventCh chan<- *loader.Event[T]
}
}

func (g *generic[T]) close() error {
defer g.wg.Wait()
if g.closed.CompareAndSwap(false, true) {
close(g.closeCh)
func (r *resource[T]) close() error {
defer r.wg.Wait()
if r.closed.CompareAndSwap(false, true) {
close(r.closeCh)
}

return g.streamer.close()
return r.streamer.close()
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,21 @@ import (
func Test_generic(t *testing.T) {
t.Run("Stream should return error on stream when already closed", func(t *testing.T) {
streamer := newFakeStreamer()
g := newGeneric[componentsapi.Component](
r := newResource[componentsapi.Component](
Options{},
loadercompstore.NewComponent(compstore.New()),
streamer,
)

require.NoError(t, g.close())
ch, err := g.Stream(context.Background())
require.NoError(t, r.close())
ch, err := r.Stream(context.Background())
assert.Nil(t, ch)
require.ErrorContains(t, err, "stream is closed")
})

t.Run("Stream should return error on stream and context cancelled", func(t *testing.T) {
streamer := newFakeStreamer()
g := newGeneric[componentsapi.Component](
r := newResource[componentsapi.Component](
Options{},
loadercompstore.NewComponent(compstore.New()),
streamer,
Expand All @@ -60,14 +60,14 @@ func Test_generic(t *testing.T) {
return errors.New("test error")
}

ch, err := g.Stream(ctx)
ch, err := r.Stream(ctx)
assert.Nil(t, ch)
require.ErrorContains(t, err, "test error")
})

t.Run("Should send event to Stream channel on Recv", func(t *testing.T) {
streamer := newFakeStreamer()
g := newGeneric[componentsapi.Component](
r := newResource[componentsapi.Component](
Options{},
loadercompstore.NewComponent(compstore.New()),
streamer,
Expand All @@ -78,7 +78,7 @@ func Test_generic(t *testing.T) {
return <-recCh, nil
}

ch, err := g.Stream(context.Background())
ch, err := r.Stream(context.Background())
assert.NotNil(t, ch)
require.NoError(t, err)

Expand All @@ -100,12 +100,12 @@ func Test_generic(t *testing.T) {

close(recCh)

require.NoError(t, g.close())
require.NoError(t, r.close())
})

t.Run("Should attempt to re-establish after the stream fails", func(t *testing.T) {
streamer := newFakeStreamer()
g := newGeneric[componentsapi.Component](
r := newResource[componentsapi.Component](
Options{},
loadercompstore.NewComponent(compstore.New()),
streamer,
Expand All @@ -131,7 +131,7 @@ func Test_generic(t *testing.T) {
return nil, errors.New("recv error")
}

_, err := g.Stream(context.Background())
_, err := r.Stream(context.Background())
require.NoError(t, err)

select {
Expand All @@ -140,13 +140,13 @@ func Test_generic(t *testing.T) {
t.Error("expected generic to retry establishing stream after failure")
}

require.NoError(t, g.close())
require.NoError(t, r.close())
assert.GreaterOrEqual(t, calls, 3)
})

t.Run("close waits for streamer to close", func(t *testing.T) {
streamer := newFakeStreamer()
g := newGeneric[componentsapi.Component](
r := newResource[componentsapi.Component](
Options{},
loadercompstore.NewComponent(compstore.New()),
streamer,
Expand All @@ -159,7 +159,7 @@ func Test_generic(t *testing.T) {
}

go func() {
closeCh <- g.close()
closeCh <- r.close()
}()

select {
Expand Down
2 changes: 1 addition & 1 deletion pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func newDaprRuntime(ctx context.Context,
var reloader *hotreload.Reloader
switch runtimeConfig.mode {
case modes.KubernetesMode:
reloader = hotreload.NewOperator(hotreload.OptionsOperator{
reloader = hotreload.NewOperator(hotreload.OptionsReloaderOperator{
PodName: podName,
Namespace: namespace,
Client: operatorClient,
Expand Down
8 changes: 4 additions & 4 deletions tests/integration/suite/daprd/hotreload/operator/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (s *state) Run(t *testing.T, ctx context.Context) {
},
{
Name: "abc", Type: "state.sqlite", Version: "v1",
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "ACTOR"},
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "DELETE_WITH_PREFIX", "ACTOR"},
},
{
Name: "xyz", Type: "state.in-memory", Version: "v1",
Expand Down Expand Up @@ -270,7 +270,7 @@ func (s *state) Run(t *testing.T, ctx context.Context) {
assert.ElementsMatch(c, []*rtv1.RegisteredComponents{
{
Name: "123", Type: "state.sqlite", Version: "v1",
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "ACTOR"},
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "DELETE_WITH_PREFIX", "ACTOR"},
},
{
Name: "abc", Type: "state.in-memory", Version: "v1",
Expand Down Expand Up @@ -328,7 +328,7 @@ func (s *state) Run(t *testing.T, ctx context.Context) {
assert.ElementsMatch(c, []*rtv1.RegisteredComponents{
{
Name: "123", Type: "state.sqlite", Version: "v1",
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "ACTOR"},
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "DELETE_WITH_PREFIX", "ACTOR"},
},
{
Name: "bar", Type: "state.in-memory", Version: "v1",
Expand Down Expand Up @@ -381,7 +381,7 @@ func (s *state) Run(t *testing.T, ctx context.Context) {
{Name: "bar", Type: "secretstores.local.file", Version: "v1"},
{
Name: "123", Type: "state.sqlite", Version: "v1",
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "ACTOR"},
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "DELETE_WITH_PREFIX", "ACTOR"},
},
{
Name: "xyz", Type: "state.in-memory", Version: "v1",
Expand Down

0 comments on commit f3e5d60

Please sign in to comment.