Merge pull request #36 from infinitered/31-stream-callback
[#31] Base Store behaviour on streams
danielberkompas authored Aug 31, 2018
2 parents d7a44df + 00f56b0 commit d9ef2a4
Showing 7 changed files with 57 additions and 125 deletions.
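In short: instead of asking the store for pages of records via `load/3` (driven by the now-deleted `Elasticsearch.DataStream` module), the library now asks the store for a single lazy `stream/1` and consumes it inside the store's own `transaction/1`. A minimal before/after sketch of a store, assuming an Ecto repo named `MyApp.Repo` (module names follow the README example below):

```elixir
defmodule MyApp.ElasticsearchStore do
  @behaviour Elasticsearch.Store

  import Ecto.Query

  alias MyApp.Repo

  # Old callback (removed in this commit): the library asked the store
  # for one page of records at a time.
  def load(schema, offset, limit) do
    schema
    |> offset(^offset)
    |> limit(^limit)
    |> Repo.all()
  end

  # New callback: hand back one lazy stream of all the records...
  @impl true
  def stream(schema) do
    Repo.stream(schema)
  end

  # ...plus a wrapper that keeps the database transaction (and the cursor
  # behind Repo.stream/1) open while the stream is consumed.
  @impl true
  def transaction(fun) do
    {:ok, result} = Repo.transaction(fun, timeout: :infinity)
    result
  end
end
```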
19 changes: 11 additions & 8 deletions README.md
@@ -80,9 +80,8 @@ config :my_app, MyApp.ElasticsearchCluster,
       # therefore allows all the settings you could post directly.
       settings: "priv/elasticsearch/posts.json",
 
-      # This store module must implement the Elasticsearch.Store
-      # behaviour. It will be used to fetch data for each source in each
-      # indexes' `sources` list, below:
+      # This store module must implement a store behaviour. It will be used to
+      # fetch data for each source in each indexes' `sources` list, below:
       store: MyApp.ElasticsearchStore,
 
      # This is the list of data sources that should be used to populate this
@@ -124,11 +123,15 @@ defmodule MyApp.ElasticsearchStore do
 
   alias MyApp.Repo
 
-  def load(schema, offset, limit) do
-    schema
-    |> offset(^offset)
-    |> limit(^limit)
-    |> Repo.all()
+  @impl true
+  def stream(schema) do
+    Repo.stream(schema)
   end
+
+  @impl true
+  def transaction(fun) do
+    {:ok, result} = Repo.transaction(fun, timeout: :infinity)
+    result
+  end
 end
 ```
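A note for Ecto users: `Repo.stream/2` is lazy and, on PostgreSQL, is backed by a cursor that only works inside a transaction, which is why the behaviour needs `transaction/1` at all; `timeout: :infinity` lifts Ecto's default transaction timeout so a long reindex is not cut off mid-stream. The stream's batch size can also be tuned. A sketch, with the module name and `max_rows` value purely illustrative:

```elixir
defmodule MyApp.TunedStore do
  @behaviour Elasticsearch.Store

  alias MyApp.Repo

  @impl true
  def stream(schema) do
    # Fetch in larger batches than Repo.stream/2's default of 500 rows.
    Repo.stream(schema, max_rows: 1_000)
  end

  @impl true
  def transaction(fun) do
    {:ok, result} = Repo.transaction(fun, timeout: :infinity)
    result
  end
end
```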
17 changes: 9 additions & 8 deletions lib/elasticsearch/indexing/bulk.ex
@@ -5,7 +5,6 @@ defmodule Elasticsearch.Index.Bulk do
 
   alias Elasticsearch.{
     Cluster,
-    DataStream,
     Document
   }
 
@@ -102,13 +101,15 @@ defmodule Elasticsearch.Index.Bulk do
     bulk_wait_interval = index_config[:bulk_wait_interval] || 0
 
     errors =
-      source
-      |> DataStream.stream(store, bulk_page_size)
-      |> Stream.map(&encode!(config, &1, index_name))
-      |> Stream.chunk_every(bulk_page_size)
-      |> Stream.intersperse(bulk_wait_interval)
-      |> Stream.map(&put_bulk_page(config, index_name, &1))
-      |> Enum.reduce(errors, &collect_errors/2)
+      store.transaction(fn ->
+        source
+        |> store.stream()
+        |> Stream.map(&encode!(config, &1, index_name))
+        |> Stream.chunk_every(bulk_page_size)
+        |> Stream.intersperse(bulk_wait_interval)
+        |> Stream.map(&put_bulk_page(config, index_name, &1))
+        |> Enum.reduce(errors, &collect_errors/2)
+      end)
 
     upload(config, index_name, %{index_config | sources: tail}, errors)
   end
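The pipeline above is fully lazy: nothing is read from the store until `Enum.reduce/3` forces it, and by then execution is already inside `store.transaction/1`, so an Ecto-backed stream keeps its cursor open for the entire upload. The shape of the loop, reduced to a self-contained toy (all names here are stand-ins, not library code):

```elixir
# A toy store over an in-memory range, standing in for an Ecto-backed one.
defmodule DemoStore do
  def stream(_source), do: Stream.map(1..10, &%{id: &1})
  def transaction(fun), do: fun.()
end

results =
  DemoStore.transaction(fn ->
    :posts
    |> DemoStore.stream()
    |> Stream.map(&inspect/1)  # "encode" each document
    |> Stream.chunk_every(3)   # one chunk per bulk request
    |> Stream.map(&length/1)   # "upload" a page, yielding a result
    |> Enum.reduce([], fn n, acc -> [n | acc] end)
  end)

IO.inspect(results)            # => [1, 3, 3, 3]
```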
64 changes: 0 additions & 64 deletions lib/elasticsearch/storage/data_stream.ex

This file was deleted.

46 changes: 21 additions & 25 deletions lib/elasticsearch/storage/store.ex
@@ -1,40 +1,36 @@
 defmodule Elasticsearch.Store do
   @moduledoc """
-  A behaviour for fetching data to index. Used by `mix elasticsearch.build`.
+  A behaviour for fetching data to index using a streaming strategy.
   """
 
-  @typedoc """
-  A data source. For example, `Post`, where `Post` is an `Ecto.Schema`.
-  Each datum returned must implement `Elasticsearch.Document`.
-  """
-  @type source :: any
-
-  @typedoc """
-  Instances of the data source. For example, `%Post{}` structs.
-  """
-  @type data :: any
-
-  @typedoc """
-  The current offset for the query.
-  """
-  @type offset :: integer
-
-  @typedoc """
-  A limit on the number of elements to return.
-  """
-  @type limit :: integer
-
   @doc """
-  Loads data based on the given source, offset, and limit.
+  Returns a stream of the given datasource.
 
   ## Example
 
-      def load(Post, offset, limit) do
-        Post
-        |> offset(^offset)
-        |> limit(^limit)
-        |> Repo.all()
+      def stream(Post) do
+        Repo.stream(Post)
       end
   """
-  @callback load(source, offset, limit) :: [data]
+  @callback stream(any) :: Stream.t()
+
+  @doc """
+  Returns a transaction wrapper to execute the stream returned by `stream/1`
+  within. This is required when using Ecto.
+
+  ## Example
+
+      def transaction(fun) do
+        {:ok, result} = Repo.transaction(fun, timeout: :infinity)
+        result
+      end
+
+  If you are not using Ecto and do not require transactions, simply call the
+  function passed as a parameter.
+
+      def transaction(fun) do
+        fun.()
+      end
+  """
+  @callback transaction(fun) :: any
 end
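As the new docs point out, a store that is not backed by Ecto can return any enumerable from `stream/1` and make `transaction/1` a plain pass-through. A hypothetical file-backed store, with the path and per-line format purely illustrative (in a real store each element must still implement `Elasticsearch.Document`):

```elixir
defmodule MyApp.LineStore do
  @behaviour Elasticsearch.Store

  @impl true
  def stream(_source) do
    # One document per line; File.stream!/1 is already lazy.
    "priv/data/posts.txt"
    |> File.stream!()
    |> Stream.map(&String.trim/1)
  end

  @impl true
  def transaction(fun) do
    # No database, so no transaction: just run the function.
    fun.()
  end
end
```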
12 changes: 7 additions & 5 deletions test/elasticsearch/indexing/bulk_test.exs
@@ -4,9 +4,9 @@ defmodule Elasticsearch.Index.BulkTest do
   import ExUnit.CaptureLog
 
   alias Elasticsearch.{
     Index.Bulk,
-    Test.Cluster,
-    Test.Store,
+    Test.Cluster,
+    Test.Store
   }
 
   defmodule TestException do
@@ -38,7 +38,9 @@ defmodule Elasticsearch.Index.BulkTest do
   @tag :regression
   test "calls itself recursively properly" do
     assert {:error, [%TestException{}]} =
-             Bulk.upload(Cluster, :posts, %{store: Store, sources: [Post]}, [%TestException{}])
+             Bulk.upload(Cluster, :posts, %{store: Store, sources: [Post]}, [
+               %TestException{}
+             ])
   end
 
   test "collects errors properly" do
@@ -69,13 +71,13 @@ defmodule Elasticsearch.Index.BulkTest do
           store: Store,
           sources: [Post],
           bulk_page_size: 1,
-          bulk_wait_interval: 10
+          bulk_wait_interval: 0
         })
 
         Elasticsearch.delete!(Cluster, "/posts-bulk-test")
       end)
 
-    assert output =~ "Pausing 10ms between bulk pages"
+    assert output =~ "Pausing 0ms between bulk pages"
   end
 end
7 changes: 0 additions & 7 deletions test/elasticsearch/storage/data_stream_test.exs

This file was deleted.

17 changes: 9 additions & 8 deletions test/support/store.ex
@@ -1,16 +1,17 @@
 defmodule Elasticsearch.Test.Store do
   @moduledoc false
   @behaviour Elasticsearch.Store
 
   import Ecto.Query
 
   alias Elasticsearch.Test.Repo
 
-  def load(Post, offset, limit) do
-    Post
-    |> offset(^offset)
-    |> limit(^limit)
-    |> Repo.all()
+  @impl true
+  def stream(Post) do
+    Repo.stream(Post)
   end
 
+  @impl true
+  def transaction(fun) do
+    {:ok, result} = Repo.transaction(fun, timeout: :infinity)
+    result
+  end
 
   def load(Comment, offset, limit) do
