Skip to content

Commit

Permalink
Add Flow.child_spec/1
Browse files Browse the repository at this point in the history
  • Loading branch information
josevalim committed Dec 29, 2022
1 parent cd76152 commit cbeb3ba
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 37 deletions.
39 changes: 30 additions & 9 deletions lib/flow.ex
Original file line number Diff line number Diff line change
Expand Up @@ -307,25 +307,38 @@ defmodule Flow do
Flow allows computations to be started as a group of processes
which may run indefinitely. This can be done by starting
the flow as part of a supervision tree using `Flow.start_link/2`.
Since Elixir v1.5, the easiest way to add Flow to your supervision
tree is by calling `use Flow` and then defining a `start_link/1`.
the flow as part of a supervision tree using `{Flow, your_flow}`
as your child specification:
children = [
{Flow,
Flow.from_stages(...)
|> Flow.flat_map(&String.split(&1, " "))
|> Flow.reduce(fn -> %{} end, fn word, acc ->
Map.update(acc, word, 1, & &1 + 1)
end)}
]
It is also possible to move a Flow to its own module. This is done by
calling `use Flow` and then defining a `start_link/1` function that
calls `Flow.start_link/1` at the end:
defmodule MyFlow do
use Flow
def start_link(_) do
Flow.from_stages(...)
|> ...
|> ...
|> ...
|> Flow.flat_map(&String.split(&1, " "))
|> Flow.reduce(fn -> %{} end, fn word, acc ->
Map.update(acc, word, 1, & &1 + 1)
end)
|> Flow.start_link()
end
end
The `:shutdown` and `:restart` child spec configurations can be
given to `use Flow`.
By the default the `Flow` is permanent, which means it is always
restarted. The `:shutdown` and `:restart` child spec configurations
can be given to `use Flow`.
Flow also provides integration with `GenStage`, allowing you to
specify child specifications of producers, producer consumers, and
Expand Down Expand Up @@ -494,6 +507,14 @@ defmodule Flow do
end
end

@doc false
def child_spec(%Flow{} = arg) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, [arg]}
}
end

## Building

