feat(batch-exports): Add created_at to Redshift persons batch export #27403

Merged · 1 commit · Jan 10, 2025
posthog/temporal/batch_exports/postgres_batch_export.py (21 additions, 0 deletions)

@@ -254,6 +254,27 @@ async def adelete_table(self, schema: str | None, table_name: str, not_found_ok:

                await cursor.execute(sql.SQL(base_query).format(table=table_identifier))

    async def aget_table_columns(self, schema: str | None, table_name: str) -> list[str]:
        """Get the column names for a table in PostgreSQL.

        Args:
            schema: Name of the schema where the table is located.
            table_name: Name of the table to get columns for.

        Returns:
            A list of column names in the table.
        """
        if schema:
            table_identifier = sql.Identifier(schema, table_name)
        else:
            table_identifier = sql.Identifier(table_name)

        async with self.connection.transaction():
            async with self.connection.cursor() as cursor:
                await cursor.execute(sql.SQL("SELECT * FROM {} WHERE 1=0").format(table_identifier))
Contributor: That's a very funny way to get schema information.

Contributor (Author): That was what my editor suggested 😄
I suppose it is efficient since it won't actually return any table data, just column metadata.

Contributor: It's a bit obfuscated, maybe not so clear at a glance what you are doing, but I believe it does work. And it saves you from needing to query a separate table (something in information_schema) for which we may need extra permissions.

                columns = [column.name for column in cursor.description or []]
                return columns

    @contextlib.asynccontextmanager
    async def managed_table(
        self,
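As an aside to the review thread above: the alternative the reviewer mentions, querying information_schema directly, could look roughly like the sketch below. This is a hypothetical standalone illustration, not code from the PR; it assumes an async psycopg connection and read access to information_schema.columns.

```python
# Hypothetical sketch only: query information_schema instead of relying on
# cursor.description from a "WHERE 1=0" SELECT. May require extra permissions.
import psycopg
from psycopg import sql


async def get_table_columns_via_information_schema(
    connection: psycopg.AsyncConnection, schema: str | None, table_name: str
) -> list[str]:
    query = sql.SQL(
        "SELECT column_name FROM information_schema.columns "
        "WHERE table_schema = {schema} AND table_name = {table} "
        "ORDER BY ordinal_position"
    ).format(schema=sql.Literal(schema or "public"), table=sql.Literal(table_name))

    async with connection.cursor() as cursor:
        await cursor.execute(query)
        rows = await cursor.fetchall()

    # Each row is a one-element tuple holding the column name.
    return [row[0] for row in rows]
```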
posthog/temporal/batch_exports/redshift_batch_export.py (16 additions, 4 deletions)

@@ -47,11 +47,11 @@
    run_consumer,
    wait_for_schema_or_producer,
)
-from posthog.temporal.batch_exports.temporary_file import BatchExportTemporaryFile, WriterFormat
-from posthog.temporal.batch_exports.utils import (
-    JsonType,
-    set_status_to_running_task,
-)
+from posthog.temporal.batch_exports.temporary_file import (
+    BatchExportTemporaryFile,
+    WriterFormat,
+)
+from posthog.temporal.batch_exports.utils import JsonType, set_status_to_running_task
from posthog.temporal.common.clickhouse import get_client
from posthog.temporal.common.heartbeat import Heartbeater
from posthog.temporal.common.logger import configure_temporal_worker_logger
@@ -120,6 +120,10 @@ async def amerge_identical_tables(
            for field in merge_key
        )

        # Redshift doesn't support adding a condition on the merge, so we have
        # to first delete any rows in stage that match rows in final where the
        # stage row has an older version. Otherwise we risk the merge adding
        # old versions back.
(Comment on lines +123 to +126)

Contributor: Thanks for clarifying 👍

Contributor (Author): I was slightly confused why we were doing this, then saw this comment in the PR where you introduced it, so I thought it was worth adding.
        delete_condition = and_separator.join(
            sql.SQL("{final_field} = {stage_field}").format(
                final_field=sql.Identifier("final", field[0]),
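To make the comment above concrete, the delete-before-merge step might be composed along these lines. This is a hedged sketch, not the PR's actual query: the function name, table identifiers, and the version column are assumptions.

```python
# Hypothetical sketch of deleting stale stage rows before a MERGE, since
# Redshift's MERGE cannot carry an extra version condition. Names are illustrative.
from psycopg import sql


def build_stage_cleanup_query(final_table: str, stage_table: str, merge_key: list[str]) -> sql.Composed:
    """Delete stage rows whose matching final row already has an equal or newer
    version, so the subsequent MERGE cannot reintroduce old versions."""
    match_condition = sql.SQL(" AND ").join(
        sql.SQL("{final_field} = {stage_field}").format(
            final_field=sql.Identifier(final_table, field),
            stage_field=sql.Identifier(stage_table, field),
        )
        for field in merge_key
    )
    return sql.SQL(
        "DELETE FROM {stage} USING {final} WHERE {condition} AND {final_version} >= {stage_version}"
    ).format(
        stage=sql.Identifier(stage_table),
        final=sql.Identifier(final_table),
        condition=match_condition,
        final_version=sql.Identifier(final_table, "version"),
        stage_version=sql.Identifier(stage_table, "version"),
    )
```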
@@ -404,6 +408,7 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs) -> Records
        include_events=inputs.include_events,
        extra_query_parameters=extra_query_parameters,
        max_record_batch_size_bytes=1024 * 1024 * 2,  # 2MB
        use_latest_schema=True,
    )

    record_batch_schema = await wait_for_schema_or_producer(queue, producer_task)
@@ -453,6 +458,13 @@
        primary_key = None

    async with RedshiftClient.from_inputs(inputs).connect() as redshift_client:
        # filter out fields that are not in the destination table
        try:
            columns = await redshift_client.aget_table_columns(inputs.schema, inputs.table_name)
            table_fields = [field for field in table_fields if field[0] in columns]
        except psycopg.errors.UndefinedTable:
            pass

        async with (
            redshift_client.managed_table(
                inputs.schema, inputs.table_name, table_fields, delete=False, primary_key=primary_key
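For illustration, the new filtering step above boils down to something like the following self-contained helper. This is a sketch under assumptions (the helper name and signature are invented here); the PR itself does the filtering inline as shown.

```python
# Illustrative sketch of reconciling export fields with an existing destination
# table: keep only fields the table already has; if the table doesn't exist yet,
# keep everything and let table creation define the schema.
import psycopg


async def reconcile_table_fields(client, schema, table_name, table_fields):
    try:
        columns = await client.aget_table_columns(schema, table_name)
    except psycopg.errors.UndefinedTable:
        # Destination table doesn't exist yet; it will be created with all fields.
        return table_fields
    return [field for field in table_fields if field[0] in columns]
```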
posthog/temporal/tests/batch_exports/README.md (3 additions, 1 deletion)

@@ -10,6 +10,7 @@ BigQuery batch exports can be tested against a real BigQuery instance, but doing
> Since BigQuery batch export tests require additional setup, we skip them by default and they will not be run by automated CI pipelines. Please ensure these tests pass when making changes that affect BigQuery batch exports.

To enable testing for BigQuery batch exports, we require:

1. A BigQuery project and dataset
2. A BigQuery ServiceAccount with access to said project and dataset. See the [BigQuery batch export documentation](https://posthog.com/docs/cdp/batch-exports/bigquery#setting-up-bigquery-access) for detailed steps to set up a ServiceAccount.

@@ -29,11 +30,12 @@ Redshift batch exports can be tested against a real Redshift (or Redshift Server
> Since Redshift batch export tests require additional setup, we skip them by default and they will not be run by automated CI pipelines. Please ensure these tests pass when making changes that affect Redshift batch exports.

To enable testing for Redshift batch exports, we require:

1. A Redshift (or Redshift Serverless) instance.
2. Network access to this instance (via a VPN connection or jumphost; making a Redshift instance publicly available has serious security implications).
3. User credentials (user requires `CREATEDB` permissions for testing but **not** superuser access).

-For PostHog employees, check the password manager as a set of development credentials should already be available. With these credentials, and after connecting to the appropriate VPN, we can run the tests from the root of the `posthog` repo with:
+For PostHog employees, check the password manager as a set of development credentials should already be available. You will also need to use the `dev` exit node in Tailscale and be added to the `group:engineering` group in the tailnet policy. With these credentials, and with Tailscale set up, we can run the tests from the root of the `posthog` repo with:

```bash
DEBUG=1 REDSHIFT_HOST=workgroup.111222333.region.redshift-serverless.amazonaws.com REDSHIFT_USER=test_user REDSHIFT_PASSWORD=test_password pytest posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py
posthog/temporal/tests/batch_exports/conftest.py (34 additions, 0 deletions)

@@ -332,3 +332,37 @@ async def generate_test_data(
        persons_to_export_created.append(person_to_export)

    return (events_to_export_created, persons_to_export_created)


@pytest_asyncio.fixture
async def generate_test_persons_data(ateam, clickhouse_client, data_interval_start, data_interval_end):
    """Generate test persons data in ClickHouse."""
    persons, _ = await generate_test_persons_in_clickhouse(
        client=clickhouse_client,
        team_id=ateam.pk,
        start_time=data_interval_start,
        end_time=data_interval_end,
        count=10,
        count_other_team=1,
        properties={"utm_medium": "referral", "$initial_os": "Linux"},
    )

    persons_to_export_created = []
    for person in persons:
        person_distinct_id, _ = await generate_test_person_distinct_id2_in_clickhouse(
            client=clickhouse_client,
            team_id=ateam.pk,
            person_id=uuid.UUID(person["id"]),
            distinct_id=f"distinct-id-{uuid.UUID(person['id'])}",
            timestamp=dt.datetime.fromisoformat(person["_timestamp"]),
        )
        person_to_export = {
            "team_id": person["team_id"],
            "person_id": person["id"],
            "distinct_id": person_distinct_id["distinct_id"],
            "version": person_distinct_id["version"],
            "_timestamp": dt.datetime.fromisoformat(person["_timestamp"]),
        }
        persons_to_export_created.append(person_to_export)

    return persons_to_export_created
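For context, a test requests the fixture by name and receives its return value; a minimal hypothetical usage might look like this (the test name and assertions are illustrative, not from the PR):

```python
# Hypothetical usage of the fixture above; the assertions are illustrative only.
import pytest


@pytest.mark.asyncio
async def test_persons_batch_export_has_expected_rows(generate_test_persons_data):
    persons_to_export_created = generate_test_persons_data
    assert len(persons_to_export_created) == 10
    assert all("person_id" in person for person in persons_to_export_created)
```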