Skip to content

Commit

Permalink
Add application/json support to outbox (#7230)
Browse files Browse the repository at this point in the history
* add application/json support to outbox

Signed-off-by: yaron2 <schneider.yaron@live.com>

* linter

Signed-off-by: yaron2 <schneider.yaron@live.com>

---------

Signed-off-by: yaron2 <schneider.yaron@live.com>
  • Loading branch information
yaron2 committed Nov 22, 2023
1 parent bef0ca2 commit 886d7f7
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 3 deletions.
8 changes: 8 additions & 0 deletions pkg/runtime/pubsub/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -151,6 +152,13 @@ func (o *outboxImpl) PublishInternal(ctx context.Context, stateStore string, ope
bt, ok := sr.Value.([]byte)
if ok {
ceData = bt
} else if sr.ContentType != nil && strings.EqualFold(*sr.ContentType, "application/json") {
b, sErr := json.Marshal(sr.Value)
if sErr != nil {
return nil, sErr
}

ceData = b
} else {
ceData = []byte(fmt.Sprintf("%v", sr.Value))
}
Expand Down
77 changes: 74 additions & 3 deletions pkg/runtime/pubsub/outbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func TestPublishInternal(t *testing.T) {
assert.NoError(t, err)
})

t.Run("valid operation, custom datacontenttype", func(t *testing.T) {
t.Run("valid operation, no datacontenttype", func(t *testing.T) {
o := newTestOutbox().(*outboxImpl)
o.publishFn = func(ctx context.Context, pr *contribPubsub.PublishRequest) error {
var cloudEvent map[string]interface{}
Expand All @@ -270,7 +270,7 @@ func TestPublishInternal(t *testing.T) {
assert.Equal(t, "a", pr.PubsubName)
assert.Equal(t, "testapp1outbox", pr.Topic)
assert.Equal(t, "testapp", cloudEvent["source"])
assert.Equal(t, "application/json", cloudEvent["datacontenttype"])
assert.Equal(t, "text/plain", cloudEvent["datacontenttype"])
assert.Equal(t, "a", cloudEvent["pubsubname"])

return nil
Expand Down Expand Up @@ -302,7 +302,7 @@ func TestPublishInternal(t *testing.T) {
},
})

contentType := "application/json"
contentType := ""
_, err := o.PublishInternal(context.TODO(), "test", []state.TransactionalStateOperation{
state.SetRequest{
Key: "key",
Expand All @@ -314,6 +314,77 @@ func TestPublishInternal(t *testing.T) {
assert.NoError(t, err)
})

type customData struct {
Name string `json:"name"`
}

t.Run("valid operation, application/json datacontenttype", func(t *testing.T) {
o := newTestOutbox().(*outboxImpl)
o.publishFn = func(ctx context.Context, pr *contribPubsub.PublishRequest) error {
var cloudEvent map[string]interface{}
err := json.Unmarshal(pr.Data, &cloudEvent)
assert.NoError(t, err)

data := cloudEvent["data"]
j := customData{}

err = json.Unmarshal([]byte(data.(string)), &j)
assert.NoError(t, err)

assert.Equal(t, "test", j.Name)
assert.Equal(t, "a", pr.PubsubName)
assert.Equal(t, "testapp1outbox", pr.Topic)
assert.Equal(t, "testapp", cloudEvent["source"])
assert.Equal(t, "application/json", cloudEvent["datacontenttype"])
assert.Equal(t, "a", cloudEvent["pubsubname"])

return nil
}

o.AddOrUpdateOutbox(v1alpha1.Component{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: v1alpha1.ComponentSpec{
Metadata: []common.NameValuePair{
{
Name: outboxPublishPubsubKey,
Value: common.DynamicValue{
JSON: v1.JSON{
Raw: []byte("a"),
},
},
},
{
Name: outboxPublishTopicKey,
Value: common.DynamicValue{
JSON: v1.JSON{
Raw: []byte("1"),
},
},
},
},
},
})

j := customData{
Name: "test",
}
b, err := json.Marshal(&j)
require.NoError(t, err)

contentType := "application/json"
_, err = o.PublishInternal(context.TODO(), "test", []state.TransactionalStateOperation{
state.SetRequest{
Key: "key",
Value: string(b),
ContentType: &contentType,
},
}, "testapp", "", "")

assert.NoError(t, err)
})

t.Run("missing state store", func(t *testing.T) {
o := newTestOutbox().(*outboxImpl)

Expand Down

0 comments on commit 886d7f7

Please sign in to comment.