Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
update
Browse files Browse the repository at this point in the history
yangbinbin committed Nov 13, 2022
1 parent cebb2a7 commit 4b57737
Showing 8 changed files with 394 additions and 1 deletion.
20 changes: 19 additions & 1 deletion seatunnel-connectors-v2/connector-jdbc/pom.xml
Original file line number Diff line number Diff line change
@@ -29,6 +29,14 @@

<artifactId>connector-jdbc</artifactId>

<repositories>
<repository>
<id>central-repos1</id>
<name>Central Repository 2</name>
<url>https://repo1.maven.org/maven2/</url>
</repository>
</repositories>

<properties>
<mysql.version>8.0.16</mysql.version>
<postgresql.version>42.4.1</postgresql.version>
@@ -37,6 +45,7 @@
<phoenix.version>5.2.5-HBase-2.x</phoenix.version>
<oracle.version>12.2.0.1</oracle.version>
<db2.version>db2jcc4</db2.version>
<teradata.version>17.20.00.12</teradata.version>
</properties>

<dependencyManagement>
@@ -83,7 +92,12 @@
<version>${db2.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.teradata.jdbc</groupId>
<artifactId>terajdbc4</artifactId>
<version>${teradata.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</dependencyManagement>

@@ -119,6 +133,10 @@
<groupId>com.ibm.db2.jcc</groupId>
<artifactId>db2jcc</artifactId>
</dependency>
<dependency>
<groupId>com.teradata.jdbc</groupId>
<artifactId>terajdbc4</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.teradata;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;

public class TeradataDialect implements JdbcDialect {

@Override
public String dialectName() {
return "Teradata";
}

@Override
public JdbcRowConverter getRowConverter() {
return new TeradataJdbcRowConverter();
}

@Override
public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
return new TeradataTypeMapper();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.teradata;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;

import com.google.auto.service.AutoService;

@AutoService(JdbcDialectFactory.class)
public class TeradataDialectFactory implements JdbcDialectFactory {

@Override
public boolean acceptsURL(String url) {
return url.startsWith("jdbc:teradata:");
}

@Override
public JdbcDialect create() {
return new TeradataDialect();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.teradata;

import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;

public class TeradataJdbcRowConverter extends AbstractJdbcRowConverter {

@Override
public String converterName() {
return "Teradata";
}

@Override
public SeaTunnelRow toInternal(ResultSet rs, ResultSetMetaData metaData, SeaTunnelRowType typeInfo) throws SQLException {
return super.toInternal(rs, metaData, typeInfo);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.teradata;

import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;

import java.sql.ResultSetMetaData;
import java.sql.SQLException;

public class TeradataTypeMapper implements JdbcDialectTypeMapper {

// ============================data types=====================

// -------------------------number----------------------------
private static final String TERADATA_BYTEINT = "BYTEINT";
private static final String TERADATA_SMALLINT = "SMALLINT";
private static final String TERADATA_INTEGER = "INTEGER";
private static final String TERADATA_BIGINT = "BIGINT";
private static final String TERADATA_FLOAT = "FLOAT";
private static final String TERADATA_DECIMAL = "DECIMAL";

// -------------------------string----------------------------
private static final String TERADATA_CHAR = "CHAR";
private static final String TERADATA_VARCHAR = "VARCHAR";
private static final String TERADATA_CLOB = "CLOB";


// ---------------------------binary---------------------------
private static final String TERADATA_BYTE = "BYTE";
private static final String TERADATA_VARBYTE = "VARBYTE";

// ------------------------------time-------------------------
private static final String TERADATA_DATE = "DATE";
private static final String TERADATA_TIME = "TIME";
private static final String TERADATA_TIMESTAMP = "TIMESTAMP";

// ------------------------------blob-------------------------
private static final String TERADATA_BLOB = "BLOB";

@Override
public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex) throws SQLException {
String teradataType = metadata.getColumnTypeName(colIndex).toUpperCase();
switch (teradataType) {
case TERADATA_BYTEINT:
return BasicType.BYTE_TYPE;
case TERADATA_SMALLINT:
return BasicType.SHORT_TYPE;
case TERADATA_INTEGER:
return BasicType.INT_TYPE;
case TERADATA_BIGINT:
return BasicType.LONG_TYPE;
case TERADATA_FLOAT:
return BasicType.FLOAT_TYPE;
case TERADATA_DECIMAL:
return new DecimalType(metadata.getPrecision(colIndex), metadata.getScale(colIndex));
case TERADATA_CHAR:
case TERADATA_VARCHAR:
case TERADATA_CLOB:
return PrimitiveByteArrayType.INSTANCE;
case TERADATA_BYTE:
case TERADATA_VARBYTE:
case TERADATA_BLOB:
return ArrayType.BYTE_ARRAY_TYPE;
case TERADATA_DATE:
return LocalTimeType.LOCAL_DATE_TYPE;
case TERADATA_TIME:
return LocalTimeType.LOCAL_TIME_TYPE;
case TERADATA_TIMESTAMP:
return LocalTimeType.LOCAL_DATE_TIME_TYPE;
default:
final String jdbcColumnName = metadata.getColumnName(colIndex);
throw new UnsupportedOperationException(
String.format(
"Doesn't support TERADATA type '%s' on column '%s' yet.",
teradataType, jdbcColumnName));
}
}
}
Original file line number Diff line number Diff line change
@@ -86,6 +86,11 @@
<artifactId>db2jcc</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.teradata.jdbc</groupId>
<artifactId>terajdbc4</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc;

import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.TestContainer;

import com.teradata.jdbc.TeraDataSource;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;

import java.sql.Connection;
import java.sql.Statement;

@Disabled("Disabled because it needs user's personal teradata account to run this test")
public class JdbcTeradataIT extends TestSuiteBase implements TestResource {
private static final String HOST = "192.168.109.128";
private static final String PORT = "1025";
private static final String USERNAME = "dbc";
private static final String PASSWORD = "dbc";
private static final String DATABASE = "test";
private static final String SINK_TABLE = "sink_table";
private Connection connection;

@TestTemplate
public void testTeradata(TestContainer container) throws Exception {
Container.ExecResult execResult = container.executeJob("/jdbc_teradata_source_and_sink.conf");
Assertions.assertEquals(0, execResult.getExitCode());
clearSinkTable();
}

private void clearSinkTable() {
try (Statement statement = connection.createStatement()) {
statement.execute(String.format("delete from %s", SINK_TABLE));
} catch (Exception e) {
throw new RuntimeException("Test teradata server failed!", e);
}
}

@BeforeAll
@Override
public void startUp() throws Exception {
TeraDataSource teraDataSource = new TeraDataSource();
teraDataSource.setDSName(HOST);
teraDataSource.setDbsPort(PORT);
teraDataSource.setUser(USERNAME);
teraDataSource.setPassword(PASSWORD);
teraDataSource.setDATABASE(DATABASE);
this.connection = teraDataSource.getConnection();
}

@AfterAll
@Override
public void tearDown() throws Exception {
if (connection != null) {
this.connection.close();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
execution.parallelism = 1
job.mode = "BATCH"
}

source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
Jdbc {
driver = com.teradata.jdbc.TeraDriver
url = "jdbc:teradata://192.168.109.128:1025/DATABASE=test,TYPE=FASTEXPORT"
user = "dbc"
password = "dbc"
query = """
select id,
c_byteint,
c_smallint,
c_integer,
c_bigint,
c_float,
c_decimal,
c_char,
c_varchar,
c_byte,
c_varbyte,
c_date,
c_timestamp
from source_table;
"""
}
# If you would like to get more information about how to configure seatunnel and see full list of source plugins,
# please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc
}

sink {
Jdbc {
driver = com.teradata.jdbc.TeraDriver
url = "jdbc:teradata://192.168.109.128:1025/DATABASE=test,TYPE=FASTLOAD"
user = "dbc"
password = "dbc"
query = """
insert into sink_table(id,
c_byteint,
c_smallint,
c_integer,
c_bigint,
c_float,
c_decimal,
c_char,
c_varchar,
c_byte,
c_varbyte,
c_date,
c_timestamp)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
}
# If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
# please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
}

0 comments on commit 4b57737

Please sign in to comment.