Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CSV exports (no UI) #3836

Merged
merged 2 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,8 @@ cloud_queues = [
trial_notification_emails: 1,
check_usage: 1,
notify_annual_renewal: 1,
lock_sites: 1
lock_sites: 1,
s3_csv_export: 1
]

queues = if(is_selfhost, do: base_queues, else: base_queues ++ cloud_queues)
Expand Down
349 changes: 349 additions & 0 deletions lib/plausible/exports.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,349 @@
defmodule Plausible.Exports do
@moduledoc """
Contains functions to export data for events and sessions as Zip archives.
"""

require Plausible
import Ecto.Query

@doc """
Builds Ecto queries to export data from `events_v2` and `sessions_v2`
tables into the format of `imported_*` tables for a website.
"""
@spec export_queries(pos_integer, extname: String.t(), date_range: Date.Range.t()) ::
%{String.t() => Ecto.Query.t()}
def export_queries(site_id, opts \\ []) do
extname = opts[:extname] || ".csv"
date_range = opts[:date_range]

filename = fn table ->
name =
if date_range do
first_date = Timex.format!(date_range.first, "{YYYY}{0M}{0D}")
last_date = Timex.format!(date_range.last, "{YYYY}{0M}{0D}")
"#{table}_#{first_date}_#{last_date}"
else
table
end

name <> extname
end

%{
filename.("imported_visitors") => export_visitors_q(site_id),
filename.("imported_sources") => export_sources_q(site_id),
# NOTE: this query can result in `MEMORY_LIMIT_EXCEEDED` error
filename.("imported_pages") => export_pages_q(site_id),
filename.("imported_entry_pages") => export_entry_pages_q(site_id),
filename.("imported_exit_pages") => export_exit_pages_q(site_id),
filename.("imported_locations") => export_locations_q(site_id),
filename.("imported_devices") => export_devices_q(site_id),
filename.("imported_browsers") => export_browsers_q(site_id),
filename.("imported_operating_systems") => export_operating_systems_q(site_id)
}
end

Plausible.on_full_build do
defp sampled(table) do
Plausible.Stats.Sampling.add_query_hint(from(table))
end
else
defp sampled(table) do
table
end
end

defmacrop date(timestamp) do
quote do
selected_as(fragment("toDate(?)", unquote(timestamp)), :date)
end
end

defmacrop visit_duration(t) do
quote do
selected_as(
fragment("greatest(sum(?*?),0)", unquote(t).sign, unquote(t).duration),
:visit_duration
)
end
end

defmacrop visitors(t) do
quote do
selected_as(
fragment("toUInt64(round(uniq(?)*any(_sample_factor)))", unquote(t).user_id),
:visitors
)
end
end

defmacrop visits(t) do
quote do
selected_as(sum(unquote(t).sign), :visits)
end
end

defmacrop bounces(t) do
quote do
selected_as(
fragment("greatest(sum(?*?),0)", unquote(t).sign, unquote(t).is_bounce),
:bounces
)
end
end

@spec export_visitors_q(pos_integer) :: Ecto.Query.t()
def export_visitors_q(site_id) do
visitors_sessions_q =
from s in sampled("sessions_v2"),
where: s.site_id == ^site_id,
group_by: selected_as(:date),
select: %{
date: date(s.start),
bounces: bounces(s),
visits: visits(s),
visit_duration: visit_duration(s)
# NOTE: can we use just sessions_v2 table in this query? sum(pageviews) and visitors(s)?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While investigating issue that led to #3870, we have found cases where session.pageviews is inconsistent with actual related events - session had 0 for for pageviews but there was a pageview event associated with it, along with numerous other custom events. That might have been some one-off issue though as session updates don't do anything fancy here

pageviews:
if(event.name == "pageview", do: session.pageviews + 1, else: session.pageviews),
events: session.events + 1
. It might have been caused by cached session not being in sync with what's stored in the DB at the time of the update but can't tell for sure yet.

# visitors: visitors(s)
}

visitors_events_q =
from e in sampled("events_v2"),
where: e.site_id == ^site_id,
group_by: selected_as(:date),
select: %{
date: date(e.timestamp),
visitors: visitors(e),
pageviews:
selected_as(
fragment("toUInt64(round(countIf(?='pageview')*any(_sample_factor)))", e.name),
:pageviews
)
}

visitors_q =
"e"
|> with_cte("e", as: ^visitors_events_q)
|> with_cte("s", as: ^visitors_sessions_q)

from e in visitors_q,
full_join: s in "s",
on: e.date == s.date,
order_by: selected_as(:date),
select: [
selected_as(fragment("greatest(?,?)", s.date, e.date), :date),
e.visitors,
e.pageviews,
s.bounces,
s.visits,
s.visit_duration
]
end

@spec export_sources_q(pos_integer) :: Ecto.Query.t()
def export_sources_q(site_id) do
from s in sampled("sessions_v2"),
where: s.site_id == ^site_id,
group_by: [
selected_as(:date),
selected_as(:source),
s.utm_medium,
s.utm_campaign,
s.utm_content,
s.utm_term
],
order_by: selected_as(:date),
select: [
date(s.start),
selected_as(s.referrer_source, :source),
s.utm_medium,
s.utm_campaign,
s.utm_content,
s.utm_term,
visitors(s),
visits(s),
visit_duration(s),
bounces(s)
]
end

@spec export_pages_q(pos_integer) :: Ecto.Query.t()
def export_pages_q(site_id) do
window_q =
from e in sampled("events_v2"),
where: e.site_id == ^site_id,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering it that shouldn't be limited to pageview events 🤔 If there's a custom event setup that for instance records custom events like "scrolled to the bottom", they might skew the time on page metric calculated like this. That's probably something that should be considered in the actual calculation logic at

