Skip to content

Commit

Permalink
Implement streaming API
Browse files Browse the repository at this point in the history
  • Loading branch information
tfwright committed Dec 22, 2023
1 parent 06103b1 commit 3d0dce9
Show file tree
Hide file tree
Showing 11 changed files with 71 additions and 2 deletions.
8 changes: 8 additions & 0 deletions lib/supplement/storages/disk.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@ defmodule Capsule.Storages.Disk do

@behaviour Storage

@impl Storage

def stream!(id, opts \\ []) do
opts
|> path_in_root(id)
|> File.stream!()
end

@impl Storage
def put(upload, opts \\ []) do
with path <- Path.join(opts[:prefix] || "/", Upload.name(upload)),
Expand Down
7 changes: 7 additions & 0 deletions lib/supplement/storages/ram.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ defmodule Capsule.Storages.RAM do
def read(id, _opts \\ []),
do: {:ok, id |> decode_pid! |> StringIO.contents() |> elem(0)}

@impl Storage
def stream!(id, _opts \\ []) do
id
|> decode_pid!
|> IO.stream(:line)
end

defp decompose_id(id), do: String.split(id, "/", parts: 2)

defp decode_pid!(id) do
Expand Down
8 changes: 8 additions & 0 deletions lib/supplement/storages/s3.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ defmodule Capsule.Storages.S3 do
end
end

@impl Storage
def stream!(id, opts \\ []) do
opts
|> config(:bucket)
|> Client.download_file(id, :memory)
|> ex_aws_module().stream!()
end

defp config(opts, key) do
Application.fetch_env!(:capsule, __MODULE__)
|> Keyword.merge(opts)
Expand Down
2 changes: 2 additions & 0 deletions lib/supplement/uploads/plug_upload.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ defimpl Capsule.Upload, for: Plug.Upload do
end

def name(%{filename: name}), do: name

def to_stream(%{path: path}), do: File.stream!(path)
end
8 changes: 8 additions & 0 deletions lib/supplement/uploads/uri.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,12 @@ defimpl Capsule.Upload, for: URI do
end

def name(%{path: path}), do: Path.basename(path)

def to_stream(uri) do
{:ok, body} = contents(uri)

{:ok, pid} = StringIO.open(body)

IO.stream(pid, :line)
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ defmodule Supplement.MixProject do
# Run "mix help deps" to learn about dependencies.
defp deps do
[
{:capsule, "~> 0.9"},
{:capsule, github: "elixir-capsule/capsule"},
{:ex_aws, "~> 2.0", optional: true},
{:ex_aws_s3, "~> 2.0", optional: true},
{:mox, "~> 1.0", only: [:test]},
Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
%{
"bypass": {:hex, :bypass, "2.1.0", "909782781bf8e20ee86a9cabde36b259d44af8b9f38756173e8f5e2e1fabb9b1", [:mix], [{:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:ranch, "~> 1.3", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "d9b5df8fa5b7a6efa08384e9bbecfe4ce61c77d28a4282f79e02f1ef78d96b80"},
"capsule": {:hex, :capsule, "0.9.0", "bafacd356462889cc6e816cd1c756727da563b88ad5f0811182771aca6772e61", [:mix], [], "hexpm", "acb4f8b05c510993d09c133eb2becadf1814e80249192b5e6e9951f2cf6f08c7"},
"capsule": {:git, "https://github.com/elixir-capsule/capsule.git", "bb36848610ac96d0c7049c2038ad494cb310062a", []},
"cowboy": {:hex, :cowboy, "2.9.0", "865dd8b6607e14cf03282e10e934023a1bd8be6f6bacf921a7e2a96d800cd452", [:make, :rebar3], [{:cowlib, "2.11.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "2c729f934b4e1aa149aff882f57c6372c15399a20d54f65c8d67bef583021bde"},
"cowboy_telemetry": {:hex, :cowboy_telemetry, "0.4.0", "f239f68b588efa7707abce16a84d0d2acf3a0f50571f8bb7f56a15865aae820c", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7d98bac1ee4565d31b62d59f8823dfd8356a169e7fcbb83831b8a5397404c9de"},
"cowlib": {:hex, :cowlib, "2.11.0", "0b9ff9c346629256c42ebe1eeb769a83c6cb771a6ee5960bd110ab0b9b872063", [:make, :rebar3], [], "hexpm", "2b3e9da0b21c4565751a6d4901c20d1b4cc25cbb7fd50d91d2ab6dd287bc86a9"},
Expand Down
10 changes: 10 additions & 0 deletions test/storages/disk_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,14 @@ defmodule Capsule.Storages.DiskTest do
on_exit(fn -> File.rm!("tmp/path") end)
end
end

describe "stream!/1" do
test "returns enum of file contents" do
File.write!("tmp/path", "data")

assert "data" = "path" |> Disk.stream!() |> Enum.join()

on_exit(fn -> File.rm!("tmp/path") end)
end
end
end
8 changes: 8 additions & 0 deletions test/storages/ram_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ defmodule Capsule.Storages.RAMTest do
end
end

describe "stream/1" do
setup :build_ram_file

test "result can be enumerated", %{id: id} do
assert ["some data"] = id |> RAM.stream!() |> Enum.to_list()
end
end

defp build_ram_file(_context) do
{:ok, pid} = StringIO.open("some data")

Expand Down
12 changes: 12 additions & 0 deletions test/storages/s3_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,16 @@ defmodule Capsule.Storages.S3Test do
assert {:ok, _} = S3.read("fake", bucket: "other")
end
end

describe "stream!/1" do
test "returns ex aws stream" do
stub(ExAwsMock, :stream!, fn _ ->
Stream.transform([0, 1], 0, fn i, acc ->
if acc < 3, do: {[i], acc + 1}, else: {:halt, acc}
end)
end)

assert [0, 1] = "fake" |> S3.stream!() |> Enum.to_list()
end
end
end
6 changes: 6 additions & 0 deletions test/support/mock_upload.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,11 @@ defmodule Capsule.MockUpload do
def contents(mock), do: {:ok, mock.content}

def name(mock), do: mock.name

def to_stream(mock) do
{:ok, pid} = StringIO.open(mock.content)

IO.stream(pid, :line)
end
end
end

0 comments on commit 3d0dce9

Please sign in to comment.