From cbeb3baad492349c355029a014012907c77e11c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Valim?= Date: Thu, 29 Dec 2022 12:36:07 -0300 Subject: [PATCH] Add Flow.child_spec/1 --- lib/flow.ex | 39 +++++++++++++++++++------ test/flow_test.exs | 72 ++++++++++++++++++++++++++++------------------ 2 files changed, 74 insertions(+), 37 deletions(-) diff --git a/lib/flow.ex b/lib/flow.ex index e7fd7a5..29ca0a0 100644 --- a/lib/flow.ex +++ b/lib/flow.ex @@ -307,25 +307,38 @@ defmodule Flow do Flow allows computations to be started as a group of processes which may run indefinitely. This can be done by starting - the flow as part of a supervision tree using `Flow.start_link/2`. - - Since Elixir v1.5, the easiest way to add Flow to your supervision - tree is by calling `use Flow` and then defining a `start_link/1`. + the flow as part of a supervision tree using `{Flow, your_flow}` + as your child specification: + + children = [ + {Flow, + Flow.from_stages(...) + |> Flow.flat_map(&String.split(&1, " ")) + |> Flow.reduce(fn -> %{} end, fn word, acc -> + Map.update(acc, word, 1, & &1 + 1) + end)} + ] + + It is also possible to move a Flow to its own module. This is done by + calling `use Flow` and then defining a `start_link/1` function that + calls `Flow.start_link/1` at the end: defmodule MyFlow do use Flow def start_link(_) do Flow.from_stages(...) - |> ... - |> ... - |> ... + |> Flow.flat_map(&String.split(&1, " ")) + |> Flow.reduce(fn -> %{} end, fn word, acc -> + Map.update(acc, word, 1, & &1 + 1) + end) |> Flow.start_link() end end - The `:shutdown` and `:restart` child spec configurations can be - given to `use Flow`. + By the default the `Flow` is permanent, which means it is always + restarted. The `:shutdown` and `:restart` child spec configurations + can be given to `use Flow`. Flow also provides integration with `GenStage`, allowing you to specify child specifications of producers, producer consumers, and @@ -494,6 +507,14 @@ defmodule Flow do end end + @doc false + def child_spec(%Flow{} = arg) do + %{ + id: __MODULE__, + start: {__MODULE__, :start_link, [arg]} + } + end + ## Building @doc """ diff --git a/test/flow_test.exs b/test/flow_test.exs index 7fe051d..c64fed6 100644 --- a/test/flow_test.exs +++ b/test/flow_test.exs @@ -99,6 +99,22 @@ defmodule FlowTest do end end + test "child_spec/2" do + parent = self() + + start_supervised!( + {Flow, + Flow.from_enumerables([[1, 2, 3], [4, 5, 6]], stages: 2) + |> Flow.filter(&(rem(&1, 2) == 0)) + |> Flow.map(&send(parent, &1))} + ) + + assert_receive 2 + assert_receive 4 + assert_receive 6 + refute_received 1 + end + describe "errors" do test "on multiple reduce calls" do message = ~r"cannot call group_by/reduce/emit_and_reduce on a flow after another" @@ -777,8 +793,8 @@ defmodule FlowTest do assert_receive {:consumed, [2]} assert_receive {:consumed, [4]} assert_receive {:consumed, [6]} - assert_receive {:consumed, '\b'} - assert_receive {:consumed, '\n'} + assert_receive {:consumed, [8]} + assert_receive {:consumed, [10]} end test "into_stages/3 with :name", config do @@ -805,19 +821,19 @@ defmodule FlowTest do assert_receive {:producer_consumed, [2]} assert_receive {:producer_consumed, [6]} - assert_receive {:producer_consumed, '\b'} + assert_receive {:producer_consumed, [8]} assert_receive {:producer_consumed, [1]} assert_receive {:producer_consumed, [5]} - assert_receive {:producer_consumed, '\a\t'} + assert_receive {:producer_consumed, ~c"\a\t"} assert_receive {:producer_consumed, [3]} assert_receive {:producer_consumed, [4]} - assert_receive {:producer_consumed, '\n'} + assert_receive {:producer_consumed, [10]} assert_receive {:producer_consumed, [2]} assert_receive {:producer_consumed, [6]} - assert_receive {:producer_consumed, '\b'} + assert_receive {:producer_consumed, [8]} assert_receive {:producer_consumed, [4]} - assert_receive {:producer_consumed, '\n'} + assert_receive {:producer_consumed, [10]} end test "through_stages/3 + into_stages/3" do @@ -833,25 +849,25 @@ defmodule FlowTest do assert_receive {:producer_consumed, [2]} assert_receive {:producer_consumed, [6]} - assert_receive {:producer_consumed, '\b'} + assert_receive {:producer_consumed, [8]} assert_receive {:producer_consumed, [1]} assert_receive {:producer_consumed, [5]} - assert_receive {:producer_consumed, '\a\t'} + assert_receive {:producer_consumed, ~c"\a\t"} assert_receive {:producer_consumed, [3]} assert_receive {:producer_consumed, [4]} - assert_receive {:producer_consumed, '\n'} + assert_receive {:producer_consumed, [10]} assert_receive {:producer_consumed, [2]} assert_receive {:producer_consumed, [6]} - assert_receive {:producer_consumed, '\b'} + assert_receive {:producer_consumed, [8]} assert_receive {:producer_consumed, [4]} - assert_receive {:producer_consumed, '\n'} + assert_receive {:producer_consumed, [10]} assert_receive {:consumed, [2]} assert_receive {:consumed, [6]} - assert_receive {:consumed, '\b'} + assert_receive {:consumed, [8]} assert_receive {:consumed, [4]} - assert_receive {:consumed, '\n'} + assert_receive {:consumed, [10]} end test "into_specs/3" do @@ -863,8 +879,8 @@ defmodule FlowTest do assert_receive {:consumed, [2]} assert_receive {:consumed, [4]} assert_receive {:consumed, [6]} - assert_receive {:consumed, '\b'} - assert_receive {:consumed, '\n'} + assert_receive {:consumed, [8]} + assert_receive {:consumed, [10]} end test "into_specs/3 with :name", config do @@ -886,19 +902,19 @@ defmodule FlowTest do assert_receive {:producer_consumed, [2]} assert_receive {:producer_consumed, [6]} - assert_receive {:producer_consumed, '\b'} + assert_receive {:producer_consumed, [8]} assert_receive {:producer_consumed, [1]} assert_receive {:producer_consumed, [5]} - assert_receive {:producer_consumed, '\a\t'} + assert_receive {:producer_consumed, ~c"\a\t"} assert_receive {:producer_consumed, [3]} assert_receive {:producer_consumed, [4]} - assert_receive {:producer_consumed, '\n'} + assert_receive {:producer_consumed, [10]} assert_receive {:producer_consumed, [2]} assert_receive {:producer_consumed, [6]} - assert_receive {:producer_consumed, '\b'} + assert_receive {:producer_consumed, [8]} assert_receive {:producer_consumed, [4]} - assert_receive {:producer_consumed, '\n'} + assert_receive {:producer_consumed, [10]} end test "through_specs/3 + into_specs/3" do @@ -910,25 +926,25 @@ defmodule FlowTest do assert_receive {:producer_consumed, [2]} assert_receive {:producer_consumed, [6]} - assert_receive {:producer_consumed, '\b'} + assert_receive {:producer_consumed, [8]} assert_receive {:producer_consumed, [1]} assert_receive {:producer_consumed, [5]} - assert_receive {:producer_consumed, '\a\t'} + assert_receive {:producer_consumed, ~c"\a\t"} assert_receive {:producer_consumed, [3]} assert_receive {:producer_consumed, [4]} - assert_receive {:producer_consumed, '\n'} + assert_receive {:producer_consumed, [10]} assert_receive {:producer_consumed, [2]} assert_receive {:producer_consumed, [6]} - assert_receive {:producer_consumed, '\b'} + assert_receive {:producer_consumed, [8]} assert_receive {:producer_consumed, [4]} - assert_receive {:producer_consumed, '\n'} + assert_receive {:producer_consumed, [10]} assert_receive {:consumed, [2]} assert_receive {:consumed, [6]} - assert_receive {:consumed, '\b'} + assert_receive {:consumed, [8]} assert_receive {:consumed, [4]} - assert_receive {:consumed, '\n'} + assert_receive {:consumed, [10]} end end