feat(batch-exports): Add created_at to Redshift persons batch export #27403
@@ -47,11 +47,11 @@
     run_consumer,
     wait_for_schema_or_producer,
 )
-from posthog.temporal.batch_exports.temporary_file import BatchExportTemporaryFile, WriterFormat
-from posthog.temporal.batch_exports.utils import (
-    JsonType,
-    set_status_to_running_task,
+from posthog.temporal.batch_exports.temporary_file import (
+    BatchExportTemporaryFile,
+    WriterFormat,
 )
+from posthog.temporal.batch_exports.utils import JsonType, set_status_to_running_task
 from posthog.temporal.common.clickhouse import get_client
 from posthog.temporal.common.heartbeat import Heartbeater
 from posthog.temporal.common.logger import configure_temporal_worker_logger

@@ -120,6 +120,10 @@ async def amerge_identical_tables(
         for field in merge_key
     )

+    # Redshift doesn't support adding a condition on the merge, so we have
+    # to first delete any rows in stage that match those in final, where
+    # stage also has a higher version. Otherwise we risk merging adding old
+    # versions back.
Comment on lines +123 to +126

Thanks for clarifying 👍

I was slightly confused why we were doing this and then saw this comment in the PR where you introduced it, so thought it was worth adding.
     delete_condition = and_separator.join(
         sql.SQL("{final_field} = {stage_field}").format(
             final_field=sql.Identifier("final", field[0]),

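For context, the delete-then-merge shape that the new code comment describes can be sketched with psycopg's sql composition, in the same style as the delete_condition snippet above. Everything in this sketch (the table names, the merge key, the version column, which table is deleted from, and the direction of the version comparison) is illustrative, not the actual PostHog query:

```python
# A minimal sketch of the delete-before-merge pattern, assuming illustrative names.
from psycopg import sql

merge_key = (("team_id", "INT"), ("distinct_id", "TEXT"))
and_separator = sql.SQL(" AND ")

# Join "final"."<key>" = "stage"."<key>" for every merge key column,
# mirroring the delete_condition construction in the diff.
delete_condition = and_separator.join(
    sql.SQL("{final_field} = {stage_field}").format(
        final_field=sql.Identifier("final", field[0]),
        stage_field=sql.Identifier("stage", field[0]),
    )
    for field in merge_key
)

# Because Redshift's MERGE cannot carry an extra version predicate, the version
# check runs in a separate DELETE before the merge. Which table is deleted from
# and the `>=` direction are assumptions made for this illustration.
delete_query = sql.SQL(
    "DELETE FROM {stage} USING {final} "
    "WHERE {condition} AND {final_version} >= {stage_version}"
).format(
    stage=sql.Identifier("stage"),
    final=sql.Identifier("final"),
    condition=delete_condition,
    final_version=sql.Identifier("final", "version"),
    stage_version=sql.Identifier("stage", "version"),
)
# `delete_query` would then be passed to `cursor.execute(...)` before running the
# unconditional merge (e.g. MERGE INTO final USING stage ON <merge key> REMOVE DUPLICATES).
```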
@@ -404,6 +408,7 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs) -> Records
         include_events=inputs.include_events,
         extra_query_parameters=extra_query_parameters,
         max_record_batch_size_bytes=1024 * 1024 * 2,  # 2MB
+        use_latest_schema=True,
     )

     record_batch_schema = await wait_for_schema_or_producer(queue, producer_task)

@@ -453,6 +458,13 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs) -> Records
         primary_key = None

     async with RedshiftClient.from_inputs(inputs).connect() as redshift_client:
+        # filter out fields that are not in the destination table
+        try:
+            columns = await redshift_client.aget_table_columns(inputs.schema, inputs.table_name)
+            table_fields = [field for field in table_fields if field[0] in columns]
+        except psycopg.errors.UndefinedTable:
+            pass
+
         async with (
             redshift_client.managed_table(
                 inputs.schema, inputs.table_name, table_fields, delete=False, primary_key=primary_key

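The effect of that filter ties back to the created_at column this PR adds: a destination table created before created_at existed won't have the column, so the new field is presumably dropped from the insert rather than failing the export. A tiny illustration of the same filtering expression, using made-up field names and types rather than the real persons schema:

```python
# Hypothetical persons fields, including the newly added created_at.
table_fields = [
    ("id", "VARCHAR(65535)"),
    ("properties", "SUPER"),
    ("created_at", "TIMESTAMP WITH TIME ZONE"),
]

# Columns reported for an existing destination table that predates created_at.
columns = {"id", "properties"}

# Same filtering expression as in the diff: keep only fields the table already has.
table_fields = [field for field in table_fields if field[0] in columns]
assert table_fields == [("id", "VARCHAR(65535)"), ("properties", "SUPER")]
```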
That's a very funny way to get schema information.

That was what my editor suggested 😄

I suppose it is efficient since it won't actually return any table data, just column metadata.

It's a bit obfuscated, maybe not so clear at a glance what you are doing, but I believe it does work. And it saves you from needing to query a separate table (something in information_schema) for which we may need extra permissions.
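For readers wondering what this "funny way to get schema information" might look like: the body of aget_table_columns isn't part of this diff, but based on the discussion above it presumably queries the destination table itself with a clause that returns no rows and reads column names from the cursor description, rather than hitting information_schema. A rough sketch under those assumptions, not the actual PostHog implementation:

```python
# A minimal sketch, assuming psycopg 3 and a plain async connection; the helper
# name matches the diff, but everything inside it is an assumption.
import psycopg
from psycopg import sql


async def aget_table_columns(
    connection: psycopg.AsyncConnection, schema: str, table_name: str
) -> list[str]:
    """Return column names of a table without scanning any data.

    Raises psycopg.errors.UndefinedTable if the table does not exist, which the
    activity above catches in order to skip filtering for brand-new tables.
    """
    async with connection.cursor() as cursor:
        # `WHERE false` (or `LIMIT 0`) means the query is only planned; no table
        # data is transferred, but the cursor description is still populated.
        await cursor.execute(
            sql.SQL("SELECT * FROM {table} WHERE false").format(
                table=sql.Identifier(schema, table_name)
            )
        )
        return [column.name for column in cursor.description]
```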