Skip to content

Commit

Permalink
Make Workflow dapr Component internal (#7427)
Browse files Browse the repository at this point in the history
* Make Workflow `dapr` Component internal

The internal workflow engine `dapr` component is currently being
registered in the processor component store, making it visible to the
user via the metadata registered component list and preventing the user
creating a Component of the same name.

PR updates the workflow engine machinery to `Init` the component
manually, bypassing the processor and preventing it being committed to
the component store.

Adds integration test to ensure a `dapr` named component can be created
by the user.

Updates tests that previously expected the workflow engine component to
be registered in the component store.

Signed-off-by: joshvanl <me@joshvanl.dev>

* Adds int test to ensure workflow.dapr Component cannot be loaded

Signed-off-by: joshvanl <me@joshvanl.dev>

---------

Signed-off-by: joshvanl <me@joshvanl.dev>
Co-authored-by: Artur Souza <asouza.pro@gmail.com>
  • Loading branch information
JoshVanL and artursouza committed Jan 25, 2024
1 parent cc9d51e commit 007ccf5
Show file tree
Hide file tree
Showing 33 changed files with 305 additions and 206 deletions.
6 changes: 4 additions & 2 deletions pkg/runtime/hotreload/differ/differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"github.com/dapr/dapr/pkg/runtime/wfengine"
)

var wfengineComp = wfengine.ComponentDefinition()

