feat(persons-on-events): add required person and group columns to events table (#9251)

* refactor(ingestion): establish setup for json consumption from kafka into clickhouse [nuke protobuf pt. 1]

* address review

* fix kafka table name across the board

* Update posthog/async_migrations/test/test_0004_replicated_schema.py

* run checks

* feat(persons-on-events): add required person and group columns to events table

* rename

* update snapshots

* address review

* Revert "update snapshots"

This reverts commit 63d7126.

* address review

* update snapshots

* update more snapshots

* use runpython

* update schemas

* update more queries

* some improvements :D

* fix naming

* fix breakdown prop name

* update snapshot

* fix naming

* fix ambiguous test

* fix queries

* last bits

* fix typo to retrigger tests

* also handle kafka and mv tables in migration

* update snapshots

* drop tables if exists

Co-authored-by: eric <[email protected]>
yakkomajuri and EDsCODE authored Apr 13, 2022
1 parent 38bf3ef commit 3d71ad0
Showing 22 changed files with 164 additions and 47 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -27,7 +27,7 @@ For <100K events ingested monthly on Linux with Docker (recommended 4GB memory):
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/posthog/posthog/HEAD/bin/deploy-hobby)"
```
### Option 2: Production instance on your infrastructure
Follow our <a href="https://posthog.com/docs/self-host/overview#deploy">Scaleable Self-Hosting Guide</a> for all major cloud service providers and on-premise deploys
Follow our <a href="https://posthog.com/docs/self-host/overview#deploy">Scalable Self-Hosting Guide</a> for all major cloud service providers and on-premise deploys

### Option 3: If you don't need to self-host
Sign up for a free [PostHog Cloud](https://app.posthog.com/signup) project
2 changes: 1 addition & 1 deletion ee/clickhouse/materialized_columns/columns.py
@@ -5,8 +5,8 @@
from constance import config
from django.utils.timezone import now

from ee.clickhouse.materialized_columns.replication import clickhouse_is_replicated
from ee.clickhouse.materialized_columns.util import cache_for
from ee.clickhouse.replication.utils import clickhouse_is_replicated
from ee.clickhouse.sql.clickhouse import trim_quotes_expr
from posthog.client import sync_execute
from posthog.models.property import PropertyName, TableWithProperties
35 changes: 35 additions & 0 deletions ee/clickhouse/migrations/0026_persons_and_groups_on_events.py
@@ -0,0 +1,35 @@
from infi.clickhouse_orm import migrations

from ee.clickhouse.replication.utils import clickhouse_is_replicated
from ee.clickhouse.sql.events import EVENTS_TABLE_JSON_MV_SQL, KAFKA_EVENTS_TABLE_JSON_SQL
from posthog.client import sync_execute
from posthog.settings import CLICKHOUSE_CLUSTER

ADD_COLUMNS_BASE_SQL = """
ALTER TABLE {table}
ON CLUSTER '{cluster}'
ADD COLUMN IF NOT EXISTS person_id UUID,
ADD COLUMN IF NOT EXISTS person_properties VARCHAR,
ADD COLUMN IF NOT EXISTS group0_properties VARCHAR,
ADD COLUMN IF NOT EXISTS group1_properties VARCHAR,
ADD COLUMN IF NOT EXISTS group2_properties VARCHAR,
ADD COLUMN IF NOT EXISTS group3_properties VARCHAR,
ADD COLUMN IF NOT EXISTS group4_properties VARCHAR
"""


def add_columns_to_required_tables(_):
sync_execute(ADD_COLUMNS_BASE_SQL.format(table="events", cluster=CLICKHOUSE_CLUSTER))

if clickhouse_is_replicated():
sync_execute(ADD_COLUMNS_BASE_SQL.format(table="writable_events", cluster=CLICKHOUSE_CLUSTER))
sync_execute(ADD_COLUMNS_BASE_SQL.format(table="sharded_events", cluster=CLICKHOUSE_CLUSTER))


operations = [
migrations.RunPython(add_columns_to_required_tables),
migrations.RunSQL(f"DROP TABLE IF EXISTS events_json_mv ON CLUSTER '{CLICKHOUSE_CLUSTER}'"),
migrations.RunSQL(f"DROP TABLE IF EXISTS kafka_events_json ON CLUSTER '{CLICKHOUSE_CLUSTER}'"),
migrations.RunSQL(KAFKA_EVENTS_TABLE_JSON_SQL()),
migrations.RunSQL(EVENTS_TABLE_JSON_MV_SQL()),
]
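As a side note (not part of the commit), the effect of this migration can be sanity-checked against ClickHouse's system tables once it has run. The sketch below is illustrative only: it assumes a reachable ClickHouse instance and reuses the sync_execute helper imported above; the column names are taken from ADD_COLUMNS_BASE_SQL.

```python
# Illustrative check only: confirm the person/group columns exist on the events
# table after 0026_persons_and_groups_on_events has been applied.
from posthog.client import sync_execute

EXPECTED_COLUMNS = {
    "person_id",
    "person_properties",
    "group0_properties",
    "group1_properties",
    "group2_properties",
    "group3_properties",
    "group4_properties",
}

rows = sync_execute(
    "SELECT name FROM system.columns WHERE database = currentDatabase() AND table = %(table)s",
    {"table": "events"},
)
missing = EXPECTED_COLUMNS - {name for (name,) in rows}
assert not missing, f"events table is missing columns: {missing}"
```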
10 changes: 6 additions & 4 deletions ee/clickhouse/models/cohort.py
@@ -55,7 +55,7 @@ def format_person_query(

for group_idx, group in enumerate(groups):
if group.get("action_id") or group.get("event_id"):
entity_query, entity_params = get_entity_cohort_subquery(cohort, group, group_idx)
entity_query, entity_params = get_entity_cohort_subquery(cohort, group, group_idx, custom_match_field)
params = {**params, **entity_params}
filters.append(entity_query)

@@ -128,7 +128,9 @@ def get_properties_cohort_subquery(cohort: Cohort, cohort_group: Dict, group_idx
return "\n".join(query_parts).replace("AND ", "", 1), params


def get_entity_cohort_subquery(cohort: Cohort, cohort_group: Dict, group_idx: int):
def get_entity_cohort_subquery(
cohort: Cohort, cohort_group: Dict, group_idx: int, custom_match_field: str = "person_id"
):
event_id = cohort_group.get("event_id")
action_id = cohort_group.get("action_id")
days = cohort_group.get("days")
@@ -157,7 +159,7 @@ def get_entity_cohort_subquery(cohort: Cohort, cohort_group: Dict, group_idx: in

params: Dict[str, Union[str, int]] = {"count": int(count), **entity_params, **date_params}

return f"{'NOT' if is_negation else ''} person_id IN ({extract_person})", params
return f"{'NOT' if is_negation else ''} {custom_match_field} IN ({extract_person})", params
else:
extract_person = GET_DISTINCT_ID_BY_ENTITY_SQL.format(entity_query=entity_query, date_query=date_query,)
return f"distinct_id IN ({extract_person})", {**entity_params, **date_params}
@@ -240,7 +242,7 @@ def is_precalculated_query(cohort: Cohort) -> bool:


def format_filter_query(cohort: Cohort, index: int = 0, id_column: str = "distinct_id") -> Tuple[str, Dict[str, Any]]:
person_query, params = format_cohort_subquery(cohort, index)
person_query, params = format_cohort_subquery(cohort, index, "person_id")

person_id_query = CALCULATE_COHORT_PEOPLE_SQL.format(
query=person_query,
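To make the new custom_match_field parameter concrete, here is a small sketch (illustrative only; the cohort and group inputs are hypothetical placeholders shaped like the ones used in the existing tests, and the behaviour shown applies to count-based groups):

```python
from ee.clickhouse.models.cohort import get_entity_cohort_subquery

# Hypothetical inputs: a Cohort instance and one of its count-based groups.
cohort = ...  # a posthog Cohort model instance
cohort_group = {"event_id": "$pageview", "days": 7, "count": 3}

# Default keeps the previous behaviour: the clause matches on person_id.
clause, params = get_entity_cohort_subquery(cohort, cohort_group, group_idx=0)

# Callers that alias the person_distinct_id join (e.g. as "pdi") can now match
# on the aliased column, which avoids ambiguity with the new events.person_id.
clause, params = get_entity_cohort_subquery(
    cohort, cohort_group, group_idx=0, custom_match_field="pdi.person_id"
)
```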
1 change: 1 addition & 0 deletions ee/clickhouse/queries/breakdown_props.py
@@ -50,6 +50,7 @@ def get_breakdown_prop_values(
prepend="e_brkdwn",
person_properties_mode=PersonPropertiesMode.USING_PERSON_PROPERTIES_COLUMN,
allow_denormalized_props=True,
person_id_joined_alias="pdi.person_id",
)

entity_params, entity_format_params = get_entity_filtering_params(entity=entity, team_id=team_id, table_name="e")
8 changes: 5 additions & 3 deletions ee/clickhouse/queries/related_actors_query.py
@@ -19,6 +19,8 @@


class RelatedActorsQuery:
DISTINCT_ID_TABLE_ALIAS = "pdi"

"""
This query calculates other groups and persons that are related to a person or a group.
@@ -49,7 +51,7 @@ def _query_related_people(self) -> List[SerializedPerson]:
person_ids = self._take_first(
sync_execute(
f"""
SELECT DISTINCT person_id
SELECT DISTINCT {self.DISTINCT_ID_TABLE_ALIAS}.person_id
FROM events e
{self._distinct_ids_join}
WHERE team_id = %(team_id)s
@@ -102,11 +104,11 @@ def _filter_clause(self):
if self.is_aggregating_by_groups:
return f"$group_{self.group_type_index} = %(id)s"
else:
return "person_id = %(id)s"
return f"{self.DISTINCT_ID_TABLE_ALIAS}.person_id = %(id)s"

@property
def _distinct_ids_join(self):
return f"JOIN ({get_team_distinct_ids_query(self.team_id)}) pdi on e.distinct_id = pdi.distinct_id"
return f"JOIN ({get_team_distinct_ids_query(self.team_id)}) {self.DISTINCT_ID_TABLE_ALIAS} on e.distinct_id = {self.DISTINCT_ID_TABLE_ALIAS}.distinct_id"

@cached_property
def _params(self):
10 changes: 5 additions & 5 deletions ee/clickhouse/queries/test/__snapshots__/test_lifecycle.ambr
@@ -48,7 +48,7 @@
ORDER BY period ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING) as following_activity,
created_at
FROM
(SELECT DISTINCT person_id,
(SELECT DISTINCT pdi.person_id as person_id,
toDateTime(dateTrunc('day', events.timestamp)) AS period,
person.created_at AS created_at
FROM events AS e
@@ -131,7 +131,7 @@
ORDER BY period ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING) as following_activity,
created_at
FROM
(SELECT DISTINCT person_id,
(SELECT DISTINCT pdi.person_id as person_id,
toDateTime(dateTrunc('month', events.timestamp)) AS period,
person.created_at AS created_at
FROM events AS e
@@ -214,7 +214,7 @@
ORDER BY period ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING) as following_activity,
created_at
FROM
(SELECT DISTINCT person_id,
(SELECT DISTINCT pdi.person_id as person_id,
toDateTime(dateTrunc('week', events.timestamp)) AS period,
person.created_at AS created_at
FROM events AS e
@@ -297,7 +297,7 @@
ORDER BY period ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING) as following_activity,
created_at
FROM
(SELECT DISTINCT person_id,
(SELECT DISTINCT pdi.person_id as person_id,
toDateTime(dateTrunc('day', events.timestamp)) AS period,
person.created_at AS created_at
FROM events AS e
@@ -380,7 +380,7 @@
ORDER BY period ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING) as following_activity,
created_at
FROM
(SELECT DISTINCT person_id,
(SELECT DISTINCT pdi.person_id as person_id,
toDateTime(dateTrunc('day', events.timestamp)) AS period,
person.created_at AS created_at
FROM events AS e
9 changes: 7 additions & 2 deletions ee/clickhouse/queries/trends/breakdown.py
@@ -42,6 +42,8 @@


class ClickhouseTrendsBreakdown:
DISTINCT_ID_TABLE_ALIAS = "pdi"

def __init__(
self, entity: Entity, filter: Filter, team: Team, column_optimizer: Optional[EnterpriseColumnOptimizer] = None
):
@@ -69,8 +71,11 @@ def get_query(self) -> Tuple[str, Dict, Callable]:
property_group=outer_properties,
table_name="e",
person_properties_mode=PersonPropertiesMode.USING_PERSON_PROPERTIES_COLUMN,
person_id_joined_alias=f"{self.DISTINCT_ID_TABLE_ALIAS}.person_id",
)
aggregate_operation, _, math_params = process_math(
self.entity, self.team, event_table_alias="e", person_id_alias=f"{self.DISTINCT_ID_TABLE_ALIAS}.person_id"
)
aggregate_operation, _, math_params = process_math(self.entity, self.team, event_table_alias="e")

action_query = ""
action_params: Dict = {}
@@ -341,7 +346,7 @@ def _person_join_condition(self) -> Tuple[str, Dict]:
f"""
{event_join}
INNER JOIN ({query}) person
ON person.id = pdi.person_id
ON person.id = {self.DISTINCT_ID_TABLE_ALIAS}.person_id
""",
params,
)
2 changes: 1 addition & 1 deletion ee/clickhouse/queries/trends/lifecycle.py
@@ -105,7 +105,7 @@ def get_query(self):
return (
f"""
SELECT DISTINCT
person_id,
{self.DISTINCT_ID_TABLE_ALIAS}.person_id as person_id,
toDateTime(dateTrunc(%(interval)s, events.timestamp)) AS period,
person.created_at AS created_at
FROM events AS {self.EVENT_TABLE_ALIAS}
2 changes: 1 addition & 1 deletion ee/clickhouse/queries/trends/total_volume.py
@@ -23,7 +23,7 @@ class ClickhouseTrendsTotalVolume:
def _total_volume_query(self, entity: Entity, filter: Filter, team: Team) -> Tuple[str, Dict, Callable]:
trunc_func = get_trunc_func_ch(filter.interval)
interval_func = get_interval_func_ch(filter.interval)
aggregate_operation, join_condition, math_params = process_math(entity, team)
aggregate_operation, join_condition, math_params = process_math(entity, team, person_id_alias="person_id")

trend_event_query = TrendsEventQuery(
filter=filter,
4 changes: 2 additions & 2 deletions ee/clickhouse/queries/trends/util.py
@@ -25,7 +25,7 @@


def process_math(
entity: Entity, team: Team, event_table_alias: Optional[str] = None
entity: Entity, team: Team, event_table_alias: Optional[str] = None, person_id_alias: str = "person_id"
) -> Tuple[str, str, Dict[str, Any]]:
aggregate_operation = "count(*)"
join_condition = ""
@@ -36,7 +36,7 @@ def process_math(
aggregate_operation = f"count(DISTINCT {event_table_alias + '.' if event_table_alias else ''}distinct_id)"
else:
join_condition = EVENT_JOIN_PERSON_SQL
aggregate_operation = "count(DISTINCT person_id)"
aggregate_operation = f"count(DISTINCT {person_id_alias})"
elif entity.math == "unique_group":
validate_group_type_index("math_group_type_index", entity.math_group_type_index, required=True)

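For clarity, a short sketch of what the new person_id_alias argument changes for a unique-users ("dau") entity (illustrative only; the entity and team objects are assumed to exist, and the team is assumed to aggregate users by person rather than by distinct_id):

```python
from ee.clickhouse.queries.trends.util import process_math

# Default behaviour is unchanged: the aggregate stays count(DISTINCT person_id).
aggregate, join, params = process_math(entity, team, event_table_alias="e")

# Queries that join person_distinct_id under the "pdi" alias can now aggregate
# on the aliased column, avoiding ambiguity with the events.person_id column
# added by this commit:
aggregate, join, params = process_math(
    entity, team, event_table_alias="e", person_id_alias="pdi.person_id"
)
# aggregate == "count(DISTINCT pdi.person_id)"
```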
Empty file.
File renamed without changes.
2 changes: 1 addition & 1 deletion ee/clickhouse/sql/cohort.py
@@ -68,7 +68,7 @@
"""

GET_PERSON_ID_BY_ENTITY_COUNT_SQL = """
SELECT person_id FROM events
SELECT pdi.person_id as person_id FROM events
INNER JOIN ({GET_TEAM_PERSON_DISTINCT_IDS}) as pdi
ON events.distinct_id = pdi.distinct_id
WHERE team_id = %(team_id)s {date_query} AND {entity_query}
31 changes: 27 additions & 4 deletions ee/clickhouse/sql/events.py
@@ -21,7 +21,14 @@
team_id Int64,
distinct_id VARCHAR,
elements_chain VARCHAR,
created_at DateTime64(6, 'UTC')
created_at DateTime64(6, 'UTC'),
person_id UUID,
person_properties VARCHAR,
group0_properties VARCHAR,
group1_properties VARCHAR,
group2_properties VARCHAR,
group3_properties VARCHAR,
group4_properties VARCHAR
{materialized_columns}
{extra_fields}
) ENGINE = {engine}
@@ -48,7 +55,7 @@
"""

EVENTS_DATA_TABLE_ENGINE = lambda: ReplacingMergeTree(
"events", ver="_timestamp", replication_scheme=ReplicationScheme.SHARDED
"events", ver="_timestamp", replication_scheme=ReplicationScheme.SHARDED,
)
EVENTS_TABLE_SQL = lambda: (
EVENTS_TABLE_BASE_SQL
@@ -98,7 +105,16 @@
database=settings.CLICKHOUSE_DATABASE,
)

KAFKA_EVENTS_TABLE_JSON_SQL = lambda: EVENTS_TABLE_BASE_SQL.format(
# we add the settings to prevent poison pills from stopping ingestion
# kafka_skip_broken_messages is an int, not a boolean, so we explicitly set
# the max block size to consume from kafka such that we skip _all_ broken messages
# this is an added safety mechanism given we control payloads to this topic
KAFKA_EVENTS_TABLE_JSON_SQL = lambda: (
EVENTS_TABLE_BASE_SQL
+ """
SETTINGS kafka_max_block_size=65505, kafka_skip_broken_messages=65505
"""
).format(
table_name="kafka_events_json",
cluster=settings.CLICKHOUSE_CLUSTER,
engine=kafka_engine(topic=KAFKA_EVENTS_JSON),
@@ -118,6 +134,13 @@
distinct_id,
elements_chain,
created_at,
person_id,
person_properties,
group0_properties,
group1_properties,
group2_properties,
group3_properties,
group4_properties,
_timestamp,
_offset
FROM {database}.kafka_events_json
@@ -308,7 +331,7 @@
ORDER BY tag_count desc, tag_name
LIMIT %(limit)s
""".format(
tag_regex=EXTRACT_TAG_REGEX, text_regex=EXTRACT_TEXT_REGEX
tag_regex=EXTRACT_TAG_REGEX, text_regex=EXTRACT_TEXT_REGEX,
)

GET_CUSTOM_EVENTS = """