defp window_breakdown_time_on_page(site, query, pages) do
first. It's also possible that I'm reading it completely wrong 😅

select: %{
timestamp: e.timestamp,
next_timestamp:
over(fragment("leadInFrame(?)", e.timestamp),
partition_by: e.session_id,
order_by: e.timestamp,
frame: fragment("ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING")
),
pathname: e.pathname,
hostname: e.hostname,
name: e.name,
user_id: e.user_id,
_sample_factor: fragment("_sample_factor")
}

from e in subquery(window_q),
group_by: [selected_as(:date), e.pathname],
order_by: selected_as(:date),
select: [
date(e.timestamp),
selected_as(fragment("any(?)", e.hostname), :hostname),
selected_as(e.pathname, :page),
visitors(e),
selected_as(
fragment("toUInt64(round(countIf(?='pageview')*any(_sample_factor)))", e.name),
:pageviews
),
# NOTE: are exits pageviews or any events?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#3870 might be relevant here. Currently, exit_page is set on all events, not only pageviews but that seems to be incorrect and leads to wrong calculation of other derivative metrics under some circumstances. We'll most likely switch to recording exit_pages only on events that are strictly pageviews.

selected_as(
fragment("toUInt64(round(countIf(?=0)*any(_sample_factor)))", e.next_timestamp),
:exits
),
selected_as(
fragment("sum(greatest(?,0))", e.next_timestamp - e.timestamp),
:time_on_page
)
]
end

@spec export_entry_pages_q(pos_integer) :: Ecto.Query.t()
def export_entry_pages_q(site_id) do
from s in sampled("sessions_v2"),
where: s.site_id == ^site_id,
group_by: [selected_as(:date), s.entry_page],
order_by: selected_as(:date),
select: [
date(s.start),
s.entry_page,
visitors(s),
selected_as(
fragment("toUInt64(round(sum(?)*any(_sample_factor)))", s.sign),
:entrances
),
visit_duration(s),
bounces(s)
]
end

@spec export_exit_pages_q(pos_integer) :: Ecto.Query.t()
def export_exit_pages_q(site_id) do
from s in sampled("sessions_v2"),
where: s.site_id == ^site_id,
group_by: [selected_as(:date), s.exit_page],
order_by: selected_as(:date),
select: [
date(s.start),
s.exit_page,
visitors(s),
selected_as(
fragment("toUInt64(round(sum(?)*any(_sample_factor)))", s.sign),
:exits
)
]
end

@spec export_locations_q(pos_integer) :: Ecto.Query.t()
def export_locations_q(site_id) do
from s in sampled("sessions_v2"),
where: s.site_id == ^site_id,
where: s.city_geoname_id != 0 and s.country_code != "\0\0" and s.country_code != "ZZ",
group_by: [selected_as(:date), s.country_code, selected_as(:region), s.city_geoname_id],
order_by: selected_as(:date),
select: [
date(s.start),
selected_as(s.country_code, :country),
selected_as(s.subdivision1_code, :region),
selected_as(s.city_geoname_id, :city),
visitors(s),
visits(s),
visit_duration(s),
bounces(s)
]
end

@spec export_devices_q(pos_integer) :: Ecto.Query.t()
def export_devices_q(site_id) do
from s in sampled("sessions_v2"),
where: s.site_id == ^site_id,
group_by: [selected_as(:date), s.screen_size],
order_by: selected_as(:date),
select: [
date(s.start),
selected_as(s.screen_size, :device),
visitors(s),
visits(s),
visit_duration(s),
bounces(s)
]
end

@spec export_browsers_q(pos_integer) :: Ecto.Query.t()
def export_browsers_q(site_id) do
from s in sampled("sessions_v2"),
where: s.site_id == ^site_id,
group_by: [selected_as(:date), s.browser],
order_by: selected_as(:date),
select: [
date(s.start),
s.browser,
visitors(s),
visits(s),
visit_duration(s),
bounces(s)
]
end

@spec export_operating_systems_q(pos_integer) :: Ecto.Query.t()
def export_operating_systems_q(site_id) do
from s in sampled("sessions_v2"),
where: s.site_id == ^site_id,
group_by: [selected_as(:date), s.operating_system],
order_by: selected_as(:date),
select: [
date(s.start),
s.operating_system,
visitors(s),
visits(s),
visit_duration(s),
bounces(s)
]
end

@doc """
Creates a streamable Zip archive from the provided (named) Ecto queries.

Example usage:

{:ok, pool} = Ch.start_link(pool_size: 1)

DBConnection.run(pool, fn conn ->
conn
|> stream_archive(export_queries(_site_id = 1), format: "CSVWithNames")
|> Stream.into(File.stream!("export.zip"))
|> Stream.run()
end)

"""
@spec stream_archive(DBConnection.t(), %{String.t() => Ecto.Query.t()}, [Ch.query_option()]) ::
Enumerable.t()
def stream_archive(conn, named_queries, opts \\ []) do
entries =
Enum.map(named_queries, fn {name, query} ->
{sql, params} = Plausible.ClickhouseRepo.to_sql(:all, query)

datastream =
conn
|> Ch.stream(sql, params, opts)
|> Stream.map(fn %Ch.Result{data: data} -> data end)

Zstream.entry(name, datastream, coder: Zstream.Coder.Stored)
end)

Zstream.zip(entries)
end
end
Loading