Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add csv importer #3795

Merged
Merged 17 commits on Feb 27, 2024
2 changes: 1 addition & 1 deletion .github/workflows/elixir.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ jobs:
- name: Check Credo Warnings
run: mix credo diff --from-git-merge-base origin/master
- name: Run tests
run: mix test --include slow --max-failures 1 --warnings-as-errors
run: mix test --include slow --include minio --max-failures 1 --warnings-as-errors
- name: Run tests (small build)
run: MIX_ENV=small_test mix test --include slow --max-failures 1 --warnings-as-errors
- name: Check Dialyzer
Expand Down
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,9 @@ postgres-prod: ## Start a container with the same version of postgres as the one

postgres-stop: ## Stop and remove the postgres container
docker stop plausible_db && docker rm plausible_db

minio: ## Start a transient container with a recent version of minio (s3)
docker run -d --rm -p 6000:6000 -p 6001:6001 --name plausible_minio minio/minio server /data --address ":6000" --console-address ":6001"

minio-stop: ## Stop and remove the minio container
docker stop plausible_minio
6 changes: 6 additions & 0 deletions config/.env.dev
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,9 @@ GOOGLE_CLIENT_SECRET=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

PROMEX_DISABLED=false
SITE_DEFAULT_INGEST_THRESHOLD=1000000

S3_DISABLED=false
S3_ACCESS_KEY_ID=minioadmin
S3_SECRET_ACCESS_KEY=minioadmin
S3_REGION=us-east-1
S3_ENDPOINT=http://localhost:6000
6 changes: 6 additions & 0 deletions config/.env.test
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,9 @@ IP_GEOLOCATION_DB=test/priv/GeoLite2-City-Test.mmdb
SITE_DEFAULT_INGEST_THRESHOLD=1000000
GOOGLE_CLIENT_ID=fake_client_id
GOOGLE_CLIENT_SECRET=fake_client_secret

S3_DISABLED=false
S3_ACCESS_KEY_ID=minioadmin
S3_SECRET_ACCESS_KEY=minioadmin
S3_REGION=us-east-1
S3_ENDPOINT=http://localhost:6000
58 changes: 58 additions & 0 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -704,3 +704,61 @@ if not is_selfhost do

config :plausible, Plausible.Site, default_ingest_threshold: site_default_ingest_threshold
end

# S3 integration (used by the CSV importer) is opt-in: it stays disabled
# unless S3_DISABLED is explicitly set to "false". String.to_existing_atom/1
# is safe here because :true and :false always exist; any other value raises
# at boot, which surfaces a typo immediately.
s3_disabled? =
  config_dir
  |> get_var_from_path_or_env("S3_DISABLED", "true")
  |> String.to_existing_atom()

unless s3_disabled? do
  # Required S3 settings. Each entry carries an example value so the
  # boot-time error below can show the operator exactly what to set.
  s3_env = [
    %{
      name: "S3_ACCESS_KEY_ID",
      example: "AKIAIOSFODNN7EXAMPLE"
    },
    %{
      name: "S3_SECRET_ACCESS_KEY",
      example: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
    },
    %{
      name: "S3_REGION",
      example: "us-east-1"
    },
    %{
      name: "S3_ENDPOINT",
      example: "https://<ACCOUNT_ID>.r2.cloudflarestorage.com"
    }
  ]

  # Resolve each variable from the config dir or the OS environment.
  s3_env =
    Enum.map(s3_env, fn var ->
      Map.put(var, :value, get_var_from_path_or_env(config_dir, var.name))
    end)

  s3_missing_env = Enum.filter(s3_env, &is_nil(&1.value))

  # Fail fast when S3 is enabled but incompletely configured, listing every
  # missing variable together with an example value.
  unless s3_missing_env == [] do
    raise ArgumentError, """
    Missing S3 configuration. Please set #{s3_missing_env |> Enum.map(& &1.name) |> Enum.join(", ")} environment variable(s):

    #{s3_missing_env |> Enum.map(fn %{name: name, example: example} -> "\t#{name}=#{example}" end) |> Enum.join("\n")}
    """
  end

  # At this point every variable is present, so Map.fetch!/2 cannot fail.
  s3_env_value = fn name ->
    s3_env |> Enum.find(&(&1.name == name)) |> Map.fetch!(:value)
  end

  config :ex_aws,
    http_client: Plausible.S3.Client,
    access_key_id: s3_env_value.("S3_ACCESS_KEY_ID"),
    secret_access_key: s3_env_value.("S3_SECRET_ACCESS_KEY"),
    region: s3_env_value.("S3_REGION")

  # ExAws expects the endpoint split into scheme/host/port rather than a URL.
  %URI{scheme: s3_scheme, host: s3_host, port: s3_port} = URI.parse(s3_env_value.("S3_ENDPOINT"))

  config :ex_aws, :s3,
    scheme: s3_scheme <> "://",
    host: s3_host,
    port: s3_port
