diff --git a/.gitignore b/.gitignore index ad7681de9..a7c582b85 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,5 @@ development.log /Dockerfile /Makefile /docker-compose.yml +Gemfile.lock + diff --git a/Gemfile b/Gemfile index bdd00fff9..08f0f1b2a 100644 --- a/Gemfile +++ b/Gemfile @@ -3,7 +3,7 @@ source "https://rubygems.org" gemspec gem "rake" -gem "redis-namespace" +gem "redis-client", github: "redis-rb/redis-client" gem "rails", "~> 7.0" gem "sqlite3", platforms: :ruby gem "activerecord-jdbcsqlite3-adapter", platforms: :jruby diff --git a/bin/sidekiqload b/bin/sidekiqload index 799785029..1a1eda7cc 100755 --- a/bin/sidekiqload +++ b/bin/sidekiqload @@ -55,7 +55,7 @@ rescue ArgumentError puts "Signal #{sig} not supported" end -Sidekiq.redis { |c| c.flushdb } +Sidekiq.redis { |c| c.call("FLUSHDB") } def handle_signal(launcher, sig) Sidekiq.logger.debug "Got #{sig} signal" case sig @@ -100,7 +100,7 @@ Monitoring = Thread.new do while true sleep 0.2 qsize = Sidekiq.redis do |conn| - conn.llen "queue:default" + conn.call("LLEN", "queue:default") end total = qsize # Sidekiq.logger.error("RSS: #{Process.rss} Pending: #{total}") diff --git a/lib/sidekiq.rb b/lib/sidekiq.rb index 4abe04aa6..8bb28fed8 100644 --- a/lib/sidekiq.rb +++ b/lib/sidekiq.rb @@ -63,7 +63,7 @@ def self.options=(opts) # Configuration for Sidekiq server, use like: # # Sidekiq.configure_server do |config| - # config.redis = { :namespace => 'myapp', :size => 25, :url => 'redis://myhost:8877/0' } + # config.redis = { :size => 25, :url => 'redis://myhost:8877/0' } # config.server_middleware do |chain| # chain.add MyServerHook # end @@ -76,7 +76,7 @@ def self.configure_server # Configuration for Sidekiq client, use like: # # Sidekiq.configure_client do |config| - # config.redis = { :namespace => 'myapp', :size => 1, :url => 'redis://myhost:8877/0' } + # config.redis = { :size => 1, :url => 'redis://myhost:8877/0' } # end def self.configure_client yield self unless server? 
@@ -92,14 +92,14 @@ def self.redis retryable = true begin yield conn - rescue Redis::BaseError => ex + rescue RedisClient::Error => ex # 2550 Failover can cause the server to become a replica, need # to disconnect and reopen the socket to get back to the primary. # 4495 Use the same logic if we have a "Not enough replicas" error from the primary # 4985 Use the same logic when a blocking command is force-unblocked # The same retry logic is also used in client.rb if retryable && ex.message =~ /READONLY|NOREPLICAS|UNBLOCKED/ - conn.disconnect! + conn.close retryable = false retry end @@ -110,14 +110,8 @@ def self.redis def self.redis_info redis do |conn| - # admin commands can't go through redis-namespace starting - # in redis-namespace 2.0 - if conn.respond_to?(:namespace) - conn.redis.info - else - conn.info - end - rescue Redis::CommandError => ex + conn.call("INFO").lines(chomp: true).map { |l| l.split(":", 2) }.select { |l| l.size == 2 }.to_h + rescue RedisClient::CommandError => ex # 2850 return fake version when INFO command has (probably) been renamed raise unless /unknown command/.match?(ex.message) FAKE_INFO diff --git a/lib/sidekiq/api.rb b/lib/sidekiq/api.rb index 55a7fb976..d8aa2d982 100644 --- a/lib/sidekiq/api.rb +++ b/lib/sidekiq/api.rb @@ -55,13 +55,13 @@ def queues def fetch_stats_fast! pipe1_res = Sidekiq.redis { |conn| conn.pipelined do |pipeline| - pipeline.get("stat:processed") - pipeline.get("stat:failed") - pipeline.zcard("schedule") - pipeline.zcard("retry") - pipeline.zcard("dead") - pipeline.scard("processes") - pipeline.lrange("queue:default", -1, -1) + pipeline.call("GET", "stat:processed") + pipeline.call("GET", "stat:failed") + pipeline.call("ZCARD", "schedule") + pipeline.call("ZCARD", "retry") + pipeline.call("ZCARD", "dead") + pipeline.call("SCARD", "processes") + pipeline.call("LRANGE", "queue:default", -1, -1) end } @@ -93,17 +93,17 @@ def fetch_stats_fast! 
# O(number of processes + number of queues) redis calls def fetch_stats_slow! processes = Sidekiq.redis { |conn| - conn.sscan_each("processes").to_a + conn.sscan("processes").to_a } queues = Sidekiq.redis { |conn| - conn.sscan_each("queues").to_a + conn.sscan("queues").to_a } pipe2_res = Sidekiq.redis { |conn| conn.pipelined do |pipeline| - processes.each { |key| pipeline.hget(key, "busy") } - queues.each { |queue| pipeline.llen("queue:#{queue}") } + processes.each { |key| pipeline.call("HGET", key, "busy") } + queues.each { |queue| pipeline.call("LLEN", "queue:#{queue}") } end } @@ -131,7 +131,7 @@ def reset(*stats) mset_args << 0 end Sidekiq.redis do |conn| - conn.mset(*mset_args) + conn.call("MSET", *mset_args) end end @@ -145,11 +145,11 @@ def stat(s) class Queues def lengths Sidekiq.redis do |conn| - queues = conn.sscan_each("queues").to_a + queues = conn.sscan("queues").to_a lengths = conn.pipelined { |pipeline| queues.each do |queue| - pipeline.llen("queue:#{queue}") + pipeline.call("LLEN", "queue:#{queue}") end } @@ -187,11 +187,11 @@ def date_stat_hash(stat) begin Sidekiq.redis do |conn| - conn.mget(keys).each_with_index do |value, idx| + conn.call("MGET", *keys).each_with_index do |value, idx| stat_hash[dates[idx]] = value ? value.to_i : 0 end end - rescue Redis::CommandError + rescue RedisClient::CommandError # mget will trigger a CROSSSLOT error when run against a Cluster # TODO Someone want to add Cluster support? end @@ -220,7 +220,7 @@ class Queue # Return all known queues within Redis. # def self.all - Sidekiq.redis { |c| c.sscan_each("queues").to_a }.sort.map { |q| Sidekiq::Queue.new(q) } + Sidekiq.redis { |c| c.sscan("queues").to_a }.sort.map { |q| Sidekiq::Queue.new(q) } end attr_reader :name @@ -231,7 +231,7 @@ def initialize(name = "default") end def size - Sidekiq.redis { |con| con.llen(@rname) } + Sidekiq.redis { |con| con.call("LLEN", @rname) } end # Sidekiq Pro overrides this @@ -246,7 +246,7 @@ def paused? 
# @return Float def latency entry = Sidekiq.redis { |conn| - conn.lrange(@rname, -1, -1) + conn.call("LRANGE", @rname, -1, -1) }.first return 0 unless entry job = Sidekiq.load_json(entry) @@ -265,7 +265,7 @@ def each range_start = page * page_size - deleted_size range_end = range_start + page_size - 1 entries = Sidekiq.redis { |conn| - conn.lrange @rname, range_start, range_end + conn.call "LRANGE", @rname, range_start, range_end } break if entries.empty? page += 1 @@ -288,8 +288,8 @@ def find_job(jid) def clear Sidekiq.redis do |conn| conn.multi do |transaction| - transaction.unlink(@rname) - transaction.srem("queues", name) + transaction.call("UNLINK", @rname) + transaction.call("SREM", "queues", name) end end end @@ -409,7 +409,7 @@ def latency # Remove this job from the queue. def delete count = Sidekiq.redis { |conn| - conn.lrem("queue:#{@queue}", 1, @value) + conn.call("LREM", "queue:#{@queue}", 1, @value) } count != 0 end @@ -436,7 +436,7 @@ class SortedEntry < JobRecord def initialize(parent, score, item) super(item) - @score = score + @score = Float(score) @parent = parent end @@ -454,7 +454,7 @@ def delete def reschedule(at) Sidekiq.redis do |conn| - conn.zincrby(@parent.name, at.to_f - @score, Sidekiq.dump_json(@item)) + conn.call("ZINCRBY", @parent.name, at.to_f - @score, Sidekiq.dump_json(@item)) end end @@ -490,8 +490,8 @@ def error? 
def remove_job Sidekiq.redis do |conn| results = conn.multi { |transaction| - transaction.zrangebyscore(parent.name, score, score) - transaction.zremrangebyscore(parent.name, score, score) + transaction.call("ZRANGEBYSCORE", parent.name, score, score) + transaction.call("ZREMRANGEBYSCORE", parent.name, score, score) }.first if results.size == 1 @@ -514,7 +514,7 @@ def remove_job # push the rest back onto the sorted set conn.multi do |transaction| nonmatched.each do |message| - transaction.zadd(parent.name, score.to_f.to_s, message) + transaction.call("ZADD", parent.name, score.to_f.to_s, message) end end end @@ -533,7 +533,7 @@ def initialize(name) end def size - Sidekiq.redis { |c| c.zcard(name) } + Sidekiq.redis { |c| c.call("ZCARD", name) } end def scan(match, count = 100) @@ -541,7 +541,7 @@ def scan(match, count = 100) match = "*#{match}*" unless match.include?("*") Sidekiq.redis do |conn| - conn.zscan_each(name, match: match, count: count) do |entry, score| + conn.zscan(name, match: match, count: count) do |entry, score| yield SortedEntry.new(self, score, entry) end end @@ -549,7 +549,7 @@ def scan(match, count = 100) def clear Sidekiq.redis do |conn| - conn.unlink(name) + conn.call("UNLINK", name) end end alias_method :💣, :clear @@ -558,7 +558,7 @@ def clear class JobSet < SortedSet def schedule(timestamp, message) Sidekiq.redis do |conn| - conn.zadd(name, timestamp.to_f.to_s, Sidekiq.dump_json(message)) + conn.call("ZADD", name, timestamp.to_f.to_s, Sidekiq.dump_json(message)) end end @@ -572,7 +572,7 @@ def each range_start = page * page_size + offset_size range_end = range_start + page_size - 1 elements = Sidekiq.redis { |conn| - conn.zrange name, range_start, range_end, with_scores: true + conn.call "ZRANGE", name, range_start, range_end, "WITHSCORES" } break if elements.empty? 
page -= 1 @@ -595,7 +595,7 @@ def fetch(score, jid = nil) end elements = Sidekiq.redis { |conn| - conn.zrangebyscore(name, begin_score, end_score, with_scores: true) + conn.call("ZRANGEBYSCORE", name, begin_score, end_score, "WITHSCORES") } elements.each_with_object([]) do |element, result| @@ -610,7 +610,7 @@ def fetch(score, jid = nil) # This is a slower O(n) operation. Do not use for app logic. def find_job(jid) Sidekiq.redis do |conn| - conn.zscan_each(name, match: "*#{jid}*", count: 100) do |entry, score| + conn.zscan(name, match: "*#{jid}*", count: 100) do |entry, score| job = JSON.parse(entry) matched = job["jid"] == jid return SortedEntry.new(self, score, entry) if matched @@ -621,7 +621,7 @@ def find_job(jid) def delete_by_value(name, value) Sidekiq.redis do |conn| - ret = conn.zrem(name, value) + ret = conn.call("ZREM", name, value) @_size -= 1 if ret ret end @@ -629,12 +629,12 @@ def delete_by_value(name, value) def delete_by_jid(score, jid) Sidekiq.redis do |conn| - elements = conn.zrangebyscore(name, score, score) + elements = conn.call("ZRANGEBYSCORE", name, score, score) elements.each do |element| if element.index(jid) message = Sidekiq.load_json(element) if message["jid"] == jid - ret = conn.zrem(name, element) + ret = conn.call("ZREM", name, element) @_size -= 1 if ret break ret end @@ -694,9 +694,9 @@ def kill(message, opts = {}) now = Time.now.to_f Sidekiq.redis do |conn| conn.multi do |transaction| - transaction.zadd(name, now.to_s, message) - transaction.zremrangebyscore(name, "-inf", now - self.class.timeout) - transaction.zremrangebyrank(name, 0, - self.class.max_jobs) + transaction.call("ZADD", name, now.to_s, message) + transaction.call("ZREMRANGEBYSCORE", name, "-inf", now - self.class.timeout) + transaction.call("ZREMRANGEBYRANK", name, 0, - self.class.max_jobs) end end @@ -743,10 +743,10 @@ def initialize(clean_plz = true) def cleanup count = 0 Sidekiq.redis do |conn| - procs = conn.sscan_each("processes").to_a.sort + procs = 
conn.sscan("processes").to_a.sort heartbeats = conn.pipelined { |pipeline| procs.each do |key| - pipeline.hget(key, "info") + pipeline.call("HGET", key, "info") end } @@ -756,21 +756,21 @@ def cleanup to_prune = procs.select.with_index { |proc, i| heartbeats[i].nil? } - count = conn.srem("processes", to_prune) unless to_prune.empty? + count = conn.call("SREM", "processes", to_prune) unless to_prune.empty? end count end def each result = Sidekiq.redis { |conn| - procs = conn.sscan_each("processes").to_a.sort + procs = conn.sscan("processes").to_a.sort # We're making a tradeoff here between consuming more memory instead of # making more roundtrips to Redis, but if you have hundreds or thousands of workers, # you'll be happier this way conn.pipelined do |pipeline| procs.each do |key| - pipeline.hmget(key, "info", "busy", "beat", "quiet", "rss", "rtt_us") + pipeline.call("HMGET", key, "info", "busy", "beat", "quiet", "rss", "rtt_us") end end } @@ -795,7 +795,7 @@ def each # contains Sidekiq processes which have sent a heartbeat within the last # 60 seconds. def size - Sidekiq.redis { |conn| conn.scard("processes") } + Sidekiq.redis { |conn| conn.call("SCARD", "processes") } end # Total number of threads available to execute jobs. @@ -815,7 +815,7 @@ def total_rss_in_kb # or Sidekiq Pro. 
def leader @leader ||= begin - x = Sidekiq.redis { |c| c.get("dear-leader") } + x = Sidekiq.redis { |c| c.call("GET", "dear-leader") } # need a non-falsy value so we can memoize x ||= "" x @@ -885,8 +885,8 @@ def signal(sig) key = "#{identity}-signals" Sidekiq.redis do |c| c.multi do |transaction| - transaction.lpush(key, sig) - transaction.expire(key, 60) + transaction.call("LPUSH", key, sig) + transaction.call("EXPIRE", key, 60) end end end @@ -918,13 +918,13 @@ class WorkSet def each(&block) results = [] Sidekiq.redis do |conn| - procs = conn.sscan_each("processes").to_a + procs = conn.sscan("processes").to_a.sort procs.sort.each do |key| valid, workers = conn.pipelined { |pipeline| - pipeline.exists?(key) - pipeline.hgetall("#{key}:work") + pipeline.call("EXISTS", key) + pipeline.call("HGETALL", "#{key}:work") } - next unless valid + next unless valid == 1 workers.each_pair do |tid, json| hsh = Sidekiq.load_json(json) p = hsh["payload"] @@ -946,13 +946,13 @@ def each(&block) # which can easily get out of sync with crashy processes. def size Sidekiq.redis do |conn| - procs = conn.sscan_each("processes").to_a + procs = conn.sscan("processes").to_a.sort if procs.empty? 0 else conn.pipelined { |pipeline| procs.each do |key| - pipeline.hget(key, "busy") + pipeline.call("HGET", key, "busy") end }.sum(&:to_i) end diff --git a/lib/sidekiq/client.rb b/lib/sidekiq/client.rb index 1dced4548..9f530ebb9 100644 --- a/lib/sidekiq/client.rb +++ b/lib/sidekiq/client.rb @@ -201,14 +201,14 @@ def raw_push(payloads) conn.pipelined do |pipeline| atomic_push(pipeline, payloads) end - rescue Redis::BaseError => ex + rescue RedisClient::Error => ex # 2550 Failover can cause the server to become a replica, need # to disconnect and reopen the socket to get back to the primary. 
# 4495 Use the same logic if we have a "Not enough replicas" error from the primary # 4985 Use the same logic when a blocking command is force-unblocked # The retry logic is copied from sidekiq.rb if retryable && ex.message =~ /READONLY|NOREPLICAS|UNBLOCKED/ - conn.disconnect! + conn.close retryable = false retry end @@ -220,7 +220,7 @@ def raw_push(payloads) def atomic_push(conn, payloads) if payloads.first.key?("at") - conn.zadd("schedule", payloads.map { |hash| + conn.call("ZADD", "schedule", payloads.flat_map { |hash| at = hash.delete("at").to_s [at, Sidekiq.dump_json(hash)] }) @@ -231,8 +231,8 @@ def atomic_push(conn, payloads) entry["enqueued_at"] = now Sidekiq.dump_json(entry) } - conn.sadd("queues", queue) - conn.lpush("queue:#{queue}", to_push) + conn.call("SADD", "queues", queue) + conn.call("LPUSH", "queue:#{queue}", to_push) end end end diff --git a/lib/sidekiq/fetch.rb b/lib/sidekiq/fetch.rb index f3d1fbb81..82363513e 100644 --- a/lib/sidekiq/fetch.rb +++ b/lib/sidekiq/fetch.rb @@ -19,7 +19,7 @@ def queue_name def requeue Sidekiq.redis do |conn| - conn.rpush(queue, job) + conn.call("RPUSH", queue, job) end end } @@ -44,7 +44,7 @@ def retrieve_work return nil end - work = Sidekiq.redis { |conn| conn.brpop(*qs) } + work = Sidekiq.redis { |conn| conn.blocking_call(false, "BRPOP", *qs) } UnitOfWork.new(*work) if work end @@ -61,7 +61,7 @@ def bulk_requeue(inprogress, options) Sidekiq.redis do |conn| conn.pipelined do |pipeline| jobs_to_requeue.each do |queue, jobs| - pipeline.rpush(queue, jobs) + pipeline.call("RPUSH", queue, jobs) end end end diff --git a/lib/sidekiq/job_retry.rb b/lib/sidekiq/job_retry.rb index 3474af14d..f64bea9da 100644 --- a/lib/sidekiq/job_retry.rb +++ b/lib/sidekiq/job_retry.rb @@ -176,7 +176,7 @@ def attempt_retry(jobinst, msg, queue, exception) retry_at = Time.now.to_f + delay payload = Sidekiq.dump_json(msg) Sidekiq.redis do |conn| - conn.zadd("retry", retry_at.to_s, payload) + conn.call("ZADD", "retry", retry_at.to_s, payload) 
end else # Goodbye dear message, you (re)tried your best I'm sure. diff --git a/lib/sidekiq/launcher.rb b/lib/sidekiq/launcher.rb index 3aa0a9269..c6262c49a 100644 --- a/lib/sidekiq/launcher.rb +++ b/lib/sidekiq/launcher.rb @@ -83,8 +83,8 @@ def clear_heartbeat # doesn't actually exit, it'll reappear in the Web UI. Sidekiq.redis do |conn| conn.pipelined do |pipeline| - pipeline.srem("processes", identity) - pipeline.unlink("#{identity}:work") + pipeline.call("SREM", "processes", identity) + pipeline.call("UNLINK", "#{identity}:work") end end rescue @@ -106,13 +106,13 @@ def self.flush_stats begin Sidekiq.redis do |conn| conn.pipelined do |pipeline| - pipeline.incrby("stat:processed", procd) - pipeline.incrby("stat:processed:#{nowdate}", procd) - pipeline.expire("stat:processed:#{nowdate}", STATS_TTL) + pipeline.call("INCRBY", "stat:processed", procd) + pipeline.call("INCRBY", "stat:processed:#{nowdate}", procd) + pipeline.call("EXPIRE", "stat:processed:#{nowdate}", STATS_TTL) - pipeline.incrby("stat:failed", fails) - pipeline.incrby("stat:failed:#{nowdate}", fails) - pipeline.expire("stat:failed:#{nowdate}", STATS_TTL) + pipeline.call("INCRBY", "stat:failed", fails) + pipeline.call("INCRBY", "stat:failed:#{nowdate}", fails) + pipeline.call("EXPIRE", "stat:failed:#{nowdate}", STATS_TTL) end end rescue => ex @@ -136,23 +136,23 @@ def ❤ Sidekiq.redis do |conn| conn.multi do |transaction| - transaction.incrby("stat:processed", procd) - transaction.incrby("stat:processed:#{nowdate}", procd) - transaction.expire("stat:processed:#{nowdate}", STATS_TTL) + transaction.call("INCRBY", "stat:processed", procd) + transaction.call("INCRBY", "stat:processed:#{nowdate}", procd) + transaction.call("EXPIRE", "stat:processed:#{nowdate}", STATS_TTL) - transaction.incrby("stat:failed", fails) - transaction.incrby("stat:failed:#{nowdate}", fails) - transaction.expire("stat:failed:#{nowdate}", STATS_TTL) + transaction.call("INCRBY", "stat:failed", fails) + transaction.call("INCRBY", 
"stat:failed:#{nowdate}", fails) + transaction.call("EXPIRE", "stat:failed:#{nowdate}", STATS_TTL) end # work is the current set of executing jobs work_key = "#{key}:work" conn.pipelined do |transaction| - transaction.unlink(work_key) + transaction.call("UNLINK", work_key) curstate.each_pair do |tid, hash| - transaction.hset(work_key, tid, Sidekiq.dump_json(hash)) + transaction.call("HSET", work_key, tid, Sidekiq.dump_json(hash)) end - transaction.expire(work_key, 60) + transaction.call("EXPIRE", work_key, 60) end end @@ -163,26 +163,27 @@ def ❤ _, exists, _, _, msg = Sidekiq.redis { |conn| conn.multi { |transaction| - transaction.sadd("processes", key) - transaction.exists?(key) - transaction.hmset(key, "info", to_json, + transaction.call("SADD", "processes", key) + transaction.call("EXISTS", key) + transaction.call("HMSET", key, + "info", to_json, "busy", curstate.size, "beat", Time.now.to_f, "rtt_us", rtt, - "quiet", @done, + "quiet", @done.to_s, "rss", kb) - transaction.expire(key, 60) - transaction.rpop("#{key}-signals") + transaction.call("EXPIRE", key, 60) + transaction.call("RPOP", "#{key}-signals") } } # first heartbeat or recovering from an outage and need to reestablish our heartbeat - fire_event(:heartbeat) unless exists + fire_event(:heartbeat) unless exists == 1 return unless msg ::Process.kill(msg, ::Process.pid) - rescue => e + rescue RedisClient::Error => e # ignore all redis/network issues logger.error("heartbeat: #{e}") # don't lose the counts if there was a network issue @@ -201,7 +202,7 @@ def check_rtt a = b = 0 Sidekiq.redis do |x| a = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC, :microsecond) - x.ping + x.call("PING") b = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC, :microsecond) end rtt = b - a diff --git a/lib/sidekiq/paginator.rb b/lib/sidekiq/paginator.rb index 59d8c8227..6ea6ea3f2 100644 --- a/lib/sidekiq/paginator.rb +++ b/lib/sidekiq/paginator.rb @@ -11,27 +11,27 @@ def page(key, pageidx = 1, page_size = 25, opts = nil) 
ending = starting + page_size - 1 Sidekiq.redis do |conn| - type = conn.type(key) + type = conn.call("TYPE", key) rev = opts && opts[:reverse] case type when "zset" total_size, items = conn.multi { |transaction| - transaction.zcard(key) + transaction.call("ZCARD", key) if rev - transaction.zrevrange(key, starting, ending, with_scores: true) + transaction.call("ZREVRANGE", key, starting, ending, "WITHSCORES") else - transaction.zrange(key, starting, ending, with_scores: true) + transaction.call("ZRANGE", key, starting, ending, "WITHSCORES") end } [current_page, total_size, items] when "list" total_size, items = conn.multi { |transaction| - transaction.llen(key) + transaction.call("LLEN", key) if rev - transaction.lrange(key, -ending - 1, -starting - 1) + transaction.call("LRANGE", key, -ending - 1, -starting - 1) else - transaction.lrange(key, starting, ending) + transaction.call("LRANGE", key, starting, ending) end } items.reverse! if rev diff --git a/lib/sidekiq/redis_connection.rb b/lib/sidekiq/redis_connection.rb index 7985f8590..269cf102e 100644 --- a/lib/sidekiq/redis_connection.rb +++ b/lib/sidekiq/redis_connection.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true require "connection_pool" -require "redis" +require "redis-client" require "uri" module Sidekiq @@ -15,7 +15,7 @@ def create(options = {}) end size = if symbolized_options[:size] - symbolized_options[:size] + symbolized_options.delete(:size) elsif Sidekiq.server? # Give ourselves plenty of connections. pool is lazy # so we won't create them until we need them. @@ -28,11 +28,12 @@ def create(options = {}) verify_sizing(size, Sidekiq.options[:concurrency]) if Sidekiq.server? 
- pool_timeout = symbolized_options[:pool_timeout] || 1 + pool_timeout = symbolized_options.delete(:pool_timeout) || 1 log_info(symbolized_options) + redis_config = build_config(symbolized_options) ConnectionPool.new(timeout: pool_timeout, size: size) do - build_client(symbolized_options) + redis_config.new_client end end @@ -50,26 +51,18 @@ def verify_sizing(size, concurrency) raise ArgumentError, "Your Redis connection pool is too small for Sidekiq. Your pool has #{size} connections but must have at least #{concurrency + 2}" if size < (concurrency + 2) end - def build_client(options) - namespace = options[:namespace] - - client = Redis.new client_opts(options) - if namespace - begin - require "redis/namespace" - Redis::Namespace.new(namespace, redis: client) - rescue LoadError - Sidekiq.logger.error("Your Redis configuration uses the namespace '#{namespace}' but the redis-namespace gem is not included in the Gemfile." \ - "Add the gem to your Gemfile to continue using a namespace. Otherwise, remove the namespace parameter.") - exit(-127) - end + def build_config(options) + opts = client_opts(options) + if opts.key?(:sentinels) + RedisClient.sentinel(**opts) else - client + RedisClient.config(**opts) end end def client_opts(options) opts = options.dup + if opts[:namespace] opts.delete(:namespace) end @@ -79,7 +72,9 @@ def client_opts(options) opts.delete(:network_timeout) end - opts[:driver] ||= Redis::Connection.drivers.last || "ruby" + opts[:name] = opts.delete(:master_name) if opts.key?(:master_name) + opts[:role] = opts[:role].to_sym if opts.key?(:role) + opts.delete(:url) if opts.key?(:sentinels) # Issue #3303, redis-rb will silently retry an operation. 
# This can lead to duplicate jobs if Sidekiq::Client's LPUSH diff --git a/lib/sidekiq/scheduled.rb b/lib/sidekiq/scheduled.rb index 761dab5dc..6c836e40f 100644 --- a/lib/sidekiq/scheduled.rb +++ b/lib/sidekiq/scheduled.rb @@ -48,12 +48,11 @@ def terminate def zpopbyscore(conn, keys: nil, argv: nil) if @lua_zpopbyscore_sha.nil? - raw_conn = conn.respond_to?(:redis) ? conn.redis : conn - @lua_zpopbyscore_sha = raw_conn.script(:load, LUA_ZPOPBYSCORE) + @lua_zpopbyscore_sha = conn.call("SCRIPT", "LOAD", LUA_ZPOPBYSCORE) end - conn.evalsha(@lua_zpopbyscore_sha, keys: keys, argv: argv) - rescue Redis::CommandError => e + conn.call("EVALSHA", @lua_zpopbyscore_sha, keys.size, *keys, *argv) + rescue RedisClient::CommandError => e raise unless e.message.start_with?("NOSCRIPT") @lua_zpopbyscore_sha = nil diff --git a/lib/sidekiq/web/application.rb b/lib/sidekiq/web/application.rb index 8debac91b..e5178ba62 100644 --- a/lib/sidekiq/web/application.rb +++ b/lib/sidekiq/web/application.rb @@ -44,7 +44,7 @@ def self.set(key, val) head "/" do # HEAD / is the cheapest heartbeat possible, # it hits Redis to ensure connectivity - Sidekiq.redis { |c| c.llen("queue:default") } + Sidekiq.redis { |c| c.call("LLEN", "queue:default") } "" end diff --git a/lib/sidekiq/web/helpers.rb b/lib/sidekiq/web/helpers.rb index f5953e2c6..a71729b61 100644 --- a/lib/sidekiq/web/helpers.rb +++ b/lib/sidekiq/web/helpers.rb @@ -154,7 +154,7 @@ def stats def redis_connection Sidekiq.redis do |conn| - conn.connection[:id] + conn.id end end diff --git a/sidekiq.gemspec b/sidekiq.gemspec index 16f7bda68..bd7ebd69a 100644 --- a/sidekiq.gemspec +++ b/sidekiq.gemspec @@ -22,7 +22,7 @@ Gem::Specification.new do |gem| "source_code_uri" => "https://github.com/mperham/sidekiq" } - gem.add_dependency "redis", ">= 4.5.1" + gem.add_dependency "redis-client" gem.add_dependency "connection_pool", ">= 2.2.5" gem.add_dependency "rack", "~> 2.2" end diff --git a/test/test_actors.rb b/test/test_actors.rb index 
3f3ee10e1..644990460 100644 --- a/test/test_actors.rb +++ b/test/test_actors.rb @@ -17,7 +17,7 @@ def perform(slp) end before do - Sidekiq.redis { |c| c.flushdb } + Sidekiq.redis { |c| c.call("FLUSHDB") } end describe "scheduler" do diff --git a/test/test_api.rb b/test/test_api.rb index 768ccdb9d..f2036fdad 100644 --- a/test/test_api.rb +++ b/test/test_api.rb @@ -7,7 +7,7 @@ describe "API" do before do - Sidekiq.redis { |c| c.flushdb } + Sidekiq.redis { |c| c.call("FLUSHDB") } end describe "stats" do @@ -22,7 +22,7 @@ describe "processed" do it "returns number of processed jobs" do - Sidekiq.redis { |conn| conn.set("stat:processed", 5) } + Sidekiq.redis { |conn| conn.call("SET", "stat:processed", 5) } s = Sidekiq::Stats.new assert_equal 5, s.processed end @@ -30,7 +30,7 @@ describe "failed" do it "returns number of failed jobs" do - Sidekiq.redis { |conn| conn.set("stat:failed", 5) } + Sidekiq.redis { |conn| conn.call("SET", "stat:failed", 5) } s = Sidekiq::Stats.new assert_equal 5, s.failed end @@ -39,8 +39,8 @@ describe "reset" do before do Sidekiq.redis do |conn| - conn.set("stat:processed", 5) - conn.set("stat:failed", 10) + conn.call("SET", "stat:processed", 5) + conn.call("SET", "stat:failed", 10) end end @@ -76,10 +76,10 @@ describe "workers_size" do it "retrieves the number of busy workers" do Sidekiq.redis do |c| - c.sadd("processes", "process_1") - c.sadd("processes", "process_2") - c.hset("process_1", "busy", 1) - c.hset("process_2", "busy", 2) + c.call("SADD", "processes", "process_1") + c.call("SADD", "processes", "process_2") + c.call("HSET", "process_1", "busy", 1) + c.call("HSET", "process_2", "busy", 2) end s = Sidekiq::Stats.new assert_equal 3, s.workers_size @@ -94,11 +94,11 @@ it "returns a hash of queue and size in order" do Sidekiq.redis do |conn| - conn.rpush "queue:foo", "{}" - conn.sadd "queues", "foo" + conn.call "RPUSH", "queue:foo", "{}" + conn.call "SADD", "queues", "foo" - 3.times { conn.rpush "queue:bar", "{}" } - conn.sadd "queues", 
"bar" + 3.times { conn.call "RPUSH", "queue:bar", "{}" } + conn.call "SADD", "queues", "bar" end s = Sidekiq::Stats::Queues.new @@ -112,8 +112,8 @@ describe "enqueued" do it "handles latency for good jobs" do Sidekiq.redis do |conn| - conn.rpush "queue:default", "{\"enqueued_at\": #{Time.now.to_f}}" - conn.sadd "queues", "default" + conn.call "RPUSH", "queue:default", "{\"enqueued_at\": #{Time.now.to_f}}" + conn.call "SADD", "queues", "default" end s = Sidekiq::Stats.new assert s.default_queue_latency > 0 @@ -123,8 +123,8 @@ it "handles latency for incomplete jobs" do Sidekiq.redis do |conn| - conn.rpush "queue:default", "{}" - conn.sadd "queues", "default" + conn.call "RPUSH", "queue:default", "{}" + conn.call "SADD", "queues", "default" end s = Sidekiq::Stats.new assert_equal 0, s.default_queue_latency @@ -134,11 +134,11 @@ it "returns total enqueued jobs" do Sidekiq.redis do |conn| - conn.rpush "queue:foo", "{}" - conn.sadd "queues", "foo" + conn.call "RPUSH", "queue:foo", "{}" + conn.call "SADD", "queues", "foo" - 3.times { conn.rpush "queue:bar", "{}" } - conn.sadd "queues", "bar" + 3.times { conn.call "RPUSH", "queue:bar", "{}" } + conn.call "SADD", "queues", "bar" end s = Sidekiq::Stats.new @@ -169,10 +169,10 @@ describe "processed" do it "retrieves hash of dates" do Sidekiq.redis do |c| - c.incrby("stat:processed:2012-12-24", 4) - c.incrby("stat:processed:2012-12-25", 1) - c.incrby("stat:processed:2012-12-26", 6) - c.incrby("stat:processed:2012-12-27", 2) + c.call("INCRBY", "stat:processed:2012-12-24", 4) + c.call("INCRBY", "stat:processed:2012-12-25", 1) + c.call("INCRBY", "stat:processed:2012-12-26", 6) + c.call("INCRBY", "stat:processed:2012-12-27", 2) end Time.stub(:now, Time.parse("2012-12-26 1:00:00 -0500")) do s = Sidekiq::Stats::History.new(2) @@ -190,10 +190,10 @@ describe "failed" do it "retrieves hash of dates" do Sidekiq.redis do |c| - c.incrby("stat:failed:2012-12-24", 4) - c.incrby("stat:failed:2012-12-25", 1) - 
c.incrby("stat:failed:2012-12-26", 6) - c.incrby("stat:failed:2012-12-27", 2) + c.call("INCRBY", "stat:failed:2012-12-24", 4) + c.call("INCRBY", "stat:failed:2012-12-25", 1) + c.call("INCRBY", "stat:failed:2012-12-26", 6) + c.call("INCRBY", "stat:failed:2012-12-27", 2) end Time.stub(:now, Time.parse("2012-12-26 1:00:00 -0500")) do s = Sidekiq::Stats::History.new(2) @@ -415,8 +415,8 @@ class WorkerWithTags q.clear Sidekiq.redis do |conn| - refute conn.smembers("queues").include?("foo") - refute conn.exists?("queue:foo") + refute conn.call("SMEMBERS", "queues").include?("foo") + assert_equal 0, conn.call("EXISTS", "queue:foo") end end @@ -540,9 +540,9 @@ class WorkerWithTags time = Time.now.to_f Sidekiq.redis do |conn| conn.multi do |transaction| - transaction.sadd("processes", odata["key"]) - transaction.hmset(odata["key"], "info", Sidekiq.dump_json(odata), "busy", 10, "beat", time) - transaction.sadd("processes", "fake:pid") + transaction.call("SADD", "processes", odata["key"]) + transaction.call("HMSET", odata["key"], "info", Sidekiq.dump_json(odata), "busy", 10, "beat", time) + transaction.call("SADD", "processes", "fake:pid") end end @@ -556,8 +556,8 @@ class WorkerWithTags data.quiet! data.stop! 
signals_string = "#{odata["key"]}-signals" - assert_equal "TERM", Sidekiq.redis { |c| c.lpop(signals_string) } - assert_equal "TSTP", Sidekiq.redis { |c| c.lpop(signals_string) } + assert_equal "TERM", Sidekiq.redis { |c| c.call("LPOP", signals_string) } + assert_equal "TSTP", Sidekiq.redis { |c| c.call("LPOP", signals_string) } end it "can enumerate workers" do @@ -571,14 +571,14 @@ class WorkerWithTags key = "#{hn}:#{$$}" pdata = {"pid" => $$, "hostname" => hn, "started_at" => Time.now.to_i} Sidekiq.redis do |conn| - conn.sadd("processes", key) - conn.hmset(key, "info", Sidekiq.dump_json(pdata), "busy", 0, "beat", Time.now.to_f) + conn.call("SADD", "processes", key) + conn.call("HMSET", key, "info", Sidekiq.dump_json(pdata), "busy", 0, "beat", Time.now.to_f) end s = "#{key}:work" data = Sidekiq.dump_json({"payload" => "{}", "queue" => "default", "run_at" => Time.now.to_i}) Sidekiq.redis do |c| - c.hmset(s, "1234", data) + c.call("HMSET", s, "1234", data) end w.each do |p, x, y| @@ -593,8 +593,8 @@ class WorkerWithTags data = Sidekiq.dump_json({"payload" => {}, "queue" => "default", "run_at" => (Time.now.to_i - 2 * 60 * 60)}) Sidekiq.redis do |c| c.multi do |transaction| - transaction.hmset(s, "5678", data) - transaction.hmset("b#{s}", "5678", data) + transaction.call("HMSET", s, "5678", data) + transaction.call("HMSET", "b#{s}", "5678", data) end end @@ -623,8 +623,8 @@ class WorkerWithTags data = {"pid" => rand(10_000), "hostname" => "app#{rand(1_000)}", "started_at" => Time.now.to_f} key = "#{data["hostname"]}:#{data["pid"]}" Sidekiq.redis do |conn| - conn.sadd("processes", key) - conn.hmset(key, "info", Sidekiq.dump_json(data), "busy", 0, "beat", Time.now.to_f) + conn.call("SADD", "processes", key) + conn.call("HMSET", key, "info", Sidekiq.dump_json(data), "busy", 0, "beat", Time.now.to_f) end ps = Sidekiq::ProcessSet.new @@ -632,8 +632,8 @@ class WorkerWithTags assert_equal 1, ps.to_a.size Sidekiq.redis do |conn| - conn.sadd("processes", "bar:987") - 
conn.sadd("processes", "bar:986") + conn.call("SADD", "processes", "bar:987") + conn.call("SADD", "processes", "bar:986") end ps = Sidekiq::ProcessSet.new @@ -644,7 +644,7 @@ class WorkerWithTags def add_retry(jid = "bob", at = Time.now.to_f) payload = Sidekiq.dump_json("class" => "ApiWorker", "args" => [1, "mike"], "queue" => "default", "jid" => jid, "retry_count" => 2, "failed_at" => Time.now.to_f, "error_backtrace" => ["line1", "line2"]) Sidekiq.redis do |conn| - conn.zadd("retry", at.to_s, payload) + conn.call("ZADD", "retry", at.to_s, payload) end end end diff --git a/test/test_client.rb b/test/test_client.rb index 3098b44e8..583a716e4 100644 --- a/test/test_client.rb +++ b/test/test_client.rb @@ -117,7 +117,7 @@ class QueuedWorker end it "enqueues" do - Sidekiq.redis { |c| c.flushdb } + Sidekiq.redis { |c| c.call("FLUSHDB") } assert_equal Sidekiq.default_job_options, MyWorker.get_sidekiq_options assert MyWorker.perform_async(1, 2) assert Sidekiq::Client.enqueue(MyWorker, 1, 2) @@ -448,7 +448,7 @@ class DWorker < BaseWorker it "allows Resque helpers to point to different Redi" do conn = MiniTest::Mock.new conn.expect(:pipelined, []) { |*args, &block| block.call(conn) } - conn.expect(:zadd, 1, [String, Array]) + conn.expect(:call, 2, [String, String, Array]) DWorker.sidekiq_options("pool" => ConnectionPool.new(size: 1) { conn }) Sidekiq::Client.enqueue_in(10, DWorker, 3) conn.verify diff --git a/test/test_fetch.rb b/test/test_fetch.rb index 5999e0d5d..0574f9e23 100644 --- a/test/test_fetch.rb +++ b/test/test_fetch.rb @@ -7,10 +7,9 @@ describe Sidekiq::BasicFetch do before do @prev_redis = Sidekiq.instance_variable_get(:@redis) || {} - Sidekiq.redis = {namespace: "fuzzy"} Sidekiq.redis do |conn| - conn.redis.flushdb - conn.rpush("queue:basic", "msg") + conn.call("FLUSHDB") + conn.call("RPUSH","queue:basic", "msg") end end @@ -39,8 +38,8 @@ it "bulk requeues" do Sidekiq.redis do |conn| - conn.rpush("queue:foo", ["bob", "bar"]) - conn.rpush("queue:bar", "widget") 
+ conn.call("RPUSH", "queue:foo", ["bob", "bar"]) + conn.call("RPUSH", "queue:bar", "widget") end q1 = Sidekiq::Queue.new("foo") diff --git a/test/test_job.rb b/test/test_job.rb index 54ea1d137..a2acbfd1e 100644 --- a/test/test_job.rb +++ b/test/test_job.rb @@ -11,7 +11,7 @@ def perform end def setup - Sidekiq.redis { |c| c.flushdb } + Sidekiq.redis { |c| c.call("FLUSHDB") } end it "provides basic ActiveJob compatibilility" do diff --git a/test/test_launcher.rb b/test/test_launcher.rb index 4a9c1a90d..bbae642ff 100644 --- a/test/test_launcher.rb +++ b/test/test_launcher.rb @@ -6,7 +6,7 @@ describe Sidekiq::Launcher do subject { Sidekiq::Launcher.new(options) } before do - Sidekiq.redis { |c| c.flushdb } + Sidekiq.redis { |c| c.call("FLUSHDB") } end def new_manager(opts) @@ -49,13 +49,13 @@ def new_manager(opts) it "stores process info in redis" do subject.heartbeat - workers, rtt = Sidekiq.redis { |c| c.hmget(subject.identity, "busy", "rtt_us") } + workers, rtt = Sidekiq.redis { |c| c.call("HMGET", subject.identity, "busy", "rtt_us") } assert_equal "1", workers refute_nil rtt assert_in_delta 1000, rtt.to_i, 1000 - expires = Sidekiq.redis { |c| c.pttl(subject.identity) } + expires = Sidekiq.redis { |c| c.call("PTTL", subject.identity) } assert_in_delta 60000, expires, 500 end @@ -93,11 +93,11 @@ def new_manager(opts) it "stores process info in redis" do subject.heartbeat - info = Sidekiq.redis { |c| c.hmget(subject.identity, "busy") } + info = Sidekiq.redis { |c| c.call("HMGET", subject.identity, "busy") } assert_equal ["1"], info - expires = Sidekiq.redis { |c| c.pttl(subject.identity) } + expires = Sidekiq.redis { |c| c.call("PTTL", subject.identity) } assert_in_delta 60000, expires, 50 end @@ -127,9 +127,9 @@ def new_manager(opts) end it "stores process info in redis" do - info = Sidekiq.redis { |c| c.hmget(@id, "busy") } + info = Sidekiq.redis { |c| c.call("HMGET", @id, "busy") } assert_equal ["1"], info - expires = Sidekiq.redis { |c| c.pttl(@id) } + expires = 
Sidekiq.redis { |c| c.call("PTTL", @id) } assert_in_delta 60000, expires, 500 end end @@ -150,9 +150,9 @@ def new_manager(opts) end it "stores process info in redis" do - info = Sidekiq.redis { |c| c.hmget(@id, "busy") } + info = Sidekiq.redis { |c| c.call("HMGET", @id, "busy") } assert_equal ["1"], info - expires = Sidekiq.redis { |c| c.pttl(@id) } + expires = Sidekiq.redis { |c| c.call("PTTL", @id) } assert_in_delta 60000, expires, 50 end end diff --git a/test/test_manager.rb b/test/test_manager.rb index c8147a971..d1d021446 100644 --- a/test/test_manager.rb +++ b/test/test_manager.rb @@ -5,7 +5,7 @@ describe Sidekiq::Manager do before do - Sidekiq.redis { |c| c.flushdb } + Sidekiq.redis { |c| c.call("FLUSHDB") } end def new_manager(opts) diff --git a/test/test_processor.rb b/test/test_processor.rb index 1407a5b08..01098f690 100644 --- a/test/test_processor.rb +++ b/test/test_processor.rb @@ -298,7 +298,7 @@ def call(worker, item, queue, redis_pool) describe "stats" do before do - Sidekiq.redis { |c| c.flushdb } + Sidekiq.redis { |c| c.call("FLUSHDB") } end describe "when successful" do @@ -336,7 +336,7 @@ def call(item, queue) describe "stats" do before do - Sidekiq.redis { |c| c.flushdb } + Sidekiq.redis { |c| c.call("FLUSHDB") } end def successful_job diff --git a/test/test_rails.rb b/test/test_rails.rb index 71f36cc3b..0263c11e1 100644 --- a/test/test_rails.rb +++ b/test/test_rails.rb @@ -6,7 +6,7 @@ describe "ActiveJob" do before do - Sidekiq.redis { |c| c.flushdb } + Sidekiq.redis { |c| c.call("FLUSHDB") } # need to force this since we aren't booting a Rails app ActiveJob::Base.queue_adapter = :sidekiq ActiveJob::Base.logger = nil diff --git a/test/test_redis_connection.rb b/test/test_redis_connection.rb index d619d9865..806dc8c4e 100644 --- a/test/test_redis_connection.rb +++ b/test/test_redis_connection.rb @@ -15,18 +15,9 @@ ENV["REDIS_URL"] = @old end - # To support both redis-rb 3.3.x #client and 4.0.x #_client - def client_for(redis) - if 
redis.respond_to?(:_client) - redis._client - else - redis.client - end - end - it "creates a pooled redis connection" do pool = Sidekiq::RedisConnection.create - assert_equal Redis, pool.checkout.class + assert_equal RedisClient, pool.checkout.class end # Readers for these ivars should be available in the next release of @@ -87,8 +78,8 @@ def server_connection(*args) it "disables client setname with nil id" do pool = Sidekiq::RedisConnection.create(id: nil) - assert_equal Redis, pool.checkout.class - assert_equal "redis://localhost:6379/15", pool.checkout.connection.fetch(:id) + assert_equal RedisClient, pool.checkout.class + assert_nil pool.checkout.id end describe "network_timeout" do @@ -96,48 +87,28 @@ def server_connection(*args) pool = Sidekiq::RedisConnection.create(network_timeout: 8) redis = pool.checkout - assert_equal 8, client_for(redis).timeout + assert_equal 8, redis.read_timeout end it "uses the default network_timeout if none specified" do pool = Sidekiq::RedisConnection.create redis = pool.checkout - assert_equal 5, client_for(redis).timeout - end - end - - describe "namespace" do - it "uses a given :namespace set by a symbol key" do - pool = Sidekiq::RedisConnection.create(namespace: "xxx") - assert_equal "xxx", pool.checkout.namespace - end - - it "uses a given :namespace set by a string key" do - pool = Sidekiq::RedisConnection.create("namespace" => "xxx") - assert_equal "xxx", pool.checkout.namespace - end - - it "uses given :namespace over :namespace from Sidekiq.options" do - Sidekiq.options[:namespace] = "xxx" - pool = Sidekiq::RedisConnection.create(namespace: "yyy") - assert_equal "yyy", pool.checkout.namespace + assert_equal 1.0, redis.read_timeout end end describe "socket path" do it "uses a given :path" do pool = Sidekiq::RedisConnection.create(path: "/var/run/redis.sock") - assert_equal "unix", client_for(pool.checkout).scheme - assert_equal "/var/run/redis.sock", pool.checkout.connection.fetch(:location) - assert_equal 15, 
pool.checkout.connection.fetch(:db) + assert_equal "/var/run/redis.sock", pool.checkout.config.path end + end - it "uses a given :path and :db" do - pool = Sidekiq::RedisConnection.create(path: "/var/run/redis.sock", db: 8) - assert_equal "unix", client_for(pool.checkout).scheme - assert_equal "/var/run/redis.sock", pool.checkout.connection.fetch(:location) - assert_equal 8, pool.checkout.connection.fetch(:db) + describe "db" do + it "uses a given :db" do + pool = Sidekiq::RedisConnection.create(db: 8) + assert_includes pool.checkout.call("CLIENT", "INFO"), " db=8 " end end @@ -155,35 +126,6 @@ def server_connection(*args) end end - describe "driver" do - it "uses redis' ruby driver" do - pool = Sidekiq::RedisConnection.create - redis = pool.checkout - - assert_equal Redis::Connection::Ruby, redis.instance_variable_get(:@client).driver - end - - it "uses redis' default driver if there are many available" do - redis_driver = Object.new - Redis::Connection.drivers << redis_driver - - pool = Sidekiq::RedisConnection.create - redis = pool.checkout - - assert_equal redis_driver, redis.instance_variable_get(:@client).driver - ensure - Redis::Connection.drivers.pop - end - - it "uses a given :driver" do - redis_driver = Object.new - pool = Sidekiq::RedisConnection.create(driver: redis_driver) - redis = pool.checkout - - assert_equal redis_driver, redis.instance_variable_get(:@client).driver - end - end - describe "logging redis options" do it "redacts credentials" do options = { diff --git a/test/test_retry.rb b/test/test_retry.rb index b9d0a5c9f..dcd265336 100644 --- a/test/test_retry.rb +++ b/test/test_retry.rb @@ -18,7 +18,7 @@ def message end before do - Sidekiq.redis { |c| c.flushdb } + Sidekiq.redis { |c| c.call("FLUSHDB") } end def worker diff --git a/test/test_scheduled.rb b/test/test_scheduled.rb index 4d50fcddf..6d142392f 100644 --- a/test/test_scheduled.rb +++ b/test/test_scheduled.rb @@ -12,7 +12,7 @@ def perform(x) describe "poller" do before do - 
Sidekiq.redis { |c| c.flushdb } + Sidekiq.redis { |c| c.call("FLUSHDB") } @error_1 = {"class" => ScheduledWorker.name, "args" => [0], "queue" => "queue_1"} @error_2 = {"class" => ScheduledWorker.name, "args" => [1], "queue" => "queue_2"} @error_3 = {"class" => ScheduledWorker.name, "args" => [2], "queue" => "queue_3"} @@ -68,8 +68,8 @@ def call(worker_class, job, queue, r) Sidekiq.redis do |conn| %w[queue:queue_1 queue:queue_2 queue:queue_4 queue:queue_5].each do |queue_name| - assert_equal 1, conn.llen(queue_name) - job = Sidekiq.load_json(conn.lrange(queue_name, 0, -1)[0]) + assert_equal 1, conn.call("LLEN", queue_name) + job = Sidekiq.load_json(conn.call("LRANGE", queue_name, 0, -1)[0]) assert_equal enqueued_time.to_f, job["enqueued_at"] assert_equal created_time.to_f, job["created_at"] end @@ -95,7 +95,7 @@ def call(worker_class, job, queue, r) Sidekiq.redis do |conn| %w[queue:queue_1 queue:queue_4].each do |queue_name| - assert_equal 0, conn.llen(queue_name) + assert_equal 0, conn.call("LLEN", queue_name) end end @@ -128,8 +128,8 @@ def with_sidekiq_option(name, value) with_sidekiq_option(:average_scheduled_poll_interval, 10) do 3.times do |i| Sidekiq.redis do |conn| - conn.sadd("processes", "process-#{i}") - conn.hset("process-#{i}", "info", nil) + conn.call("SADD", "processes", "process-#{i}") + conn.call("HSET", "process-#{i}", "info", "") end end diff --git a/test/test_sidekiq.rb b/test/test_sidekiq.rb index 175791f7a..e5a693299 100644 --- a/test/test_sidekiq.rb +++ b/test/test_sidekiq.rb @@ -69,9 +69,9 @@ describe "redis connection" do it "does not continually retry" do - assert_raises Redis::CommandError do + assert_raises RedisClient::CommandError do Sidekiq.redis do |c| - raise Redis::CommandError, "READONLY You can't write against a replica." + raise RedisClient::CommandError, "READONLY You can't write against a replica." 
end end end @@ -79,8 +79,8 @@ it "reconnects if connection is flagged as readonly" do counts = [] Sidekiq.redis do |c| - counts << c.info["total_connections_received"].to_i - raise Redis::CommandError, "READONLY You can't write against a replica." if counts.size == 1 + counts << c.call("INFO").match(/total_connections_received:(\d+)/)[1].to_i + raise RedisClient::CommandError, "READONLY You can't write against a replica." if counts.size == 1 end assert_equal 2, counts.size assert_equal counts[0] + 1, counts[1] @@ -89,8 +89,8 @@ it "reconnects if instance state changed" do counts = [] Sidekiq.redis do |c| - counts << c.info["total_connections_received"].to_i - raise Redis::CommandError, "UNBLOCKED force unblock from blocking operation, instance state changed (master -> replica?)" if counts.size == 1 + counts << c.call("INFO").match(/total_connections_received:(\d+)/)[1].to_i + raise RedisClient::CommandError, "UNBLOCKED force unblock from blocking operation, instance state changed (master -> replica?)" if counts.size == 1 end assert_equal 2, counts.size assert_equal counts[0] + 1, counts[1] diff --git a/test/test_sidekiqmon.rb b/test/test_sidekiqmon.rb index df406b8ad..b28306f2b 100644 --- a/test/test_sidekiqmon.rb +++ b/test/test_sidekiqmon.rb @@ -19,7 +19,7 @@ def output(section = "all") describe Sidekiq::Monitor do before do - Sidekiq.redis { |c| c.flushdb } + Sidekiq.redis { |c| c.call("FLUSHDB") } end describe "status" do diff --git a/test/test_web.rb b/test/test_web.rb index 5c0007421..57b5bbe42 100644 --- a/test/test_web.rb +++ b/test/test_web.rb @@ -17,7 +17,7 @@ def job_params(job, score) end before do - Sidekiq.redis { |c| c.flushdb } + Sidekiq.redis { |c| c.call("FLUSHDB") } app.middlewares.clear end @@ -62,12 +62,12 @@ def perform(a, b) describe "busy" do it "can display workers" do Sidekiq.redis do |conn| - conn.incr("busy") - conn.sadd("processes", "foo:1234") - conn.hmset("foo:1234", "info", Sidekiq.dump_json("hostname" => "foo", "started_at" => 
Time.now.to_f, "queues" => [], "concurrency" => 10), "at", Time.now.to_f, "busy", 4) + conn.call("INCR", "busy") + conn.call("SADD", "processes", "foo:1234") + conn.call("HMSET", "foo:1234", "info", Sidekiq.dump_json("hostname" => "foo", "started_at" => Time.now.to_f, "queues" => [], "concurrency" => 10), "at", Time.now.to_f, "busy", 4) identity = "foo:1234:work" hash = {queue: "critical", payload: {"class" => WebWorker.name, "args" => [1, "abc"]}, run_at: Time.now.to_i} - conn.hmset(identity, 1001, Sidekiq.dump_json(hash)) + conn.call("HMSET", identity, 1001, Sidekiq.dump_json(hash)) end assert_equal ["1001"], Sidekiq::WorkSet.new.map { |pid, tid, data| tid } @@ -82,20 +82,20 @@ def perform(a, b) identity = "identity" signals_key = "#{identity}-signals" - assert_nil Sidekiq.redis { |c| c.lpop signals_key } + assert_nil Sidekiq.redis { |c| c.call "LPOP", signals_key } post "/busy", "quiet" => "1", "identity" => identity assert_equal 302, last_response.status - assert_equal "TSTP", Sidekiq.redis { |c| c.lpop signals_key } + assert_equal "TSTP", Sidekiq.redis { |c| c.call "LPOP", signals_key } end it "can stop a process" do identity = "identity" signals_key = "#{identity}-signals" - assert_nil Sidekiq.redis { |c| c.lpop signals_key } + assert_nil Sidekiq.redis { |c| c.call "LPOP", signals_key } post "/busy", "stop" => "1", "identity" => identity assert_equal 302, last_response.status - assert_equal "TERM", Sidekiq.redis { |c| c.lpop signals_key } + assert_equal "TERM", Sidekiq.redis { |c| c.call "LPOP", signals_key } end end @@ -137,7 +137,7 @@ def perform(a, b) it "can sort on enqueued_at column" do Sidekiq.redis do |conn| (1000..1005).each do |i| - conn.lpush("queue:default", Sidekiq.dump_json(args: [i], enqueued_at: Time.now.to_i + i)) + conn.call("LPUSH", "queue:default", Sidekiq.dump_json(args: [i], enqueued_at: Time.now.to_i + i)) end end @@ -152,8 +152,8 @@ def perform(a, b) it "can delete a queue" do Sidekiq.redis do |conn| - conn.rpush("queue:foo", 
"{\"args\":[],\"enqueued_at\":1567894960}") - conn.sadd("queues", "foo") + conn.call("RPUSH", "queue:foo", "{\"args\":[],\"enqueued_at\":1567894960}") + conn.call("SADD", "queues", "foo") end get "/queues/foo" @@ -163,8 +163,8 @@ def perform(a, b) assert_equal 302, last_response.status Sidekiq.redis do |conn| - refute conn.smembers("queues").include?("foo") - refute conn.exists?("queue:foo") + refute conn.call("SMEMBERS", "queues").include?("foo") + refute conn.call("EXISTS", "queue:foo") > 0 end end @@ -242,9 +242,9 @@ def perform(a, b) it "can delete a job" do Sidekiq.redis do |conn| - conn.rpush("queue:foo", '{"args":[],"enqueued_at":1567894960}') - conn.rpush("queue:foo", '{"foo":"bar","args":[],"enqueued_at":1567894960}') - conn.rpush("queue:foo", '{"foo2":"bar2","args":[],"enqueued_at":1567894960}') + conn.call("RPUSH", "queue:foo", '{"args":[],"enqueued_at":1567894960}') + conn.call("RPUSH", "queue:foo", '{"foo":"bar","args":[],"enqueued_at":1567894960}') + conn.call("RPUSH", "queue:foo", '{"foo2":"bar2","args":[],"enqueued_at":1567894960}') end get "/queues/foo" @@ -254,7 +254,7 @@ def perform(a, b) assert_equal 302, last_response.status Sidekiq.redis do |conn| - refute conn.lrange("queue:foo", 0, -1).include?("{\"foo\":\"bar\"}") + refute conn.call("LRANGE", "queue:foo", 0, -1).include?("{\"foo\":\"bar\"}") end end @@ -388,11 +388,11 @@ def perform(a, b) it "can delete scheduled" do params = add_scheduled Sidekiq.redis do |conn| - assert_equal 1, conn.zcard("schedule") + assert_equal 1, conn.call("ZCARD", "schedule") post "/scheduled", "key" => [job_params(*params)], "delete" => "Delete" assert_equal 302, last_response.status assert_equal "http://example.org/scheduled", last_response.header["Location"] - assert_equal 0, conn.zcard("schedule") + assert_equal 0, conn.call("ZCARD", "schedule") end end @@ -400,12 +400,12 @@ def perform(a, b) q = Sidekiq::Queue.new params = add_scheduled Sidekiq.redis do |conn| - assert_equal 1, conn.zcard("schedule") + 
assert_equal 1, conn.call("ZCARD", "schedule") assert_equal 0, q.size post "/scheduled", "key" => [job_params(*params)], "add_to_queue" => "AddToQueue" assert_equal 302, last_response.status assert_equal "http://example.org/scheduled", last_response.header["Location"] - assert_equal 0, conn.zcard("schedule") + assert_equal 0, conn.call("ZCARD", "schedule") assert_equal 1, q.size get "/queues/default" assert_equal 200, last_response.status @@ -443,12 +443,12 @@ def perform(a, b) # on /workers page Sidekiq.redis do |conn| pro = "foo:1234" - conn.sadd("processes", pro) - conn.hmset(pro, "info", Sidekiq.dump_json("started_at" => Time.now.to_f, "labels" => ["frumduz"], "queues" => [], "concurrency" => 10), "busy", 1, "beat", Time.now.to_f) + conn.call("SADD", "processes", pro) + conn.call("HMSET", pro, "info", Sidekiq.dump_json("started_at" => Time.now.to_f, "labels" => ["frumduz"], "queues" => [], "concurrency" => 10), "busy", 1, "beat", Time.now.to_f) identity = "#{pro}:work" hash = {queue: "critical", payload: {"class" => "FailWorker", "args" => ["hello"]}, run_at: Time.now.to_i} - conn.hmset(identity, 100001, Sidekiq.dump_json(hash)) - conn.incr("busy") + conn.call("HMSET", identity, 100001, Sidekiq.dump_json(hash)) + conn.call("INCR", "busy") end get "/busy" @@ -518,9 +518,9 @@ def perform(a, b) before do Sidekiq.redis do |conn| - conn.set("stat:processed", 5) - conn.set("stat:failed", 2) - conn.sadd("queues", "default") + conn.call("SET", "stat:processed", 5) + conn.call("SET", "stat:failed", 2) + conn.call("SADD", "queues", "default") end 2.times { add_retry } 3.times { add_scheduled } @@ -573,10 +573,10 @@ def perform(a, b) before do Sidekiq.redis do |conn| - conn.set("stat:processed", 5) - conn.set("stat:failed", 2) - conn.sadd("queues", "default") - conn.sadd("queues", "queue2") + conn.call("SET", "stat:processed", 5) + conn.call("SET", "stat:failed", 2) + conn.call("SADD", "queues", "default") + conn.call("SADD", "queues", "queue2") end 2.times { add_retry } 
3.times { add_scheduled } @@ -646,7 +646,7 @@ def add_scheduled "jid" => SecureRandom.hex(12), "tags" => ["tag1", "tag2"]} Sidekiq.redis do |conn| - conn.zadd("schedule", score, Sidekiq.dump_json(msg)) + conn.call("ZADD", "schedule", score, Sidekiq.dump_json(msg)) end [msg, score] end @@ -662,7 +662,7 @@ def add_retry "jid" => SecureRandom.hex(12)} score = Time.now.to_f Sidekiq.redis do |conn| - conn.zadd("retry", score, Sidekiq.dump_json(msg)) + conn.call("ZADD", "retry", score, Sidekiq.dump_json(msg)) end [msg, score] @@ -679,7 +679,7 @@ def add_dead(jid = SecureRandom.hex(12)) "jid" => jid} score = Time.now.to_f Sidekiq.redis do |conn| - conn.zadd("dead", score, Sidekiq.dump_json(msg)) + conn.call("ZADD", "dead", score, Sidekiq.dump_json(msg)) end [msg, score] end @@ -688,7 +688,7 @@ def kill_bad job = "{ something bad }" score = Time.now.to_f Sidekiq.redis do |conn| - conn.zadd("dead", score, job) + conn.call("ZADD", "dead", score, job) end [job, score] end @@ -704,7 +704,7 @@ def add_xss_retry(job_id = SecureRandom.hex(12)) "jid" => SecureRandom.hex(12)} score = Time.now.to_f Sidekiq.redis do |conn| - conn.zadd("retry", score, Sidekiq.dump_json(msg)) + conn.call("ZADD", "retry", score, Sidekiq.dump_json(msg)) end [msg, score] @@ -715,9 +715,9 @@ def add_worker msg = "{\"queue\":\"default\",\"payload\":{\"retry\":true,\"queue\":\"default\",\"timeout\":20,\"backtrace\":5,\"class\":\"HardWorker\",\"args\":[\"bob\",10,5],\"jid\":\"2b5ad2b016f5e063a1c62872\"},\"run_at\":1361208995}" Sidekiq.redis do |conn| conn.multi do |transaction| - transaction.sadd("processes", key) - transaction.hmset(key, "info", Sidekiq.dump_json("hostname" => "foo", "started_at" => Time.now.to_f, "queues" => []), "at", Time.now.to_f, "busy", 4) - transaction.hmset("#{key}:work", Time.now.to_f, msg) + transaction.call("SADD", "processes", key) + transaction.call("HMSET", key, "info", Sidekiq.dump_json("hostname" => "foo", "started_at" => Time.now.to_f, "queues" => []), "at", Time.now.to_f, 
"busy", 4) + transaction.call("HMSET", "#{key}:work", Time.now.to_f, msg) end end end