Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding incremental to the data model #998

Merged
merged 29 commits into from
Nov 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/gradle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ jobs:
- name: Checkout Airbyte
uses: actions/checkout@v2

- name: Check images exist
run: ./tools/bin/check_images_exist.sh

- name: Cache java deps
uses: actions/cache@v2
with:
Expand Down Expand Up @@ -65,9 +68,6 @@ jobs:
- name: Ensure no file change
run: git status --porcelain && test -z "$(git status --porcelain)"

- name: Check images exist
run: ./tools/bin/check_images_exist.sh

- name: Check documentation
if: success() && github.ref == 'refs/heads/master'
run: ./tools/site/link_checker.sh check_docs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "22f6c74f-5699-40ff-833c-4a879ea40133",
"name": "BigQuery",
"dockerRepository": "airbyte/destination-bigquery",
"dockerImageTag": "0.1.2",
"dockerImageTag": "0.1.3",
"documentationUrl": "https://hub.docker.com/r/airbyte/integration-singer-bigquery-destination"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "25c5221d-dce2-4163-ade9-739ef790f503",
"name": "Postgres",
"dockerRepository": "airbyte/destination-postgres",
"dockerImageTag": "0.1.1",
"dockerImageTag": "0.1.2",
"documentationUrl": "https://hub.docker.com/r/airbyte/integration-singer-postgres-destination"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "424892c4-daac-4491-b35d-c6688ba547ba",
"name": "Snowflake",
"dockerRepository": "airbyte/destination-snowflake",
"dockerImageTag": "0.1.2",
"dockerImageTag": "0.1.3",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/snowflake"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "8be1cf83-fde1-477f-a4ad-318d23c9f3c6",
"name": "Local CSV",
"dockerRepository": "airbyte/destination-csv",
"dockerImageTag": "0.1.1",
"dockerImageTag": "0.1.2",
"documentationUrl": "https://hub.docker.com/r/airbyte/integration-singer-csv-destination"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "2470e835-feaf-4db6-96f3-70fd645acc77",
"name": "Salesforce",
"dockerRepository": "airbyte/source-salesforce-singer",
"dockerImageTag": "0.1.1",
"dockerImageTag": "0.1.2",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-salesforce-singer"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "39f092a6-8c87-4f6f-a8d9-5cef45b7dbe1",
"name": "Google Analytics",
"dockerRepository": "airbyte/source-googleanalytics-singer",
"dockerImageTag": "0.1.1",
"dockerImageTag": "0.1.2",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-googleanalytics-singer"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "435bb9a5-7887-4809-aa58-28c27df0d7ad",
"name": "MySQL",
"dockerRepository": "airbyte/source-mysql",
"dockerImageTag": "0.1.0",
"dockerImageTag": "0.1.1",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/mysql"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "57eb1576-8f52-463d-beb6-2e107cdf571d",
"name": "Hubspot",
"dockerRepository": "airbyte/source-hubspot-singer",
"dockerImageTag": "0.1.0",
"dockerImageTag": "0.1.1",
"documentationUrl": "https://https://docs.airbyte.io/integrations/sources/hubspot"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "71607ba1-c0ac-4799-8049-7f4b90dd50f7",
"name": "Google Sheets",
"dockerRepository": "airbyte/source-google-sheets",
"dockerImageTag": "0.1.1",
"dockerImageTag": "0.1.2",
"documentationUrl": "https://hub.docker.com/repository/docker/airbyte/source-google-sheets"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "74d47f79-8d01-44ac-9755-f5eb0d7caacb",
"name": "Facebook Marketing APIs",
"dockerRepository": "airbyte/source-facebook-marketing-api-singer",
"dockerImageTag": "0.1.1",
"dockerImageTag": "0.1.2",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-facebook-marketing-api-singer"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "778daa7c-feaf-4db6-96f3-70fd645acc77",
"name": "File",
"dockerRepository": "airbyte/source-file",
"dockerImageTag": "0.1.2",
"dockerImageTag": "0.1.3",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-file"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "9e0556f4-69df-4522-a3fb-03264d36b348",
"name": "Marketo",
"dockerRepository": "airbyte/source-marketo-singer",
"dockerImageTag": "0.1.0",
"dockerImageTag": "0.1.1",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-marketo-singer"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "9fed261d-d107-47fd-8c8b-323023db6e20",
"name": "exchangeratesapi.io",
"dockerRepository": "airbyte/source-exchangeratesapi-singer",
"dockerImageTag": "0.1.4",
"dockerImageTag": "0.1.6",
"documentationUrl": "https://hub.docker.com/r/airbyte/integration-singer-exchangeratesapi_io-source"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "b1892b11-788d-44bd-b9ec-3a436f7b54ce",
"name": "Shopify",
"dockerRepository": "airbyte/source-shopify-singer",
"dockerImageTag": "0.1.1",
"dockerImageTag": "0.1.2",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-shopify-singer"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "b5ea17b1-f170-46dc-bc31-cc744ca984c1",
"name": "Microsoft SQL Server (MSSQL)",
"dockerRepository": "airbyte/source-mssql",
"dockerImageTag": "0.1.0",
"dockerImageTag": "0.1.1",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-mssql"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "decd338e-5647-4c0b-adf4-da0e75f5a750",
"name": "Postgres",
"dockerRepository": "airbyte/source-postgres",
"dockerImageTag": "0.1.0",
"dockerImageTag": "0.1.2",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-postgres"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "e094cb9a-26de-4645-8761-65c0c425d1de",
"name": "Stripe",
"dockerRepository": "airbyte/source-stripe-singer",
"dockerImageTag": "0.1.4",
"dockerImageTag": "0.1.5",
"documentationUrl": "https://hub.docker.com/r/airbyte/integration-singer-stripe-source"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "ef69ef6e-aa7f-4af1-a01d-ef775033524e",
"name": "Github",
"dockerRepository": "airbyte/source-github-singer",
"dockerImageTag": "0.1.0",
"dockerImageTag": "0.1.1",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-github-singer"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "fdc8b827-3257-4b33-83cc-106d234c34d4",
"name": "Google Adwords",
"dockerRepository": "airbyte/source-google-adwords-singer",
"dockerImageTag": "0.1.0",
"dockerImageTag": "0.1.1",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-google-adwords"
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
Expand All @@ -40,15 +41,15 @@
// todo (cgardens) - hack, remove after we've gotten rid of Schema object.
public class AirbyteProtocolConverters {

public static AirbyteCatalog toCatalog(Schema schema) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we only ever need to convert from schema into a configured catalog (never the R/O one).

List<AirbyteStream> airbyteStreams = schema.getStreams().stream()
.map(s -> new AirbyteStream()
public static ConfiguredAirbyteCatalog toConfiguredCatalog(Schema schema) {
List<ConfiguredAirbyteStream> airbyteStreams = schema.getStreams().stream()
.map(s -> new ConfiguredAirbyteStream()
.withName(s.getName())
.withJsonSchema(toJson(s.getFields())))
// perform selection based on the output of toJson, which keeps properties if selected=true
.filter(s -> !s.getJsonSchema().get("properties").isEmpty())
.collect(Collectors.toList());
return new AirbyteCatalog().withStreams(airbyteStreams);
return new ConfiguredAirbyteCatalog().withStreams(airbyteStreams);
}

// todo (cgardens) - this will only work with table / column schemas. it's a hack to get us through
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ properties:
"$ref": SyncMode.yaml
catalog:
type: object
existingJavaType: io.airbyte.protocol.models.AirbyteCatalog
existingJavaType: io.airbyte.protocol.models.ConfiguredAirbyteCatalog
connectionId:
type: string
format: uuid
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ properties:
existingJavaType: com.fasterxml.jackson.databind.JsonNode
catalog:
type: object
existingJavaType: io.airbyte.protocol.models.AirbyteCatalog
existingJavaType: io.airbyte.protocol.models.ConfiguredAirbyteCatalog
syncMode:
"$ref": SyncMode.yaml
connectionId:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ properties:
"$ref": SyncMode.yaml
catalog:
type: object
existingJavaType: io.airbyte.protocol.models.AirbyteCatalog
existingJavaType: io.airbyte.protocol.models.ConfiguredAirbyteCatalog
connectionId:
type: string
format: uuid
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
import org.junit.jupiter.api.Test;
Expand All @@ -48,6 +50,12 @@ class AirbyteProtocolConvertersTest {
.withJsonSchema(CatalogHelpers.fieldsToJsonSchema(
Field.of(COLUMN_NAME, JsonSchemaPrimitive.STRING),
Field.of(COLUMN_AGE, JsonSchemaPrimitive.NUMBER)))));
private static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = new ConfiguredAirbyteCatalog()
.withStreams(Lists.newArrayList(new ConfiguredAirbyteStream()
.withName(STREAM)
.withJsonSchema(CatalogHelpers.fieldsToJsonSchema(
Field.of(COLUMN_NAME, JsonSchemaPrimitive.STRING),
Field.of(COLUMN_AGE, JsonSchemaPrimitive.NUMBER)))));

private static final Schema SCHEMA = new Schema()
.withStreams(Lists.newArrayList(new Stream()
Expand Down Expand Up @@ -87,13 +95,13 @@ class AirbyteProtocolConvertersTest {
.withSelected(false)))));

