Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source Snowflake: Handle date-time data types #17144

Merged
merged 7 commits into from
Sep 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -980,7 +980,7 @@
- name: Snowflake
sourceDefinitionId: e2d65910-8c8b-40a1-ae7d-ee2416b2bfa2
dockerRepository: airbyte/source-snowflake
dockerImageTag: 0.1.23
dockerImageTag: 0.1.24
documentationUrl: https://docs.airbyte.io/integrations/sources/snowflake
icon: snowflake.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10226,7 +10226,7 @@
- - "client_secret"
oauthFlowOutputParameters:
- - "refresh_token"
- dockerImage: "airbyte/source-snowflake:0.1.23"
- dockerImage: "airbyte/source-snowflake:0.1.24"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/snowflake"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-snowflake

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.23
LABEL io.airbyte.version=0.1.24
LABEL io.airbyte.name=airbyte/source-snowflake
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,33 @@

package io.airbyte.integrations.source.snowflake;

import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_COLUMN_NAME;
import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_COLUMN_TYPE;
import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_COLUMN_TYPE_NAME;
import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_SCHEMA_NAME;
import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_TABLE_NAME;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.db.jdbc.DateTimeConverter;
import io.airbyte.db.jdbc.JdbcSourceOperations;
import io.airbyte.protocol.models.JsonSchemaType;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.JDBCType;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SnowflakeSourceOperations extends JdbcSourceOperations {

private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeSourceOperations.class);

