Skip to content

Commit

Permalink
Document connection options
Browse files Browse the repository at this point in the history
  • Loading branch information
byroot committed Mar 30, 2022
1 parent da267de commit 90d219f
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 59 deletions.
47 changes: 47 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,23 @@ redis.call("GET", "mykey")

NOTE: `RedisClient` instances must not be shared between threads. Make sure to read the section on [thread safety](#thread-safety).

### Configuration

- `url`: A Redis connection URL, e.g. `redis://example.com:6379/5`, a `rediss://` scheme enable SSL, and the path is interpreted as a database number.
Note tht all other configurtions take precedence, e.g. `RedisClient.config(url: "redis://localhost:3000" port: 6380)` will connect on port `6380`.
- `host`: The server hostname or IP address. Defaults to `"localhost"`.
- `port`: The server port. Defaults to `6379`.
- `path`: The path to a UNIX socket, if set `url`, `host` and `port` are ignored.
- `db`: The database to select after connecting, defaults to `0`.
- `id` ID for the client connection, assigns name to current connection by sending `CLIENT SETNAME`.
- `username` Username to authenticate against server, defaults to `"default"`.
- `password` Password to authenticate against server.
- `timeout`: The general timeout in seconds, default to `1.0`.
- `connect_timeout`: The connection timeout, takes precedence over the general timeout when connecting to the server.
- `read_timeout`: The read timeout, takes precedence over the general timeout when reading responses from the server.
- `write_timeout`: The write timeout, takes precedence over the general timeout when sending commands to the server.
- `reconnect_attempts`: Specify how many times the client should retry to send queries. Defaults to `0`. Makes sure to read the [reconnection section](#reconnection) before enabling it.

### Type support

Only a select few Ruby types are supported as arguments beside strings.
Expand Down Expand Up @@ -177,6 +194,8 @@ end

If the transaction wasn't successful, `#multi` will return `nil`.

Note that transactions using optimistic locking aren't automatically retried uppon connection errors.

### Publish / Subscribe

Pub/Sub related commands must be called on a dedicated `PubSub` object:
Expand Down Expand Up @@ -218,6 +237,34 @@ RedisClient.config(

All timeout values are specified in seconds.

### Reconnection

`redis-client` support automatic reconnection after network errors via the `reconnect_attempts:` configuration option.

It can be set as a number of retries:

```ruby
redis_config = RedisClient.config(reconnect_attempts: 1)
```

Or as a list of sleep durations for implementing exponential backoff:

```ruby
redis_config = RedisClient.config(reconnect_attempts: [0, 0.05, 0.1])
```

**Important Note**: Retrying may cause commands to be issued more than once to the server, so in the case of
non-idempotent commands such as `LPUSH` or `INCR`, it may cause consistency issues.

To selectively disable automatic retries, you can use the `#call_once` method:

```ruby
redis_config = RedisClient.config(reconnect_attempts: [0, 0.05, 0.1])
redis = redis_config.new_client
redis.call("GET", "counter") # Will be retried up to 3 times.
redis.call_once("INCR", "counter") # Won't be retried.
```

### Thread Safety

Contrary to the `redis` gem, `redis-client` doesn't protect against concurrent access.
Expand Down
154 changes: 98 additions & 56 deletions lib/redis_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,45 @@ def pubsub
end

def call(*command)
_call(command, nil)
command = RESP3.coerce_command!(command)
result = ensure_connected do |connection|
connection.write(command)
connection.read
end

if result.is_a?(CommandError)
raise result
else
result
end
end

def call_once(*command)
command = RESP3.coerce_command!(command)
result = ensure_connected(retryable: false) do |connection|
connection.write(command)
connection.read
end

if result.is_a?(CommandError)
raise result
else
result
end
end

def blocking_call(timeout, *command)
_call(command, timeout)
command = RESP3.coerce_command!(command)
result = ensure_connected do |connection|
connection.write(command)
connection.read(timeout)
end

if result.is_a?(CommandError)
raise result
else
result
end
end

def scan(*args, &block)
Expand Down Expand Up @@ -131,7 +165,7 @@ def pipelined
if pipeline._size == 0
[]
else
ensure_connected do |connection|
ensure_connected(retryable: pipeline._retryable?) do |connection|
call_pipelined(connection, pipeline._commands, pipeline._timeouts)
end
end
Expand All @@ -141,11 +175,11 @@ def multi(watch: nil, &block)
if watch
# WATCH is stateful, so we can't reconnect if it's used, the whole transaction
# has to be redone.
prevent_reconnection do |connection|
ensure_connected(retryable: false) do |connection|
call("WATCH", *watch)
begin
if commands = build_transaction(&block)
call_pipelined(connection, commands).last
if transaction = build_transaction(&block)
call_pipelined(connection, transaction._commands).last
else
call("UNWATCH")
[]
Expand All @@ -155,12 +189,15 @@ def multi(watch: nil, &block)
raise
end
end
elsif commands = build_transaction(&block)
ensure_connected do |connection|
call_pipelined(connection, commands).last
end
else
[]
transaction = build_transaction(&block)
if transaction._empty?
[]
else
ensure_connected(retryable: transaction._retryable?) do |connection|
call_pipelined(connection, transaction._commands).last
end
end
end
end

Expand Down Expand Up @@ -199,13 +236,20 @@ class Multi
def initialize
@size = 0
@commands = []
@retryable = true
end

def call(*command)
@commands << RESP3.coerce_command!(command)
nil
end

def call_once(*command)
@retryable = false
@commands << RESP3.coerce_command!(command)
nil
end

def _commands
@commands
end
Expand All @@ -214,9 +258,17 @@ def _size
@commands.size
end

def _empty?
@commands.size <= 2
end

def _timeouts
nil
end

def _retryable?
@retryable
end
end

class Pipeline < Multi
Expand All @@ -229,11 +281,16 @@ def blocking_call(timeout, *command)
@timeouts ||= []
@timeouts[@commands.size] = timeout
@commands << RESP3.coerce_command!(command)
nil
end

def _timeouts
@timeouts
end

def _empty?
@commands.empty?
end
end

private
Expand All @@ -243,7 +300,7 @@ def build_transaction
transaction.call("MULTI")
yield transaction
transaction.call("EXEC")
transaction._commands if transaction._size > 2
transaction
end

def scan_list(cursor_index, command, &block)
Expand Down Expand Up @@ -272,20 +329,6 @@ def scan_pairs(cursor_index, command)
nil
end

def _call(command, timeout)
command = RESP3.coerce_command!(command)
result = ensure_connected do |connection|
connection.write(command)
connection.read(timeout)
end

if result.is_a?(CommandError)
raise result
else
result
end
end

def call_pipelined(connection, commands, timeouts = nil)
exception = nil

Expand All @@ -309,41 +352,40 @@ def call_pipelined(connection, commands, timeouts = nil)
end
end

def ensure_connected
tries = 0
connection = nil
begin
connection = raw_connection
if block_given?
yield connection
else
connection
def ensure_connected(retryable: true)
if retryable
tries = 0
connection = nil
begin
connection = raw_connection
if block_given?
yield connection
else
connection
end
rescue ConnectionError
connection&.close
close

if !@disable_reconnection && config.retry_connecting?(tries)
tries += 1
retry
else
raise
end
end
rescue ConnectionError
connection&.close
close

if !@disable_reconnection && config.retry_connecting?(tries)
tries += 1
retry
else
raise
else
previous_disable_reconnection = @disable_reconnection
connection = ensure_connected
begin
@disable_reconnection = true
yield connection
ensure
@disable_reconnection = previous_disable_reconnection
end
end
end

def prevent_reconnection
previous_disable_reconnection = @disable_reconnection

connection = ensure_connected
begin
@disable_reconnection = true
yield connection
ensure
@disable_reconnection = previous_disable_reconnection
end
end

def raw_connection
@raw_connection ||= begin
connection = config.driver.new(
Expand Down
2 changes: 1 addition & 1 deletion lib/redis_client/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

class RedisClient
class Config
DEFAULT_TIMEOUT = 3
DEFAULT_TIMEOUT = 1.0
DEFAULT_HOST = "localhost"
DEFAULT_PORT = 6379
DEFAULT_USERNAME = "default"
Expand Down
21 changes: 21 additions & 0 deletions test/redis_client/connection_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,27 @@ def test_reconnect_attempts_enabled_inside_watching_transactions
end
end

def test_reconnect_with_non_idempotent_commands
client = new_client(reconnect_attempts: 1)

simulate_network_errors(client, ["INCR"]) do
# The INCR command is retried, causing the counter to be incremented twice
assert_equal 2, client.call("INCR", "counter")
end
assert_equal "2", client.call("GET", "counter")
end

def test_reconnect_with_call_once
client = new_client(reconnect_attempts: 1)

simulate_network_errors(client, ["INCR"]) do
assert_raises ConnectionError do
client.call_once("INCR", "counter")
end
end
assert_equal "1", client.call("GET", "counter")
end

private

def assert_timeout(error, faster_than = 0.5, &block)
Expand Down
14 changes: 14 additions & 0 deletions test/redis_client_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,20 @@ def test_hashes
assert_equal({ "bar" => "1", "baz" => "2" }, @redis.call("HGETALL", "foo"))
end

def test_call_once
assert_equal 1, @redis.call_once("INCR", "counter")

result = @redis.pipelined do |pipeline|
pipeline.call_once("INCR", "counter")
end
assert_equal [2], result

result = @redis.multi do |transaction|
transaction.call_once("INCR", "counter")
end
assert_equal [3], result
end

def test_pipelining
result = @redis.pipelined do |pipeline|
assert_nil pipeline.call("SET", "foo", "42")
Expand Down
12 changes: 10 additions & 2 deletions test/support/client_test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,18 @@ module ClassMethods
def write(command)
if self.class.failures.first == command.first
self.class.failures.shift
@fail_now = true
end
super
end

def read(*)
@fail_now ||= false
if @fail_now
raise ::RedisClient::ConnectionError, "simulated failure"
else
super
end

super
end

def write_multi(commands)
Expand Down

0 comments on commit 90d219f

Please sign in to comment.