Skip to content

Commit

Permalink
feat: add broker plugin system
Browse files Browse the repository at this point in the history
  • Loading branch information
aarlaud committed May 3, 2024
1 parent e44f584 commit 6bbeb8b
Show file tree
Hide file tree
Showing 21 changed files with 895 additions and 8 deletions.
15 changes: 15 additions & 0 deletions config.universal.json.sample
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"BROKER_CLIENT_CONFIGURATION": {
"common": {
"default": {
"BROKER_SERVER_URL": "https://broker.pre-prod.snyk.io",
"BROKER_HA_MODE_ENABLED": "false",
"BROKER_DISPATCHER_BASE_URL": "https://api.pre-prod.snyk.io"
},
"oauth": {
"clientId": "${CLIENT_ID}",
"clientSecret": "${CLIENT_SECRET}"
}
}
}
}
57 changes: 57 additions & 0 deletions lib/client/brokerClientPlugins/abstractBrokerPlugin.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import {
HttpResponse,
makeRequestToDownstream,
} from '../../common/http/request';
import { PostFilterPreparedRequest } from '../../common/relay/prepareRequest';
import { log as logger } from '../../logs/logger';

export default abstract class BrokerPlugin {
abstract pluginCode: string;
abstract pluginName: string;
abstract description: string;
abstract version: string;
abstract applicableBrokerTypes: Array<string>;
logger;
brokerClientConfiguration: Record<string, any>;
makeRequestToDownstream: (
req: PostFilterPreparedRequest,
retries?: any,
) => Promise<HttpResponse>;
request?: PostFilterPreparedRequest;

constructor(brokerClientCfg: Record<string, any>) {
this.logger = logger;
this.brokerClientConfiguration = brokerClientCfg;
this.makeRequestToDownstream = makeRequestToDownstream;
}

getApplicableTypes(): Array<string> {
const applicableTypes: Array<string> = [];
if (
this.applicableBrokerTypes.every((type) =>
this.brokerClientConfiguration.supportedBrokerTypes.includes(type),
)
) {
applicableTypes.push(...this.applicableBrokerTypes);
}
return applicableTypes;
}
isDisabled(config): boolean {
let isDisabled = false;
if (config[`DISABLE_${this.pluginCode}_PLUGIN`]) {
logger.info({ plugin: this.pluginName }, `Plugin disabled`);
isDisabled = true;
}
return isDisabled;
}
abstract isPluginActive(): boolean;

abstract startUp(connectionConfiguration: Record<string, any>): Promise<void>;

async preRequest(
connectionConfiguration: Record<string, any>,
postFilterPreparedRequest: PostFilterPreparedRequest,
): Promise<PostFilterPreparedRequest> {
return postFilterPreparedRequest;
}
}
126 changes: 126 additions & 0 deletions lib/client/brokerClientPlugins/pluginManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import { readdir } from 'fs/promises';
import { log as logger } from '../../logs/logger';
import BrokerPlugin from './abstractBrokerPlugin';
import { existsSync } from 'fs';
import { PostFilterPreparedRequest } from '../../common/relay/prepareRequest';

export const loadPlugins = async (pluginsFolderPath: string, clientOpts) => {
clientOpts.config['plugins'] = new Map<string, unknown>();
clientOpts.config.supportedBrokerTypes.forEach((type) => {
clientOpts.config.plugins.set(type, []);
});
try {
if (existsSync(pluginsFolderPath)) {
const pluginsFiles = await readdir(pluginsFolderPath);
for (const pluginFile of pluginsFiles.filter((filename) =>
filename.endsWith('.js'),
)) {
const plugin = await import(`${pluginsFolderPath}/${pluginFile}`);
// Passing the config object so we can mutate things like filters instead of READONLY
const pluginInstance = new plugin.Plugin(clientOpts.config);
const applicableBrokerTypes = pluginInstance.getApplicableTypes();
applicableBrokerTypes.forEach((applicableBrokerType) => {
if (
!pluginInstance.isDisabled(clientOpts.config) &&
pluginInstance.isPluginActive()
) {
logger.debug({}, `Loading plugin ${pluginInstance.pluginName}`);
const configPluginForCurrentType =
clientOpts.config.plugins.get(applicableBrokerType);
if (
configPluginForCurrentType.some(
(x) =>
x.pluginCode === pluginInstance.pluginCode ||
x.pluginName === pluginInstance.pluginName,
)
) {
const errMsg = `Some Plugins have identical name or code.`;
logger.error({}, errMsg);
throw new Error(errMsg);
}
configPluginForCurrentType.push(pluginInstance);
} else {
logger.debug(
{},
`Skipping plugin ${pluginInstance.pluginName}, not active.`,
);
}
});
}
}
return clientOpts.config['plugins'];
} catch (err) {
const errMsg = `Error loading plugins from ${pluginsFolderPath}`;
logger.error({ err }, `Error loading plugins from ${pluginsFolderPath}`);
throw new Error(errMsg);
}
};

