feat(batch-exports): Add created_at to Redshift persons batch export
rossgray authored Jan 10, 2025
1 parent 9bb58d8 commit 4c3cb69
Showing 5 changed files with 288 additions and 17 deletions.
21 changes: 21 additions & 0 deletions posthog/temporal/batch_exports/postgres_batch_export.py
@@ -254,6 +254,27 @@ async def adelete_table(self, schema: str | None, table_name: str, not_found_ok:

await cursor.execute(sql.SQL(base_query).format(table=table_identifier))

    async def aget_table_columns(self, schema: str | None, table_name: str) -> list[str]:
        """Get the column names for a table in PostgreSQL.

        Args:
            schema: Name of the schema where the table is located.
            table_name: Name of the table to get columns for.

        Returns:
            A list of column names in the table.
        """
        if schema:
            table_identifier = sql.Identifier(schema, table_name)
        else:
            table_identifier = sql.Identifier(table_name)

        async with self.connection.transaction():
            async with self.connection.cursor() as cursor:
                await cursor.execute(sql.SQL("SELECT * FROM {} WHERE 1=0").format(table_identifier))
                columns = [column.name for column in cursor.description or []]

        return columns

    @contextlib.asynccontextmanager
    async def managed_table(
        self,
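The `SELECT * FROM … WHERE 1=0` query is the key trick in the new method: it matches no rows, but the driver still populates `cursor.description` with one entry per column, making this a cheap way to introspect a table. A minimal standalone sketch of the same idea with plain `psycopg` (the DSN and table name are hypothetical):

```python
import asyncio

import psycopg
from psycopg import sql


async def get_columns(dsn: str, table: str) -> list[str]:
    # A query that can never match still reports column metadata in
    # cursor.description, so no rows are transferred from the server.
    async with await psycopg.AsyncConnection.connect(dsn) as connection:
        async with connection.cursor() as cursor:
            await cursor.execute(sql.SQL("SELECT * FROM {} WHERE 1=0").format(sql.Identifier(table)))
            return [column.name for column in cursor.description or []]


# Hypothetical usage:
# print(asyncio.run(get_columns("postgresql://localhost/posthog", "persons")))
```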
20 changes: 16 additions & 4 deletions posthog/temporal/batch_exports/redshift_batch_export.py
@@ -47,11 +47,11 @@
    run_consumer,
    wait_for_schema_or_producer,
)
from posthog.temporal.batch_exports.temporary_file import (
    BatchExportTemporaryFile,
    WriterFormat,
)
from posthog.temporal.batch_exports.utils import JsonType, set_status_to_running_task
from posthog.temporal.common.clickhouse import get_client
from posthog.temporal.common.heartbeat import Heartbeater
from posthog.temporal.common.logger import configure_temporal_worker_logger
@@ -120,6 +120,10 @@ async def amerge_identical_tables(
        for field in merge_key
    )

    # Redshift doesn't support adding a condition on the MERGE itself, so we
    # first have to delete any rows in stage that match rows in final where
    # final has a higher version. Otherwise we risk the merge adding old
    # versions back.
    delete_condition = and_separator.join(
        sql.SQL("{final_field} = {stage_field}").format(
            final_field=sql.Identifier("final", field[0]),
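To make the workaround concrete, here is a sketch of how that pre-merge delete composes in the same psycopg `sql` style, with hypothetical merge keys (the real ones come from the function's `merge_key` argument), followed by the SQL it is expected to render to:

```python
from psycopg import sql

# Hypothetical merge keys, for illustration only.
merge_key = [("team_id",), ("distinct_id",)]
and_separator = sql.SQL(" AND ")

delete_condition = and_separator.join(
    sql.SQL("{final_field} = {stage_field}").format(
        final_field=sql.Identifier("final", field[0]),
        stage_field=sql.Identifier("stage", field[0]),
    )
    for field in merge_key
)

# Rendered, the pre-merge delete reads roughly as:
#
#   DELETE FROM stage
#   USING final
#   WHERE "final"."team_id" = "stage"."team_id"
#     AND "final"."distinct_id" = "stage"."distinct_id"
#     AND "final"."version" >= "stage"."version";
#
# Stage rows that aren't strictly newer than final are dropped, so the
# unconditional MERGE that follows cannot resurrect old versions.
```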
@@ -404,6 +408,7 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs) -> Records
        include_events=inputs.include_events,
        extra_query_parameters=extra_query_parameters,
        max_record_batch_size_bytes=1024 * 1024 * 2,  # 2MB
        use_latest_schema=True,
    )

    record_batch_schema = await wait_for_schema_or_producer(queue, producer_task)
@@ -453,6 +458,13 @@
        primary_key = None

    async with RedshiftClient.from_inputs(inputs).connect() as redshift_client:
        # Filter out fields that are not in the destination table.
        try:
            columns = await redshift_client.aget_table_columns(inputs.schema, inputs.table_name)
            table_fields = [field for field in table_fields if field[0] in columns]
        except psycopg.errors.UndefinedTable:
            # The table doesn't exist yet; it will be created with all fields.
            pass

        async with (
            redshift_client.managed_table(
                inputs.schema, inputs.table_name, table_fields, delete=False, primary_key=primary_key
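The try/except above is what makes the export tolerant of schema drift: fields the destination table doesn't have are silently dropped, while a missing table keeps every field so table creation can proceed. A condensed sketch of the pattern (the client is assumed to expose `aget_table_columns` as added in the PostgreSQL client above; other names are hypothetical):

```python
import psycopg


async def filter_to_destination_columns(client, schema, table_name, table_fields):
    """Keep only the fields that already exist in the destination table.

    If the table does not exist yet, return the fields unchanged so the
    export can create it with every field. Sketch only; `client` is
    assumed to expose `aget_table_columns` as in the diff above.
    """
    try:
        columns = await client.aget_table_columns(schema, table_name)
    except psycopg.errors.UndefinedTable:
        return table_fields
    return [field for field in table_fields if field[0] in columns]
```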
4 changes: 3 additions & 1 deletion posthog/temporal/tests/batch_exports/README.md
@@ -10,6 +10,7 @@ BigQuery batch exports can be tested against a real BigQuery instance, but doing
> Since BigQuery batch export tests require additional setup, we skip them by default and they will not be run by automated CI pipelines. Please ensure these tests pass when making changes that affect BigQuery batch exports.

To enable testing for BigQuery batch exports, we require:

1. A BigQuery project and dataset
2. A BigQuery ServiceAccount with access to said project and dataset. See the [BigQuery batch export documentation](https://posthog.com/docs/cdp/batch-exports/bigquery#setting-up-bigquery-access) for detailed steps to set up a ServiceAccount.

@@ -29,11 +30,12 @@ Redshift batch exports can be tested against a real Redshift (or Redshift Server
> Since Redshift batch export tests require additional setup, we skip them by default and they will not be run by automated CI pipelines. Please ensure these tests pass when making changes that affect Redshift batch exports.

To enable testing for Redshift batch exports, we require:

1. A Redshift (or Redshift Serverless) instance.
2. Network access to this instance (via a VPN connection or jumphost; making a Redshift instance publicly available has serious security implications).
3. User credentials (user requires `CREATEDB` permissions for testing but **not** superuser access).

For PostHog employees, check the password manager, as a set of development credentials should already be available. You will also need to use the `dev` exit node in Tailscale and be added to the `group:engineering` group in the tailnet policy. With these credentials, and Tailscale set up, we can run the tests from the root of the `posthog` repo with:

```bash
DEBUG=1 REDSHIFT_HOST=workgroup.111222333.region.redshift-serverless.amazonaws.com REDSHIFT_USER=test_user REDSHIFT_PASSWORD=test_password pytest posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py
34 changes: 34 additions & 0 deletions posthog/temporal/tests/batch_exports/conftest.py
@@ -332,3 +332,37 @@ async def generate_test_data(
        persons_to_export_created.append(person_to_export)

    return (events_to_export_created, persons_to_export_created)


@pytest_asyncio.fixture
async def generate_test_persons_data(ateam, clickhouse_client, data_interval_start, data_interval_end):
    """Generate test persons data in ClickHouse."""
    persons, _ = await generate_test_persons_in_clickhouse(
        client=clickhouse_client,
        team_id=ateam.pk,
        start_time=data_interval_start,
        end_time=data_interval_end,
        count=10,
        count_other_team=1,
        properties={"utm_medium": "referral", "$initial_os": "Linux"},
    )

    persons_to_export_created = []
    for person in persons:
        person_distinct_id, _ = await generate_test_person_distinct_id2_in_clickhouse(
            client=clickhouse_client,
            team_id=ateam.pk,
            person_id=uuid.UUID(person["id"]),
            distinct_id=f"distinct-id-{uuid.UUID(person['id'])}",
            timestamp=dt.datetime.fromisoformat(person["_timestamp"]),
        )
        person_to_export = {
            "team_id": person["team_id"],
            "person_id": person["id"],
            "distinct_id": person_distinct_id["distinct_id"],
            "version": person_distinct_id["version"],
            "_timestamp": dt.datetime.fromisoformat(person["_timestamp"]),
        }
        persons_to_export_created.append(person_to_export)

    return persons_to_export_created
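As a sketch of how a test might consume this fixture (the test name and assertions are hypothetical; pytest-asyncio resolves the async fixture to its return value):

```python
import pytest

pytestmark = [pytest.mark.asyncio]


async def test_persons_batch_export_rows(generate_test_persons_data):
    # The fixture returns the list of person rows we expect to export.
    persons = generate_test_persons_data

    assert len(persons) == 10
    for row in persons:
        assert "person_id" in row and "distinct_id" in row and "version" in row
```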