-
Notifications
You must be signed in to change notification settings - Fork 253
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Rewrite ConnectionPool for efficiency
This is a rewrite of SSHKit's ConnectionPool. The advantages of this version are: * Cleaner API * Less mutex locking * Asynchronous connection eviction/closing * Faster (slightly) Cleaner API =========== # Old API entry = pool.checkout(host, username, &Net::SSH.method(:start)) begin # do stuff with entry.connection ensure pool.checkin(entry) end # New API pool.with(Net::SSH.method(:start), host, username) do |connection| # do stuff with connection end Less mutex locking ================== Previously, every checkin and checkout would use a mutex to lock the entire pool every time. Now, each unique connection key has its own separate mutex, so any locking is done per host, and not for the entire pool. Asynchronous connection eviction/closing ======================================== Previously, every checkin and checkout would trigger an expiration check for all connections in the pool. Thus every thread must potentially wait for connections used by other threads to be cleaned up and closed. Now this logic is removed. Eviction happens in a separate background thread periodically sweeps the connections to identify and close stale ones. Faster (slightly) ================= Profile before rewrite: TOTAL (pct) SAMPLES (pct) FRAME 54 (12.0%) 54 (12.0%) SSHKit::Backend::ConnectionPool::Entry#expired? 50 (11.1%) 50 (11.1%) Net::SSH::Compat.io_select 28 (6.2%) 28 (6.2%) block (2 levels) in Net::SSH::Connection::Channel#forward_local_env 11 (2.4%) 11 (2.4%) Net::SSH::Transport::HMAC::Abstract.mac_length 11 (2.4%) 10 (2.2%) Net::SSH::Buffer#read 22 (4.9%) 10 (2.2%) Net::SSH::BufferedIo#fill 9 (2.0%) 9 (2.0%) Net::SSH::Transport::State#update_next_iv 12 (2.7%) 9 (2.0%) Net::SSH::BufferedIo#send_pending 63 (14.0%) 8 (1.8%) block (2 levels) in SSHKit::Backend::ConnectionPool#prune_expired 8 (1.8%) 8 (1.8%) Net::SSH::Buffer#initialize 7 (1.6%) 7 (1.6%) Net::SSH::Buffer#append After rewrite: TOTAL (pct) SAMPLES (pct) FRAME 383 (11.7%) 383 (11.7%) block (2 levels) in Net::SSH::Connection::Channel#forward_local_env 331 (10.2%) 331 (10.2%) Net::SSH::Compat.io_select 116 (3.6%) 102 (3.1%) Net::SSH::Buffer#read 100 (3.1%) 100 (3.1%) Net::SSH::Transport::HMAC::Abstract.digest_class 100 (3.1%) 100 (3.1%) Net::SSH::Transport::HMAC::Abstract.mac_length 133 (4.1%) 80 (2.5%) Net::SSH::BufferedIo#send_pending 139 (4.3%) 74 (2.3%) Net::SSH::BufferedIo#fill 71 (2.2%) 71 (2.2%) Net::SSH::Transport::State#update_next_iv 80 (2.5%) 71 (2.2%) SSHKit::Runner::Parallel#execute 57 (1.7%) 57 (1.7%) Net::SSH::Buffer#initialize 55 (1.7%) 55 (1.7%) Net::SSH::BufferedIo#output Note that ConnectionPool is no longer listed.
- Loading branch information
1 parent
211a168
commit 72cf0b4
Showing
7 changed files
with
270 additions
and
174 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,126 +1,159 @@ | ||
require "monitor" | ||
require "thread" | ||
|
||
# Since we call to_s on new_connection_args and use that as a hash | ||
# We need to make sure the memory address of the object is not used as part of the key | ||
# Otherwise identical objects with different memory address won't get a hash hit. | ||
# In the case of proxy commands, this can lead to proxy processes leaking | ||
# And in severe cases can cause deploys to fail due to default file descriptor limits | ||
# An alternate solution would be to use a different means of generating hash keys | ||
# Since we call to_s on new connection arguments and use that as a cache key, we | ||
# need to make sure the memory address of the object is not used as part of the | ||
# key. Otherwise identical objects with different memory address won't reuse the | ||
# cache. | ||
# | ||
# In the case of proxy commands, this can lead to proxy processes leaking, and | ||
# in severe cases can cause deploys to fail due to default file descriptor | ||
# limits. An alternate solution would be to use a different means of generating | ||
# hash keys. | ||
# | ||
require "net/ssh/proxy/command" | ||
class Net::SSH::Proxy::Command | ||
# Ensure a stable string value is used, rather than memory address. | ||
def inspect | ||
@command_line_template | ||
end | ||
end | ||
|
||
module SSHKit | ||
|
||
module Backend | ||
|
||
class ConnectionPool | ||
|
||
attr_accessor :idle_timeout, :enabled | ||
|
||
def initialize | ||
self.idle_timeout = 30 | ||
self.enabled = true | ||
@mutex = Mutex.new | ||
@pool = {} | ||
end | ||
# The ConnectionPool caches connections and allows them to be reused, so long as | ||
# the reuse happens within the `idle_timeout` period. Timed out connections are | ||
# closed, forcing a new connection to be used in that case. | ||
# | ||
# Additionally, a background thread is started to check for abandoned | ||
# connections that have timed out without any attempt at being reused. These | ||
# are eventually closed as well and removed from the cache. | ||
# | ||
# If `idle_timeout` set to `false`, `0`, or `nil`, no caching is performed, and | ||
# a new connection is created and then immediately closed each time. The default | ||
# timeout is 30 (seconds). | ||
# | ||
# There is a single public method: `with`. Example usage: | ||
# | ||
# pool = SSHKit::Backend::ConnectionPool.new | ||
# pool.with(Net::SSH.method(:start), "host", "username") do |connection| | ||
# # do stuff with connection | ||
# end | ||
# | ||
class SSHKit::Backend::ConnectionPool | ||
attr_accessor :idle_timeout | ||
|
||
def initialize(idle_timeout=30) | ||
@idle_timeout = idle_timeout | ||
@caches = {} | ||
@caches.extend(MonitorMixin) | ||
@timed_out_connections = Queue.new | ||
Thread.new { run_eviction_loop } | ||
end | ||
|
||
def prune_expired? | ||
idle_timeout && idle_timeout != 0 | ||
end | ||
# Creates a new connection or reuses a cached connection (if possible) and | ||
# yields the connection to the given block. Connections are created by | ||
# invoking the `connection_factory` proc with the given `args`. The arguments | ||
# are used to construct a key used for caching. | ||
def with(connection_factory, *args) | ||
cache = find_cache(args) | ||
conn = cache.pop || begin | ||
connection_factory.call(*args) | ||
end | ||
yield(conn) | ||
ensure | ||
cache.push(conn) unless conn.nil? | ||
end | ||
|
||
def checkout(*new_connection_args, &block) | ||
entry = nil | ||
key = new_connection_args.to_s | ||
if enabled | ||
prune_expired if prune_expired? | ||
entry = find_live_entry(key) | ||
end | ||
entry || create_new_entry(new_connection_args, key, &block) | ||
end | ||
# Immediately remove all cached connections, without closing them. This only | ||
# exists for unit test purposes. | ||
def flush_connections | ||
caches.synchronize { caches.clear } | ||
end | ||
|
||
def checkin(entry) | ||
if enabled | ||
if prune_expired? | ||
entry.expires_at = Time.now + idle_timeout | ||
prune_expired | ||
end | ||
@mutex.synchronize do | ||
@pool[entry.key] ||= [] | ||
@pool[entry.key] << entry | ||
end | ||
end | ||
end | ||
# Immediately close all cached connections and empty the pool. | ||
def close_connections | ||
caches.synchronize do | ||
caches.values.each(&:clear) | ||
caches.clear | ||
process_deferred_close | ||
end | ||
end | ||
|
||
def close_connections | ||
@mutex.synchronize do | ||
@pool.values.flatten.map(&:connection).uniq.each do |conn| | ||
if conn.respond_to?(:closed?) && conn.respond_to?(:close) | ||
conn.close unless conn.closed? | ||
end | ||
end | ||
@pool.clear | ||
end | ||
end | ||
private | ||
|
||
def flush_connections | ||
@mutex.synchronize { @pool.clear } | ||
end | ||
attr_reader :caches, :timed_out_connections | ||
|
||
private | ||
|
||
def prune_expired | ||
@mutex.synchronize do | ||
@pool.each_value do |entries| | ||
entries.collect! do |entry| | ||
if entry.expired? | ||
entry.close unless entry.closed? | ||
nil | ||
else | ||
entry | ||
end | ||
end.compact! | ||
end | ||
end | ||
end | ||
def cache_enabled? | ||
idle_timeout && idle_timeout > 0 | ||
end | ||
|
||
def find_live_entry(key) | ||
@mutex.synchronize do | ||
return nil unless @pool.key?(key) | ||
while (entry = @pool[key].shift) | ||
return entry if entry.live? | ||
end | ||
end | ||
nil | ||
end | ||
# Look up a Cache that matches the given connection arguments. | ||
def find_cache(args) | ||
if cache_enabled? | ||
key = args.to_s | ||
caches[key] || thread_safe_find_or_create_cache(key) | ||
else | ||
NilCache.new(method(:silently_close_connection)) | ||
end | ||
end | ||
|
||
def create_new_entry(args, key, &block) | ||
Entry.new block.call(*args), key | ||
# Cache creation needs to happen in a mutex, because otherwise a race | ||
# condition might cause two identical caches to be created for the same key. | ||
def thread_safe_find_or_create_cache(key) | ||
caches.synchronize do | ||
caches[key] ||= begin | ||
Cache.new(idle_timeout, method(:silently_close_connection_later)) | ||
end | ||
end | ||
end | ||
|
||
Entry = Struct.new(:connection, :key) do | ||
attr_accessor :expires_at | ||
|
||
def live? | ||
!expired? && !closed? | ||
end | ||
# Loops indefinitely to close connections and to find abandoned connections | ||
# that need to be closed. | ||
def run_eviction_loop | ||
loop do | ||
process_deferred_close | ||
|
||
def expired? | ||
expires_at && Time.now > expires_at | ||
end | ||
# Periodically sweep all Caches to evict stale connections | ||
sleep([idle_timeout, 5].min) | ||
caches.values.each(&:evict) | ||
end | ||
end | ||
|
||
def close | ||
connection.respond_to?(:close) && connection.close | ||
end | ||
# Immediately close any connections that are pending closure. | ||
# rubocop:disable Lint/HandleExceptions | ||
def process_deferred_close | ||
until timed_out_connections.empty? | ||
connection = timed_out_connections.pop(true) | ||
silently_close_connection(connection) | ||
end | ||
rescue ThreadError | ||
# Queue#pop(true) raises ThreadError if the queue is empty. | ||
# This could only happen if `close_connections` is called at the same time | ||
# the background eviction thread has woken up to close connections. In any | ||
# case, it is not something we need to care about, since an empty queue is | ||
# perfectly OK. | ||
end | ||
# rubocop:enable Lint/HandleExceptions | ||
|
||
def closed? | ||
connection.respond_to?(:closed?) && connection.closed? | ||
end | ||
end | ||
# Adds the connection to a queue that is processed asynchronously by a | ||
# background thread. The connection will eventually be closed. | ||
def silently_close_connection_later(connection) | ||
timed_out_connections << connection | ||
end | ||
|
||
end | ||
# Close the given `connection` immediately, assuming it responds to a `close` | ||
# method. If it doesn't, or if `nil` is provided, it is silently ignored. Any | ||
# `StandardError` is also silently ignored. Returns `true` if the connection | ||
# was closed; `false` if it was already closed or could not be closed due to | ||
# an error. | ||
def silently_close_connection(connection) | ||
return false unless connection.respond_to?(:close) | ||
return false if connection.respond_to?(:closed?) && connection.closed? | ||
connection.close | ||
true | ||
rescue StandardError | ||
false | ||
end | ||
end | ||
|
||
require "sshkit/backends/connection_pool/cache" | ||
require "sshkit/backends/connection_pool/nil_cache" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
# A Cache holds connections for a given key. Each connection is stored along | ||
# with an expiration time so that its idle duration can be measured. | ||
class SSHKit::Backend::ConnectionPool::Cache | ||
def initialize(idle_timeout, closer) | ||
@connections = [] | ||
@connections.extend(MonitorMixin) | ||
@idle_timeout = idle_timeout | ||
@closer = closer | ||
end | ||
|
||
# Remove and return a fresh connection from this Cache. Returns `nil` if | ||
# the Cache is empty or if all existing connections have gone stale. | ||
def pop | ||
connections.synchronize do | ||
evict | ||
_, connection = connections.pop | ||
connection | ||
end | ||
end | ||
|
||
# Return a connection to this Cache. | ||
def push(conn) | ||
# No need to cache if the connection has already been closed. | ||
return if closed?(conn) | ||
|
||
connections.synchronize do | ||
connections.push([Time.now + idle_timeout, conn]) | ||
end | ||
end | ||
|
||
# Close and remove any connections in this Cache that have been idle for | ||
# too long. | ||
def evict | ||
# Peek at the first connection to see if it is still fresh. If so, we can | ||
# return right away without needing to use `synchronize`. | ||
first_expires_at, _connection = connections.first | ||
return if first_expires_at.nil? || fresh?(first_expires_at) | ||
|
||
connections.synchronize do | ||
fresh, stale = connections.partition do |expires_at, _| | ||
fresh?(expires_at) | ||
end | ||
connections.replace(fresh) | ||
stale.each { |_, conn| closer.call(conn) } | ||
end | ||
end | ||
|
||
# Close all connections and completely clear the cache. | ||
def clear | ||
connections.synchronize do | ||
connections.map(&:last).each(&closer) | ||
connections.clear | ||
end | ||
end | ||
|
||
private | ||
|
||
attr_reader :connections, :idle_timeout, :closer | ||
|
||
def fresh?(expires_at) | ||
expires_at > Time.now | ||
end | ||
|
||
def closed?(conn) | ||
conn.respond_to?(:closed?) && conn.closed? | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
# A cache that holds no connections. Any connection provided to this cache | ||
# is simply closed. | ||
SSHKit::Backend::ConnectionPool::NilCache = Struct.new(:closer) do | ||
def pop | ||
nil | ||
end | ||
|
||
def push(conn) | ||
closer.call(conn) | ||
end | ||
end |
Oops, something went wrong.