Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enforce Component names to be unique #7195

Merged
merged 7 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 6 additions & 4 deletions pkg/grpc/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4091,7 +4091,7 @@ func TestUnlock(t *testing.T) {

func TestMetadata(t *testing.T) {
compStore := compstore.New()
compStore.AddComponent(componentsV1alpha1.Component{
require.NoError(t, compStore.AddPendingComponentForCommit(componentsV1alpha1.Component{
ObjectMeta: metaV1.ObjectMeta{
Name: "MockComponent1Name",
},
Expand All @@ -4107,8 +4107,9 @@ func TestMetadata(t *testing.T) {
},
},
},
})
compStore.AddComponent(componentsV1alpha1.Component{
}))
require.NoError(t, compStore.CommitPendingComponent())
require.NoError(t, compStore.AddPendingComponentForCommit(componentsV1alpha1.Component{
ObjectMeta: metaV1.ObjectMeta{
Name: "MockComponent2Name",
},
Expand All @@ -4124,7 +4125,8 @@ func TestMetadata(t *testing.T) {
},
},
},
})
}))
require.NoError(t, compStore.CommitPendingComponent())
compStore.SetSubscriptions([]runtimePubsub.Subscription{
{
PubsubName: "test",
Expand Down
3 changes: 2 additions & 1 deletion pkg/grpc/universalapi/api_metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ func TestGetMetadata(t *testing.T) {
})

compStore := compstore.New()
compStore.AddComponent(fakeComponent)
require.NoError(t, compStore.AddPendingComponentForCommit(fakeComponent))
require.NoError(t, compStore.CommitPendingComponent())
compStore.SetSubscriptions([]runtimePubsub.Subscription{
{
PubsubName: "test",
Expand Down
10 changes: 6 additions & 4 deletions pkg/http/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1739,7 +1739,7 @@ func TestV1MetadataEndpoint(t *testing.T) {
fakeServer := newFakeHTTPServer()

compStore := compstore.New()
compStore.AddComponent(componentsV1alpha1.Component{
require.NoError(t, compStore.AddPendingComponentForCommit(componentsV1alpha1.Component{
ObjectMeta: metaV1.ObjectMeta{
Name: "MockComponent1Name",
},
Expand All @@ -1755,8 +1755,9 @@ func TestV1MetadataEndpoint(t *testing.T) {
},
},
},
})
compStore.AddComponent(componentsV1alpha1.Component{
}))
require.NoError(t, compStore.CommitPendingComponent())
require.NoError(t, compStore.AddPendingComponentForCommit(componentsV1alpha1.Component{
ObjectMeta: metaV1.ObjectMeta{
Name: "MockComponent2Name",
},
Expand All @@ -1772,7 +1773,8 @@ func TestV1MetadataEndpoint(t *testing.T) {
},
},
},
})
}))
require.NoError(t, compStore.CommitPendingComponent())
compStore.SetSubscriptions([]runtimePubsub.Subscription{
{
PubsubName: "test",
Expand Down
2 changes: 1 addition & 1 deletion pkg/runtime/channels/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (c *Channels) buildHTTPPipelineForSpec(spec *config.PipelineSpec, targetPip
}

for _, handlerSpec := range spec.Handlers {
comp, exists := c.compStore.GetComponent(handlerSpec.Type, handlerSpec.Name)
comp, exists := c.compStore.GetComponent(handlerSpec.Name)
if !exists {
// Log the error but continue with initializing the pipeline
log.Errorf("couldn't find middleware component defined in configuration with name %s and type %s",
Expand Down
20 changes: 12 additions & 8 deletions pkg/runtime/channels/channels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,24 +74,26 @@ func TestMiddlewareBuildPipeline(t *testing.T) {
})

compStore := compstore.New()
compStore.AddComponent(componentsapi.Component{
require.NoError(t, compStore.AddPendingComponentForCommit(componentsapi.Component{
ObjectMeta: metav1.ObjectMeta{
Name: "mymw1",
},
Spec: componentsapi.ComponentSpec{
Type: "middleware.http.fakemw",
Version: "v1",
},
})
compStore.AddComponent(componentsapi.Component{
}))
require.NoError(t, compStore.CommitPendingComponent())
require.NoError(t, compStore.AddPendingComponentForCommit(componentsapi.Component{
ObjectMeta: metav1.ObjectMeta{
Name: "mymw2",
},
Spec: componentsapi.ComponentSpec{
Type: "middleware.http.fakemw",
Version: "v1",
},
})
}))
require.NoError(t, compStore.CommitPendingComponent())

t.Run("all components exists", func(t *testing.T) {
ch := &Channels{
Expand Down Expand Up @@ -135,17 +137,18 @@ func TestMiddlewareBuildPipeline(t *testing.T) {

testInitFail := func(ignoreErrors bool) func(t *testing.T) {
compStore := compstore.New()
compStore.AddComponent(componentsapi.Component{
require.NoError(t, compStore.AddPendingComponentForCommit(componentsapi.Component{
ObjectMeta: metav1.ObjectMeta{
Name: "mymw",
},
Spec: componentsapi.ComponentSpec{
Type: "middleware.http.fakemw",
Version: "v1",
},
})
}))
require.NoError(t, compStore.CommitPendingComponent())

compStore.AddComponent(componentsapi.Component{
require.NoError(t, compStore.AddPendingComponentForCommit(componentsapi.Component{
ObjectMeta: metav1.ObjectMeta{
Name: "failmw",
},
Expand All @@ -157,7 +160,8 @@ func TestMiddlewareBuildPipeline(t *testing.T) {
{Name: "fail", Value: commonapi.DynamicValue{JSON: v1.JSON{Raw: []byte("true")}}},
},
},
})
}))
require.NoError(t, compStore.CommitPendingComponent())
return func(t *testing.T) {
ch := &Channels{
compStore: compStore,
Expand Down
64 changes: 53 additions & 11 deletions pkg/runtime/compstore/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,73 @@ limitations under the License.

package compstore

import compsv1alpha1 "github.com/dapr/dapr/pkg/apis/components/v1alpha1"
import (
"errors"
"fmt"

func (c *ComponentStore) GetComponent(componentType, name string) (compsv1alpha1.Component, bool) {
compsv1alpha1 "github.com/dapr/dapr/pkg/apis/components/v1alpha1"
)

func (c *ComponentStore) GetComponent(name string) (compsv1alpha1.Component, bool) {
c.lock.RLock()
defer c.lock.RUnlock()
for i, comp := range c.components {
if comp.Spec.Type == componentType && comp.ObjectMeta.Name == name {
if comp.ObjectMeta.Name == name {
return c.components[i], true
}
}
return compsv1alpha1.Component{}, false
}

func (c *ComponentStore) AddComponent(component compsv1alpha1.Component) {
func (c *ComponentStore) AddPendingComponentForCommit(component compsv1alpha1.Component) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why need a pending queue as we add component one by one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it is necessarily true when we introduce hot reloading. Adding the name as pending means we can reserve name before committing when Init is successful.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With hot reloading components can be updated in the sidecar in parallel, as opposed to being initialized sequentially when loaded at startup.

c.compPendingLock.Lock()
c.lock.Lock()
defer c.lock.Unlock()

for i, comp := range c.components {
if comp.Spec.Type == component.Spec.Type && comp.ObjectMeta.Name == component.Name {
c.components[i] = component
return
if c.compPending != nil {
c.compPendingLock.Unlock()
return errors.New("pending component not yet committed")
}

for _, existing := range c.components {
if existing.Name == component.Name {
c.compPendingLock.Unlock()
return fmt.Errorf("component %s already exists", existing.Name)
}
}

c.components = append(c.components, component)
c.compPending = &component

return nil
}

func (c *ComponentStore) DropPendingComponent() error {
c.lock.Lock()
defer c.lock.Unlock()

if c.compPending == nil {
return errors.New("no pending component to drop")
}

c.compPending = nil
c.compPendingLock.Unlock()

return nil
}

func (c *ComponentStore) CommitPendingComponent() error {
c.lock.Lock()
defer c.lock.Unlock()

if c.compPending == nil {
return errors.New("no pending component to commit")
}

c.components = append(c.components, *c.compPending)
c.compPending = nil
c.compPendingLock.Unlock()

return nil
}

func (c *ComponentStore) ListComponents() []compsv1alpha1.Component {
Expand All @@ -48,12 +90,12 @@ func (c *ComponentStore) ListComponents() []compsv1alpha1.Component {
return comps
}

func (c *ComponentStore) DeleteComponent(compType, name string) {
func (c *ComponentStore) DeleteComponent(name string) {
c.lock.Lock()
defer c.lock.Unlock()

for i, comp := range c.components {
if comp.Spec.Type == compType && comp.ObjectMeta.Name == name {
if comp.ObjectMeta.Name == name {
c.components = append(c.components[:i], c.components[i+1:]...)
return
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/runtime/compstore/compstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ type ComponentStore struct {
components []compsv1alpha1.Component
subscriptions []rtpubsub.Subscription
httpEndpoints []httpEndpointV1alpha1.HTTPEndpoint

compPendingLock sync.Mutex
compPending *compsv1alpha1.Component
}

func New() *ComponentStore {
Expand Down
2 changes: 1 addition & 1 deletion pkg/runtime/processor/binding/init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestInitBindings(t *testing.T) {
assert.NoError(t, err)

output := compapi.Component{}
output.ObjectMeta.Name = "testinput"
output.ObjectMeta.Name = "testoutput"
output.Spec.Type = "bindings.testoutput"
err = proc.Init(context.TODO(), output)
assert.NoError(t, err)
Expand Down
5 changes: 3 additions & 2 deletions pkg/runtime/processor/binding/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func TestStartReadingFromBindings(t *testing.T) {
}

b.compStore.AddInputBinding("test", m)
b.compStore.AddComponent(componentsV1alpha1.Component{
require.NoError(t, b.compStore.AddPendingComponentForCommit(componentsV1alpha1.Component{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Expand All @@ -168,7 +168,8 @@ func TestStartReadingFromBindings(t *testing.T) {
},
},
},
})
}))
require.NoError(t, b.compStore.CommitPendingComponent())
err := b.StartReadingFromBindings(context.Background())

assert.NoError(t, err)
Expand Down
11 changes: 7 additions & 4 deletions pkg/runtime/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import (
"context"
"errors"
"fmt"
"strings"
"sync"
Expand Down Expand Up @@ -216,13 +217,15 @@
return err
}

if err := m.Init(ctx, comp); err != nil {
if err := p.compStore.AddPendingComponentForCommit(comp); err != nil {

Check warning on line 220 in pkg/runtime/processor/processor.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/processor/processor.go#L220

Added line #L220 was not covered by tests
return err
}

p.compStore.AddComponent(comp)
if err := m.Init(ctx, comp); err != nil {
return errors.Join(err, p.compStore.DropPendingComponent())

Check warning on line 225 in pkg/runtime/processor/processor.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/processor/processor.go#L224-L225

Added lines #L224 - L225 were not covered by tests
}

return nil
return p.compStore.CommitPendingComponent()

Check warning on line 228 in pkg/runtime/processor/processor.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/processor/processor.go#L228

Added line #L228 was not covered by tests
}

// Close closes the component.
Expand All @@ -239,7 +242,7 @@
return err
}

p.compStore.DeleteComponent(comp.Spec.Type, comp.Name)
p.compStore.DeleteComponent(comp.Name)

Check warning on line 245 in pkg/runtime/processor/processor.go

View check run for this annotation

Codecov / codecov/patch

pkg/runtime/processor/processor.go#L245

Added line #L245 was not covered by tests

return nil
}
Expand Down