Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: bulksubscribe http #478

Open
wants to merge 29 commits into
base: main
Choose a base branch
from
Open

Conversation

sadath-12
Copy link
Contributor

Fixes #423 for http bulksubscribe

Signed-off-by: sadath-12 <sadathsadu2002@gmail.com>
@sadath-12 sadath-12 requested a review from a team as a code owner December 2, 2023 07:42
Signed-off-by: sadath-12 <sadathsadu2002@gmail.com>
Signed-off-by: sadath-12 <sadathsadu2002@gmail.com>
Signed-off-by: sadath-12 <sadathsadu2002@gmail.com>
Signed-off-by: sadath-12 <sadathsadu2002@gmail.com>
Signed-off-by: sadath-12 <sadathsadu2002@gmail.com>
Signed-off-by: sadath-12 <sadathsadu2002@gmail.com>
service/http/topic.go Outdated Show resolved Hide resolved
Copy link

codecov bot commented Dec 4, 2023

Codecov Report

Attention: Patch coverage is 75.36232% with 34 lines in your changes are missing coverage. Please review.

Project coverage is 58.63%. Comparing base (27248ba) to head (300e5ac).
Report is 11 commits behind head on main.

Files Patch % Lines
service/http/topic.go 66.66% 21 Missing and 7 partials ⚠️
service/internal/topicregistrar.go 86.66% 2 Missing and 2 partials ⚠️
service/internal/topicsubscription.go 75.00% 1 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #478      +/-   ##
==========================================
+ Coverage   58.04%   58.63%   +0.58%     
==========================================
  Files          55       55              
  Lines        3568     3701     +133     
==========================================
+ Hits         2071     2170      +99     
- Misses       1375     1399      +24     
- Partials      122      132      +10     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Signed-off-by: sadath-12 <sadathsadu2002@gmail.com>
Signed-off-by: sadath-12 <sadathsadu2002@gmail.com>
@daixiang0
Copy link
Member

Codes look good, please add unit tests.

Signed-off-by: sadath-12 <sadathsadu2002@gmail.com>
@sadath-12
Copy link
Contributor Author

Added it 🙂

Copy link
Member

@mikeee mikeee left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although the limited tests are passing and likewise with the validator, I'd like to see a validated bulk subscription.

examples/pubsub/sub/sub.go Outdated Show resolved Hide resolved
examples/pubsub/sub/sub.go Show resolved Hide resolved
examples/pubsub/sub/sub.go Outdated Show resolved Hide resolved
service/grpc/topic.go Show resolved Hide resolved
service/internal/topicsubscription.go Show resolved Hide resolved
service/http/topic_test.go Outdated Show resolved Hide resolved
service/http/topic_test.go Outdated Show resolved Hide resolved
service/http/topic_test.go Outdated Show resolved Hide resolved
service/http/topic_test.go Outdated Show resolved Hide resolved
service/internal/topicsubscription.go Outdated Show resolved Hide resolved
Signed-off-by: sadath-12 <sadathsadu2002@gmail.com>
Signed-off-by: sadath-12 <sadathsadu2002@gmail.com>
@sadath-12
Copy link
Contributor Author

Thank you for the review @mikeee . Added them

examples/pubsub/sub/sub.go Outdated Show resolved Hide resolved
Signed-off-by: sadath-12 <sadathsadu2002@gmail.com>
@daixiang0
Copy link
Member

Please fix conflicts.

Signed-off-by: sadath-12 <sadathsadu2002@gmail.com>
@sadath-12
Copy link
Contributor Author

Done @daixiang0

Copy link
Member

@mikeee mikeee left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are regressions with the tests moving back to assert rather than require when evaluating errors

Signed-off-by: sadath-12 <sadathsadu2002@gmail.com>
Copy link
Member

@mikeee mikeee left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

go-sdk/examples/pubsub/README.md

Needs to be updated inline with your modifications to the pubsub example

Signed-off-by: sadath-12 <sadathsadu2002@gmail.com>
@daixiang0
Copy link
Member

Please make CI happy.

