Skip to content

Commit

Permalink
Add programming model info to worker metadata (#662)
Browse files Browse the repository at this point in the history
And include worker metadata in reload response
  • Loading branch information
ejizba committed Mar 3, 2023
1 parent 26f79ca commit f6d3625
Show file tree
Hide file tree
Showing 10 changed files with 73 additions and 26 deletions.
2 changes: 1 addition & 1 deletion src/eventHandlers/EventHandler.ts
Expand Up @@ -31,7 +31,7 @@ export abstract class EventHandler<
/**
* The default response with any properties unique to this request that should be set for both success & failure scenarios
*/
abstract getDefaultResponse(request: TRequest): TResponse;
abstract getDefaultResponse(channel: WorkerChannel, request: TRequest): TResponse;

/**
* Handles the event and returns the response
Expand Down
14 changes: 11 additions & 3 deletions src/eventHandlers/FunctionEnvironmentReloadHandler.ts
Expand Up @@ -5,6 +5,7 @@ import { AzureFunctionsRpcMessages as rpc } from '../../azure-functions-language
import { startApp } from '../startApp';
import { WorkerChannel } from '../WorkerChannel';
import { EventHandler } from './EventHandler';
import { getWorkerMetadata } from './getWorkerMetadata';
import LogCategory = rpc.RpcLog.RpcLogCategory;
import LogLevel = rpc.RpcLog.Level;

Expand All @@ -17,15 +18,20 @@ export class FunctionEnvironmentReloadHandler extends EventHandler<
> {
readonly responseName = 'functionEnvironmentReloadResponse';

getDefaultResponse(_msg: rpc.IFunctionEnvironmentReloadRequest): rpc.IFunctionEnvironmentReloadResponse {
return {};
getDefaultResponse(
channel: WorkerChannel,
_msg: rpc.IFunctionEnvironmentReloadRequest
): rpc.IFunctionEnvironmentReloadResponse {
return {
workerMetadata: getWorkerMetadata(channel),
};
}

async handleEvent(
channel: WorkerChannel,
msg: rpc.IFunctionEnvironmentReloadRequest
): Promise<rpc.IFunctionEnvironmentReloadResponse> {
const response = this.getDefaultResponse(msg);
const response = this.getDefaultResponse(channel, msg);

// Add environment variables from incoming
const numVariables = (msg.environmentVariables && Object.keys(msg.environmentVariables).length) || 0;
Expand All @@ -49,6 +55,8 @@ export class FunctionEnvironmentReloadHandler extends EventHandler<
});
process.chdir(msg.functionAppDirectory);
await startApp(msg.functionAppDirectory, channel);
// model info may have changed, so we need to update this
response.workerMetadata = getWorkerMetadata(channel);
}

return response;
Expand Down
4 changes: 2 additions & 2 deletions src/eventHandlers/FunctionLoadHandler.ts
Expand Up @@ -15,14 +15,14 @@ import LogLevel = rpc.RpcLog.Level;
export class FunctionLoadHandler extends EventHandler<'functionLoadRequest', 'functionLoadResponse'> {
readonly responseName = 'functionLoadResponse';

getDefaultResponse(msg: rpc.IFunctionLoadRequest): rpc.IFunctionLoadResponse {
getDefaultResponse(_channel: WorkerChannel, msg: rpc.IFunctionLoadRequest): rpc.IFunctionLoadResponse {
return { functionId: msg.functionId };
}

async handleEvent(channel: WorkerChannel, msg: rpc.IFunctionLoadRequest): Promise<rpc.IFunctionLoadResponse> {
channel.workerIndexingLocked = true;

const response = this.getDefaultResponse(msg);
const response = this.getDefaultResponse(channel, msg);

channel.log({
message: `Worker ${channel.workerId} received FunctionLoadRequest`,
Expand Down
4 changes: 2 additions & 2 deletions src/eventHandlers/FunctionsMetadataHandler.ts
Expand Up @@ -10,7 +10,7 @@ import LogLevel = rpc.RpcLog.Level;
export class FunctionsMetadataHandler extends EventHandler<'functionsMetadataRequest', 'functionMetadataResponse'> {
readonly responseName = 'functionMetadataResponse';

getDefaultResponse(_msg: rpc.IFunctionsMetadataRequest): rpc.IFunctionMetadataResponse {
getDefaultResponse(_channel: WorkerChannel, _msg: rpc.IFunctionsMetadataRequest): rpc.IFunctionMetadataResponse {
return {
useDefaultMetadataIndexing: true,
};
Expand All @@ -22,7 +22,7 @@ export class FunctionsMetadataHandler extends EventHandler<'functionsMetadataReq
): Promise<rpc.IFunctionMetadataResponse> {
channel.workerIndexingLocked = true;

const response = this.getDefaultResponse(msg);
const response = this.getDefaultResponse(channel, msg);

channel.log({
message: `Worker ${channel.workerId} received FunctionsMetadataRequest`,
Expand Down
2 changes: 1 addition & 1 deletion src/eventHandlers/InvocationHandler.ts
Expand Up @@ -29,7 +29,7 @@ import { EventHandler } from './EventHandler';
export class InvocationHandler extends EventHandler<'invocationRequest', 'invocationResponse'> {
readonly responseName = 'invocationResponse';

getDefaultResponse(msg: rpc.IInvocationRequest): rpc.IInvocationResponse {
getDefaultResponse(_channel: WorkerChannel, msg: rpc.IInvocationRequest): rpc.IInvocationResponse {
return { invocationId: msg.invocationId };
}

Expand Down
15 changes: 6 additions & 9 deletions src/eventHandlers/WorkerInitHandler.ts
Expand Up @@ -4,12 +4,12 @@
import { access, constants } from 'fs';
import * as path from 'path';
import { AzureFunctionsRpcMessages as rpc } from '../../azure-functions-language-worker-protobuf/src/rpc';
import { version as workerVersion } from '../constants';
import { isError } from '../errors';
import { startApp } from '../startApp';
import { nonNullProp } from '../utils/nonNull';
import { WorkerChannel } from '../WorkerChannel';
import { EventHandler } from './EventHandler';
import { getWorkerMetadata } from './getWorkerMetadata';
import LogCategory = rpc.RpcLog.RpcLogCategory;
import LogLevel = rpc.RpcLog.Level;

Expand All @@ -19,19 +19,14 @@ import LogLevel = rpc.RpcLog.Level;
export class WorkerInitHandler extends EventHandler<'workerInitRequest', 'workerInitResponse'> {
readonly responseName = 'workerInitResponse';

getDefaultResponse(_msg: rpc.IWorkerInitRequest): rpc.IWorkerInitResponse {
getDefaultResponse(channel: WorkerChannel, _msg: rpc.IWorkerInitRequest): rpc.IWorkerInitResponse {
return {
workerMetadata: {
runtimeName: 'node',
runtimeVersion: process.versions.node,
workerBitness: process.arch,
workerVersion,
},
workerMetadata: getWorkerMetadata(channel),
};
}

async handleEvent(channel: WorkerChannel, msg: rpc.IWorkerInitRequest): Promise<rpc.IWorkerInitResponse> {
const response = this.getDefaultResponse(msg);
const response = this.getDefaultResponse(channel, msg);

channel.log({
message: `Worker ${channel.workerId} received WorkerInitRequest`,
Expand All @@ -45,6 +40,8 @@ export class WorkerInitHandler extends EventHandler<'workerInitRequest', 'worker

if (msg.functionAppDirectory) {
await startApp(msg.functionAppDirectory, channel);
// model info may have changed, so we need to update this
response.workerMetadata = getWorkerMetadata(channel);
}

response.capabilities = {
Expand Down
22 changes: 22 additions & 0 deletions src/eventHandlers/getWorkerMetadata.ts
@@ -0,0 +1,22 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License.

import { AzureFunctionsRpcMessages as rpc } from '../../azure-functions-language-worker-protobuf/src/rpc';
import { version as workerVersion } from '../constants';
import { WorkerChannel } from '../WorkerChannel';

export function getWorkerMetadata(channel: WorkerChannel): rpc.IWorkerMetadata {
const result: rpc.IWorkerMetadata = {
runtimeName: 'node',
runtimeVersion: process.versions.node,
workerBitness: process.arch,
workerVersion,
};
if (channel.programmingModel) {
result.customProperties = {
modelName: channel.programmingModel.name,
modelVersion: channel.programmingModel.version,
};
}
return result;
}
2 changes: 1 addition & 1 deletion src/setupEventStream.ts
Expand Up @@ -107,7 +107,7 @@ async function handleMessage(channel: WorkerChannel, inMsg: rpc.StreamingMessage
}

if (eventHandler && request) {
const response = eventHandler.getDefaultResponse(request);
const response = eventHandler.getDefaultResponse(channel, request);
response.result = {
status: rpc.StatusResult.Status.Failure,
exception: {
Expand Down
30 changes: 23 additions & 7 deletions test/eventHandlers/FunctionEnvironmentReloadHandler.test.ts
Expand Up @@ -7,7 +7,7 @@ import * as mock from 'mock-fs';
import { AzureFunctionsRpcMessages as rpc } from '../../azure-functions-language-worker-protobuf/src/rpc';
import { WorkerChannel } from '../../src/WorkerChannel';
import { beforeEventHandlerSuite } from './beforeEventHandlerSuite';
import { TestEventStream } from './TestEventStream';
import { RegExpStreamingMessage, TestEventStream } from './TestEventStream';
import { Msg as WorkerInitMsg } from './WorkerInitHandler.test';
import path = require('path');
import LogCategory = rpc.RpcLog.RpcLogCategory;
Expand All @@ -24,14 +24,30 @@ export namespace Msg {
};
}

export const reloadSuccess: rpc.IStreamingMessage = {
requestId: 'id',
functionEnvironmentReloadResponse: {
result: {
status: rpc.StatusResult.Status.Success,
const workerMetadataRegExps = {
'functionEnvironmentReloadResponse.workerMetadata.runtimeVersion': /^[0-9]+\.[0-9]+\.[0-9]+$/,
'functionEnvironmentReloadResponse.workerMetadata.workerBitness': /^(x64|ia32|arm64)$/,
'functionEnvironmentReloadResponse.workerMetadata.workerVersion': /^3\.[0-9]+\.[0-9]+$/,
'functionEnvironmentReloadResponse.workerMetadata.customProperties.modelVersion': /^3\.[0-9]+\.[0-9]+$/,
};

export const reloadSuccess = new RegExpStreamingMessage(
{
requestId: 'id',
functionEnvironmentReloadResponse: {
result: {
status: rpc.StatusResult.Status.Success,
},
workerMetadata: {
runtimeName: 'node',
customProperties: {
modelName: '@azure/functions',
},
},
},
},
};
workerMetadataRegExps
);

export const noHandlerRpcLog: rpc.IStreamingMessage = {
rpcLog: {
Expand Down
4 changes: 4 additions & 0 deletions test/eventHandlers/WorkerInitHandler.test.ts
Expand Up @@ -32,6 +32,7 @@ export namespace Msg {
'workerInitResponse.workerMetadata.runtimeVersion': /^[0-9]+\.[0-9]+\.[0-9]+$/,
'workerInitResponse.workerMetadata.workerBitness': /^(x64|ia32|arm64)$/,
'workerInitResponse.workerMetadata.workerVersion': /^3\.[0-9]+\.[0-9]+$/,
'workerInitResponse.workerMetadata.customProperties.modelVersion': /^3\.[0-9]+\.[0-9]+$/,
};

export const response = new RegExpStreamingMessage(
Expand All @@ -53,6 +54,9 @@ export namespace Msg {
},
workerMetadata: {
runtimeName: 'node',
customProperties: {
modelName: '@azure/functions',
},
},
},
},
Expand Down

0 comments on commit f6d3625

Please sign in to comment.