Skip to content

Commit

Permalink
Handle connection closing edge cases
Browse files Browse the repository at this point in the history
  • Loading branch information
mattbrictson committed Feb 10, 2016
1 parent 201f8ca commit e76e556
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 21 deletions.
38 changes: 30 additions & 8 deletions lib/sshkit/backends/connection_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def initialize(idle_timeout=30)
@caches = {}
@caches.extend(MonitorMixin)
@timed_out_connections = Queue.new
Thread.new { close_timed_out_connections }
Thread.new { run_eviction_loop }
end

# Creates a new connection or reuses a cached connection (if possible) and
Expand All @@ -63,12 +63,21 @@ def with(connection_factory, *args)
cache.push(conn) unless conn.nil?
end

# Immediately remove all cached connections, without closing them. This is
# intended for a unit test environment only.
# Immediately remove all cached connections, without closing them. This only
# exists for unit test purposes.
def flush_connections
caches.synchronize { caches.clear }
end

# Immediately close all cached connections and empty the pool.
def close_connections
caches.synchronize do
caches.values.each(&:clear)
caches.clear
process_deferred_close
end
end

private

attr_reader :caches, :timed_out_connections
Expand Down Expand Up @@ -99,19 +108,32 @@ def thread_safe_find_or_create_cache(key)

# Loops indefinitely to close connections and to find abandoned connections
# that need to be closed.
def close_timed_out_connections
def run_eviction_loop
loop do
until timed_out_connections.empty?
connection = timed_out_connections.pop
silently_close_connection(connection)
end
process_deferred_close

# Periodically sweep all Caches to evict stale connections
sleep([idle_timeout, 5].min)
caches.values.each(&:evict)
end
end

# Immediately close any connections that are pending closure.
# rubocop:disable Lint/HandleExceptions
def process_deferred_close
until timed_out_connections.empty?
connection = timed_out_connections.pop(true)
silently_close_connection(connection)
end
rescue ThreadError
# Queue#pop(true) raises ThreadError if the queue is empty.
# This could only happen if `close_connections` is called at the same time
# the background eviction thread has woken up to close connections. In any
# case, it is not something we need to care about, since an empty queue is
# perfectly OK.
end
# rubocop:enable Lint/HandleExceptions

# Adds the connection to a queue that is processed asynchronously by a
# background thread. The connection will eventually be closed.
def silently_close_connection_later(connection)
Expand Down
15 changes: 15 additions & 0 deletions lib/sshkit/backends/connection_pool/cache.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ def pop

# Return a connection to this Cache.
def push(conn)
# No need to cache if the connection has already been closed.
return if closed?(conn)

connections.synchronize do
connections.push([Time.now + idle_timeout, conn])
end
Expand All @@ -42,11 +45,23 @@ def evict
end
end

# Close all connections and completely clear the cache.
def clear
connections.synchronize do
connections.map(&:last).each(&closer)
connections.clear
end
end

private

attr_reader :connections, :idle_timeout, :closer

def fresh?(expires_at)
expires_at > Time.now
end

def closed?(conn)
conn.respond_to?(:closed?) && conn.closed?
end
end
28 changes: 15 additions & 13 deletions test/unit/backends/test_connection_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def test_expired_connection_is_not_reused
def test_expired_connection_is_closed
pool.idle_timeout = 0.1
conn1 = mock
conn1.expects(:closed?).returns(false)
conn1.expects(:closed?).twice.returns(false)
conn1.expects(:close)

pool.with(->(*) { conn1 }, "conn1") {}
Expand All @@ -104,10 +104,8 @@ def test_expired_connection_is_closed
end

def test_closed_connection_is_not_reused
skip "Not implemented yet"
conn1 = pool.checkout("conn", &connect_and_close)
pool.checkin conn1
conn2 = pool.checkout("conn", &connect)
conn1 = pool.with(connect_and_close, "conn") { |c| c }
conn2 = pool.with(connect, "conn") { |c| c }

refute_equal conn1, conn2
end
Expand All @@ -120,18 +118,22 @@ def test_connections_with_different_args_are_not_reused
end

def test_close_connections
skip "Not implemented yet"
conn1 = mock
conn1.expects(:closed?).returns(false)
conn1.expects(:closed?).twice.returns(false)
conn1.expects(:close)
entry1 = pool.checkout("conn1"){ conn1 }
pool.checkin entry1
# the following isn't closed if close_connections is called
pool.checkout("conn2", &connect)

pool.close_connections
end
conn2 = mock
conn2.expects(:closed?).returns(false)
conn2.expects(:close).never

pool.with(->(*) { conn1 }, "conn1") {}

# We are using conn2 when close_connections is called, so it should
# not be closed.
pool.with(->(*) { conn2 }, "conn2") do
pool.close_connections
end
end
end
end
end

0 comments on commit e76e556

Please sign in to comment.