Skip to content

Commit

Permalink
Add blocking trigger type (#4395)
Browse files Browse the repository at this point in the history
* tinkering with implementations

* adding in cache for discovery

* adding in skeleton for service object and identity platform api calls

* adding in all the service code

* formatter

* rename

* removing the lookup before endpointToFunction & adding v2 stuff

* add tests & changelog

* adding tsdoc comments

* addressing pr comments

* removing the empty object from gcp/identityPlatform

* added typing & fixing endpointFromFunction to set securityLevel and uri for every httpsTrigger

* removing changelog entries

* cleaning up & set default endpoint options to false

* adding in inferBlockingDetails

* checking if config changed before calling set blocking config

* formatter

* cleaning up

* adding event type if in fab & changing api to match prod

* adding serviceForEndpoint in fab setTrigger

* converting to options record & yanking auth blocking events from events/v2

* removing old comments

* fixing delete blocking function bug

* adding a promise queue for trigger registration & deletion

* making auth blocking service a class & moving trigger queue into it

* remove comment
  • Loading branch information
colerogers committed Apr 12, 2022
1 parent 908caac commit 0ea9629
Show file tree
Hide file tree
Showing 25 changed files with 1,745 additions and 36 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@
- Fixes console error on large uploads to Storage Emulator (#4407).
- Fixes cross-platform incompatibility with Storage Emulator exports (#4411).
- Fixes issue where function deployment errored on projects without secrets (#4425).
- Adds a blocking trigger type (#4395).
20 changes: 19 additions & 1 deletion src/deploy/functions/backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import * as gcf from "../../gcp/cloudfunctions";
import * as gcfV2 from "../../gcp/cloudfunctionsv2";
import * as utils from "../../utils";
import * as runtimes from "./runtimes";
import * as events from "../../functions/events";
import { FirebaseError } from "../../error";
import { Context } from "./args";
import { previews } from "../../previews";
Expand Down Expand Up @@ -131,6 +132,15 @@ export interface TaskQueueTriggered {
taskQueueTrigger: TaskQueueTrigger;
}

export interface BlockingTrigger {
eventType: string;
options?: Record<string, any>;
}

export interface BlockingTriggered {
blockingTrigger: BlockingTrigger;
}

/** A user-friendly string for the kind of trigger of an endpoint. */
export function endpointTriggerType(endpoint: Endpoint): string {
if (isScheduleTriggered(endpoint)) {
Expand All @@ -143,6 +153,8 @@ export function endpointTriggerType(endpoint: Endpoint): string {
return endpoint.eventTrigger.eventType;
} else if (isTaskQueueTriggered(endpoint)) {
return "taskQueue";
} else if (isBlockingTriggered(endpoint)) {
return endpoint.blockingTrigger.eventType;
} else {
throw new Error("Unexpected trigger type for endpoint " + JSON.stringify(endpoint));
}
Expand Down Expand Up @@ -227,7 +239,8 @@ export type Triggered =
| CallableTriggered
| EventTriggered
| ScheduleTriggered
| TaskQueueTriggered;
| TaskQueueTriggered
| BlockingTriggered;

/** Whether something has an HttpsTrigger */
export function isHttpsTriggered(triggered: Triggered): triggered is HttpsTriggered {
Expand All @@ -254,6 +267,11 @@ export function isTaskQueueTriggered(triggered: Triggered): triggered is TaskQue
return {}.hasOwnProperty.call(triggered, "taskQueueTrigger");
}

/** Whether something has a BlockingTrigger */
export function isBlockingTriggered(triggered: Triggered): triggered is BlockingTriggered {
return {}.hasOwnProperty.call(triggered, "blockingTrigger");
}

/**
* An endpoint that serves traffic to a stack of services.
* For now, this is always a Cloud Function. Future iterations may use complex
Expand Down
34 changes: 34 additions & 0 deletions src/deploy/functions/prepare.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { ensureServiceAgentRoles } from "./checkIam";
import { FirebaseError } from "../../error";
import { normalizeAndValidate } from "../../functions/projectConfig";
import { previews } from "../../previews";
import { AUTH_BLOCKING_EVENTS } from "../../functions/events/v1";

function hasUserConfig(config: Record<string, unknown>): boolean {
// "firebase" key is always going to exist in runtime config.
Expand Down Expand Up @@ -180,6 +181,7 @@ export async function prepare(
inferDetailsFromExisting(wantBackend, haveBackend, usedDotenv);
await ensureTriggerRegions(wantBackend);
validate.endpointsAreValid(wantBackend);
inferBlockingDetails(wantBackend);

payload.functions = { wantBackend: wantBackend, haveBackend: haveBackend };

Expand Down Expand Up @@ -283,3 +285,35 @@ function maybeCopyTriggerRegion(wantE: backend.Endpoint, haveE: backend.Endpoint
}
wantE.eventTrigger.region = haveE.eventTrigger.region;
}

/** Figures out the blocking endpoint options by taking the OR of every trigger option and reassigning that value back to the endpoint. */
export function inferBlockingDetails(want: backend.Backend): void {
const authBlockingEndpoints = backend
.allEndpoints(want)
.filter(
(ep) =>
backend.isBlockingTriggered(ep) &&
AUTH_BLOCKING_EVENTS.includes(ep.blockingTrigger.eventType as any)
) as (backend.Endpoint & backend.BlockingTriggered)[];

if (authBlockingEndpoints.length === 0) {
return;
}

let accessToken = false;
let idToken = false;
let refreshToken = false;
for (const blockingEp of authBlockingEndpoints) {
accessToken ||= !!blockingEp.blockingTrigger.options?.accessToken;
idToken ||= !!blockingEp.blockingTrigger.options?.idToken;
refreshToken ||= !!blockingEp.blockingTrigger.options?.refreshToken;
}
for (const blockingEp of authBlockingEndpoints) {
if (!blockingEp.blockingTrigger.options) {
blockingEp.blockingTrigger.options = {};
}
blockingEp.blockingTrigger.options.accessToken = accessToken;
blockingEp.blockingTrigger.options.idToken = idToken;
blockingEp.blockingTrigger.options.refreshToken = refreshToken;
}
}
51 changes: 51 additions & 0 deletions src/deploy/functions/release/fabricator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import * as reporter from "./reporter";
import * as run from "../../../gcp/run";
import * as scheduler from "../../../gcp/cloudscheduler";
import * as utils from "../../../utils";
import * as services from "../services";
import { AUTH_BLOCKING_EVENTS } from "../../../functions/events/v1";

// TODO: Tune this for better performance.
const gcfV1PollerOptions: Omit<poller.OperationPollerOptions, "operationResourceName"> = {
Expand Down Expand Up @@ -54,6 +56,7 @@ export interface FabricatorArgs {
const rethrowAs =
<T>(endpoint: backend.Endpoint, op: reporter.OperationType) =>
(err: unknown): T => {
logger.error((err as Error).message);
throw new reporter.DeploymentError(endpoint, op, err);
};

Expand Down Expand Up @@ -248,6 +251,16 @@ export class Fabricator {
})
.catch(rethrowAs(endpoint, "set invoker"));
}
} else if (
backend.isBlockingTriggered(endpoint) &&
AUTH_BLOCKING_EVENTS.includes(endpoint.blockingTrigger.eventType as any)
) {
// Auth Blocking functions should always be public
await this.executor
.run(async () => {
await gcf.setInvokerCreate(endpoint.project, backend.functionName(endpoint), ["public"]);
})
.catch(rethrowAs(endpoint, "set invoker"));
}
}

Expand Down Expand Up @@ -317,6 +330,14 @@ export class Fabricator {
})
.catch(rethrowAs(endpoint, "set invoker"));
}
} else if (
backend.isBlockingTriggered(endpoint) &&
AUTH_BLOCKING_EVENTS.includes(endpoint.blockingTrigger.eventType as any)
) {
// Auth Blocking functions should always be public
await this.executor
.run(() => run.setInvokerCreate(endpoint.project, serviceName, ["public"]))
.catch(rethrowAs(endpoint, "set invoker"));
}

const mem = endpoint.availableMemoryMb || backend.DEFAULT_MEMORY;
Expand Down Expand Up @@ -354,6 +375,11 @@ export class Fabricator {
invoker = endpoint.httpsTrigger.invoker;
} else if (backend.isTaskQueueTriggered(endpoint)) {
invoker = endpoint.taskQueueTrigger.invoker;
} else if (
backend.isBlockingTriggered(endpoint) &&
AUTH_BLOCKING_EVENTS.includes(endpoint.blockingTrigger.eventType as any)
) {
invoker = ["public"];
}
if (invoker) {
await this.executor
Expand Down Expand Up @@ -395,6 +421,11 @@ export class Fabricator {
invoker = endpoint.httpsTrigger.invoker;
} else if (backend.isTaskQueueTriggered(endpoint)) {
invoker = endpoint.taskQueueTrigger.invoker;
} else if (
backend.isBlockingTriggered(endpoint) &&
AUTH_BLOCKING_EVENTS.includes(endpoint.blockingTrigger.eventType as any)
) {
invoker = ["public"];
}
if (invoker) {
await this.executor
Expand Down Expand Up @@ -472,6 +503,8 @@ export class Fabricator {
assertExhaustive(endpoint.platform);
} else if (backend.isTaskQueueTriggered(endpoint)) {
await this.upsertTaskQueue(endpoint);
} else if (backend.isBlockingTriggered(endpoint)) {
await this.registerBlockingTrigger(endpoint);
}
}

Expand All @@ -487,6 +520,8 @@ export class Fabricator {
assertExhaustive(endpoint.platform);
} else if (backend.isTaskQueueTriggered(endpoint)) {
await this.disableTaskQueue(endpoint);
} else if (backend.isBlockingTriggered(endpoint)) {
await this.unregisterBlockingTrigger(endpoint);
}
}

Expand Down Expand Up @@ -519,6 +554,14 @@ export class Fabricator {
}
}

async registerBlockingTrigger(
endpoint: backend.Endpoint & backend.BlockingTriggered
): Promise<void> {
await this.executor
.run(() => services.serviceForEndpoint(endpoint).registerTrigger(endpoint))
.catch(rethrowAs(endpoint, "register blocking trigger"));
}

async deleteScheduleV1(endpoint: backend.Endpoint & backend.ScheduleTriggered): Promise<void> {
const job = scheduler.jobFromEndpoint(endpoint, this.appEngineLocation);
await this.executor
Expand Down Expand Up @@ -546,6 +589,14 @@ export class Fabricator {
.catch(rethrowAs(endpoint, "disable task queue"));
}

async unregisterBlockingTrigger(
endpoint: backend.Endpoint & backend.BlockingTriggered
): Promise<void> {
await this.executor
.run(() => services.serviceForEndpoint(endpoint).unregisterTrigger(endpoint))
.catch(rethrowAs(endpoint, "unregister blocking trigger"));
}

logOpStart(op: string, endpoint: backend.Endpoint): void {
const runtime = getHumanFriendlyRuntimeName(endpoint.runtime);
const label = helper.getFunctionLabel(endpoint);
Expand Down
2 changes: 2 additions & 0 deletions src/deploy/functions/release/planner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ export function checkForIllegalUpdate(want: backend.Endpoint, have: backend.Endp
return "a scheduled";
} else if (backend.isTaskQueueTriggered(e)) {
return "a task queue";
} else if (backend.isBlockingTriggered(e)) {
return e.blockingTrigger.eventType;
}
// Unfortunately TypeScript isn't like Scala and I can't prove to it
// that all cases have been handled
Expand Down
8 changes: 7 additions & 1 deletion src/deploy/functions/release/reporter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ export type OperationType =
| "create topic"
| "delete topic"
| "set invoker"
| "set concurrency";
| "set concurrency"
| "register blocking trigger"
| "unregister blocking trigger";

/** An error with a deployment phase. */
export class DeploymentError extends Error {
Expand Down Expand Up @@ -245,5 +247,9 @@ export function triggerTag(endpoint: backend.Endpoint): string {
return `${prefix}.https`;
}

if (backend.isBlockingTriggered(endpoint)) {
return `${prefix}.blocking`;
}

return endpoint.eventTrigger.eventType;
}
12 changes: 12 additions & 0 deletions src/deploy/functions/runtimes/discovery/v1alpha1.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export type ManifestEndpoint = backend.ServiceConfiguration &
Partial<backend.CallableTriggered> &
Partial<backend.EventTriggered> &
Partial<backend.TaskQueueTriggered> &
Partial<backend.BlockingTriggered> &
Partial<backend.ScheduleTriggered> & {
region?: string[];
entryPoint: string;
Expand Down Expand Up @@ -102,6 +103,7 @@ function parseEndpoints(
eventTrigger: "object",
scheduleTrigger: "object",
taskQueueTrigger: "object",
blockingTrigger: "object",
});
let triggerCount = 0;
if (ep.httpsTrigger) {
Expand All @@ -119,6 +121,9 @@ function parseEndpoints(
if (ep.taskQueueTrigger) {
triggerCount++;
}
if (ep.blockingTrigger) {
triggerCount++;
}
if (!triggerCount) {
throw new FirebaseError("Expected trigger in endpoint " + id);
}
Expand Down Expand Up @@ -192,6 +197,13 @@ function parseEndpoints(
});
}
triggered = { taskQueueTrigger: ep.taskQueueTrigger };
} else if (backend.isBlockingTriggered(ep)) {
requireKeys(prefix + ".blockingTrigger", ep.blockingTrigger, "eventType");
assertKeyTypes(prefix + ".blockingTrigger", ep.blockingTrigger, {
eventType: "string",
options: "object",
});
triggered = { blockingTrigger: ep.blockingTrigger };
} else {
throw new FirebaseError(
`Do not recognize trigger type for endpoint ${id}. Try upgrading ` +
Expand Down
28 changes: 24 additions & 4 deletions src/deploy/functions/runtimes/node/parseTriggers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import * as api from "../../../../api";
import * as proto from "../../../../gcp/proto";
import * as args from "../../args";
import * as runtimes from "../../runtimes";
import * as v2events from "../../../../functions/events/v2";
import * as events from "../../../../functions/events";

const TRIGGER_PARSER = path.resolve(__dirname, "./triggerParser.js");

Expand Down Expand Up @@ -68,6 +68,10 @@ export interface TriggerAnnotation {
};
invoker?: string[];
};
blockingTrigger?: {
eventType: string;
options?: Record<string, any>;
};
failurePolicy?: {};
schedule?: ScheduleAnnotation;
timeZone?: string;
Expand Down Expand Up @@ -180,7 +184,10 @@ export function addResourcesToBackend(

// +!! is 1 for truthy values and 0 for falsy values
const triggerCount =
+!!annotation.httpsTrigger + +!!annotation.eventTrigger + +!!annotation.taskQueueTrigger;
+!!annotation.httpsTrigger +
+!!annotation.eventTrigger +
+!!annotation.taskQueueTrigger +
+!!annotation.blockingTrigger;
if (triggerCount !== 1) {
throw new FirebaseError(
"Unexpected annotation generated by the Firebase Functions SDK. This should never happen."
Expand Down Expand Up @@ -211,6 +218,19 @@ export function addResourcesToBackend(
reason: "Needed for scheduled functions.",
});
triggered = { scheduleTrigger: annotation.schedule };
} else if (annotation.blockingTrigger) {
if (events.v1.AUTH_BLOCKING_EVENTS.includes(annotation.blockingTrigger.eventType as any)) {
want.requiredAPIs.push({
api: "identitytoolkit.googleapis.com",
reason: "Needed for auth blocking functions.",
});
}
triggered = {
blockingTrigger: {
eventType: annotation.blockingTrigger.eventType,
options: annotation.blockingTrigger.options,
},
};
} else {
triggered = {
eventTrigger: {
Expand All @@ -223,12 +243,12 @@ export function addResourcesToBackend(
// TODO: yank this edge case for a v2 trigger on the pre-container contract
// once we use container contract for the functionsv2 experiment.
if (annotation.platform === "gcfv2") {
if (annotation.eventTrigger!.eventType === v2events.PUBSUB_PUBLISH_EVENT) {
if (annotation.eventTrigger!.eventType === events.v2.PUBSUB_PUBLISH_EVENT) {
triggered.eventTrigger.eventFilters = { topic: annotation.eventTrigger!.resource };
}

if (
v2events.STORAGE_EVENTS.find(
events.v2.STORAGE_EVENTS.find(
(event) => event === (annotation.eventTrigger?.eventType || "")
)
) {
Expand Down

0 comments on commit 0ea9629

Please sign in to comment.