Skip to content

Commit

Permalink
feat(bigtable): Add support for channel pooling (googleapis#22747)
Browse files Browse the repository at this point in the history
  • Loading branch information
diptanshumittal authored Sep 13, 2023
1 parent 84b1dfc commit a525d9d
Show file tree
Hide file tree
Showing 16 changed files with 295 additions and 63 deletions.
2 changes: 1 addition & 1 deletion google-cloud-bigtable/.rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ Metrics/BlockLength:
- "Rakefile"
- "lib/google-cloud-bigtable.rb"
Metrics/ClassLength:
Max: 300
Max: 400
Metrics/PerceivedComplexity:
Max: 12
Naming/FileName:
Expand Down
3 changes: 2 additions & 1 deletion google-cloud-bigtable/acceptance/bigtable/service_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

describe Google::Cloud::Bigtable::Service, :bigtable do
let(:config_metadata) { { :"google-cloud-resource-prefix" => "projects/#{bigtable.project_id}" } }
let(:read_table) { bigtable_read_table }

it "passes the correct configuration to its v2 instance admin client" do
_(bigtable.project_id).wont_be :empty?
Expand All @@ -40,7 +41,7 @@

it "passes the correct configuration to its v2 client" do
_(bigtable.project_id).wont_be :empty?
config = bigtable.service.client.configure
config = bigtable.service.client(read_table.path, read_table.app_profile_id).configure
_(config).must_be_kind_of Google::Cloud::Bigtable::V2::Bigtable::Client::Configuration
_(config.lib_name).must_equal "gccl"
_(config.lib_version).must_equal Google::Cloud::Bigtable::VERSION
Expand Down
4 changes: 4 additions & 0 deletions google-cloud-bigtable/acceptance/bigtable_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,10 @@ def bigtable_kms_key
ENV["BIGTABLE_TEST_KMS_KEY"] || "projects/helical-zone-771/locations/us-east1/keyRings/bigtable-test/cryptoKeys/bigtable-test-1"
end

def bigtable_read_table
$bigtable.table bigtable_instance_id, $bigtable_read_table_id
end

create_test_instance(
bigtable_instance_id,
bigtable_cluster_id,
Expand Down
3 changes: 2 additions & 1 deletion google-cloud-bigtable/google-cloud-bigtable.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ Gem::Specification.new do |gem|

gem.required_ruby_version = ">= 2.6"

gem.add_dependency "concurrent-ruby", "~> 1.0"
gem.add_dependency "google-cloud-bigtable-admin-v2", "~> 0.0"
gem.add_dependency "google-cloud-bigtable-v2", "~> 0.0"
gem.add_dependency "google-cloud-bigtable-v2", "~> 0.14"
gem.add_dependency "google-cloud-core", "~> 1.5"

gem.add_development_dependency "google-style", "~> 1.26.1"
Expand Down
2 changes: 2 additions & 0 deletions google-cloud-bigtable/lib/google-cloud-bigtable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -171,4 +171,6 @@ def self.bigtable project_id: nil, credentials: nil, scope: nil, timeout: nil
config.add_field! :emulator_host, default_emulator, match: String, allow_nil: true
config.add_field! :endpoint, "bigtable.googleapis.com", match: String
config.add_field! :endpoint_admin, "bigtableadmin.googleapis.com", match: String
config.add_field! :channel_selection, :least_loaded, match: Symbol
config.add_field! :channel_count, 1, match: Integer
end
33 changes: 26 additions & 7 deletions google-cloud-bigtable/lib/google/cloud/bigtable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,37 +63,52 @@ module Bigtable
# updater_proc is supplied.
# @param timeout [Integer]
# The default timeout, in seconds, for calls made through this client. Optional.
# @param channel_selection [Symbol] The algorithm for selecting a channel from the
# pool of available channels. This parameter can have the following symbols:
# * `:least_loaded` selects the channel having least number of concurrent streams.
# @param channel_count [Integer] The number of channels in the pool.
# @return [Google::Cloud::Bigtable::Project]
#
# @example
# require "google/cloud/bigtable"
#
# client = Google::Cloud::Bigtable.new
#
# rubocop:disable Metrics/CyclomaticComplexity
# rubocop:disable Metrics/AbcSize
def self.new project_id: nil,
credentials: nil,
emulator_host: nil,
scope: nil,
endpoint: nil,
endpoint_admin: nil,
timeout: nil
project_id ||= default_project_id
scope ||= configure.scope
timeout ||= configure.timeout
timeout: nil,
channel_selection: nil,
channel_count: nil
project_id ||= default_project_id
scope ||= configure.scope
timeout ||= configure.timeout
emulator_host ||= configure.emulator_host
endpoint ||= configure.endpoint
endpoint ||= configure.endpoint
endpoint_admin ||= configure.endpoint_admin
channel_selection ||= configure.channel_selection
channel_count ||= configure.channel_count

return new_with_emulator project_id, emulator_host, timeout if emulator_host

credentials = resolve_credentials credentials, scope
project_id = resolve_project_id project_id, credentials
raise ArgumentError, "project_id is missing" if project_id.empty?

service = Bigtable::Service.new \
project_id, credentials, host: endpoint, host_admin: endpoint_admin, timeout: timeout
service = Bigtable::Service.new project_id, credentials, host: endpoint,
host_admin: endpoint_admin, timeout: timeout,
channel_selection: channel_selection,
channel_count: channel_count
Bigtable::Project.new service
end
# rubocop:enable Metrics/CyclomaticComplexity
# rubocop:enable Metrics/AbcSize


##
# Configure the Google Cloud Bigtable library.
Expand All @@ -116,6 +131,10 @@ def self.new project_id: nil,
# to use the default endpoint.
# * `endpoint_admin` - (String) Override of the admin service endpoint
# host name, or `nil` to use the default admin endpoint.
# * `channel_selection` - (Symbol) The algorithm for selecting a channel from the
# pool of available channels. This parameter can have the following symbols:
# `:least_loaded` selects the channel having least number of concurrent streams.
# * `channel_count` - (Integer) The number of channels in the pool.
#
# @return [Google::Cloud::Config] The configuration object the
# Google::Cloud::Bigtable library uses.
Expand Down
34 changes: 34 additions & 0 deletions google-cloud-bigtable/lib/google/cloud/bigtable/convert.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@

require "time"
require "date"
require "gapic/protobuf"
require "gapic/call_options"
require "gapic/headers"
require "google/cloud/bigtable/version"
require "google/cloud/bigtable/v2/version"
require "google/bigtable/v2/bigtable_pb"
require "google/cloud/bigtable/admin/v2"

module Google
module Cloud
Expand Down Expand Up @@ -89,6 +96,33 @@ def integer_to_signed_be_64 value
return [value].pack "q>" if value.is_a? Integer
value
end

def ping_and_warm_request table_path, app_profile_id, timeout
request = {
name: table_path.split("/").slice(0, 4).join("/"),
app_profile_id: app_profile_id
}
request = ::Gapic::Protobuf.coerce request, to: ::Google::Cloud::Bigtable::V2::PingAndWarmRequest

header_params = {}
if request.name && %r{^projects/[^/]+/instances/[^/]+/?$}.match?(request.name)
header_params["name"] = request.name
end
if request.app_profile_id && !request.app_profile_id.empty?
header_params["app_profile_id"] = request.app_profile_id
end
request_params_header = URI.encode_www_form header_params
metadata = {
"x-goog-request-params": request_params_header,
"x-goog-api-client":
::Gapic::Headers.x_goog_api_client(lib_name: "gccl",
lib_version: ::Google::Cloud::Bigtable::VERSION,
gapic_version: ::Google::Cloud::Bigtable::V2::VERSION),
"google-cloud-resource-prefix": "projects/#{table_path.split('/')[1]}"
}
options = ::Gapic::CallOptions.new timeout: timeout, metadata: metadata
[request, options]
end
end
end
end
Expand Down
15 changes: 6 additions & 9 deletions google-cloud-bigtable/lib/google/cloud/bigtable/instance.rb
Original file line number Diff line number Diff line change
Expand Up @@ -521,15 +521,12 @@ def table table_id, view: nil, perform_lookup: nil, app_profile_id: nil
ensure_service!

view ||= :SCHEMA_VIEW
table = if perform_lookup
grpc = service.get_table instance_id, table_id, view: view
Table.from_grpc grpc, service, view: view
else
Table.from_path service.table_path(instance_id, table_id), service
end

table.app_profile_id = app_profile_id
table
if perform_lookup
grpc = service.get_table instance_id, table_id, view: view
Table.from_grpc grpc, service, view: view, app_profile_id: app_profile_id
else
Table.from_path service.table_path(instance_id, table_id), service, app_profile_id: app_profile_id
end
rescue Google::Cloud::NotFoundError
nil
end
Expand Down
15 changes: 6 additions & 9 deletions google-cloud-bigtable/lib/google/cloud/bigtable/project.rb
Original file line number Diff line number Diff line change
Expand Up @@ -343,15 +343,12 @@ def table instance_id, table_id, view: nil, perform_lookup: nil, app_profile_id:
ensure_service!

view ||= :SCHEMA_VIEW
table = if perform_lookup
grpc = service.get_table instance_id, table_id, view: view
Table.from_grpc grpc, service, view: view
else
Table.from_path service.table_path(instance_id, table_id), service
end

table.app_profile_id = app_profile_id
table
if perform_lookup
grpc = service.get_table instance_id, table_id, view: view
Table.from_grpc grpc, service, view: view, app_profile_id: app_profile_id
else
Table.from_path service.table_path(instance_id, table_id), service, app_profile_id: app_profile_id
end
rescue Google::Cloud::NotFoundError
nil
end
Expand Down
55 changes: 40 additions & 15 deletions google-cloud-bigtable/lib/google/cloud/bigtable/service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
require "google/cloud/bigtable/credentials"
require "google/cloud/bigtable/v2"
require "google/cloud/bigtable/admin/v2"
require "google/cloud/bigtable/convert"
require "gapic/lru_hash"
require "concurrent"

module Google
module Cloud
Expand Down Expand Up @@ -49,12 +52,17 @@ class Service
# @param timeout [Integer]
# The default timeout, in seconds, for calls made through this client.
#
def initialize project_id, credentials, host: nil, host_admin: nil, timeout: nil
def initialize project_id, credentials, host: nil, host_admin: nil, timeout: nil,
channel_selection: nil, channel_count: nil
@project_id = project_id
@credentials = credentials
@host = host
@host_admin = host_admin
@timeout = timeout
@channel_selection = channel_selection
@channel_count = channel_count
@bigtable_clients = ::Gapic::LruHash.new 10
@mutex = Mutex.new
end

def instances
Expand Down Expand Up @@ -83,15 +91,15 @@ def tables
end
attr_accessor :mocked_tables

def client
def client table_path, app_profile_id
return mocked_client if mocked_client
@client ||= V2::Bigtable::Client.new do |config|
config.credentials = credentials if credentials
config.timeout = timeout if timeout
config.endpoint = host if host
config.lib_name = "gccl"
config.lib_version = Google::Cloud::Bigtable::VERSION
config.metadata = { "google-cloud-resource-prefix": "projects/#{@project_id}" }
table_key = "#{table_path}_#{app_profile_id}"
@mutex.synchronize do
if @bigtable_clients.get(table_key).nil?
bigtable_client = create_bigtable_client table_path, app_profile_id
@bigtable_clients.put table_key, bigtable_client
end
@bigtable_clients.get table_key
end
end
attr_accessor :mocked_client
Expand Down Expand Up @@ -649,7 +657,7 @@ def test_table_permissions instance_id, table_id, permissions
end

def read_rows instance_id, table_id, app_profile_id: nil, rows: nil, filter: nil, rows_limit: nil
client.read_rows(
client(table_path(instance_id, table_id), app_profile_id).read_rows(
**{
table_name: table_path(instance_id, table_id),
rows: rows,
Expand All @@ -661,11 +669,11 @@ def read_rows instance_id, table_id, app_profile_id: nil, rows: nil, filter: nil
end

def sample_row_keys table_name, app_profile_id: nil
client.sample_row_keys table_name: table_name, app_profile_id: app_profile_id
client(table_name, app_profile_id).sample_row_keys table_name: table_name, app_profile_id: app_profile_id
end

def mutate_row table_name, row_key, mutations, app_profile_id: nil
client.mutate_row(
client(table_name, app_profile_id).mutate_row(
**{
table_name: table_name,
app_profile_id: app_profile_id,
Expand All @@ -676,7 +684,7 @@ def mutate_row table_name, row_key, mutations, app_profile_id: nil
end

def mutate_rows table_name, entries, app_profile_id: nil
client.mutate_rows(
client(table_name, app_profile_id).mutate_rows(
**{
table_name: table_name,
app_profile_id: app_profile_id,
Expand All @@ -691,7 +699,7 @@ def check_and_mutate_row table_name,
predicate_filter: nil,
true_mutations: nil,
false_mutations: nil
client.check_and_mutate_row(
client(table_name, app_profile_id).check_and_mutate_row(
**{
table_name: table_name,
app_profile_id: app_profile_id,
Expand All @@ -704,7 +712,7 @@ def check_and_mutate_row table_name,
end

def read_modify_write_row table_name, row_key, rules, app_profile_id: nil
client.read_modify_write_row(
client(table_name, app_profile_id).read_modify_write_row(
**{
table_name: table_name,
app_profile_id: app_profile_id,
Expand Down Expand Up @@ -885,6 +893,23 @@ def backup_path instance_id, cluster_id, backup_id
def inspect
"#{self.class}(#{@project_id})"
end

def create_bigtable_client table_path, app_profile_id
V2::Bigtable::Client.new do |config|
config.credentials = credentials if credentials
config.timeout = timeout if timeout
config.endpoint = host if host
config.lib_name = "gccl"
config.lib_version = Google::Cloud::Bigtable::VERSION
config.metadata = { "google-cloud-resource-prefix": "projects/#{@project_id}" }
config.channel_pool.channel_selection = @channel_selection
config.channel_pool.channel_count = @channel_count
request, options = Convert.ping_and_warm_request table_path, app_profile_id, timeout
config.channel_pool.on_channel_create = proc do |channel|
channel.call_rpc :ping_and_warm, request, options: options
end
end
end
end
end
end
Expand Down
12 changes: 7 additions & 5 deletions google-cloud-bigtable/lib/google/cloud/bigtable/table.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,13 @@ class Table
# @private
#
# Creates a new Table instance.
def initialize grpc, service, view:
def initialize grpc, service, view:, app_profile_id: nil
@grpc = grpc
@service = service
@app_profile_id = app_profile_id
raise ArgumentError, "view must not be nil" if view.nil?
@loaded_views = Set[view]
@service.client path, app_profile_id
end

##
Expand Down Expand Up @@ -659,8 +661,8 @@ def drop_row_range row_key_prefix: nil, delete_all_data: nil, timeout: nil
# @param view [Symbol] View type.
# @return [Google::Cloud::Bigtable::Table]
#
def self.from_grpc grpc, service, view:
new grpc, service, view: view
def self.from_grpc grpc, service, view:, app_profile_id: nil
new grpc, service, view: view, app_profile_id: app_profile_id
end

# @private
Expand All @@ -672,9 +674,9 @@ def self.from_grpc grpc, service, view:
# @param service [Google::Cloud::Bigtable::Service]
# @return [Google::Cloud::Bigtable::Table]
#
def self.from_path path, service
def self.from_path path, service, app_profile_id: nil
grpc = Google::Cloud::Bigtable::Admin::V2::Table.new name: path
new grpc, service, view: :NAME_ONLY
new grpc, service, view: :NAME_ONLY, app_profile_id: app_profile_id
end

protected
Expand Down
Loading

0 comments on commit a525d9d

Please sign in to comment.