Skip to content

Commit d18f3fe

Browse files
committedMar 14, 2022
fix: improve typing for pipeline
1 parent bacf5a6 commit d18f3fe

File tree

9 files changed

+74
-51
lines changed

9 files changed

+74
-51
lines changed
 

‎lib/DataHandler.ts

+9-5
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { NetStream, ICommandItem, ICommand } from "./types";
1+
import { NetStream, CommandItem, ICommand } from "./types";
22
import Deque = require("denque");
33
import { EventEmitter } from "events";
44
import Command from "./command";
@@ -25,11 +25,11 @@ interface IDataHandledable extends EventEmitter {
2525
stream: NetStream;
2626
status: string;
2727
condition: ICondition;
28-
commandQueue: Deque<ICommandItem>;
28+
commandQueue: Deque<CommandItem>;
2929

3030
disconnect(reconnect: boolean): void;
3131
recoverFromFatalError(commandError: Error, err: Error, options: any): void;
32-
handleReconnection(err: Error, item: ICommandItem): void;
32+
handleReconnection(err: Error, item: CommandItem): void;
3333
}
3434

3535
interface IParserOptions {
@@ -119,7 +119,11 @@ export default class DataHandler {
119119
case "message":
120120
if (this.redis.listeners("message").length > 0) {
121121
// Check if there're listeners to avoid unnecessary `toString()`.
122-
this.redis.emit("message", reply[1].toString(), reply[2] ? reply[2].toString() : '');
122+
this.redis.emit(
123+
"message",
124+
reply[1].toString(),
125+
reply[2] ? reply[2].toString() : ""
126+
);
123127
}
124128
this.redis.emit("messageBuffer", reply[1], reply[2]);
125129
break;
@@ -208,7 +212,7 @@ export default class DataHandler {
208212
return true;
209213
}
210214

211-
private shiftCommand(reply: ReplyData | Error): ICommandItem | null {
215+
private shiftCommand(reply: ReplyData | Error): CommandItem | null {
212216
const item = this.redis.commandQueue.shift();
213217
if (!item) {
214218
const message =

‎lib/Redis.ts

+10-5
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,12 @@ import {
1010
RedisOptions,
1111
} from "./redis/RedisOptions";
1212
import { addTransactionSupport, Transaction } from "./transaction";
13-
import { CallbackFunction, ICommandItem, NetStream } from "./types";
13+
import {
14+
CallbackFunction,
15+
CommandItem,
16+
NetStream,
17+
WriteableStream,
18+
} from "./types";
1419
import {
1520
CONNECTION_CLOSED_ERROR_MSG,
1621
Debug,
@@ -53,7 +58,7 @@ class Redis extends Commander {
5358

5459
options: RedisOptions;
5560
status: RedisStatus = "wait";
56-
commandQueue: Deque;
61+
commandQueue: Deque<CommandItem>;
5762
offlineQueue: Deque;
5863
connectionEpoch = 0;
5964
connector: AbstractConnector;
@@ -360,7 +365,7 @@ class Redis extends Commander {
360365
this.disconnect(true);
361366
}
362367

363-
handleReconnection(err: Error, item: ICommandItem) {
368+
handleReconnection(err: Error, item: CommandItem) {
364369
let needReconnect: ReturnType<ReconnectOnError> = false;
365370
if (this.options.reconnectOnError) {
366371
needReconnect = this.options.reconnectOnError(err);
@@ -575,7 +580,7 @@ class Redis extends Commander {
575580
* redis.sendCommand(set);
576581
* ```
577582
*/
578-
sendCommand(command: Command, stream?: NetStream): unknown {
583+
sendCommand(command: Command, stream?: WriteableStream): unknown {
579584
if (this.status === "wait") {
580585
this.connect().catch(noop);
581586
}
@@ -648,7 +653,7 @@ class Redis extends Commander {
648653
}
649654

650655
if (stream) {
651-
if (stream.isPipeline) {
656+
if ("isPipeline" in stream && stream.isPipeline) {
652657
stream.write(command.toWritable(stream.destination.redis.stream));
653658
} else {
654659
stream.write(command.toWritable(stream));

‎lib/cluster/index.ts

+28-26
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,41 @@
11
import { EventEmitter } from "events";
2-
import ClusterAllFailedError from "../errors/ClusterAllFailedError";
3-
import { defaults, noop, Debug } from "../utils";
4-
import ConnectionPool from "./ConnectionPool";
5-
import {
6-
NodeKey,
7-
RedisOptions,
8-
normalizeNodeOptions,
9-
NodeRole,
10-
getUniqueHostnamesFromOptions,
11-
nodeKeyToRedisOptions,
12-
groupSrvRecords,
13-
weightSrvRecords,
14-
getConnectionName,
15-
} from "./util";
16-
import ClusterSubscriber from "./ClusterSubscriber";
17-
import DelayQueue from "./DelayQueue";
18-
import ScanStream from "../ScanStream";
2+
import * as commands from "redis-commands";
193
import { AbortError, RedisError } from "redis-errors";
204
import asCallback from "standard-as-callback";
21-
import { CallbackFunction, NetStream } from "../types";
22-
import { ClusterOptions, DEFAULT_CLUSTER_OPTIONS } from "./ClusterOptions";
5+
import { Pipeline } from "..";
6+
import Command from "../command";
7+
import ClusterAllFailedError from "../errors/ClusterAllFailedError";
8+
import Redis from "../Redis";
9+
import ScanStream from "../ScanStream";
10+
import { CallbackFunction, WriteableStream } from "../types";
2311
import {
24-
sample,
2512
CONNECTION_CLOSED_ERROR_MSG,
13+
Debug,
14+
defaults,
15+
noop,
16+
sample,
2617
shuffle,
2718
timeout,
2819
zipMap,
2920
} from "../utils";
30-
import * as commands from "redis-commands";
31-
import Command from "../command";
32-
import Redis from "../Redis";
21+
import applyMixin from "../utils/applyMixin";
3322
import Commander from "../utils/Commander";
23+
import { ClusterOptions, DEFAULT_CLUSTER_OPTIONS } from "./ClusterOptions";
24+
import ClusterSubscriber from "./ClusterSubscriber";
25+
import ConnectionPool from "./ConnectionPool";
26+
import DelayQueue from "./DelayQueue";
27+
import {
28+
getConnectionName,
29+
getUniqueHostnamesFromOptions,
30+
groupSrvRecords,
31+
NodeKey,
32+
nodeKeyToRedisOptions,
33+
NodeRole,
34+
normalizeNodeOptions,
35+
RedisOptions,
36+
weightSrvRecords,
37+
} from "./util";
3438
import Deque = require("denque");
35-
import { Pipeline } from "..";
36-
import applyMixin from "../utils/applyMixin";
3739

3840
const debug = Debug("cluster");
3941

@@ -573,7 +575,7 @@ class Cluster extends Commander {
573575
: nodeKey;
574576
}
575577

576-
sendCommand(command: Command, stream?: NetStream, node?: any): unknown {
578+
sendCommand(command: Command, stream?: WriteableStream, node?: any): unknown {
577579
if (this.status === "wait") {
578580
this.connect().catch(noop);
579581
}

‎lib/command.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import {
99
convertObjectToArray,
1010
} from "./utils";
1111
import { flatten } from "./utils/lodash";
12-
import { CallbackFunction, ICommand, CommandParameter, NetStream } from "./types";
12+
import { CallbackFunction, ICommand, CommandParameter } from "./types";
1313

1414
export type ArgumentType =
1515
| string
@@ -273,7 +273,7 @@ export default class Command implements ICommand {
273273
* @see {@link Redis#sendCommand}
274274
* @public
275275
*/
276-
public toWritable(socket: NetStream): string | Buffer {
276+
public toWritable(_socket: object): string | Buffer {
277277
let bufferMode = false;
278278
for (const arg of this.args) {
279279
if (arg instanceof Buffer) {

‎lib/pipeline.ts

+3-3
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ import asCallback from "standard-as-callback";
44
import { deprecate } from "util";
55
import Redis, { Cluster } from ".";
66
import Command from "./command";
7-
import Commander from "./utils/Commander";
8-
import { CallbackFunction } from "./types";
7+
import { CallbackFunction, PipelineWriteableStream } from "./types";
98
import { noop } from "./utils";
9+
import Commander from "./utils/Commander";
1010

1111
/*
1212
This function derives from the cluster-key-slot implementation.
@@ -344,7 +344,7 @@ Pipeline.prototype.exec = function (
344344
}
345345
let bufferMode = false;
346346

347-
const stream = {
347+
const stream: PipelineWriteableStream = {
348348
isPipeline: true,
349349
destination: _this.isCluster ? node : { redis: _this.redis },
350350
write: function (writable) {

‎lib/redis/event_handler.ts

+3-3
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import Deque = require("denque");
44
import { AbortError } from "redis-errors";
55
import Command from "../command";
66
import { MaxRetriesPerRequestError } from "../errors";
7-
import { ICommandItem, ICommand } from "../types";
7+
import { CommandItem, ICommand } from "../types";
88
import { Debug, noop, CONNECTION_CLOSED_ERROR_MSG } from "../utils";
99
import DataHandler from "../DataHandler";
1010

@@ -115,7 +115,7 @@ function abortError(command: ICommand) {
115115
// the connection close and those pipelined commands must be aborted. For
116116
// example, if the queue looks like this: [2, 3, 4, 0, 1, 2] then after
117117
// aborting and purging we'll have a queue that looks like this: [0, 1, 2]
118-
function abortIncompletePipelines(commandQueue: Deque<ICommandItem>) {
118+
function abortIncompletePipelines(commandQueue: Deque<CommandItem>) {
119119
let expectedIndex = 0;
120120
for (let i = 0; i < commandQueue.length; ) {
121121
const command = commandQueue.peekAt(i).command as Command;
@@ -135,7 +135,7 @@ function abortIncompletePipelines(commandQueue: Deque<ICommandItem>) {
135135
// If only a partial transaction result was received before connection close,
136136
// we have to abort any transaction fragments that may have ended up in the
137137
// offline queue
138-
function abortTransactionFragments(commandQueue: Deque<ICommandItem>) {
138+
function abortTransactionFragments(commandQueue: Deque<CommandItem>) {
139139
for (let i = 0; i < commandQueue.length; ) {
140140
const command = commandQueue.peekAt(i).command as Command;
141141
if (command.name === "multi") {

‎lib/script.ts

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import { createHash } from "crypto";
22
import Command from "./command";
33
import asCallback from "standard-as-callback";
4-
import { CallbackFunction, NetStream } from "./types";
4+
import { CallbackFunction } from "./types";
55
export default class Script {
66
private sha: string;
7-
private Command;
7+
private Command: new (...args: any[]) => Command;
88

99
constructor(
1010
private lua: string,
@@ -17,7 +17,7 @@ export default class Script {
1717
const sha = this.sha;
1818
const socketHasScriptLoaded = new WeakSet();
1919
this.Command = class CustomScriptCommand extends Command {
20-
public toWritable(socket: NetStream): string | Buffer {
20+
public toWritable(socket: object): string | Buffer {
2121
const origReject = this.reject;
2222
this.reject = (err) => {
2323
if (err.toString().indexOf("NOSCRIPT") !== -1) {

‎lib/types.ts

+10-2
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,16 @@ export interface ICommand {
1515
reject(error: Error): void;
1616
}
1717

18-
export interface ICommandItem {
18+
export interface PipelineWriteableStream {
19+
isPipeline: true;
20+
write(data: string | Buffer): unknown;
21+
destination: { redis: { stream: NetStream } };
22+
}
23+
24+
export type WriteableStream = NetStream | PipelineWriteableStream;
25+
26+
export interface CommandItem {
1927
command: ICommand;
20-
stream: NetStream;
28+
stream: WriteableStream;
2129
select: number;
2230
}

‎lib/utils/Commander.ts

+6-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import {
55
} from "../autoPipelining";
66
import Command, { ArgumentType } from "../command";
77
import Script from "../script";
8-
import { CallbackFunction, NetStream } from "../types";
8+
import { CallbackFunction, WriteableStream } from "../types";
99
import RedisCommander, { ClientContext } from "./RedisCommander";
1010

1111
export interface CommanderOptions {
@@ -93,7 +93,11 @@ class Commander<Context extends ClientContext = { type: "default" }> {
9393
);
9494
}
9595

96-
sendCommand(command: Command, stream?: NetStream, node?: unknown): unknown {
96+
sendCommand(
97+
command: Command,
98+
stream?: WriteableStream,
99+
node?: unknown
100+
): unknown {
97101
throw new Error('"sendCommand" is not implemented');
98102
}
99103
}

0 commit comments

Comments
 (0)
Please sign in to comment.