Skip to content

Commit

Permalink
Simplify passing around of compiled ops
Browse files Browse the repository at this point in the history
  • Loading branch information
josevalim committed Jan 6, 2023
1 parent 195cce4 commit 560f44f
Showing 1 changed file with 11 additions and 11 deletions.
22 changes: 11 additions & 11 deletions lib/flow/materialize.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ defmodule Flow.Materialize do

def materialize(%Flow{} = flow, demand, start_link, type, dispatcher) do
%{operations: operations, options: options, producers: producers, window: window} = flow
{ops, batchers} = split_operations(operations)
{ops, batchers} = compile_operations(operations)

{producers, consumers, ops, window} =
start_producers(producers, ops, start_link, window, options, dispatcher)
Expand All @@ -36,20 +36,20 @@ defmodule Flow.Materialize do
@doc """
Splits the flow operations into layers of stages.
"""
def split_operations([]) do
def compile_operations([]) do
{:none, []}
end

def split_operations(operations) do
def compile_operations(operations) do
{batchers, operations} =
operations
|> :lists.reverse()
|> Enum.split_while(&match?({:batch, _}, &1))

if Enum.all?(operations, &match?({:mapper, _, _}, &1)) do
{{:mapper, mapper_ops(operations), operations}, batchers}
{mapper_ops(operations), batchers}
else
{{:reducer, reducer_ops(operations), operations}, batchers}
{reducer_ops(operations), batchers}
end
end

Expand All @@ -74,7 +74,7 @@ defmodule Flow.Materialize do
end

defp start_stages(
{_mr, compiled_ops, _ops},
compiled_ops,
window,
producers,
start_link,
Expand Down Expand Up @@ -129,7 +129,7 @@ defmodule Flow.Materialize do
{right_producers, right_consumers} =
start_join(:right, right, right_key, partitions, start_link)

{type, {acc, fun, trigger}, ops} = ensure_ops(ops)
{acc, fun, trigger} = ensure_ops(ops)

window =
case window do
Expand All @@ -140,7 +140,7 @@ defmodule Flow.Materialize do
producers = left_producers ++ right_producers
consumers = left_consumers ++ right_consumers

{producers, consumers, {type, join_ops(kind, join, acc, fun, trigger), ops}, window}
{producers, consumers, join_ops(kind, join, acc, fun, trigger), window}
end

defp start_producers(
Expand All @@ -154,13 +154,13 @@ defmodule Flow.Materialize do
{producers, consumers} =
materialize(flow, :forward, start_link, :producer_consumer, GenStage.DemandDispatcher)

{type, {acc, fun, trigger}, ops} = ensure_ops(ops)
{acc, fun, trigger} = ensure_ops(ops)

stages = Keyword.fetch!(flow.options, :stages)
partitions = Enum.to_list(0..(stages - 1))

{producers, consumers,
{type, departition_ops(acc, fun, trigger, partitions, acc_fun, merge_fun, done_fun), ops},
departition_ops(acc, fun, trigger, partitions, acc_fun, merge_fun, done_fun),
window}
end

Expand Down Expand Up @@ -297,7 +297,7 @@ defmodule Flow.Materialize do
defp ensure_ops(ops, dispatcher, dispatcher), do: ops
defp ensure_ops(ops, _up_dispatcher, _dispatcher), do: ensure_ops(ops)

defp ensure_ops(:none), do: {:mapper, mapper_ops([]), []}
defp ensure_ops(:none), do: mapper_ops([])
defp ensure_ops(ops), do: ops

## Departition
Expand Down

0 comments on commit 560f44f

Please sign in to comment.