Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc-js: Allow users to disable channelz #1944

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/grpc-js/src/channel-options.ts
Expand Up @@ -36,6 +36,7 @@ export interface ChannelOptions {
'grpc.enable_http_proxy'?: number;
'grpc.http_connect_target'?: string;
'grpc.http_connect_creds'?: string;
'grpc.enable_channelz'?: number;
'grpc-node.max_session_memory'?: number;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
[key: string]: any;
Expand All @@ -61,6 +62,7 @@ export const recognizedOptions = {
'grpc.max_send_message_length': true,
'grpc.max_receive_message_length': true,
'grpc.enable_http_proxy': true,
'grpc.enable_channelz': true,
'grpc-node.max_session_memory': true,
};

Expand Down
69 changes: 52 additions & 17 deletions packages/grpc-js/src/channel.ts
Expand Up @@ -172,6 +172,7 @@ export class ChannelImplementation implements Channel {
private configSelector: ConfigSelector | null = null;

// Channelz info
private readonly channelzEnabled: boolean = true;
private originalTarget: string;
private channelzRef: ChannelRef;
private channelzTrace: ChannelzTrace;
Expand Down Expand Up @@ -213,9 +214,22 @@ export class ChannelImplementation implements Channel {
this.callRefTimer = setInterval(() => {}, MAX_TIMEOUT_TIME);
this.callRefTimer.unref?.();

this.channelzRef = registerChannelzChannel(target, () => this.getChannelzInfo());
if (this.options['grpc.enable_channelz'] === 0) {
this.channelzEnabled = false;
}

this.channelzTrace = new ChannelzTrace();
this.channelzTrace.addTrace('CT_INFO', 'Channel created');
if (this.channelzEnabled) {
this.channelzRef = registerChannelzChannel(target, () => this.getChannelzInfo());
this.channelzTrace.addTrace('CT_INFO', 'Channel created');
} else {
// Dummy channelz ref that will never be used
this.channelzRef = {
kind: 'channel',
id: -1,
name: ''
};
}

if (this.options['grpc.default_authority']) {
this.defaultAuthority = this.options['grpc.default_authority'] as string;
Expand All @@ -242,7 +256,9 @@ export class ChannelImplementation implements Channel {
Object.assign({}, this.options, subchannelArgs),
this.credentials
);
this.channelzTrace.addTrace('CT_INFO', 'Created subchannel or used existing subchannel', subchannel.getChannelzRef());
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Created subchannel or used existing subchannel', subchannel.getChannelzRef());
}
return subchannel;
},
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
Expand All @@ -262,18 +278,24 @@ export class ChannelImplementation implements Channel {
);
},
addChannelzChild: (child: ChannelRef | SubchannelRef) => {
this.childrenTracker.refChild(child);
if (this.channelzEnabled) {
this.childrenTracker.refChild(child);
}
},
removeChannelzChild: (child: ChannelRef | SubchannelRef) => {
this.childrenTracker.unrefChild(child);
if (this.channelzEnabled) {
this.childrenTracker.unrefChild(child);
}
}
};
this.resolvingLoadBalancer = new ResolvingLoadBalancer(
this.target,
channelControlHelper,
options,
(configSelector) => {
this.channelzTrace.addTrace('CT_INFO', 'Address resolution succeeded');
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Address resolution succeeded');
}
this.configSelector = configSelector;
/* We process the queue asynchronously to ensure that the corresponding
* load balancer update has completed. */
Expand All @@ -288,7 +310,9 @@ export class ChannelImplementation implements Channel {
});
},
(status) => {
this.channelzTrace.addTrace('CT_WARNING', 'Address resolution failed with code ' + status.code + ' and details "' + status.details + '"');
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_WARNING', 'Address resolution failed with code ' + status.code + ' and details "' + status.details + '"');
}
if (this.configSelectionQueue.length > 0) {
this.trace('Name resolution failed with calls queued for config selection');
}
Expand Down Expand Up @@ -553,7 +577,9 @@ export class ChannelImplementation implements Channel {
' -> ' +
ConnectivityState[newState]
);
this.channelzTrace.addTrace('CT_INFO', ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[newState]);
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[newState]);
}
this.connectivityState = newState;
const watchersCopy = this.connectivityStateWatchers.slice();
for (const watcherObject of watchersCopy) {
Expand Down Expand Up @@ -638,7 +664,9 @@ export class ChannelImplementation implements Channel {
this.resolvingLoadBalancer.destroy();
this.updateState(ConnectivityState.SHUTDOWN);
clearInterval(this.callRefTimer);
unregisterChannelzRef(this.channelzRef);
if (this.channelzEnabled) {
unregisterChannelzRef(this.channelzRef);
}

this.subchannelPool.unrefUnusedSubchannels();
}
Expand Down Expand Up @@ -690,6 +718,11 @@ export class ChannelImplementation implements Channel {
this.connectivityStateWatchers.push(watcherObject);
}

/**
* Get the channelz reference object for this channel. The returned value is
* garbage if channelz is disabled for this channel.
* @returns
*/
getChannelzRef() {
return this.channelzRef;
}
Expand Down Expand Up @@ -735,14 +768,16 @@ export class ChannelImplementation implements Channel {
this.credentials._getCallCredentials(),
callNumber
);
this.callTracker.addCallStarted();
stream.addStatusWatcher(status => {
if (status.code === Status.OK) {
this.callTracker.addCallSucceeded();
} else {
this.callTracker.addCallFailed();
}
});
if (this.channelzEnabled) {
this.callTracker.addCallStarted();
stream.addStatusWatcher(status => {
if (status.code === Status.OK) {
this.callTracker.addCallSucceeded();
} else {
this.callTracker.addCallFailed();
}
});
}
return stream;
}
}
41 changes: 32 additions & 9 deletions packages/grpc-js/src/server.ts
Expand Up @@ -149,6 +149,7 @@ export class Server {
private options: ChannelOptions;

// Channelz Info
private readonly channelzEnabled: boolean = true;
private channelzRef: ServerRef;
private channelzTrace = new ChannelzTrace();
private callTracker = new ChannelzCallTracker();
Expand All @@ -157,9 +158,20 @@ export class Server {

constructor(options?: ChannelOptions) {
this.options = options ?? {};
this.channelzRef = registerChannelzServer(() => this.getChannelzInfo());
this.channelzTrace.addTrace('CT_INFO', 'Server created');
this.trace('Server constructed');
if (this.options['grpc.enable_channelz'] === 0) {
this.channelzEnabled = false;
}
if (this.channelzEnabled) {
this.channelzRef = registerChannelzServer(() => this.getChannelzInfo());
this.channelzTrace.addTrace('CT_INFO', 'Server created');
this.trace('Server constructed');
} else {
// Dummy channelz ref that will never be used
this.channelzRef = {
kind: 'server',
id: -1
};
}
}

private getChannelzInfo(): ServerInfo {
Expand Down Expand Up @@ -638,7 +650,9 @@ export class Server {
if (this.started === true) {
throw new Error('server is already started');
}
this.channelzTrace.addTrace('CT_INFO', 'Starting');
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Starting');
}
this.started = true;
}

Expand Down Expand Up @@ -686,6 +700,11 @@ export class Server {
throw new Error('Not yet implemented');
}

/**
* Get the channelz reference object for this server. The returned value is
* garbage if channelz is disabled for this server.
* @returns
*/
getChannelzRef() {
return this.channelzRef;
}
Expand Down Expand Up @@ -841,12 +860,16 @@ export class Server {

this.sessions.set(session, channelzSessionInfo);
const clientAddress = session.socket.remoteAddress;
this.channelzTrace.addTrace('CT_INFO', 'Connection established by client ' + clientAddress);
this.sessionChildrenTracker.refChild(channelzRef);
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Connection established by client ' + clientAddress);
this.sessionChildrenTracker.refChild(channelzRef);
}
session.on('close', () => {
this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by client ' + clientAddress);
this.sessionChildrenTracker.unrefChild(channelzRef);
unregisterChannelzRef(channelzRef);
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by client ' + clientAddress);
this.sessionChildrenTracker.unrefChild(channelzRef);
unregisterChannelzRef(channelzRef);
}
this.sessions.delete(session);
});
});
Expand Down
94 changes: 61 additions & 33 deletions packages/grpc-js/src/subchannel.ts
Expand Up @@ -157,6 +157,7 @@ export class Subchannel {
private subchannelAddressString: string;

// Channelz info
private readonly channelzEnabled: boolean = true;
private channelzRef: SubchannelRef;
private channelzTrace: ChannelzTrace;
private callTracker = new ChannelzCallTracker();
Expand Down Expand Up @@ -226,9 +227,21 @@ export class Subchannel {
}, backoffOptions);
this.subchannelAddressString = subchannelAddressToString(subchannelAddress);

