Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐞 Postgres source: fix bug in intermediate state emission #15496

Merged
merged 17 commits into from
Aug 10, 2022

Conversation

tuliren
Copy link
Contributor

@tuliren tuliren commented Aug 10, 2022

What

🚨 User Impact 🚨

  • This PR fixes a bug introduced in version 0.4.41.

@github-actions github-actions bot added the area/connectors Connector related issues label Aug 10, 2022
@tuliren tuliren changed the title Liren/fix postgres state emission 🐞 Postgres source: fix bug in intermediate state emission Aug 10, 2022
@tuliren
Copy link
Contributor Author

tuliren commented Aug 10, 2022

@ryankfu, this is the intermediate state emission bug fix that is relevant to your ticket. Now whenever you want the iterator to emit a state, just set emitIntermediateState to true, and the next call will emit the proper intermediate state when it is available.

@tuliren tuliren requested review from ryankfu, a team and subodh1810 August 10, 2022 01:10
@tuliren
Copy link
Contributor Author

tuliren commented Aug 10, 2022

/test connector=connectors/source-postgres

🕑 connectors/source-postgres https://github.com/airbytehq/airbyte/actions/runs/2829349920
✅ connectors/source-postgres https://github.com/airbytehq/airbyte/actions/runs/2829349920
No Python unittests run

Build Passed

Test summary info:

All Passed

// the latest intermediateStateMessage will be emitted.
private int totalRecordCount = 0;
private boolean emitIntermediateState = false;
private AirbyteMessage intermediateStateMessage = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How come we're setting the intermediateStateMessage = null here over the previous code where it was originally intermediateStateMessage? Is it mainly to be explicit with the default value?

Copy link
Contributor Author

@tuliren tuliren Aug 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I just want to make it explicit because these values are set and reset repetitively.

assertEquals(STATE_MESSAGE_3, iterator1.next());
assertEquals(RECORD_MESSAGE_5, iterator1.next());
// state 3 is not emitted because there is no more record and only
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit confused by this comment, based on my understanding every time the iterator executes it should emit a new intermittent STATE_MESSAGE that is not the initial or the final cursor so in this example I would have expected the STATE_MESSAGEs for 2 - 4. How come the STATE_MESSAGEs are from 1 - 3 and the code does not emit 4?

Copy link
Contributor Author

@tuliren tuliren Aug 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This stream has no initial cursor (null). It is supposed to emit state for every record. So state 1 should be emitted. Here is what happens.

  • Emit record 1.
  • It should emit state 1 after record 1. But at this point, it is unclear whether there will be more records with the same cursor. There is no state ready for emission. So no state is emitted.
  • Emit record 2. Now it is clear that the cursor has moved to a new value. So state 1 is ready to be emitted.
  • Emit state 1.
  • Emit record 3. Same logic that state 2 is ready.
  • Emit state 2.
  • Emit record 4. State 3 is ready.
  • Emit state 3.
  • Emit record 5. State 4 is ready.
  • Emit state 5. There is no more record. So only the final state, state 5, will be emitted. This is why state 4 is NOT emitted, even though it is ready.

The state message will always be behind the record, because we cannot know ahead of time whether all records with the same cursor value have been emitted.

Copy link
Contributor

@ryankfu ryankfu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some comments to clarify the naming of tests to be more clear with emphasis on testStateEmission4 being difficult to follow without context into the github issue

Also would like to clarify why certain code changes were made when the original code could be reused

@tuliren tuliren requested a review from ryankfu August 10, 2022 15:40
@tuliren
Copy link
Contributor Author

tuliren commented Aug 10, 2022

/test connector=connectors/source-postgres

🕑 connectors/source-postgres https://github.com/airbytehq/airbyte/actions/runs/2834111585
✅ connectors/source-postgres https://github.com/airbytehq/airbyte/actions/runs/2834111585
No Python unittests run

Build Passed

Test summary info:

All Passed

Copy link
Contributor

@ryankfu ryankfu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me, thanks for clarifying the reasons for breaking out some of the logic adding a more clear understanding for the testStateEmissionForRecordsSharingSameCursorValue

@tuliren
Copy link
Contributor Author

tuliren commented Aug 10, 2022