@doc """
Expand Down
72 changes: 44 additions & 28 deletions test/flow_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,22 @@ defmodule FlowTest do
end
end

test "child_spec/2" do
parent = self()

start_supervised!(
{Flow,
Flow.from_enumerables([[1, 2, 3], [4, 5, 6]], stages: 2)
|> Flow.filter(&(rem(&1, 2) == 0))
|> Flow.map(&send(parent, &1))}
)

assert_receive 2
assert_receive 4
assert_receive 6
refute_received 1
end

describe "errors" do
test "on multiple reduce calls" do
message = ~r"cannot call group_by/reduce/emit_and_reduce on a flow after another"
Expand Down Expand Up @@ -777,8 +793,8 @@ defmodule FlowTest do
assert_receive {:consumed, [2]}
assert_receive {:consumed, [4]}
assert_receive {:consumed, [6]}
assert_receive {:consumed, '\b'}
assert_receive {:consumed, '\n'}
assert_receive {:consumed, [8]}
assert_receive {:consumed, [10]}
end

test "into_stages/3 with :name", config do
Expand All @@ -805,19 +821,19 @@ defmodule FlowTest do

assert_receive {:producer_consumed, [2]}
assert_receive {:producer_consumed, [6]}
assert_receive {:producer_consumed, '\b'}
assert_receive {:producer_consumed, [8]}
assert_receive {:producer_consumed, [1]}
assert_receive {:producer_consumed, [5]}
assert_receive {:producer_consumed, '\a\t'}
assert_receive {:producer_consumed, ~c"\a\t"}
assert_receive {:producer_consumed, [3]}
assert_receive {:producer_consumed, [4]}
assert_receive {:producer_consumed, '\n'}
assert_receive {:producer_consumed, [10]}

assert_receive {:producer_consumed, [2]}
assert_receive {:producer_consumed, [6]}
assert_receive {:producer_consumed, '\b'}
assert_receive {:producer_consumed, [8]}
assert_receive {:producer_consumed, [4]}
assert_receive {:producer_consumed, '\n'}
assert_receive {:producer_consumed, [10]}
end

test "through_stages/3 + into_stages/3" do
Expand All @@ -833,25 +849,25 @@ defmodule FlowTest do

assert_receive {:producer_consumed, [2]}
assert_receive {:producer_consumed, [6]}
assert_receive {:producer_consumed, '\b'}
assert_receive {:producer_consumed, [8]}
assert_receive {:producer_consumed, [1]}
assert_receive {:producer_consumed, [5]}
assert_receive {:producer_consumed, '\a\t'}
assert_receive {:producer_consumed, ~c"\a\t"}
assert_receive {:producer_consumed, [3]}
assert_receive {:producer_consumed, [4]}
assert_receive {:producer_consumed, '\n'}
assert_receive {:producer_consumed, [10]}

assert_receive {:producer_consumed, [2]}
assert_receive {:producer_consumed, [6]}
assert_receive {:producer_consumed, '\b'}
assert_receive {:producer_consumed, [8]}
assert_receive {:producer_consumed, [4]}
assert_receive {:producer_consumed, '\n'}
assert_receive {:producer_consumed, [10]}

assert_receive {:consumed, [2]}
assert_receive {:consumed, [6]}
assert_receive {:consumed, '\b'}
assert_receive {:consumed, [8]}
assert_receive {:consumed, [4]}
assert_receive {:consumed, '\n'}
assert_receive {:consumed, [10]}
end

test "into_specs/3" do
Expand All @@ -863,8 +879,8 @@ defmodule FlowTest do
assert_receive {:consumed, [2]}
assert_receive {:consumed, [4]}
assert_receive {:consumed, [6]}
assert_receive {:consumed, '\b'}
assert_receive {:consumed, '\n'}
assert_receive {:consumed, [8]}
assert_receive {:consumed, [10]}
end

test "into_specs/3 with :name", config do
Expand All @@ -886,19 +902,19 @@ defmodule FlowTest do

assert_receive {:producer_consumed, [2]}
assert_receive {:producer_consumed, [6]}
assert_receive {:producer_consumed, '\b'}
assert_receive {:producer_consumed, [8]}
assert_receive {:producer_consumed, [1]}
assert_receive {:producer_consumed, [5]}
assert_receive {:producer_consumed, '\a\t'}
assert_receive {:producer_consumed, ~c"\a\t"}
assert_receive {:producer_consumed, [3]}
assert_receive {:producer_consumed, [4]}
assert_receive {:producer_consumed, '\n'}
assert_receive {:producer_consumed, [10]}

assert_receive {:producer_consumed, [2]}
assert_receive {:producer_consumed, [6]}
assert_receive {:producer_consumed, '\b'}
assert_receive {:producer_consumed, [8]}
assert_receive {:producer_consumed, [4]}
assert_receive {:producer_consumed, '\n'}
assert_receive {:producer_consumed, [10]}
end

test "through_specs/3 + into_specs/3" do
Expand All @@ -910,25 +926,25 @@ defmodule FlowTest do

assert_receive {:producer_consumed, [2]}
assert_receive {:producer_consumed, [6]}
assert_receive {:producer_consumed, '\b'}
assert_receive {:producer_consumed, [8]}
assert_receive {:producer_consumed, [1]}
assert_receive {:producer_consumed, [5]}
assert_receive {:producer_consumed, '\a\t'}
assert_receive {:producer_consumed, ~c"\a\t"}
assert_receive {:producer_consumed, [3]}
assert_receive {:producer_consumed, [4]}
assert_receive {:producer_consumed, '\n'}
assert_receive {:producer_consumed, [10]}

assert_receive {:producer_consumed, [2]}
assert_receive {:producer_consumed, [6]}
assert_receive {:producer_consumed, '\b'}
assert_receive {:producer_consumed, [8]}
assert_receive {:producer_consumed, [4]}
assert_receive {:producer_consumed, '\n'}
assert_receive {:producer_consumed, [10]}

assert_receive {:consumed, [2]}
assert_receive {:consumed, [6]}
assert_receive {:consumed, '\b'}
assert_receive {:consumed, [8]}
assert_receive {:consumed, [4]}
assert_receive {:consumed, '\n'}
assert_receive {:consumed, [10]}
end
end

Expand Down

0 comments on commit cbeb3ba

Please sign in to comment.