this.channelzRef = registerChannelzSubchannel(this.subchannelAddressString, () => this.getChannelzInfo());
if (options['grpc.enable_channelz'] === 0) {
this.channelzEnabled = false;
}
this.channelzTrace = new ChannelzTrace();
this.channelzTrace.addTrace('CT_INFO', 'Subchannel created');
if (this.channelzEnabled) {
this.channelzRef = registerChannelzSubchannel(this.subchannelAddressString, () => this.getChannelzInfo());
this.channelzTrace.addTrace('CT_INFO', 'Subchannel created');
} else {
// Dummy channelz ref that will never be used
this.channelzRef = {
kind: 'subchannel',
id: -1,
name: ''
};
}
this.trace('Subchannel constructed with options ' + JSON.stringify(options, undefined, 2));
}

Expand Down Expand Up @@ -286,6 +299,9 @@ export class Subchannel {
}

private resetChannelzSocketInfo() {
if (!this.channelzEnabled) {
return;
}
if (this.channelzSocketRef) {
unregisterChannelzRef(this.channelzSocketRef);
this.childrenTracker.unrefChild(this.channelzSocketRef);
Expand Down Expand Up @@ -335,7 +351,9 @@ export class Subchannel {
}

private sendPing() {
this.keepalivesSent += 1;
if (this.channelzEnabled) {
this.keepalivesSent += 1;
}
logging.trace(
LogVerbosity.DEBUG,
'keepalive',
Expand Down Expand Up @@ -462,8 +480,10 @@ export class Subchannel {
connectionOptions
);
this.session = session;
this.channelzSocketRef = registerChannelzSocket(this.subchannelAddressString, () => this.getChannelzSocketInfo()!);
this.childrenTracker.refChild(this.channelzSocketRef);
if (this.channelzEnabled) {
this.channelzSocketRef = registerChannelzSocket(this.subchannelAddressString, () => this.getChannelzSocketInfo()!);
this.childrenTracker.refChild(this.channelzSocketRef);
}
session.unref();
/* For all of these events, check if the session at the time of the event
* is the same one currently attached to this subchannel, to ensure that
Expand Down Expand Up @@ -615,7 +635,9 @@ export class Subchannel {
' -> ' +
ConnectivityState[newState]
);
this.channelzTrace.addTrace('CT_INFO', ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[newState]);
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[newState]);
}
const previousState = this.connectivityState;
this.connectivityState = newState;
switch (newState) {
Expand Down Expand Up @@ -678,12 +700,16 @@ export class Subchannel {
/* If no calls, channels, or subchannel pools have any more references to
* this subchannel, we can be sure it will never be used again. */
if (this.callRefcount === 0 && this.refcount === 0) {
this.channelzTrace.addTrace('CT_INFO', 'Shutting down');
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Shutting down');
}
this.transitionToState(
[ConnectivityState.CONNECTING, ConnectivityState.READY],
ConnectivityState.TRANSIENT_FAILURE
);
unregisterChannelzRef(this.channelzRef);
if (this.channelzEnabled) {
unregisterChannelzRef(this.channelzRef);
}
}
}

Expand Down Expand Up @@ -805,34 +831,36 @@ export class Subchannel {
' with headers\n' +
headersString
);
this.callTracker.addCallStarted();
callStream.addStatusWatcher(status => {
if (status.code === Status.OK) {
this.callTracker.addCallSucceeded();
} else {
this.callTracker.addCallFailed();
}
});
const streamSession = this.session;
this.streamTracker.addCallStarted();
callStream.addStreamEndWatcher(success => {
if (streamSession === this.session) {
if (success) {
this.streamTracker.addCallSucceeded();
if (this.channelzEnabled) {
this.callTracker.addCallStarted();
callStream.addStatusWatcher(status => {
if (status.code === Status.OK) {
this.callTracker.addCallSucceeded();
} else {
this.streamTracker.addCallFailed();
this.callTracker.addCallFailed();
}
}
});
callStream.attachHttp2Stream(http2Stream, this, extraFilters, {
addMessageSent: () => {
this.messagesSent += 1;
this.lastMessageSentTimestamp = new Date();
},
addMessageReceived: () => {
this.messagesReceived += 1;
}
});
});
this.streamTracker.addCallStarted();
callStream.addStreamEndWatcher(success => {
if (streamSession === this.session) {
if (success) {
this.streamTracker.addCallSucceeded();
} else {
this.streamTracker.addCallFailed();
}
}
});
callStream.attachHttp2Stream(http2Stream, this, extraFilters, {
addMessageSent: () => {
this.messagesSent += 1;
this.lastMessageSentTimestamp = new Date();
},
addMessageReceived: () => {
this.messagesReceived += 1;
}
});
}
}

/**
Expand Down