
Repo.stream no longer supports preload #54

Closed
jhonathas opened this issue Dec 6, 2018 · 19 comments

Comments

@jhonathas

In version 0.4.0 it was possible to use a load function, as can be seen in the link below, but this option has been removed. I believe we will need the load function again; I'll explain below.
https://github.com/infinitered/elasticsearch-elixir/blob/58948d14a5806d76aa469702c39eef8643738ac4/guides/upgrading/0.4.x_to_0.5.x.md

In Ecto 3.0, support for preload together with Repo.stream was removed, as can be seen in these links:

https://elixirforum.com/t/repo-stream-with-preload-new-warning/17043

elixir-ecto/ecto@6655a9a#diff-122d0a4bbce6a65cc1523584a00193aaR138

Without the option to preload alongside the stream, the only alternative is preloading within Elasticsearch.Document, which is bad because it would preload each record individually; in other words, we would lose batched preloading.
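For illustration, a sketch of that per-record preload (the schema, MyApp.Repo, and the association names are assumptions, and the document-encoding callback is reduced to its preload step):

# Hypothetical per-record encoding. Because encode/1 runs once per
# document, each record issues its own preload queries instead of
# sharing one batched query per association.
def encode(post) do
  post = MyApp.Repo.preload(post, [:ad, :address, :medias])
  %{title: post.title, medias: Enum.map(post.medias, & &1.url)}
end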

I think that by including the load option again, we could go back to doing things like:

  def load(schema, offset, limit) do
    schema
    |> offset(^offset)
    |> limit(^limit)
    |> Repo.all()
    |> Repo.preload([:ad, :address, :medias])
  end

I'm not sure I explained this well. If you have any doubts, I can try to give you more examples.

@danielberkompas
Owner

Offset and limit perform poorly, because the database has to scan past every skipped row. We are going to need to adopt a cursor-based solution for paging through documents.
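For illustration, a minimal keyset-style query of the kind a cursor solution boils down to (MyApp.Repo and the id column are assumptions):

import Ecto.Query

# Hypothetical keyset pagination: instead of OFFSET, filter on the last
# id seen, so the database can seek via the primary-key index rather
# than scanning and discarding the skipped rows.
def next_page(schema, last_id, page_size) do
  from(s in schema,
    where: s.id > ^last_id,
    order_by: [asc: s.id],
    limit: ^page_size
  )
  |> MyApp.Repo.all()
end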

@danielberkompas changed the title from "Problem on use Repo.stream on store." to "Repo.stream no longer supports preload" on Dec 6, 2018
@hipertracker

Preload is a must for inner joins. How can we index data that needs data from related tables? Currently, it is very limited to flat tables.

@hipertracker

To work around Ecto 3 blocking preloads in a stream, I just replaced Repo.stream/1 with Repo.all/1 in my Elasticsearch.Store. I can't see a better solution.
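A minimal sketch of that workaround, reusing the association names from earlier in the thread (the Repo alias is assumed):

import Ecto.Query

@impl true
def stream(schema) do
  # Repo.all/1 still allows query-level preloads under Ecto 3 (unlike
  # Repo.stream/1), at the cost of loading every row into memory at once.
  schema
  |> preload([:ad, :address, :medias])
  |> Repo.all()
end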

@jhonathas
Author

But in doing so, you will have an N+1 problem. Instead of only one preload query, as it was with the stream, you will have preload queries for each row returned by all/1.

@jhonathas
Author

Currently I use all/1, but I know it is not a good solution, especially as my database grows.

@hipertracker

No, I have no N+1 problem. For 3 inner joins I have 4 queries altogether.

@jhonathas
Author

For me it did not preload, even when I tried to use the join :(

@ksherman

ksherman commented Apr 6, 2019

Also curious about solutions. I'm stuck with an MSSQL database, and Tds.Ecto doesn't support stream at all 🤣

@danielberkompas
Owner

@ksherman This is something I want to address in version 2.0.0. I'm going to recommend that you use a stream based on cursors. You can build one pretty easily using the paginator library.

defmodule CursorStream do
  def new(repo, query, opts \\ []) do
    Stream.resource(
      # Initial state: nothing buffered yet and no cursor.
      fn ->
        %{
          query: query,
          entries: [],
          cursor: nil,
          repo: repo,
          opts: opts
        }
      end,
      &next/1,
      &cleanup/1
    )
  end

  # Buffer empty: fetch the next page after the stored cursor.
  defp next(%{entries: []} = state) do
    opts = Keyword.merge(state.opts, after: state.cursor)
    page = state.repo.paginate(state.query, opts)

    case page.entries do
      [h | entries] ->
        {[h], %{state | entries: entries, cursor: page.metadata.after}}

      [] ->
        {:halt, state}
    end
  end

  # Buffer non-empty: emit the buffered entries one at a time.
  defp next(%{entries: [h | t]} = state) do
    {[h], %{state | entries: t}}
  end

  defp cleanup(_state) do
    # no op
  end
end

# Usage
CursorStream.new(MyApp.Repo, query, opts_for_paginator)

Cursors avoid the limit/offset performance problems, as well as the transaction problems of Repo.stream.
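A minimal repo setup for the sketch above, assuming the paginator hex package (the module and otp_app names are placeholders):

defmodule MyApp.Repo do
  use Ecto.Repo,
    otp_app: :my_app,
    adapter: Ecto.Adapters.Postgres

  # Injects paginate/2, which CursorStream calls as repo.paginate/2.
  use Paginator
end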

@ksherman

ksherman commented Apr 11, 2019

@danielberkompas Okay, I tried this out, and I think it's kind of just running the same page of products over and over? My CursorStream is what you pasted above. This is what I added to the store:

  @impl true
  def stream(schema) do
    CursorStream.new(
      Repo,
      from(s in schema, where: s."FoodItemID" > 9000 and s."FoodItemID" < 9050, preload: :brand),
      cursor_fields: [:id],
      limit: 50
    )
  end

The where on FoodItemID was just to start with a small collection of results; this table has > 1.5M rows. Only 9 products match that range, and the mix command just seems to index those 9 products over and over.

@danielberkompas
Owner

@ksherman Are you sure that your ID field is named :id? It looks like it might be capitalized, ID?

@ksherman

ksherman commented Apr 18, 2019

Hey! Sorry for the late reply. I've tried a few different options in cursor_fields. Our id is actually :FoodItemID, and using that doesn't seem to stop it from looping either.

I know offsets aren't desirable, but for now this worked for me:

  def new(query, chunk_size \\ 500, offset \\ 0) do
    Stream.resource(
      # The accumulator is the current offset; start from the one given.
      fn -> offset end,
      fn
        :stop ->
          {:halt, :stop}

        offset ->
          rows = EfAPI.Repo.all(from(query, limit: ^chunk_size, offset: ^offset))

          # A short page means we've reached the end: emit it, then stop.
          if Enum.count(rows) < chunk_size do
            {rows, :stop}
          else
            {rows, offset + chunk_size}
          end
      end,
      fn _ -> :ok end
    )
  end
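Hypothetical usage from an Elasticsearch.Store callback, assuming new/3 above lives in a module named OffsetStream:

@impl true
def stream(schema) do
  # OffsetStream is an assumed name for the module holding new/3; the
  # query-level preload works here because new/3 runs via Repo.all/1.
  OffsetStream.new(from(s in schema, preload: :brand), 500)
end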

@tcrossland
Contributor

tcrossland commented Oct 2, 2019

To work around Ecto 3 blocking preloads in a stream, I just replaced Repo.stream/1 with Repo.all/1 in my Elasticsearch.Store. I can't see a better solution.

One option would be to perform a join query and programmatically hydrate the structs, for example:

from(comment in Comment,
  join: post in assoc(comment, :post),
  select: {comment, post}
)
|> Repo.stream()
|> Stream.map(fn {comment, post} -> Map.put(comment, :post, post) end)

@jhonathas
Author

I'll test this approach. Today I'm doing the preload inside the document, but it gets too slow. Thanks!

@diegonogueira


Thanks, @tcrossland.
The problem with this approach is when we need has_many associations: the join repeats the parent row once per child, so the stream no longer yields one struct per record.

@seanwash

seanwash commented Oct 2, 2019

@diegonogueira

You could use Stream.chunk_every/2 and Stream.flat_map/2 to preload records in batches.

I'm doing something similar with larger CSVs that I need to build:

# ...
|> Repo.stream()
|> Stream.chunk_every(10)
|> Stream.flat_map(fn chunk ->
  Repo.preload(chunk, Feed.product_preloads())
end)
# ... Do something with your stream

This preloads records in groups of 10, and flat_map flattens the chunks back into a single stream. The performance overhead here seems to be negligible in my testing so far.

@diegonogueira

Thanks, @seanwash!
It works! :)

@diegonogueira

I created my own stream_preload function in my Repo module.

defmodule MyApp.Repo do
  use Ecto.Repo,
    otp_app: :my_app,
    adapter: Ecto.Adapters.Postgres

  def reload(%module{id: id}) do
    get(module, id)
  end

  # Chunk the stream, preload each chunk in one batched round of
  # queries, then flatten the chunks back into a single stream.
  def stream_preload(stream, size, preloads) do
    stream
    |> Stream.chunk_every(size)
    |> Stream.flat_map(fn chunk ->
      preload(chunk, preloads)
    end)
  end
end

And the store file:

defmodule MyApp.PostStore do
  @behaviour Elasticsearch.Store
  alias MyApp.Repo

  @impl true
  def stream(schema) do
    schema
    |> Repo.stream()
    |> Repo.stream_preload(500, :comments)
  end

  @impl true
  def transaction(fun) do
    {:ok, result} = Repo.transaction(fun, timeout: :infinity)
    result
  end
end

config.exs

...
bulk_page_size: 500,
bulk_wait_interval: 0
...

Thanks

@jhonathas
Author

Thanks, @seanwash!

It worked very well!
