diff --git a/lib/exandra.ex b/lib/exandra.ex index 8403529..417b9aa 100644 --- a/lib/exandra.ex +++ b/lib/exandra.ex @@ -245,19 +245,8 @@ defmodule Exandra do :ok | {:error, Exception.t()} def execute_batch(repo, %Exandra.Batch{queries: queries} = _batch, options \\ []) when is_atom(repo) and is_list(options) do - {shared_options, other_options} = - Keyword.split(options, [ - :custom_payload, - :timeout, - :telemetry_metadata, - :tracing, - :compressor - ]) - - {prepare_options, execute_options} = Keyword.split(other_options, [:force]) - - execute_options = Keyword.merge(shared_options, execute_options) - prepare_options = Keyword.merge(shared_options, prepare_options) + {prepare_options, execute_options} = + Exandra.Connection.split_prepare_and_execute_options(options) fun = fn conn -> try do @@ -436,14 +425,16 @@ defmodule Exandra do def stream!(repo, sql, values, opts \\ []) do %{pid: cluster_pid} = Ecto.Repo.Registry.lookup(repo.get_dynamic_repo()) - prepare_opts = Keyword.drop(opts, [:page_size]) + {prepare_opts, execute_opts} = + Exandra.Connection.split_prepare_and_execute_options(opts) + prepared = @xandra_cluster_mod.prepare!(cluster_pid, sql, prepare_opts) @xandra_cluster_mod.stream_pages!( cluster_pid, prepared, values, - opts + execute_opts ) end end diff --git a/lib/exandra/connection.ex b/lib/exandra/connection.ex index c4ba8a0..fa3a266 100644 --- a/lib/exandra/connection.ex +++ b/lib/exandra/connection.ex @@ -11,6 +11,33 @@ defmodule Exandra.Connection do :xandra_cluster_module, Xandra.Cluster ) + @xandra_prepare_opts [ + :compressor, + :force, + :tracing, + :custom_payload, + :telemetry_metadata, + :timeout + ] + @xandra_exec_opts [ + :consistency, + :page_size, + :paging_state, + :timestamp, + :serial_consistency, + :compressor, + :retry_strategy, + :tracing, + :custom_payload, + :date_format, + :time_format, + :timestamp_format, + :decimal_format, + :uuid_format, + :timeuuid_format, + :telemetry_metadata, + :timeout + ] alias Ecto.Migration.{Constraint, Index, Reference, Table} alias Ecto.Query.{BooleanExpr, LimitExpr, QueryExpr, WithExpr} @@ -76,19 +103,20 @@ defmodule Exandra.Connection do @impl Ecto.Adapters.SQL.Connection def prepare_execute(cluster, _name, stmt, params, opts) do - opts = remove_ecto_opts_for_xandra_execute_or_prepare(opts) + {prepare_opts, execute_opts} = split_prepare_and_execute_options(opts) - with {:ok, %Prepared{} = prepared} <- @xandra_cluster_mod.prepare(cluster, stmt, opts) do - execute(cluster, prepared, params, opts) + with {:ok, %Prepared{} = prepared} <- @xandra_cluster_mod.prepare(cluster, stmt, prepare_opts) do + execute(cluster, prepared, params, execute_opts) end end @impl Ecto.Adapters.SQL.Connection def execute(cluster, query, params, opts) do + {_, execute_opts} = split_prepare_and_execute_options(opts) opts = remove_ecto_opts_for_xandra_execute_or_prepare(opts) @xandra_cluster_mod.run(cluster, opts, fn conn -> - case @xandra_mod.execute(conn, query, params, opts) do + case @xandra_mod.execute(conn, query, params, execute_opts) do {:ok, %Xandra.Void{}} -> {:ok, query, %{rows: nil, num_rows: 1}} @@ -96,7 +124,7 @@ defmodule Exandra.Connection do {:ok, query, %{rows: nil, num_rows: 1}} {:ok, %Xandra.Page{} = page} -> - stream_pages(conn, query, params, opts, page, %{rows: [], num_rows: 0}) + stream_pages(conn, query, params, execute_opts, page, %{rows: [], num_rows: 0}) {:error, error} -> {:error, error} @@ -137,11 +165,11 @@ defmodule Exandra.Connection do end defp prepare_execute(cluster, sql, params, opts) do - opts = remove_ecto_opts_for_xandra_execute_or_prepare(opts) + {prepare_opts, execute_opts} = split_prepare_and_execute_options(opts) @xandra_cluster_mod.run(cluster, fn conn -> - with {:ok, %Prepared{} = prepared} <- @xandra_mod.prepare(conn, sql, opts) do - @xandra_mod.execute(conn, prepared, params, opts) + with {:ok, %Prepared{} = prepared} <- @xandra_mod.prepare(conn, sql, prepare_opts) do + @xandra_mod.execute(conn, prepared, params, execute_opts) end end) end @@ -967,22 +995,38 @@ defmodule Exandra.Connection do defp ecto_cast_to_db(:string, _query), do: "text" defp ecto_cast_to_db(:uuid, _query), do: "uuid" + @doc false + def split_prepare_and_execute_options(opts) do + opts = + opts + |> Enum.uniq_by(fn {key, _value} -> key end) + |> apply_ecto_telemetry_options_to_xandra() + + prepare_opts = Keyword.take(opts, @xandra_prepare_opts) + execute_opts = Keyword.take(opts, @xandra_exec_opts) + + {prepare_opts, execute_opts} + end + defp remove_ecto_opts_for_xandra_execute_or_prepare(opts) do + opts + |> Keyword.drop([ + :schema_migration, + :timeout, + :pool_size, + :pool, + :log, + :cast_params, + :prefix, + :cache_statement + ]) + |> apply_ecto_telemetry_options_to_xandra() + end + + defp apply_ecto_telemetry_options_to_xandra(opts) do {repo, opts} = Keyword.pop(opts, :repo) {source, opts} = Keyword.pop(opts, :source) - opts = - Keyword.drop(opts, [ - :schema_migration, - :timeout, - :pool_size, - :pool, - :log, - :cast_params, - :prefix, - :cache_statement - ]) - case Keyword.pop(opts, :telemetry_options) do {nil, opts} -> opts