Repo.stream no longer supports preload #54
Offset and limit are poor for performance. We are going to need to adopt a cursor-based solution for paging through documents.
Preload is a must for inner joins. How do you upload data that needs data from related tables? Currently, this is limited to flat tables.
To overcome Ecto 3 blocking preloads in the stream, I just replaced `Repo.stream/1` with `Repo.all/1` in `Elasticsearch.Store`. I can't see a better solution.
But in doing so, you will have the N+1 problem. Instead of having only one preload query, as it was with the stream, you will have preload queries for each row returned by `.all`.
Currently I use `all/1`, but I know it is not a good solution, especially as my database grows.
No, I have no N+1 problem. For 3 inner joins I have 4 queries altogether.
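A sketch of the batched behavior being described, with hypothetical schema and association names: `Repo.all/1` issues one base query plus one query per preloaded association, not one per row.

```elixir
import Ecto.Query

# Hypothetical schema: Post with :author, :comments, and :tags associations.
# Ecto runs 4 queries here: one for posts, then one batched query per
# association (WHERE ... IN (...)), regardless of the number of rows.
Post
|> preload([:author, :comments, :tags])
|> Repo.all()
```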
For me it did not preload, even when I tried to use the join :(
Also curious about solutions; I'm stuck with an MSSQL db and Tds.Ecto doesn't support stream at all 🤣
@ksherman This is something I want to address in version 2.0.0. I'm going to recommend that you use a stream based on cursors. You can build one pretty easily using the `paginator` library:

```elixir
defmodule CursorStream do
  # Lazily streams all results of `query`, paging with cursors.
  # Assumes `repo` exposes a `paginate/2` function (e.g. via the
  # `paginator` package) that returns `entries` and `metadata.after`.
  def new(repo, query, opts \\ []) do
    Stream.resource(
      fn ->
        %{
          query: query,
          entries: [],
          cursor: nil,
          fetched?: false,
          repo: repo,
          opts: opts
        }
      end,
      &next/1,
      &cleanup/1
    )
  end

  # Buffer exhausted with no next cursor after at least one fetch: stop.
  # (Without this clause, a nil cursor would re-fetch the first page forever.)
  defp next(%{entries: [], fetched?: true, cursor: nil} = state) do
    {:halt, state}
  end

  # Buffer exhausted: fetch the next page after the current cursor.
  defp next(%{entries: []} = state) do
    opts = Keyword.merge(state.opts, after: state.cursor)
    page = state.repo.paginate(state.query, opts)

    case page.entries do
      [h | entries] ->
        {[h], %{state | entries: entries, cursor: page.metadata.after, fetched?: true}}

      [] ->
        {:halt, state}
    end
  end

  # Emit buffered entries one at a time.
  defp next(%{entries: [h | t]} = state) do
    {[h], %{state | entries: t}}
  end

  defp cleanup(_state), do: :ok
end
```

```elixir
# Usage
CursorStream.new(MyApp.Repo, query, opts_for_paginator)
```

Cursors avoid the limit/offset performance problems, as well as the transaction problems of `Repo.stream`.
@danielberkompas Okay, I tried this out, and I think it's just running the same page of products over and over? My `CursorStream` is what you pasted above. This is what I added to the store:

```elixir
@impl true
def stream(schema) do
  CursorStream.new(
    Repo,
    from(s in schema,
      where: s."FoodItemID" > 9000 and s."FoodItemID" < 9050,
      preload: :brand
    ),
    cursor_fields: [:id],
    limit: 50
  )
end
```

The `where` on the query is just there to narrow the data set while I test.
@ksherman Are you sure that your ID field is named `:id`? Your query references `FoodItemID`, so `cursor_fields: [:id]` may not match your schema's key field.
Hey! Sorry for the late reply, I've tried a few different options in the meantime. I know offsets aren't desirable, but for now this worked for me:

```elixir
# Streams `query` in chunks of `chunk_size`, using limit/offset paging.
def new(query, chunk_size \\ 500, offset \\ 0) do
  Stream.resource(
    # Start paging from the given offset.
    fn -> offset end,
    fn
      :stop ->
        {:halt, :stop}

      offset ->
        rows = EfAPI.Repo.all(from(query, limit: ^chunk_size, offset: ^offset))

        # A short page means we've reached the end; emit it and stop.
        if length(rows) < chunk_size do
          {rows, :stop}
        else
          {rows, offset + chunk_size}
        end
    end,
    fn _ -> :ok end
  )
end
```
One option would be to perform a join query and programmatically hydrate the structs, for example:

```elixir
from(comment in Comment,
  join: post in assoc(comment, :post),
  select: {comment, post}
)
|> Repo.stream()
|> Stream.map(fn {comment, post} -> Map.put(comment, :post, post) end)
```
I'll test this approach. Right now I'm doing the preload inside the document, but it gets too slow...
Thanks, @tcrossland.
You could use `Stream.chunk_every/2` together with `Repo.preload/2`. I'm doing something similar with larger CSVs that I need to build:

```elixir
# ...
|> Repo.stream()
|> Stream.chunk_every(10)
|> Stream.flat_map(fn chunk ->
  Repo.preload(chunk, Feed.product_preloads())
end)
# ... do something with your stream
```

This preloads records in groups of 10, and `flat_map` flattens the chunks back into a single stream. The performance overhead here seems to be negligible in my testing so far.
Thanks, @seanwash!
I created my own `stream_preload/3` helper in my Repo module:

```elixir
defmodule MyApp.Repo do
  use Ecto.Repo,
    otp_app: :my_app,
    adapter: Ecto.Adapters.Postgres

  def reload(%module{id: id}) do
    get(module, id)
  end

  # Preloads a stream in batches of `size` instead of per record.
  def stream_preload(stream, size, preloads) do
    stream
    |> Stream.chunk_every(size)
    |> Stream.flat_map(fn chunk ->
      # `preload/2` is defined on this module by `use Ecto.Repo`.
      preload(chunk, preloads)
    end)
  end
end
```

And the store file:

```elixir
defmodule MyApp.PostStore do
  @behaviour Elasticsearch.Store

  alias MyApp.Repo

  @impl true
  def stream(schema) do
    schema
    |> Repo.stream()
    |> Repo.stream_preload(500, :comments)
  end

  @impl true
  def transaction(fun) do
    {:ok, result} = Repo.transaction(fun, timeout: :infinity)
    result
  end
end
```

And in `config.exs`:

```elixir
# ...
bulk_page_size: 500,
bulk_wait_interval: 0
# ...
```

Thanks
Thanks, @seanwash! It worked very well!
In version 0.4.0 it was possible to use a `load` function, as can be seen in the link below, but this option has been removed. I believe we will need to have the `load` function again. I'll explain below.

https://github.com/infinitered/elasticsearch-elixir/blob/58948d14a5806d76aa469702c39eef8643738ac4/guides/upgrading/0.4.x_to_0.5.x.md
In Ecto 3.0, support for preload along with `Repo.stream` has been removed, as can be seen in these links:

https://elixirforum.com/t/repo-stream-with-preload-new-warning/17043
elixir-ecto/ecto@6655a9a#diff-122d0a4bbce6a65cc1523584a00193aaR138
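A minimal sketch of the pattern that no longer works, assuming a `Post` schema with a `:comments` association:

```elixir
# Rejected by Ecto 3: preload cannot be combined with Repo.stream/1.
# (Ecto 2.2 emitted a warning for this; 3.0 removed support entirely.)
from(p in Post, preload: :comments)
|> Repo.stream()
```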
Without the option to preload alongside the stream, the only remaining option is to preload within `Elasticsearch.Document`, which is bad because it would have to preload every record individually; that is, we would lose batched preloading.
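For illustration, the per-record workaround being described might look like this (schema and association names are hypothetical, and the exact protocol callbacks vary by library version):

```elixir
defimpl Elasticsearch.Document, for: MyApp.Post do
  def id(post), do: post.id
  def routing(_post), do: false

  def encode(post) do
    # One extra preload query *per document* indexed: the N+1 cost.
    post = MyApp.Repo.preload(post, :comments)
    %{title: post.title, comments: Enum.map(post.comments, & &1.body)}
  end
end
```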
I think that by including the `load` option again, we could go back to doing things like the example below.
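A hedged reconstruction of the kind of thing meant, based on the 0.4.x-style `load/3` store callback described in the linked upgrade guide (signature assumed; the `:comments` preload is a hypothetical example):

```elixir
import Ecto.Query

# 0.4.x-style Store callback: fetch one batch of records with its
# preloads in a single pass, so preloading stays batched.
def load(schema, offset, limit) do
  schema
  |> offset(^offset)
  |> limit(^limit)
  |> preload(:comments)
  |> MyApp.Repo.all()
end
```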
I'm not sure I explained this well. If you have any doubts, I can try to give you more examples.