Signed-off-by: sadath-12 <sadathsadu2002@gmail.com>
Signed-off-by: sadath-12 <sadathsadu2002@gmail.com>
Signed-off-by: sadath-12 <sadathsadu2002@gmail.com>
Signed-off-by: sadath-12 <sadathsadu2002@gmail.com>
Signed-off-by: sadath-12 <sadathsadu2002@gmail.com>
Signed-off-by: sadath-12 <sadathsadu2002@gmail.com>
@sadath-12
Copy link
Contributor Author

Yup @daixiang0 it seems to be chilling now . 😅 Thanks

examples/pubsub/pub/pub.go Outdated Show resolved Hide resolved
Signed-off-by: sadath-12 <sadathsadu2002@gmail.com>
Copy link
Member

@mikeee mikeee left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few quick-win tests can be implemented on the code to improve coverage on top of the identified which I've marked

examples/pubsub/README.md Show resolved Hide resolved
service/grpc/topic.go Show resolved Hide resolved
service/grpc/topic_test.go Outdated Show resolved Hide resolved
Signed-off-by: sadath-12 <sadathsadu2002@gmail.com>
@daixiang0 daixiang0 requested a review from mikeee January 5, 2024 06:16
Copy link
Member

@mikeee mikeee left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've missed this over the past few reviews but Redis is not suitable for testing bulk pub/sub. Would it be wise to migrate to another broker (for example kafka or I believe ASB) so that we can validate this example?

I'd like to see a validated run that doesn't fallback to singular pub/sub before I review again?

@@ -37,16 +37,25 @@ func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicE
return s.topicRegistrar.AddSubscription(sub, fn)
}

func (s *Server) AddBulkTopicEventHandler(sub *common.Subscription, fn common.TopicEventHandler, maxMessagesCount, maxAwaitDurationMs int32) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As my previous review - the validation of arguments passed to these parameters should be implemented as per the implementation spec. I do think that this is something we need to address both sdk-side and in the runtime explicitly as part of best practice.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what default values you suggest if nil values are given?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error-out if either are X <= 0 this would prevent a negative value, a nil value is not possible for the int type.

