CSV imports (no UI) (#3895)
* encode/decode date range in filenames

* Update lib/plausible/imported/csv_importer.ex

Co-authored-by: Adrian Gruntkowski <[email protected]>

* Update lib/plausible/imported/csv_importer.ex

Co-authored-by: Adrian Gruntkowski <[email protected]>

* drop unused functions

* send failure email if there is no data to export

* use PlausibleWeb.Email.mailer_email_from()

* ensure we get dates from minmax date query

---------

Co-authored-by: Adrian Gruntkowski <[email protected]>
ruslandoga and zoldar authored Mar 19, 2024
1 parent 4242b52 commit 279e89c
Showing 9 changed files with 292 additions and 127 deletions.
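
For context on the commit message's first bullet ("encode/decode date range in filenames"): import filenames carry the table name plus the start and end dates, as <table>_<YYYYMMDD>_<YYYYMMDD>.csv. A minimal encoding sketch — the FilenameSketch module is hypothetical and not part of this commit; decoding is done by parse_filename!/1 in the csv_importer.ex diff below:

    # Hypothetical counterpart to parse_filename!/1 below; not part of the commit.
    defmodule FilenameSketch do
      @spec encode(String.t(), Date.t(), Date.t()) :: String.t()
      def encode(table, %Date{} = start_date, %Date{} = end_date) do
        fmt = fn date -> Calendar.strftime(date, "%Y%m%d") end
        "#{table}_#{fmt.(start_date)}_#{fmt.(end_date)}.csv"
      end
    end

    FilenameSketch.encode("imported_pages", ~D[2020-01-01], ~D[2022-01-01])
    # => "imported_pages_20200101_20220101.csv"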
1 change: 1 addition & 0 deletions config/.env.dev
@@ -27,3 +27,4 @@ S3_SECRET_ACCESS_KEY=minioadmin
S3_REGION=us-east-1
S3_ENDPOINT=http://localhost:10000
S3_EXPORTS_BUCKET=dev-exports
S3_IMPORTS_BUCKET=dev-imports
1 change: 1 addition & 0 deletions config/.env.test
@@ -22,3 +22,4 @@ S3_SECRET_ACCESS_KEY=minioadmin
S3_REGION=us-east-1
S3_ENDPOINT=http://localhost:10000
S3_EXPORTS_BUCKET=test-exports
S3_IMPORTS_BUCKET=test-imports
8 changes: 7 additions & 1 deletion config/runtime.exs
@@ -736,6 +736,10 @@ unless s3_disabled? do
%{
name: "S3_EXPORTS_BUCKET",
example: "my-csv-exports-bucket"
},
%{
name: "S3_IMPORTS_BUCKET",
example: "my-csv-imports-bucket"
}
]

@@ -771,5 +775,7 @@ unless s3_disabled? do
host: s3_host,
port: s3_port

config :plausible, Plausible.S3, exports_bucket: s3_env_value.("S3_EXPORTS_BUCKET")
config :plausible, Plausible.S3,
exports_bucket: s3_env_value.("S3_EXPORTS_BUCKET"),
imports_bucket: s3_env_value.("S3_IMPORTS_BUCKET")
end
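
For reference, a sketch of reading the new setting back at runtime; this mirrors the Plausible.S3.imports_bucket/0 helper added later in this commit:

    # Returns whatever S3_IMPORTS_BUCKET resolved to, e.g. "my-csv-imports-bucket".
    :plausible
    |> Application.fetch_env!(Plausible.S3)
    |> Keyword.fetch!(:imports_bucket)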
163 changes: 113 additions & 50 deletions lib/plausible/imported/csv_importer.ex
@@ -20,7 +20,13 @@ defmodule Plausible.Imported.CSVImporter do

@impl true
def import_data(site_import, opts) do
%{id: import_id, site_id: site_id} = site_import
%{
id: import_id,
site_id: site_id,
start_date: start_date,
end_date: end_date
} = site_import

uploads = Keyword.fetch!(opts, :uploads)

%{access_key_id: s3_access_key_id, secret_access_key: s3_secret_access_key} =
@@ -31,52 +37,36 @@
|> Keyword.replace!(:pool_size, 1)
|> Ch.start_link()

ranges =
Enum.map(uploads, fn upload ->
%{"filename" => filename, "s3_url" => s3_url} = upload

".csv" = Path.extname(filename)
table = Path.rootname(filename)
ensure_importable_table!(table)

s3_structure = input_structure!(table)

statement =
"""
INSERT INTO {table:Identifier} \
SELECT {site_id:UInt64} AS site_id, *, {import_id:UInt64} AS import_id \
FROM s3({s3_url:String},{s3_access_key_id:String},{s3_secret_access_key:String},{s3_format:String},{s3_structure:String})\
"""

params =
%{
"table" => table,
"site_id" => site_id,
"import_id" => import_id,
"s3_url" => s3_url,
"s3_access_key_id" => s3_access_key_id,
"s3_secret_access_key" => s3_secret_access_key,
"s3_format" => "CSVWithNames",
"s3_structure" => s3_structure
}

Ch.query!(ch, statement, params, timeout: :infinity)

%Ch.Result{rows: [[min_date, max_date]]} =
Ch.query!(
ch,
"SELECT min(date), max(date) FROM {table:Identifier} WHERE site_id = {site_id:UInt64} AND import_id = {import_id:UInt64}",
%{"table" => table, "site_id" => site_id, "import_id" => import_id}
)

Date.range(min_date, max_date)
end)

{:ok,
%{
start_date: Enum.min_by(ranges, & &1.first, Date).first,
end_date: Enum.max_by(ranges, & &1.last, Date).last
}}
Enum.each(uploads, fn upload ->
%{"filename" => filename, "s3_url" => s3_url} = upload

{table, _, _} = parse_filename!(filename)
s3_structure = input_structure!(table)

statement =
"""
INSERT INTO {table:Identifier} \
SELECT {site_id:UInt64} AS site_id, *, {import_id:UInt64} AS import_id \
FROM s3({s3_url:String},{s3_access_key_id:String},{s3_secret_access_key:String},{s3_format:String},{s3_structure:String}) \
WHERE date >= {start_date:Date} AND date <= {end_date:Date}\
"""

params =
%{
"table" => table,
"site_id" => site_id,
"import_id" => import_id,
"s3_url" => s3_url,
"s3_access_key_id" => s3_access_key_id,
"s3_secret_access_key" => s3_secret_access_key,
"s3_format" => "CSVWithNames",
"s3_structure" => s3_structure,
"start_date" => start_date,
"end_date" => end_date
}

Ch.query!(ch, statement, params, timeout: :infinity)
end)
rescue
# we cancel the import on any argument or ClickHouse errors
e in [ArgumentError, Ch.Error] ->
@@ -103,12 +93,85 @@
"date Date, visitors UInt64, pageviews UInt64, bounces UInt64, visits UInt64, visit_duration UInt64"
}

@doc """
Extracts min/max date range from a list of uploads.
Examples:
iex> date_range([
...> %{"filename" => "imported_devices_20190101_20210101.csv"},
...> "imported_pages_20200101_20220101.csv"
...> ])
Date.range(~D[2019-01-01], ~D[2022-01-01])
iex> date_range([])
** (ArgumentError) empty uploads
"""
@spec date_range([String.t() | %{String.t() => String.t()}, ...]) :: Date.Range.t()
def date_range([_ | _] = uploads), do: date_range(uploads, _start_date = nil, _end_date = nil)
def date_range([]), do: raise(ArgumentError, "empty uploads")

defp date_range([upload | uploads], prev_start_date, prev_end_date) do
filename =
case upload do
%{"filename" => filename} -> filename
filename when is_binary(filename) -> filename
end

{_table, start_date, end_date} = parse_filename!(filename)

start_date =
if prev_start_date do
Enum.min([start_date, prev_start_date], Date)
else
start_date
end

end_date =
if prev_end_date do
Enum.max([end_date, prev_end_date], Date)
else
end_date
end

date_range(uploads, start_date, end_date)
end

defp date_range([], first, last), do: Date.range(first, last)

@spec parse_date!(String.t()) :: Date.t()
defp parse_date!(date) do
date |> Timex.parse!("{YYYY}{0M}{0D}") |> NaiveDateTime.to_date()
end

@doc """
Extracts table name and min/max dates from the filename.
Examples:
iex> parse_filename!("my_data.csv")
** (ArgumentError) invalid filename
iex> parse_filename!("imported_devices_00010101_20250101.csv")
{"imported_devices", ~D[0001-01-01], ~D[2025-01-01]}
"""
@spec parse_filename!(String.t()) ::
{table :: String.t(), start_date :: Date.t(), end_date :: Date.t()}
def parse_filename!(filename)

for {table, input_structure} <- input_structures do
defp input_structure!(unquote(table)), do: unquote(input_structure)
defp ensure_importable_table!(unquote(table)), do: :ok

def parse_filename!(
<<unquote(table)::bytes, ?_, start_date::8-bytes, ?_, end_date::8-bytes, ".csv">>
) do
{unquote(table), parse_date!(start_date), parse_date!(end_date)}
end
end

defp ensure_importable_table!(table) do
raise ArgumentError, "table #{table} is not supported for data import"
end

def parse_filename!(_filename) do
raise ArgumentError, "invalid filename"
end
end
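
Taken together, a hedged sketch of how the new public helpers compose (filenames, dates, and URLs are illustrative):

    uploads = [
      %{"filename" => "imported_pages_20230101_20230630.csv", "s3_url" => "..."},
      %{"filename" => "imported_devices_20230201_20231231.csv", "s3_url" => "..."}
    ]

    Plausible.Imported.CSVImporter.parse_filename!("imported_pages_20230101_20230630.csv")
    # => {"imported_pages", ~D[2023-01-01], ~D[2023-06-30]}

    # The import's overall range spans the earliest start to the latest end:
    Plausible.Imported.CSVImporter.date_range(uploads)
    # => Date.range(~D[2023-01-01], ~D[2023-12-31])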
61 changes: 61 additions & 0 deletions lib/plausible/s3.ex
@@ -18,9 +18,64 @@ defmodule Plausible.S3 do
@spec exports_bucket :: String.t()
def exports_bucket, do: config(:exports_bucket)

@doc """
Returns the pre-configured S3 bucket for CSV imports.
config :plausible, Plausible.S3,
imports_bucket: System.fetch_env!("S3_IMPORTS_BUCKET")
Example:
iex> imports_bucket()
"test-imports"
"""
@spec imports_bucket :: String.t()
def imports_bucket, do: config(:imports_bucket)

defp config, do: Application.fetch_env!(:plausible, __MODULE__)
defp config(key), do: Keyword.fetch!(config(), key)

@doc """
Presigns an upload for an imported file.
In the current implementation the bucket always goes into the path component.
Example:
iex> %{
...> s3_url: "http://localhost:10000/test-imports/123/imported_browsers.csv",
...> presigned_url: "http://localhost:10000/test-imports/123/imported_browsers.csv?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=minioadmin" <> _
...> } = import_presign_upload(_site_id = 123, _filename = "imported_browsers.csv")
"""
def import_presign_upload(site_id, filename) do
config = ExAws.Config.new(:s3)
s3_path = Path.join(to_string(site_id), filename)
bucket = imports_bucket()
{:ok, presigned_url} = ExAws.S3.presigned_url(config, :put, bucket, s3_path)
%{s3_url: extract_s3_url(presigned_url), presigned_url: presigned_url}
end

# to make ClickHouse see MinIO in dev and test envs we replace
# the host in the S3 URL with whatever's set in S3_CLICKHOUSE_HOST env var
if Mix.env() in [:dev, :test, :small_dev, :small_test] do
defp extract_s3_url(presigned_url) do
[s3_url, _] = String.split(presigned_url, "?")

if ch_host = System.get_env("S3_CLICKHOUSE_HOST") do
URI.to_string(%URI{URI.parse(s3_url) | host: ch_host})
else
s3_url
end
end
else
defp extract_s3_url(presigned_url) do
[s3_url, _] = String.split(presigned_url, "?")
s3_url
end
end

@doc """
Chunks and uploads Zip archive to the provided S3 destination.
@@ -77,6 +132,12 @@

@doc """
Returns `access_key_id` and `secret_access_key` to be used by ClickHouse during imports from S3.
Example:
iex> import_clickhouse_credentials()
%{access_key_id: "minioadmin", secret_access_key: "minioadmin"}
"""
@spec import_clickhouse_credentials ::
%{access_key_id: String.t(), secret_access_key: String.t()}
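
A hedged sketch of driving the presign helper above from an upload flow; Req.put! stands in for any HTTP client able to PUT to the presigned URL:

    %{s3_url: s3_url, presigned_url: upload_url} =
      Plausible.S3.import_presign_upload(123, "imported_pages_20230101_20231231.csv")

    # Upload the CSV straight to the bucket; ClickHouse later reads it via s3_url.
    Req.put!(upload_url, body: File.read!("imported_pages_20230101_20231231.csv"))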
79 changes: 46 additions & 33 deletions lib/workers/export_csv.ex
@@ -25,43 +25,56 @@ defmodule Plausible.Workers.ExportCSV do
|> Keyword.replace!(:pool_size, 1)
|> Ch.start_link()

# NOTE: should we use site.timezone?
# %Ch.Result{rows: [[min_date, max_date]]} =
# Ch.query!(
# ch,
# "SELECT toDate(min(timestamp)), toDate(max(timestamp)) FROM events_v2 WHERE site_id={site_id:UInt64}",
# %{"site_id" => site_id}
# )

download_url =
DBConnection.run(
ch,
fn conn ->
conn
|> Plausible.Exports.stream_archive(
# date_range: Date.range(min_date, max_date)
Plausible.Exports.export_queries(site_id, extname: ".csv"),
format: "CSVWithNames"
)
|> Plausible.S3.export_upload_multipart(s3_bucket, s3_path, s3_config_overrides(args))
end,
timeout: :infinity
)

%Ch.Result{rows: [[%Date{} = min_date, %Date{} = max_date]]} =
Ch.query!(
ch,
"SELECT toDate(min(timestamp)), toDate(max(timestamp)) FROM events_v2 WHERE site_id={site_id:UInt64}",
%{"site_id" => site_id}
)

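# Over zero rows, ClickHouse's min/max of a Date return the type default
# (1970-01-01), so this max_date doubles as the "nothing to export" signal.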
if max_date == ~D[1970-01-01] do
# NOTE: replace with proper Plausible.Email template
Plausible.Mailer.deliver_now!(
Bamboo.Email.new_email(
from: PlausibleWeb.Email.mailer_email_from(),
to: email,
subject: "EXPORT FAILURE",
text_body: "there is nothing to export"
)
)
else
download_url =
DBConnection.run(
ch,
fn conn ->
conn
|> Plausible.Exports.stream_archive(
Plausible.Exports.export_queries(site_id,
date_range: Date.range(min_date, max_date),
extname: ".csv"
),
format: "CSVWithNames"
)
|> Plausible.S3.export_upload_multipart(s3_bucket, s3_path, s3_config_overrides(args))
end,
timeout: :infinity
)

# NOTE: replace with proper Plausible.Email template
Plausible.Mailer.deliver_now!(
Bamboo.Email.new_email(
from: "[email protected]",
to: email,
subject: "EXPORT SUCCESS",
text_body: """
download it from #{download_url}! hurry up! you have 24 hours!"
""",
html_body: """
download it from <a href="#{download_url}">here</a>! hurry up! you have 24 hours!
"""
)
)

# NOTE: replace with proper Plausible.Email template
Plausible.Mailer.deliver_now!(
Bamboo.Email.new_email(
from: PlausibleWeb.Email.mailer_email_from(),
to: email,
subject: "EXPORT SUCCESS",
text_body: """
download it from #{download_url}! hurry up! you have 24 hours!
""",
html_body: """
download it from <a href="#{download_url}">here</a>! hurry up! you have 24 hours!
"""
)
)
end

:ok
end
(Diffs for the remaining 3 changed files were not loaded.)
