Skip to content

Commit

Permalink
Merge pull request #2308 from murgatroid99/grpc-js_transport_refactor
Browse files Browse the repository at this point in the history
grpc-js: Refactor `Transport` and `SubchannelConnector` out of `Subchannel`
  • Loading branch information
murgatroid99 committed Jan 5, 2023
2 parents d2bd713 + df8b897 commit a4d409d
Show file tree
Hide file tree
Showing 4 changed files with 714 additions and 647 deletions.
30 changes: 17 additions & 13 deletions packages/grpc-js/src/subchannel-call.ts
Expand Up @@ -21,12 +21,12 @@ import * as os from 'os';
import { Status } from './constants';
import { Metadata } from './metadata';
import { StreamDecoder } from './stream-decoder';
import { SubchannelCallStatsTracker, Subchannel } from './subchannel';
import * as logging from './logging';
import { LogVerbosity } from './constants';
import { ServerSurfaceCall } from './server-call';
import { Deadline } from './deadline';
import { InterceptingListener, MessageContext, StatusObject, WriteCallback } from './call-interface';
import { CallEventTracker } from './transport';

const TRACER_NAME = 'subchannel_call';

Expand Down Expand Up @@ -110,9 +110,9 @@ export class Http2SubchannelCall implements SubchannelCall {

constructor(
private readonly http2Stream: http2.ClientHttp2Stream,
private readonly callStatsTracker: SubchannelCallStatsTracker,
private readonly callEventTracker: CallEventTracker,
private readonly listener: SubchannelCallInterceptingListener,
private readonly subchannel: Subchannel,
private readonly peerName: string,
private readonly callId: number
) {
this.disconnectListener = () => {
Expand All @@ -122,8 +122,6 @@ export class Http2SubchannelCall implements SubchannelCall {
metadata: new Metadata(),
});
};
subchannel.addDisconnectListener(this.disconnectListener);
subchannel.callRef();
http2Stream.on('response', (headers, flags) => {
let headersString = '';
for (const header of Object.keys(headers)) {
Expand Down Expand Up @@ -185,7 +183,7 @@ export class Http2SubchannelCall implements SubchannelCall {

for (const message of messages) {
this.trace('parsed message of length ' + message.length);
this.callStatsTracker!.addMessageReceived();
this.callEventTracker!.addMessageReceived();
this.tryPush(message);
}
});
Expand Down Expand Up @@ -289,7 +287,15 @@ export class Http2SubchannelCall implements SubchannelCall {
);
this.internalError = err;
}
this.callStatsTracker.onStreamEnd(false);
this.callEventTracker.onStreamEnd(false);
});
}

public onDisconnect() {
this.endCall({
code: Status.UNAVAILABLE,
details: 'Connection dropped',
metadata: new Metadata(),
});
}

Expand All @@ -304,7 +310,7 @@ export class Http2SubchannelCall implements SubchannelCall {
this.finalStatus!.details +
'"'
);
this.callStatsTracker.onCallEnd(this.finalStatus!);
this.callEventTracker.onCallEnd(this.finalStatus!);
/* We delay the actual action of bubbling up the status to insulate the
* cleanup code in this class from any errors that may be thrown in the
* upper layers as a result of bubbling up the status. In particular,
Expand All @@ -319,8 +325,6 @@ export class Http2SubchannelCall implements SubchannelCall {
* not push more messages after the status is output, so the messages go
* nowhere either way. */
this.http2Stream.resume();
this.subchannel.callUnref();
this.subchannel.removeDisconnectListener(this.disconnectListener);
}
}

Expand Down Expand Up @@ -395,7 +399,7 @@ export class Http2SubchannelCall implements SubchannelCall {
}

private handleTrailers(headers: http2.IncomingHttpHeaders) {
this.callStatsTracker.onStreamEnd(true);
this.callEventTracker.onStreamEnd(true);
let headersString = '';
for (const header of Object.keys(headers)) {
headersString += '\t\t' + header + ': ' + headers[header] + '\n';
Expand Down Expand Up @@ -467,7 +471,7 @@ export class Http2SubchannelCall implements SubchannelCall {
}

getPeer(): string {
return this.subchannel.getAddress();
return this.peerName;
}

getCallNumber(): number {
Expand Down Expand Up @@ -506,7 +510,7 @@ export class Http2SubchannelCall implements SubchannelCall {
context.callback?.();
};
this.trace('sending data chunk of length ' + message.length);
this.callStatsTracker.addMessageSent();
this.callEventTracker.addMessageSent();
try {
this.http2Stream!.write(message, cb);
} catch (error) {
Expand Down
4 changes: 3 additions & 1 deletion packages/grpc-js/src/subchannel-pool.ts
Expand Up @@ -23,6 +23,7 @@ import {
} from './subchannel-address';
import { ChannelCredentials } from './channel-credentials';
import { GrpcUri, uriToString } from './uri-parser';
import { Http2SubchannelConnector } from './transport';

// 10 seconds in milliseconds. This value is arbitrary.
/**
Expand Down Expand Up @@ -143,7 +144,8 @@ export class SubchannelPool {
channelTargetUri,
subchannelTarget,
channelArguments,
channelCredentials
channelCredentials,
new Http2SubchannelConnector(channelTargetUri)
);
if (!(channelTarget in this.pool)) {
this.pool[channelTarget] = [];
Expand Down

0 comments on commit a4d409d

Please sign in to comment.