MSK support as an event to trigger Lambdas #8117

Closed
safv12 opened this issue Aug 21, 2020 · 28 comments · Fixed by #8164

Comments

@safv12

safv12 commented Aug 21, 2020

Lambda now supports Amazon MSK as an event source, so it can consume messages and integrate with downstream serverless workflows. It would be great to have a Kafka event in the Serverless Framework to trigger functions.

https://aws.amazon.com/es/blogs/compute/using-amazon-msk-as-an-event-source-for-aws-lambda/

Proposed solution

(Updated as the discussion progressed, to reflect the final agreement):

Event name msk, with support for the following properties:

  • batchSize: optional, maps to BatchSize
  • arn: required, maps to EventSourceArn
  • enabled: optional, maps to Enabled
  • startingPosition: optional (but required in AWS), maps to StartingPosition with default set to TRIM_HORIZON
  • topic: required (technically optional in AWS, but it's due to implied support for other stream sources), maps to Topics[0]
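For illustration, here is a minimal sketch of the AWS::Lambda::EventSourceMapping resource that the properties above could translate to. The logical ID, the ComputeLambdaFunction reference and the placeholder ARN are assumptions for the example, not the Framework's actual generated output:

```yaml
Resources:
  # Hypothetical logical ID; the Framework generates its own naming
  ComputeEventSourceMappingMsk:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      # Hypothetical reference to the function resource
      FunctionName:
        Fn::GetAtt: [ComputeLambdaFunction, Arn]
      # arn (required)
      EventSourceArn: arn:aws:kafka:region:XXXXXX:cluster/xxx
      # topic (required) becomes a single-element Topics list
      Topics:
        - kafka-topic
      # startingPosition (TRIM_HORIZON when omitted, per the proposal)
      StartingPosition: TRIM_HORIZON
      # batchSize and enabled (both optional)
      BatchSize: 100
      Enabled: true
```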
@medikoo
Contributor

medikoo commented Aug 24, 2020

@safv12 thanks for proposal. We're definitely open for that.

Can you outline how the kafka event properties will translate to the CloudFormation template? Are the listed properties the only ones we intend to support?

@pgrzesik
Contributor

pgrzesik commented Aug 24, 2020

Hello team 👋 I was recently looking at how it's supported at the CloudFormation level and how well that could fit into what the Serverless Framework already supports. Underneath, it uses the same EventSourceMapping resource that is already in use for sqs and stream events (reference: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html). The additional property available for MSK is Topics, used to specify the Kafka topic(s). Given that, we could follow a convention like this:

functions:
  compute:
    handler: handler.compute
    events:
      - msk:
          arn: arn:aws:kafka:region:XXXXXX:Cluster/xxx
          topic: kafka-topic
          startingPosition: LATEST
          batchSize: 1000

msk can be replaced by kafka in my opinion as well. In addition, if possible, it could also support a shorter definition, e.g. msk: arn:aws:kafka:region:XXXXXX:Cluster/xxx, with sensible defaults for all other values, or support Fn::ImportValue and similar.
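A sketch of the Fn::ImportValue variant mentioned above, assuming the arn property accepts CloudFormation intrinsic functions (the export name MyKafkaClusterArn is hypothetical):

```yaml
functions:
  compute:
    handler: handler.compute
    events:
      - msk:
          # Cluster ARN exported by another stack (hypothetical export name)
          arn:
            Fn::ImportValue: MyKafkaClusterArn
          topic: kafka-topic
```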

@medikoo
Contributor

medikoo commented Aug 25, 2020

@pgrzesik great thanks for sharing that.

msk can be replaced by kafka in my opinion as well.

I fully agree, that seems right to me.

Given that, we could follow a convention like this

Which properties do you think should be mandatory for the msk event type, and which optional?

@safv12
Author

safv12 commented Aug 25, 2020

From my point of view, ARN and Topic should be mandatory. We could define some default values for the other two properties. What do you think @pgrzesik, @medikoo?

@pgrzesik
Contributor

Hey 👋

@safv12 I agree here, arn of Kafka Cluster as well as topic should be mandatory. Optional parameters should be batchSize (defaults to 100 for MSK in CloudFormation), startingPosition and enabled. AFAIK, there are no other configuration parameters available at the moment for that event source.

One note on the topic parameter. In CloudFormation it's implemented as Topics list with a maximum number of 1 topic. It's also impossible to set it to more than one topic via AWS Console. So the question here is, do we want to be "future-proof" and support it as topics or start with topic and potentially support topics in the future when it becomes supported by CloudFormation directly?

@safv12
Author

safv12 commented Aug 25, 2020

@pgrzesik should we define the ConsumerGroupId in the trigger definition, or should it be optional?

@medikoo
Contributor

medikoo commented Aug 26, 2020

Great thanks @safv12 and @pgrzesik for valuable insight!

One note on the topic parameter. In CloudFormation it's implemented as Topics list with a maximum number of 1 topic

I'd say let's follow what we did for EFS and stick to singular notation (unless we have a clear hint from AWS that it's likely to change soon).

Summarizing the above (and after going through the AWS docs), I think in the context of MSK we may have the following options.
(Please point out if you feel I got something wrong, or if something can be improved):

  • batchSize: optional, maps to BatchSize
  • bisectBatchOnFunctionError: optional, maps to BisectBatchOnFunctionError
  • onFailureDestination: optional, maps to DestinationConfig.OnFailure
  • arn: required, maps to EventSourceArn
  • maximumBatchingWindow: optional, maps to MaximumBatchingWindowInSeconds
  • maximumRecordAge: optional, maps to MaximumRecordAgeInSeconds
  • maximumRetryAttempts: optional, maps to MaximumRetryAttempts
  • parallelizationFactor: optional, maps to ParallelizationFactor
  • startingPosition: optional (but required in AWS), maps to StartingPosition, and we should use LATEST as the default
  • topic: required (technically optional in AWS, but it's due to implied support for other stream sources), maps to Topics[0]

Having outlined that, I wonder whether, before bringing support for MSK, we should refactor the currently supported streaming events (backed by AWS::Lambda::EventSourceMapping).

Currently we have the stream event, which backs Kinesis and DynamoDB streams, and the sqs event for SQS queues.

Additionally, I see the following issues and inconsistencies in them:

  • In stream event (dynamodb, kinesis):
    • We set the default batchSize to 10, while the AWS default is 100
    • We provide no support for a 0 value of batchWindow (maps to MaximumBatchingWindowInSeconds), although AWS supports that value
    • Naming is confusing (e.g. batchWindow and maximumRecordAgeInSeconds don't seem to follow the same convention)
  • The sqs event lacks support for the bisectBatchOnFunctionError, maximumRetryAttempts, maximumRecordAgeInSeconds, batchWindow and destinations properties
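For reference, a stream event using the properties discussed above might look like the sketch below. The property names follow the stream event as I understand it at the time of this discussion, and the ARNs are placeholders; note how batchWindow and maximumRecordAgeInSeconds follow different naming conventions:

```yaml
functions:
  compute:
    handler: handler.compute
    events:
      - stream:
          type: kinesis
          arn: arn:aws:kinesis:region:XXXXXX:stream/xxx
          # Framework default is 10, AWS default is 100
          batchSize: 100
          startingPosition: TRIM_HORIZON
          # maps to MaximumBatchingWindowInSeconds
          batchWindow: 10
          # maps to MaximumRecordAgeInSeconds
          maximumRecordAgeInSeconds: 120
          maximumRetryAttempts: 10
          parallelizationFactor: 2
          bisectBatchOnFunctionError: true
          destinations:
            # maps to DestinationConfig.OnFailure
            onFailure: arn:aws:sqs:region:XXXXXX:failed-records
```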

Implementation proposal

1. Introduce dynamodb and kinesis events, and deprecate stream event

By doing that, we can fix the stream event issues listed above and follow better naming (as I proposed when outlining the possible options for MSK). Separating the two will, I think, also bring a better design: it'll be clear upfront which stream the function is attached to, and internally we will no longer have to deduce the type from the ARN (if the type property was not provided).

Both events could use one generic, self-contained AWS::Lambda::EventSourceMapping resource generator, which we could then reuse for sqs (thereby adding support for currently unsupported properties) and then for msk.

We should also introduce a config schema that fully covers the dynamodb, kinesis and deprecated stream events.

The documentation should also be updated.
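A hypothetical sketch of what the separate events could look like, borrowing the property names proposed for MSK above (maximumBatchingWindow, maximumRecordAge). This syntax was only proposed here, not implemented, and the ARNs are placeholders:

```yaml
# Hypothetical syntax: the kinesis and dynamodb events are only a proposal
functions:
  compute:
    handler: handler.compute
    events:
      # No type property needed: the event name states the stream kind
      - kinesis:
          arn: arn:aws:kinesis:region:XXXXXX:stream/xxx
          maximumBatchingWindow: 10
          maximumRecordAge: 120
      - dynamodb:
          arn: arn:aws:dynamodb:region:XXXXXX:table/xxx/stream/xxx
          batchSize: 100
```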

2. Upgrade sqs event implementation

So that it relies on the generic AWS::Lambda::EventSourceMapping resource generator (introduced in the previous step).
This will solve the issue of missing support for some properties.

Additionally, we should introduce a config schema for the sqs event, and update the documentation so it covers the new properties.

3. Introduce support for msk event

Relying on the generic AWS::Lambda::EventSourceMapping resource generator, and introducing a config schema and documentation for it.


To avoid making things too complex, I think it would be best to cover those 3 steps with 3 distinct, consecutive PRs.

What do you think?

@pgrzesik
Contributor

Hello 👋

I'd say let's follow what we did for EFS and stick to singular notation (unless we have a clear hint from AWS that it's likely to change soon).

I totally agree with that.

As for the available options, I'm not sure all of the listed options are available for MSK (and SQS as well). If I understand the docs correctly, the ones that are supported for MSK are the ones I listed; I was following the CloudFormation doc as well as these documents:

The second one states that:

The following error handling options are only available for stream sources (DynamoDB and Kinesis):

  • BisectBatchOnFunctionError - If the function returns an error, split the batch in two and retry.
  • DestinationConfig - Send discarded records to an Amazon SQS queue or Amazon SNS topic.
  • MaximumRecordAgeInSeconds - Discard records older than the specified age. The default value is infinite (-1). When set to infinite (-1), failed records are retried until the record expires
  • MaximumRetryAttempts - Discard records after the specified number of retries. The default value is infinite (-1). When set to infinite (-1), failed records are retried until the record expires.
  • ParallelizationFactor - Process multiple batches from each shard concurrently.

So I think we have to remove these from the proposal, as they're not supported by the msk (and sqs) event types. Also, the docs don't mention it directly, but I believe that DestinationConfig is also supported only for streams.
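For contrast, a sketch of a Kinesis AWS::Lambda::EventSourceMapping that uses the stream-only options quoted above; per those docs, an MSK or SQS mapping could not use them at the time. The function name and ARNs are placeholders:

```yaml
Resources:
  KinesisMapping:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      FunctionName: my-function
      EventSourceArn: arn:aws:kinesis:region:XXXXXX:stream/xxx
      StartingPosition: TRIM_HORIZON
      # The properties below are documented for stream sources only
      BisectBatchOnFunctionError: true
      MaximumRecordAgeInSeconds: 3600
      MaximumRetryAttempts: 3
      ParallelizationFactor: 2
      DestinationConfig:
        OnFailure:
          Destination: arn:aws:sqs:region:XXXXXX:failed-records
```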

As for the implementation proposal points:

  1. Introduce dynamodb and kinesis events, and deprecate stream event

I really like this idea - even though they're similar in functionality they offer, they have different defaults e.g. for BatchSize that you mentioned among other things. I believe it will also make it easier to reason about the code for streams.

  2. Upgrade sqs event implementation

As I mentioned above, I'm not sure there's any functionality missing for the sqs event, as many of those parameters are exclusive to streams.

  3. Introduce support for msk event

👍

To avoid making things too complex, I think it would be best to cover those 3 steps with 3 distinct, consecutive PRs.

Makes a lot of sense; I believe trying to cram it all into one PR would make it harder to maintain and to wrap up in a reasonable time. Do you feel like this refactoring/deprecation should be introduced before v2 release?

@medikoo
Contributor

medikoo commented Aug 26, 2020

If I understand the docs correctly, the ones that are supported for MSK are the ones I listed

@pgrzesik great thanks for pointing that out. I didn't initially realize that "(Streams)" in the AWS docs narrows it down to just DynamoDB and Kinesis; good that we have that clear now.

I believe that DestinationConfig is also supported only for streams

Yes, it's indicated in the CF docs that it's only for streams.

In light of that, my idea of unifying the generation of AWS::Lambda::EventSourceMapping for all event sources doesn't seem that valid (it seems to make sense just for DynamoDB and Kinesis, which is already the case). If we agree on that, it seems that the eventual deprecation of stream (in favor of kinesis and dynamodb) doesn't really stand in the way of providing support for msk.

Do you feel like this refactoring/deprecation should be introduced before v2 release?

It's not a requirement; we're also moving to a more frequent release process (v3 is likely to be released 2-3 months after v2, and so on).

To fit it into v1 (and drop support for stream with v2), it would have to be merged this week, which seems like a tight timeline. On the other hand, as it will be a breaking change for a bigger group of users, it may be nicer to introduce the deprecation in v2, as that gives them more time to adjust before v3 is released.


I've moved the concern of deprecating the stream event to #8137, and I think it can be handled separately. Still, if you feel it's helpful to address it first, that's totally fine.


With that sorted out, I believe the list of supported properties for msk events comes down to the following:

  • batchSize: optional, maps to BatchSize
  • arn: required, maps to EventSourceArn
  • startingPosition: optional (but required in AWS), maps to StartingPosition, with the default being (?? @pgrzesik you proposed LATEST above, while I see that for kinesis and dynamodb we use TRIM_HORIZON as the default. Do you feel that was the wrong choice? Or is LATEST the more common choice in the case of msk?)
  • topic: required (technically optional in AWS, but it's due to implied support for other stream sources), maps to Topics[0]

@pgrzesik
Contributor

pgrzesik commented Aug 26, 2020

@pgrzesik should we define the ConsumerGroupId in the trigger definition, or should it be optional?

Sorry @safv12 for missing your question in my previous message. It's not possible to define ConsumerGroupId directly in the trigger definition; rather, the AWS Lambda integration with MSK automatically creates a consumer group whose ID equals the ID of the EventSourceMapping, so in this case it's an implementation detail of the internal integration.

it seems that the eventual deprecation of stream (in favor of kinesis and dynamodb) doesn't really stand in the way of providing support for msk

@medikoo You're totally right - I tunnel-visioned into refactoring potentially making things easier, but implementing msk support is not dependent on it in any way 👍

@pgrzesik you proposed LATEST above, while I see that for kinesis and dynamodb we use TRIM_HORIZON as the default. Do you feel that was the wrong choice? Or is LATEST the more common choice in the case of msk?

To be fair, I believe I somehow misread the docs, because I thought LATEST was CloudFormation's default and I was curious why the Serverless Framework defaults to TRIM_HORIZON. Thanks for clarifying, and if you know the reason for the current choice I would love to learn it.

With that sorted out, I believe the list of supported properties for msk events comes down to the following

That proposal looks great 👍 Just to clarify, should we also support the enabled property that maps to Enabled? I see it's currently supported for both the stream and sqs event types.

@medikoo
Contributor

medikoo commented Aug 27, 2020

if you know the reason for the current choice I would love to learn it.

I see it was picked in #2250 by @pmuens, but no details are given on why TRIM_HORIZON was chosen as the default.

Do you see a problem with using it as the default for msk? I think it would be nice to use the same default as we have in the stream event, but I don't have enough experience with it to properly judge whether that's the best choice.

should we also support the enabled property that maps to Enabled?

Good question. I guess it's an important setting for cases where we want to turn off a given event mapping temporarily, so that after a restart it resumes from the position where it stopped and not, e.g., from the beginning (as it would with TRIM_HORIZON set for startingPosition).
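A minimal sketch of pausing an msk mapping with enabled, using a placeholder ARN and topic. That consumption resumes where it stopped is the assumption described above, not something verified here:

```yaml
functions:
  compute:
    handler: handler.compute
    events:
      - msk:
          arn: arn:aws:kafka:region:XXXXXX:cluster/xxx
          topic: kafka-topic
          startingPosition: TRIM_HORIZON
          # Pause consumption without deleting the mapping; set back to true to resume
          enabled: false
```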

Therefore we should definitely have it. I've updated the top issue description with the properties spec; let's keep the final version there.

@pgrzesik
Contributor

pgrzesik commented Aug 27, 2020

Do you see a problem with using it as the default for msk?

I don't - I believe we should stick to the current default. I think it's application-specific, and if the current default has fared well for 4 years now, it sounds like the perfect choice 👍

@safv12 do you plan to implement support for msk based on the above implementation proposal? If not, I'd be happy to give it a try over the weekend

@safv12
Author

safv12 commented Aug 27, 2020

@pgrzesik go ahead; for now, I'm happy to take part in reviewing the pull request to learn a bit. If you need help with anything, just tell me.

@pgrzesik
Contributor

pgrzesik commented Sep 2, 2020

Hello @safv12 👋 There's a PR available if you'd like to check it out: #8164

Any comments/insights will be much appreciated 🙇

@gurbaj5124871

Super glad to see the support for msk released 🙌

Posting the related issue with msk support for typescript definitions:
#8288

hope it gets resolved soon and really looking forward to using it

@hanischandrew

This may be a long shot, and I don't mean to hijack this thread, but would anyone be able to point me in the right direction on this question?

What happens when a message/batch fails to be processed by the lambda function? Is it not ACKed and is expected to be replayed on the next fetch? Is it forever lost? What are the delivery semantics with this integration?

Cheers

@pedrocava

pedrocava commented Mar 26, 2021

Does this support SASL/SCRAM authentication?

I was browsing the repo and confirmed there is SASL auth for self-managed kafka clusters (https://github.com/serverless/serverless/blob/ff605018a70a7156b0ca021adb080a4b4e0f2ede/docs/providers/aws/events/kafka.md), but no equivalent reference for msk events.

@pgrzesik
Contributor

Hello @pedrocava - there is in fact support for SASL for self-managed Kafka, but I'm not sure MSK even supports SASL/SCRAM auth with Lambda. Based on the CF docs (https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-sourceaccessconfiguration.html) it seems it's only possible to configure it for self-managed Kafka. Please let us know if this is possible, and we're definitely open to supporting it if that's the case.

@pedrocava

pedrocava commented Mar 26, 2021

This page lists the clientAuthentication param, which links to this page, and that page mentions SASL, so I think it's supported, if barely documented.

Either way, is there something to look out for if passing MSK broker endpoints to a self-managed kafka event? The docs specify just what I wanted as an interface, passing the cluster secret's ARN.

@pgrzesik
Contributor

Hey @pedrocava, could you clarify a bit what you mean by your question? I'm not sure I understand it. The msk and kafka events are a bit different, as msk uses Managed Kafka whereas kafka uses self-managed Kafka.

@pedrocava

pedrocava commented Mar 31, 2021

Sure! My bad, it wasn't clear at all. My team runs an MSK cluster with SASL auth - a choice made expecting it'd work fine with Lambda, which I'm starting to regret. Since there's no support for it through the MSK event, I'm thinking about defining a kafka event and passing our MSK brokers' endpoints to it, treating it as if it were a self-managed kafka cluster. The docs say I can pass the secret stored on AWS Secrets Manager I associated with the cluster as an AWS ARN, which is just what I needed. Does it sound like a bad idea, is there anything I should look out for?
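A rough sketch of the workaround described above, with MSK broker endpoints passed to a kafka event. The accessConfigurations.saslScram512Auth and bootstrapServers names reflect my reading of the kafka event documentation linked earlier and should be checked against it; the broker hostnames and secret ARN are placeholders, and any network configuration Lambda needs to reach the brokers is not shown:

```yaml
functions:
  compute:
    handler: handler.compute
    events:
      - kafka:
          accessConfigurations:
            # Secrets Manager secret associated with the MSK cluster (placeholder ARN)
            saslScram512Auth: arn:aws:secretsmanager:region:XXXXXX:secret:AmazonMSK_xxx
          topic: kafka-topic
          # MSK broker endpoints treated as a self-managed cluster (placeholders);
          # 9096 is the port MSK uses for SASL/SCRAM
          bootstrapServers:
            - b-1.xxx.kafka.region.amazonaws.com:9096
            - b-2.xxx.kafka.region.amazonaws.com:9096
```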

@pedrocava

@pgrzesik, look what I found:

AWS Lambda functions that are triggered from an Amazon Managed Streaming for Apache Kafka (Amazon MSK) topic can now access usernames and passwords secured by AWS Secrets Manager using SASL/SCRAM (Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism).

https://aws.amazon.com/about-aws/whats-new/2020/12/aws-lambda-now-supports-sasl-scram-authentication-for-functions-triggered-from-amazon-msk/

MSK does support SASL/SCRAM auth with Lambda
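At the CloudFormation level, this corresponds to a SourceAccessConfigurations entry on the event source mapping. A minimal sketch with placeholder names and ARNs (MSK expects the secret name to begin with AmazonMSK_):

```yaml
Resources:
  MskScramMapping:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      FunctionName: my-function
      EventSourceArn: arn:aws:kafka:region:XXXXXX:cluster/xxx
      Topics:
        - kafka-topic
      StartingPosition: TRIM_HORIZON
      # Credentials for SASL/SCRAM come from a Secrets Manager secret
      SourceAccessConfigurations:
        - Type: SASL_SCRAM_512_AUTH
          URI: arn:aws:secretsmanager:region:XXXXXX:secret:AmazonMSK_xxx
```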

@pgrzesik
Contributor

pgrzesik commented Apr 1, 2021

Thanks for sharing your finding @pedrocava 🙇 Looks like it was added a bit later than msk support was introduced on the Framework side, as I remember it wasn't available at first. Would you like to open a separate issue that proposes adding this feature?

@vamshi1997

The MSK trigger is not set up automatically when defined in the serverless.yml file; I have to add the trigger manually after the Lambda is deployed. Can anyone help?
I followed the article below for the syntax:
Msk

@pgrzesik
Contributor

@vamshi1997 Could you please describe in more detail what exactly the problem is in your case? We have an integration test for msk that is passing and running on each merge to master and we didn't notice any regressions there.

@vamshi1997

Hi @pgrzesik, I'm deploying the serverless.yml file using GitHub Actions. It contains a Lambda definition with an msk topic as the trigger; my code in serverless.yml looks like this:
lambda_name:
  handler:
  desc:
  events:
    - msk: # tried with kafka also here
        arn:
        topic:
        batchSize: 10
        startingPosition: LATEST
        enabled:

But after deployment, the trigger is still not added to the Lambda. I tried replacing msk with kafka in the events, but the problem remains the same. It would be helpful to get a working example snippet so that I can compare it with my code and make changes accordingly.

@pgrzesik
Contributor

Hello @vamshi1997 - please move your question to GitHub Discussions or to forum.serverless.com, as it seems you have an issue with configuration and not a bug in the Framework itself. Documentation: https://www.serverless.com/framework/docs/providers/aws/events/msk/
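For comparison, a minimal configuration along the lines of the linked documentation might look like the sketch below (service name, runtime, ARN and topic are placeholders). One thing worth checking in the snippet above is that arn and topic, which are required, are left empty:

```yaml
# Hypothetical service name
service: msk-example

provider:
  name: aws
  # Any supported runtime
  runtime: nodejs14.x

functions:
  consumer:
    handler: handler.consumer
    events:
      # Properties are nested under "msk:" inside the events list
      - msk:
          # Must be a real MSK cluster ARN
          arn: arn:aws:kafka:region:XXXXXX:cluster/xxx
          # Must be a non-empty topic name
          topic: kafka-topic
          batchSize: 10
          startingPosition: LATEST
          enabled: true
```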

@vamshi1997

Ok thanks

7 participants