/publish connector=connectors/source-postgres

🕑 Publishing the following connectors:
connectors/source-postgres
https://github.com/airbytehq/airbyte/actions/runs/2834838041


Connector Did it publish? Were definitions generated?
connectors/source-postgres

if you have connectors that successfully published but failed definition generation, follow step 4 here ▶️

@tuliren
Copy link
Contributor Author

tuliren commented Aug 10, 2022

/publish connector=connectors/source-postgres-strict-encrypt

🕑 Publishing the following connectors:
connectors/source-postgres-strict-encrypt
https://github.com/airbytehq/airbyte/actions/runs/2834838661


Connector Did it publish? Were definitions generated?
connectors/source-postgres-strict-encrypt

if you have connectors that successfully published but failed definition generation, follow step 4 here ▶️

@tuliren tuliren merged commit 2f17e99 into master Aug 10, 2022
@tuliren tuliren deleted the liren/fix-postgres-state-emission branch August 10, 2022 20:02
pmossman added a commit that referenced this pull request Aug 10, 2022
commit 10fb1dc137175d09826cdfcf419698e3000cd418
Author: pmossman <[email protected]>
Date:   Wed Aug 10 13:37:00 2022 -0700

    format and pmd

commit 7c223ec2e0abfec864c11395d7310d679706a49c
Author: pmossman <[email protected]>
Date:   Wed Aug 10 13:08:09 2022 -0700

    update peristStateActivity test

commit 763e9e2c5ca5f998ab49ffd0dcafb7ae81201b2b
Author: pmossman <[email protected]>
Date:   Fri Aug 5 15:24:10 2022 -0700

    format

commit c176e63c3841a1f08c7a43359d293b12297e03e4
Author: pmossman <[email protected]>
Date:   Fri Aug 5 15:18:03 2022 -0700

    move converters to module that worker can access, convert statePersistence calls to API calls, convert statePersistence helper to local private method

commit 1b583487b4ea7dd058944cdbce4de6197f967523
Author: pmossman <[email protected]>
Date:   Fri Aug 5 10:37:00 2022 -0700

    add createOrUpdateState API endpoint

commit d87eed6215ce451a3e126d433991967317839876
Author: pmossman <[email protected]>
Date:   Fri Aug 5 13:42:16 2022 -0700

    add AirbyteApiClient to WorkerApp for data plane workers to use

