-
Notifications
You must be signed in to change notification settings - Fork 5
/
kinesis-lambda.ts
131 lines (125 loc) · 5.96 KB
/
kinesis-lambda.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import { Stream, StreamEncryption } from "aws-cdk-lib/aws-kinesis";
import type { StreamProps } from "aws-cdk-lib/aws-kinesis";
import { StartingPosition } from "aws-cdk-lib/aws-lambda";
import { KinesisEventSource } from "aws-cdk-lib/aws-lambda-event-sources";
import type { KinesisEventSourceProps } from "aws-cdk-lib/aws-lambda-event-sources";
import type { GuLambdaErrorPercentageMonitoringProps, NoMonitoring } from "../constructs/cloudwatch";
import { AppIdentity } from "../constructs/core";
import type { GuMigratingResource, GuStack } from "../constructs/core";
import { GuKinesisStream } from "../constructs/kinesis";
import type { GuKinesisStreamProps } from "../constructs/kinesis";
import { GuLambdaFunction } from "../constructs/lambda";
import type { GuFunctionProps } from "../constructs/lambda";
import { toAwsErrorHandlingProps } from "../utils/lambda";
import type { StreamErrorHandlingProps, StreamProcessingProps } from "../utils/lambda";
/**
* Used to provide information about an existing Kinesis stream to the [[`GuKinesisLambda`]] pattern.
*
* Specify a `existingLogicalId` to inherit a Kinesis stream which has already
* been created via a CloudFormation stack. This is necessary to avoid data loss and interruptions of
* service when migrating stacks from CloudFormation to `cdk`.
*
* Specify an `externalKinesisStreamName` to link the lambda to a Kinesis stream owned by a different stack
* (or created outside of version control).
*
* **Example Usage**
*
* When migrating a CloudFormation stack which includes the following resource:
* ```yaml
* MyCloudFormedKinesisStream:
* Type: AWS::Kinesis::Stream
* ```
* Inherit the Kinesis stream (rather than creating a new one) using:
* ```typescript
* existingKinesisStream: { existingLogicalId: "MyCloudFormedKinesisStream" }
* ```
*
* Alternatively, reference a Kinesis stream which belongs to another stack or pattern using:
* ```typescript
* existingKinesisStream: { externalKinesisStreamName: "KinesisStreamFromAnotherStack" }
* ```
*/
export interface ExistingKinesisStream extends GuMigratingResource {
externalKinesisStreamName?: string;
}
/**
* Configuration options for the [[`GuKinesisLambda`]] pattern.
*
* For all lambda function configuration options, see [[`GuFunctionProps`]].
*
* The `existingKinesisStream` property can be used to inherit or reference a Kinesis stream which
* has been created outside of this pattern (i.e. via CloudFormation, or via a different `cdk` pattern, or stack).
* For more details and example usage, see [[`ExistingKinesisStream`]].
* If this property is omitted, the [[`GuKinesisLambda`]] pattern will create a new stream.
*
* If you have specific stream configuration requirements (e.g. data retention period), these can be set via
* `kinesisStreamProps`.
*
* If you need to override the default stream processing options (e.g. batch size and parallelization), pass
* [[`StreamProcessingProps`]] via `processingProps`.
*
* You must provide `errorHandlingConfiguration` to this pattern. Retry conditions can be configured
* via [[`StreamErrorHandlingProps`]].
*
* It is advisable to configure an alarm based on the lambda's error percentage.
* To do this, add the `monitoringConfiguration` property. The required properties for this are:
*
* ```typescript
* monitoringConfiguration: {
* toleratedErrorPercentage: <sensible_error_percentage_threshold>,
* snsTopicName: "my-topic-for-cloudwatch-alerts",
* }
* ```
* Other alarm properties (e.g. alarm name and description) will be pre-populated with sensible defaults.
* For a full list of optional properties, see [[`GuLambdaErrorPercentageMonitoringProps`]].
*
* If your team do not use CloudWatch, it's possible to opt-out with the following configuration:
* ```typescript
* monitoringConfiguration: { noMonitoring: true } as NoMonitoring
* ```
*/
export interface GuKinesisLambdaProps extends Omit<GuFunctionProps, "errorPercentageMonitoring"> {
monitoringConfiguration: NoMonitoring | GuLambdaErrorPercentageMonitoringProps;
existingKinesisStream?: ExistingKinesisStream;
errorHandlingConfiguration: StreamErrorHandlingProps;
kinesisStreamProps?: StreamProps;
processingProps?: StreamProcessingProps;
}
/**
* Pattern which creates all of the resources needed to invoke a lambda function whenever a record is
* put onto a Kinesis stream.
*
* This pattern will create a new Kinesis stream by default. If you are migrating a stack from CloudFormation,
* you will need to opt-out of this behaviour. For information on overriding the default behaviour,
* see [[`GuKinesisLambdaProps`]].
*
* @alpha This pattern is in early development. The API is likely to change in future releases.
*/
export class GuKinesisLambda extends GuLambdaFunction {
constructor(scope: GuStack, id: string, props: GuKinesisLambdaProps) {
super(scope, id, {
...props,
errorPercentageMonitoring: props.monitoringConfiguration.noMonitoring ? undefined : props.monitoringConfiguration,
});
const kinesisProps: GuKinesisStreamProps = {
existingLogicalId: props.existingKinesisStream?.existingLogicalId,
encryption: StreamEncryption.MANAGED,
...props.kinesisStreamProps,
};
const streamId = props.existingKinesisStream?.existingLogicalId?.logicalId ?? "KinesisStream";
const kinesisStream = props.existingKinesisStream?.externalKinesisStreamName
? Stream.fromStreamArn(
scope,
streamId,
`arn:aws:kinesis:${scope.region}:${scope.account}:stream/${props.existingKinesisStream.externalKinesisStreamName}`
)
: AppIdentity.taggedConstruct(props, new GuKinesisStream(scope, streamId, kinesisProps));
const errorHandlingPropsToAwsProps = toAwsErrorHandlingProps(props.errorHandlingConfiguration);
const eventSourceProps: KinesisEventSourceProps = {
startingPosition: StartingPosition.LATEST,
...props.processingProps,
...errorHandlingPropsToAwsProps,
};
this.addEventSource(new KinesisEventSource(kinesisStream, eventSourceProps));
}
}