From 3d71ad07cf974cbd2c2e87f687e02ff052385486 Mon Sep 17 00:00:00 2001
From: Yakko Majuri <38760734+yakkomajuri@users.noreply.github.com>
Date: Wed, 13 Apr 2022 10:48:07 +0000
Subject: [PATCH] feat(persons-on-events): add required person and group
 columns to events table (#9251)

* refactor(ingestion): establish setup for json consumption from kafka into clickhouse [nuke protobuf pt. 1]
* address review
* fix kafka table name across the board
* Update posthog/async_migrations/test/test_0004_replicated_schema.py
* run checks
* feat(persons-on-events): add required person and group columns to events table
* rename
* update snapshots
* address review
* Revert "update snapshots"

This reverts commit 63d7126e08631763b3ec71f1da8db8ddadb88bd8.

* address review
* update snapshots
* update more snapshots
* use runpython
* update schemas
* update more queries
* some improvements :D
* fix naming
* fix breakdown prop name
* update snapshot
* fix naming
* fix ambiguous test
* fix queries'
* last bits
* fix typo to retrigger tests
* also handle kafka and mv tables in migration
* update snapshots
* drop tables if exists

Co-authored-by: eric
---
 README.md                                     |  2 +-
 ee/clickhouse/materialized_columns/columns.py |  2 +-
 .../0026_persons_and_groups_on_events.py      | 35 +++++++++++
 ee/clickhouse/models/cohort.py                | 10 +--
 ee/clickhouse/queries/breakdown_props.py      |  1 +
 ee/clickhouse/queries/related_actors_query.py |  8 ++-
 .../test/__snapshots__/test_lifecycle.ambr    | 10 +--
 ee/clickhouse/queries/trends/breakdown.py     |  9 ++-
 ee/clickhouse/queries/trends/lifecycle.py     |  2 +-
 ee/clickhouse/queries/trends/total_volume.py  |  2 +-
 ee/clickhouse/queries/trends/util.py          |  4 +-
 ee/clickhouse/replication/__init__.py         |  0
 .../replication.py => replication/utils.py}   |  0
 ee/clickhouse/sql/cohort.py                   |  2 +-
 ee/clickhouse/sql/events.py                   | 31 +++++++--
 .../sql/test/__snapshots__/test_schema.ambr   | 63 ++++++++++++++++---
 ee/clickhouse/sql/trends/breakdown.py         |  6 +-
 .../test_clickhouse_experiments.ambr          |  6 +-
 .../__snapshots__/test_clickhouse_groups.ambr |  6 +-
 .../__snapshots__/test_clickhouse_trends.ambr |  6 +-
 ee/tasks/materialized_columns.py              |  2 +-
 .../test_0004_replicated_schema.ambr          |  4 +-
 22 files changed, 164 insertions(+), 47 deletions(-)
 create mode 100644 ee/clickhouse/migrations/0026_persons_and_groups_on_events.py
 create mode 100644 ee/clickhouse/replication/__init__.py
 rename ee/clickhouse/{materialized_columns/replication.py => replication/utils.py} (100%)

diff --git a/README.md b/README.md
index 2d0b1753544ec..4d6130dab78b5 100644
--- a/README.md
+++ b/README.md
@@ -27,7 +27,7 @@ For <100K events ingested monthly on Linux with Docker (recommended 4GB memory):
 /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/posthog/posthog/HEAD/bin/deploy-hobby)"
 ```
 ### Option 2: Production instance on your infrastructure
-Follow our Scaleable Self-Hosting Guide for all major cloud service providers and on-premise deploys
+Follow our Scalable Self-Hosting Guide for all major cloud service providers and on-premise deploys
 ### Option 3: If you don't need to self-host
 Sign up for a free [PostHog Cloud](https://app.posthog.com/signup) project
 
diff --git a/ee/clickhouse/materialized_columns/columns.py b/ee/clickhouse/materialized_columns/columns.py
index 91b4839a7fd25..f9cd4a7fb50e9 100644
--- a/ee/clickhouse/materialized_columns/columns.py
+++ b/ee/clickhouse/materialized_columns/columns.py
@@ -5,8 +5,8 @@
 from constance import config
 from django.utils.timezone import now
 
-from ee.clickhouse.materialized_columns.replication import clickhouse_is_replicated
 from ee.clickhouse.materialized_columns.util import cache_for
+from ee.clickhouse.replication.utils import clickhouse_is_replicated
 from ee.clickhouse.sql.clickhouse import trim_quotes_expr
 from posthog.client import sync_execute
 from posthog.models.property import PropertyName, TableWithProperties
diff --git a/ee/clickhouse/migrations/0026_persons_and_groups_on_events.py b/ee/clickhouse/migrations/0026_persons_and_groups_on_events.py
new file mode 100644
index 0000000000000..1670abf11b362
--- /dev/null
+++ b/ee/clickhouse/migrations/0026_persons_and_groups_on_events.py
@@ -0,0 +1,35 @@
+from infi.clickhouse_orm import migrations
+
+from ee.clickhouse.replication.utils import clickhouse_is_replicated
+from ee.clickhouse.sql.events import EVENTS_TABLE_JSON_MV_SQL, KAFKA_EVENTS_TABLE_JSON_SQL
+from posthog.client import sync_execute
+from posthog.settings import CLICKHOUSE_CLUSTER
+
+ADD_COLUMNS_BASE_SQL = """
+ALTER TABLE {table}
+ON CLUSTER '{cluster}'
+ADD COLUMN IF NOT EXISTS person_id UUID,
+ADD COLUMN IF NOT EXISTS person_properties VARCHAR,
+ADD COLUMN IF NOT EXISTS group0_properties VARCHAR,
+ADD COLUMN IF NOT EXISTS group1_properties VARCHAR,
+ADD COLUMN IF NOT EXISTS group2_properties VARCHAR,
+ADD COLUMN IF NOT EXISTS group3_properties VARCHAR,
+ADD COLUMN IF NOT EXISTS group4_properties VARCHAR
+"""
+
+
+def add_columns_to_required_tables(_):
+    sync_execute(ADD_COLUMNS_BASE_SQL.format(table="events", cluster=CLICKHOUSE_CLUSTER))
+
+    if clickhouse_is_replicated():
+        sync_execute(ADD_COLUMNS_BASE_SQL.format(table="writable_events", cluster=CLICKHOUSE_CLUSTER))
+        sync_execute(ADD_COLUMNS_BASE_SQL.format(table="sharded_events", cluster=CLICKHOUSE_CLUSTER))
+
+
+operations = [
+    migrations.RunPython(add_columns_to_required_tables),
+    migrations.RunSQL(f"DROP TABLE IF EXISTS events_json_mv ON CLUSTER '{CLICKHOUSE_CLUSTER}'"),
+    migrations.RunSQL(f"DROP TABLE IF EXISTS kafka_events_json ON CLUSTER '{CLICKHOUSE_CLUSTER}'"),
+    migrations.RunSQL(KAFKA_EVENTS_TABLE_JSON_SQL()),
+    migrations.RunSQL(EVENTS_TABLE_JSON_MV_SQL()),
+]
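
[Editor's note: illustrative sketch, not part of the patch] The migration above is driven entirely by ADD_COLUMNS_BASE_SQL; rendered for the base events table it produces the statement below. The cluster name "posthog" is an assumed stand-in for whatever CLICKHOUSE_CLUSTER resolves to.

```python
# Sketch: render ADD_COLUMNS_BASE_SQL the way add_columns_to_required_tables() does.
# "posthog" is an assumed cluster name; the real value comes from posthog.settings.
ADD_COLUMNS_BASE_SQL = """
ALTER TABLE {table}
ON CLUSTER '{cluster}'
ADD COLUMN IF NOT EXISTS person_id UUID,
ADD COLUMN IF NOT EXISTS person_properties VARCHAR,
ADD COLUMN IF NOT EXISTS group0_properties VARCHAR,
ADD COLUMN IF NOT EXISTS group1_properties VARCHAR,
ADD COLUMN IF NOT EXISTS group2_properties VARCHAR,
ADD COLUMN IF NOT EXISTS group3_properties VARCHAR,
ADD COLUMN IF NOT EXISTS group4_properties VARCHAR
"""

# Prints the exact ALTER TABLE statement the RunPython step would execute.
print(ADD_COLUMNS_BASE_SQL.format(table="events", cluster="posthog"))
```

ADD COLUMN IF NOT EXISTS keeps the RunPython step idempotent, which matters because the same template is re-run against writable_events and sharded_events on replicated installations.
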
diff --git a/ee/clickhouse/models/cohort.py b/ee/clickhouse/models/cohort.py
index e43bd1a3b1dfe..12bd1e69f2d31 100644
--- a/ee/clickhouse/models/cohort.py
+++ b/ee/clickhouse/models/cohort.py
@@ -55,7 +55,7 @@ def format_person_query(
 
     for group_idx, group in enumerate(groups):
         if group.get("action_id") or group.get("event_id"):
-            entity_query, entity_params = get_entity_cohort_subquery(cohort, group, group_idx)
+            entity_query, entity_params = get_entity_cohort_subquery(cohort, group, group_idx, custom_match_field)
             params = {**params, **entity_params}
             filters.append(entity_query)
 
@@ -128,7 +128,9 @@ def get_properties_cohort_subquery(cohort: Cohort, cohort_group: Dict, group_idx
     return "\n".join(query_parts).replace("AND ", "", 1), params
 
 
-def get_entity_cohort_subquery(cohort: Cohort, cohort_group: Dict, group_idx: int):
+def get_entity_cohort_subquery(
+    cohort: Cohort, cohort_group: Dict, group_idx: int, custom_match_field: str = "person_id"
+):
     event_id = cohort_group.get("event_id")
     action_id = cohort_group.get("action_id")
     days = cohort_group.get("days")
@@ -157,7 +159,7 @@ def get_entity_cohort_subquery(cohort: Cohort, cohort_group: Dict, group_idx: in
 
         params: Dict[str, Union[str, int]] = {"count": int(count), **entity_params, **date_params}
 
-        return f"{'NOT' if is_negation else ''} person_id IN ({extract_person})", params
+        return f"{'NOT' if is_negation else ''} {custom_match_field} IN ({extract_person})", params
     else:
         extract_person = GET_DISTINCT_ID_BY_ENTITY_SQL.format(entity_query=entity_query, date_query=date_query,)
         return f"distinct_id IN ({extract_person})", {**entity_params, **date_params}
@@ -240,7 +242,7 @@ def is_precalculated_query(cohort: Cohort) -> bool:
 
 
 def format_filter_query(cohort: Cohort, index: int = 0, id_column: str = "distinct_id") -> Tuple[str, Dict[str, Any]]:
-    person_query, params = format_cohort_subquery(cohort, index)
+    person_query, params = format_cohort_subquery(cohort, index, "person_id")
 
     person_id_query = CALCULATE_COHORT_PEOPLE_SQL.format(
         query=person_query,
diff --git a/ee/clickhouse/queries/breakdown_props.py b/ee/clickhouse/queries/breakdown_props.py
index 0e13482df39f2..5154887fc2184 100644
--- a/ee/clickhouse/queries/breakdown_props.py
+++ b/ee/clickhouse/queries/breakdown_props.py
@@ -50,6 +50,7 @@ def get_breakdown_prop_values(
         prepend="e_brkdwn",
         person_properties_mode=PersonPropertiesMode.USING_PERSON_PROPERTIES_COLUMN,
         allow_denormalized_props=True,
+        person_id_joined_alias="pdi.person_id",
     )
 
     entity_params, entity_format_params = get_entity_filtering_params(entity=entity, team_id=team_id, table_name="e")
diff --git a/ee/clickhouse/queries/related_actors_query.py b/ee/clickhouse/queries/related_actors_query.py
index 46d4f6d069234..66608526ebb57 100644
--- a/ee/clickhouse/queries/related_actors_query.py
+++ b/ee/clickhouse/queries/related_actors_query.py
@@ -19,6 +19,8 @@
 
 
 class RelatedActorsQuery:
+    DISTINCT_ID_TABLE_ALIAS = "pdi"
+
     """
     This query calculates other groups and persons that are related to a person or a group.
 
@@ -49,7 +51,7 @@ def _query_related_people(self) -> List[SerializedPerson]:
         person_ids = self._take_first(
             sync_execute(
                 f"""
-            SELECT DISTINCT person_id
+            SELECT DISTINCT {self.DISTINCT_ID_TABLE_ALIAS}.person_id
             FROM events e
             {self._distinct_ids_join}
             WHERE team_id = %(team_id)s
@@ -102,11 +104,11 @@ def _filter_clause(self):
         if self.is_aggregating_by_groups:
             return f"$group_{self.group_type_index} = %(id)s"
         else:
-            return "person_id = %(id)s"
+            return f"{self.DISTINCT_ID_TABLE_ALIAS}.person_id = %(id)s"
 
     @property
     def _distinct_ids_join(self):
-        return f"JOIN ({get_team_distinct_ids_query(self.team_id)}) pdi on e.distinct_id = pdi.distinct_id"
+        return f"JOIN ({get_team_distinct_ids_query(self.team_id)}) {self.DISTINCT_ID_TABLE_ALIAS} on e.distinct_id = {self.DISTINCT_ID_TABLE_ALIAS}.distinct_id"
 
     @cached_property
     def _params(self):
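
[Editor's note: illustrative sketch, not part of the patch] The common thread in the cohort and related-actors changes is qualifying person_id with the pdi join alias: once events carries its own person_id column, the bare name becomes ambiguous between the event row and the joined person_distinct_id subquery. A minimal, standalone sketch mirroring _filter_clause above:

```python
# Mirrors RelatedActorsQuery._filter_clause after this patch; standalone sketch.
DISTINCT_ID_TABLE_ALIAS = "pdi"

def filter_clause(is_aggregating_by_groups: bool, group_type_index: int = 0) -> str:
    if is_aggregating_by_groups:
        return f"$group_{group_type_index} = %(id)s"
    # Qualified so ClickHouse resolves the joined column, not events.person_id.
    return f"{DISTINCT_ID_TABLE_ALIAS}.person_id = %(id)s"

assert filter_clause(False) == "pdi.person_id = %(id)s"
assert filter_clause(True, 2) == "$group_2 = %(id)s"
```

The same idea drives custom_match_field in cohort.py: format_filter_query now passes "person_id" explicitly so the generated IN clause names the column it intends.
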
diff --git a/ee/clickhouse/queries/test/__snapshots__/test_lifecycle.ambr b/ee/clickhouse/queries/test/__snapshots__/test_lifecycle.ambr
index 5f2fba259edf8..653e4c2187431 100644
--- a/ee/clickhouse/queries/test/__snapshots__/test_lifecycle.ambr
+++ b/ee/clickhouse/queries/test/__snapshots__/test_lifecycle.ambr
@@ -48,7 +48,7 @@
            ORDER BY period ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING) as following_activity,
            created_at
     FROM
-      (SELECT DISTINCT person_id,
+      (SELECT DISTINCT pdi.person_id as person_id,
              toDateTime(dateTrunc('day', events.timestamp)) AS period,
              person.created_at AS created_at
        FROM events AS e
@@ -131,7 +131,7 @@
            ORDER BY period ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING) as following_activity,
            created_at
     FROM
-      (SELECT DISTINCT person_id,
+      (SELECT DISTINCT pdi.person_id as person_id,
              toDateTime(dateTrunc('month', events.timestamp)) AS period,
              person.created_at AS created_at
        FROM events AS e
@@ -214,7 +214,7 @@
            ORDER BY period ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING) as following_activity,
            created_at
     FROM
-      (SELECT DISTINCT person_id,
+      (SELECT DISTINCT pdi.person_id as person_id,
              toDateTime(dateTrunc('week', events.timestamp)) AS period,
              person.created_at AS created_at
        FROM events AS e
@@ -297,7 +297,7 @@
            ORDER BY period ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING) as following_activity,
            created_at
     FROM
-      (SELECT DISTINCT person_id,
+      (SELECT DISTINCT pdi.person_id as person_id,
              toDateTime(dateTrunc('day', events.timestamp)) AS period,
              person.created_at AS created_at
        FROM events AS e
@@ -380,7 +380,7 @@
            ORDER BY period ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING) as following_activity,
            created_at
     FROM
-      (SELECT DISTINCT person_id,
+      (SELECT DISTINCT pdi.person_id as person_id,
              toDateTime(dateTrunc('day', events.timestamp)) AS period,
              person.created_at AS created_at
        FROM events AS e
diff --git a/ee/clickhouse/queries/trends/breakdown.py b/ee/clickhouse/queries/trends/breakdown.py
index 5c533029a9a66..e7e14f015e08d 100644
--- a/ee/clickhouse/queries/trends/breakdown.py
+++ b/ee/clickhouse/queries/trends/breakdown.py
@@ -42,6 +42,8 @@
 
 
 class ClickhouseTrendsBreakdown:
+    DISTINCT_ID_TABLE_ALIAS = "pdi"
+
     def __init__(
         self, entity: Entity, filter: Filter, team: Team, column_optimizer: Optional[EnterpriseColumnOptimizer] = None
     ):
@@ -69,8 +71,11 @@ def get_query(self) -> Tuple[str, Dict, Callable]:
             property_group=outer_properties,
             table_name="e",
             person_properties_mode=PersonPropertiesMode.USING_PERSON_PROPERTIES_COLUMN,
+            person_id_joined_alias=f"{self.DISTINCT_ID_TABLE_ALIAS}.person_id",
+        )
+        aggregate_operation, _, math_params = process_math(
+            self.entity, self.team, event_table_alias="e", person_id_alias=f"{self.DISTINCT_ID_TABLE_ALIAS}.person_id"
         )
-        aggregate_operation, _, math_params = process_math(self.entity, self.team, event_table_alias="e")
 
         action_query = ""
         action_params: Dict = {}
@@ -341,7 +346,7 @@ def _person_join_condition(self) -> Tuple[str, Dict]:
                 f"""
                 {event_join}
                 INNER JOIN ({query}) person
-                ON person.id = pdi.person_id
+                ON person.id = {self.DISTINCT_ID_TABLE_ALIAS}.person_id
                 """,
                 params,
             )
diff --git a/ee/clickhouse/queries/trends/lifecycle.py b/ee/clickhouse/queries/trends/lifecycle.py
index 7655058a7c217..91e78d9c51585 100644
--- a/ee/clickhouse/queries/trends/lifecycle.py
+++ b/ee/clickhouse/queries/trends/lifecycle.py
@@ -105,7 +105,7 @@ def get_query(self):
         return (
             f"""
             SELECT DISTINCT
-                person_id,
+                {self.DISTINCT_ID_TABLE_ALIAS}.person_id as person_id,
                 toDateTime(dateTrunc(%(interval)s, events.timestamp)) AS period,
                 person.created_at AS created_at
             FROM events AS {self.EVENT_TABLE_ALIAS}
diff --git a/ee/clickhouse/queries/trends/total_volume.py b/ee/clickhouse/queries/trends/total_volume.py
index 232dbf772fb2f..a3ba892a14b8f 100644
--- a/ee/clickhouse/queries/trends/total_volume.py
+++ b/ee/clickhouse/queries/trends/total_volume.py
@@ -23,7 +23,7 @@ class ClickhouseTrendsTotalVolume:
     def _total_volume_query(self, entity: Entity, filter: Filter, team: Team) -> Tuple[str, Dict, Callable]:
         trunc_func = get_trunc_func_ch(filter.interval)
         interval_func = get_interval_func_ch(filter.interval)
-        aggregate_operation, join_condition, math_params = process_math(entity, team)
+        aggregate_operation, join_condition, math_params = process_math(entity, team, person_id_alias="person_id")
 
         trend_event_query = TrendsEventQuery(
             filter=filter,
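
[Editor's note: illustrative sketch, not part of the patch] process_math gains a person_id_alias parameter (see the util.py hunk that follows) so each call site can decide which expression "unique users" counts over: ClickhouseTrendsBreakdown passes its pdi join alias, while total_volume keeps the plain column name. A self-contained sketch of the affected branch; EVENT_JOIN_PERSON_SQL is stood in by a placeholder string, everything else mirrors the diff:

```python
# Sketch of the patched "dau" branch of process_math; not the production function.
from typing import Optional, Tuple

def dau_aggregate(
    aggregate_by_distinct_ids: bool,
    event_table_alias: Optional[str] = None,
    person_id_alias: str = "person_id",
) -> Tuple[str, str]:
    if aggregate_by_distinct_ids:
        # Count distinct ids straight off the events table, optionally qualified.
        prefix = event_table_alias + "." if event_table_alias else ""
        return f"count(DISTINCT {prefix}distinct_id)", ""
    # Otherwise join events to persons and count via whatever alias the caller uses.
    return f"count(DISTINCT {person_id_alias})", "<EVENT_JOIN_PERSON_SQL placeholder>"

assert dau_aggregate(False, "e", "pdi.person_id")[0] == "count(DISTINCT pdi.person_id)"
assert dau_aggregate(False)[0] == "count(DISTINCT person_id)"
```
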
diff --git a/ee/clickhouse/queries/trends/util.py b/ee/clickhouse/queries/trends/util.py
index 9a21740b30257..2253d4d9a5331 100644
--- a/ee/clickhouse/queries/trends/util.py
+++ b/ee/clickhouse/queries/trends/util.py
@@ -25,7 +25,7 @@
 
 
 def process_math(
-    entity: Entity, team: Team, event_table_alias: Optional[str] = None
+    entity: Entity, team: Team, event_table_alias: Optional[str] = None, person_id_alias: str = "person_id"
 ) -> Tuple[str, str, Dict[str, Any]]:
     aggregate_operation = "count(*)"
     join_condition = ""
@@ -36,7 +36,7 @@ def process_math(
             aggregate_operation = f"count(DISTINCT {event_table_alias + '.' if event_table_alias else ''}distinct_id)"
         else:
             join_condition = EVENT_JOIN_PERSON_SQL
-            aggregate_operation = "count(DISTINCT person_id)"
+            aggregate_operation = f"count(DISTINCT {person_id_alias})"
 
     elif entity.math == "unique_group":
         validate_group_type_index("math_group_type_index", entity.math_group_type_index, required=True)
diff --git a/ee/clickhouse/replication/__init__.py b/ee/clickhouse/replication/__init__.py
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/ee/clickhouse/materialized_columns/replication.py b/ee/clickhouse/replication/utils.py
similarity index 100%
rename from ee/clickhouse/materialized_columns/replication.py
rename to ee/clickhouse/replication/utils.py
diff --git a/ee/clickhouse/sql/cohort.py b/ee/clickhouse/sql/cohort.py
index 5b98cb3e6a6b2..960df721f4ce8 100644
--- a/ee/clickhouse/sql/cohort.py
+++ b/ee/clickhouse/sql/cohort.py
@@ -68,7 +68,7 @@
 """
 
 GET_PERSON_ID_BY_ENTITY_COUNT_SQL = """
-SELECT person_id FROM events
+SELECT pdi.person_id as person_id FROM events
 INNER JOIN ({GET_TEAM_PERSON_DISTINCT_IDS}) as pdi ON events.distinct_id = pdi.distinct_id
 WHERE team_id = %(team_id)s {date_query}
 AND {entity_query}
diff --git a/ee/clickhouse/sql/events.py b/ee/clickhouse/sql/events.py
index 2b8c49feb8d26..ab9643655f3f7 100644
--- a/ee/clickhouse/sql/events.py
+++ b/ee/clickhouse/sql/events.py
@@ -21,7 +21,14 @@
     team_id Int64,
     distinct_id VARCHAR,
     elements_chain VARCHAR,
-    created_at DateTime64(6, 'UTC')
+    created_at DateTime64(6, 'UTC'),
+    person_id UUID,
+    person_properties VARCHAR,
+    group0_properties VARCHAR,
+    group1_properties VARCHAR,
+    group2_properties VARCHAR,
+    group3_properties VARCHAR,
+    group4_properties VARCHAR
     {materialized_columns}
     {extra_fields}
 ) ENGINE = {engine}
@@ -48,7 +55,7 @@
 """
 
 EVENTS_DATA_TABLE_ENGINE = lambda: ReplacingMergeTree(
-    "events", ver="_timestamp", replication_scheme=ReplicationScheme.SHARDED
+    "events", ver="_timestamp", replication_scheme=ReplicationScheme.SHARDED,
 )
 EVENTS_TABLE_SQL = lambda: (
     EVENTS_TABLE_BASE_SQL
@@ -98,7 +105,16 @@
     database=settings.CLICKHOUSE_DATABASE,
 )
 
-KAFKA_EVENTS_TABLE_JSON_SQL = lambda: EVENTS_TABLE_BASE_SQL.format(
+# we add the settings to prevent poison pills from stopping ingestion
+# kafka_skip_broken_messages is an int, not a boolean, so we explicitly set
+# the max block size to consume from kafka such that we skip _all_ broken messages
+# this is an added safety mechanism given we control payloads to this topic
+KAFKA_EVENTS_TABLE_JSON_SQL = lambda: (
+    EVENTS_TABLE_BASE_SQL
+    + """
+    SETTINGS kafka_max_block_size=65505, kafka_skip_broken_messages=65505
+"""
+).format(
     table_name="kafka_events_json",
     cluster=settings.CLICKHOUSE_CLUSTER,
     engine=kafka_engine(topic=KAFKA_EVENTS_JSON),
@@ -118,6 +134,13 @@
 distinct_id,
 elements_chain,
 created_at,
+person_id,
+person_properties,
+group0_properties,
+group1_properties,
+group2_properties,
+group3_properties,
+group4_properties,
 _timestamp,
 _offset
 FROM {database}.kafka_events_json
@@ -308,7 +331,7 @@
 ORDER BY tag_count desc, tag_name
 LIMIT %(limit)s
 """.format(
-    tag_regex=EXTRACT_TAG_REGEX, text_regex=EXTRACT_TEXT_REGEX
+    tag_regex=EXTRACT_TAG_REGEX, text_regex=EXTRACT_TEXT_REGEX,
 )
 
 GET_CUSTOM_EVENTS = """
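
[Editor's note: illustrative sketch, not part of the patch] The SETTINGS clause appended to the Kafka table DDL is the poison-pill guard described in the comment above. kafka_skip_broken_messages is a per-block budget of unparseable messages, so pinning it to the same value as kafka_max_block_size (65505) means every message in a consumed block may be skipped and a single malformed payload can never wedge ingestion. The rendered engine definition looks roughly like this; the engine string is taken from the test_0004_replicated_schema snapshots at the end of this patch, and the column list is abbreviated:

```python
# Sketch of the DDL that the patched KAFKA_EVENTS_TABLE_JSON_SQL() renders.
# Broker, topic and consumer-group values are the test fixtures from the snapshots.
KAFKA_EVENTS_JSON_DDL_SKETCH = """
CREATE TABLE kafka_events_json
(
    -- same columns as EVENTS_TABLE_BASE_SQL, including the new person_id,
    -- person_properties and group0_properties..group4_properties columns
    uuid UUID,
    event VARCHAR
    -- ...abbreviated...
) ENGINE = Kafka('kafka', 'clickhouse_events_json_test', 'group1', 'JSONEachRow')
SETTINGS kafka_max_block_size = 65505, kafka_skip_broken_messages = 65505
"""
```
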
diff --git a/ee/clickhouse/sql/test/__snapshots__/test_schema.ambr b/ee/clickhouse/sql/test/__snapshots__/test_schema.ambr
index e85c5d13f88d5..4ab7fca0d7350 100644
--- a/ee/clickhouse/sql/test/__snapshots__/test_schema.ambr
+++ b/ee/clickhouse/sql/test/__snapshots__/test_schema.ambr
@@ -10,7 +10,14 @@
     team_id Int64,
     distinct_id VARCHAR,
     elements_chain VARCHAR,
-    created_at DateTime64(6, 'UTC')
+    created_at DateTime64(6, 'UTC'),
+    person_id UUID,
+    person_properties VARCHAR,
+    group0_properties VARCHAR,
+    group1_properties VARCHAR,
+    group2_properties VARCHAR,
+    group3_properties VARCHAR,
+    group4_properties VARCHAR
 
 ) ENGINE = Kafka('kafka', 'clickhouse_events_proto_test', 'group1', 'JSONEachRow')
 
@@ -29,7 +36,14 @@
     team_id Int64,
     distinct_id VARCHAR,
     elements_chain VARCHAR,
-    created_at DateTime64(6, 'UTC')
+    created_at DateTime64(6, 'UTC'),
+    person_id UUID,
+    person_properties VARCHAR,
+    group0_properties VARCHAR,
+    group1_properties VARCHAR,
+    group2_properties VARCHAR,
+    group3_properties VARCHAR,
+    group4_properties VARCHAR
 
 ) ENGINE =
 
@@ -195,7 +209,14 @@
     team_id Int64,
     distinct_id VARCHAR,
     elements_chain VARCHAR,
-    created_at DateTime64(6, 'UTC')
+    created_at DateTime64(6, 'UTC'),
+    person_id UUID,
+    person_properties VARCHAR,
+    group0_properties VARCHAR,
+    group1_properties VARCHAR,
+    group2_properties VARCHAR,
+    group3_properties VARCHAR,
+    group4_properties VARCHAR
 
     , $group_0 VARCHAR COMMENT 'column_materializer::$group_0'
     , $group_1 VARCHAR COMMENT 'column_materializer::$group_1'
@@ -342,7 +363,14 @@
     team_id Int64,
     distinct_id VARCHAR,
     elements_chain VARCHAR,
-    created_at DateTime64(6, 'UTC')
+    created_at DateTime64(6, 'UTC'),
+    person_id UUID,
+    person_properties VARCHAR,
+    group0_properties VARCHAR,
+    group1_properties VARCHAR,
+    group2_properties VARCHAR,
+    group3_properties VARCHAR,
+    group4_properties VARCHAR
 
 ) ENGINE =
 
@@ -717,7 +745,14 @@
     team_id Int64,
     distinct_id VARCHAR,
     elements_chain VARCHAR,
-    created_at DateTime64(6, 'UTC')
+    created_at DateTime64(6, 'UTC'),
+    person_id UUID,
+    person_properties VARCHAR,
+    group0_properties VARCHAR,
+    group1_properties VARCHAR,
+    group2_properties VARCHAR,
+    group3_properties VARCHAR,
+    group4_properties VARCHAR
 
     , $group_0 VARCHAR MATERIALIZED replaceRegexpAll(JSONExtractRaw(properties, '$group_0'), '^"|"$', '') COMMENT 'column_materializer::$group_0'
     , $group_1 VARCHAR MATERIALIZED replaceRegexpAll(JSONExtractRaw(properties, '$group_1'), '^"|"$', '') COMMENT 'column_materializer::$group_1'
@@ -779,7 +814,14 @@
     team_id Int64,
     distinct_id VARCHAR,
     elements_chain VARCHAR,
-    created_at DateTime64(6, 'UTC')
+    created_at DateTime64(6, 'UTC'),
+    person_id UUID,
+    person_properties VARCHAR,
+    group0_properties VARCHAR,
+    group1_properties VARCHAR,
+    group2_properties VARCHAR,
+    group3_properties VARCHAR,
+    group4_properties VARCHAR
 
     , _timestamp DateTime
 
@@ -997,7 +1039,14 @@
     team_id Int64,
     distinct_id VARCHAR,
     elements_chain VARCHAR,
-    created_at DateTime64(6, 'UTC')
+    created_at DateTime64(6, 'UTC'),
+    person_id UUID,
+    person_properties VARCHAR,
+    group0_properties VARCHAR,
+    group1_properties VARCHAR,
+    group2_properties VARCHAR,
+    group3_properties VARCHAR,
+    group4_properties VARCHAR
 
     , $group_0 VARCHAR MATERIALIZED replaceRegexpAll(JSONExtractRaw(properties, '$group_0'), '^"|"$', '') COMMENT 'column_materializer::$group_0'
     , $group_1 VARCHAR MATERIALIZED replaceRegexpAll(JSONExtractRaw(properties, '$group_1'), '^"|"$', '') COMMENT 'column_materializer::$group_1'
diff --git a/ee/clickhouse/sql/trends/breakdown.py b/ee/clickhouse/sql/trends/breakdown.py
index 2904cbee590f2..82e4b62efc59d 100644
--- a/ee/clickhouse/sql/trends/breakdown.py
+++ b/ee/clickhouse/sql/trends/breakdown.py
@@ -85,7 +85,7 @@
         breakdown_value
     FROM (
         SELECT
-            person_id,
+            pdi.person_id as person_id,
             timestamp,
             {breakdown_value} as breakdown_value
         FROM
@@ -95,7 +95,7 @@
         {breakdown_filter}
     )
     GROUP BY person_id, breakdown_value
-)
+) AS pdi
 GROUP BY day_start, breakdown_value
 """
 
@@ -106,7 +106,7 @@
     SELECT toStartOfDay(timestamp) as timestamp FROM events e WHERE team_id = %(team_id)s {parsed_date_from_prev_range} {parsed_date_to} GROUP BY timestamp
 ) d
 CROSS JOIN (
-    SELECT toStartOfDay(timestamp) as timestamp, person_id, {breakdown_value} as breakdown_value
+    SELECT toStartOfDay(timestamp) as timestamp, pdi.person_id AS person_id, {breakdown_value} as breakdown_value
     FROM events e
     INNER JOIN ({GET_TEAM_PERSON_DISTINCT_IDS}) as pdi
     ON e.distinct_id = pdi.distinct_id
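
[Editor's note: illustrative sketch, not part of the patch] The `) AS pdi` added above is easy to misread. The inner subquery joins person_distinct_id under the alias pdi, but that alias goes out of scope at the subquery boundary; re-aliasing the subquery itself as pdi is what lets the outer count(DISTINCT pdi.person_id), visible in the experiments and trends snapshots below, keep resolving against deduplicated person ids rather than the now-ambiguous events.person_id column. The overall shape:

```python
# Structural sketch (heavily abbreviated) of the patched breakdown query;
# the {placeholders} match the template style used in the file above.
BREAKDOWN_QUERY_SHAPE = """
SELECT count(DISTINCT pdi.person_id) AS total, day_start, breakdown_value
FROM
  (SELECT person_id, min(timestamp) AS timestamp, breakdown_value
   FROM
     (SELECT pdi.person_id AS person_id, timestamp, {breakdown_value} AS breakdown_value
      FROM events e
      INNER JOIN ({GET_TEAM_PERSON_DISTINCT_IDS}) AS pdi ON e.distinct_id = pdi.distinct_id)
   GROUP BY person_id, breakdown_value) AS pdi
GROUP BY day_start, breakdown_value
"""
```
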
diff --git a/ee/clickhouse/views/test/__snapshots__/test_clickhouse_experiments.ambr b/ee/clickhouse/views/test/__snapshots__/test_clickhouse_experiments.ambr
index cc31e704cd1a2..560cf7725c45e 100644
--- a/ee/clickhouse/views/test/__snapshots__/test_clickhouse_experiments.ambr
+++ b/ee/clickhouse/views/test/__snapshots__/test_clickhouse_experiments.ambr
@@ -301,7 +301,7 @@
        JOIN breakdown_value) as sec
      ORDER BY breakdown_value,
               day_start
-     UNION ALL SELECT count(DISTINCT person_id) as total,
+     UNION ALL SELECT count(DISTINCT pdi.person_id) as total,
                       toDateTime(toStartOfDay(timestamp), 'UTC') as day_start,
                       breakdown_value
      FROM
@@ -309,7 +309,7 @@
              min(timestamp) as timestamp,
              breakdown_value
       FROM
-        (SELECT person_id,
+        (SELECT pdi.person_id as person_id,
                 timestamp,
                 replaceRegexpAll(JSONExtractRaw(properties, '$feature_flag_response'), '^"|"$', '') as breakdown_value
          FROM events e
@@ -328,7 +328,7 @@
            AND timestamp <= '2020-01-06 23:59:59'
            AND replaceRegexpAll(JSONExtractRaw(properties, '$feature_flag_response'), '^"|"$', '') in (['control', 'test']) )
       GROUP BY person_id,
-               breakdown_value)
+               breakdown_value) AS pdi
     GROUP BY day_start,
              breakdown_value))
   GROUP BY day_start,
diff --git a/ee/clickhouse/views/test/__snapshots__/test_clickhouse_groups.ambr b/ee/clickhouse/views/test/__snapshots__/test_clickhouse_groups.ambr
index 7c1c6965ffc93..9510a8a3c1549 100644
--- a/ee/clickhouse/views/test/__snapshots__/test_clickhouse_groups.ambr
+++ b/ee/clickhouse/views/test/__snapshots__/test_clickhouse_groups.ambr
@@ -1,7 +1,7 @@
 # name: ClickhouseTestGroupsApi.test_related_groups
   '
   /* request:api_projects_(?P<parent_lookup_project_id>[^_.]+)_groups_related_?$ (ClickhouseGroupsView) */
-  SELECT DISTINCT person_id
+  SELECT DISTINCT pdi.person_id
   FROM events e
   JOIN
     (SELECT distinct_id,
@@ -57,7 +57,7 @@
     AND timestamp > '2021-02-09T00:00:00.000000'
     AND timestamp < '2021-05-10T00:00:00.000000'
     AND group_key != ''
-    AND person_id = '01795392-cc00-0003-7dc7-67a694604d72'
+    AND pdi.person_id = '01795392-cc00-0003-7dc7-67a694604d72'
   ORDER BY group_key
   '
 ---
@@ -83,7 +83,7 @@
     AND timestamp > '2021-02-09T00:00:00.000000'
     AND timestamp < '2021-05-10T00:00:00.000000'
     AND group_key != ''
-    AND person_id = '01795392-cc00-0003-7dc7-67a694604d72'
+    AND pdi.person_id = '01795392-cc00-0003-7dc7-67a694604d72'
   ORDER BY group_key
   '
 ---
diff --git a/ee/clickhouse/views/test/__snapshots__/test_clickhouse_trends.ambr b/ee/clickhouse/views/test/__snapshots__/test_clickhouse_trends.ambr
index 40563ea20c010..036cfa2b1b055 100644
--- a/ee/clickhouse/views/test/__snapshots__/test_clickhouse_trends.ambr
+++ b/ee/clickhouse/views/test/__snapshots__/test_clickhouse_trends.ambr
@@ -450,7 +450,7 @@
        JOIN breakdown_value) as sec
      ORDER BY breakdown_value,
               day_start
-     UNION ALL SELECT count(DISTINCT person_id) as total,
+     UNION ALL SELECT count(DISTINCT pdi.person_id) as total,
                       toDateTime(toStartOfDay(timestamp), 'UTC') as day_start,
                       breakdown_value
      FROM
@@ -458,7 +458,7 @@
              min(timestamp) as timestamp,
              breakdown_value
       FROM
-        (SELECT person_id,
+        (SELECT pdi.person_id as person_id,
                 timestamp,
                 replaceRegexpAll(JSONExtractRaw(properties, 'key'), '^"|"$', '') as breakdown_value
          FROM events e
@@ -475,7 +475,7 @@
            AND timestamp <= '2012-01-15 23:59:59'
            AND replaceRegexpAll(JSONExtractRaw(properties, 'key'), '^"|"$', '') in (['val', 'notval']) )
       GROUP BY person_id,
-               breakdown_value)
+               breakdown_value) AS pdi
     GROUP BY day_start,
              breakdown_value))
   GROUP BY day_start,
diff --git a/ee/tasks/materialized_columns.py b/ee/tasks/materialized_columns.py
index dbba837c24da5..52bdb76d25218 100644
--- a/ee/tasks/materialized_columns.py
+++ b/ee/tasks/materialized_columns.py
@@ -1,7 +1,7 @@
 from celery.utils.log import get_task_logger
 
 from ee.clickhouse.materialized_columns.columns import TRIM_AND_EXTRACT_PROPERTY, ColumnName, get_materialized_columns
-from ee.clickhouse.materialized_columns.replication import clickhouse_is_replicated
+from ee.clickhouse.replication.utils import clickhouse_is_replicated
 from posthog.client import sync_execute
 from posthog.settings import CLICKHOUSE_CLUSTER, CLICKHOUSE_DATABASE
 
diff --git a/posthog/async_migrations/test/__snapshots__/test_0004_replicated_schema.ambr b/posthog/async_migrations/test/__snapshots__/test_0004_replicated_schema.ambr
index 30bab7335b9cb..5f5bdf17e347d 100644
--- a/posthog/async_migrations/test/__snapshots__/test_0004_replicated_schema.ambr
+++ b/posthog/async_migrations/test/__snapshots__/test_0004_replicated_schema.ambr
@@ -103,7 +103,7 @@
 # name: Test0004ReplicatedSchema.test_migration.6
   (
     'kafka_events_json',
-    "Kafka('kafka', 'clickhouse_events_json_test', 'group1', 'JSONEachRow')",
+    "Kafka('kafka', 'clickhouse_events_json_test', 'group1', 'JSONEachRow') SETTINGS kafka_max_block_size = 65505, kafka_skip_broken_messages = 65505",
   )
 ---
 # name: Test0004ReplicatedSchema.test_migration.7
@@ -205,7 +205,7 @@
 # name: Test0004ReplicatedSchema.test_rollback.6
   (
     'kafka_events_json',
-    "Kafka('kafka', 'clickhouse_events_json_test', 'group1', 'JSONEachRow')",
+    "Kafka('kafka', 'clickhouse_events_json_test', 'group1', 'JSONEachRow') SETTINGS kafka_max_block_size = 65505, kafka_skip_broken_messages = 65505",
   )
 ---
 # name: Test0004ReplicatedSchema.test_rollback.7
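
[Editor's note: context, not part of the patch] Nothing in this patch reads the new columns yet; it adds them everywhere the events schema is declared and teaches existing queries to say pdi.person_id where a bare person_id would now be ambiguous. The payoff the columns are building toward is sketched below: today a unique-users query must resolve person_id through the distinct-id join, while a persons-on-events query can read the denormalized column directly. Both statements are illustrative only; the real join SQL comes from get_team_distinct_ids_query.

```python
# Today: person_id is resolved through the person_distinct_id join (sketch).
UNIQUE_USERS_WITH_JOIN = """
SELECT count(DISTINCT pdi.person_id)
FROM events e
INNER JOIN ({GET_TEAM_PERSON_DISTINCT_IDS}) AS pdi ON e.distinct_id = pdi.distinct_id
WHERE e.team_id = %(team_id)s
"""

# Eventually: read the column this migration adds, with no join (sketch).
UNIQUE_USERS_ON_EVENTS = """
SELECT count(DISTINCT person_id)
FROM events
WHERE team_id = %(team_id)s
"""
```
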