Track stream status in source #24971

Merged on Apr 26, 2023 (20 commits)

jdpgrailsdev commented Apr 6, 2023

What

  • Report transitions in status while reading a source

How

This PR introduces an approach to tracking status transitions for streams within a source. It deliberately limits the scope of the change rather than undertaking a much larger refactor to replace Iterator as the interface that represents a stream. Because a stream is still represented as an Iterator, the logic to track transitions between streams and within a stream cannot be added to IntegrationRunner.java, which would be the logical place to track status. Instead, this PR puts the logic in CompositeIterator.java, which is effectively a facade over the set of iterators that represent each unique stream.
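
To illustrate the idea, here is a minimal sketch of a facade iterator that reports status transitions as it moves across per-stream iterators. It is not the PR's CompositeIterator: the class name `StatusTrackingCompositeIterator`, the `Map` of stream names to iterators, and the string-based status listener are all assumptions made for this example. It reports STARTED when a stream is first opened, RUNNING when the first record of that stream is read, and STOPPED(true) once the stream is exhausted.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.function.Consumer;

/** Hypothetical sketch only; names and structure are assumptions, not the PR's code. */
public class StatusTrackingCompositeIterator<T> implements Iterator<T> {

  private final Iterator<Map.Entry<String, Iterator<T>>> streams;
  private final Consumer<String> statusListener;
  private String currentName;
  private Iterator<T> current;
  private boolean emittedRunning;

  public StatusTrackingCompositeIterator(
      Map<String, Iterator<T>> streams, Consumer<String> statusListener) {
    this.streams = streams.entrySet().iterator();
    this.statusListener = statusListener;
  }

  @Override
  public boolean hasNext() {
    while (true) {
      if (current == null) {
        if (!streams.hasNext()) {
          return false; // every stream has been consumed
        }
        // Transition between streams: open the next one and report it.
        Map.Entry<String, Iterator<T>> entry = streams.next();
        currentName = entry.getKey();
        current = entry.getValue();
        emittedRunning = false;
        statusListener.accept("STARTED -> " + currentName);
      }
      if (current.hasNext()) {
        return true;
      }
      // The current stream is exhausted: report it and move on.
      statusListener.accept("STOPPED(true) -> " + currentName);
      current = null;
    }
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    // Transition within a stream: the first record marks it as running.
    if (!emittedRunning) {
      statusListener.accept("RUNNING -> " + currentName);
      emittedRunning = true;
    }
    return current.next();
  }

  public static void main(String[] args) {
    Map<String, Iterator<String>> streams = new LinkedHashMap<>();
    streams.put("public_cars", List.of("car-1", "car-2").iterator());
    streams.put("public_users", List.<String>of().iterator()); // empty stream
    List<String> statuses = new ArrayList<>();
    Iterator<String> it = new StatusTrackingCompositeIterator<>(streams, statuses::add);
    while (it.hasNext()) {
      it.next();
    }
    statuses.forEach(System.out::println);
  }
}
```

In this sketch an empty stream goes straight from STARTED to STOPPED without ever reporting RUNNING, because RUNNING is tied to reading the first record.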

Recommended reading order

  1. CompositeIterator.java
  2. DefaultAutoCloseableIterator.java
  3. LazyAutoCloseableIterator.java
  4. AutoCloseableIterators.java
  5. AirbyteStreamAware.java
  6. CompositeIteratorTests.java

Testing

This change was tested with a locally built version of the source-postgres connector to verify that the status messages are produced and that the platform can safely ignore them:

2023-04-14 14:36:39 source > INFO i.a.c.u.CompositeIterator(lambda$emitStartStreamStatus$1):155 STARTING -> public_cars
2023-04-14 14:36:39 WARN i.a.w.i.b.AirbyteMessageTracker(handleEmittedTrace):168 - Invalid message type for trace message: io.airbyte.protocol.models.AirbyteTraceMessage@3fc513e[type=STREAM_STATUS,emittedAt=1.681482999673E12,error=<null>,estimate=<null>,streamStatus=io.airbyte.protocol.models.AirbyteStreamStatusTraceMessage@733ef39e[streamDescriptor=io.airbyte.protocol.models.StreamDescriptor@30bef8f8[name=cars,namespace=public,additionalProperties={}],status=STARTED,success=<null>,additionalProperties={}],additionalProperties={}]
2023-04-14 14:36:39 source > INFO i.a.c.u.CompositeIterator(lambda$emitRunningStreamStatus$0):148 RUNNING -> public_cars
2023-04-14 14:36:39 WARN i.a.w.i.b.AirbyteMessageTracker(handleEmittedTrace):168 - Invalid message type for trace message: io.airbyte.protocol.models.AirbyteTraceMessage@7b041539[type=STREAM_STATUS,emittedAt=1.681482999685E12,error=<null>,estimate=<null>,streamStatus=io.airbyte.protocol.models.AirbyteStreamStatusTraceMessage@63460a4d[streamDescriptor=io.airbyte.protocol.models.StreamDescriptor@3c2a50a0[name=cars,namespace=public,additionalProperties={}],status=RUNNING,success=<null>,additionalProperties={}],additionalProperties={}]
2023-04-14 14:36:39 WARN c.n.s.JsonMetaSchema(newValidator):278 - Unknown keyword airbyte_type - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2023-04-14 14:36:39 source > INFO i.a.c.u.CompositeIterator(lambda$emitStopStreamStatus$2):162 STOPPING(true) -> public_cars
2023-04-14 14:36:39 WARN i.a.w.i.b.AirbyteMessageTracker(handleEmittedTrace):168 - Invalid message type for trace message: io.airbyte.protocol.models.AirbyteTraceMessage@512954f4[type=STREAM_STATUS,emittedAt=1.681482999692E12,error=<null>,estimate=<null>,streamStatus=io.airbyte.protocol.models.AirbyteStreamStatusTraceMessage@2da5fce4[streamDescriptor=io.airbyte.protocol.models.StreamDescriptor@149e7674[name=cars,namespace=public,additionalProperties={}],status=STOPPED,success=true,additionalProperties={}],additionalProperties={}]
2023-04-14 14:36:39 source > INFO i.a.c.u.CompositeIterator(lambda$emitStartStreamStatus$1):155 STARTING -> public_users
2023-04-14 14:36:39 WARN i.a.w.i.b.AirbyteMessageTracker(handleEmittedTrace):168 - Invalid message type for trace message: io.airbyte.protocol.models.AirbyteTraceMessage@5bae1b2e[type=STREAM_STATUS,emittedAt=1.681482999692E12,error=<null>,estimate=<null>,streamStatus=io.airbyte.protocol.models.AirbyteStreamStatusTraceMessage@403b6a5b[streamDescriptor=io.airbyte.protocol.models.StreamDescriptor@42fbfc64[name=users,namespace=public,additionalProperties={}],status=STARTED,success=<null>,additionalProperties={}],additionalProperties={}]
2023-04-14 14:36:39 source > INFO i.a.i.s.r.RelationalDbQueryUtils(lambda$queryTable$0):71 Queueing query: SELECT "id","account_id","first_name","last_name","email","gender","ip_address","avatar","language","deactivated","created_at","updated_at" FROM "public"."users"
2023-04-14 14:36:39 source > INFO i.a.d.j.s.AdaptiveStreamingQueryConfig(initialize):31 Set initial fetch size: 10 rows
2023-04-14 14:36:39 source > INFO i.a.c.u.CompositeIterator(lambda$emitStopStreamStatus$2):162 STOPPING(true) -> public_users
2023-04-14 14:36:39 WARN i.a.w.i.b.AirbyteMessageTracker(handleEmittedTrace):168 - Invalid message type for trace message: io.airbyte.protocol.models.AirbyteTraceMessage@25b11f99[type=STREAM_STATUS,emittedAt=1.681482999697E12,error=<null>,estimate=<null>,streamStatus=io.airbyte.protocol.models.AirbyteStreamStatusTraceMessage@eed176a[streamDescriptor=io.airbyte.protocol.models.StreamDescriptor@151fdd5e[name=users,namespace=public,additionalProperties={}],status=STOPPED,success=true,additionalProperties={}],additionalProperties={}]
2023-04-14 14:36:39 source > INFO i.a.c.u.CompositeIterator(lambda$emitStartStreamStatus$1):155 STARTING -> public_us_states
2023-04-14 14:36:39 WARN i.a.w.i.b.AirbyteMessageTracker(handleEmittedTrace):168 - Invalid message type for trace message: io.airbyte.protocol.models.AirbyteTraceMessage@147073d[type=STREAM_STATUS,emittedAt=1.681482999697E12,error=<null>,estimate=<null>,streamStatus=io.airbyte.protocol.models.AirbyteStreamStatusTraceMessage@16b6a45a[streamDescriptor=io.airbyte.protocol.models.StreamDescriptor@1c449b59[name=us_states,namespace=public,additionalProperties={}],status=STARTED,success=<null>,additionalProperties={}],additionalProperties={}]
2023-04-14 14:36:39 source > INFO i.a.i.s.r.RelationalDbQueryUtils(lambda$queryTable$0):71 Queueing query: SELECT "id","name","abbreviation" FROM "public"."us_states"
2023-04-14 14:36:39 source > INFO i.a.d.j.s.AdaptiveStreamingQueryConfig(initialize):31 Set initial fetch size: 10 rows
2023-04-14 14:36:39 source > INFO i.a.c.u.CompositeIterator(lambda$emitRunningStreamStatus$0):148 RUNNING -> public_us_states
2023-04-14 14:36:39 WARN i.a.w.i.b.AirbyteMessageTracker(handleEmittedTrace):168 - Invalid message type for trace message: io.airbyte.protocol.models.AirbyteTraceMessage@90288fe[type=STREAM_STATUS,emittedAt=1.681482999704E12,error=<null>,estimate=<null>,streamStatus=io.airbyte.protocol.models.AirbyteStreamStatusTraceMessage@3045080c[streamDescriptor=io.airbyte.protocol.models.StreamDescriptor@13f2fe2a[name=us_states,namespace=public,additionalProperties={}],status=RUNNING,success=<null>,additionalProperties={}],additionalProperties={}]
2023-04-14 14:36:39 source > INFO i.a.c.u.CompositeIterator(lambda$emitStopStreamStatus$2):162 STOPPING(true) -> public_us_states
2023-04-14 14:36:39 WARN i.a.w.i.b.AirbyteMessageTracker(handleEmittedTrace):168 - Invalid message type for trace message: io.airbyte.protocol.models.AirbyteTraceMessage@2b54215e[type=STREAM_STATUS,emittedAt=1.681482999705E12,error=<null>,estimate=<null>,streamStatus=io.airbyte.protocol.models.AirbyteStreamStatusTraceMessage@368c09b2[streamDescriptor=io.airbyte.protocol.models.StreamDescriptor@2e116207[name=us_states,namespace=public,additionalProperties={}],status=STOPPED,success=true,additionalProperties={}],additionalProperties={}]
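
The WARN lines above show the platform logging and skipping trace messages of a type it does not yet handle. The following is a minimal sketch of that tolerant pattern, assuming a hypothetical `TolerantTraceHandler` class with a simplified `TraceType` enum. It is not the platform's AirbyteMessageTracker, and the method signature is invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a consumer that tolerates trace types it does not handle. */
public class TolerantTraceHandler {

  /** Simplified trace types; STREAM_STATUS is the type this consumer does not handle yet. */
  public enum TraceType { ERROR, ESTIMATE, STREAM_STATUS }

  private final List<String> warnings = new ArrayList<>();
  private int handledCount = 0;

  /** Handles known trace types; records a warning for and skips anything else. */
  public boolean handle(TraceType type, String description) {
    switch (type) {
      case ERROR:
      case ESTIMATE:
        handledCount++;
        return true;
      default:
        // Unknown types are reported but never fail the sync.
        warnings.add("Invalid message type for trace message: " + description);
        return false;
    }
  }

  public List<String> getWarnings() {
    return warnings;
  }

  public int getHandledCount() {
    return handledCount;
  }

  public static void main(String[] args) {
    TolerantTraceHandler handler = new TolerantTraceHandler();
    handler.handle(TraceType.ESTIMATE, "estimate for public.cars");
    handler.handle(TraceType.STREAM_STATUS, "STARTED public.cars");
    System.out.println("handled=" + handler.getHandledCount());
    handler.getWarnings().forEach(System.out::println);
  }
}
```

The design choice here is that an unrecognized type produces a warning rather than an exception, so a newer source can emit additional trace messages without breaking an older consumer.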

github-actions bot commented Apr 6, 2023

Affected Connector Report

NOTE ⚠️ Changes in this PR affect the following connectors. Make sure to do the following as needed:

  • Run integration tests
  • Bump connector or module version
  • Add changelog
  • Publish the new version

✅ Sources (33)

| Connector | Version | Changelog | Publish |
| --- | --- | --- | --- |
| source-alloydb | 2.0.23 | | |
| source-alloydb-strict-encrypt | 2.0.23 | 🔵 (ignored) | 🔵 (ignored) |
| source-azure-blob-storage | 0.1.0 | | |
| source-bigquery | 0.2.3 | | |
| source-clickhouse | 0.1.17 | | |
| source-clickhouse-strict-encrypt | 0.1.17 | 🔵 (ignored) | 🔵 (ignored) |
| source-cockroachdb | 0.1.22 | | |
| source-cockroachdb-strict-encrypt | 0.1.22 | 🔵 (ignored) | 🔵 (ignored) |
| source-db2 | 0.1.19 | | |
| source-db2-strict-encrypt | 0.1.19 | 🔵 (ignored) | 🔵 (ignored) |
| source-dynamodb | 0.1.2 | | |
| source-e2e-test | 2.1.4 | | |
| source-e2e-test-cloud | 2.1.4 | 🔵 (ignored) | 🔵 (ignored) |
| source-elasticsearch | 0.1.1 | | |
| source-jdbc | 0.3.5 | 🔵 (ignored) | 🔵 (ignored) |
| source-kafka | 0.2.3 | | |
| source-mongodb-strict-encrypt | 0.1.19 | 🔵 (ignored) | 🔵 (ignored) |
| source-mongodb-v2 | 0.1.19 | | |
| source-mssql | 1.0.14 | | |
| source-mssql-strict-encrypt | 1.0.14 | 🔵 (ignored) | 🔵 (ignored) |
| source-mysql | 2.0.18 | | |
| source-mysql-strict-encrypt | 2.0.18 | 🔵 (ignored) | 🔵 (ignored) |
| source-oracle | 0.3.24 | | |
| source-oracle-strict-encrypt | 0.3.24 | 🔵 (ignored) | 🔵 (ignored) |
| source-postgres | 2.0.27 | | |
| source-postgres-strict-encrypt | 2.0.27 | 🔵 (ignored) | 🔵 (ignored) |
| source-redshift | 0.3.16 | | |
| source-relational-db | 0.3.1 | 🔵 (ignored) | 🔵 (ignored) |
| source-scaffold-java-jdbc | 0.1.0 | 🔵 (ignored) | 🔵 (ignored) |
| source-sftp | 0.1.2 | | |
| source-snowflake | 0.1.34 | | |
| source-teradata | 0.1.0 | | |
| source-tidb | 0.2.4 | | |
  • See "Actionable Items" below for how to resolve warnings and errors.

❌ Destinations (49)

| Connector | Version | Changelog | Publish |
| --- | --- | --- | --- |
| destination-azure-blob-storage | 0.2.0 | | |
| destination-bigquery | 1.3.1 | | |
| destination-bigquery-denormalized | 1.3.1 | | |
| destination-cassandra | 0.1.4 | | |
| destination-clickhouse | 0.2.3 | | |
| destination-clickhouse-strict-encrypt | 0.2.3 | 🔵 (ignored) | 🔵 (ignored) |
| destination-csv | 1.0.0 | | |
| destination-databricks | 1.0.2 | | |
| destination-dev-null | 0.2.7 | 🔵 (ignored) | 🔵 (ignored) |
| destination-doris | 0.1.0 | | |
| destination-dynamodb | 0.1.7 | | |
| destination-e2e-test | 0.2.4 | | |
| destination-elasticsearch | 0.1.6 | | |
| destination-elasticsearch-strict-encrypt | 0.1.6 | 🔵 (ignored) | 🔵 (ignored) |
| destination-exasol | 0.1.1 | | |
| destination-gcs | 0.2.16 | | |
| destination-iceberg | 0.1.0 | | |
| destination-kafka | 0.1.10 | | |
| destination-keen | 0.2.4 | | |
| destination-kinesis | 0.1.5 | | |
| destination-local-json | 0.2.11 | | |
| destination-mariadb-columnstore | 0.1.7 | | |
| destination-mongodb | 0.1.9 | | |
| destination-mongodb-strict-encrypt | 0.1.9 | 🔵 (ignored) | 🔵 (ignored) |
| destination-mqtt | 0.1.3 | | |
| destination-mssql | 0.1.23 | | |
| destination-mssql-strict-encrypt | 0.1.23 | 🔵 (ignored) | 🔵 (ignored) |
| destination-mysql | 0.1.20 | | |
| destination-mysql-strict-encrypt | 0.1.21 (mismatch: 0.1.20) | 🔵 (ignored) | 🔵 (ignored) |
| destination-oracle | 0.1.19 | | |
| destination-oracle-strict-encrypt | 0.1.19 | 🔵 (ignored) | 🔵 (ignored) |
| destination-postgres | 0.3.27 | | |
| destination-postgres-strict-encrypt | 0.3.27 | 🔵 (ignored) | 🔵 (ignored) |
| destination-pubsub | 0.2.0 | | |
| destination-pulsar | 0.1.3 | | |
| destination-r2 | 0.1.0 | | |
| destination-redis | 0.1.4 | | |
| destination-redpanda | 0.1.0 | | |
| destination-redshift | 0.4.5 | | |
| destination-rockset | 0.1.4 | | |
| destination-s3 | 0.3.23 | | |
| destination-s3-glue | 0.1.6 | | |
| destination-scylla | 0.1.3 | | |
| destination-selectdb | 0.1.0 | | |
| destination-snowflake | 0.4.61 | | |
| destination-starburst-galaxy | 0.0.1 | | |
| destination-teradata | 0.1.1 | | |
| destination-tidb | 0.1.1 | | |
| destination-yugabytedb | 0.1.1 | | |
  • See "Actionable Items" below for how to resolve warnings and errors.

✅ Other Modules (0)

Actionable Items

| Category | Status | Actionable Item |
| --- | --- | --- |
| Version | mismatch | The version of the connector is different from its normal variant. Please bump the version of the connector. |
| Version | doc not found | The connector does not seem to have a documentation file. This can be normal (e.g. a basic connector like source-jdbc is not published or documented). Please double-check to make sure that it is not a bug. |
| Changelog | doc not found | The connector does not seem to have a documentation file. This can be normal (e.g. a basic connector like source-jdbc is not published or documented). Please double-check to make sure that it is not a bug. |
| Changelog | changelog missing | There is no changelog for the current version of the connector. If you are the author of the current version, please add a changelog. |
| Publish | not in seed | The connector is not in the seed file (e.g. source_definitions.yaml), so its publication status cannot be checked. This can be normal (e.g. some connectors are cloud-specific, and only listed in the cloud seed file). Please double-check to make sure that it is not a bug. |
| Publish | diff seed version | The connector exists in the seed file, but the latest version is not listed there. This usually means that the latest version is not published. Please use the /publish command to publish the latest version. |

@octavia-squidington-iii octavia-squidington-iii removed the CDK Connector Development Kit label Apr 26, 2023
@jdpgrailsdev jdpgrailsdev force-pushed the jonathan/stream-status branch from 5307e44 to d9cf162 Compare April 26, 2023 18:23

jdpgrailsdev commented Apr 26, 2023

/publish connector=connectors/source-postgres

🕑 Publishing the following connectors:
connectors/source-postgres
https://github.com/airbytehq/airbyte/actions/runs/4812354834


| Connector | Version | Did it publish? | Were definitions generated? |
| --- | --- | --- | --- |
| connectors/source-postgres | 2.0.27 | | |

If you have connectors that published successfully but failed definition generation, follow step 4 here ▶️

jdpgrailsdev commented Apr 26, 2023

/publish connector=connectors/source-postgres-strict-encrypt

🕑 Publishing the following connectors:
connectors/source-postgres-strict-encrypt
https://github.com/airbytehq/airbyte/actions/runs/4812398446


| Connector | Version | Did it publish? | Were definitions generated? |
| --- | --- | --- | --- |
| connectors/source-postgres-strict-encrypt | 2.0.27 | | |

If you have connectors that published successfully but failed definition generation, follow step 4 here ▶️

@jdpgrailsdev

/approve-and-merge reason="PR approved but failing due to issue on master"

@octavia-approvington

Our work here is done

@octavia-approvington octavia-approvington merged commit a38af08 into master Apr 26, 2023
@octavia-approvington octavia-approvington deleted the jonathan/stream-status branch April 26, 2023 20:14
marcosmarxm pushed a commit to natalia-miinto/airbyte that referenced this pull request Jun 8, 2023
* WIP Track stream status in source

* Revert formatting

* Revert formatting changes

* Remove unnecessary file

* Automated Change

* Automated Change

* Use new stream status trace message

* Rename class

* Remove unnecessary import

* Formatting

* Add tests

* Fix compile issues

* Automated Commit - Formatting Changes

* Remove TODO

* Fix compilation error

* Split STOPPED into INCOMPLETE and COMPLETE

* Remove unused import

* Changelog updates for source-postgres

* Remove unused import

* auto-bump connector version

---------

Co-authored-by: jdpgrailsdev <[email protected]>
Co-authored-by: Octavia Squidington III <[email protected]>