Comment on lines 29 to +54
func TestTopicErrors(t *testing.T) {
server := getTestServer()
err := server.AddTopicEventHandler(nil, nil)
require.Errorf(t, err, "expected error on nil sub")
require.Error(t, err, "expected error on nil sub with AddTopicEventHandler")

err = server.AddBulkTopicEventHandler(nil, nil, 0, 0)
require.Error(t, err, "expected error on nil sub with AddBulkTopicEventHandler")

sub := &common.Subscription{}
err = server.AddTopicEventHandler(sub, nil)
require.Errorf(t, err, "expected error on invalid sub")
require.Error(t, err, "expected error on invalid sub with AddTopicEventHandler")
err = server.AddBulkTopicEventHandler(sub, nil, 0, 0)
require.Error(t, err, "expected error on invalid sub with AddBulkTopicEventHandler")

sub.PubsubName = "messages"
err = server.AddTopicEventHandler(sub, nil)
require.Errorf(t, err, "expected error on sub without topic")
require.Error(t, err, "expected error on sub without topic with AddTopicEventHandler")
sub.PubsubName = "messages"
err = server.AddBulkTopicEventHandler(sub, nil, 0, 0)
require.Error(t, err, "expected error on sub without topic with AddBulkTopicEventHandler")

sub.Topic = "test"
err = server.AddTopicEventHandler(sub, nil)
require.Errorf(t, err, "expected error on sub without handler")
require.Error(t, err, "expected error on sub without handler")
err = server.AddBulkTopicEventHandler(sub, nil, 0, 0)
require.Error(t, err, "expected error on sub without handler")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Break this down into idiomatic sub-tests, you can share the setup steps here but it doesn't read well when running tests.

@@ -44,10 +48,9 @@ expected_stdout_lines:
background: true
sleep: 15
-->
#### Note: pub/pub.go contains both PublishEvents (used for publish of messages) and PublishEvent (used for bulkPublish of messages)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above

Comment on lines +28 to +29
#### Note: sub/sub.go contains both AddTopicEventHandler (used for subscribe of messages) and AddBulkTopicEventHandler (used for bulksubscribe of messages)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This folder contains two Go files that use the Go SDK to invoke the Dapr Pub/Sub API.

Remove this and modify the above to include something along the lines of:

This folder contains a publisher and subscriber that demonstrates both standard pubsub and bulk-pubsub

@@ -35,12 +35,10 @@ var defaultSubscription = &common.Subscription{
Route: "/orders",
}

var importantSubscription = &common.Subscription{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to see the importantSubscription re-implemented as a way to validate the route being correct for a message on the same topic alongside match/priority

Comment on lines +427 to +430
if err != nil {
http.Error(w, err.Error(), PubSubHandlerDropStatusCode)
return
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no unhandled error at this point, could you clarify that if a single event is dropped it will be replayed/retried at a later date?

Comment on lines +447 to +451
func writeBulkStatus(w http.ResponseWriter, s BulkSubscribeResponse) {
if err := json.NewEncoder(w).Encode(s); err != nil {
http.Error(w, err.Error(), PubSubHandlerRetryStatusCode)
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's fine to create a new function as it is manifestly different or is idiomatic, do you think this is better or would it be better to pass the slice to your function as an argument and wrap it within the function?

Comment on lines +142 to +154
if assert.Len(t, subs, 2, "unexpected subscription count") {
assert.Equal(t, "messages", subs[0].PubsubName)
assert.Equal(t, "errors", subs[0].Topic)

assert.Equal(t, "messages", subs[1].PubsubName)
assert.Equal(t, "test", subs[1].Topic)
assert.Equal(t, "", subs[1].Route)
assert.Equal(t, "/", subs[1].Routes.Default)
if assert.Len(t, subs[1].Routes.Rules, 1, "unexpected rules count") {
assert.Equal(t, `event.type == "other"`, subs[1].Routes.Rules[0].Match)
assert.Equal(t, "/other", subs[1].Routes.Rules[0].Path)
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is quite confusing to read - would it be more idiomatic to create a known map and compare this to the subscriptions?

Comment on lines +647 to 663
func TestAddingInvalidBulkEventHandlers(t *testing.T) {
s := newServer("", nil)
err := s.AddBulkTopicEventHandler(nil, testTopicFunc, 10, 1000)
require.Error(t, err, "expected error adding no sub event handler")

sub := &common.Subscription{Metadata: map[string]string{}}
err = s.AddBulkTopicEventHandler(sub, testTopicFunc, 10, 1000)
require.Error(t, err, "expected error adding empty sub event handler")

sub.Topic = "test"
err = s.AddBulkTopicEventHandler(sub, testTopicFunc, 10, 1000)
require.Error(t, err, "expected error adding sub without component event handler")

sub.PubsubName = "messages"
err = s.AddBulkTopicEventHandler(sub, testTopicFunc, 10, 1000)
require.Error(t, err, "expected error adding sub without route event handler")
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please split this into subtests

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Provide negative tests

@sadath-12
Copy link
Contributor Author

I've missed this over the past few reviews but Redis is not suitable for testing bulk pub/sub. Would it be wise to migrate to another broker (for example kafka or I believe ASB) so that we can validate this example?

I'd like to see a validated run that doesn't fallback to singular pub/sub before I review again?

can we keep the migration in separate pr ?

@mikeee
Copy link
Member

mikeee commented Jan 6, 2024

I've missed this over the past few reviews but Redis is not suitable for testing bulk pub/sub. Would it be wise to migrate to another broker (for example kafka or I believe ASB) so that we can validate this example?
I'd like to see a validated run that doesn't fallback to singular pub/sub before I review again?

can we keep the migration in separate pr ?

I have to answer that with another question - has the bulk pub/sub been validated elsewhere? All I'm seeing is fallbacks.

@sadath-12
Copy link
Contributor Author

sadath-12 commented Jan 6, 2024

I've missed this over the past few reviews but Redis is not suitable for testing bulk pub/sub. Would it be wise to migrate to another broker (for example kafka or I believe ASB) so that we can validate this example?
I'd like to see a validated run that doesn't fallback to singular pub/sub before I review again?

can we keep the migration in separate pr ?

I have to answer that with another question - has the bulk pub/sub been validated elsewhere? All I'm seeing is fallbacks.

sure no issues lets test it for others as well . Before I jump wanted to confirm what you mean by fallback here ? because when I see the demo of bulksubscribe in the dapr docs and how js-sdk has implemented bulksubscribe . I have made sure similar behaviour is achieved here . would you like to explain what do you expect clearly ? Happy to implement whatever works best for the project 😊 If possible maybe we could drive the talk on the discord as well since some discussion is required on this

@mikeee
Copy link
Member

mikeee commented Jan 6, 2024

I've missed this over the past few reviews but Redis is not suitable for testing bulk pub/sub. Would it be wise to migrate to another broker (for example kafka or I believe ASB) so that we can validate this example?
I'd like to see a validated run that doesn't fallback to singular pub/sub before I review again?

can we keep the migration in separate pr ?

I have to answer that with another question - has the bulk pub/sub been validated elsewhere? All I'm seeing is fallbacks.

sure no issues lets test it for others as well . Before I jump wanted to confirm what you mean by fallback here ? because when I see the demo of bulksubscribe in the dapr docs and how js-sdk has implemented bulksubscribe . I have made sure similar behaviour is achieved here . would you like to explain what do you expect clearly ? Happy to implement whatever works best for the project 😊 If possible maybe we could drive the talk on the discord as well since some discussion is required on this

The issue is that since the broker used does not implement bulk pub/sub methods, we are effectively dropping down to single pub/sub. Looking at the validation run it is highlighted in the logs, likewise in the js-sdk I note this is an issue too as it uses rabbitmq but this is not validated so you'd run into the same result if run in debug mode.

@sadath-12
Copy link
Contributor Author

sadath-12 commented Jan 6, 2024

I've missed this over the past few reviews but Redis is not suitable for testing bulk pub/sub. Would it be wise to migrate to another broker (for example kafka or I believe ASB) so that we can validate this example?
I'd like to see a validated run that doesn't fallback to singular pub/sub before I review again?

can we keep the migration in separate pr ?

I have to answer that with another question - has the bulk pub/sub been validated elsewhere? All I'm seeing is fallbacks.

sure no issues lets test it for others as well . Before I jump wanted to confirm what you mean by fallback here ? because when I see the demo of bulksubscribe in the dapr docs and how js-sdk has implemented bulksubscribe . I have made sure similar behaviour is achieved here . would you like to explain what do you expect clearly ? Happy to implement whatever works best for the project 😊 If possible maybe we could drive the talk on the discord as well since some discussion is required on this

The issue is that since the broker used does not implement bulk pub/sub methods, we are effectively dropping down to single pub/sub. Looking at the validation run it is highlighted in the logs, likewise in the js-sdk I note this is an issue too as it uses rabbitmq but this is not validated so you'd run into the same result if run in debug mode.

Ya the dapr itself sends back the response one by one . so with all those brokers we would get the same result right ? we run the callback for each entry and send response to dapr

@@ -44,7 +44,7 @@ func main() {
}

// Publish multiple events
if res := client.PublishEvents(ctx, pubsubName, topicName, publishEventsData); res.Error != nil {
if res := client.PublishEvents(ctx, pubsubName, bulkTopicName, publishEventsData); res.Error != nil {
panic(err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem entirely right, you're panicking here with an unrelated (potentially) nil error

@sadath-12
Copy link
Contributor Author

@mikeee what you want to say about this approach are we going up with this pr after those things resolved ?

@mikeee
Copy link
Member

mikeee commented Feb 21, 2024

@mikeee what you want to say about this approach are we going up with this pr after those things resolved ?

I think we should definitely try to get this merged even as a fallback validation, as long as there is validation locally just to make sure it's working then that should be fine 👍

@Eileen-Yu
Copy link

Eileen-Yu commented Mar 22, 2024

wondering if there's any progress on this PR?

I'm happy to help finish this one. Would you please resolve those conflicts if you'd like me to keep your commits? @sadath-12

Signed-off-by: sadath-12 <sadathsadu2002@gmail.com>
@sadath-12 sadath-12 requested a review from a team as a code owner May 14, 2024 13:26
Signed-off-by: sadath-12 <sadathsadu2002@gmail.com>
@sadath-12
Copy link
Contributor Author

@Eileen-Yu

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Programmatic PubSub Bulk Subscribe Support
4 participants