From 4c3cb6913e1a540a3a2ebb3636646725f6a8eae9 Mon Sep 17 00:00:00 2001 From: Ross Date: Fri, 10 Jan 2025 11:19:20 +0000 Subject: [PATCH] feat(batch-exports): Add `created_at` to Redshift persons batch export (#27403) --- .../batch_exports/postgres_batch_export.py | 21 ++ .../batch_exports/redshift_batch_export.py | 20 +- .../temporal/tests/batch_exports/README.md | 4 +- .../temporal/tests/batch_exports/conftest.py | 34 +++ .../test_redshift_batch_export_workflow.py | 226 +++++++++++++++++- 5 files changed, 288 insertions(+), 17 deletions(-) diff --git a/posthog/temporal/batch_exports/postgres_batch_export.py b/posthog/temporal/batch_exports/postgres_batch_export.py index 3c70e5b747dea..6a71d3d75e8ae 100644 --- a/posthog/temporal/batch_exports/postgres_batch_export.py +++ b/posthog/temporal/batch_exports/postgres_batch_export.py @@ -254,6 +254,27 @@ async def adelete_table(self, schema: str | None, table_name: str, not_found_ok: await cursor.execute(sql.SQL(base_query).format(table=table_identifier)) + async def aget_table_columns(self, schema: str | None, table_name: str) -> list[str]: + """Get the column names for a table in PostgreSQL. + + Args: + schema: Name of the schema where the table is located. + table_name: Name of the table to get columns for. + + Returns: + A list of column names in the table. + """ + if schema: + table_identifier = sql.Identifier(schema, table_name) + else: + table_identifier = sql.Identifier(table_name) + + async with self.connection.transaction(): + async with self.connection.cursor() as cursor: + await cursor.execute(sql.SQL("SELECT * FROM {} WHERE 1=0").format(table_identifier)) + columns = [column.name for column in cursor.description or []] + return columns + @contextlib.asynccontextmanager async def managed_table( self, diff --git a/posthog/temporal/batch_exports/redshift_batch_export.py b/posthog/temporal/batch_exports/redshift_batch_export.py index 7abbbc885e09f..61d3398078ea1 100644 --- a/posthog/temporal/batch_exports/redshift_batch_export.py +++ b/posthog/temporal/batch_exports/redshift_batch_export.py @@ -47,11 +47,11 @@ run_consumer, wait_for_schema_or_producer, ) -from posthog.temporal.batch_exports.temporary_file import BatchExportTemporaryFile, WriterFormat -from posthog.temporal.batch_exports.utils import ( - JsonType, - set_status_to_running_task, +from posthog.temporal.batch_exports.temporary_file import ( + BatchExportTemporaryFile, + WriterFormat, ) +from posthog.temporal.batch_exports.utils import JsonType, set_status_to_running_task from posthog.temporal.common.clickhouse import get_client from posthog.temporal.common.heartbeat import Heartbeater from posthog.temporal.common.logger import configure_temporal_worker_logger @@ -120,6 +120,10 @@ async def amerge_identical_tables( for field in merge_key ) + # Redshift doesn't support adding a condition on the merge, so we have + # to first delete any rows in stage that match those in final, where + # stage also has a higher version. Otherwise we risk merging adding old + # versions back. 
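+        # As a rough illustration (assuming a merge key of ("team_id",
+        # "distinct_id"); the actual key comes from the caller), the
+        # condition built below renders via psycopg's sql.Identifier as:
+        #   "final"."team_id" = "stage"."team_id" AND "final"."distinct_id" = "stage"."distinct_id"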
        delete_condition = and_separator.join(
            sql.SQL("{final_field} = {stage_field}").format(
                final_field=sql.Identifier("final", field[0]),
@@ -404,6 +408,7 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs) -> Records
             include_events=inputs.include_events,
             extra_query_parameters=extra_query_parameters,
             max_record_batch_size_bytes=1024 * 1024 * 2,  # 2MB
+            use_latest_schema=True,
         )

         record_batch_schema = await wait_for_schema_or_producer(queue, producer_task)
@@ -453,6 +458,13 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs) -> Records
         primary_key = None

     async with RedshiftClient.from_inputs(inputs).connect() as redshift_client:
+        # filter out fields that are not in the destination table
+        try:
+            columns = await redshift_client.aget_table_columns(inputs.schema, inputs.table_name)
+            table_fields = [field for field in table_fields if field[0] in columns]
+        except psycopg.errors.UndefinedTable:
+            pass
+
         async with (
             redshift_client.managed_table(
                 inputs.schema, inputs.table_name, table_fields, delete=False, primary_key=primary_key
diff --git a/posthog/temporal/tests/batch_exports/README.md b/posthog/temporal/tests/batch_exports/README.md
index c3f63f176ebde..3b66176052a83 100644
--- a/posthog/temporal/tests/batch_exports/README.md
+++ b/posthog/temporal/tests/batch_exports/README.md
@@ -10,6 +10,7 @@ BigQuery batch exports can be tested against a real BigQuery instance, but doing
> Since BigQuery batch export tests require additional setup, we skip them by default and they will not be run by automated CI pipelines. Please ensure these tests pass when making changes that affect BigQuery batch exports.

To enable testing for BigQuery batch exports, we require:
+
1. A BigQuery project and dataset
2. A BigQuery ServiceAccount with access to said project and dataset. See the [BigQuery batch export documentation](https://posthog.com/docs/cdp/batch-exports/bigquery#setting-up-bigquery-access) for detailed steps on setting up a ServiceAccount.
@@ -29,11 +30,12 @@ Redshift batch exports can be tested against a real Redshift (or Redshift Server
> Since Redshift batch export tests require additional setup, we skip them by default and they will not be run by automated CI pipelines. Please ensure these tests pass when making changes that affect Redshift batch exports.

To enable testing for Redshift batch exports, we require:
+
1. A Redshift (or Redshift Serverless) instance.
2. Network access to this instance (via a VPN connection or jumphost; making a Redshift instance publicly available has serious security implications).
3. User credentials (user requires `CREATEDB` permissions for testing but **not** superuser access).

For PostHog employees, check the password manager, as a set of development credentials should already be available. You will also need to use the `dev` exit node in Tailscale and be added to the `group:engineering` group in the tailnet policy. 
With these credentials, and Tailscale setup, we can run the tests from the root of the `posthog` repo with: ```bash DEBUG=1 REDSHIFT_HOST=workgroup.111222333.region.redshift-serverless.amazonaws.com REDSHIFT_USER=test_user REDSHIFT_PASSWORD=test_password pytest posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py diff --git a/posthog/temporal/tests/batch_exports/conftest.py b/posthog/temporal/tests/batch_exports/conftest.py index 2c1d6f7793d15..e2449586e27a4 100644 --- a/posthog/temporal/tests/batch_exports/conftest.py +++ b/posthog/temporal/tests/batch_exports/conftest.py @@ -332,3 +332,37 @@ async def generate_test_data( persons_to_export_created.append(person_to_export) return (events_to_export_created, persons_to_export_created) + + +@pytest_asyncio.fixture +async def generate_test_persons_data(ateam, clickhouse_client, data_interval_start, data_interval_end): + """Generate test persons data in ClickHouse.""" + persons, _ = await generate_test_persons_in_clickhouse( + client=clickhouse_client, + team_id=ateam.pk, + start_time=data_interval_start, + end_time=data_interval_end, + count=10, + count_other_team=1, + properties={"utm_medium": "referral", "$initial_os": "Linux"}, + ) + + persons_to_export_created = [] + for person in persons: + person_distinct_id, _ = await generate_test_person_distinct_id2_in_clickhouse( + client=clickhouse_client, + team_id=ateam.pk, + person_id=uuid.UUID(person["id"]), + distinct_id=f"distinct-id-{uuid.UUID(person['id'])}", + timestamp=dt.datetime.fromisoformat(person["_timestamp"]), + ) + person_to_export = { + "team_id": person["team_id"], + "person_id": person["id"], + "distinct_id": person_distinct_id["distinct_id"], + "version": person_distinct_id["version"], + "_timestamp": dt.datetime.fromisoformat(person["_timestamp"]), + } + persons_to_export_created.append(person_to_export) + + return persons_to_export_created diff --git a/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py index cbb75d8948301..80039276505bb 100644 --- a/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py @@ -31,7 +31,9 @@ insert_into_redshift_activity, redshift_default_fields, ) -from posthog.temporal.batch_exports.temporary_file import remove_escaped_whitespace_recursive +from posthog.temporal.batch_exports.temporary_file import ( + remove_escaped_whitespace_recursive, +) from posthog.temporal.common.clickhouse import ClickHouseClient from posthog.temporal.tests.batch_exports.utils import mocked_start_batch_export_run from posthog.temporal.tests.utils.events import generate_test_events_in_clickhouse @@ -40,6 +42,10 @@ adelete_batch_export, afetch_batch_export_runs, ) +from posthog.temporal.tests.utils.persons import ( + generate_test_person_distinct_id2_in_clickhouse, + generate_test_persons_in_clickhouse, +) REQUIRED_ENV_VARS = ( "REDSHIFT_USER", @@ -52,6 +58,16 @@ pytestmark = [pytest.mark.django_db, pytest.mark.asyncio] +EXPECTED_PERSONS_BATCH_EXPORT_FIELDS = [ + "team_id", + "distinct_id", + "person_id", + "properties", + "person_version", + "person_distinct_id_version", + "created_at", +] + async def assert_clickhouse_records_in_redshfit( redshift_connection, @@ -67,6 +83,7 @@ async def assert_clickhouse_records_in_redshfit( sort_key: str = "event", is_backfill: bool = False, expected_duplicates_threshold: float = 0.0, + expected_fields: 
list[str] | None = None, ): """Assert expected records are written to a given Redshift table. @@ -93,6 +110,7 @@ async def assert_clickhouse_records_in_redshfit( date_ranges: Ranges of records we should expect to have been exported. expected_duplicates_threshold: Threshold of duplicates we should expect relative to number of unique events, fail if we exceed it. + expected_fields: The expected fields to be exported. """ super_columns = ["properties", "set", "set_once", "person_properties"] @@ -115,8 +133,10 @@ async def assert_clickhouse_records_in_redshfit( inserted_records.append(event) - schema_column_names = [field["alias"] for field in redshift_default_fields()] - if batch_export_model is not None: + schema_column_names = ( + expected_fields if expected_fields is not None else [field["alias"] for field in redshift_default_fields()] + ) + if batch_export_model is not None and expected_fields is None: if isinstance(batch_export_model, BatchExportModel): batch_export_schema = batch_export_model.schema else: @@ -125,15 +145,7 @@ async def assert_clickhouse_records_in_redshfit( if batch_export_schema is not None: schema_column_names = [field["alias"] for field in batch_export_schema["fields"]] elif isinstance(batch_export_model, BatchExportModel) and batch_export_model.name == "persons": - schema_column_names = [ - "team_id", - "distinct_id", - "person_id", - "properties", - "person_distinct_id_version", - "person_version", - "_inserted_at", - ] + schema_column_names = EXPECTED_PERSONS_BATCH_EXPORT_FIELDS expected_records = [] for data_interval_start, data_interval_end in date_ranges: @@ -147,6 +159,7 @@ async def assert_clickhouse_records_in_redshfit( include_events=include_events, destination_default_fields=redshift_default_fields(), is_backfill=is_backfill, + use_latest_schema=True, ): for record in record_batch.select(schema_column_names).to_pylist(): expected_record = {} @@ -242,6 +255,8 @@ async def psycopg_connection(redshift_config, setup_postgres_test_db): dbname=redshift_config["database"], host=redshift_config["host"], port=redshift_config["port"], + # this is needed, otherwise query results are cached + autocommit=True, ) connection.prepare_threshold = None @@ -835,3 +850,190 @@ class InsufficientPrivilege(Exception): assert run.status == "Failed" assert run.latest_error == "InsufficientPrivilege: A useful error message" assert run.records_completed is None + + +async def test_insert_into_redshift_activity_merges_data_in_follow_up_runs( + clickhouse_client, + activity_environment, + psycopg_connection, + redshift_config, + generate_test_persons_data, + data_interval_start, + data_interval_end, + ateam, +): + """Test that the `insert_into_redshift_activity` merges new versions of rows. + + This unit test looks at the mutability handling capabilities of the aforementioned activity. + We will generate a new entry in the persons table for half of the persons exported in a first + run of the activity. We expect the new entries to have replaced the old ones in Redshift after + the second run. 
+ """ + if MISSING_REQUIRED_ENV_VARS: + pytest.skip("Persons batch export cannot be tested in PostgreSQL") + + model = BatchExportModel(name="persons", schema=None) + properties_data_type = "varchar" + + insert_inputs = RedshiftInsertInputs( + team_id=ateam.pk, + table_name=f"test_insert_activity_mutability_table_{ateam.pk}", + data_interval_start=data_interval_start.isoformat(), + data_interval_end=data_interval_end.isoformat(), + batch_export_model=model, + properties_data_type=properties_data_type, + **redshift_config, + ) + + await activity_environment.run(insert_into_redshift_activity, insert_inputs) + + await assert_clickhouse_records_in_redshfit( + redshift_connection=psycopg_connection, + clickhouse_client=clickhouse_client, + schema_name=redshift_config["schema"], + table_name=f"test_insert_activity_mutability_table_{ateam.pk}", + team_id=ateam.pk, + date_ranges=[(data_interval_start, data_interval_end)], + batch_export_model=model, + properties_data_type=properties_data_type, + sort_key="person_id", + ) + + persons_to_export_created = generate_test_persons_data + + for old_person in persons_to_export_created[: len(persons_to_export_created) // 2]: + new_person_id = uuid.uuid4() + new_person, _ = await generate_test_persons_in_clickhouse( + client=clickhouse_client, + team_id=ateam.pk, + start_time=data_interval_start, + end_time=data_interval_end, + person_id=new_person_id, + count=1, + properties={"utm_medium": "referral", "$initial_os": "Linux", "new_property": "Something"}, + ) + + await generate_test_person_distinct_id2_in_clickhouse( + clickhouse_client, + ateam.pk, + person_id=uuid.UUID(new_person[0]["id"]), + distinct_id=old_person["distinct_id"], + version=old_person["version"] + 1, + timestamp=old_person["_timestamp"], + ) + + await activity_environment.run(insert_into_redshift_activity, insert_inputs) + + await assert_clickhouse_records_in_redshfit( + redshift_connection=psycopg_connection, + clickhouse_client=clickhouse_client, + schema_name=redshift_config["schema"], + table_name=f"test_insert_activity_mutability_table_{ateam.pk}", + team_id=ateam.pk, + date_ranges=[(data_interval_start, data_interval_end)], + batch_export_model=model, + properties_data_type=properties_data_type, + sort_key="person_id", + ) + + +async def test_insert_into_redshift_activity_handles_person_schema_changes( + clickhouse_client, + activity_environment, + psycopg_connection, + redshift_config, + generate_test_persons_data, + data_interval_start, + data_interval_end, + ateam, +): + """Test that the `insert_into_redshift_activity` handles changes to the + person schema. + + If we update the schema of the persons model we export, we should still be + able to export the data without breaking existing exports. For example, any + new fields should not be added to the destination (in future we may want to + allow this but for now we don't). + + To replicate this situation we first export the data with the original + schema, then delete a column in the destination and then rerun the export. 
+ """ + if MISSING_REQUIRED_ENV_VARS: + pytest.skip("Persons batch export cannot be tested in PostgreSQL") + + model = BatchExportModel(name="persons", schema=None) + properties_data_type = "varchar" + + insert_inputs = RedshiftInsertInputs( + team_id=ateam.pk, + table_name=f"test_insert_activity_migration_table__{ateam.pk}", + data_interval_start=data_interval_start.isoformat(), + data_interval_end=data_interval_end.isoformat(), + batch_export_model=model, + properties_data_type=properties_data_type, + **redshift_config, + ) + + await activity_environment.run(insert_into_redshift_activity, insert_inputs) + + await assert_clickhouse_records_in_redshfit( + redshift_connection=psycopg_connection, + clickhouse_client=clickhouse_client, + schema_name=redshift_config["schema"], + table_name=f"test_insert_activity_migration_table__{ateam.pk}", + team_id=ateam.pk, + date_ranges=[(data_interval_start, data_interval_end)], + batch_export_model=model, + properties_data_type=properties_data_type, + sort_key="person_id", + ) + + # Drop the created_at column from the Redshift table + async with psycopg_connection.transaction(): + async with psycopg_connection.cursor() as cursor: + await cursor.execute( + sql.SQL("ALTER TABLE {table} DROP COLUMN created_at").format( + table=sql.Identifier(redshift_config["schema"], f"test_insert_activity_migration_table__{ateam.pk}") + ) + ) + + persons_to_export_created = generate_test_persons_data + + for old_person in persons_to_export_created[: len(persons_to_export_created) // 2]: + new_person_id = uuid.uuid4() + new_person, _ = await generate_test_persons_in_clickhouse( + client=clickhouse_client, + team_id=ateam.pk, + start_time=data_interval_start, + end_time=data_interval_end, + person_id=new_person_id, + count=1, + properties={"utm_medium": "referral", "$initial_os": "Linux", "new_property": "Something"}, + ) + + await generate_test_person_distinct_id2_in_clickhouse( + clickhouse_client, + ateam.pk, + person_id=uuid.UUID(new_person[0]["id"]), + distinct_id=old_person["distinct_id"], + version=old_person["version"] + 1, + timestamp=old_person["_timestamp"], + ) + + await activity_environment.run(insert_into_redshift_activity, insert_inputs) + + # This time we don't expect there to be a created_at column + expected_fields = [f for f in EXPECTED_PERSONS_BATCH_EXPORT_FIELDS if f != "created_at"] + + await assert_clickhouse_records_in_redshfit( + redshift_connection=psycopg_connection, + clickhouse_client=clickhouse_client, + schema_name=redshift_config["schema"], + table_name=f"test_insert_activity_migration_table__{ateam.pk}", + team_id=ateam.pk, + date_ranges=[(data_interval_start, data_interval_end)], + batch_export_model=model, + properties_data_type=properties_data_type, + sort_key="person_id", + expected_fields=expected_fields, + )