Skip to content

Commit

Permalink
fix(cluster): lazyConnect with pipeline (#1408)
Browse files Browse the repository at this point in the history
* fix(cluster): lazyConnect with pipeline

* add test for cluster lazyConnect

* catch connect error
  • Loading branch information
jseagull authored Oct 4, 2021
1 parent ccd381a commit b798107
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 4 deletions.
3 changes: 2 additions & 1 deletion lib/autoPipelining.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as PromiseContainer from "./promiseContainer";
import { flatten, isArguments } from "./utils/lodash";
import { flatten, isArguments, noop } from "./utils/lodash";
import * as calculateSlot from "cluster-key-slot";
import asCallback from "standard-as-callback";

Expand Down Expand Up @@ -120,6 +120,7 @@ export function executeWithAutoPipelining(

// On cluster mode let's wait for slots to be available
if (client.isCluster && !client.slots.length) {
if (client.status === "wait") client.connect().catch(noop);
return new CustomPromise(function (resolve, reject) {
client.delayUntilReady((err) => {
if (err) {
Expand Down
7 changes: 5 additions & 2 deletions lib/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import * as pMap from "p-map";
import * as PromiseContainer from "./promiseContainer";
import { CallbackFunction } from "./types";
import Commander from "./commander";
import { noop } from "./utils";

/*
This function derives from the cluster-key-slot implementation.
Expand All @@ -16,7 +17,7 @@ import Commander from "./commander";
function generateMultiWithNodes(redis, keys) {
const slot = calculateSlot(keys[0]);
const target = redis._groupsBySlot[slot];

for (let i = 1; i < keys.length; i++) {
if (redis._groupsBySlot[calculateSlot(keys[i])] !== target) {
return -1;
Expand Down Expand Up @@ -156,7 +157,8 @@ Pipeline.prototype.fillResult = function (value, position) {
moved: function (slot, key) {
_this.preferKey = key;
_this.redis.slots[errv[1]] = [key];
_this.redis._groupsBySlot[errv[1]] = _this.redis._groupsIds[_this.redis.slots[errv[1]].join(";")];
_this.redis._groupsBySlot[errv[1]] =
_this.redis._groupsIds[_this.redis.slots[errv[1]].join(";")];
_this.redis.refreshSlotsCache();
_this.exec();
},
Expand Down Expand Up @@ -241,6 +243,7 @@ Pipeline.prototype.execBuffer = deprecate(function () {
Pipeline.prototype.exec = function (callback: CallbackFunction) {
// Wait for the cluster to be connected, since we need nodes information before continuing
if (this.isCluster && !this.redis.slots.length) {
if (this.redis.status === "wait") this.redis.connect().catch(noop);
this.redis.delayUntilReady((err) => {
if (err) {
callback(err);
Expand Down
3 changes: 2 additions & 1 deletion lib/transaction.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { wrapMultiResult } from "./utils";
import { wrapMultiResult, noop } from "./utils";
import asCallback from "standard-as-callback";
import Pipeline from "./pipeline";
import { CallbackFunction } from "./types";
Expand Down Expand Up @@ -30,6 +30,7 @@ export function addTransactionSupport(redis) {
pipeline.exec = function (callback: CallbackFunction) {
// Wait for the cluster to be connected, since we need nodes information before continuing
if (this.isCluster && !this.redis.slots.length) {
if (this.redis.status === "wait") this.redis.connect().catch(noop);
return asCallback(
new Promise((resolve, reject) => {
this.redis.delayUntilReady((err) => {
Expand Down
22 changes: 22 additions & 0 deletions test/functional/cluster/autopipelining.ts
Original file line number Diff line number Diff line change
Expand Up @@ -594,4 +594,26 @@ describe("autoPipelining for cluster", function () {
changeSlot(cluster, key1Slot, key2Slot);
});
});

it("should support lazyConnect", async () => {
const cluster = new Cluster(hosts, {
enableAutoPipelining: true,
lazyConnect: true,
});

await cluster.set("foo1", "bar1");
await cluster.set("foo5", "bar5");

expect(
await Promise.all([
cluster.get("foo1"),
cluster.get("foo5"),
cluster.get("foo1"),
cluster.get("foo5"),
cluster.get("foo1"),
])
).to.eql(["bar1", "bar5", "bar1", "bar5", "bar1"]);

cluster.disconnect();
});
});
23 changes: 23 additions & 0 deletions test/functional/lazy_connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import Redis from "../../lib/redis";
import { expect } from "chai";
import * as sinon from "sinon";
import { Cluster } from "../../lib";
import Pipeline from "../../lib/pipeline";

describe("lazy connect", function () {
it("should not call `connect` when init", function () {
Expand Down Expand Up @@ -51,6 +52,28 @@ describe("lazy connect", function () {
stub.restore();
});

it("should call connect when pipeline exec", (done) => {
const stub = sinon.stub(Cluster.prototype, "connect").callsFake(() => {
stub.restore();
done();
});
const cluster = new Cluster([], { lazyConnect: true });
const pipline = new Pipeline(cluster);
pipline.get("fool1").exec(() => {});
});

it("should call connect when transction exec", (done) => {
const stub = sinon.stub(Cluster.prototype, "connect").callsFake(() => {
stub.restore();
done();
});
const cluster = new Cluster([], { lazyConnect: true });
cluster
.multi()
.get("fool1")
.exec(() => {});
});

it('should quit before "close" being emited', function (done) {
const stub = sinon
.stub(Cluster.prototype, "connect")
Expand Down

0 comments on commit b798107

Please sign in to comment.