@Test
void testToCatalog() {
assertEquals(CATALOG, AirbyteProtocolConverters.toCatalog(SCHEMA));
void testToConfiguredCatalog() {
assertEquals(CONFIGURED_CATALOG, AirbyteProtocolConverters.toConfiguredCatalog(SCHEMA));
}

@Test
void testToCatalogWithUnselected() {
assertEquals(CATALOG, AirbyteProtocolConverters.toCatalog(SCHEMA_WITH_UNSELECTED));
void testToConfiguredCatalogWithUnselected() {
assertEquals(CONFIGURED_CATALOG, AirbyteProtocolConverters.toConfiguredCatalog(SCHEMA_WITH_UNSELECTED));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
AirbyteRecordMessage,
AirbyteStateMessage,
AirbyteStream,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
ConnectorSpecification,
Status,
Type,
Expand All @@ -43,6 +45,8 @@
"AirbyteRecordMessage",
"AirbyteStateMessage",
"AirbyteStream",
"ConfiguredAirbyteCatalog",
"ConfiguredAirbyteStream",
"ConnectorSpecification",
"Status",
"Type",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@
"""

# generated by generate-protocol-files
from .airbyte_message import *
from .airbyte_protocol import *
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"""

# generated by datamodel-codegen:
# filename: airbyte_message.yaml
# filename: airbyte_protocol.yaml

from __future__ import annotations

Expand Down Expand Up @@ -79,9 +79,9 @@ class AirbyteConnectionStatus(BaseModel):
message: Optional[str] = None


class AirbyteStream(BaseModel):
name: str = Field(..., description="Stream's name.")
json_schema: Dict[str, Any] = Field(..., description="Stream schema using Json Schema specs.")
# Enumeration of the sync modes a stream can declare or be configured with.
# Each member's value is the lowercase string spelling of its own name.
class SyncMode(Enum):
full_refresh = "full_refresh"
incremental = "incremental"


class ConnectorSpecification(BaseModel):
Expand All @@ -93,10 +93,34 @@ class ConnectorSpecification(BaseModel):
)


# Model of a single stream: a name and a JSON schema (both declared with `...`,
# i.e. no default), plus two optional pieces of sync metadata that default to None.
class AirbyteStream(BaseModel):
name: str = Field(..., description="Stream's name.")
json_schema: Dict[str, Any] = Field(..., description="Stream schema using Json Schema specs.")
# Sync modes listed for this stream; None when nothing is specified.
supported_sync_modes: Optional[List[SyncMode]] = None
# Suggested cursor path; see the field description for what happens when it is absent.
default_cursor_field: Optional[List[str]] = Field(
None,
description="Path to the field that will be used to determine if a record is new or modified since the last sync. If not provided by the source, the end user will have to specify the comparable themselves.",
)


# Model of a stream together with the sync settings selected for it:
#   name         - stream name (no default).
#   json_schema  - stream schema expressed as JSON Schema (no default).
#   sync_mode    - how the stream is synced; defaults to full refresh.
#   cursor_field - path to the field used to detect new or modified records;
#                  per its description, required when sync_mode is incremental
#                  and ignored otherwise. Defaults to None.
# Kept as `#` comments rather than a class docstring so the model's generated
# schema description is not affected.
class ConfiguredAirbyteStream(BaseModel):
    name: str = Field(..., description="Stream's name.")
    json_schema: Dict[str, Any] = Field(..., description="Stream schema using Json Schema specs.")
    # Use the enum member, not the raw string "full_refresh": SyncMode is a
    # plain Enum, so a member never compares equal to its string value, and an
    # unvalidated string default would behave differently from a parsed value.
    sync_mode: Optional[SyncMode] = SyncMode.full_refresh
    cursor_field: Optional[List[str]] = Field(
        None,
        description="Path to the field that will be used to determine if a record is new or modified since the last sync. This field is REQUIRED if `sync_mode` is `incremental`. Otherwise it is ignored.",
    )


# Catalog model holding a list of AirbyteStream entries; `streams` has no default.
class AirbyteCatalog(BaseModel):
streams: List[AirbyteStream]


# Catalog model holding a list of ConfiguredAirbyteStream entries (streams that
# carry a sync_mode and cursor_field); `streams` has no default.
class ConfiguredAirbyteCatalog(BaseModel):
streams: List[ConfiguredAirbyteStream]


class AirbyteMessage(BaseModel):
type: Type = Field(..., description="Message type")
log: Optional[AirbyteLogMessage] = Field(
Expand All @@ -114,3 +138,8 @@ class AirbyteMessage(BaseModel):
None,
description="schema message: the state. Must be the last message produced. The platform uses this information",
)


# Top-level model with two optional members, an AirbyteMessage and a
# ConfiguredAirbyteCatalog; both default to None.
class AirbyteProtocol(BaseModel):
airbyte_message: Optional[AirbyteMessage] = None
configured_airbyte_catalog: Optional[ConfiguredAirbyteCatalog] = None
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
package io.airbyte.integrations.base;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;

public interface Destination extends Integration {

Expand All @@ -41,6 +41,6 @@ public interface Destination extends Integration {
* will always be called once regardless of success or failure.
* @throws Exception - any exception.
*/
DestinationConsumer<AirbyteMessage> write(JsonNode config, AirbyteCatalog catalog) throws Exception;
DestinationConsumer<AirbyteMessage> write(JsonNode config, ConfiguredAirbyteCatalog catalog) throws Exception;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks so much better!


}
Loading