Skip to content

Commit

Permalink
Split Xandra prepare and execute options
Browse files Browse the repository at this point in the history
With this the :timeout option is also available to xandra functions
instead of being dropped.

Fixes #47
  • Loading branch information
noaccOS committed Feb 15, 2024
1 parent 2817824 commit 0e941ae
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 35 deletions.
21 changes: 6 additions & 15 deletions lib/exandra.ex
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ defmodule Exandra do

use Ecto.Adapters.SQL, driver: :exandra

alias Exandra.Connection

@behaviour Ecto.Adapter.Storage

@xandra_mod Application.compile_env(:exandra, :xandra_module, Xandra)
Expand Down Expand Up @@ -245,19 +247,8 @@ defmodule Exandra do
:ok | {:error, Exception.t()}
def execute_batch(repo, %Exandra.Batch{queries: queries} = _batch, options \\ [])
when is_atom(repo) and is_list(options) do
{shared_options, other_options} =
Keyword.split(options, [
:custom_payload,
:timeout,
:telemetry_metadata,
:tracing,
:compressor
])

{prepare_options, execute_options} = Keyword.split(other_options, [:force])

execute_options = Keyword.merge(shared_options, execute_options)
prepare_options = Keyword.merge(shared_options, prepare_options)
{prepare_options, execute_options} =
Connection.split_prepare_and_execute_options(options)

fun = fn conn ->
try do
Expand Down Expand Up @@ -436,14 +427,14 @@ defmodule Exandra do
def stream!(repo, sql, values, opts \\ []) do
%{pid: cluster_pid} = Ecto.Repo.Registry.lookup(repo.get_dynamic_repo())

prepare_opts = Keyword.drop(opts, [:page_size])
{prepare_opts, execute_opts} = Connection.split_prepare_and_execute_options(opts)
prepared = @xandra_cluster_mod.prepare!(cluster_pid, sql, prepare_opts)

@xandra_cluster_mod.stream_pages!(
cluster_pid,
prepared,
values,
opts
execute_opts
)
end
end
Expand Down
82 changes: 62 additions & 20 deletions lib/exandra/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,33 @@ defmodule Exandra.Connection do
:xandra_cluster_module,
Xandra.Cluster
)
@xandra_prepare_opts [
:compressor,
:force,
:tracing,
:custom_payload,
:telemetry_metadata,
:timeout
]
@xandra_exec_opts [
:consistency,
:page_size,
:paging_state,
:timestamp,
:serial_consistency,
:compressor,
:retry_strategy,
:tracing,
:custom_payload,
:date_format,
:time_format,
:timestamp_format,
:decimal_format,
:uuid_format,
:timeuuid_format,
:telemetry_metadata,
:timeout
]

alias Ecto.Migration.{Constraint, Index, Reference, Table}
alias Ecto.Query.{BooleanExpr, LimitExpr, QueryExpr, WithExpr}
Expand Down Expand Up @@ -76,27 +103,28 @@ defmodule Exandra.Connection do

@impl Ecto.Adapters.SQL.Connection
def prepare_execute(cluster, _name, stmt, params, opts) do
opts = remove_ecto_opts_for_xandra_execute_or_prepare(opts)
{prepare_opts, execute_opts} = split_prepare_and_execute_options(opts)

with {:ok, %Prepared{} = prepared} <- @xandra_cluster_mod.prepare(cluster, stmt, opts) do
execute(cluster, prepared, params, opts)
with {:ok, %Prepared{} = prepared} <- @xandra_cluster_mod.prepare(cluster, stmt, prepare_opts) do
execute(cluster, prepared, params, execute_opts)
end
end

@impl Ecto.Adapters.SQL.Connection
def execute(cluster, query, params, opts) do
{_, execute_opts} = split_prepare_and_execute_options(opts)
opts = remove_ecto_opts_for_xandra_execute_or_prepare(opts)

@xandra_cluster_mod.run(cluster, opts, fn conn ->
case @xandra_mod.execute(conn, query, params, opts) do
case @xandra_mod.execute(conn, query, params, execute_opts) do
{:ok, %Xandra.Void{}} ->
{:ok, query, %{rows: nil, num_rows: 1}}

{:ok, %Xandra.SchemaChange{}} ->
{:ok, query, %{rows: nil, num_rows: 1}}

{:ok, %Xandra.Page{} = page} ->
stream_pages(conn, query, params, opts, page, %{rows: [], num_rows: 0})
stream_pages(conn, query, params, execute_opts, page, %{rows: [], num_rows: 0})

{:error, error} ->
{:error, error}
Expand Down Expand Up @@ -137,11 +165,11 @@ defmodule Exandra.Connection do
end

defp prepare_execute(cluster, sql, params, opts) do
opts = remove_ecto_opts_for_xandra_execute_or_prepare(opts)
{prepare_opts, execute_opts} = split_prepare_and_execute_options(opts)

@xandra_cluster_mod.run(cluster, fn conn ->
with {:ok, %Prepared{} = prepared} <- @xandra_mod.prepare(conn, sql, opts) do
@xandra_mod.execute(conn, prepared, params, opts)
with {:ok, %Prepared{} = prepared} <- @xandra_mod.prepare(conn, sql, prepare_opts) do
@xandra_mod.execute(conn, prepared, params, execute_opts)
end
end)
end
Expand Down Expand Up @@ -967,22 +995,36 @@ defmodule Exandra.Connection do
defp ecto_cast_to_db(:string, _query), do: "text"
defp ecto_cast_to_db(:uuid, _query), do: "uuid"

@doc false
def split_prepare_and_execute_options(opts) do
opts =
Enum.uniq_by(opts, fn {key, _value} -> key end)
|> apply_ecto_telemetry_options_to_xandra()

prepare_opts = Keyword.take(opts, @xandra_prepare_opts)
execute_opts = Keyword.take(opts, @xandra_exec_opts)

{prepare_opts, execute_opts}
end

defp remove_ecto_opts_for_xandra_execute_or_prepare(opts) do
Keyword.drop(opts, [
:schema_migration,
:timeout,
:pool_size,
:pool,
:log,
:cast_params,
:prefix,
:cache_statement
])
|> apply_ecto_telemetry_options_to_xandra()
end

defp apply_ecto_telemetry_options_to_xandra(opts) do
{repo, opts} = Keyword.pop(opts, :repo)
{source, opts} = Keyword.pop(opts, :source)

opts =
Keyword.drop(opts, [
:schema_migration,
:timeout,
:pool_size,
:pool,
:log,
:cast_params,
:prefix,
:cache_statement
])

case Keyword.pop(opts, :telemetry_options) do
{nil, opts} ->
opts
Expand Down

0 comments on commit 0e941ae

Please sign in to comment.