export const runStartupPlugins = async (clientOpts) => {
const loadedPlugins = clientOpts.config.plugins as Map<
string,
BrokerPlugin[]
>;
const connectionsKeys = Object.keys(clientOpts.config.connections);

for (const connectionKey of connectionsKeys) {
if (
loadedPlugins.has(`${clientOpts.config.connections[connectionKey].type}`)
) {
const pluginInstances =
loadedPlugins.get(
`${clientOpts.config.connections[connectionKey].type}`,
) ?? [];
for (let i = 0; i < pluginInstances.length; i++) {
await pluginInstances[i].startUp(
clientOpts.config.connections[connectionKey],
);
}
}
}
};

export const runPreRequestPlugins = async (
clientOpts,
connectionIdentifier,
pristinePreRequest: PostFilterPreparedRequest,
) => {
let preRequest = pristinePreRequest;
const loadedPlugins = clientOpts.config.plugins as Map<
string,
BrokerPlugin[]
>;
const connectionsKeys = Object.keys(clientOpts.config.connections);
let connectionKey;
for (let i = 0; i < connectionsKeys.length; i++) {
if (
clientOpts.config.connections[connectionsKeys[i]].identifier ==
connectionIdentifier
) {
connectionKey = connectionsKeys[i];
break;
}
}
if (!connectionsKeys.includes(connectionKey)) {
const errMsg = `Plugin preRequest: connection ${connectionKey} not found`;
logger.error({ connectionKey }, errMsg);
throw new Error(errMsg);
}

if (
loadedPlugins.has(`${clientOpts.config.connections[connectionKey].type}`)
) {
const pluginInstances =
loadedPlugins.get(
`${clientOpts.config.connections[connectionKey].type}`,
) ?? [];
for (let i = 0; i < pluginInstances.length; i++) {
preRequest = await pluginInstances[i].preRequest(
clientOpts.config.connections[connectionKey],
preRequest,
);
}
}

return preRequest;
};
60 changes: 60 additions & 0 deletions lib/client/brokerClientPlugins/plugins/githubServerAppAuth.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// import { PostFilterPreparedRequest } from '../../../common/relay/prepareRequest';
import BrokerPlugin from '../abstractBrokerPlugin';

export class Plugin extends BrokerPlugin {
// Plugin Code and Name must be unique across all plugins.
pluginCode = 'GITHUB_SERVER_APP';
pluginName = 'Github Server App Authentication Plugin';
description = `
Plugin to retrieve and manage credentials for Brokered Github Server App installs
`;
version = '0.1';
applicableBrokerTypes = ['github-server-app']; // Must match broker types

// Provide a way to include specific conditional logic to execute
isPluginActive(): boolean {
// if (this.brokerClientConfiguration['XYZ']) {
// this.logger.debug({ plugin: this.pluginName }, 'Disabling plugin');
// return false;
// }
return true;
}

// Function running upon broker client startup
// Useful for credentials retrieval, initial setup, etc...
async startUp(connectionConfig): Promise<void> {
this.logger.info({ plugin: this.pluginName }, 'Running Startup');
this.logger.info(
{ config: connectionConfig },
'Connection Config passed to the plugin',
);
// const data = {
// install_id: connectionConfig.GITHUB_APP_INSTALL_ID,
// client_id: connectionConfig.GITHUB_CLIENT_ID,
// client_secret: connectionConfig.GITHUB_CLIENT_SECRET,
// };
// const formData = new URLSearchParams(data);

// this.request = {
// url: `https://${connectionConfig.GITHUB_API}/oauth/path`,
// headers: {
// 'Content-Type': 'application/x-www-form-urlencoded',
// },
// method: 'POST',
// body: formData.toString(),
// };
// const response = await this.makeRequestToDownstream(this.request);
// if (response.statusCode && response.statusCode > 299) {
// throw Error('Error making request');
// }
}

// Hook to run pre requests operations - Optional. Uncomment to enable
// async preRequest(
// connectionConfiguration: Record<string, any>,
// postFilterPreparedRequest:PostFilterPreparedRequest,
// ) {
// this.logger.debug({ plugin: this.pluginName, connection: connectionConfiguration }, 'Running prerequest plugin');
// return postFilterPreparedRequest;
// }
}
5 changes: 5 additions & 0 deletions lib/client/hooks/startup/processHooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { HookResults } from '../../types/client';
import { CheckResult } from '../../checks/types';
import { ClientOpts } from '../../../common/types/options';
import { highAvailabilityModeEnabled } from '../../config/configHelpers';
import { runStartupPlugins } from '../../brokerClientPlugins/pluginManager';