@Override
protected void putDouble(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) {
try {
Expand All @@ -25,6 +41,23 @@ protected void putDouble(final ObjectNode node, final String columnName, final R
}
}

@Override
public JDBCType getFieldType(final JsonNode field) {
try {
final String typeName = field.get(INTERNAL_COLUMN_TYPE_NAME).asText().toLowerCase();
return "TIMESTAMPLTZ".equalsIgnoreCase(typeName)
edgao marked this conversation as resolved.
Show resolved Hide resolved
? JDBCType.TIMESTAMP_WITH_TIMEZONE
: JDBCType.valueOf(field.get(INTERNAL_COLUMN_TYPE).asInt());
} catch (final IllegalArgumentException ex) {
LOGGER.warn(String.format("Could not convert column: %s from table: %s.%s with type: %s. Casting to VARCHAR.",
field.get(INTERNAL_COLUMN_NAME),
field.get(INTERNAL_SCHEMA_NAME),
field.get(INTERNAL_TABLE_NAME),
field.get(INTERNAL_COLUMN_TYPE)));
return JDBCType.VARCHAR;
}
}

@Override
protected void putBigInt(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) {
try {
Expand Down Expand Up @@ -61,19 +94,25 @@ public JsonSchemaType getJsonType(final JDBCType jdbcType) {
}

/**
* The only difference between this method and the one in {@link JdbcSourceOperations} is that
* the TIMESTAMP_WITH_TIMEZONE columns are also converted using the putTimestamp method.
* This is necessary after the JDBC upgrade from 3.13.9 to 3.13.22. This change may need to be
* added to {@link JdbcSourceOperations#setJsonField} in the future.
* The only difference between this method and the one in {@link JdbcSourceOperations} is that the
* TIMESTAMP_WITH_TIMEZONE columns are also converted using the putTimestamp method. This is
* necessary after the JDBC upgrade from 3.13.9 to 3.13.22. This change may need to be added to
* {@link JdbcSourceOperations#setJsonField} in the future.
* <p/>
* See issue: https://github.com/airbytehq/airbyte/issues/16838.
*/
@Override
public void setJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException {
final int columnTypeInt = resultSet.getMetaData().getColumnType(colIndex);
final String columnName = resultSet.getMetaData().getColumnName(colIndex);
final JDBCType columnType = safeGetJdbcType(columnTypeInt);
final String columnTypeName = resultSet.getMetaData().getColumnTypeName(colIndex).toLowerCase();

final JDBCType columnType = safeGetJdbcType(columnTypeInt);
// TIMESTAMPLTZ data type detected as JDBCType.TIMESTAMP which is not correct
if ("TIMESTAMPLTZ".equalsIgnoreCase(columnTypeName)) {
putTimestampWithTimezone(json, columnName, resultSet, colIndex);
edgao marked this conversation as resolved.
Show resolved Hide resolved
return;
}
// https://www.cis.upenn.edu/~bcpierce/courses/629/jdkdocs/guide/jdbc/getstart/mapping.doc.html
switch (columnType) {
case BIT, BOOLEAN -> putBoolean(json, columnName, resultSet, colIndex);
Expand All @@ -86,11 +125,43 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob
case CHAR, VARCHAR, LONGVARCHAR -> putString(json, columnName, resultSet, colIndex);
case DATE -> putDate(json, columnName, resultSet, colIndex);
case TIME -> putTime(json, columnName, resultSet, colIndex);
case TIMESTAMP, TIMESTAMP_WITH_TIMEZONE -> putTimestamp(json, columnName, resultSet, colIndex);
case TIMESTAMP -> putTimestamp(json, columnName, resultSet, colIndex);
case TIMESTAMP_WITH_TIMEZONE -> putTimestampWithTimezone(json, columnName, resultSet, colIndex);
case BLOB, BINARY, VARBINARY, LONGVARBINARY -> putBinary(json, columnName, resultSet, colIndex);
case ARRAY -> putArray(json, columnName, resultSet, colIndex);
default -> putDefault(json, columnName, resultSet, colIndex);
}
}

@Override
protected void setDate(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
final LocalDate date = LocalDate.parse(value);
preparedStatement.setDate(parameterIndex, Date.valueOf(date));
edgao marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
protected void putTimestampWithTimezone(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
final Timestamp timestamp = resultSet.getTimestamp(index);
node.put(columnName, DateTimeConverter.convertToTimestampWithTimezone(timestamp));
}

@Override
protected void putTimestamp(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
final Timestamp timestamp = resultSet.getTimestamp(index);
node.put(columnName, DateTimeConverter.convertToTimestamp(timestamp));
}

@Override
protected void putDate(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
final Date date = resultSet.getDate(index);
node.put(columnName, DateTimeConverter.convertToDate(date));
}

@Override
protected void putTime(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
// resultSet.getTime() will lose nanoseconds precision
edgao marked this conversation as resolved.
Show resolved Hide resolved
final LocalTime localTime = resultSet.getTimestamp(index).toLocalDateTime().toLocalTime();
node.put(columnName, DateTimeConverter.convertToTime(localTime));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,33 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.factory.DataSourceFactory;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest;
import io.airbyte.integrations.source.relationaldb.models.DbStreamState;
import io.airbyte.integrations.source.snowflake.SnowflakeSource;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.SyncMode;
import java.math.BigDecimal;
import java.nio.file.Path;
import java.sql.JDBCType;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.SyncMode;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -159,4 +166,60 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) {
List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME)))));
}

protected List<AirbyteMessage> getTestMessages() {
return List.of(
new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace())
.withData(Jsons.jsonNode(Map
.of(COL_ID, ID_VALUE_1,
COL_NAME, "picard",
COL_UPDATED_AT, "2004-10-19")))),
new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace())
.withData(Jsons.jsonNode(Map
.of(COL_ID, ID_VALUE_2,
COL_NAME, "crusher",
COL_UPDATED_AT,
"2005-10-19")))),
new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace())
.withData(Jsons.jsonNode(Map
.of(COL_ID, ID_VALUE_3,
COL_NAME, "vash",
COL_UPDATED_AT, "2006-10-19")))));
}

@Override
protected void incrementalDateCheck() throws Exception {
super.incrementalCursorCheck(COL_UPDATED_AT,
"2005-10-18",
"2006-10-19",
Lists.newArrayList(getTestMessages().get(1),
getTestMessages().get(2)));
}

