Skip to content

Commit

Permalink
feat(AWS Kafka): Add support for mTLS access configuration (#10273)
Browse files Browse the repository at this point in the history
  • Loading branch information
mishabruml committed Dec 5, 2021
1 parent 4c341b1 commit 9faf37a
Show file tree
Hide file tree
Showing 3 changed files with 455 additions and 241 deletions.
73 changes: 65 additions & 8 deletions docs/providers/aws/events/kafka.md
Expand Up @@ -15,15 +15,53 @@ layout: Doc

A self-managed Apache Kafka cluster can be used as an event source for AWS Lambda.

## Simple event definition
In order to configure lambda to trigger via `kafka` events, you must provide three required properties:

In the following example, we specify that the `compute` function should be triggered whenever there are new messages available to consume from defined Kafka `topic`.
- `accessConfigurations` which defines the chosen [authentication](#authentication) method configuration
- `topic` to consume messages from
- `bootstrapServers` - an array of bootstrap server addresses for your Kafka cluster

In order to configure `kafka` event, you have to provide three required properties:
## Authentication

- `accessConfigurations`, which is either secret credentials required to do [SASL_SCRAM auth](https://docs.confluent.io/platform/current/kafka/authentication_sasl/authentication_sasl_scram.html),[SASL_PLAIN auth](https://docs.confluent.io/platform/current/kafka/authentication_sasl/authentication_sasl_plain.html) or this is VPC configuration to allow Lambda to connect to your cluster. Valid options are: `saslPlainAuth`, `saslScram256Auth`, or `saslScram512Auth`
- `topic` to consume messages from.
- `bootstrapServers` an array of bootstrap server addresses for your Kafka cluster
You must authenticate your Lambda with a self-managed Apache Kafka cluster using one of;

- VPC - subnet(s) and security group
- SASL SCRAM/PLAIN - AWS Secrets Manager secret containing credentials
- Mutual TLS (mTLS) - AWS Secrets Manager secret containing client certificate, private key, and optionally a CA certificate

You can provide this configuration via `accessConfigurations`

You must provide at least one method, but it is possible to use VPC in parallel with other methods. For example, you may choose to authenticate via mTLS or SASL/SCRAM, and also place your Lambda and cluster within a VPC.

Valid options for `accessConfigurations` are:

```yaml
saslPlainAuth: arn:aws:secretsmanager:us-east-1:01234567890:secret:SaslPlain
saslScram256Auth: arn:aws:secretsmanager:us-east-1:01234567890:secret:SaslScram256
saslScram512Auth: arn:aws:secretsmanager:us-east-1:01234567890:secret:SaslScram512
clientCertificateTlsAuth: arn:aws:secretsmanager:us-east-1:01234567890:secret:ClientCertificateTLS
serverRootCaCertificate: arn:aws:secretsmanager:us-east-1:01234567890:secret:ServerRootCaCertificate
vpcSubnet:
- subnet-0011001100
- subnet-0022002200
vpcSecurityGroup: sg-0123456789
```

For more information see:

- [AWS Documentation - Using Lambda with self-managed Apache Kafka](https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html#smaa-authentication)

- [AWS Documentation - AWS::Lambda::EventSourceMapping SourceAccessConfiguration](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-sourceaccessconfiguration.html)

- [Confluent documentation - Authentication with SASL/PLAIN](https://docs.confluent.io/platform/current/kafka/authentication_sasl/authentication_sasl_plain.html)

- [Confluent documentation - Authentication with SASL/SCRAM](https://docs.confluent.io/platform/current/kafka/authentication_sasl/authentication_sasl_scram.html)

- [Confluent documentation Encryption and Authentication with SSL](https://docs.confluent.io/platform/current/kafka/authentication_ssl.html)

## Basic Example: SASL/SCRAM

In the following example, we specify that the `compute` function should be triggered whenever there are new messages available to consume from Kafka topic `MySelfManagedKafkaTopic` from self-hosted cluster at `xyz.com`. The cluster has been authenticated using SASL/SCRAM, the credentials are stored at secret `MyBrokerSecretName`

```yml
functions:
Expand All @@ -33,7 +71,26 @@ functions:
- kafka:
accessConfigurations:
saslScram512Auth: arn:aws:secretsmanager:us-east-1:01234567890:secret:MyBrokerSecretName
topic: AWSKafkaTopic
topic: MySelfManagedKafkaTopic
bootstrapServers:
- abc3.xyz.com:9092
- abc2.xyz.com:9092
```

## Example: Using mTLS

In this example, the lambda event source is a self-managed Apache kafka cluster authenticated via mTLS. The value of `clientCertificateTlsAuth` is an arn of a secret containing the client certificate and privatekey required for the mTLS handshake. The value of `serverRootCaCertificate` is an arn of a secret containing the Certificate Authority (CA) Certificate. This is optional, you only need to provide if your cluster requires it.

```yml
functions:
compute:
handler: handler.compute
events:
- kafka:
accessConfigurations:
clientCertificateTlsAuth: arn:aws:secretsmanager:us-east-1:01234567890:secret:ClientCertificateTLS
serverRootCaCertificate: arn:aws:secretsmanager:us-east-1:01234567890:secret:ServerRootCaCertificate
topic: MySelfManagedMTLSKafkaTopic
bootstrapServers:
- abc3.xyz.com:9092
- abc2.xyz.com:9092
Expand All @@ -60,7 +117,7 @@ functions:
- abc2.xyz.com:9092
```

## Enabling and disabling Kafka event
## Enabling and disabling Kafka event trigger

The `kafka` event also supports `enabled` parameter, which is used to control if the event source mapping is active. Setting it to `false` will pause polling for and processing new messages.

Expand Down
44 changes: 44 additions & 0 deletions lib/plugins/aws/package/compile/events/kafka.js
@@ -1,5 +1,7 @@
'use strict';

const ServerlessError = require('../../../../../serverless-error');

class AwsCompileKafkaEvents {
constructor(serverless) {
this.serverless = serverless;
Expand Down Expand Up @@ -47,6 +49,16 @@ class AwsCompileKafkaEvents {
minItems: 1,
items: { $ref: '#/definitions/awsSecretsManagerArnString' },
},
clientCertificateTlsAuth: {
type: 'array',
minItems: 1,
items: { $ref: '#/definitions/awsSecretsManagerArnString' },
},
serverRootCaCertificate: {
type: 'array',
minItems: 1,
items: { $ref: '#/definitions/awsSecretsManagerArnString' },
},
},
additionalProperties: false,
},
Expand Down Expand Up @@ -112,6 +124,30 @@ class AwsCompileKafkaEvents {
functionObj.events.forEach((event) => {
if (!event.kafka) return;

const {
accessConfigurations: {
vpcSecurityGroup,
vpcSubnet,
clientCertificateTlsAuth,
serverRootCaCertificate,
},
} = event.kafka;

if ((vpcSecurityGroup && !vpcSubnet) || (vpcSubnet && !vpcSecurityGroup)) {
const missing = vpcSecurityGroup ? 'vpcSubnet' : 'vpcSecurityGroup';
throw new ServerlessError(
`You must specify at least one "${missing}" accessConfiguration for function: ${functionName}`,
'FUNCTION_KAFKA_VPC_ACCESS_CONFIGURATION_INVALID'
);
}

if (serverRootCaCertificate && !clientCertificateTlsAuth) {
throw new ServerlessError(
`You cannot specify "serverRootCaCertificate" accessConfiguration without providing "clientCertificateTlsAuth" accessConfiguration for function: ${functionName}`,
'FUNCTION_KAFKA_CLIENT_CERTIFICATE_TLS_AUTH_CONFIGURATION_MISSING'
);
}

hasKafkaEvent = true;
const { topic, batchSize, enabled } = event.kafka;
const startingPosition = event.kafka.startingPosition || 'TRIM_HORIZON';
Expand Down Expand Up @@ -167,6 +203,14 @@ class AwsCompileKafkaEvents {
type = 'SASL_SCRAM_512_AUTH';
needsSecretsManagerPermissions = true;
break;
case 'clientCertificateTlsAuth':
type = 'CLIENT_CERTIFICATE_TLS_AUTH';
needsSecretsManagerPermissions = true;
break;
case 'serverRootCaCertificate':
type = 'SERVER_ROOT_CA_CERTIFICATE';
needsSecretsManagerPermissions = true;
break;
default:
type = accessConfigurationType;
}
Expand Down

0 comments on commit 9faf37a

Please sign in to comment.