From 3d0dce9ee1fc316f2ac0cd2def61fd56daf1c5ad Mon Sep 17 00:00:00 2001 From: T Floyd Wright Date: Thu, 21 Dec 2023 11:54:22 -0700 Subject: [PATCH] Implement streaming API --- lib/supplement/storages/disk.ex | 8 ++++++++ lib/supplement/storages/ram.ex | 7 +++++++ lib/supplement/storages/s3.ex | 8 ++++++++ lib/supplement/uploads/plug_upload.ex | 2 ++ lib/supplement/uploads/uri.ex | 8 ++++++++ mix.exs | 2 +- mix.lock | 2 +- test/storages/disk_test.exs | 10 ++++++++++ test/storages/ram_test.exs | 8 ++++++++ test/storages/s3_test.exs | 12 ++++++++++++ test/support/mock_upload.ex | 6 ++++++ 11 files changed, 71 insertions(+), 2 deletions(-) diff --git a/lib/supplement/storages/disk.ex b/lib/supplement/storages/disk.ex index 472e36a..765fd8e 100644 --- a/lib/supplement/storages/disk.ex +++ b/lib/supplement/storages/disk.ex @@ -3,6 +3,14 @@ defmodule Capsule.Storages.Disk do @behaviour Storage + @impl Storage + + def stream!(id, opts \\ []) do + opts + |> path_in_root(id) + |> File.stream!() + end + @impl Storage def put(upload, opts \\ []) do with path <- Path.join(opts[:prefix] || "/", Upload.name(upload)), diff --git a/lib/supplement/storages/ram.ex b/lib/supplement/storages/ram.ex index 8d4ace7..2582c6a 100644 --- a/lib/supplement/storages/ram.ex +++ b/lib/supplement/storages/ram.ex @@ -30,6 +30,13 @@ defmodule Capsule.Storages.RAM do def read(id, _opts \\ []), do: {:ok, id |> decode_pid! |> StringIO.contents() |> elem(0)} + @impl Storage + def stream!(id, _opts \\ []) do + id + |> decode_pid! + |> IO.stream(:line) + end + defp decompose_id(id), do: String.split(id, "/", parts: 2) defp decode_pid!(id) do diff --git a/lib/supplement/storages/s3.ex b/lib/supplement/storages/s3.ex index 09696b1..7c447c3 100644 --- a/lib/supplement/storages/s3.ex +++ b/lib/supplement/storages/s3.ex @@ -40,6 +40,14 @@ defmodule Capsule.Storages.S3 do end end + @impl Storage + def stream!(id, opts \\ []) do + opts + |> config(:bucket) + |> Client.download_file(id, :memory) + |> ex_aws_module().stream!() + end + defp config(opts, key) do Application.fetch_env!(:capsule, __MODULE__) |> Keyword.merge(opts) diff --git a/lib/supplement/uploads/plug_upload.ex b/lib/supplement/uploads/plug_upload.ex index 6ba020b..1110dac 100644 --- a/lib/supplement/uploads/plug_upload.ex +++ b/lib/supplement/uploads/plug_upload.ex @@ -7,4 +7,6 @@ defimpl Capsule.Upload, for: Plug.Upload do end def name(%{filename: name}), do: name + + def to_stream(%{path: path}), do: File.stream!(path) end diff --git a/lib/supplement/uploads/uri.ex b/lib/supplement/uploads/uri.ex index 09b9325..c0ac68f 100644 --- a/lib/supplement/uploads/uri.ex +++ b/lib/supplement/uploads/uri.ex @@ -20,4 +20,12 @@ defimpl Capsule.Upload, for: URI do end def name(%{path: path}), do: Path.basename(path) + + def to_stream(uri) do + {:ok, body} = contents(uri) + + {:ok, pid} = StringIO.open(body) + + IO.stream(pid, :line) + end end diff --git a/mix.exs b/mix.exs index f9ae4b0..b018a89 100644 --- a/mix.exs +++ b/mix.exs @@ -30,7 +30,7 @@ defmodule Supplement.MixProject do # Run "mix help deps" to learn about dependencies. defp deps do [ - {:capsule, "~> 0.9"}, + {:capsule, github: "elixir-capsule/capsule"}, {:ex_aws, "~> 2.0", optional: true}, {:ex_aws_s3, "~> 2.0", optional: true}, {:mox, "~> 1.0", only: [:test]}, diff --git a/mix.lock b/mix.lock index 10031c4..f231def 100644 --- a/mix.lock +++ b/mix.lock @@ -1,6 +1,6 @@ %{ "bypass": {:hex, :bypass, "2.1.0", "909782781bf8e20ee86a9cabde36b259d44af8b9f38756173e8f5e2e1fabb9b1", [:mix], [{:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:ranch, "~> 1.3", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "d9b5df8fa5b7a6efa08384e9bbecfe4ce61c77d28a4282f79e02f1ef78d96b80"}, - "capsule": {:hex, :capsule, "0.9.0", "bafacd356462889cc6e816cd1c756727da563b88ad5f0811182771aca6772e61", [:mix], [], "hexpm", "acb4f8b05c510993d09c133eb2becadf1814e80249192b5e6e9951f2cf6f08c7"}, + "capsule": {:git, "https://github.com/elixir-capsule/capsule.git", "bb36848610ac96d0c7049c2038ad494cb310062a", []}, "cowboy": {:hex, :cowboy, "2.9.0", "865dd8b6607e14cf03282e10e934023a1bd8be6f6bacf921a7e2a96d800cd452", [:make, :rebar3], [{:cowlib, "2.11.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "2c729f934b4e1aa149aff882f57c6372c15399a20d54f65c8d67bef583021bde"}, "cowboy_telemetry": {:hex, :cowboy_telemetry, "0.4.0", "f239f68b588efa7707abce16a84d0d2acf3a0f50571f8bb7f56a15865aae820c", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7d98bac1ee4565d31b62d59f8823dfd8356a169e7fcbb83831b8a5397404c9de"}, "cowlib": {:hex, :cowlib, "2.11.0", "0b9ff9c346629256c42ebe1eeb769a83c6cb771a6ee5960bd110ab0b9b872063", [:make, :rebar3], [], "hexpm", "2b3e9da0b21c4565751a6d4901c20d1b4cc25cbb7fd50d91d2ab6dd287bc86a9"}, diff --git a/test/storages/disk_test.exs b/test/storages/disk_test.exs index 64796de..ccd9fad 100644 --- a/test/storages/disk_test.exs +++ b/test/storages/disk_test.exs @@ -56,4 +56,14 @@ defmodule Capsule.Storages.DiskTest do on_exit(fn -> File.rm!("tmp/path") end) end end + + describe "stream!/1" do + test "returns enum of file contents" do + File.write!("tmp/path", "data") + + assert "data" = "path" |> Disk.stream!() |> Enum.join() + + on_exit(fn -> File.rm!("tmp/path") end) + end + end end diff --git a/test/storages/ram_test.exs b/test/storages/ram_test.exs index 913d371..38ed8f2 100644 --- a/test/storages/ram_test.exs +++ b/test/storages/ram_test.exs @@ -49,6 +49,14 @@ defmodule Capsule.Storages.RAMTest do end end + describe "stream/1" do + setup :build_ram_file + + test "result can be enumerated", %{id: id} do + assert ["some data"] = id |> RAM.stream!() |> Enum.to_list() + end + end + defp build_ram_file(_context) do {:ok, pid} = StringIO.open("some data") diff --git a/test/storages/s3_test.exs b/test/storages/s3_test.exs index 93a5ebc..0345faf 100644 --- a/test/storages/s3_test.exs +++ b/test/storages/s3_test.exs @@ -59,4 +59,16 @@ defmodule Capsule.Storages.S3Test do assert {:ok, _} = S3.read("fake", bucket: "other") end end + + describe "stream!/1" do + test "returns ex aws stream" do + stub(ExAwsMock, :stream!, fn _ -> + Stream.transform([0, 1], 0, fn i, acc -> + if acc < 3, do: {[i], acc + 1}, else: {:halt, acc} + end) + end) + + assert [0, 1] = "fake" |> S3.stream!() |> Enum.to_list() + end + end end diff --git a/test/support/mock_upload.ex b/test/support/mock_upload.ex index f40bbde..9a4d148 100644 --- a/test/support/mock_upload.ex +++ b/test/support/mock_upload.ex @@ -5,5 +5,11 @@ defmodule Capsule.MockUpload do def contents(mock), do: {:ok, mock.content} def name(mock), do: mock.name + + def to_stream(mock) do + {:ok, pid} = StringIO.open(mock.content) + + IO.stream(pid, :line) + end end end