-
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
0353680
commit e421301
Showing
3 changed files
with
13 additions
and
89 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,90 +5,6 @@ defmodule Plausible.Exports do | |
|
||
import Ecto.Query | ||
|
||
# TODO time_on_page | ||
# TODO sampling | ||
# TODO export_visitors_q only can use just sessions_v2? | ||
|
||
# @sample to_string(2_000_000) | ||
|
||
# Manual end-to-end round trip: wipes previously imported data for the | ||
# "imported.com" site, exports stats into Plausible.zip, then uploads the | ||
# extracted CSV files and schedules a CSV import for that site. | ||
# NOTE(review): the site domain, the id 37, the S3 host and the user email are | ||
# all hard-coded, so this looks like a local development helper -- confirm. | ||
def export_import do | ||
### clean | ||
|
||
imported_com = Plausible.Repo.get_by!(Plausible.Site, domain: "imported.com") | ||
Plausible.Repo.delete_all(where("site_imports", site_id: ^imported_com.id)) | ||
|
||
# Delete this site's rows from every table listed by Plausible.Imported.tables/0. | ||
# mutations_sync: 1 asks ClickHouse to wait for the mutation before returning. | ||
Enum.each(Plausible.Imported.tables(), fn table -> | ||
Plausible.ImportDeletionRepo.query!( | ||
"ALTER TABLE {$0:Identifier} DELETE WHERE site_id = {$1:UInt64}", | ||
[table, imported_com.id], | ||
settings: [mutations_sync: 1] | ||
) | ||
end) | ||
|
||
Plausible.Sites.clear_stats_start_date!(imported_com) | ||
|
||
# Remove the local "Plausible" directory if present: files first, then the directory. | ||
if File.exists?("Plausible") do | ||
File.ls!("Plausible") | ||
|> Enum.each(fn file -> | ||
File.rm!("Plausible/" <> file) | ||
end) | ||
|
||
File.rmdir!("Plausible") | ||
end | ||
|
||
# Non-bang File.rm/1 with the result discarded: a missing archive is not an error here. | ||
File.rm("Plausible.zip") | ||
|
||
### export | ||
|
||
task = | ||
Task.async(fn -> | ||
# Start a separate connection pool of size 1 from the ClickhouseRepo config. | ||
{:ok, ch} = | ||
Plausible.ClickhouseRepo.config() | ||
|> Keyword.replace!(:pool_size, 1) | ||
|> Ch.start_link() | ||
|
||
# Stream the output of stream_archive/3 straight into Plausible.zip on disk. | ||
# NOTE(review): 37 is presumably the plausible.io site id (per the ignored | ||
# variable name) -- confirm. | ||
DBConnection.run( | ||
ch, | ||
fn conn -> | ||
conn | ||
|> stream_archive( | ||
export_queries(_plausible_io = 37, extname: ".csv"), | ||
format: "CSVWithNames" | ||
) | ||
|> Stream.into(File.stream!("Plausible.zip")) | ||
|> Stream.run() | ||
end, | ||
timeout: :infinity | ||
) | ||
|
||
# Print the archive size, converted from bytes by dividing by 1_000_000. | ||
IO.inspect(Float.round(File.stat!("Plausible.zip").size / 1_000_000, 2), | ||
label: "Plausible.zip size (MB)" | ||
) | ||
end) | ||
|
||
Task.await(task, :infinity) | ||
|
||
### import | ||
|
||
{:ok, files} = :zip.unzip(~c"Plausible.zip", cwd: ~c"Plausible") | ||
|
||
# Each extracted path is a charlist; the pattern strips the "Plausible/" prefix. | ||
# Every file is uploaded to the "imports" bucket under "<site id>/<file>". | ||
uploads = | ||
Enum.map(files, fn ~c"Plausible/" ++ file -> | ||
key = "#{imported_com.id}/#{file}" | ||
ExAws.request!(ExAws.S3.put_object("imports", key, File.read!("Plausible/#{file}"))) | ||
%{"filename" => file, "s3_url" => "http://172.17.0.1:6000/imports/" <> key} | ||
end) | ||
|
||
user = Plausible.Repo.get_by!(Plausible.Auth.User, email: "[email protected]") | ||
|
||
# NOTE(review): start_date and end_date are both 1970-01-01 -- presumably | ||
# placeholders; confirm against CSVImporter.new_import. | ||
{:ok, _job} = | ||
Plausible.Imported.CSVImporter.new_import(imported_com, user, | ||
start_date: ~D[1970-01-01], | ||
end_date: ~D[1970-01-01], | ||
uploads: uploads | ||
) | ||
end | ||
|
||
@doc """ | ||
Builds Ecto queries to export data from `events_v2` and `sessions_v2` | ||
tables into the format of `imported_*` tables for a website. | ||
|
@@ -142,7 +58,6 @@ defmodule Plausible.Exports do | |
end | ||
|
||
defmacrop bounces(t) do | ||
# TODO multiply by sample_factor? | ||
quote do | ||
selected_as( | ||
fragment("greatest(sum(?*?),0)", unquote(t).sign, unquote(t).is_bounce), | ||
|
@@ -155,6 +70,7 @@ defmodule Plausible.Exports do | |
def export_visitors_q(site_id) do | ||
visitors_sessions_q = | ||
from s in "sessions_v2", | ||
# NOTE: no sampling is used right now | ||
# hints: ["SAMPLE", unsafe_fragment(^@sample)], | ||
where: s.site_id == ^site_id, | ||
group_by: selected_as(:date), | ||
|
@@ -163,12 +79,13 @@ defmodule Plausible.Exports do | |
bounces: bounces(s), | ||
visits: visits(s), | ||
visit_duration: visit_duration(s) | ||
# TODO | ||
# NOTE: can we use just sessions_v2 table in this query? sum(pageviews) and visitors(s)? | ||
# visitors: visitors(s) | ||
} | ||
|
||
visitors_events_q = | ||
from e in "events_v2", | ||
# NOTE: no sampling is used right now | ||
# hints: ["SAMPLE", unsafe_fragment(^@sample)], | ||
where: e.site_id == ^site_id, | ||
group_by: selected_as(:date), | ||
|
@@ -204,6 +121,7 @@ defmodule Plausible.Exports do | |
@spec export_sources_q(pos_integer) :: Ecto.Query.t() | ||
def export_sources_q(site_id) do | ||
from s in "sessions_v2", | ||
# NOTE: no sampling is used right now | ||
# hints: ["SAMPLE", unsafe_fragment(^@sample)], | ||
where: s.site_id == ^site_id, | ||
group_by: [ | ||
|
@@ -233,6 +151,7 @@ defmodule Plausible.Exports do | |
def export_pages_q(site_id) do | ||
window_q = | ||
from e in "events_v2", | ||
# NOTE: no sampling is used right now | ||
# hints: ["SAMPLE", unsafe_fragment(^@sample)], | ||
where: e.site_id == ^site_id, | ||
select: %{ | ||
|
@@ -262,7 +181,7 @@ defmodule Plausible.Exports do | |
fragment("toUInt64(round(countIf(?='pageview')*any(_sample_factor)))", e.name), | ||
:pageviews | ||
), | ||
# TODO are exits pageviews or any events? | ||
# NOTE: are exits pageviews or any events? | ||
selected_as( | ||
fragment("toUInt64(round(countIf(?=0)*any(_sample_factor)))", e.next_timestamp), | ||
:exits | ||
|
@@ -277,6 +196,7 @@ defmodule Plausible.Exports do | |
@spec export_entry_pages_q(pos_integer) :: Ecto.Query.t() | ||
def export_entry_pages_q(site_id) do | ||
from s in "sessions_v2", | ||
# NOTE: no sampling is used right now | ||
# hints: ["SAMPLE", unsafe_fragment(^@sample)], | ||
where: s.site_id == ^site_id, | ||
group_by: [selected_as(:date), s.entry_page], | ||
|
@@ -297,6 +217,7 @@ defmodule Plausible.Exports do | |
@spec export_exit_pages_q(pos_integer) :: Ecto.Query.t() | ||
def export_exit_pages_q(site_id) do | ||
from s in "sessions_v2", | ||
# NOTE: no sampling is used right now | ||
# hints: ["SAMPLE", unsafe_fragment(^@sample)], | ||
where: s.site_id == ^site_id, | ||
group_by: [selected_as(:date), s.exit_page], | ||
|
@@ -315,6 +236,7 @@ defmodule Plausible.Exports do | |
@spec export_locations_q(pos_integer) :: Ecto.Query.t() | ||
def export_locations_q(site_id) do | ||
from s in "sessions_v2", | ||
# NOTE: no sampling is used right now | ||
# hints: ["SAMPLE", unsafe_fragment(^@sample)], | ||
where: s.site_id == ^site_id, | ||
where: s.city_geoname_id != 0 and s.country_code != "\0\0" and s.country_code != "ZZ", | ||
|
@@ -335,6 +257,7 @@ defmodule Plausible.Exports do | |
@spec export_devices_q(pos_integer) :: Ecto.Query.t() | ||
def export_devices_q(site_id) do | ||
from s in "sessions_v2", | ||
# NOTE: no sampling is used right now | ||
# hints: ["SAMPLE", unsafe_fragment(^@sample)], | ||
where: s.site_id == ^site_id, | ||
group_by: [selected_as(:date), s.screen_size], | ||
|
@@ -352,6 +275,7 @@ defmodule Plausible.Exports do | |
@spec export_browsers_q(pos_integer) :: Ecto.Query.t() | ||
def export_browsers_q(site_id) do | ||
from s in "sessions_v2", | ||
# NOTE: no sampling is used right now | ||
# hints: ["SAMPLE", unsafe_fragment(^@sample)], | ||
where: s.site_id == ^site_id, | ||
group_by: [selected_as(:date), s.browser], | ||
|
@@ -369,6 +293,7 @@ defmodule Plausible.Exports do | |
@spec export_operating_systems_q(pos_integer) :: Ecto.Query.t() | ||
def export_operating_systems_q(site_id) do | ||
from s in "sessions_v2", | ||
# NOTE: no sampling is used right now | ||
# hints: ["SAMPLE", unsafe_fragment(^@sample)], | ||
where: s.site_id == ^site_id, | ||
group_by: [selected_as(:date), s.operating_system], | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters