CSV imports (no UI) #3895

Merged · 7 commits · Mar 19, 2024

config/.env.dev: 1 addition, 0 deletions

@@ -27,3 +27,4 @@ S3_SECRET_ACCESS_KEY=minioadmin
 S3_REGION=us-east-1
 S3_ENDPOINT=http://localhost:10000
 S3_EXPORTS_BUCKET=dev-exports
+S3_IMPORTS_BUCKET=dev-imports

config/.env.test: 1 addition, 0 deletions

@@ -22,3 +22,4 @@ S3_SECRET_ACCESS_KEY=minioadmin
 S3_REGION=us-east-1
 S3_ENDPOINT=http://localhost:10000
 S3_EXPORTS_BUCKET=test-exports
+S3_IMPORTS_BUCKET=test-imports

config/runtime.exs: 7 additions, 1 deletion

@@ -736,6 +736,10 @@ unless s3_disabled? do
       %{
         name: "S3_EXPORTS_BUCKET",
         example: "my-csv-exports-bucket"
+      },
+      %{
+        name: "S3_IMPORTS_BUCKET",
+        example: "my-csv-imports-bucket"
       }
     ]

@@ -771,5 +775,7 @@ unless s3_disabled? do
     host: s3_host,
     port: s3_port

-  config :plausible, Plausible.S3, exports_bucket: s3_env_value.("S3_EXPORTS_BUCKET")
+  config :plausible, Plausible.S3,
+    exports_bucket: s3_env_value.("S3_EXPORTS_BUCKET"),
+    imports_bucket: s3_env_value.("S3_IMPORTS_BUCKET")
   end
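
Once both variables are set, the bucket names can be read back from the application environment. A minimal sketch (values taken from config/.env.dev above; the keyword list may carry additional keys in other setups):

    # iex -S mix, with config/.env.dev exported
    Application.fetch_env!(:plausible, Plausible.S3)
    #=> [exports_bucket: "dev-exports", imports_bucket: "dev-imports"]

    Plausible.S3.imports_bucket()
    #=> "dev-imports"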

lib/plausible/imported/csv_importer.ex: 113 additions, 50 deletions

@@ -20,7 +20,13 @@ defmodule Plausible.Imported.CSVImporter do

   @impl true
   def import_data(site_import, opts) do
-    %{id: import_id, site_id: site_id} = site_import
+    %{
+      id: import_id,
+      site_id: site_id,
+      start_date: start_date,
+      end_date: end_date
+    } = site_import
+
     uploads = Keyword.fetch!(opts, :uploads)

     %{access_key_id: s3_access_key_id, secret_access_key: s3_secret_access_key} =
@@ -31,52 +37,36 @@ defmodule Plausible.Imported.CSVImporter do
       |> Keyword.replace!(:pool_size, 1)
       |> Ch.start_link()

-    ranges =
-      Enum.map(uploads, fn upload ->
-        %{"filename" => filename, "s3_url" => s3_url} = upload
-
-        ".csv" = Path.extname(filename)
-        table = Path.rootname(filename)
-        ensure_importable_table!(table)
-
-        s3_structure = input_structure!(table)
-
-        statement =
-          """
-          INSERT INTO {table:Identifier} \
-          SELECT {site_id:UInt64} AS site_id, *, {import_id:UInt64} AS import_id \
-          FROM s3({s3_url:String},{s3_access_key_id:String},{s3_secret_access_key:String},{s3_format:String},{s3_structure:String})\
-          """
-
-        params =
-          %{
-            "table" => table,
-            "site_id" => site_id,
-            "import_id" => import_id,
-            "s3_url" => s3_url,
-            "s3_access_key_id" => s3_access_key_id,
-            "s3_secret_access_key" => s3_secret_access_key,
-            "s3_format" => "CSVWithNames",
-            "s3_structure" => s3_structure
-          }
-
-        Ch.query!(ch, statement, params, timeout: :infinity)
-
-        %Ch.Result{rows: [[min_date, max_date]]} =
-          Ch.query!(
-            ch,
-            "SELECT min(date), max(date) FROM {table:Identifier} WHERE site_id = {site_id:UInt64} AND import_id = {import_id:UInt64}",
-            %{"table" => table, "site_id" => site_id, "import_id" => import_id}
-          )
-
-        Date.range(min_date, max_date)
-      end)
-
-    {:ok,
-     %{
-       start_date: Enum.min_by(ranges, & &1.first, Date).first,
-       end_date: Enum.max_by(ranges, & &1.last, Date).last
-     }}
+    Enum.each(uploads, fn upload ->
+      %{"filename" => filename, "s3_url" => s3_url} = upload
+
+      {table, _, _} = parse_filename!(filename)
+      s3_structure = input_structure!(table)
+
+      statement =
+        """
+        INSERT INTO {table:Identifier} \
+        SELECT {site_id:UInt64} AS site_id, *, {import_id:UInt64} AS import_id \
+        FROM s3({s3_url:String},{s3_access_key_id:String},{s3_secret_access_key:String},{s3_format:String},{s3_structure:String}) \
+        WHERE date >= {start_date:Date} AND date <= {end_date:Date}\
+        """
+
+      params =
+        %{
+          "table" => table,
+          "site_id" => site_id,
+          "import_id" => import_id,
+          "s3_url" => s3_url,
+          "s3_access_key_id" => s3_access_key_id,
+          "s3_secret_access_key" => s3_secret_access_key,
+          "s3_format" => "CSVWithNames",
+          "s3_structure" => s3_structure,
+          "start_date" => start_date,
+          "end_date" => end_date
+        }

+      Ch.query!(ch, statement, params, timeout: :infinity)
+    end)
   rescue
     # we are cancelling on any argument or ClickHouse errors
     e in [ArgumentError, Ch.Error] ->
@@ -103,12 +93,85 @@ defmodule Plausible.Imported.CSVImporter do
         "date Date, visitors UInt64, pageviews UInt64, bounces UInt64, visits UInt64, visit_duration UInt64"
     }

+  @doc """
+  Extracts min/max date range from a list of uploads.
+
+  Examples:
+
+      iex> date_range([
+      ...>   %{"filename" => "imported_devices_20190101_20210101.csv"},
+      ...>   "imported_pages_20200101_20220101.csv"
+      ...> ])
+      Date.range(~D[2019-01-01], ~D[2022-01-01])
+
+      iex> date_range([])
+      ** (ArgumentError) empty uploads
+
+  """
+  @spec date_range([String.t() | %{String.t() => String.t()}, ...]) :: Date.Range.t()
+  def date_range([_ | _] = uploads), do: date_range(uploads, _start_date = nil, _end_date = nil)
+  def date_range([]), do: raise(ArgumentError, "empty uploads")
+
+  defp date_range([upload | uploads], prev_start_date, prev_end_date) do
+    filename =
+      case upload do
+        %{"filename" => filename} -> filename
+        filename when is_binary(filename) -> filename
+      end
+
+    {_table, start_date, end_date} = parse_filename!(filename)
+
+    start_date =
+      if prev_start_date do
+        Enum.min([start_date, prev_start_date], Date)
+      else
+        start_date
+      end
+
+    end_date =
+      if prev_end_date do
+        Enum.max([end_date, prev_end_date], Date)
+      else
+        end_date
+      end
+
+    date_range(uploads, start_date, end_date)
+  end
+
+  defp date_range([], first, last), do: Date.range(first, last)
+
+  @spec parse_date!(String.t()) :: Date.t()
+  defp parse_date!(date) do
+    date |> Timex.parse!("{YYYY}{0M}{0D}") |> NaiveDateTime.to_date()
+  end
+
+  @doc """
+  Extracts table name and min/max dates from the filename.
+
+  Examples:
+
+      iex> parse_filename!("my_data.csv")
+      ** (ArgumentError) invalid filename
+
+      iex> parse_filename!("imported_devices_00010101_20250101.csv")
+      {"imported_devices", ~D[0001-01-01], ~D[2025-01-01]}
+
+  """
+  @spec parse_filename!(String.t()) ::
+          {table :: String.t(), start_date :: Date.t(), end_date :: Date.t()}
+  def parse_filename!(filename)
+
   for {table, input_structure} <- input_structures do
     defp input_structure!(unquote(table)), do: unquote(input_structure)
-    defp ensure_importable_table!(unquote(table)), do: :ok
+
+    def parse_filename!(
+          <<unquote(table)::bytes, ?_, start_date::8-bytes, ?_, end_date::8-bytes, ".csv">>
+        ) do
+      {unquote(table), parse_date!(start_date), parse_date!(end_date)}
+    end
   end

-  defp ensure_importable_table!(table) do
-    raise ArgumentError, "table #{table} is not supported for data import"
+  def parse_filename!(_filename) do
+    raise ArgumentError, "invalid filename"
   end
 end
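
Taken together, the filename now encodes both the target table and the date window. A hedged sketch of the new helpers (the filenames here are made up; return values follow the doctests above):

    alias Plausible.Imported.CSVImporter

    CSVImporter.parse_filename!("imported_browsers_20211230_20220101.csv")
    #=> {"imported_browsers", ~D[2021-12-30], ~D[2022-01-01]}

    # date_range/1 accepts bare filenames or %{"filename" => _} maps
    CSVImporter.date_range([
      %{"filename" => "imported_visitors_20211230_20220101.csv"},
      "imported_browsers_20200101_20220101.csv"
    ])
    #=> Date.range(~D[2020-01-01], ~D[2022-01-01])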

lib/plausible/s3.ex: 61 additions, 0 deletions

@@ -18,9 +18,64 @@ defmodule Plausible.S3 do
   @spec exports_bucket :: String.t()
   def exports_bucket, do: config(:exports_bucket)

+  @doc """
+  Returns the pre-configured S3 bucket for CSV imports.
+
+      config :plausible, Plausible.S3,
+        imports_bucket: System.fetch_env!("S3_IMPORTS_BUCKET")
+
+  Example:
+
+      iex> imports_bucket()
+      "test-imports"
+
+  """
+  @spec imports_bucket :: String.t()
+  def imports_bucket, do: config(:imports_bucket)
+
   defp config, do: Application.fetch_env!(:plausible, __MODULE__)
   defp config(key), do: Keyword.fetch!(config(), key)
+
+  @doc """
+  Presigns an upload for an imported file.
+
+  In the current implementation the bucket always goes into the path component.
+
+  Example:
+
+      iex> %{
+      ...>   s3_url: "http://localhost:10000/test-imports/123/imported_browsers.csv",
+      ...>   presigned_url: "http://localhost:10000/test-imports/123/imported_browsers.csv?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=minioadmin" <> _
+      ...> } = import_presign_upload(_site_id = 123, _filename = "imported_browsers.csv")
+
+  """
+  def import_presign_upload(site_id, filename) do
+    config = ExAws.Config.new(:s3)
+    s3_path = Path.join(to_string(site_id), filename)
+    bucket = imports_bucket()
+    {:ok, presigned_url} = ExAws.S3.presigned_url(config, :put, bucket, s3_path)
+    %{s3_url: extract_s3_url(presigned_url), presigned_url: presigned_url}
+  end
+
+  # to make ClickHouse see MinIO in dev and test envs we replace
+  # the host in the S3 URL with whatever's set in S3_CLICKHOUSE_HOST env var
+  if Mix.env() in [:dev, :test, :small_dev, :small_test] do
+    defp extract_s3_url(presigned_url) do
+      [s3_url, _] = String.split(presigned_url, "?")
+
+      if ch_host = System.get_env("S3_CLICKHOUSE_HOST") do
+        URI.to_string(%URI{URI.parse(s3_url) | host: ch_host})
+      else
+        s3_url
+      end
+    end
+  else
+    defp extract_s3_url(presigned_url) do
+      [s3_url, _] = String.split(presigned_url, "?")
+      s3_url
+    end
+  end
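
The dev/test clause of extract_s3_url is a plain URI host rewrite. A small sketch of the transformation it performs (the URL and hostname are made up; assumes S3_CLICKHOUSE_HOST=minio):

    url = "http://localhost:10000/dev-imports/123/imported_pages.csv?X-Amz-Algorithm=..."
    [s3_url, _] = String.split(url, "?")
    URI.to_string(%URI{URI.parse(s3_url) | host: "minio"})
    #=> "http://minio:10000/dev-imports/123/imported_pages.csv"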

   @doc """
   Chunks and uploads Zip archive to the provided S3 destination.

@@ -77,6 +132,12 @@ defmodule Plausible.S3 do

   @doc """
   Returns `access_key_id` and `secret_access_key` to be used by ClickHouse during imports from S3.
+
+  Example:
+
+      iex> import_clickhouse_credentials()
+      %{access_key_id: "minioadmin", secret_access_key: "minioadmin"}
+
   """
   @spec import_clickhouse_credentials ::
           %{access_key_id: String.t(), secret_access_key: String.t()}
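
End to end, an import would presign first and then PUT the file body to the returned URL. A rough sketch of the client side (the Req HTTP client and the local file path are assumptions for illustration, not part of this PR):

    # presign an upload slot for site 123
    %{s3_url: s3_url, presigned_url: presigned_url} =
      Plausible.S3.import_presign_upload(123, "imported_pages_20200101_20220101.csv")

    # upload the CSV body with a plain HTTP PUT
    {:ok, %Req.Response{status: 200}} =
      Req.request(
        method: :put,
        url: presigned_url,
        body: File.read!("imported_pages_20200101_20220101.csv")
      )

    # s3_url (the presigned URL minus its query string) is what ClickHouse reads from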

lib/workers/export_csv.ex: 46 additions, 33 deletions

@@ -25,43 +25,56 @@ defmodule Plausible.Workers.ExportCSV do
       |> Keyword.replace!(:pool_size, 1)
       |> Ch.start_link()

-    # NOTE: should we use site.timezone?
-    # %Ch.Result{rows: [[min_date, max_date]]} =
-    #   Ch.query!(
-    #     ch,
-    #     "SELECT toDate(min(timestamp)), toDate(max(timestamp)) FROM events_v2 WHERE site_id={site_id:UInt64}",
-    #     %{"site_id" => site_id}
-    #   )
-
-    download_url =
-      DBConnection.run(
+    %Ch.Result{rows: [[%Date{} = min_date, %Date{} = max_date]]} =
+      Ch.query!(
         ch,
-        fn conn ->
-          conn
-          |> Plausible.Exports.stream_archive(
-            # date_range: Date.range(min_date, max_date)
-            Plausible.Exports.export_queries(site_id, extname: ".csv"),
-            format: "CSVWithNames"
-          )
-          |> Plausible.S3.export_upload_multipart(s3_bucket, s3_path, s3_config_overrides(args))
-        end,
-        timeout: :infinity
+        "SELECT toDate(min(timestamp)), toDate(max(timestamp)) FROM events_v2 WHERE site_id={site_id:UInt64}",
+        %{"site_id" => site_id}
       )

-    # NOTE: replace with proper Plausible.Email template
-    Plausible.Mailer.deliver_now!(
-      Bamboo.Email.new_email(
-        from: "[email protected]",
-        to: email,
-        subject: "EXPORT SUCCESS",
-        text_body: """
-        download it from #{download_url}! hurry up! you have 24 hours!"
-        """,
-        html_body: """
-        download it from <a href="#{download_url}">here</a>! hurry up! you have 24 hours!
-        """
-      )
-    )
+    if max_date == ~D[1970-01-01] do
+      # NOTE: replace with proper Plausible.Email template
+      Plausible.Mailer.deliver_now!(
+        Bamboo.Email.new_email(
+          from: PlausibleWeb.Email.mailer_email_from(),
+          to: email,
+          subject: "EXPORT FAILURE",
+          text_body: "there is nothing to export"
+        )
+      )
+    else
+      download_url =
+        DBConnection.run(
+          ch,
+          fn conn ->
+            conn
+            |> Plausible.Exports.stream_archive(
+              Plausible.Exports.export_queries(site_id,
+                date_range: Date.range(min_date, max_date),
+                extname: ".csv"
+              ),
+              format: "CSVWithNames"
+            )
+            |> Plausible.S3.export_upload_multipart(s3_bucket, s3_path, s3_config_overrides(args))
+          end,
+          timeout: :infinity
+        )
+
+      # NOTE: replace with proper Plausible.Email template
+      Plausible.Mailer.deliver_now!(
+        Bamboo.Email.new_email(
+          from: PlausibleWeb.Email.mailer_email_from(),
+          to: email,
+          subject: "EXPORT SUCCESS",
+          text_body: """
+          download it from #{download_url}! hurry up! you have 24 hours!"
+          """,
+          html_body: """
+          download it from <a href="#{download_url}">here</a>! hurry up! you have 24 hours!
+          """
+        )
+      )
+    end

     :ok
   end
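
The ~D[1970-01-01] guard works because ClickHouse aggregates over an empty set return type defaults rather than NULLs, so min/max of a DateTime column come back as the Unix epoch when a site has no events. A sketch of the sentinel (assuming a site_id with no rows in events_v2):

    %Ch.Result{rows: [[~D[1970-01-01], ~D[1970-01-01]]]} =
      Ch.query!(
        ch,
        "SELECT toDate(min(timestamp)), toDate(max(timestamp)) FROM events_v2 WHERE site_id={site_id:UInt64}",
        %{"site_id" => site_id_with_no_events}
      )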