Skip to content

Commit

Permalink
Separate cluster helper methods to ClusterHelperMethods
Browse files Browse the repository at this point in the history
  • Loading branch information
gussan committed Dec 25, 2014
1 parent 4767b08 commit 3259d5c
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 75 deletions.
2 changes: 2 additions & 0 deletions lib/active_record/turntable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ module ActiveRecord::Turntable
autoload :Algorithm
autoload :Base
autoload :Cluster
autoload :ClusterHelperMethods
autoload :Config
autoload :ConnectionProxy
autoload :MasterShard
Expand All @@ -35,6 +36,7 @@ module ActiveRecord::Turntable

included do
include ActiveRecordExt
include ClusterHelperMethods
include Base
end

Expand Down
75 changes: 0 additions & 75 deletions lib/active_record/turntable/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,38 +34,8 @@ def turntable(cluster_name, shard_key_name, options = {})
self.turntable_clusters[cluster_name][self] = turntable_cluster

turntable_replace_connection_pool
turntable_define_cluster_methods(cluster_name)
end

#
def force_transaction_all_shards!(options={}, &block)
force_connect_all_shards!
shards = turntable_connections.values
shards += [ActiveRecord::Base.connection_pool]
recursive_transaction(shards, options, &block)
end

def recursive_transaction(pools, options, &block)
pool = pools.shift
if pools.present?
pool.connection.transaction(options) do
recursive_transaction(pools, options, &block)
end
else
pool.connection.transaction(options, &block)
end
end

def force_connect_all_shards!
conf = configurations[Rails.env]
shards = {}
shards = shards.merge(conf["shards"]) if conf["shards"]
shards = shards.merge(conf["seq"]) if conf["seq"]
shards.each do |name, config|
turntable_connections[name] ||=
ActiveRecord::ConnectionAdapters::ConnectionPool.new(spec_for(config))
end
end

def turntable_replace_connection_pool
ch = connection_handler
Expand Down Expand Up @@ -114,16 +84,6 @@ def current_last_shard
turntable_cluster.select_shard(current_sequence) if sequencer_enabled?
end

def weighted_random_shard_with(*klasses, &block)
shards_weight = self.turntable_cluster.weighted_shards
sum = shards_weight.values.inject(&:+)
idx = rand(sum)
shard, weight = shards_weight.find {|k,v|
(idx -= v) < 0
}
self.connection.with_recursive_shards(shard.name, *klasses, &block)
end

def with_shard(any_shard)
shard = case any_shard
when Numeric
Expand All @@ -135,41 +95,6 @@ def with_shard(any_shard)
end
connection.with_shard(shard) { yield }
end

private

def turntable_define_cluster_methods(cluster_name)
turntable_define_cluster_class_methods(cluster_name)
end

def turntable_define_cluster_class_methods(cluster_name)
(class << ActiveRecord::Base; self; end).class_eval <<-EOD
unless respond_to?(:#{cluster_name}_transaction)
def #{cluster_name}_transaction(shards = [], options = {})
cluster = turntable_clusters[#{cluster_name.inspect}].values.first
cluster.shards_transaction(shards, options) { yield }
end
end
unless respond_to?(:all_cluster_transaction)
def all_cluster_transaction(options = {})
clusters = turntable_clusters.values.map { |v| v.values.first }
recursive_cluster_transaction(clusters) { yield }
end
def recursive_cluster_transaction(clusters, options = {}, &block)
current_cluster = clusters.shift
current_cluster.shards_transaction do
if clusters.present?
recursive_cluster_transaction(clusters, options, &block)
else
yield
end
end
end
end
EOD
end
end

def shards_transaction(options = {}, &block)
Expand Down
77 changes: 77 additions & 0 deletions lib/active_record/turntable/cluster_helper_methods.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
module ActiveRecord::Turntable
module ClusterHelperMethods
extend ActiveSupport::Concern

module ClassMethods
def force_transaction_all_shards!(options={}, &block)
force_connect_all_shards!
shards = turntable_connections.values
shards += [ActiveRecord::Base.connection_pool]
recursive_transaction(shards, options, &block)
end

def recursive_transaction(pools, options, &block)
pool = pools.shift
if pools.present?
pool.connection.transaction(options) do
recursive_transaction(pools, options, &block)
end
else
pool.connection.transaction(options, &block)
end
end

def force_connect_all_shards!
conf = configurations[Rails.env]
shards = {}
shards = shards.merge(conf["shards"]) if conf["shards"]
shards = shards.merge(conf["seq"]) if conf["seq"]
shards.each do |name, config|
turntable_connections[name] ||=
ActiveRecord::ConnectionAdapters::ConnectionPool.new(spec_for(config))
end
end

def weighted_random_shard_with(*klasses, &block)
shards_weight = self.turntable_cluster.weighted_shards
sum = shards_weight.values.inject(&:+)
idx = rand(sum)
shard, weight = shards_weight.find {|k,v|
(idx -= v) < 0
}
self.connection.with_recursive_shards(shard.name, *klasses, &block)
end

def all_cluster_transaction(options = {})
clusters = turntable_clusters.values.map { |v| v.values.first }
recursive_cluster_transaction(clusters) { yield }
end

def recursive_cluster_transaction(clusters, options = {}, &block)
current_cluster = clusters.shift
current_cluster.shards_transaction do
if clusters.present?
recursive_cluster_transaction(clusters, options, &block)
else
yield
end
end
end

def turntable_define_cluster_methods(cluster_name)
turntable_define_cluster_class_methods(cluster_name)
end

def turntable_define_cluster_class_methods(cluster_name)
(class << ActiveRecord::Base; self; end).class_eval <<-EOD
unless respond_to?(:#{cluster_name}_transaction)
def #{cluster_name}_transaction(shards = [], options = {})
cluster = turntable_clusters[#{cluster_name.inspect}].values.first
cluster.shards_transaction(shards, options) { yield }
end
end
EOD
end
end
end
end

0 comments on commit 3259d5c

Please sign in to comment.