export const validateMinimalConfig = async (
clientOpts: ClientOpts,
Expand Down Expand Up @@ -134,6 +135,10 @@ export const processStartUpHooks = async (
);
}

if (clientOpts.config.universalBrokerEnabled) {
await runStartupPlugins(clientOpts);
}

return {
preflightCheckResults: preflightCheckResults.length
? preflightCheckResults
Expand Down
22 changes: 17 additions & 5 deletions lib/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ import { getClientConfigMetadata } from './config/configHelpers';
import { fetchJwt } from './auth/oauth';
import {
CONFIGURATION,
findProjectRoot,
getConfig,
loadBrokerConfig,
} from '../common/config/config';
import { retrieveConnectionsForDeployment } from './config/remoteConfig';
import { handleTerminationSignal } from '../common/utils/signals';
import { cleanUpUniversalFile } from './utils/cleanup';
import { loadPlugins } from './brokerClientPlugins/pluginManager';

process.on('uncaughtException', (error) => {
if (error.message == 'read ECONNRESET') {
Expand Down Expand Up @@ -62,6 +64,15 @@ export const main = async (clientOpts: ClientOpts) => {
'https://api.snyk.io';

await validateMinimalConfig(clientOpts);
if (clientOpts.config.universalBrokerEnabled) {
const pluginsFolderPath = `${findProjectRoot(
__dirname,
)}/dist/lib/client/brokerClientPlugins/plugins`;
clientOpts.config.plugins = await loadPlugins(
pluginsFolderPath,
clientOpts,
);
}

if (
clientOpts.config.brokerClientConfiguration.common.oauth?.clientId &&
Expand All @@ -75,7 +86,7 @@ export const main = async (clientOpts: ClientOpts) => {
);
await retrieveConnectionsForDeployment(
clientOpts,
`${__dirname}/../../../../config.universal.json`,
`${findProjectRoot(process.cwd())}/config.universal.json`,
);
// Reload config with connection
await loadBrokerConfig();
Expand All @@ -87,6 +98,11 @@ export const main = async (clientOpts: ClientOpts) => {
) as Record<string, any> as CONFIGURATION;
handleTerminationSignal(cleanUpUniversalFile);
}

const brokerClientId = uuidv4();
logger.info({ brokerClientId }, 'generated broker client id');
const hookResults = await processStartUpHooks(clientOpts, brokerClientId);

const loadedClientOpts: LoadedClientOpts = {
loadedFilters: loadAllFilters(clientOpts.filters, clientOpts.config),
...clientOpts,
Expand All @@ -97,10 +113,6 @@ export const main = async (clientOpts: ClientOpts) => {
throw new Error('Unable to load filters');
}

const brokerClientId = uuidv4();
logger.info({ brokerClientId }, 'generated broker client id');
const hookResults = await processStartUpHooks(clientOpts, brokerClientId);

const globalIdentifyingMetadata: IdentifyingMetadata = {
capabilities: ['post-streams'],
clientId: brokerClientId,
Expand Down
9 changes: 9 additions & 0 deletions lib/common/relay/forwardWebsocketRequest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
} from './requestsHelper';
import { LOADEDFILTERSET } from '../types/filter';
import { LoadedClientOpts, LoadedServerOpts } from '../types/options';
import { runPreRequestPlugins } from '../../client/brokerClientPlugins/pluginManager';

export const forwardWebSocketRequest = (
options: LoadedClientOpts | LoadedServerOpts,
Expand Down Expand Up @@ -209,6 +210,14 @@ export const forwardWebSocketRequest = (
connectionIdentifier,
websocketConnectionHandler?.socketType,
);
if (options.config.universalBrokerEnabled) {
preparedRequest.req = await runPreRequestPlugins(
options,
connectionIdentifier,
preparedRequest.req,
);
}

incrementHttpRequestsTotal(false, 'outbound-request');
payload.streamingID
? await makePostStreamingRequest(preparedRequest.req, emit, logContext)
Expand Down
6 changes: 4 additions & 2 deletions lib/common/relay/prepareRequest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -241,11 +241,13 @@ export const prepareRequestFromFilterResult = async (
logContext.requestHeaders = payload.headers;
logger.debug(logContext, 'Prepared request');

const req = {
const req: PostFilterPreparedRequest = {
url: result.url,
headers: payload.headers,
method: payload.method,
body: payload.body,
};
if (payload.body) {
req.body = payload.body;
}
return { req, error: errorPreparing };
};
1 change: 1 addition & 0 deletions lib/common/types/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export interface ClientOpts {
authHeader: string;
expiresIn: number;
};
plugins?: Map<string, any>;
}

export interface ServerOpts {
Expand Down

0 comments on commit 6bbeb8b

Please sign in to comment.