// Resource is a generic type constraint.
type Resource interface {
componentsapi.Component
Expand Down Expand Up @@ -63,8 +65,8 @@ func Diff[T Resource](resources *LocalRemoteResources[T]) *Result[T] {
return true
}

if comp.Name == wfengine.ComponentDefinition.Name &&
comp.Spec.Type == wfengine.ComponentDefinition.Spec.Type {
if comp.Name == wfengineComp.Name &&
comp.Spec.Type == wfengineComp.Spec.Type {
return true
}
}
Expand Down
12 changes: 3 additions & 9 deletions pkg/runtime/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ import (
"github.com/dapr/dapr/pkg/runtime/processor/pubsub"
"github.com/dapr/dapr/pkg/runtime/processor/secret"
"github.com/dapr/dapr/pkg/runtime/processor/state"
wfbeProcessor "github.com/dapr/dapr/pkg/runtime/processor/wfbackend"
"github.com/dapr/dapr/pkg/runtime/processor/workflow"
"github.com/dapr/dapr/pkg/runtime/processor/wfbackend"
"github.com/dapr/dapr/pkg/runtime/registry"
"github.com/dapr/kit/concurrency"
"github.com/dapr/kit/logger"
Expand Down Expand Up @@ -170,7 +169,7 @@ func New(opts Options) *Processor {
Channels: opts.Channels,
})

wfbe := wfbeProcessor.New(wfbeProcessor.Options{
wfbe := wfbackend.New(wfbackend.Options{
AppID: opts.ID,
Registry: opts.Registry.WorkflowBackends(),
ComponentStore: opts.ComponentStore,
Expand Down Expand Up @@ -209,12 +208,7 @@ func New(opts Options) *Processor {
components.CategorySecretStore: secret,
components.CategoryStateStore: state,
components.CategoryWorkflowBackend: wfbe,
components.CategoryWorkflow: workflow.New(workflow.Options{
Registry: opts.Registry.Workflows(),
ComponentStore: opts.ComponentStore,
Meta: opts.Meta,
}),
components.CategoryMiddleware: middleware.New(),
components.CategoryMiddleware: middleware.New(),
},
}
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/runtime/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/dapr/components-contrib/secretstores"
commonapi "github.com/dapr/dapr/pkg/apis/common"
componentsapi "github.com/dapr/dapr/pkg/apis/components/v1alpha1"
"github.com/dapr/dapr/pkg/components"
"github.com/dapr/dapr/pkg/config"
"github.com/dapr/dapr/pkg/modes"
"github.com/dapr/dapr/pkg/resiliency"
Expand Down Expand Up @@ -453,3 +454,9 @@ func TestMetadataClientID(t *testing.T) {
}
})
}

func TestProcessNoWorkflow(t *testing.T) {
proc, _ := newTestProc()
_, ok := proc.managers[components.CategoryWorkflow]
require.False(t, ok, "workflow cannot be registered as user facing component")
}
19 changes: 15 additions & 4 deletions pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ import (
"github.com/dapr/dapr/pkg/runtime/hotreload"
"github.com/dapr/dapr/pkg/runtime/meta"
"github.com/dapr/dapr/pkg/runtime/processor"
"github.com/dapr/dapr/pkg/runtime/processor/workflow"
"github.com/dapr/dapr/pkg/runtime/registry"
"github.com/dapr/dapr/pkg/runtime/wfengine"
"github.com/dapr/dapr/pkg/security"
Expand Down Expand Up @@ -644,12 +645,22 @@ func (a *DaprRuntime) initWorkflowEngine(ctx context.Context) error {
return nil
}

log.Infof("Registering component for dapr workflow engine...")
log.Info("Registering component for dapr workflow engine...")
reg.RegisterComponent(wfComponentFactory, "dapr")
if !a.processor.AddPendingComponent(ctx, wfengine.ComponentDefinition) {
log.Warn("failed to initialize Dapr workflow component")
wfe := workflow.New(workflow.Options{
Registry: a.runtimeConfig.registry.Workflows(),
ComponentStore: a.compStore,
Meta: a.meta,
})
if err := wfe.Init(ctx, wfengine.ComponentDefinition()); err != nil {
return fmt.Errorf("failed to initialize Dapr workflow component: %w", err)
}
return nil

log.Info("Workflow engine initialized.")

return a.runnerCloser.AddCloser(func() error {
return wfe.Close(wfengine.ComponentDefinition())
})
}

// initPluggableComponents discover pluggable components and initialize with their respective registries.
Expand Down
23 changes: 8 additions & 15 deletions pkg/runtime/wfengine/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/dapr/components-contrib/workflows"
commonapi "github.com/dapr/dapr/pkg/apis/common"
componentsV1alpha1 "github.com/dapr/dapr/pkg/apis/components/v1alpha1" // This will be removed
"github.com/dapr/kit/logger"
)

var ComponentDefinition = componentsV1alpha1.Component{
TypeMeta: metav1.TypeMeta{
Kind: "Component",
},
ObjectMeta: metav1.ObjectMeta{
Name: "dapr",
},
Spec: componentsV1alpha1.ComponentSpec{
Type: "workflow.dapr",
Version: "v1",
Metadata: []commonapi.NameValuePair{},
},
}

// Status values are defined at: https://github.com/microsoft/durabletask-go/blob/119b361079c45e368f83b223888d56a436ac59b9/internal/protos/orchestrator_service.pb.go#L42-L64
var statusMap = map[int32]string{
0: "RUNNING",
Expand Down Expand Up @@ -261,3 +246,11 @@ func getStatusString(status int32) string {
}
return "UNKNOWN"
}

func ComponentDefinition() componentsV1alpha1.Component {
return componentsV1alpha1.Component{
TypeMeta: metav1.TypeMeta{Kind: "Component"},
ObjectMeta: metav1.ObjectMeta{Name: "dapr"},
Spec: componentsV1alpha1.ComponentSpec{Type: "workflow.dapr", Version: "v1"},
}
}
17 changes: 14 additions & 3 deletions tests/integration/framework/process/logline/logline.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import (
"errors"
"io"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -83,12 +85,10 @@ func New(t *testing.T, fopts ...Option) *LogLine {
func (l *LogLine) Run(t *testing.T, ctx context.Context) {
go func() {
res := l.checkOut(t, ctx, l.stdoutLineContains, l.stdoutExp, l.stdout)
l.done.Add(1)
l.outCheck <- res
}()
go func() {
res := l.checkOut(t, ctx, l.stderrLinContains, l.stderrExp, l.stderr)
l.done.Add(1)
l.outCheck <- res
}()
}
Expand All @@ -109,6 +109,11 @@ func (l *LogLine) Cleanup(t *testing.T) {
func (l *LogLine) checkOut(t *testing.T, ctx context.Context, expLines map[string]bool, closer io.WriteCloser, reader io.Reader) map[string]bool {
t.Helper()

if len(expLines) == 0 {
l.done.Add(1)
return expLines
}

go func() {
select {
case <-ctx.Done():
Expand All @@ -117,10 +122,12 @@ func (l *LogLine) checkOut(t *testing.T, ctx context.Context, expLines map[strin
}
}()

var once sync.Once

breader := bufio.NewReader(reader)
for {
if len(expLines) == 0 {
break
once.Do(func() { l.done.Add(1) })
}

line, _, err := breader.ReadLine()
Expand All @@ -146,3 +153,7 @@ func (l *LogLine) Stdout() io.WriteCloser {
func (l *LogLine) Stderr() io.WriteCloser {
return l.stderrExp
}

func (l *LogLine) EventuallyFoundAll(t *testing.T) {
assert.Eventually(t, l.FoundAll, time.Second*5, time.Millisecond*100)
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (g *grpc) Run(t *testing.T, ctx context.Context) {
resp, err := client.GetMetadata(ctx, new(rtv1.GetMetadataRequest))
//nolint:testifylint
assert.NoError(c, err)
assert.Len(c, resp.GetRegisteredComponents(), 2)
assert.Len(c, resp.GetRegisteredComponents(), 1)
}, time.Second*20, time.Millisecond*100)
g.expectBinding(t, 0, "binding1")
})
Expand Down Expand Up @@ -183,7 +183,7 @@ func (g *grpc) Run(t *testing.T, ctx context.Context) {
require.EventuallyWithT(t, func(c *assert.CollectT) {
resp, err := client.GetMetadata(ctx, new(rtv1.GetMetadataRequest))
require.NoError(t, err)
assert.Len(c, resp.GetRegisteredComponents(), 3)
assert.Len(c, resp.GetRegisteredComponents(), 2)
}, time.Second*5, time.Millisecond*100)
g.expectBindings(t, []bindingPair{
{0, "binding1"},
Expand Down Expand Up @@ -217,7 +217,7 @@ func (g *grpc) Run(t *testing.T, ctx context.Context) {
require.EventuallyWithT(t, func(c *assert.CollectT) {
resp, err := client.GetMetadata(ctx, new(rtv1.GetMetadataRequest))
require.NoError(t, err)
assert.Len(c, resp.GetRegisteredComponents(), 4)
assert.Len(c, resp.GetRegisteredComponents(), 3)
}, time.Second*5, time.Millisecond*100)
g.expectBindings(t, []bindingPair{
{0, "binding1"},
Expand All @@ -233,7 +233,7 @@ func (g *grpc) Run(t *testing.T, ctx context.Context) {
require.EventuallyWithT(t, func(c *assert.CollectT) {
resp, err := client.GetMetadata(ctx, new(rtv1.GetMetadataRequest))
require.NoError(t, err)
assert.Len(c, resp.GetRegisteredComponents(), 3)
assert.Len(c, resp.GetRegisteredComponents(), 2)
}, time.Second*5, time.Millisecond*100)
g.registered[0].Store(false)
g.expectBindings(t, []bindingPair{
Expand All @@ -251,7 +251,7 @@ func (g *grpc) Run(t *testing.T, ctx context.Context) {
require.EventuallyWithT(t, func(c *assert.CollectT) {
resp, err := client.GetMetadata(ctx, new(rtv1.GetMetadataRequest))
require.NoError(c, err)
assert.Len(c, resp.GetRegisteredComponents(), 1)
assert.Empty(c, resp.GetRegisteredComponents())
}, time.Second*5, time.Millisecond*100)
g.registered[1].Store(false)
g.registered[2].Store(false)
Expand Down Expand Up @@ -285,7 +285,7 @@ func (g *grpc) Run(t *testing.T, ctx context.Context) {
require.EventuallyWithT(t, func(c *assert.CollectT) {
resp, err := client.GetMetadata(ctx, new(rtv1.GetMetadataRequest))
require.NoError(t, err)
assert.Len(c, resp.GetRegisteredComponents(), 2)
assert.Len(c, resp.GetRegisteredComponents(), 1)
}, time.Second*5, time.Millisecond*100)
g.expectBinding(t, 0, "binding1")
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (h *http) Run(t *testing.T, ctx context.Context) {
client := util.HTTPClient(t)

t.Run("expect 1 component to be loaded", func(t *testing.T) {
assert.Len(t, util.GetMetaComponents(t, ctx, client, h.daprd.HTTPPort()), 2)
assert.Len(t, util.GetMetaComponents(t, ctx, client, h.daprd.HTTPPort()), 1)
h.expectBinding(t, 0, "binding1")
})

Expand Down Expand Up @@ -173,7 +173,7 @@ func (h *http) Run(t *testing.T, ctx context.Context) {
h.operator.ComponentUpdateEvent(t, ctx, &api.ComponentUpdateEvent{Component: &comp, EventType: operatorv1.ResourceEventType_CREATED})

require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Len(c, util.GetMetaComponents(c, ctx, client, h.daprd.HTTPPort()), 3)
assert.Len(c, util.GetMetaComponents(c, ctx, client, h.daprd.HTTPPort()), 2)
}, time.Second*5, time.Millisecond*100)
h.expectBindings(t, []bindingPair{
{0, "binding1"},
Expand Down Expand Up @@ -205,7 +205,7 @@ func (h *http) Run(t *testing.T, ctx context.Context) {
h.operator.ComponentUpdateEvent(t, ctx, &api.ComponentUpdateEvent{Component: &comp, EventType: operatorv1.ResourceEventType_CREATED})

require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Len(c, util.GetMetaComponents(c, ctx, client, h.daprd.HTTPPort()), 4)
assert.Len(c, util.GetMetaComponents(c, ctx, client, h.daprd.HTTPPort()), 3)
}, time.Second*5, time.Millisecond*100)
h.expectBindings(t, []bindingPair{
{0, "binding1"},
Expand All @@ -220,7 +220,7 @@ func (h *http) Run(t *testing.T, ctx context.Context) {
h.operator.ComponentUpdateEvent(t, ctx, &api.ComponentUpdateEvent{Component: &comp, EventType: operatorv1.ResourceEventType_DELETED})

require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Len(c, util.GetMetaComponents(c, ctx, client, h.daprd.HTTPPort()), 3)
assert.Len(c, util.GetMetaComponents(c, ctx, client, h.daprd.HTTPPort()), 2)
}, time.Second*5, time.Millisecond*100)
h.registered[0].Store(false)
h.expectBindings(t, []bindingPair{
Expand All @@ -236,7 +236,7 @@ func (h *http) Run(t *testing.T, ctx context.Context) {
h.operator.ComponentUpdateEvent(t, ctx, &api.ComponentUpdateEvent{Component: &comp1, EventType: operatorv1.ResourceEventType_DELETED})
h.operator.ComponentUpdateEvent(t, ctx, &api.ComponentUpdateEvent{Component: &comp2, EventType: operatorv1.ResourceEventType_DELETED})
require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Len(c, util.GetMetaComponents(c, ctx, client, h.daprd.HTTPPort()), 1)
assert.Empty(c, util.GetMetaComponents(c, ctx, client, h.daprd.HTTPPort()))
}, time.Second*5, time.Millisecond*100)
h.registered[1].Store(false)
h.registered[2].Store(false)
Expand Down Expand Up @@ -268,7 +268,7 @@ func (h *http) Run(t *testing.T, ctx context.Context) {
h.operator.ComponentUpdateEvent(t, ctx, &api.ComponentUpdateEvent{Component: &comp, EventType: operatorv1.ResourceEventType_CREATED})

require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Len(c, util.GetMetaComponents(c, ctx, client, h.daprd.HTTPPort()), 2)
assert.Len(c, util.GetMetaComponents(c, ctx, client, h.daprd.HTTPPort()), 1)
}, time.Second*5, time.Millisecond*100)
h.expectBinding(t, 0, "binding1")
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (o *output) Run(t *testing.T, ctx context.Context) {

client := util.HTTPClient(t)
t.Run("expect no components to be loaded yet", func(t *testing.T) {
assert.Len(t, util.GetMetaComponents(t, ctx, client, o.daprd.HTTPPort()), 1)
assert.Empty(t, util.GetMetaComponents(t, ctx, client, o.daprd.HTTPPort()))
})

t.Run("adding a component should become available", func(t *testing.T) {
Expand All @@ -126,7 +126,7 @@ func (o *output) Run(t *testing.T, ctx context.Context) {
o.operator.SetComponents(comp)
o.operator.ComponentUpdateEvent(t, ctx, &api.ComponentUpdateEvent{Component: &comp, EventType: operatorv1.ResourceEventType_CREATED})
require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Len(c, util.GetMetaComponents(c, ctx, client, o.daprd.HTTPPort()), 2)
assert.Len(c, util.GetMetaComponents(c, ctx, client, o.daprd.HTTPPort()), 1)
}, time.Second*10, time.Millisecond*100)
o.postBinding(t, ctx, client, "binding1", "file1", "data1")
o.postBindingFail(t, ctx, client, "binding2")
Expand All @@ -153,7 +153,7 @@ func (o *output) Run(t *testing.T, ctx context.Context) {
o.operator.ComponentUpdateEvent(t, ctx, &api.ComponentUpdateEvent{Component: &comp, EventType: operatorv1.ResourceEventType_CREATED})

require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Len(c, util.GetMetaComponents(c, ctx, client, o.daprd.HTTPPort()), 3)
assert.Len(c, util.GetMetaComponents(c, ctx, client, o.daprd.HTTPPort()), 2)
}, time.Second*10, time.Millisecond*100)
o.postBinding(t, ctx, client, "binding1", "file2", "data2")
o.postBinding(t, ctx, client, "binding2", "file1", "data1")
Expand All @@ -180,7 +180,7 @@ func (o *output) Run(t *testing.T, ctx context.Context) {
o.operator.ComponentUpdateEvent(t, ctx, &api.ComponentUpdateEvent{Component: &comp, EventType: operatorv1.ResourceEventType_CREATED})

require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Len(c, util.GetMetaComponents(c, ctx, client, o.daprd.HTTPPort()), 4)
assert.Len(c, util.GetMetaComponents(c, ctx, client, o.daprd.HTTPPort()), 3)
}, time.Second*10, time.Millisecond*100)
o.postBinding(t, ctx, client, "binding1", "file3", "data3")
o.postBinding(t, ctx, client, "binding2", "file2", "data2")
Expand All @@ -196,7 +196,7 @@ func (o *output) Run(t *testing.T, ctx context.Context) {
o.operator.ComponentUpdateEvent(t, ctx, &api.ComponentUpdateEvent{Component: &comp, EventType: operatorv1.ResourceEventType_DELETED})

require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Len(c, util.GetMetaComponents(c, ctx, client, o.daprd.HTTPPort()), 3)
assert.Len(c, util.GetMetaComponents(c, ctx, client, o.daprd.HTTPPort()), 2)
}, time.Second*10, time.Millisecond*100)

o.postBindingFail(t, ctx, client, "binding1")
Expand All @@ -215,7 +215,7 @@ func (o *output) Run(t *testing.T, ctx context.Context) {
o.operator.ComponentUpdateEvent(t, ctx, &api.ComponentUpdateEvent{Component: &comp2, EventType: operatorv1.ResourceEventType_DELETED})

require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Len(c, util.GetMetaComponents(c, ctx, client, o.daprd.HTTPPort()), 1)
assert.Empty(c, util.GetMetaComponents(c, ctx, client, o.daprd.HTTPPort()))
}, time.Second*10, time.Millisecond*100)
o.postBindingFail(t, ctx, client, "binding1")
o.postBindingFail(t, ctx, client, "binding2")
Expand All @@ -239,7 +239,7 @@ func (o *output) Run(t *testing.T, ctx context.Context) {
o.operator.AddComponents(comp)
o.operator.ComponentUpdateEvent(t, ctx, &api.ComponentUpdateEvent{Component: &comp, EventType: operatorv1.ResourceEventType_CREATED})
require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Len(c, util.GetMetaComponents(c, ctx, client, o.daprd.HTTPPort()), 2)
assert.Len(c, util.GetMetaComponents(c, ctx, client, o.daprd.HTTPPort()), 1)
}, time.Second*10, time.Millisecond*100)
o.postBinding(t, ctx, client, "binding2", "file5", "data5")
o.postBindingFail(t, ctx, client, "binding1")
Expand Down

0 comments on commit 007ccf5

Please sign in to comment.