commit a65524a
Author: Teal Larson <[email protected]>
Date:   Wed Aug 10 16:03:59 2022 -0400

    🪟 🔧 Add testing and storybook component for CatalogDiffModal (#15426)

    * wip diff modal test setup

    * starting storybook add

    * storybook working now

    * cleanup

    * aria labels

    * test syncmode string

commit 2f17e99
Author: Liren Tu <[email protected]>
Date:   Wed Aug 10 13:02:01 2022 -0700

    🐞 Postgres source: fix bug in intermediate state emission (#15496)

    * Rename record counter

    * Rename method

    * Emit intermediate state after all cursor records

    * Emit intermediate state only when it is ready

    * Merge two checks

    * Add a testing message

    * Fix unit tests

    * Add one more testing record and add comments

    * Add test case for multiple records with the same cursor value

    * Revert irrelevant change

    * Add explanation in javadoc

    * Format code

    * Rename testing methods

    * Fix comment

    * Bump version

    * auto-bump connector version [ci skip]

    Co-authored-by: Octavia Squidington III <[email protected]>

commit f540499
Author: Alexandre Girard <[email protected]>
Date:   Wed Aug 10 11:37:07 2022 -0700

    [low-code connectors]: Assert there are no custom top-level fields (#15489)

    * move components to definitions field

    * Also update the references

    * validate the top level fields and add version

    * raise exception on unknown fields

    * newline

    * unit tests

    * set version to 0.1.0

    * newline

commit f52bfb6
Author: Xiaohan Song <[email protected]>
Date:   Wed Aug 10 11:16:17 2022 -0700

    change query frequency to 1hour (#15499)

commit f143c8f
Author: midavadim <[email protected]>
Date:   Wed Aug 10 21:13:51 2022 +0300

    :tada: Source File - add support for custom encoding (#15293)

    * added support for custom encoding

    * fixed unit test for utf16

    * updated docs

    * bumped connector version

    * auto-bump connector version [ci skip]

    Co-authored-by: Octavia Squidington III <[email protected]>

commit bbf3584
Author: Alexandre Girard <[email protected]>
Date:   Wed Aug 10 10:58:22 2022 -0700

    Remove unused field from JsonSchema (#15425)

    * few fixes from working with sendgrid

    * reset to master

    * only update the docstring

    * reset

commit a280113
Author: VitaliiMaltsev <[email protected]>
Date:   Wed Aug 10 20:44:51 2022 +0300

    Destination S3: add LZO compression support (#15394)

    * Fixed bucket naming for S3

    * Destination S3: add LZO compression support for parquet files

    * Destination S3: add LZO compression support for parquet files

    * implemented logic for aarch64

    * removed redundant logging

    * updated changelog

    * moved intstall of native-lzo lib to Dockerfile

    * removed redundant logging

    * add unit test for aarch64

    * bump version

    * auto-bump connector version [ci skip]

    Co-authored-by: Octavia Squidington III <[email protected]>

commit 29c3426
Author: sivankumar86 <[email protected]>
Date:   Thu Aug 11 02:34:47 2022 +1000

    Source MSSQL: special character support in dbname for CDC method (#15430)

    * information schema included

    * special character handle

    * Revert "information schema included"

    This reverts commit f0aee6a.

    * version change

    * doc update

    * auto-bump connector version [ci skip]

    Co-authored-by: Octavia Squidington III <[email protected]>

commit 959d862
Author: Baz <[email protected]>
Date:   Wed Aug 10 19:20:22 2022 +0300

    🐛 Source SalesForce: changed `DEFAULT_WAIT_TIMEOUT_SECONDS` to 24-hour limit (#15444)

commit 0092712
Author: Subodh Kant Chaturvedi <[email protected]>
Date:   Wed Aug 10 21:48:57 2022 +0530

    fix postgres data handling from WAL logs in CDC mode (#15481)

    * fix postgres data handling from WAL logs in CDC mode

    * format

    * use formatter for dates also (#15485)

    * format

    * change test structure

    * change log to debug

    Co-authored-by: Edward Gao <[email protected]>

commit fdb5eb9
Author: Evan Tahler <[email protected]>
Date:   Wed Aug 10 09:03:02 2022 -0700

    Simplify the `MigrationAcceptanceTest` (#15497)

    * disable `testAutomaticMigration`

    * empty commit to retry tests

    * Simplify the MigrationAcceptanceTest

    * lint

    * Fix PMD. Reorder some calls to make clear what is happening.

    Co-authored-by: Davin Chia <[email protected]>

commit fd70913
Author: Augustin <[email protected]>
Date:   Wed Aug 10 17:42:07 2022 +0200

    SAT: compatibility tests for catalogs (#15486)

commit 1ad5152
Author: Evan Tahler <[email protected]>
Date:   Wed Aug 10 08:21:52 2022 -0700

    Disable automaticMigrationAcceptanceTest (#15492)

    * disable `testAutomaticMigration`

    * empty commit to retry tests

commit 1228451
Author: Edmundo Ruiz Ghanem <[email protected]>
Date:   Wed Aug 10 11:11:45 2022 -0400

    Fix styles impacting global ul, li in FieldSection component (#15484)

commit 6aa08e0
Author: Jonathan Pearlin <[email protected]>
Date:   Wed Aug 10 10:55:46 2022 -0400

    Add micronaut dependencies and bundles (#15459)

    * Add micronaut dependencies and bundles

    * Update Micronaut core

commit 7662956
Author: Edmundo Ruiz Ghanem <[email protected]>
Date:   Wed Aug 10 10:51:43 2022 -0400

    🪟 🧹 Cleanup documentation panel components (#15455)

    * Add docs/ to frontend workspace

    * Migrate Markdown components to scss and cleanup when not found is rendered

    * Add white-space: break-spaces rule to markdown code blocks

commit 1258ab4
Author: Topher Lubaway <[email protected]>
Date:   Wed Aug 10 09:05:14 2022 -0500

    Revert "Adds PAT check to shared pr check (#15453)" (#15511)

    This reverts commit 06a18d4.

commit 853b88a
Author: Kyryl Skobylko <[email protected]>
Date:   Wed Aug 10 16:48:20 2022 +0300

    fix: fix gcs-log creds secret name, add externaldb configuration for temporal, fix webapp ingress (#15510)

commit c782303
Author: Yatsuk Bogdan <[email protected]>
Date:   Wed Aug 10 15:57:26 2022 +0300

    :window: :art: Increases GroupTitle followed divs width from 180px to 250px (#13956)

    * Increases GroupControls followed divs width from 180px to 250px

    * Increases min-width for GroupTitle

    * Change layout to flexbox

    Co-authored-by: Tim Roes <[email protected]>

commit e28bc3a
Author: Serhii Chvaliuk <[email protected]>
Date:   Wed Aug 10 13:55:29 2022 +0300

    🎉Source Harvest: Added `parent_id` for all streams which have parent stream (#15221)

    Signed-off-by: Sergey Chvalyuk <[email protected]>

commit aaa3aae
Author: Tuhai Maksym <[email protected]>
Date:   Wed Aug 10 12:43:55 2022 +0300

    15310: Destination Scylla: Handle per-stream state (#15399)

    * 15310: Destination Scylla: Handle per-stream state

    * 15399: test fix

    * 15318: test fix

    * 15318: updating version

    * auto-bump connector version [ci skip]

    Co-authored-by: Octavia Squidington III <[email protected]>

commit c724630
Author: Yurii Bidiuk <[email protected]>
Date:   Wed Aug 10 10:17:23 2022 +0300

    Add test case for new fields appearing in data (#15372)

    * add test case for new field(s) appearing in data

    * rework test to verify that sync at least not failed if new fields are present

commit 6e1a76f
Author: Serhii Chvaliuk <[email protected]>
Date:   Wed Aug 10 09:24:40 2022 +0300

    🐛 Source Amazon Ads: define primary_key for all report streams (#15469)

    Signed-off-by: Sergey Chvalyuk <[email protected]>

commit c1a0cbc
Author: Octavia Squidington III <[email protected]>
Date:   Wed Aug 10 04:20:12 2022 +0200

    Bump Airbyte version from 0.39.42-alpha to 0.40.0-alpha (#15493)

    Co-authored-by: benmoriceau <[email protected]>

commit f6766ee
Author: Benoit Moriceau <[email protected]>
Date:   Wed Aug 10 07:50:41 2022 +0800

    Revert "Revert "Release per stream to the OSS project (#15008)" (#15177)" (#15401)

    This reverts commit 362fc4e.

commit eab0013
Author: Edward Gao <[email protected]>
Date:   Tue Aug 9 16:13:09 2022 -0700

    🐛 Source snowflake: int columns should be discovered as ints (#15314)

    * snowflake discovers ints as ints

    * version bump+changelog

    * bump version+changelog

    * auto-bump connector version [ci skip]

    Co-authored-by: Octavia Squidington III <[email protected]>

commit f506c60
Author: Anne <[email protected]>
Date:   Tue Aug 9 16:07:35 2022 -0700

    Track number of streams in syncs (#15478)

    * Add number_of_streams to job sync tracking

commit 6c5d1ff
Author: Augustin <[email protected]>
Date:   Wed Aug 10 00:33:58 2022 +0200

    SAT: measure unit test coverage (#15443)

commit e9afa9b
Author: Anne <[email protected]>
Date:   Tue Aug 9 15:30:48 2022 -0700

    Error Prone PMD rules (#15010)

    * Implement ErrorProne PMD rules:
    AssignmentInOperand
    AvoidAccessibilityAlteration
    AvoidBranchingStatementAsLastInLoop
    AvoidCatchingNPE
    AvoidCatchingThrowable
    AvoidDuplicateLiterals rule

commit c536e51
Author: Tim Roes <[email protected]>
Date:   Wed Aug 10 00:11:12 2022 +0200

    Fix copy link to logs functionality (#15368)

    * Fix copy link to logs functionality

    * Update airbyte-webapp/src/components/JobItem/JobItem.tsx

    Co-authored-by: Edmundo Ruiz Ghanem <[email protected]>

    * Fix scrolling

    * Remove smooth scrolling

    * Improve effect for better return statements

    * Better scroll

    Co-authored-by: Edmundo Ruiz Ghanem <[email protected]>

commit 62303a8
Author: Augustin <[email protected]>
Date:   Tue Aug 9 23:07:13 2022 +0200

    SAT: check that previous config schema validates against current connector spec (#15367)

commit 123705c
Author: Stephen Wentling <[email protected]>
Date:   Tue Aug 9 21:30:14 2022 +0100

    Source Jira: Added updates to include issue components and fixes to README files (#15135)

    * solve readme conflict

    * updated jira sources with open PR details

    * correct additionalProperties test discover

    Co-authored-by: marcosmarxm <[email protected]>

commit 9e691d8
Author: Alex <[email protected]>
Date:   Tue Aug 9 14:28:38 2022 -0500

    fix broken link (#15379)

commit 36ed6ce
Author: Denys Davydov <[email protected]>
Date:   Tue Aug 9 21:58:52 2022 +0300

    #15445 source typeform: integration tests (#15446)

commit 06a18d4
Author: Topher Lubaway <[email protected]>
Date:   Tue Aug 9 13:33:20 2022 -0500

    Adds PAT check to shared pr check (#15453)

    * Adds PAT check to shared pr check

    * Name change

    * Removes "safe_to_push" string

    * Adds OCTAVIA_PAT and uses the found PAT

    found PAT was not used in all locales, so this could have still failed
    on an expired OCTAVIA_PAT before this change
girarda pushed a commit that referenced this pull request Aug 11, 2022
* Rename record counter

* Rename method

* Emit intermediate state after all cursor records

* Emit intermediate state only when it is ready

* Merge two checks

* Add a testing message

* Fix unit tests

* Add one more testing record and add comments

* Add test case for multiple records with the same cursor value

* Revert irrelevant change

* Add explanation in javadoc

* Format code

* Rename testing methods

* Fix comment

* Bump version

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <[email protected]>
girarda added a commit that referenced this pull request Aug 12, 2022
* greenhouse minus pagination

* jobs

* first substream

* rename field

* applications_demographics_answers_stream

* interviews

* All streams are implemented

* fix check

* fix spec

* disable backward compatibility tests

* disable backward compatibility tests

* unit tests

* definitions

* only use config.json

* bump version

* expected records

* delete stream classes

* Handle extracting no records from root

* handle missing keys

* Remove unused field from JsonSchema (#15425)

* few fixes from working with sendgrid

* reset to master

* only update the docstring

* reset

* 🎉 Source File - add support for custom encoding (#15293)

* added support for custom encoding

* fixed unit test for utf16

* updated docs

* bumped connector version

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <[email protected]>

* change query frequency to 1hour (#15499)

* [low-code connectors]: Assert there are no custom top-level fields (#15489)

* move components to definitions field

* Also update the references

* validate the top level fields and add version

* raise exception on unknown fields

* newline

* unit tests

* set version to 0.1.0

* newline

* 🐞 Postgres source: fix bug in intermediate state emission (#15496)

* Rename record counter

* Rename method

* Emit intermediate state after all cursor records

* Emit intermediate state only when it is ready

* Merge two checks

* Add a testing message

* Fix unit tests

* Add one more testing record and add comments

* Add test case for multiple records with the same cursor value

* Revert irrelevant change

* Add explanation in javadoc

* Format code

* Rename testing methods

* Fix comment

* Bump version

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <[email protected]>

* 🪟 🔧 Add testing and storybook component for CatalogDiffModal (#15426)

* wip diff modal test setup

* starting storybook add

* storybook working now

* cleanup

* aria labels

* test syncmode string

* 🎉 New Source: Hubplanner (#15521)

* added hubplanner source connector

* feat: added more streams to read from

* cleaned up some unneeded integration tests

* fix hubplanner schema

* changes

* update dockerfile

* add seed and doc

* update spec

* run source spec seed file

Co-authored-by: Ricky Renner <[email protected]>

* Make it possible to specify normalization pod resources. (#15495)

Today we are running into OOM exceptions with normalization. Normalization itself also inherits the destination's resource requirements. After work to bring destination memory usage down, this is no longer ideal, since most destinations use less memory than normalization needs.

This PR makes it possible to specify the general resource the normalization pod is provided via env vars.

Notes:
- Add env vars. Default to the various job main container resources if these are not set.
- Instead of using the destination's memory, use the normalization specify env vars.

* [low-code connectors] Extract datetime parser and handle %s format directive (#15429)

* fix parse

* Revert "fix parse"

This reverts commit 3c76c5a.

* fix parse timestamp

* extract datetime parser

* remove print

* use parser

* top level docstring

* rename variable

* do not use timestamp()

* Revert "do not use timestamp()"

This reverts commit 016cb69.

* update comment

* bump cdk version

* Update template

* source-file-secure bump to 0.2.16 (#15528)

* update Dockerfile version

* update init to accept additional args

* unit test sendgrid messages stream (#15331)

* unit test sendgrid messages stream

* reset

* Update airbyte-integrations/connectors/source-sendgrid/unit_tests/unit_test.py

Co-authored-by: Augustin <[email protected]>

Co-authored-by: Augustin <[email protected]>

* record extractor interface

* dpath extractor

* docstring

* 🎉 Source File: cache binary stream to file (#15501)

Signed-off-by: Sergey Chvalyuk <[email protected]>

* Docs: update posthog.md (#15541)

* Source Stripe: implement slicing (#15292)

* #45 oncall - source Stripe: implement slicing

* #45 source stripe: upd changelog

* #45 source stripe: upd changelog

* #45 source stripe: make slice range configurable

* #45 source stripe: move generating a single slice into a mixin

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <[email protected]>

* fix: revert extraEnv delition in values.yaml for bootloader (#15548)

* fix: revert extraEnv delition in values.yaml for bootloader

* add newline

* SAT: backward compatibility - check that cursor fields were not changed (#15520)

* Replace twttr repo to the root build.gradle (#15544)

* Fixed bucket naming for S3

* replaced twttr repo to the root build.gradle

* replaced twttr repo to the root build.gradle

Co-authored-by: Oleksandr Sheheda <[email protected]>

* Generate separate server endpoints per domain (#15513)

* 🐛 Backward compatibility test: Don't fail on updating additionalProperties (#15532)

* Source Recurly: adds `state_checkpoint_interval` to streams (#13685)

* Add `state_checkpoint_interval` to Recurly stream

* Bumpg Recurly source version to `0.4.1`

* reset

* use dpath

* enable backward compatibility test

* infer types

* Revert "infer types"

This reverts commit b4de8d6.

* infer some of the types

* some drying

* more drying

* auto-bump connector version [ci skip]

Signed-off-by: Sergey Chvalyuk <[email protected]>
Co-authored-by: midavadim <[email protected]>
Co-authored-by: Octavia Squidington III <[email protected]>
Co-authored-by: Xiaohan Song <[email protected]>
Co-authored-by: Liren Tu <[email protected]>
Co-authored-by: Teal Larson <[email protected]>
Co-authored-by: Marcos Marx <[email protected]>
Co-authored-by: Ricky Renner <[email protected]>
Co-authored-by: Davin Chia <[email protected]>
Co-authored-by: Brian Lai <[email protected]>
Co-authored-by: Augustin <[email protected]>
Co-authored-by: Serhii Chvaliuk <[email protected]>
Co-authored-by: juliatournant <[email protected]>
Co-authored-by: Denys Davydov <[email protected]>
Co-authored-by: Kyryl Skobylko <[email protected]>
Co-authored-by: VitaliiMaltsev <[email protected]>
Co-authored-by: Oleksandr Sheheda <[email protected]>
Co-authored-by: Jonathan Pearlin <[email protected]>
Co-authored-by: Mohamed Magdy <[email protected]>
@haimelo
Copy link

haimelo commented Nov 27, 2022

HI! everyone. I have a need when my python destination throws exception I want postgres emit 1 state after 10000record. But the behavior of postgres is not like that. When my destination error, the job was interrupted and half of the previous data entered my system. When I re-sync, my data is duplicated.

So my question is how to make postgres emit state every 10000 records. I want to persist cursor_record_count when my destination error instead emit the last offset.
ảnh

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Postgres intermediate state may lead to data loss for incremental sync
4 participants