diff --git a/.github/workflows/gradle.yml b/.github/workflows/gradle.yml index ae360c3036b39..823a0e465bc3a 100644 --- a/.github/workflows/gradle.yml +++ b/.github/workflows/gradle.yml @@ -12,6 +12,9 @@ jobs: - name: Checkout Airbyte uses: actions/checkout@v2 + - name: Check images exist + run: ./tools/bin/check_images_exist.sh + - name: Cache java deps uses: actions/cache@v2 with: @@ -65,9 +68,6 @@ jobs: - name: Ensure no file change run: git status --porcelain && test -z "$(git status --porcelain)" - - name: Check images exist - run: ./tools/bin/check_images_exist.sh - - name: Check documentation if: success() && github.ref == 'refs/heads/master' run: ./tools/site/link_checker.sh check_docs diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/22f6c74f-5699-40ff-833c-4a879ea40133.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/22f6c74f-5699-40ff-833c-4a879ea40133.json index 9c7400b7788f7..01bc7657234e5 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/22f6c74f-5699-40ff-833c-4a879ea40133.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/22f6c74f-5699-40ff-833c-4a879ea40133.json @@ -2,6 +2,6 @@ "destinationDefinitionId": "22f6c74f-5699-40ff-833c-4a879ea40133", "name": "BigQuery", "dockerRepository": "airbyte/destination-bigquery", - "dockerImageTag": "0.1.2", + "dockerImageTag": "0.1.3", "documentationUrl": "https://hub.docker.com/r/airbyte/integration-singer-bigquery-destination" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/25c5221d-dce2-4163-ade9-739ef790f503.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/25c5221d-dce2-4163-ade9-739ef790f503.json index 3fbc441c9298a..a95641e89f2cd 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/25c5221d-dce2-4163-ade9-739ef790f503.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/25c5221d-dce2-4163-ade9-739ef790f503.json @@ -2,6 +2,6 @@ "destinationDefinitionId": "25c5221d-dce2-4163-ade9-739ef790f503", "name": "Postgres", "dockerRepository": "airbyte/destination-postgres", - "dockerImageTag": "0.1.1", + "dockerImageTag": "0.1.2", "documentationUrl": "https://hub.docker.com/r/airbyte/integration-singer-postgres-destination" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/424892c4-daac-4491-b35d-c6688ba547ba.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/424892c4-daac-4491-b35d-c6688ba547ba.json index befa5784f250c..458a41af2ea2b 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/424892c4-daac-4491-b35d-c6688ba547ba.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/424892c4-daac-4491-b35d-c6688ba547ba.json @@ -2,6 +2,6 @@ "destinationDefinitionId": "424892c4-daac-4491-b35d-c6688ba547ba", "name": "Snowflake", "dockerRepository": "airbyte/destination-snowflake", - "dockerImageTag": "0.1.2", + "dockerImageTag": "0.1.3", "documentationUrl": "https://docs.airbyte.io/integrations/destinations/snowflake" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/8be1cf83-fde1-477f-a4ad-318d23c9f3c6.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/8be1cf83-fde1-477f-a4ad-318d23c9f3c6.json index 961a4d27d7ca9..ebdfd91a03141 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/8be1cf83-fde1-477f-a4ad-318d23c9f3c6.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/8be1cf83-fde1-477f-a4ad-318d23c9f3c6.json @@ -2,6 +2,6 @@ "destinationDefinitionId": "8be1cf83-fde1-477f-a4ad-318d23c9f3c6", "name": "Local CSV", "dockerRepository": "airbyte/destination-csv", - "dockerImageTag": "0.1.1", + "dockerImageTag": "0.1.2", "documentationUrl": "https://hub.docker.com/r/airbyte/integration-singer-csv-destination" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/2470e835-feaf-4db6-96f3-70fd645acc77.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/2470e835-feaf-4db6-96f3-70fd645acc77.json index 4ae97b5dd7fb3..eb6df1cf2385d 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/2470e835-feaf-4db6-96f3-70fd645acc77.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/2470e835-feaf-4db6-96f3-70fd645acc77.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "2470e835-feaf-4db6-96f3-70fd645acc77", "name": "Salesforce", "dockerRepository": "airbyte/source-salesforce-singer", - "dockerImageTag": "0.1.1", + "dockerImageTag": "0.1.2", "documentationUrl": "https://hub.docker.com/r/airbyte/source-salesforce-singer" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/39f092a6-8c87-4f6f-a8d9-5cef45b7dbe1.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/39f092a6-8c87-4f6f-a8d9-5cef45b7dbe1.json index 1e857ed482f99..762db975e4fae 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/39f092a6-8c87-4f6f-a8d9-5cef45b7dbe1.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/39f092a6-8c87-4f6f-a8d9-5cef45b7dbe1.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "39f092a6-8c87-4f6f-a8d9-5cef45b7dbe1", "name": "Google Analytics", "dockerRepository": "airbyte/source-googleanalytics-singer", - "dockerImageTag": "0.1.1", + "dockerImageTag": "0.1.2", "documentationUrl": "https://hub.docker.com/r/airbyte/source-googleanalytics-singer" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json index 34526428e5059..71ebab3da5665 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "435bb9a5-7887-4809-aa58-28c27df0d7ad", "name": "MySQL", "dockerRepository": "airbyte/source-mysql", - "dockerImageTag": "0.1.0", + "dockerImageTag": "0.1.1", "documentationUrl": "https://docs.airbyte.io/integrations/sources/mysql" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/57eb1576-8f52-463d-beb6-2e107cdf571d.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/57eb1576-8f52-463d-beb6-2e107cdf571d.json index d628a19a86b30..be5a10121e213 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/57eb1576-8f52-463d-beb6-2e107cdf571d.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/57eb1576-8f52-463d-beb6-2e107cdf571d.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "57eb1576-8f52-463d-beb6-2e107cdf571d", "name": "Hubspot", "dockerRepository": "airbyte/source-hubspot-singer", - "dockerImageTag": "0.1.0", + "dockerImageTag": "0.1.1", "documentationUrl": "https://https://docs.airbyte.io/integrations/sources/hubspot" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/71607ba1-c0ac-4799-8049-7f4b90dd50f7.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/71607ba1-c0ac-4799-8049-7f4b90dd50f7.json index 78860ec8b32aa..acc84cb257fcb 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/71607ba1-c0ac-4799-8049-7f4b90dd50f7.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/71607ba1-c0ac-4799-8049-7f4b90dd50f7.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "71607ba1-c0ac-4799-8049-7f4b90dd50f7", "name": "Google Sheets", "dockerRepository": "airbyte/source-google-sheets", - "dockerImageTag": "0.1.1", + "dockerImageTag": "0.1.2", "documentationUrl": "https://hub.docker.com/repository/docker/airbyte/source-google-sheets" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/74d47f79-8d01-44ac-9755-f5eb0d7caacb.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/74d47f79-8d01-44ac-9755-f5eb0d7caacb.json index 02fbdc7469f77..ca8e108c8eeee 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/74d47f79-8d01-44ac-9755-f5eb0d7caacb.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/74d47f79-8d01-44ac-9755-f5eb0d7caacb.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "74d47f79-8d01-44ac-9755-f5eb0d7caacb", "name": "Facebook Marketing APIs", "dockerRepository": "airbyte/source-facebook-marketing-api-singer", - "dockerImageTag": "0.1.1", + "dockerImageTag": "0.1.2", "documentationUrl": "https://hub.docker.com/r/airbyte/source-facebook-marketing-api-singer" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/778daa7c-feaf-4db6-96f3-70fd645acc77.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/778daa7c-feaf-4db6-96f3-70fd645acc77.json index 89f8ed162e3f3..4b7531b8501eb 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/778daa7c-feaf-4db6-96f3-70fd645acc77.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/778daa7c-feaf-4db6-96f3-70fd645acc77.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "778daa7c-feaf-4db6-96f3-70fd645acc77", "name": "File", "dockerRepository": "airbyte/source-file", - "dockerImageTag": "0.1.2", + "dockerImageTag": "0.1.3", "documentationUrl": "https://hub.docker.com/r/airbyte/source-file" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/9e0556f4-69df-4522-a3fb-03264d36b348.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/9e0556f4-69df-4522-a3fb-03264d36b348.json index f70e54ae701af..01c71569cb716 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/9e0556f4-69df-4522-a3fb-03264d36b348.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/9e0556f4-69df-4522-a3fb-03264d36b348.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "9e0556f4-69df-4522-a3fb-03264d36b348", "name": "Marketo", "dockerRepository": "airbyte/source-marketo-singer", - "dockerImageTag": "0.1.0", + "dockerImageTag": "0.1.1", "documentationUrl": "https://hub.docker.com/r/airbyte/source-marketo-singer" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/9fed261d-d107-47fd-8c8b-323023db6e20.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/9fed261d-d107-47fd-8c8b-323023db6e20.json index 0c72e2d7d98e6..5241b818958d7 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/9fed261d-d107-47fd-8c8b-323023db6e20.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/9fed261d-d107-47fd-8c8b-323023db6e20.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "9fed261d-d107-47fd-8c8b-323023db6e20", "name": "exchangeratesapi.io", "dockerRepository": "airbyte/source-exchangeratesapi-singer", - "dockerImageTag": "0.1.4", + "dockerImageTag": "0.1.6", "documentationUrl": "https://hub.docker.com/r/airbyte/integration-singer-exchangeratesapi_io-source" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b1892b11-788d-44bd-b9ec-3a436f7b54ce.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b1892b11-788d-44bd-b9ec-3a436f7b54ce.json index 9c2eddbe6fcfc..9f794f5f1f890 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b1892b11-788d-44bd-b9ec-3a436f7b54ce.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b1892b11-788d-44bd-b9ec-3a436f7b54ce.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "b1892b11-788d-44bd-b9ec-3a436f7b54ce", "name": "Shopify", "dockerRepository": "airbyte/source-shopify-singer", - "dockerImageTag": "0.1.1", + "dockerImageTag": "0.1.2", "documentationUrl": "https://hub.docker.com/r/airbyte/source-shopify-singer" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b5ea17b1-f170-46dc-bc31-cc744ca984c1.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b5ea17b1-f170-46dc-bc31-cc744ca984c1.json index 2c028c047c04e..a8c16783cf8a7 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b5ea17b1-f170-46dc-bc31-cc744ca984c1.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b5ea17b1-f170-46dc-bc31-cc744ca984c1.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "b5ea17b1-f170-46dc-bc31-cc744ca984c1", "name": "Microsoft SQL Server (MSSQL)", "dockerRepository": "airbyte/source-mssql", - "dockerImageTag": "0.1.0", + "dockerImageTag": "0.1.1", "documentationUrl": "https://hub.docker.com/r/airbyte/source-mssql" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/decd338e-5647-4c0b-adf4-da0e75f5a750.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/decd338e-5647-4c0b-adf4-da0e75f5a750.json index a0bb31cdeb327..c55fab6d894b9 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/decd338e-5647-4c0b-adf4-da0e75f5a750.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/decd338e-5647-4c0b-adf4-da0e75f5a750.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "decd338e-5647-4c0b-adf4-da0e75f5a750", "name": "Postgres", "dockerRepository": "airbyte/source-postgres", - "dockerImageTag": "0.1.0", + "dockerImageTag": "0.1.2", "documentationUrl": "https://hub.docker.com/r/airbyte/source-postgres" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/e094cb9a-26de-4645-8761-65c0c425d1de.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/e094cb9a-26de-4645-8761-65c0c425d1de.json index d6d7933f35e05..2edef9d90c800 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/e094cb9a-26de-4645-8761-65c0c425d1de.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/e094cb9a-26de-4645-8761-65c0c425d1de.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "e094cb9a-26de-4645-8761-65c0c425d1de", "name": "Stripe", "dockerRepository": "airbyte/source-stripe-singer", - "dockerImageTag": "0.1.4", + "dockerImageTag": "0.1.5", "documentationUrl": "https://hub.docker.com/r/airbyte/integration-singer-stripe-source" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/ef69ef6e-aa7f-4af1-a01d-ef775033524e.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/ef69ef6e-aa7f-4af1-a01d-ef775033524e.json index ae96ff75e6152..7948d33579311 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/ef69ef6e-aa7f-4af1-a01d-ef775033524e.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/ef69ef6e-aa7f-4af1-a01d-ef775033524e.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "ef69ef6e-aa7f-4af1-a01d-ef775033524e", "name": "Github", "dockerRepository": "airbyte/source-github-singer", - "dockerImageTag": "0.1.0", + "dockerImageTag": "0.1.1", "documentationUrl": "https://hub.docker.com/r/airbyte/source-github-singer" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/fdc8b827-3257-4b33-83cc-106d234c34d4.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/fdc8b827-3257-4b33-83cc-106d234c34d4.json index 72459d901cb60..76f9c7dc9a593 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/fdc8b827-3257-4b33-83cc-106d234c34d4.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/fdc8b827-3257-4b33-83cc-106d234c34d4.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "fdc8b827-3257-4b33-83cc-106d234c34d4", "name": "Google Adwords", "dockerRepository": "airbyte/source-google-adwords-singer", - "dockerImageTag": "0.1.0", + "dockerImageTag": "0.1.1", "documentationUrl": "https://hub.docker.com/r/airbyte/source-google-adwords" } diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/AirbyteProtocolConverters.java b/airbyte-config/models/src/main/java/io/airbyte/config/AirbyteProtocolConverters.java index 887bfb410c5b3..a5aa75ba01ae8 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/AirbyteProtocolConverters.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/AirbyteProtocolConverters.java @@ -28,7 +28,8 @@ import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; import io.airbyte.protocol.models.AirbyteCatalog; -import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -40,15 +41,15 @@ // todo (cgardens) - hack, remove after we've gotten rid of Schema object. public class AirbyteProtocolConverters { - public static AirbyteCatalog toCatalog(Schema schema) { - List airbyteStreams = schema.getStreams().stream() - .map(s -> new AirbyteStream() + public static ConfiguredAirbyteCatalog toConfiguredCatalog(Schema schema) { + List airbyteStreams = schema.getStreams().stream() + .map(s -> new ConfiguredAirbyteStream() .withName(s.getName()) .withJsonSchema(toJson(s.getFields()))) // perform selection based on the output of toJson, which keeps properties if selected=true .filter(s -> !s.getJsonSchema().get("properties").isEmpty()) .collect(Collectors.toList()); - return new AirbyteCatalog().withStreams(airbyteStreams); + return new ConfiguredAirbyteCatalog().withStreams(airbyteStreams); } // todo (cgardens) - this will only work with table / column schemas. it's hack to get us through diff --git a/airbyte-config/models/src/main/resources/types/StandardSyncInput.yaml b/airbyte-config/models/src/main/resources/types/StandardSyncInput.yaml index 6ff41a386181c..c78e00ab9ef8a 100644 --- a/airbyte-config/models/src/main/resources/types/StandardSyncInput.yaml +++ b/airbyte-config/models/src/main/resources/types/StandardSyncInput.yaml @@ -19,7 +19,7 @@ properties: "$ref": SyncMode.yaml catalog: type: object - existingJavaType: io.airbyte.protocol.models.AirbyteCatalog + existingJavaType: io.airbyte.protocol.models.ConfiguredAirbyteCatalog connectionId: type: string format: uuid diff --git a/airbyte-config/models/src/main/resources/types/StandardTapConfig.yaml b/airbyte-config/models/src/main/resources/types/StandardTapConfig.yaml index f9c335ff0c1ef..200e73ab60035 100644 --- a/airbyte-config/models/src/main/resources/types/StandardTapConfig.yaml +++ b/airbyte-config/models/src/main/resources/types/StandardTapConfig.yaml @@ -17,7 +17,7 @@ properties: existingJavaType: com.fasterxml.jackson.databind.JsonNode catalog: type: object - existingJavaType: io.airbyte.protocol.models.AirbyteCatalog + existingJavaType: io.airbyte.protocol.models.ConfiguredAirbyteCatalog syncMode: "$ref": SyncMode.yaml connectionId: diff --git a/airbyte-config/models/src/main/resources/types/StandardTargetConfig.yaml b/airbyte-config/models/src/main/resources/types/StandardTargetConfig.yaml index 46b18197d4462..9ae563d823909 100644 --- a/airbyte-config/models/src/main/resources/types/StandardTargetConfig.yaml +++ b/airbyte-config/models/src/main/resources/types/StandardTargetConfig.yaml @@ -19,7 +19,7 @@ properties: "$ref": SyncMode.yaml catalog: type: object - existingJavaType: io.airbyte.protocol.models.AirbyteCatalog + existingJavaType: io.airbyte.protocol.models.ConfiguredAirbyteCatalog connectionId: type: string format: uuid diff --git a/airbyte-config/models/src/test/java/io/airbyte/config/AirbyteProtocolConvertersTest.java b/airbyte-config/models/src/test/java/io/airbyte/config/AirbyteProtocolConvertersTest.java index 65e75afe1b22d..1d8dc98d54aeb 100644 --- a/airbyte-config/models/src/test/java/io/airbyte/config/AirbyteProtocolConvertersTest.java +++ b/airbyte-config/models/src/test/java/io/airbyte/config/AirbyteProtocolConvertersTest.java @@ -32,6 +32,8 @@ import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.CatalogHelpers; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.Field.JsonSchemaPrimitive; import org.junit.jupiter.api.Test; @@ -48,6 +50,12 @@ class AirbyteProtocolConvertersTest { .withJsonSchema(CatalogHelpers.fieldsToJsonSchema( Field.of(COLUMN_NAME, JsonSchemaPrimitive.STRING), Field.of(COLUMN_AGE, JsonSchemaPrimitive.NUMBER))))); + private static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = new ConfiguredAirbyteCatalog() + .withStreams(Lists.newArrayList(new ConfiguredAirbyteStream() + .withName(STREAM) + .withJsonSchema(CatalogHelpers.fieldsToJsonSchema( + Field.of(COLUMN_NAME, JsonSchemaPrimitive.STRING), + Field.of(COLUMN_AGE, JsonSchemaPrimitive.NUMBER))))); private static final Schema SCHEMA = new Schema() .withStreams(Lists.newArrayList(new Stream() @@ -87,13 +95,13 @@ class AirbyteProtocolConvertersTest { .withSelected(false))))); @Test - void testToCatalog() { - assertEquals(CATALOG, AirbyteProtocolConverters.toCatalog(SCHEMA)); + void testToConfiguredCatalog() { + assertEquals(CONFIGURED_CATALOG, AirbyteProtocolConverters.toConfiguredCatalog(SCHEMA)); } @Test - void testToCatalogWithUnselected() { - assertEquals(CATALOG, AirbyteProtocolConverters.toCatalog(SCHEMA_WITH_UNSELECTED)); + void testToConfiguredCatalogWithUnselected() { + assertEquals(CONFIGURED_CATALOG, AirbyteProtocolConverters.toConfiguredCatalog(SCHEMA_WITH_UNSELECTED)); } @Test diff --git a/airbyte-integrations/bases/airbyte-protocol/airbyte_protocol/__init__.py b/airbyte-integrations/bases/airbyte-protocol/airbyte_protocol/__init__.py index 261cfd4c76a46..8c0d71b9ae069 100644 --- a/airbyte-integrations/bases/airbyte-protocol/airbyte_protocol/__init__.py +++ b/airbyte-integrations/bases/airbyte-protocol/airbyte_protocol/__init__.py @@ -30,6 +30,8 @@ AirbyteRecordMessage, AirbyteStateMessage, AirbyteStream, + ConfiguredAirbyteCatalog, + ConfiguredAirbyteStream, ConnectorSpecification, Status, Type, @@ -43,6 +45,8 @@ "AirbyteRecordMessage", "AirbyteStateMessage", "AirbyteStream", + "ConfiguredAirbyteCatalog", + "ConfiguredAirbyteStream", "ConnectorSpecification", "Status", "Type", diff --git a/airbyte-integrations/bases/airbyte-protocol/airbyte_protocol/models/__init__.py b/airbyte-integrations/bases/airbyte-protocol/airbyte_protocol/models/__init__.py index aba51162ae974..6d80a2e1ba2ef 100644 --- a/airbyte-integrations/bases/airbyte-protocol/airbyte_protocol/models/__init__.py +++ b/airbyte-integrations/bases/airbyte-protocol/airbyte_protocol/models/__init__.py @@ -23,4 +23,4 @@ """ # generated by generate-protocol-files -from .airbyte_message import * +from .airbyte_protocol import * diff --git a/airbyte-integrations/bases/airbyte-protocol/airbyte_protocol/models/airbyte_message.py b/airbyte-integrations/bases/airbyte-protocol/airbyte_protocol/models/airbyte_protocol.py similarity index 74% rename from airbyte-integrations/bases/airbyte-protocol/airbyte_protocol/models/airbyte_message.py rename to airbyte-integrations/bases/airbyte-protocol/airbyte_protocol/models/airbyte_protocol.py index b6da958ee0f42..859b998664df9 100644 --- a/airbyte-integrations/bases/airbyte-protocol/airbyte_protocol/models/airbyte_message.py +++ b/airbyte-integrations/bases/airbyte-protocol/airbyte_protocol/models/airbyte_protocol.py @@ -23,7 +23,7 @@ """ # generated by datamodel-codegen: -# filename: airbyte_message.yaml +# filename: airbyte_protocol.yaml from __future__ import annotations @@ -79,9 +79,9 @@ class AirbyteConnectionStatus(BaseModel): message: Optional[str] = None -class AirbyteStream(BaseModel): - name: str = Field(..., description="Stream's name.") - json_schema: Dict[str, Any] = Field(..., description="Stream schema using Json Schema specs.") +class SyncMode(Enum): + full_refresh = "full_refresh" + incremental = "incremental" class ConnectorSpecification(BaseModel): @@ -93,10 +93,34 @@ class ConnectorSpecification(BaseModel): ) +class AirbyteStream(BaseModel): + name: str = Field(..., description="Stream's name.") + json_schema: Dict[str, Any] = Field(..., description="Stream schema using Json Schema specs.") + supported_sync_modes: Optional[List[SyncMode]] = None + default_cursor_field: Optional[List[str]] = Field( + None, + description="Path to the field that will be used to determine if a record is new or modified since the last sync. If not provided by the source, the end user will have to specify the comparable themselves.", + ) + + +class ConfiguredAirbyteStream(BaseModel): + name: str = Field(..., description="Stream's name.") + json_schema: Dict[str, Any] = Field(..., description="Stream schema using Json Schema specs.") + sync_mode: Optional[SyncMode] = "full_refresh" + cursor_field: Optional[List[str]] = Field( + None, + description="Path to the field that will be used to determine if a record is new or modified since the last sync. This field is REQUIRED if `sync_mode` is `incremental`. Otherwise it is ignored.", + ) + + class AirbyteCatalog(BaseModel): streams: List[AirbyteStream] +class ConfiguredAirbyteCatalog(BaseModel): + streams: List[ConfiguredAirbyteStream] + + class AirbyteMessage(BaseModel): type: Type = Field(..., description="Message type") log: Optional[AirbyteLogMessage] = Field( @@ -114,3 +138,8 @@ class AirbyteMessage(BaseModel): None, description="schema message: the state. Must be the last message produced. The platform uses this information", ) + + +class AirbyteProtocol(BaseModel): + airbyte_message: Optional[AirbyteMessage] = None + configured_airbyte_catalog: Optional[ConfiguredAirbyteCatalog] = None diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/Destination.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/Destination.java index adc275aea3984..cfd1f932e8a89 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/Destination.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/Destination.java @@ -25,8 +25,8 @@ package io.airbyte.integrations.base; import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; public interface Destination extends Integration { @@ -41,6 +41,6 @@ public interface Destination extends Integration { * will always be called once regardless of success or failure. * @throws Exception - any exception. */ - DestinationConsumer write(JsonNode config, AirbyteCatalog catalog) throws Exception; + DestinationConsumer write(JsonNode config, ConfiguredAirbyteCatalog catalog) throws Exception; } diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java index 9debb9e391616..d2f29b64f62bc 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java @@ -30,9 +30,9 @@ import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; import io.airbyte.config.State; -import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import java.nio.file.Path; import java.util.Optional; import java.util.Scanner; @@ -99,7 +99,7 @@ public void run(String[] args) throws Exception { // envelope) while the other commands return what goes inside it. case READ -> { final JsonNode config = parseConfig(parsed.getConfigPath()); - final AirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), AirbyteCatalog.class); + final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class); // todo (cgardens) - should we should only send the contents of the state field to the integration, // not the whole struct. this runner obfuscates everything but the contents. final Optional stateOptional = parsed.getStatePath().map(path -> parseConfig(path, State.class)); @@ -110,7 +110,7 @@ public void run(String[] args) throws Exception { // destination only case WRITE -> { final JsonNode config = parseConfig(parsed.getConfigPath()); - final AirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), AirbyteCatalog.class); + final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class); final DestinationConsumer consumer = destination.write(config, catalog); consumeWriteStream(consumer); } diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/Source.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/Source.java index 3a09ca34b9c13..cd3d275938585 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/Source.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/Source.java @@ -27,9 +27,9 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import java.util.stream.Stream; -// todo (cgardens) - share common parts of this interface with source. public interface Source extends Integration { /** @@ -53,6 +53,6 @@ public interface Source extends Integration { * {@link Stream#close()} will always be called once regardless of success or failure. * @throws Exception - any exception. */ - Stream read(JsonNode config, AirbyteCatalog catalog, JsonNode state) throws Exception; + Stream read(JsonNode config, ConfiguredAirbyteCatalog catalog, JsonNode state) throws Exception; } diff --git a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java index 6c17cb5254359..28abd7923e6b6 100644 --- a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java +++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java @@ -45,6 +45,8 @@ import io.airbyte.protocol.models.AirbyteMessage.Type; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.CatalogHelpers; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConnectorSpecification; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -63,6 +65,7 @@ class IntegrationRunnerTest { private static final String CONFIG_FILE_NAME = "config.json"; private static final String CATALOG_FILE_NAME = "catalog.json"; + private static final String CONFIGURED_CATALOG_FILE_NAME = "configured_catalog.json"; private static final String STATE_FILE_NAME = "state.json"; private static final String[] ARGS = new String[] {"args"}; @@ -74,6 +77,7 @@ class IntegrationRunnerTest { private static final Path TEST_ROOT = Path.of("/tmp/airbyte_tests"); private static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(Lists.newArrayList(new AirbyteStream().withName(STREAM_NAME))); + private static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = CatalogHelpers.toDefaultConfiguredCatalog(CATALOG); private static final State STATE = new State().withState(Jsons.jsonNode(ImmutableMap.of("checkpoint", "05/08/1945"))); private IntegrationCliParser cliParser; @@ -82,6 +86,7 @@ class IntegrationRunnerTest { private Source source; private Path configPath; private Path catalogPath; + private Path configuredCatalogPath; private Path statePath; @SuppressWarnings("unchecked") @@ -95,6 +100,7 @@ void setup() throws IOException { configPath = IOs.writeFile(configDir, CONFIG_FILE_NAME, CONFIG_STRING); catalogPath = IOs.writeFile(configDir, CATALOG_FILE_NAME, Jsons.serialize(CATALOG)); + configuredCatalogPath = IOs.writeFile(configDir, CONFIGURED_CATALOG_FILE_NAME, Jsons.serialize(CONFIGURED_CATALOG)); statePath = IOs.writeFile(configDir, STATE_FILE_NAME, Jsons.serialize(STATE)); } @@ -170,7 +176,7 @@ void testDiscover() throws Exception { @Test void testRead() throws Exception { - final IntegrationConfig intConfig = IntegrationConfig.read(configPath, catalogPath, statePath); + final IntegrationConfig intConfig = IntegrationConfig.read(configPath, configuredCatalogPath, statePath); final AirbyteMessage message1 = new AirbyteMessage() .withType(Type.RECORD) .withRecord(new AirbyteRecordMessage().withData(Jsons.jsonNode(ImmutableMap.of("names", "byron")))); @@ -179,11 +185,11 @@ void testRead() throws Exception { .withData(Jsons.jsonNode(ImmutableMap.of("names", "reginald")))); when(cliParser.parse(ARGS)).thenReturn(intConfig); - when(source.read(CONFIG, CATALOG, STATE.getState())).thenReturn(Stream.of(message1, message2)); + when(source.read(CONFIG, CONFIGURED_CATALOG, STATE.getState())).thenReturn(Stream.of(message1, message2)); new IntegrationRunner(cliParser, stdoutConsumer, null, source).run(ARGS); - verify(source).read(CONFIG, CATALOG, STATE.getState()); + verify(source).read(CONFIG, CONFIGURED_CATALOG, STATE.getState()); verify(stdoutConsumer).accept(Jsons.serialize(message1)); verify(stdoutConsumer).accept(Jsons.serialize(message2)); } @@ -191,15 +197,15 @@ void testRead() throws Exception { @SuppressWarnings("unchecked") @Test void testWrite() throws Exception { - final IntegrationConfig intConfig = IntegrationConfig.write(configPath, catalogPath); + final IntegrationConfig intConfig = IntegrationConfig.write(configPath, configuredCatalogPath); final DestinationConsumer destinationConsumerMock = mock(DestinationConsumer.class); when(cliParser.parse(ARGS)).thenReturn(intConfig); - when(destination.write(CONFIG, CATALOG)).thenReturn(destinationConsumerMock); + when(destination.write(CONFIG, CONFIGURED_CATALOG)).thenReturn(destinationConsumerMock); final IntegrationRunner runner = spy(new IntegrationRunner(cliParser, stdoutConsumer, destination, null)); runner.run(ARGS); - verify(destination).write(CONFIG, CATALOG); + verify(destination).write(CONFIG, CONFIGURED_CATALOG); } @SuppressWarnings("unchecked") diff --git a/airbyte-integrations/bases/base-singer/base_singer/singer_helpers.py b/airbyte-integrations/bases/base-singer/base_singer/singer_helpers.py index a735261fa589d..316a7d3839d00 100644 --- a/airbyte-integrations/bases/base-singer/base_singer/singer_helpers.py +++ b/airbyte-integrations/bases/base-singer/base_singer/singer_helpers.py @@ -30,7 +30,14 @@ from datetime import datetime from typing import DefaultDict, Dict, Generator -from airbyte_protocol import AirbyteCatalog, AirbyteMessage, AirbyteRecordMessage, AirbyteStateMessage, AirbyteStream +from airbyte_protocol import ( + AirbyteCatalog, + AirbyteMessage, + AirbyteRecordMessage, + AirbyteStateMessage, + AirbyteStream, + ConfiguredAirbyteCatalog, +) def to_json(string): @@ -70,9 +77,7 @@ def singer_catalog_to_airbyte_catalog(singer_catalog: Dict[str, any]) -> Airbyte return AirbyteCatalog(streams=airbyte_streams) @staticmethod - def get_catalogs( - logger, shell_command, singer_transform=(lambda catalog: catalog), airbyte_transform=(lambda catalog: catalog) - ) -> Catalogs: + def get_catalogs(logger, shell_command) -> Catalogs: completed_process = subprocess.run( shell_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True ) @@ -80,8 +85,8 @@ def get_catalogs( for line in completed_process.stderr.splitlines(): logger.log_by_prefix(line, "ERROR") - singer_catalog = singer_transform(json.loads(completed_process.stdout)) - airbyte_catalog = airbyte_transform(SingerHelper.singer_catalog_to_airbyte_catalog(singer_catalog)) + singer_catalog = json.loads(completed_process.stdout) + airbyte_catalog = SingerHelper.singer_catalog_to_airbyte_catalog(singer_catalog) return Catalogs(singer_catalog=singer_catalog, airbyte_catalog=airbyte_catalog) @@ -124,7 +129,7 @@ def read(logger, shell_command, is_message=(lambda x: True), transform=(lambda x logger.log_by_prefix(line, "ERROR") @staticmethod - def create_singer_catalog_with_selection(masked_airbyte_catalog, discovered_singer_catalog) -> str: + def create_singer_catalog_with_selection(masked_airbyte_catalog: ConfiguredAirbyteCatalog, discovered_singer_catalog: object) -> str: combined_catalog_path = os.path.join("singer_rendered_catalog.json") masked_singer_streams = [] @@ -142,6 +147,7 @@ def create_singer_catalog_with_selection(masked_airbyte_catalog, discovered_sing for metadata in metadatas: new_metadata = metadata new_metadata["metadata"]["selected"] = True + # todo (cgardens) - need to make changes here for singer sources to support incremental. if not is_field_metadata(new_metadata): new_metadata["metadata"]["forced-replication-method"] = "FULL_TABLE" new_metadata["metadata"]["replication-method"] = "FULL_TABLE" diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/TestDestination.java b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/TestDestination.java index 023da47e57978..5fa952761c184 100644 --- a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/TestDestination.java +++ b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/TestDestination.java @@ -47,6 +47,8 @@ import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.CatalogHelpers; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.workers.DefaultCheckConnectionWorker; import io.airbyte.workers.DefaultGetSpecWorker; import io.airbyte.workers.OutputAndStatus; @@ -241,9 +243,10 @@ public Stream provideArguments(ExtensionContext context) { @ArgumentsSource(DataArgumentsProvider.class) public void testSync(String messagesFilename, String catalogFilename) throws Exception { final AirbyteCatalog catalog = Jsons.deserialize(MoreResources.readResource(catalogFilename), AirbyteCatalog.class); + final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog); final List messages = MoreResources.readResource(messagesFilename).lines() .map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList()); - runSync(getConfig(), messages, catalog); + runSync(getConfig(), messages, configuredCatalog); assertSameMessages(messages, retrieveRecordsForCatalog(catalog)); } @@ -260,9 +263,10 @@ public void testSyncWithNormalization(String messagesFilename, String catalogFil } final AirbyteCatalog catalog = Jsons.deserialize(MoreResources.readResource(catalogFilename), AirbyteCatalog.class); + final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog); final List messages = MoreResources.readResource(messagesFilename).lines() .map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList()); - runSync(getConfigWithBasicNormalization(), messages, catalog); + runSync(getConfigWithBasicNormalization(), messages, configuredCatalog); assertSameMessages(messages, retrieveRecordsForCatalog(catalog)); assertSameMessages(messages, retrieveNormalizedRecordsForCatalog(catalog), true); @@ -275,9 +279,10 @@ public void testSyncWithNormalization(String messagesFilename, String catalogFil public void testSecondSync() throws Exception { final AirbyteCatalog catalog = Jsons.deserialize(MoreResources.readResource("exchange_rate_catalog.json"), AirbyteCatalog.class); + final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog); final List firstSyncMessages = MoreResources.readResource("exchange_rate_messages.txt").lines() .map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList()); - runSync(getConfig(), firstSyncMessages, catalog); + runSync(getConfig(), firstSyncMessages, configuredCatalog); final List secondSyncMessages = Lists.newArrayList(new AirbyteMessage() .withRecord(new AirbyteRecordMessage() @@ -287,7 +292,7 @@ public void testSecondSync() throws Exception { .put("HKD", 10) .put("NZD", 700) .build())))); - runSync(getConfig(), secondSyncMessages, catalog); + runSync(getConfig(), secondSyncMessages, configuredCatalog); assertSameMessages(secondSyncMessages, retrieveRecordsForCatalog(catalog)); } @@ -301,7 +306,7 @@ private OutputAndStatus runCheck(JsonNode config) .run(new StandardCheckConnectionInput().withConnectionConfiguration(config), jobRoot); } - private void runSync(JsonNode config, List messages, AirbyteCatalog catalog) throws Exception { + private void runSync(JsonNode config, List messages, ConfiguredAirbyteCatalog catalog) throws Exception { final StandardTargetConfig targetConfig = new StandardTargetConfig() .withConnectionId(UUID.randomUUID()) diff --git a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/TestSource.java b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/TestSource.java index f2c3a55702e73..b5110b378d1f5 100644 --- a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/TestSource.java +++ b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/TestSource.java @@ -43,6 +43,8 @@ import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; +import io.airbyte.protocol.models.CatalogHelpers; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.workers.DefaultCheckConnectionWorker; import io.airbyte.workers.DefaultDiscoverCatalogWorker; @@ -207,7 +209,7 @@ public void testDiscover() throws Exception { */ @Test public void testRead() throws Exception { - final List allMessages = runRead(getCatalog()); + final List allMessages = runRead(CatalogHelpers.toDefaultConfiguredCatalog(getCatalog())); final List recordMessages = allMessages.stream().filter(m -> m.getType() == Type.RECORD).collect(Collectors.toList()); // the worker validates the message formats, so we just validate the message content // We don't need to validate message format as long as we use the worker, which we will not want to @@ -228,10 +230,12 @@ public void testRead() throws Exception { */ @Test public void testSecondRead() throws Exception { - final List recordMessagesFirstRun = - runRead(getCatalog()).stream().filter(m -> m.getType() == Type.RECORD).collect(Collectors.toList()); - final List recordMessagesSecondRun = - runRead(getCatalog()).stream().filter(m -> m.getType() == Type.RECORD).collect(Collectors.toList()); + final List recordMessagesFirstRun = runRead(CatalogHelpers.toDefaultConfiguredCatalog(getCatalog())) + .stream().filter(m -> m.getType() == Type.RECORD) + .collect(Collectors.toList()); + final List recordMessagesSecondRun = runRead(CatalogHelpers.toDefaultConfiguredCatalog(getCatalog())) + .stream().filter(m -> m.getType() == Type.RECORD) + .collect(Collectors.toList()); // the worker validates the messages, so we just validate the message, so we do not need to validate // again (as long as we use the worker, which we will not want to do long term). assertFalse(recordMessagesFirstRun.isEmpty()); @@ -255,7 +259,7 @@ private OutputAndStatus runDiscover() throws Exce } // todo (cgardens) - assume no state since we are all full refresh right now. - private List runRead(AirbyteCatalog catalog) throws Exception { + private List runRead(ConfiguredAirbyteCatalog catalog) throws Exception { final StandardTapConfig tapConfig = new StandardTapConfig() .withConnectionId(UUID.randomUUID()) .withSourceConnectionConfiguration(getConfig()) diff --git a/airbyte-integrations/connectors/destination-bigquery/Dockerfile b/airbyte-integrations/connectors/destination-bigquery/Dockerfile index 3418315dc7899..35ae3f218f8ea 100644 --- a/airbyte-integrations/connectors/destination-bigquery/Dockerfile +++ b/airbyte-integrations/connectors/destination-bigquery/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.2 +LABEL io.airbyte.version=0.1.3 LABEL io.airbyte.name=airbyte/destination-bigquery diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java index 4174207d28f87..6a7d038a9ac43 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java @@ -56,11 +56,11 @@ import io.airbyte.integrations.base.FailureTrackingConsumer; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.NamingHelper; -import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.ConnectorSpecification; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -186,13 +186,13 @@ private static Job waitForQuery(Job queryJob) { * @return consumer that writes singer messages to the database. */ @Override - public DestinationConsumer write(JsonNode config, AirbyteCatalog catalog) { + public DestinationConsumer write(JsonNode config, ConfiguredAirbyteCatalog catalog) { final BigQuery bigquery = getBigQuery(config); Map writeConfigs = new HashMap<>(); final String datasetId = config.get(CONFIG_DATASET_ID).asText(); // create tmp tables if not exist - for (final AirbyteStream stream : catalog.getStreams()) { + for (final ConfiguredAirbyteStream stream : catalog.getStreams()) { final String tableName = NamingHelper.getRawTableName(stream.getName()); final String tmpTableName = stream.getName() + "_" + Instant.now().toEpochMilli(); @@ -251,9 +251,9 @@ public static class RecordConsumer extends FailureTrackingConsumer writeConfigs; - private final AirbyteCatalog catalog; + private final ConfiguredAirbyteCatalog catalog; - public RecordConsumer(BigQuery bigquery, Map writeConfigs, AirbyteCatalog catalog) { + public RecordConsumer(BigQuery bigquery, Map writeConfigs, ConfiguredAirbyteCatalog catalog) { this.bigquery = bigquery; this.writeConfigs = writeConfigs; this.catalog = catalog; diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryIntegrationTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryIntegrationTest.java index 6577a44e3a39e..83233d695e799 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryIntegrationTest.java @@ -68,8 +68,6 @@ public class BigQueryIntegrationTest extends TestDestination { private static final String CONFIG_PROJECT_ID = "project_id"; private static final String CONFIG_CREDS = "credentials_json"; - private static final String COLUMN_DATA = "data"; - private BigQuery bigquery; private Dataset dataset; private boolean tornDown; diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java index 55c39237f0049..72797a9a9e66b 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java @@ -45,14 +45,14 @@ import io.airbyte.commons.resources.MoreResources; import io.airbyte.integrations.base.DestinationConsumer; import io.airbyte.integrations.base.NamingHelper; -import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.AirbyteStateMessage; -import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.CatalogHelpers; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.Field.JsonSchemaPrimitive; @@ -100,11 +100,11 @@ class BigQueryDestinationTest { private static final AirbyteMessage MESSAGE_STATE = new AirbyteMessage().withType(AirbyteMessage.Type.STATE) .withState(new AirbyteStateMessage().withData(Jsons.jsonNode(ImmutableMap.builder().put("checkpoint", "now!").build()))); - private static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(Lists.newArrayList( - CatalogHelpers.createAirbyteStream(USERS_STREAM_NAME, io.airbyte.protocol.models.Field.of("name", JsonSchemaPrimitive.STRING), + private static final ConfiguredAirbyteCatalog CATALOG = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList( + CatalogHelpers.createConfiguredAirbyteStream(USERS_STREAM_NAME, io.airbyte.protocol.models.Field.of("name", JsonSchemaPrimitive.STRING), io.airbyte.protocol.models.Field .of("id", JsonSchemaPrimitive.STRING)), - CatalogHelpers.createAirbyteStream(TASKS_STREAM_NAME, Field.of("goal", JsonSchemaPrimitive.STRING)))); + CatalogHelpers.createConfiguredAirbyteStream(TASKS_STREAM_NAME, Field.of("goal", JsonSchemaPrimitive.STRING)))); private JsonNode config; @@ -228,7 +228,7 @@ void testWriteSuccess() throws Exception { assertEquals(expectedTasksJson.size(), tasksActual.size()); assertTrue(expectedTasksJson.containsAll(tasksActual) && tasksActual.containsAll(expectedTasksJson)); - assertTmpTablesNotPresent(CATALOG.getStreams().stream().map(AirbyteStream::getName).collect(Collectors.toList())); + assertTmpTablesNotPresent(CATALOG.getStreams().stream().map(ConfiguredAirbyteStream::getName).collect(Collectors.toList())); } @SuppressWarnings("ResultOfMethodCallIgnored") @@ -244,8 +244,8 @@ void testWriteFailure() throws Exception { consumer.accept(MESSAGE_USERS2); consumer.close(); - final List tableNames = CATALOG.getStreams().stream().map(AirbyteStream::getName).collect(toList()); - assertTmpTablesNotPresent(CATALOG.getStreams().stream().map(AirbyteStream::getName).collect(Collectors.toList())); + final List tableNames = CATALOG.getStreams().stream().map(ConfiguredAirbyteStream::getName).collect(toList()); + assertTmpTablesNotPresent(CATALOG.getStreams().stream().map(ConfiguredAirbyteStream::getName).collect(Collectors.toList())); // assert that no tables were created. assertTrue(fetchNamesOfTablesInDb().stream().noneMatch(tableName -> tableNames.stream().anyMatch(tableName::startsWith))); } diff --git a/airbyte-integrations/connectors/destination-csv/Dockerfile b/airbyte-integrations/connectors/destination-csv/Dockerfile index 0d41f5764d2a3..d2e6b7174455d 100644 --- a/airbyte-integrations/connectors/destination-csv/Dockerfile +++ b/airbyte-integrations/connectors/destination-csv/Dockerfile @@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.1 +LABEL io.airbyte.version=0.1.2 LABEL io.airbyte.name=airbyte/destination-csv diff --git a/airbyte-integrations/connectors/destination-csv/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java b/airbyte-integrations/connectors/destination-csv/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java index 62b1562dea18c..c6e477e46729d 100644 --- a/airbyte-integrations/connectors/destination-csv/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java +++ b/airbyte-integrations/connectors/destination-csv/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java @@ -32,11 +32,11 @@ import io.airbyte.integrations.base.DestinationConsumer; import io.airbyte.integrations.base.FailureTrackingConsumer; import io.airbyte.integrations.base.IntegrationRunner; -import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.ConnectorSpecification; import java.io.FileWriter; import java.io.IOException; @@ -85,14 +85,14 @@ public AirbyteConnectionStatus check(JsonNode config) { * @throws IOException - exception throw in manipulating the filesytem. */ @Override - public DestinationConsumer write(JsonNode config, AirbyteCatalog catalog) throws IOException { + public DestinationConsumer write(JsonNode config, ConfiguredAirbyteCatalog catalog) throws IOException { final Path destinationDir = getDestinationPath(config); FileUtils.forceMkdir(destinationDir.toFile()); final long now = Instant.now().toEpochMilli(); final Map writeConfigs = new HashMap<>(); - for (final AirbyteStream stream : catalog.getStreams()) { + for (final ConfiguredAirbyteStream stream : catalog.getStreams()) { final Path tmpPath = destinationDir.resolve(stream.getName() + "_" + now + ".csv"); final Path finalPath = destinationDir.resolve(stream.getName() + ".csv"); final FileWriter fileWriter = new FileWriter(tmpPath.toFile()); @@ -124,9 +124,9 @@ private Path getDestinationPath(JsonNode config) { private static class CsvConsumer extends FailureTrackingConsumer { private final Map writeConfigs; - private final AirbyteCatalog catalog; + private final ConfiguredAirbyteCatalog catalog; - public CsvConsumer(Map writeConfigs, AirbyteCatalog catalog) { + public CsvConsumer(Map writeConfigs, ConfiguredAirbyteCatalog catalog) { this.catalog = catalog; LOGGER.info("initializing consumer."); diff --git a/airbyte-integrations/connectors/destination-csv/src/test/java/io/airbyte/integrations/destination/csv/CsvDestinationTest.java b/airbyte-integrations/connectors/destination-csv/src/test/java/io/airbyte/integrations/destination/csv/CsvDestinationTest.java index 09acdc5b21e03..0af51258bef5c 100644 --- a/airbyte-integrations/connectors/destination-csv/src/test/java/io/airbyte/integrations/destination/csv/CsvDestinationTest.java +++ b/airbyte-integrations/connectors/destination-csv/src/test/java/io/airbyte/integrations/destination/csv/CsvDestinationTest.java @@ -35,13 +35,13 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.commons.resources.MoreResources; import io.airbyte.integrations.base.DestinationConsumer; -import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.AirbyteStateMessage; import io.airbyte.protocol.models.CatalogHelpers; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.Field.JsonSchemaPrimitive; @@ -89,10 +89,10 @@ class CsvDestinationTest { private static final AirbyteMessage MESSAGE_STATE = new AirbyteMessage().withType(AirbyteMessage.Type.STATE) .withState(new AirbyteStateMessage().withData(Jsons.jsonNode(ImmutableMap.builder().put("checkpoint", "now!").build()))); - private static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(Lists.newArrayList( - CatalogHelpers.createAirbyteStream(USERS_STREAM_NAME, Field.of("name", JsonSchemaPrimitive.STRING), + private static final ConfiguredAirbyteCatalog CATALOG = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList( + CatalogHelpers.createConfiguredAirbyteStream(USERS_STREAM_NAME, Field.of("name", JsonSchemaPrimitive.STRING), Field.of("id", JsonSchemaPrimitive.STRING)), - CatalogHelpers.createAirbyteStream(TASKS_STREAM_NAME, Field.of("goal", JsonSchemaPrimitive.STRING)))); + CatalogHelpers.createConfiguredAirbyteStream(TASKS_STREAM_NAME, Field.of("goal", JsonSchemaPrimitive.STRING)))); private Path destinationPath; private JsonNode config; diff --git a/airbyte-integrations/connectors/destination-postgres/Dockerfile b/airbyte-integrations/connectors/destination-postgres/Dockerfile index e0b4d374a498e..be2881038959f 100644 --- a/airbyte-integrations/connectors/destination-postgres/Dockerfile +++ b/airbyte-integrations/connectors/destination-postgres/Dockerfile @@ -9,5 +9,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.1 +LABEL io.airbyte.version=0.1.2 LABEL io.airbyte.name=airbyte/destination-postgres diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java index 12ec5b6e85eae..1df5f989883b4 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java +++ b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java @@ -41,12 +41,12 @@ import io.airbyte.integrations.base.FailureTrackingConsumer; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.NamingHelper; -import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteRecordMessage; -import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.queue.BigQueue; import java.io.IOException; @@ -126,13 +126,13 @@ public AirbyteConnectionStatus check(JsonNode config) { * @throws Exception - anything could happen! */ @Override - public DestinationConsumer write(JsonNode config, AirbyteCatalog catalog) throws Exception { + public DestinationConsumer write(JsonNode config, ConfiguredAirbyteCatalog catalog) throws Exception { // connect to db. final Database database = getDatabase(config); Map writeBuffers = new HashMap<>(); // create tmp tables if not exist - for (final AirbyteStream stream : catalog.getStreams()) { + for (final ConfiguredAirbyteStream stream : catalog.getStreams()) { final String tableName = NamingHelper.getRawTableName(stream.getName()); final String tmpTableName = stream.getName() + "_" + Instant.now().toEpochMilli(); database.query(ctx -> ctx.execute(String.format( @@ -164,9 +164,9 @@ public static class RecordConsumer extends FailureTrackingConsumer writeConfigs; - private final AirbyteCatalog catalog; + private final ConfiguredAirbyteCatalog catalog; - public RecordConsumer(Database database, Map writeConfigs, AirbyteCatalog catalog) { + public RecordConsumer(Database database, Map writeConfigs, ConfiguredAirbyteCatalog catalog) { this.database = database; this.writeConfigs = writeConfigs; this.catalog = catalog; diff --git a/airbyte-integrations/connectors/destination-postgres/src/test/java/io/airbyte/integrations/destination/postgres/PostgresDestinationTest.java b/airbyte-integrations/connectors/destination-postgres/src/test/java/io/airbyte/integrations/destination/postgres/PostgresDestinationTest.java index 31331562b1af1..e4b8a0950f9ec 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/test/java/io/airbyte/integrations/destination/postgres/PostgresDestinationTest.java +++ b/airbyte-integrations/connectors/destination-postgres/src/test/java/io/airbyte/integrations/destination/postgres/PostgresDestinationTest.java @@ -42,14 +42,14 @@ import io.airbyte.db.Databases; import io.airbyte.integrations.base.DestinationConsumer; import io.airbyte.integrations.base.NamingHelper; -import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.AirbyteStateMessage; -import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.CatalogHelpers; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.Field.JsonSchemaPrimitive; @@ -94,10 +94,10 @@ class PostgresDestinationTest { private static final AirbyteMessage MESSAGE_STATE = new AirbyteMessage().withType(AirbyteMessage.Type.STATE) .withState(new AirbyteStateMessage().withData(Jsons.jsonNode(ImmutableMap.builder().put("checkpoint", "now!").build()))); - private static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(Lists.newArrayList( - CatalogHelpers.createAirbyteStream(USERS_STREAM_NAME, Field.of("name", JsonSchemaPrimitive.STRING), + private static final ConfiguredAirbyteCatalog CATALOG = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList( + CatalogHelpers.createConfiguredAirbyteStream(USERS_STREAM_NAME, Field.of("name", JsonSchemaPrimitive.STRING), Field.of("id", JsonSchemaPrimitive.STRING)), - CatalogHelpers.createAirbyteStream(TASKS_STREAM_NAME, Field.of("goal", JsonSchemaPrimitive.STRING)))); + CatalogHelpers.createConfiguredAirbyteStream(TASKS_STREAM_NAME, Field.of("goal", JsonSchemaPrimitive.STRING)))); private PostgreSQLContainer container; private JsonNode config; @@ -173,7 +173,7 @@ void testWriteSuccess() throws Exception { final Set expectedTasksJson = Sets.newHashSet(MESSAGE_TASKS1.getRecord().getData(), MESSAGE_TASKS2.getRecord().getData()); assertEquals(expectedTasksJson, tasksActual); - assertTmpTablesNotPresent(CATALOG.getStreams().stream().map(AirbyteStream::getName).collect(Collectors.toList())); + assertTmpTablesNotPresent(CATALOG.getStreams().stream().map(ConfiguredAirbyteStream::getName).collect(Collectors.toList())); } @SuppressWarnings("ResultOfMethodCallIgnored") @@ -190,7 +190,7 @@ void testWriteFailure() throws Exception { consumer.close(); final List tableNames = CATALOG.getStreams().stream().map(s -> NamingHelper.getRawTableName(s.getName())).collect(toList()); - assertTmpTablesNotPresent(CATALOG.getStreams().stream().map(AirbyteStream::getName).collect(Collectors.toList())); + assertTmpTablesNotPresent(CATALOG.getStreams().stream().map(ConfiguredAirbyteStream::getName).collect(Collectors.toList())); // assert that no tables were created. assertTrue(fetchNamesOfTablesInDb().stream().noneMatch(tableName -> tableNames.stream().anyMatch(tableName::startsWith))); } diff --git a/airbyte-integrations/connectors/destination-snowflake/Dockerfile b/airbyte-integrations/connectors/destination-snowflake/Dockerfile index ebdde26ad22d5..4bd1e948c37d1 100644 --- a/airbyte-integrations/connectors/destination-snowflake/Dockerfile +++ b/airbyte-integrations/connectors/destination-snowflake/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.2 +LABEL io.airbyte.version=0.1.3 LABEL io.airbyte.name=airbyte/destination-snowflake diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java index 3d9cb38467d59..0e553a5ea46b7 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java @@ -31,11 +31,11 @@ import io.airbyte.integrations.base.DestinationConsumer; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.NamingHelper; -import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.queue.BigQueue; import java.io.IOException; @@ -99,14 +99,14 @@ public AirbyteConnectionStatus check(JsonNode config) { * @throws Exception - anything could happen! */ @Override - public DestinationConsumer write(JsonNode config, AirbyteCatalog catalog) throws Exception { + public DestinationConsumer write(JsonNode config, ConfiguredAirbyteCatalog catalog) throws Exception { // connect to snowflake final Supplier connectionFactory = SnowflakeDatabase.getConnectionFactory(config); Map writeBuffers = new HashMap<>(); // create temporary tables if they do not exist // we don't use temporary/transient since we want to control the lifecycle - for (final AirbyteStream stream : catalog.getStreams()) { + for (final ConfiguredAirbyteStream stream : catalog.getStreams()) { final String tableName = NamingHelper.getRawTableName(stream.getName()); final String tmpTableName = stream.getName() + "_" + Instant.now().toEpochMilli(); diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeRecordConsumer.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeRecordConsumer.java index 494c24be0cd9d..8b944a705450e 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeRecordConsumer.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeRecordConsumer.java @@ -30,9 +30,9 @@ import io.airbyte.commons.lang.CloseableQueue; import io.airbyte.integrations.base.DestinationConsumer; import io.airbyte.integrations.base.FailureTrackingConsumer; -import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; @@ -61,9 +61,11 @@ public class SnowflakeRecordConsumer extends FailureTrackingConsumer connectionFactory; private final Map writeContexts; - private final AirbyteCatalog catalog; + private final ConfiguredAirbyteCatalog catalog; - public SnowflakeRecordConsumer(Supplier connectionFactory, Map writeContexts, AirbyteCatalog catalog) { + public SnowflakeRecordConsumer(Supplier connectionFactory, + Map writeContexts, + ConfiguredAirbyteCatalog catalog) { this.connectionFactory = connectionFactory; this.writeContexts = writeContexts; this.catalog = catalog; diff --git a/airbyte-integrations/connectors/source-exchangeratesapi-singer/Dockerfile b/airbyte-integrations/connectors/source-exchangeratesapi-singer/Dockerfile index 713b6c7c69917..9c08671ec9d38 100644 --- a/airbyte-integrations/connectors/source-exchangeratesapi-singer/Dockerfile +++ b/airbyte-integrations/connectors/source-exchangeratesapi-singer/Dockerfile @@ -8,7 +8,7 @@ ENV CODE_PATH="source_exchangeratesapi_singer" ENV AIRBYTE_IMPL_MODULE="source_exchangeratesapi_singer" ENV AIRBYTE_IMPL_PATH="SourceExchangeRatesApiSinger" -LABEL io.airbyte.version=0.1.4 +LABEL io.airbyte.version=0.1.6 LABEL io.airbyte.name=airbyte/source-exchangeratesapi-singer WORKDIR /airbyte/integration_code diff --git a/airbyte-integrations/connectors/source-facebook-marketing-api-singer/Dockerfile b/airbyte-integrations/connectors/source-facebook-marketing-api-singer/Dockerfile index 184e202783be5..25d921f82dc83 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing-api-singer/Dockerfile +++ b/airbyte-integrations/connectors/source-facebook-marketing-api-singer/Dockerfile @@ -13,5 +13,5 @@ COPY $CODE_PATH ./$CODE_PATH COPY setup.py ./ RUN pip install ".[main]" -LABEL io.airbyte.version=0.1.1 +LABEL io.airbyte.version=0.1.2 LABEL io.airbyte.name=airbyte/source-facebook-marketing-api-singer diff --git a/airbyte-integrations/connectors/source-file/Dockerfile b/airbyte-integrations/connectors/source-file/Dockerfile index 2c23b14e47867..53aa3b4a1eb92 100644 --- a/airbyte-integrations/connectors/source-file/Dockerfile +++ b/airbyte-integrations/connectors/source-file/Dockerfile @@ -11,5 +11,5 @@ COPY $CODE_PATH ./$CODE_PATH COPY setup.py ./ RUN pip install ".[main]" -LABEL io.airbyte.version=0.1.2 +LABEL io.airbyte.version=0.1.3 LABEL io.airbyte.name=airbyte/source-file diff --git a/airbyte-integrations/connectors/source-github-singer/Dockerfile b/airbyte-integrations/connectors/source-github-singer/Dockerfile index f39c9b0fc1d2e..fa2376667c6a4 100644 --- a/airbyte-integrations/connectors/source-github-singer/Dockerfile +++ b/airbyte-integrations/connectors/source-github-singer/Dockerfile @@ -8,7 +8,7 @@ ENV CODE_PATH="source_github_singer" ENV AIRBYTE_IMPL_MODULE="source_github_singer" ENV AIRBYTE_IMPL_PATH="SourceGithubSinger" -LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.version=0.1.1 LABEL io.airbyte.name=airbyte/source-github-singer WORKDIR /airbyte/integration_code diff --git a/airbyte-integrations/connectors/source-google-adwords-singer/Dockerfile b/airbyte-integrations/connectors/source-google-adwords-singer/Dockerfile index 8da441a295805..cf25dce380848 100644 --- a/airbyte-integrations/connectors/source-google-adwords-singer/Dockerfile +++ b/airbyte-integrations/connectors/source-google-adwords-singer/Dockerfile @@ -11,5 +11,5 @@ COPY $CODE_PATH ./$CODE_PATH COPY setup.py ./ RUN pip install ".[main]" -LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.version=0.1.1 LABEL io.airbyte.name=airbyte/source-google-adwords-singer diff --git a/airbyte-integrations/connectors/source-google-sheets/Dockerfile b/airbyte-integrations/connectors/source-google-sheets/Dockerfile index 4cf52eb21d2cc..49fcd8583b6a2 100644 --- a/airbyte-integrations/connectors/source-google-sheets/Dockerfile +++ b/airbyte-integrations/connectors/source-google-sheets/Dockerfile @@ -11,5 +11,5 @@ COPY $CODE_PATH ./$CODE_PATH COPY setup.py ./ RUN pip install . -LABEL io.airbyte.version=0.1.1 +LABEL io.airbyte.version=0.1.2 LABEL io.airbyte.name=airbyte/source-google-sheets diff --git a/airbyte-integrations/connectors/source-googleanalytics-singer/Dockerfile b/airbyte-integrations/connectors/source-googleanalytics-singer/Dockerfile index ebe18ae31be30..0fa55dd62be16 100644 --- a/airbyte-integrations/connectors/source-googleanalytics-singer/Dockerfile +++ b/airbyte-integrations/connectors/source-googleanalytics-singer/Dockerfile @@ -8,7 +8,7 @@ ENV CODE_PATH="source_googleanalytics_singer" ENV AIRBYTE_IMPL_MODULE="source_googleanalytics_singer" ENV AIRBYTE_IMPL_PATH="GoogleAnalyticsSingerSource" -LABEL io.airbyte.version=0.1.1 +LABEL io.airbyte.version=0.1.2 LABEL io.airbyte.name=airbyte/source-googleanalytics-singer WORKDIR /airbyte/integration_code diff --git a/airbyte-integrations/connectors/source-hubspot-singer/Dockerfile b/airbyte-integrations/connectors/source-hubspot-singer/Dockerfile index 90c20f3c2ea73..38ab3e446572e 100644 --- a/airbyte-integrations/connectors/source-hubspot-singer/Dockerfile +++ b/airbyte-integrations/connectors/source-hubspot-singer/Dockerfile @@ -8,7 +8,7 @@ ENV CODE_PATH="source_hubspot_singer" ENV AIRBYTE_IMPL_MODULE="source_hubspot_singer" ENV AIRBYTE_IMPL_PATH="SourceHubspotSinger" -LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.version=0.1.1 LABEL io.airbyte.name=airbyte/source-hubspot-singer WORKDIR /airbyte/integration_code diff --git a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java index 318051653bb27..5490ebf1380a6 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java +++ b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java @@ -39,8 +39,9 @@ import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; import io.airbyte.protocol.models.AirbyteRecordMessage; -import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.CatalogHelpers; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.Field.JsonSchemaPrimitive; @@ -143,7 +144,7 @@ private List> discoverInternal(final Database database) throws Exceptio } @Override - public Stream read(JsonNode config, AirbyteCatalog catalog, JsonNode state) throws Exception { + public Stream read(JsonNode config, ConfiguredAirbyteCatalog catalog, JsonNode state) throws Exception { final Instant now = Instant.now(); final Database database = createDatabase(config); @@ -153,7 +154,7 @@ public Stream read(JsonNode config, AirbyteCatalog catalog, Json Stream resultStream = Stream.empty(); - for (final AirbyteStream airbyteStream : catalog.getStreams()) { + for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) { if (!tableNameToTable.containsKey(airbyteStream.getName())) { continue; } diff --git a/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/JdbcSourceTest.java b/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/JdbcSourceTest.java index 8ecf0117e64ad..0c9d0b092e54c 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/JdbcSourceTest.java +++ b/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/JdbcSourceTest.java @@ -44,8 +44,9 @@ import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; import io.airbyte.protocol.models.AirbyteRecordMessage; -import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.CatalogHelpers; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.Field.JsonSchemaPrimitive; @@ -65,6 +66,7 @@ class JdbcSourceTest { STREAM_NAME, Field.of("id", JsonSchemaPrimitive.NUMBER), Field.of("name", JsonSchemaPrimitive.STRING)); + private static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = CatalogHelpers.toDefaultConfiguredCatalog(CATALOG); private static final Set MESSAGES = Sets.newHashSet( new AirbyteMessage().withType(Type.RECORD) .withRecord(new AirbyteRecordMessage().withStream(STREAM_NAME).withData(Jsons.jsonNode(ImmutableMap.of("id", 1, "name", "picard")))), @@ -143,7 +145,7 @@ void testDiscover() throws Exception { @Test void testReadSuccess() throws Exception { - final Set actualMessages = new JdbcSource().read(config, CATALOG, null).collect(Collectors.toSet()); + final Set actualMessages = new JdbcSource().read(config, CONFIGURED_CATALOG, null).collect(Collectors.toSet()); actualMessages.forEach(r -> { if (r.getRecord() != null) { @@ -156,7 +158,7 @@ void testReadSuccess() throws Exception { @Test void testReadOneColumn() throws Exception { - final AirbyteCatalog catalog = CatalogHelpers.createAirbyteCatalog(STREAM_NAME, Field.of("id", JsonSchemaPrimitive.NUMBER)); + final ConfiguredAirbyteCatalog catalog = CatalogHelpers.createConfiguredAirbyteCatalog(STREAM_NAME, Field.of("id", JsonSchemaPrimitive.NUMBER)); final Set actualMessages = new JdbcSource().read(config, catalog, null).collect(Collectors.toSet()); @@ -183,9 +185,9 @@ void testReadMultipleTables() throws Exception { return null; }); - final AirbyteCatalog catalog = new AirbyteCatalog().withStreams(Lists.newArrayList( - CATALOG.getStreams().get(0), - CatalogHelpers.createAirbyteStream( + final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList( + CONFIGURED_CATALOG.getStreams().get(0), + CatalogHelpers.createConfiguredAirbyteStream( streamName2, Field.of("id", JsonSchemaPrimitive.NUMBER), Field.of("name", JsonSchemaPrimitive.STRING)))); @@ -210,8 +212,8 @@ void testReadMultipleTables() throws Exception { @SuppressWarnings("ResultOfMethodCallIgnored") @Test void testReadFailure() throws Exception { - final AirbyteStream spiedAbStream = spy(CATALOG.getStreams().get(0)); - final AirbyteCatalog catalog = new AirbyteCatalog().withStreams(Lists.newArrayList(spiedAbStream)); + final ConfiguredAirbyteStream spiedAbStream = spy(CONFIGURED_CATALOG.getStreams().get(0)); + final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(spiedAbStream)); doCallRealMethod().doCallRealMethod().doThrow(new RuntimeException()).when(spiedAbStream).getName(); final Stream stream = new JdbcSource().read(config, catalog, null); diff --git a/airbyte-integrations/connectors/source-marketo-singer/Dockerfile b/airbyte-integrations/connectors/source-marketo-singer/Dockerfile index 18a77437a1d36..b031e67c47d04 100644 --- a/airbyte-integrations/connectors/source-marketo-singer/Dockerfile +++ b/airbyte-integrations/connectors/source-marketo-singer/Dockerfile @@ -14,5 +14,5 @@ COPY $CODE_PATH ./$CODE_PATH COPY setup.py ./ RUN pip install ".[main]" -LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.version=0.1.1 LABEL io.airbyte.name=airbyte/source-marketo-singer diff --git a/airbyte-integrations/connectors/source-mssql/Dockerfile b/airbyte-integrations/connectors/source-mssql/Dockerfile index a7d29abc0c777..9b01ef0962f36 100644 --- a/airbyte-integrations/connectors/source-mssql/Dockerfile +++ b/airbyte-integrations/connectors/source-mssql/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.version=0.1.1 LABEL io.airbyte.name=airbyte/source-mssql diff --git a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlSourceTest.java b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlSourceTest.java index e969f131b2b48..27873884b4bd2 100644 --- a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlSourceTest.java +++ b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlSourceTest.java @@ -44,8 +44,9 @@ import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; import io.airbyte.protocol.models.AirbyteRecordMessage; -import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.CatalogHelpers; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.Field.JsonSchemaPrimitive; @@ -67,6 +68,7 @@ class MssqlSourceTest { STREAM_NAME, Field.of("id", JsonSchemaPrimitive.NUMBER), Field.of("name", JsonSchemaPrimitive.STRING)); + private static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = CatalogHelpers.toDefaultConfiguredCatalog(CATALOG); private static final Set MESSAGES = Sets.newHashSet( new AirbyteMessage().withType(Type.RECORD) .withRecord(new AirbyteRecordMessage().withStream(STREAM_NAME).withData(Jsons.jsonNode(ImmutableMap.of("id", 1, "name", "picard")))), @@ -146,7 +148,7 @@ void testDiscover() throws Exception { @Test void testReadSuccess() throws Exception { - final Set actualMessages = new MssqlSource().read(config, CATALOG, null).collect(Collectors.toSet()); + final Set actualMessages = new MssqlSource().read(config, CONFIGURED_CATALOG, null).collect(Collectors.toSet()); actualMessages.forEach(r -> { if (r.getRecord() != null) { @@ -160,8 +162,8 @@ void testReadSuccess() throws Exception { @SuppressWarnings("ResultOfMethodCallIgnored") @Test void testReadFailure() { - final AirbyteStream spiedAbStream = spy(CATALOG.getStreams().get(0)); - final AirbyteCatalog catalog = new AirbyteCatalog().withStreams(Lists.newArrayList(spiedAbStream)); + final ConfiguredAirbyteStream spiedAbStream = spy(CONFIGURED_CATALOG.getStreams().get(0)); + final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(spiedAbStream)); doThrow(new IllegalStateException()).when(spiedAbStream).getName(); final MssqlSource source = new MssqlSource(); diff --git a/airbyte-integrations/connectors/source-mysql/Dockerfile b/airbyte-integrations/connectors/source-mysql/Dockerfile index 7bee6e4968a85..f9b9280290a24 100644 --- a/airbyte-integrations/connectors/source-mysql/Dockerfile +++ b/airbyte-integrations/connectors/source-mysql/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.version=0.1.1 LABEL io.airbyte.name=airbyte/source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSourceTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSourceTest.java index 2a1aa2de1cad2..49d4049909e9a 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSourceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSourceTest.java @@ -44,8 +44,9 @@ import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; import io.airbyte.protocol.models.AirbyteRecordMessage; -import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.CatalogHelpers; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.Field.JsonSchemaPrimitive; @@ -69,6 +70,7 @@ class MySqlSourceTest { STREAM_NAME, Field.of("id", JsonSchemaPrimitive.NUMBER), Field.of("name", JsonSchemaPrimitive.STRING)); + private static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = CatalogHelpers.toDefaultConfiguredCatalog(CATALOG); private static final Set MESSAGES = Sets.newHashSet( new AirbyteMessage().withType(Type.RECORD) .withRecord(new AirbyteRecordMessage().withStream(STREAM_NAME).withData(Jsons.jsonNode(ImmutableMap.of("id", 1, "name", "picard")))), @@ -159,7 +161,7 @@ void testDiscover() throws Exception { @Test void testReadSuccess() throws Exception { - final Set actualMessages = new MySqlSource().read(config, CATALOG, null).collect(Collectors.toSet()); + final Set actualMessages = new MySqlSource().read(config, CONFIGURED_CATALOG, null).collect(Collectors.toSet()); actualMessages.forEach(r -> { if (r.getRecord() != null) { @@ -173,8 +175,8 @@ void testReadSuccess() throws Exception { @SuppressWarnings("ResultOfMethodCallIgnored") @Test void testReadFailure() throws Exception { - final AirbyteStream spiedAbStream = spy(CATALOG.getStreams().get(0)); - final AirbyteCatalog catalog = new AirbyteCatalog().withStreams(Lists.newArrayList(spiedAbStream)); + final ConfiguredAirbyteStream spiedAbStream = spy(CONFIGURED_CATALOG.getStreams().get(0)); + final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(spiedAbStream)); doCallRealMethod().doCallRealMethod().doThrow(new RuntimeException()).when(spiedAbStream).getName(); final Stream stream = new MySqlSource().read(config, catalog, null); diff --git a/airbyte-integrations/connectors/source-postgres/Dockerfile b/airbyte-integrations/connectors/source-postgres/Dockerfile index 503795c569d15..72247871fdccb 100644 --- a/airbyte-integrations/connectors/source-postgres/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.version=0.1.2 LABEL io.airbyte.name=airbyte/source-postgres diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java index d02450e51e3be..8404952d1b0ad 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java @@ -44,8 +44,9 @@ import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; import io.airbyte.protocol.models.AirbyteRecordMessage; -import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.CatalogHelpers; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.Field.JsonSchemaPrimitive; @@ -69,6 +70,7 @@ class PostgresSourceTest { STREAM_NAME, Field.of("id", JsonSchemaPrimitive.NUMBER), Field.of("name", JsonSchemaPrimitive.STRING)); + private static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = CatalogHelpers.toDefaultConfiguredCatalog(CATALOG); private static final Set ASCII_MESSAGES = Sets.newHashSet( new AirbyteMessage().withType(Type.RECORD) .withRecord(new AirbyteRecordMessage().withStream(STREAM_NAME).withData(Jsons.jsonNode(ImmutableMap.of("id", 1, "name", "goku")))), @@ -178,7 +180,7 @@ void testDiscover() throws Exception { @Test void testReadSuccess() throws Exception { - final Set actualMessages = new PostgresSource().read(config, CATALOG, null).collect(Collectors.toSet()); + final Set actualMessages = new PostgresSource().read(config, CONFIGURED_CATALOG, null).collect(Collectors.toSet()); actualMessages.forEach(r -> { if (r.getRecord() != null) { @@ -204,7 +206,7 @@ public void testCanReadUtf8() throws Exception { }); } - Set actualMessages = new PostgresSource().read(config, CATALOG, null).collect(Collectors.toSet()); + Set actualMessages = new PostgresSource().read(config, CONFIGURED_CATALOG, null).collect(Collectors.toSet()); for (AirbyteMessage actualMessage : actualMessages) { if (actualMessage.getRecord() != null) { @@ -219,8 +221,8 @@ public void testCanReadUtf8() throws Exception { @SuppressWarnings("ResultOfMethodCallIgnored") @Test void testReadFailure() throws Exception { - final AirbyteStream spiedAbStream = spy(CATALOG.getStreams().get(0)); - final AirbyteCatalog catalog = new AirbyteCatalog().withStreams(Lists.newArrayList(spiedAbStream)); + final ConfiguredAirbyteStream spiedAbStream = spy(CONFIGURED_CATALOG.getStreams().get(0)); + final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(spiedAbStream)); doCallRealMethod().doCallRealMethod().doThrow(new RuntimeException()).when(spiedAbStream).getName(); final Stream stream = new PostgresSource().read(config, catalog, null); diff --git a/airbyte-integrations/connectors/source-salesforce-singer/Dockerfile b/airbyte-integrations/connectors/source-salesforce-singer/Dockerfile index bdc3dea3742b9..b14c75868c1ea 100644 --- a/airbyte-integrations/connectors/source-salesforce-singer/Dockerfile +++ b/airbyte-integrations/connectors/source-salesforce-singer/Dockerfile @@ -8,7 +8,7 @@ ENV CODE_PATH="source_salesforce_singer" ENV AIRBYTE_IMPL_MODULE="source_salesforce_singer" ENV AIRBYTE_IMPL_PATH="SourceSalesforceSinger" -LABEL io.airbyte.version=0.1.1 +LABEL io.airbyte.version=0.1.2 LABEL io.airbyte.name=airbyte/source-salesforce-singer WORKDIR /airbyte/integration_code diff --git a/airbyte-integrations/connectors/source-shopify-singer/Dockerfile b/airbyte-integrations/connectors/source-shopify-singer/Dockerfile index 52b1a4dbe22d8..9361820c91ee6 100644 --- a/airbyte-integrations/connectors/source-shopify-singer/Dockerfile +++ b/airbyte-integrations/connectors/source-shopify-singer/Dockerfile @@ -12,5 +12,5 @@ COPY $CODE_PATH ./$CODE_PATH COPY setup.py ./ RUN pip install ".[main]" -LABEL io.airbyte.version=0.1.1 +LABEL io.airbyte.version=0.1.2 LABEL io.airbyte.name=airbyte/source-shopify-singer diff --git a/airbyte-integrations/connectors/source-stripe-singer/Dockerfile b/airbyte-integrations/connectors/source-stripe-singer/Dockerfile index 31bfd95e6befc..f89dc995f15ae 100644 --- a/airbyte-integrations/connectors/source-stripe-singer/Dockerfile +++ b/airbyte-integrations/connectors/source-stripe-singer/Dockerfile @@ -10,7 +10,7 @@ ENV CODE_PATH="source_stripe_singer" ENV AIRBYTE_IMPL_MODULE="source_stripe_singer" ENV AIRBYTE_IMPL_PATH="SourceStripeSinger" -LABEL io.airbyte.version=0.1.4 +LABEL io.airbyte.version=0.1.5 LABEL io.airbyte.name=airbyte/source-stripe-singer WORKDIR /airbyte/integration_code diff --git a/airbyte-json-validation/src/main/java/io/airbyte/validation/json/JsonSchemaValidator.java b/airbyte-json-validation/src/main/java/io/airbyte/validation/json/JsonSchemaValidator.java index 3d6376b3e77f2..8686297a80dde 100644 --- a/airbyte-json-validation/src/main/java/io/airbyte/validation/json/JsonSchemaValidator.java +++ b/airbyte-json-validation/src/main/java/io/airbyte/validation/json/JsonSchemaValidator.java @@ -34,6 +34,7 @@ import java.io.File; import java.io.IOException; import java.util.Set; +import me.andrz.jackson.JsonContext; import me.andrz.jackson.JsonReferenceException; import me.andrz.jackson.JsonReferenceProcessor; @@ -72,13 +73,43 @@ public void ensure(JsonNode schemaJson, JsonNode objectJson) throws JsonValidati objectJson.toPrettyString())); } + private static JsonReferenceProcessor getProcessor() { + // JsonReferenceProcessor follows $ref in json objects. Jackson does not natively support + // this. + final JsonReferenceProcessor jsonReferenceProcessor = new JsonReferenceProcessor(); + jsonReferenceProcessor.setMaxDepth(-1); // no max. + + return jsonReferenceProcessor; + } + + /** + * Get JsonNode for an object defined as the main object in a JsonSchema file. Able to create the + * JsonNode even if the the JsonSchema refers to objects in other files. + * + * @param schemaFile - the schema file + * @return schema object processed from across all dependency files. + */ public static JsonNode getSchema(final File schemaFile) { try { - // JsonReferenceProcessor follows $ref in json objects. Jackson does not natively support - // this. - final JsonReferenceProcessor jsonReferenceProcessor = new JsonReferenceProcessor(); - jsonReferenceProcessor.setMaxDepth(-1); // no max. - return jsonReferenceProcessor.process(schemaFile); + return getProcessor().process(schemaFile); + } catch (IOException | JsonReferenceException e) { + throw new RuntimeException(e); + } + } + + /** + * Get JsonNode for an object defined in the "definitions" section of a JsonSchema file. Able to + * create the JsonNode even if the the JsonSchema refers to objects in other files. + * + * @param schemaFile - the schema file + * @param definitionStructName - get the schema from a struct defined in the "definitions" section + * of a JsonSchema file (instead of the main object in that file). + * @return schema object processed from across all dependency files. + */ + public static JsonNode getSchema(final File schemaFile, String definitionStructName) { + try { + final JsonContext jsonContext = new JsonContext(schemaFile); + return getProcessor().process(jsonContext, jsonContext.getDocument().get("definitions").get(definitionStructName)); } catch (IOException | JsonReferenceException e) { throw new RuntimeException(e); } diff --git a/airbyte-json-validation/src/test/java/io/airbyte/validation/json/JsonSchemaValidatorTest.java b/airbyte-json-validation/src/test/java/io/airbyte/validation/json/JsonSchemaValidatorTest.java index af3b370b5291a..500a3c0884c54 100644 --- a/airbyte-json-validation/src/test/java/io/airbyte/validation/json/JsonSchemaValidatorTest.java +++ b/airbyte-json-validation/src/test/java/io/airbyte/validation/json/JsonSchemaValidatorTest.java @@ -26,11 +26,16 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; import org.junit.jupiter.api.Test; class JsonSchemaValidatorTest { @@ -80,4 +85,39 @@ void testValidateFail() { assertThrows(JsonValidationException.class, () -> validator.ensure(VALID_SCHEMA, object2)); } + @Test + void test() throws IOException { + final String schema = "{\n" + + " \"$schema\": \"http://json-schema.org/draft-07/schema#\",\n" + + " \"title\": \"OuterObject\",\n" + + " \"type\": \"object\",\n" + + " \"properties\": {\n" + + " \"field1\": {\n" + + " \"type\": \"string\"\n" + + " }\n" + + " },\n" + + " \"definitions\": {\n" + + " \"InnerObject\": {\n" + + " \"type\": \"object\",\n" + + " \"properties\": {\n" + + " \"field2\": {\n" + + " \"type\": \"string\"\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + "}\n"; + + final File schemaFile = IOs.writeFile(Files.createTempDirectory("test"), "schema.json", schema).toFile(); + + // outer object + assertTrue(JsonSchemaValidator.getSchema(schemaFile).get("properties").has("field1")); + assertFalse(JsonSchemaValidator.getSchema(schemaFile).get("properties").has("field2")); + // inner object + assertTrue(JsonSchemaValidator.getSchema(schemaFile, "InnerObject").get("properties").has("field2")); + assertFalse(JsonSchemaValidator.getSchema(schemaFile, "InnerObject").get("properties").has("field1")); + // non-existent object + assertNull(JsonSchemaValidator.getSchema(schemaFile, "NonExistentObject")); + } + } diff --git a/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/AirbyteProtocolSchema.java b/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/AirbyteProtocolSchema.java index 7f71be4664b16..88fde920518c3 100644 --- a/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/AirbyteProtocolSchema.java +++ b/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/AirbyteProtocolSchema.java @@ -30,7 +30,7 @@ public enum AirbyteProtocolSchema { - MESSAGE("airbyte_message.yaml"); + PROTOCOL("airbyte_protocol.yaml"); static final Path KNOWN_SCHEMAS_ROOT = JsonSchemas.prepareSchemas("airbyte_protocol", AirbyteProtocolSchema.class); diff --git a/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java b/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java index 2389647276c41..658a94127fbb2 100644 --- a/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java +++ b/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java @@ -31,6 +31,7 @@ import com.google.common.collect.Multimap; import com.google.common.collect.Multimaps; import io.airbyte.commons.json.Jsons; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; @@ -39,6 +40,7 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; public class CatalogHelpers { @@ -55,6 +57,41 @@ public static AirbyteStream createAirbyteStream(String streamName, List f return new AirbyteStream().withName(streamName).withJsonSchema(fieldsToJsonSchema(fields)); } + public static ConfiguredAirbyteCatalog createConfiguredAirbyteCatalog(String streamName, Field... fields) { + return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(createConfiguredAirbyteStream(streamName, fields))); + } + + public static ConfiguredAirbyteStream createConfiguredAirbyteStream(String streamName, Field... fields) { + return createConfiguredAirbyteStream(streamName, Arrays.asList(fields)); + } + + public static ConfiguredAirbyteStream createConfiguredAirbyteStream(String streamName, List fields) { + return new ConfiguredAirbyteStream().withName(streamName).withJsonSchema(fieldsToJsonSchema(fields)); + } + + /** + * Convert a Catalog into a ConfiguredCatalog. This applies minimum default to the Catalog to make + * it a valid ConfiguredCatalog. + * + * @param catalog - Catalog to be converted. + * @return - ConfiguredCatalog based of off the input catalog. + */ + public static ConfiguredAirbyteCatalog toDefaultConfiguredCatalog(AirbyteCatalog catalog) { + return new ConfiguredAirbyteCatalog() + .withStreams(catalog.getStreams() + .stream() + .map(CatalogHelpers::toDefaultConfiguredStream) + .collect(Collectors.toList())); + } + + public static ConfiguredAirbyteStream toDefaultConfiguredStream(AirbyteStream stream) { + return new ConfiguredAirbyteStream() + .withName(stream.getName()) + .withJsonSchema(stream.getJsonSchema()) + .withSyncMode(SyncMode.FULL_REFRESH) + .withCursorField(new ArrayList<>()); + } + public static JsonNode fieldsToJsonSchema(Field... fields) { return fieldsToJsonSchema(Arrays.asList(fields)); } @@ -78,7 +115,7 @@ public static JsonNode fieldsToJsonSchema(List fields) { * @return field names */ @SuppressWarnings("unchecked") - public static Set getTopLevelFieldNames(final AirbyteStream stream) { + public static Set getTopLevelFieldNames(final ConfiguredAirbyteStream stream) { // it is json, so the key has to be a string. final Map object = Jsons.object(stream.getJsonSchema().get("properties"), Map.class); return object.keySet(); @@ -118,8 +155,19 @@ public static boolean isValidIdentifier(String identifier) { * @return list of stream names in the catalog that are invalid */ public static List getInvalidStreamNames(AirbyteCatalog catalog) { - return catalog.getStreams().stream() - .map(AirbyteStream::getName) + return getInvalidStreamNames(catalog.getStreams().stream().map(AirbyteStream::getName)); + } + + /** + * @param catalog configured airbyte catalog + * @return list of stream names in the catalog that are invalid + */ + public static List getInvalidStreamNames(ConfiguredAirbyteCatalog catalog) { + return getInvalidStreamNames(catalog.getStreams().stream().map(ConfiguredAirbyteStream::getName)); + } + + private static List getInvalidStreamNames(Stream names) { + return names .filter(streamName -> !isValidIdentifier(streamName)) .collect(Collectors.toList()); } @@ -129,14 +177,39 @@ public static List getInvalidStreamNames(AirbyteCatalog catalog) { * @return multimap of stream names to all invalid field names in that stream */ public static Multimap getInvalidFieldNames(AirbyteCatalog catalog) { - Multimap streamNameToInvalidFieldNames = Multimaps.newSetMultimap(new HashMap<>(), HashSet::new); + return getInvalidFieldNames(getStreamNameToJsonSchema(catalog)); + } + + /** + * @param catalog configured airbyte catalog + * @return multimap of stream names to all invalid field names in that stream + */ + public static Multimap getInvalidFieldNames(ConfiguredAirbyteCatalog catalog) { + return getInvalidFieldNames(getStreamNameToJsonSchema(catalog)); + } + + private static Map getStreamNameToJsonSchema(AirbyteCatalog catalog) { + return catalog.getStreams() + .stream() + .collect(Collectors.toMap(AirbyteStream::getName, AirbyteStream::getJsonSchema)); + } + + private static Map getStreamNameToJsonSchema(ConfiguredAirbyteCatalog catalog) { + return catalog.getStreams() + .stream() + .collect(Collectors.toMap(ConfiguredAirbyteStream::getName, ConfiguredAirbyteStream::getJsonSchema)); + } + + private static Multimap getInvalidFieldNames(Map streamNameToJsonSchema) { + final Multimap streamNameToInvalidFieldNames = Multimaps.newSetMultimap(new HashMap<>(), HashSet::new); - for (AirbyteStream stream : catalog.getStreams()) { - Set invalidFieldNames = getAllFieldNames(stream.getJsonSchema()).stream() + for (final Map.Entry entry : streamNameToJsonSchema.entrySet()) { + final Set invalidFieldNames = getAllFieldNames(entry.getValue()) + .stream() .filter(streamName -> !isValidIdentifier(streamName)) .collect(Collectors.toSet()); - streamNameToInvalidFieldNames.putAll(stream.getName(), invalidFieldNames); + streamNameToInvalidFieldNames.putAll(entry.getKey(), invalidFieldNames); } return streamNameToInvalidFieldNames; diff --git a/airbyte-protocol/models/src/main/resources/airbyte_protocol/airbyte_message.yaml b/airbyte-protocol/models/src/main/resources/airbyte_protocol/airbyte_message.yaml deleted file mode 100644 index 747891c070ecb..0000000000000 --- a/airbyte-protocol/models/src/main/resources/airbyte_protocol/airbyte_message.yaml +++ /dev/null @@ -1,141 +0,0 @@ ---- -"$schema": http://json-schema.org/draft-07/schema# -"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-protocol/models/src/main/resources/airbyte_protocol/airbyte_message.yaml -title: AirbyteMessage -description: Airbyte message -type: object -additionalProperties: false -required: - - type -properties: - type: - description: "Message type" - type: string - enum: - - RECORD - - STATE - - LOG - - SPEC - - CONNECTION_STATUS - - CATALOG - log: - description: "log message: any kind of logging you want the platform to know about." - "$ref": "#/definitions/AirbyteLogMessage" - spec: - "$ref": "#/definitions/ConnectorSpecification" - connectionStatus: - "$ref": "#/definitions/AirbyteConnectionStatus" - catalog: - description: "log message: any kind of logging you want the platform to know about." - "$ref": "#/definitions/AirbyteCatalog" - record: - description: "record message: the record" - "$ref": "#/definitions/AirbyteRecordMessage" - state: - description: "schema message: the state. Must be the last message produced. The platform uses this information" - "$ref": "#/definitions/AirbyteStateMessage" -definitions: - AirbyteRecordMessage: - type: object - additionalProperties: false - required: - - stream - - data - - emitted_at - properties: - stream: - description: "the name of the stream for this record" - type: string - data: - description: "the record data" - type: object - existingJavaType: com.fasterxml.jackson.databind.JsonNode - emitted_at: - description: "when the data was emitted from the source. epoch in millisecond." - type: integer - AirbyteStateMessage: - type: object - additionalProperties: false - required: - - data - properties: - data: - description: "the state data" - type: object - existingJavaType: com.fasterxml.jackson.databind.JsonNode - AirbyteLogMessage: - type: object - additionalProperties: false - required: - - level - - message - properties: - level: - description: "the type of logging" - type: string - enum: - - FATAL - - ERROR - - WARN - - INFO - - DEBUG - - TRACE - message: - description: "the log message" - type: string - AirbyteConnectionStatus: - description: Airbyte connection status - type: object - additionalProperties: false - required: - - status - properties: - status: - type: string - enum: - - SUCCEEDED - - FAILED - message: - type: string - AirbyteCatalog: - description: Airbyte stream schema catalog - type: object - additionalProperties: false - required: - - streams - properties: - streams: - type: array - items: - "$ref": "#/definitions/AirbyteStream" - AirbyteStream: - type: object - additionalProperties: false - required: - - name - - json_schema - properties: - name: - type: string - description: Stream's name. - json_schema: - description: Stream schema using Json Schema specs. - type: object - existingJavaType: com.fasterxml.jackson.databind.JsonNode - ConnectorSpecification: - description: Specification of a connector (source/destination) - type: object - required: - - connectionSpecification - additionalProperties: false - properties: - documentationUrl: - type: string - format: uri - changelogUrl: - type: string - format: uri - connectionSpecification: - description: ConnectorDefinition specific blob. Must be a valid JSON string. - type: object - existingJavaType: com.fasterxml.jackson.databind.JsonNode diff --git a/airbyte-protocol/models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml b/airbyte-protocol/models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml new file mode 100644 index 0000000000000..30c110ce283d1 --- /dev/null +++ b/airbyte-protocol/models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml @@ -0,0 +1,199 @@ +--- +"$schema": http://json-schema.org/draft-07/schema# +"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-protocol/models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml +title: AirbyteProtocol +type: object +description: AirbyteProtocol structs +properties: + airbyte_message: + "$ref": "#/definitions/AirbyteMessage" + configured_airbyte_catalog: + "$ref": "#/definitions/ConfiguredAirbyteCatalog" +definitions: + AirbyteMessage: + type: object + additionalProperties: false + required: + - type + properties: + type: + description: "Message type" + type: string + enum: + - RECORD + - STATE + - LOG + - SPEC + - CONNECTION_STATUS + - CATALOG + log: + description: "log message: any kind of logging you want the platform to know about." + "$ref": "#/definitions/AirbyteLogMessage" + spec: + "$ref": "#/definitions/ConnectorSpecification" + # todo (cgardens) - prefer snake case for field names. + connectionStatus: + "$ref": "#/definitions/AirbyteConnectionStatus" + catalog: + description: "log message: any kind of logging you want the platform to know about." + "$ref": "#/definitions/AirbyteCatalog" + record: + description: "record message: the record" + "$ref": "#/definitions/AirbyteRecordMessage" + state: + description: "schema message: the state. Must be the last message produced. The platform uses this information" + "$ref": "#/definitions/AirbyteStateMessage" + AirbyteRecordMessage: + type: object + additionalProperties: false + required: + - stream + - data + - emitted_at + properties: + stream: + description: "the name of the stream for this record" + type: string + data: + description: "the record data" + type: object + existingJavaType: com.fasterxml.jackson.databind.JsonNode + emitted_at: + description: "when the data was emitted from the source. epoch in millisecond." + type: integer + AirbyteStateMessage: + type: object + additionalProperties: false + required: + - data + properties: + data: + description: "the state data" + type: object + existingJavaType: com.fasterxml.jackson.databind.JsonNode + AirbyteLogMessage: + type: object + additionalProperties: false + required: + - level + - message + properties: + level: + description: "the type of logging" + type: string + enum: + - FATAL + - ERROR + - WARN + - INFO + - DEBUG + - TRACE + message: + description: "the log message" + type: string + AirbyteConnectionStatus: + description: Airbyte connection status + type: object + additionalProperties: false + required: + - status + properties: + status: + type: string + enum: + - SUCCEEDED + - FAILED + message: + type: string + AirbyteCatalog: + description: Airbyte stream schema catalog + type: object + additionalProperties: false + required: + - streams + properties: + streams: + type: array + items: + "$ref": "#/definitions/AirbyteStream" + AirbyteStream: + type: object + additionalProperties: false + required: + - name + - json_schema + properties: + name: + type: string + description: Stream's name. + json_schema: + description: Stream schema using Json Schema specs. + type: object + existingJavaType: com.fasterxml.jackson.databind.JsonNode + supported_sync_modes: + type: array + items: + "$ref": "#/definitions/SyncMode" + default_cursor_field: + description: Path to the field that will be used to determine if a record is new or modified since the last sync. If not provided by the source, the end user will have to specify the comparable themselves. + type: array + items: + type: string + ConfiguredAirbyteCatalog: + description: Airbyte stream schema catalog + type: object + additionalProperties: false + required: + - streams + properties: + streams: + type: array + items: + "$ref": "#/definitions/ConfiguredAirbyteStream" + ConfiguredAirbyteStream: + type: object + additionalProperties: false + required: + - name + - json_schema + properties: + name: + type: string + description: Stream's name. + json_schema: + description: Stream schema using Json Schema specs. + type: object + existingJavaType: com.fasterxml.jackson.databind.JsonNode + sync_mode: + "$ref": "#/definitions/SyncMode" + default: full_refresh + cursor_field: + description: Path to the field that will be used to determine if a record is new or modified since the last sync. This field is REQUIRED if `sync_mode` is `incremental`. Otherwise it is ignored. + type: array + items: + type: string + SyncMode: + type: string + enum: + - full_refresh + - incremental + ConnectorSpecification: + description: Specification of a connector (source/destination) + type: object + required: + - connectionSpecification + additionalProperties: false + properties: + # todo (cgardens) - prefer snake case for field names. + documentationUrl: + type: string + format: uri + # todo (cgardens) - prefer snake case for field names. + changelogUrl: + type: string + format: uri + # todo (cgardens) - prefer snake case for field names. + connectionSpecification: + description: ConnectorDefinition specific blob. Must be a valid JSON string. + type: object + existingJavaType: com.fasterxml.jackson.databind.JsonNode diff --git a/airbyte-protocol/models/src/test/java/io/airbyte/protocol/models/AirbyteProtocolSchemaTest.java b/airbyte-protocol/models/src/test/java/io/airbyte/protocol/models/AirbyteProtocolSchemaTest.java index 058a23ec9aed0..647fdca6970b8 100644 --- a/airbyte-protocol/models/src/test/java/io/airbyte/protocol/models/AirbyteProtocolSchemaTest.java +++ b/airbyte-protocol/models/src/test/java/io/airbyte/protocol/models/AirbyteProtocolSchemaTest.java @@ -35,7 +35,7 @@ class AirbyteProtocolSchemaTest { @Test void testFile() throws IOException { - final String schema = Files.readString(AirbyteProtocolSchema.MESSAGE.getFile().toPath(), StandardCharsets.UTF_8); + final String schema = Files.readString(AirbyteProtocolSchema.PROTOCOL.getFile().toPath(), StandardCharsets.UTF_8); assertTrue(schema.contains("title")); } diff --git a/airbyte-protocol/models/src/test/java/io/airbyte/protocol/models/CatalogHelpersTest.java b/airbyte-protocol/models/src/test/java/io/airbyte/protocol/models/CatalogHelpersTest.java index 1fb0cb6e926c4..58819efe5a4d3 100644 --- a/airbyte-protocol/models/src/test/java/io/airbyte/protocol/models/CatalogHelpersTest.java +++ b/airbyte-protocol/models/src/test/java/io/airbyte/protocol/models/CatalogHelpersTest.java @@ -56,7 +56,7 @@ void testFieldToJsonSchema() { @Test void testGetTopLevelFieldNames() { final String json = "{ \"type\": \"object\", \"properties\": { \"name\": { \"type\": \"string\" } } } "; - final Set actualFieldNames = CatalogHelpers.getTopLevelFieldNames(new AirbyteStream().withJsonSchema(Jsons.deserialize(json))); + final Set actualFieldNames = CatalogHelpers.getTopLevelFieldNames(new ConfiguredAirbyteStream().withJsonSchema(Jsons.deserialize(json))); assertEquals(Sets.newHashSet("name"), actualFieldNames); } diff --git a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/WorkerRunFactory.java b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/WorkerRunFactory.java index 0a37cc5ee32e7..bb1815fd884ee 100644 --- a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/WorkerRunFactory.java +++ b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/WorkerRunFactory.java @@ -164,7 +164,7 @@ private static StandardSyncInput getSyncInput(JobSyncConfig config) { .withSourceConnection(config.getSourceConnection()) .withDestinationConnection(config.getDestinationConnection()) .withConnectionId(config.getStandardSync().getConnectionId()) - .withCatalog(AirbyteProtocolConverters.toCatalog(config.getStandardSync().getSchema())) + .withCatalog(AirbyteProtocolConverters.toConfiguredCatalog(config.getStandardSync().getSchema())) .withSyncMode(config.getStandardSync().getSyncMode()) .withState(config.getState()); } diff --git a/airbyte-scheduler/src/test/java/io/airbyte/scheduler/WorkerRunFactoryTest.java b/airbyte-scheduler/src/test/java/io/airbyte/scheduler/WorkerRunFactoryTest.java index 2fe3df019336a..5b4ef0bc20c02 100644 --- a/airbyte-scheduler/src/test/java/io/airbyte/scheduler/WorkerRunFactoryTest.java +++ b/airbyte-scheduler/src/test/java/io/airbyte/scheduler/WorkerRunFactoryTest.java @@ -119,7 +119,7 @@ void testSync() { StandardSyncInput expectedInput = new StandardSyncInput() .withSourceConnection(job.getConfig().getSync().getSourceConnection()) .withDestinationConnection(job.getConfig().getSync().getDestinationConnection()) - .withCatalog(AirbyteProtocolConverters.toCatalog(job.getConfig().getSync().getStandardSync().getSchema())) + .withCatalog(AirbyteProtocolConverters.toConfiguredCatalog(job.getConfig().getSync().getStandardSync().getSchema())) .withConnectionId(job.getConfig().getSync().getStandardSync().getConnectionId()) .withSyncMode(job.getConfig().getSync().getStandardSync().getSyncMode()) .withState(job.getConfig().getSync().getState()); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/DefaultSyncWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/DefaultSyncWorker.java index 34afdc04889c5..6c95652db8e1d 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/DefaultSyncWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/DefaultSyncWorker.java @@ -31,10 +31,10 @@ import io.airbyte.config.StandardTapConfig; import io.airbyte.config.StandardTargetConfig; import io.airbyte.config.State; -import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.CatalogHelpers; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.workers.normalization.NormalizationRunner; import io.airbyte.workers.protocols.Destination; import io.airbyte.workers.protocols.MessageTracker; @@ -84,7 +84,6 @@ public OutputAndStatus run(StandardSyncInput syncInput, Path final StandardTargetConfig targetConfig = WorkerUtils.syncToTargetConfig(syncInput); try (destination; source) { - destination.start(targetConfig, jobRoot); source.start(tapConfig, jobRoot); @@ -139,12 +138,12 @@ public void cancel() { cancelled.set(true); } - private void removeInvalidStreams(AirbyteCatalog catalog) { + private void removeInvalidStreams(ConfiguredAirbyteCatalog catalog) { final Set invalidStreams = Sets.union( new HashSet<>(CatalogHelpers.getInvalidStreamNames(catalog)), CatalogHelpers.getInvalidFieldNames(catalog).keySet()); - final List streams = catalog.getStreams().stream() + final List streams = catalog.getStreams().stream() .filter(stream -> !invalidStreams.contains(stream.getName())) .collect(Collectors.toList()); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java index bac690e5a88c6..3da3fccca58c8 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java @@ -29,7 +29,7 @@ import io.airbyte.commons.io.IOs; import io.airbyte.commons.io.LineGobbler; import io.airbyte.commons.json.Jsons; -import io.airbyte.protocol.models.AirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.workers.WorkerConstants; import io.airbyte.workers.WorkerException; import io.airbyte.workers.WorkerUtils; @@ -62,7 +62,7 @@ public DefaultNormalizationRunner(final DestinationType destinationType, final P } @Override - public boolean normalize(Path jobRoot, JsonNode config, AirbyteCatalog catalog) throws Exception { + public boolean normalize(Path jobRoot, JsonNode config, ConfiguredAirbyteCatalog catalog) throws Exception { IOs.writeFile(jobRoot, WorkerConstants.TARGET_CONFIG_JSON_FILENAME, Jsons.serialize(config)); IOs.writeFile(jobRoot, WorkerConstants.CATALOG_JSON_FILENAME, Jsons.serialize(catalog)); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunner.java b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunner.java index af4f8030575e3..e64be6e1a9975 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunner.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunner.java @@ -25,7 +25,7 @@ package io.airbyte.workers.normalization; import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.protocol.models.AirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import java.nio.file.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,14 +54,14 @@ default void start() throws Exception { * @throws Exception - any exception thrown from normalization will be handled gracefully by the * caller. */ - boolean normalize(Path jobRoot, JsonNode config, AirbyteCatalog catalog) throws Exception; + boolean normalize(Path jobRoot, JsonNode config, ConfiguredAirbyteCatalog catalog) throws Exception; class NoOpNormalizationRunner implements NormalizationRunner { private static final Logger LOGGER = LoggerFactory.getLogger(NoOpNormalizationRunner.class); @Override - public boolean normalize(Path jobRoot, JsonNode config, AirbyteCatalog catalog) { + public boolean normalize(Path jobRoot, JsonNode config, ConfiguredAirbyteCatalog catalog) { LOGGER.info("Running no op logger"); return true; } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/AirbyteProtocolPredicate.java b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/AirbyteProtocolPredicate.java index ba0be1d626108..b62cd92fedaee 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/AirbyteProtocolPredicate.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/AirbyteProtocolPredicate.java @@ -36,7 +36,7 @@ public class AirbyteProtocolPredicate implements Predicate { public AirbyteProtocolPredicate() { jsonSchemaValidator = new JsonSchemaValidator(); - schema = JsonSchemaValidator.getSchema(AirbyteProtocolSchema.MESSAGE.getFile()); + schema = JsonSchemaValidator.getSchema(AirbyteProtocolSchema.PROTOCOL.getFile(), "AirbyteMessage"); } @Override diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/DefaultSyncWorkerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/DefaultSyncWorkerTest.java index 4df4c63aea165..acafa860b9536 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/DefaultSyncWorkerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/DefaultSyncWorkerTest.java @@ -35,9 +35,9 @@ import io.airbyte.config.StandardSyncOutput; import io.airbyte.config.StandardTapConfig; import io.airbyte.config.StandardTargetConfig; -import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.workers.normalization.NormalizationRunner; import io.airbyte.workers.protocols.airbyte.AirbyteDestination; import io.airbyte.workers.protocols.airbyte.AirbyteMessageTracker; @@ -86,12 +86,12 @@ void setup() throws Exception { invalidSyncInput.setState(validSyncInput.getState()); invalidSyncInput.setSyncMode(validSyncInput.getSyncMode()); - final AirbyteStream invalidStream = new AirbyteStream(); + final ConfiguredAirbyteStream invalidStream = new ConfiguredAirbyteStream(); invalidStream.setName(INVALID_STREAM_NAME); invalidStream.setJsonSchema(Jsons.deserialize("{}")); - List streams = new ArrayList<>(validSyncInput.getCatalog().getStreams()); + final List streams = new ArrayList<>(validSyncInput.getCatalog().getStreams()); streams.add(invalidStream); - AirbyteCatalog catalog = new AirbyteCatalog(); + final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog(); catalog.setStreams(streams); invalidSyncInput.setCatalog(catalog); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/TestConfigHelpers.java b/airbyte-workers/src/test/java/io/airbyte/workers/TestConfigHelpers.java index 3301eace1131d..7da004fe88f3f 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/TestConfigHelpers.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/TestConfigHelpers.java @@ -113,7 +113,7 @@ public static ImmutablePair createSyncConfig() StandardSyncInput syncInput = new StandardSyncInput() .withDestinationConnection(destinationConnectionConfig) .withSyncMode(standardSync.getSyncMode()) - .withCatalog(AirbyteProtocolConverters.toCatalog(standardSync.getSchema())) + .withCatalog(AirbyteProtocolConverters.toConfiguredCatalog(standardSync.getSchema())) .withConnectionId(standardSync.getConnectionId()) .withSourceConnection(sourceConnectionConfig) .withState(state); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/normalization/DefaultNormalizationRunnerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/normalization/DefaultNormalizationRunnerTest.java index 304ceafde2244..86b20dcd838f6 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/normalization/DefaultNormalizationRunnerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/normalization/DefaultNormalizationRunnerTest.java @@ -31,7 +31,7 @@ import static org.mockito.Mockito.when; import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.protocol.models.AirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.workers.WorkerConstants; import io.airbyte.workers.WorkerException; import io.airbyte.workers.normalization.DefaultNormalizationRunner.DestinationType; @@ -49,7 +49,7 @@ class DefaultNormalizationRunnerTest { private ProcessBuilderFactory pbf; private Process process; private JsonNode config; - private AirbyteCatalog catalog; + private ConfiguredAirbyteCatalog catalog; @BeforeEach void setup() throws IOException, WorkerException { @@ -59,7 +59,7 @@ void setup() throws IOException, WorkerException { process = mock(Process.class); config = mock(JsonNode.class); - catalog = mock(AirbyteCatalog.class); + catalog = mock(ConfiguredAirbyteCatalog.class); when(pbf.create(jobRoot, DefaultNormalizationRunner.NORMALIZATION_IMAGE_NAME, "run", "--integration-type", "bigquery", diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSourceTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSourceTest.java index 6369c86c78b2e..855945c355856 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSourceTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSourceTest.java @@ -40,10 +40,10 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.config.StandardTapConfig; import io.airbyte.config.State; -import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.CatalogHelpers; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.Field.JsonSchemaPrimitive; import io.airbyte.workers.WorkerConstants; @@ -69,9 +69,9 @@ class DefaultAirbyteSourceTest { private static final String STREAM_NAME = "user_preferences"; private static final String FIELD_NAME = "favorite_color"; - private static final AirbyteCatalog CATALOG = new AirbyteCatalog() + private static final ConfiguredAirbyteCatalog CATALOG = new ConfiguredAirbyteCatalog() .withStreams(Collections.singletonList( - new AirbyteStream() + new ConfiguredAirbyteStream() .withName("hudi:latest") .withJsonSchema(CatalogHelpers.fieldsToJsonSchema(new Field(FIELD_NAME, Field.JsonSchemaPrimitive.STRING))))); @@ -80,7 +80,7 @@ class DefaultAirbyteSourceTest { .withSourceConnectionConfiguration(Jsons.jsonNode(Map.of( "apiKey", "123", "region", "us-east"))) - .withCatalog(CatalogHelpers.createAirbyteCatalog("hudi:latest", Field.of(FIELD_NAME, JsonSchemaPrimitive.STRING))); + .withCatalog(CatalogHelpers.createConfiguredAirbyteCatalog("hudi:latest", Field.of(FIELD_NAME, JsonSchemaPrimitive.STRING))); private static final List MESSAGES = Lists.newArrayList( AirbyteMessageUtils.createRecordMessage(STREAM_NAME, FIELD_NAME, "blue"),