@Override
protected List<AirbyteMessage> getExpectedAirbyteMessagesSecondSync(final String namespace) {
final List<AirbyteMessage> expectedMessages = new ArrayList<>();
expectedMessages.add(new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(namespace)
.withData(Jsons.jsonNode(Map
.of(COL_ID, ID_VALUE_4,
COL_NAME, "riker",
COL_UPDATED_AT, "2006-10-19")))));
expectedMessages.add(new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(namespace)
.withData(Jsons.jsonNode(Map
.of(COL_ID, ID_VALUE_5,
COL_NAME, "data",
COL_UPDATED_AT, "2006-10-19")))));
final DbStreamState state = new DbStreamState()
.withStreamName(streamName)
.withStreamNamespace(namespace)
.withCursorField(List.of(COL_ID))
.withCursor("5");
expectedMessages.addAll(createExpectedTestMessages(List.of(state)));
return expectedMessages;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -254,29 +254,29 @@ protected void initTests() {
.sourceType("DATE")
.airbyteType(JsonSchemaType.STRING_DATE)
.addInsertValues("null", "'0001-01-01'", "'9999-12-31'")
.addExpectedValues(null, "0001-01-01T00:00:00Z", "9999-12-31T00:00:00Z")
.addExpectedValues(null, "0001-01-01", "9999-12-31")
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("DATETIME")
.airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE)
.addInsertValues("null", "'0001-01-01 00:00:00'", "'9999-12-31 23:59:59'", "'9999-12-31 23:59:59.123456'")
.addExpectedValues(null, "0001-01-01T00:00:00.000000Z", "9999-12-31T23:59:59.000000Z", "9999-12-31T23:59:59.123456Z")
.addExpectedValues(null, "0001-01-01T00:00:00.000000", "9999-12-31T23:59:59.000000", "9999-12-31T23:59:59.123456")
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("TIME")
.airbyteType(JsonSchemaType.STRING_TIME_WITHOUT_TIMEZONE)
.addInsertValues("null", "'00:00:00'", "'1:59 PM'", "'23:59:59'")
.addExpectedValues(null, "1970-01-01T00:00:00Z", "1970-01-01T13:59:00Z",
"1970-01-01T23:59:59Z")
.addInsertValues("null", "'00:00:00'", "'1:59 PM'", "'23:59:59.123456'")
.addExpectedValues(null, "00:00:00.000000", "13:59:00.000000",
"23:59:59.123456")
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("TIMESTAMP")
.airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE)
.addInsertValues("null", "'2018-03-22 12:00:00.123'", "'2018-03-22 12:00:00.123456'")
.addExpectedValues(null, "2018-03-22T12:00:00.123000Z", "2018-03-22T12:00:00.123456Z")
.addExpectedValues(null, "2018-03-22T12:00:00.123000", "2018-03-22T12:00:00.123456")
.build());
addDataTypeTestData(
TestDataHolder.builder()
Expand All @@ -290,7 +290,7 @@ protected void initTests() {
.sourceType("TIMESTAMP_NTZ")
.airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE)
.addInsertValues("null", "'2018-03-22 12:00:00.123 +05:00'", "'2018-03-22 12:00:00.123456 +05:00'")
.addExpectedValues(null, "2018-03-22T12:00:00.123000Z", "2018-03-22T12:00:00.123456Z")
.addExpectedValues(null, "2018-03-22T12:00:00.123000", "2018-03-22T12:00:00.123456")
.build());
addDataTypeTestData(
TestDataHolder.builder()
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/snowflake.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ To read more please check official [Snowflake documentation](https://docs.snowfl

| Version | Date | Pull Request | Subject |
|:----------| :--- | :--- | :--- |
| 0.1.24 | 2022-09-26 | [17144](https://github.com/airbytehq/airbyte/pull/17144) | Fixed bug with incorrect date-time datatypes handling |
| 0.1.23 | 2022-09-26 | [17116](https://github.com/airbytehq/airbyte/pull/17116) | added connection string identifier |
| 0.1.22 | 2022-09-21 | [16766](https://github.com/airbytehq/airbyte/pull/16766) | Update JDBC Driver version to 3.13.22 |
| 0.1.21 | 2022-09-14 | [15668](https://github.com/airbytehq/airbyte/pull/15668) | Wrap logs in AirbyteLogMessage |
Expand Down