Skip to content

Commit

Permalink
parallel lambda support
Browse files Browse the repository at this point in the history
  • Loading branch information
twelch committed May 18, 2024
1 parent 8b55bbc commit 7ae5c20
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 9 deletions.
31 changes: 28 additions & 3 deletions packages/geoprocessing/scripts/aws/functionResources.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ import path from "path";

const GP_ROOT = process.env.GP_ROOT;

// Once sync functions create, contains policies to invoke all sync functions
const invokeSyncLambdaPolicies: PolicyStatement[] = [];

/**
* Create Lambda function constructs
*/
Expand Down Expand Up @@ -144,6 +147,15 @@ const createSyncFunctions = (
),
description: functionMeta.description,
});

// Allow sync functions to invoked by other functions
const syncInvokeLambdaPolicy = new PolicyStatement({
effect: Effect.ALLOW,
resources: [func.functionArn],
actions: ["lambda:InvokeFunction"],
});
invokeSyncLambdaPolicies.push(syncInvokeLambdaPolicy);

return {
func,
meta: functionMeta,
Expand Down Expand Up @@ -218,16 +230,29 @@ const createAsyncFunctions = (
);

// Allow start function to invoke run function
const asyncLambdaPolicy = new PolicyStatement({
const invokeAsyncRunLambdaPolicy = new PolicyStatement({
effect: Effect.ALLOW,
resources: [runFunc.functionArn],
actions: ["lambda:InvokeFunction"],
});
const asyncLambdaRole = new Role(stack, "GpAsyncLambdaRole" + index, {
assumedBy: new ServicePrincipal("lambda.amazonaws.com"),
});
asyncLambdaRole.addToPolicy(asyncLambdaPolicy);
startFunc.addToRolePolicy(asyncLambdaPolicy);
asyncLambdaRole.addToPolicy(invokeAsyncRunLambdaPolicy);
startFunc.addToRolePolicy(invokeAsyncRunLambdaPolicy);

// Allow async lambdas to invoke sync lambdas (workers)
invokeSyncLambdaPolicies.forEach((curInvokeSyncLambdaPolicy, index2) => {
const syncLambdaRole = new Role(
stack,
"GpSyncLambdaRole" + index + "_" + index2,
{
assumedBy: new ServicePrincipal("lambda.amazonaws.com"),
}
);
syncLambdaRole.addToPolicy(curInvokeSyncLambdaPolicy);
runFunc.addToRolePolicy(curInvokeSyncLambdaPolicy);
});

return {
startFunc,
Expand Down
16 changes: 10 additions & 6 deletions packages/geoprocessing/src/aws/GeoprocessingHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ if (process) {
export class GeoprocessingHandler<
T = JSONValue,
G extends Geometry = Polygon | LineString | Point,
P extends Record<string, JSONValue> = Record<string, JSONValue>
P extends Record<string, JSONValue> = Record<string, JSONValue>,
> {
func: (
feature:
Expand All @@ -75,7 +75,9 @@ export class GeoprocessingHandler<
| Sketch<G>
| SketchCollection<G>,
/** Optional additional runtime parameters from report client for geoprocessing function. Validation left to implementing function */
extraParams?: P
extraParams?: P,
/** Original event params used to invoke geoprocessing function made accessible to func */
request?: GeoprocessingRequestModel<G>
) => Promise<T>;
options: GeoprocessingHandlerOptions;
// Store last request id to avoid retries on a failure of the lambda
Expand All @@ -93,19 +95,21 @@ export class GeoprocessingHandler<
constructor(
func: (
feature: Feature<G> | FeatureCollection<G>,
extraParams: P
extraParams: P,
request?: GeoprocessingRequestModel<G>
) => Promise<T>,
options: GeoprocessingHandlerOptions
);
constructor(
func: (
feature: Sketch<G> | SketchCollection<G>,
extraParams: P
extraParams: P,
request?: GeoprocessingRequestModel<G>
) => Promise<T>,
options: GeoprocessingHandlerOptions
);
constructor(
func: (feature, extraParams) => Promise<T>,
func: (feature, extraParams, request) => Promise<T>,
options: GeoprocessingHandlerOptions
) {
this.func = func;
Expand Down Expand Up @@ -269,7 +273,7 @@ export class GeoprocessingHandler<
const extraParams = request.extraParams as unknown as P;
try {
console.time(`run func ${this.options.title}`);
const results = await this.func(featureSet, extraParams);
const results = await this.func(featureSet, extraParams, request);
console.timeEnd(`run func ${this.options.title}`);

task.data = results;
Expand Down

0 comments on commit 7ae5c20

Please sign in to comment.