Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc-js: Refactor Transport and SubchannelConnector out of Subchannel #2308

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
30 changes: 17 additions & 13 deletions packages/grpc-js/src/subchannel-call.ts
Expand Up @@ -21,12 +21,12 @@ import * as os from 'os';
import { Status } from './constants';
import { Metadata } from './metadata';
import { StreamDecoder } from './stream-decoder';
import { SubchannelCallStatsTracker, Subchannel } from './subchannel';
import * as logging from './logging';
import { LogVerbosity } from './constants';
import { ServerSurfaceCall } from './server-call';
import { Deadline } from './deadline';
import { InterceptingListener, MessageContext, StatusObject, WriteCallback } from './call-interface';
import { CallEventTracker } from './transport';

const TRACER_NAME = 'subchannel_call';

Expand Down Expand Up @@ -110,9 +110,9 @@ export class Http2SubchannelCall implements SubchannelCall {

constructor(
private readonly http2Stream: http2.ClientHttp2Stream,
private readonly callStatsTracker: SubchannelCallStatsTracker,
private readonly callEventTracker: CallEventTracker,
private readonly listener: SubchannelCallInterceptingListener,
private readonly subchannel: Subchannel,
private readonly peerName: string,
private readonly callId: number
) {
this.disconnectListener = () => {
Expand All @@ -122,8 +122,6 @@ export class Http2SubchannelCall implements SubchannelCall {
metadata: new Metadata(),
});
};
subchannel.addDisconnectListener(this.disconnectListener);
subchannel.callRef();
http2Stream.on('response', (headers, flags) => {
let headersString = '';
for (const header of Object.keys(headers)) {
Expand Down Expand Up @@ -185,7 +183,7 @@ export class Http2SubchannelCall implements SubchannelCall {

for (const message of messages) {
this.trace('parsed message of length ' + message.length);
this.callStatsTracker!.addMessageReceived();
this.callEventTracker!.addMessageReceived();
this.tryPush(message);
}
});
Expand Down Expand Up @@ -289,7 +287,15 @@ export class Http2SubchannelCall implements SubchannelCall {
);
this.internalError = err;
}
this.callStatsTracker.onStreamEnd(false);
this.callEventTracker.onStreamEnd(false);
});
}

public onDisconnect() {
this.endCall({
code: Status.UNAVAILABLE,
details: 'Connection dropped',
metadata: new Metadata(),
});
}

Expand All @@ -304,7 +310,7 @@ export class Http2SubchannelCall implements SubchannelCall {
this.finalStatus!.details +
'"'
);
this.callStatsTracker.onCallEnd(this.finalStatus!);
this.callEventTracker.onCallEnd(this.finalStatus!);
/* We delay the actual action of bubbling up the status to insulate the
* cleanup code in this class from any errors that may be thrown in the
* upper layers as a result of bubbling up the status. In particular,
Expand All @@ -319,8 +325,6 @@ export class Http2SubchannelCall implements SubchannelCall {
* not push more messages after the status is output, so the messages go
* nowhere either way. */
this.http2Stream.resume();
this.subchannel.callUnref();
this.subchannel.removeDisconnectListener(this.disconnectListener);
}
}

Expand Down Expand Up @@ -395,7 +399,7 @@ export class Http2SubchannelCall implements SubchannelCall {
}

private handleTrailers(headers: http2.IncomingHttpHeaders) {
this.callStatsTracker.onStreamEnd(true);
this.callEventTracker.onStreamEnd(true);
let headersString = '';
for (const header of Object.keys(headers)) {
headersString += '\t\t' + header + ': ' + headers[header] + '\n';
Expand Down Expand Up @@ -467,7 +471,7 @@ export class Http2SubchannelCall implements SubchannelCall {
}

getPeer(): string {
return this.subchannel.getAddress();
return this.peerName;
}

getCallNumber(): number {
Expand Down Expand Up @@ -506,7 +510,7 @@ export class Http2SubchannelCall implements SubchannelCall {
context.callback?.();
};
this.trace('sending data chunk of length ' + message.length);
this.callStatsTracker.addMessageSent();
this.callEventTracker.addMessageSent();
try {
this.http2Stream!.write(message, cb);
} catch (error) {
Expand Down
4 changes: 3 additions & 1 deletion packages/grpc-js/src/subchannel-pool.ts
Expand Up @@ -23,6 +23,7 @@ import {
} from './subchannel-address';
import { ChannelCredentials } from './channel-credentials';
import { GrpcUri, uriToString } from './uri-parser';
import { Http2SubchannelConnector } from './transport';

// 10 seconds in milliseconds. This value is arbitrary.
/**
Expand Down Expand Up @@ -143,7 +144,8 @@ export class SubchannelPool {
channelTargetUri,
subchannelTarget,
channelArguments,
channelCredentials
channelCredentials,
new Http2SubchannelConnector(channelTargetUri)
);
if (!(channelTarget in this.pool)) {
this.pool[channelTarget] = [];
Expand Down