end
3 changes: 1 addition & 2 deletions lib/plausible/imported.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ defmodule Plausible.Imported do
* `Plausible.Imported.UniversalAnalytics` - existing mechanism, for legacy Google
analytics formerly known as "Google Analytics"
* `Plausible.Imported.NoopImporter` - importer stub, used mainly for testing purposes
* `Plausible.Imported.CSVImporter` - a placeholder stub for CSV importer that will
be added soon
* `Plausible.Imported.CSVImporter` - CSV importer from S3

For more information on implementing importers, see `Plausible.Imported.Importer`.
"""
Expand Down
97 changes: 93 additions & 4 deletions lib/plausible/imported/csv_importer.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
defmodule Plausible.Imported.CSVImporter do
  @moduledoc """
  CSV importer from S3 that uses the ClickHouse
  [s3 table function](https://clickhouse.com/docs/en/sql-reference/table-functions/s3).
  """

  use Plausible.Imported.Importer
Expand All @@ -16,10 +16,99 @@ defmodule Plausible.Imported.CSVImporter do
  # Reuses the Google Analytics import notification template; there is no
  # CSV-specific email template yet.
  def email_template(), do: "google_analytics_import.html"

@impl true
def parse_args(%{"s3_path" => s3_path}), do: [s3_path: s3_path]
def parse_args(%{"uploads" => uploads}), do: [uploads: uploads]

@impl true
def import_data(_site_import, _opts) do
:ok
def import_data(site_import, opts) do
%{id: import_id, site_id: site_id} = site_import
uploads = Keyword.fetch!(opts, :uploads)

%{access_key_id: s3_access_key_id, secret_access_key: s3_secret_access_key} =
Plausible.S3.import_clickhouse_credentials()

{:ok, ch} =
Plausible.IngestRepo.config()
|> Keyword.replace!(:pool_size, 1)
|> Ch.start_link()

ranges =
Enum.map(uploads, fn upload ->
%{"filename" => filename, "s3_url" => s3_url} = upload

".csv" = Path.extname(filename)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll revisit this after implementing imports. I think we'd do this sort of check there. If not, I'll make it into an ArgumentError to cancel the job early.

table = Path.rootname(filename)
ensure_importable_table!(table)

s3_structure = input_structure!(table)

statement =
"""
INSERT INTO {table:Identifier} \
SELECT {site_id:UInt64} AS site_id, *, {import_id:UInt64} AS import_id \
FROM s3({s3_url:String},{s3_access_key_id:String},{s3_secret_access_key:String},{s3_format:String},{s3_structure:String})\
"""

params =
%{
"table" => table,
"site_id" => site_id,
"import_id" => import_id,
"s3_url" => s3_url,
"s3_access_key_id" => s3_access_key_id,
"s3_secret_access_key" => s3_secret_access_key,
"s3_format" => "CSVWithNames",
"s3_structure" => s3_structure
}

Ch.query!(ch, statement, params, timeout: :infinity)

%Ch.Result{rows: [[min_date, max_date]]} =
Ch.query!(
ch,
"SELECT min(date), max(date) FROM {table:Identifier} WHERE site_id = {site_id:UInt64} AND import_id = {import_id:UInt64}",
%{"table" => table, "site_id" => site_id, "import_id" => import_id}
)

Date.range(min_date, max_date)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is something that can be tackled in a follow-up, but eventually we'll need to always know import date range before executing it. We have to determine the boundaries up front so that imported stats do not overlap with native stats. We might also provide an option to let user adjust the import date boundaries.

end)

{:ok,
%{
start_date: Enum.min_by(ranges, & &1.first, Date).first,
end_date: Enum.max_by(ranges, & &1.last, Date).last
}}
rescue
# we are cancelling on any argument or ClickHouse errors
e in [ArgumentError, Ch.Error] ->
{:error, Exception.message(e)}
end

  # Explicit column structure for each supported CSV file, passed to the
  # ClickHouse s3() table function so values are parsed with the right types.
  # The CSV filename (minus extension) doubles as the target table name.
  # NOTE(review): "city" in imported_locations is numeric (UInt64) — appears
  # to be a GeoNames id; confirm.
  input_structures = %{
    "imported_browsers" =>
      "date Date, browser String, visitors UInt64, visits UInt64, visit_duration UInt64, bounces UInt32",
    "imported_devices" =>
      "date Date, device String, visitors UInt64, visits UInt64, visit_duration UInt64, bounces UInt32",
    "imported_entry_pages" =>
      "date Date, entry_page String, visitors UInt64, entrances UInt64, visit_duration UInt64, bounces UInt32",
    "imported_exit_pages" => "date Date, exit_page String, visitors UInt64, exits UInt64",
    "imported_locations" =>
      "date Date, country String, region String, city UInt64, visitors UInt64, visits UInt64, visit_duration UInt64, bounces UInt32",
    "imported_operating_systems" =>
      "date Date, operating_system String, visitors UInt64, visits UInt64, visit_duration UInt64, bounces UInt32",
    "imported_pages" =>
      "date Date, hostname String, page String, visitors UInt64, pageviews UInt64, exits UInt64, time_on_page UInt64",
    "imported_sources" =>
      "date Date, source String, utm_medium String, utm_campaign String, utm_content String, utm_term String, visitors UInt64, visits UInt64, visit_duration UInt64, bounces UInt32",
    "imported_visitors" =>
      "date Date, visitors UInt64, pageviews UInt64, bounces UInt64, visits UInt64, visit_duration UInt64"
  }

  # Unroll the map into compile-time function clauses: constant-time lookup
  # with no runtime map access.
  for {table, input_structure} <- input_structures do
    defp input_structure!(unquote(table)), do: unquote(input_structure)
    defp ensure_importable_table!(unquote(table)), do: :ok
  end

  # Fallback clause: any filename that doesn't map to a known imported_*
  # table raises, which import_data/2 rescues to cancel the import.
  defp ensure_importable_table!(table) do
    raise ArgumentError, "table #{table} is not supported for data import"
  end
end
15 changes: 15 additions & 0 deletions lib/plausible/s3.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
defmodule Plausible.S3 do
  @moduledoc """
  Helper functions for S3 exports/imports.
  """

  @doc """
  Returns the `access_key_id` and `secret_access_key` that ClickHouse should
  use when reading import data from S3, resolved from the `:ex_aws` config.
  """
  @spec import_clickhouse_credentials ::
          %{access_key_id: String.t(), secret_access_key: String.t()}
  def import_clickhouse_credentials do
    config = ExAws.Config.new(:s3)

    %{
      access_key_id: config.access_key_id,
      secret_access_key: config.secret_access_key
    }
  end
end
17 changes: 17 additions & 0 deletions lib/plausible/s3/client.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
defmodule Plausible.S3.Client do
  # Adapts Finch to the HTTP-client contract ExAws expects, translating
  # Finch responses/errors into ExAws' result maps.
  @moduledoc false
  @behaviour ExAws.Request.HttpClient

  @impl true
  def request(method, url, body, headers, opts) do
    method
    |> Finch.build(url, headers, body)
    |> Finch.request(Plausible.Finch, opts)
    |> case do
      {:ok, %Finch.Response{status: status, headers: resp_headers, body: resp_body}} ->
        {:ok, %{status_code: status, headers: resp_headers, body: resp_body}}

      {:error, reason} ->
        {:error, %{reason: reason}}
    end
  end
end
6 changes: 5 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,11 @@ defmodule Plausible.MixProject do
{:esbuild, "~> 0.7", runtime: Mix.env() in [:dev, :small_dev]},
{:tailwind, "~> 0.2.0", runtime: Mix.env() in [:dev, :small_dev]},
{:ex_json_logger, "~> 1.4.0"},
{:ecto_network, "~> 1.5.0"}
{:ecto_network, "~> 1.5.0"},
{:ex_aws, "~> 2.5"},
{:ex_aws_s3, "~> 2.5"},
{:sweet_xml, "~> 0.7.4"},
{:testcontainers, "~> 1.6", only: [:test, :small_test]}
]
end

Expand Down
Loading
Loading