From b63cdc79058f0624563d7c52692c4ce980a8b267 Mon Sep 17 00:00:00 2001 From: luin Date: Sat, 5 Mar 2016 21:38:50 +0800 Subject: [PATCH 1/2] feat(cluster): add enableReadyCheck option for cluster --- API.md | 2 ++ README.md | 2 +- lib/cluster/index.js | 64 ++++++++++++++++++++++++++++++++++---- test/functional/cluster.js | 41 ++++++++++++++++++++---- 4 files changed, 96 insertions(+), 13 deletions(-) diff --git a/API.md b/API.md index d8b59175..cabb39c4 100644 --- a/API.md +++ b/API.md @@ -215,11 +215,13 @@ Creates a Redis Cluster instance | startupNodes | Array.<Object> | | An array of nodes in the cluster, [{ port: number, host: string }] | | options | Object | | | | [options.enableOfflineQueue] | boolean | true | See Redis class | +| [options.enableReadyCheck] | boolean | true | When enabled, ioredis only emits "ready" event when `CLUSTER INFO` command reporting the cluster is ready for handling commands. | | [options.scaleReads] | string | "master" | Scale reads to the node with the specified role. Available values are "master", "slave" and "all". | | [options.maxRedirections] | number | 16 | When a MOVED or ASK error is received, client will redirect the command to another node. This option limits the max redirections allowed to send a command. | | [options.clusterRetryStrategy] | function | | See "Quick Start" section | | [options.retryDelayOnFailover] | number | 100 | When an error is received when sending a command(e.g. "Connection is closed." when the target Redis node is down), | | [options.retryDelayOnClusterDown] | number | 100 | When a CLUSTERDOWN error is received, client will retry if `retryDelayOnClusterDown` is valid delay time. | +| [options.redisOptions] | Object | | Passed to the constructor of `Redis`. | ### cluster.connect() ⇒ Promise diff --git a/README.md b/README.md index 5146449c..b0a89b56 100644 --- a/README.md +++ b/README.md @@ -751,7 +751,7 @@ sub.subscribe('news', function () { Event | Description :------------- | :------------- connect | emits when a connection is established to the Redis server. -ready | emits immediately after `connect` event. +ready | emits when `CLUSTER INFO` reporting the cluster is able to receive commands (if `enableReadyCheck` is `true`) or immediately after `connect` event (if `enableReadyCheck` is false). error | emits when an error occurs while connecting with a property of `lastNodeError` representing the last node error received. This event is emitted silently (only emitting if there's at least one listener). close | emits when an established Redis server connection has closed. reconnecting | emits after `close` when a reconnection will be made. The argument of the event is the time (in ms) before reconnecting. diff --git a/lib/cluster/index.js b/lib/cluster/index.js index 7181a7f0..063ed601 100644 --- a/lib/cluster/index.js +++ b/lib/cluster/index.js @@ -21,6 +21,8 @@ var ConnectionPool = require('./connection_pool'); * @param {Object[]} startupNodes - An array of nodes in the cluster, [{ port: number, host: string }] * @param {Object} options * @param {boolean} [options.enableOfflineQueue=true] - See Redis class + * @param {boolean} [options.enableReadyCheck=true] - When enabled, ioredis only emits "ready" event when `CLUSTER INFO` + * command reporting the cluster is ready for handling commands. * @param {string} [options.scaleReads=master] - Scale reads to the node with the specified role. * Available values are "master", "slave" and "all". * @param {number} [options.maxRedirections=16] - When a MOVED or ASK error is received, client will redirect the @@ -30,6 +32,7 @@ var ConnectionPool = require('./connection_pool'); * "Connection is closed." when the target Redis node is down), * @param {number} [options.retryDelayOnClusterDown=100] - When a CLUSTERDOWN error is received, client will retry * if `retryDelayOnClusterDown` is valid delay time. + * @param {Object} [options.redisOptions] - Passed to the constructor of `Redis`. * @extends [EventEmitter](http://nodejs.org/api/events.html#events_class_events_eventemitter) * @extends Commander */ @@ -103,10 +106,11 @@ function Cluster(startupNodes, options) { */ Cluster.defaultOptions = { maxRedirections: 16, + enableOfflineQueue: true, + enableReadyCheck: true, retryDelayOnFailover: 100, retryDelayOnClusterDown: 100, scaleReads: 'master', - enableOfflineQueue: true, clusterRetryStrategy: function (times) { return Math.min(100 + times * 2, 2000); } @@ -134,6 +138,12 @@ Cluster.prototype.resetClusterDownQueue = function () { * @public */ Cluster.prototype.connect = function () { + function readyHandler() { + this.setStatus('ready'); + this.retryAttempts = 0; + this.executeOfflineCommands(); + } + return new Promise(function (resolve, reject) { if (this.status === 'connecting' || this.status === 'connect' || this.status === 'ready') { reject(new Error('Redis is already connecting/connected')); @@ -146,11 +156,19 @@ Cluster.prototype.connect = function () { var closeListener; var refreshListener = function () { this.removeListener('close', closeListener); - this.retryAttempts = 0; this.manuallyClosing = false; this.setStatus('connect'); - this.setStatus('ready'); - this.executeOfflineCommands(); + if (this.options.enableReadyCheck) { + this._readyCheck(function (err, fail) { + if (err || fail) { + this.disconnect(true); + } else { + readyHandler.call(this); + } + }.bind(this)); + } else { + readyHandler.call(this); + } resolve(); }; @@ -444,7 +462,7 @@ Cluster.prototype.sendCommand = function (command, stream, node) { return; } var redis; - if (_this.status === 'ready') { + if (_this.status === 'ready' || (command.name === 'cluster')) { if (node && node.redis) { redis = node.redis; } else if (_.includes(Command.FLAGS.ENTER_SUBSCRIBER_MODE, command.name) || @@ -457,7 +475,7 @@ Cluster.prototype.sendCommand = function (command, stream, node) { if (typeof to === 'function') { var nodes = nodeKeys - .map(function(key) { + .map(function (key) { return _this.connectionPool.nodes.all[key]; }); redis = to(nodes, command); @@ -582,6 +600,40 @@ Cluster.prototype.getInfoFromNode = function (redis, callback) { }, 1000)); }; +/** + * Check whether Cluster is able to process commands + * + * @param {Function} callback + * @private + */ +Cluster.prototype._readyCheck = function (callback) { + this.cluster('info', function (err, res) { + if (err) { + return callback(err); + } + if (typeof res !== 'string') { + return callback(); + } + + var state; + var lines = res.split('\r\n'); + for (var i = 0; i < lines.length; ++i) { + var parts = lines[i].split(':'); + if (parts[0] === 'cluster_state') { + state = parts[1]; + break; + } + } + + if (state === 'fail') { + debug('cluster state not ok (%s)', state); + callback(null, state); + } else { + callback(); + } + }); +}; + ['sscan', 'hscan', 'zscan', 'sscanBuffer', 'hscanBuffer', 'zscanBuffer'] .forEach(function (command) { Cluster.prototype[command + 'Stream'] = function (key, options) { diff --git a/test/functional/cluster.js b/test/functional/cluster.js index 18160a1b..dab34407 100644 --- a/test/functional/cluster.js +++ b/test/functional/cluster.js @@ -935,6 +935,37 @@ describe('cluster', function () { }); }); + describe('enableReadyCheck', function () { + it('should reconnect when cluster state is not ok', function (done) { + var state = 'fail'; + var server = new MockServer(30001, function (argv) { + if (argv[0] === 'cluster' && argv[1] === 'slots') { + return [ + [0, 16383, ['127.0.0.1', 30001]] + ]; + } else if (argv[0] === 'cluster' && argv[1] === 'info') { + return 'cluster_state:' + state; + } + }); + var count = 0; + var client = new Redis.Cluster([{ + host: '127.0.0.1', port: '30001' + }], { + clusterRetryStrategy: function (times) { + expect(++count).to.eql(times); + if (count === 3) { + state = 'ok'; + } + return 0; + } + }); + client.on('ready', function () { + client.disconnect(); + disconnect([server], done); + }); + }); + }); + describe('scaleReads', function () { beforeEach(function () { function handler(port, argv) { @@ -1010,12 +1041,11 @@ describe('cluster', function () { context('custom', function () { it('should send to selected slave', function (done) { var cluster = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }], { - scaleReads: function(node, command) { + scaleReads: function (node, command) { if (command.name === 'get') { return node[1]; - } else { - return node[2]; } + return node[2]; } }); cluster.on('ready', function () { @@ -1035,12 +1065,11 @@ describe('cluster', function () { it('should send writes to masters', function (done) { var cluster = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }], { - scaleReads: function(node, command) { + scaleReads: function (node, command) { if (command.name === 'get') { return node[1]; - } else { - return node[2]; } + return node[2]; } }); cluster.on('ready', function () { From a1181dd990659fb77de3f2a954798565a1fc4ac9 Mon Sep 17 00:00:00 2001 From: luin Date: Sat, 5 Mar 2016 22:00:02 +0800 Subject: [PATCH 2/2] docs(cluster): update cluster doc for options --- API.md | 2 +- README.md | 7 +++++-- lib/cluster/index.js | 14 +++++++------- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/API.md b/API.md index cabb39c4..ec1508f1 100644 --- a/API.md +++ b/API.md @@ -214,11 +214,11 @@ Creates a Redis Cluster instance | --- | --- | --- | --- | | startupNodes | Array.<Object> | | An array of nodes in the cluster, [{ port: number, host: string }] | | options | Object | | | +| [options.clusterRetryStrategy] | function | | See "Quick Start" section | | [options.enableOfflineQueue] | boolean | true | See Redis class | | [options.enableReadyCheck] | boolean | true | When enabled, ioredis only emits "ready" event when `CLUSTER INFO` command reporting the cluster is ready for handling commands. | | [options.scaleReads] | string | "master" | Scale reads to the node with the specified role. Available values are "master", "slave" and "all". | | [options.maxRedirections] | number | 16 | When a MOVED or ASK error is received, client will redirect the command to another node. This option limits the max redirections allowed to send a command. | -| [options.clusterRetryStrategy] | function | | See "Quick Start" section | | [options.retryDelayOnFailover] | number | 100 | When an error is received when sending a command(e.g. "Connection is closed." when the target Redis node is down), | | [options.retryDelayOnClusterDown] | number | 100 | When a CLUSTERDOWN error is received, client will retry if `retryDelayOnClusterDown` is valid delay time. | | [options.redisOptions] | Object | | Passed to the constructor of `Redis`. | diff --git a/README.md b/README.md index b0a89b56..334117b9 100644 --- a/README.md +++ b/README.md @@ -659,14 +659,17 @@ but a few so that if one is unreachable the client will try the next one, and th } ``` + * `enableOfflineQueue`: Similar to the `enableOfflineQueue` option of `Redis` class. + * `enableReadyCheck`: When enabled, "ready" event will only be emitted when `CLUSTER INFO` command + reporting the cluster is ready for handling commands. Otherwise, it will be emitted immediately after "connect" is emitted. + * `scaleReads`: Config where to send the read queries. See below for more details. * `maxRedirections`: When a cluster related error (e.g. `MOVED`, `ASK` and `CLUSTERDOWN` etc.) is received, the client will redirect the command to another node. This option limits the max redirections allowed when sending a command. The default value is `16`. - * `retryDelayOnFailover`: If the error of "Connection is closed." is received when sending a command, + * `retryDelayOnFailover`: If the target node is disconnected when sending a command, ioredis will retry after the specified delay. The default value is `100`. You should make sure `retryDelayOnFailover * maxRedirections > cluster-node-timeout` to insure that no command will fail during a failover. * `retryDelayOnClusterDown`: When a cluster is down, all commands will be rejected with the error of `CLUSTERDOWN`. If this option is a number (by default, it is `100`), the client will resend the commands after the specified time (in ms). - * `scaleReads`: Config where to send the read queries. See below for more details. * `redisOptions`: Default options passed to the constructor of `Redis` when connecting to a node. ### Read-write splitting diff --git a/lib/cluster/index.js b/lib/cluster/index.js index 063ed601..5ea0af20 100644 --- a/lib/cluster/index.js +++ b/lib/cluster/index.js @@ -20,6 +20,7 @@ var ConnectionPool = require('./connection_pool'); * @constructor * @param {Object[]} startupNodes - An array of nodes in the cluster, [{ port: number, host: string }] * @param {Object} options + * @param {function} [options.clusterRetryStrategy] - See "Quick Start" section * @param {boolean} [options.enableOfflineQueue=true] - See Redis class * @param {boolean} [options.enableReadyCheck=true] - When enabled, ioredis only emits "ready" event when `CLUSTER INFO` * command reporting the cluster is ready for handling commands. @@ -27,7 +28,6 @@ var ConnectionPool = require('./connection_pool'); * Available values are "master", "slave" and "all". * @param {number} [options.maxRedirections=16] - When a MOVED or ASK error is received, client will redirect the * command to another node. This option limits the max redirections allowed to send a command. - * @param {function} [options.clusterRetryStrategy] - See "Quick Start" section * @param {number} [options.retryDelayOnFailover=100] - When an error is received when sending a command(e.g. * "Connection is closed." when the target Redis node is down), * @param {number} [options.retryDelayOnClusterDown=100] - When a CLUSTERDOWN error is received, client will retry @@ -105,15 +105,15 @@ function Cluster(startupNodes, options) { * @protected */ Cluster.defaultOptions = { - maxRedirections: 16, + clusterRetryStrategy: function (times) { + return Math.min(100 + times * 2, 2000); + }, enableOfflineQueue: true, enableReadyCheck: true, - retryDelayOnFailover: 100, - retryDelayOnClusterDown: 100, scaleReads: 'master', - clusterRetryStrategy: function (times) { - return Math.min(100 + times * 2, 2000); - } + maxRedirections: 16, + retryDelayOnFailover: 100, + retryDelayOnClusterDown: 100 }; util.inherits(Cluster, EventEmitter);