Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Catalog] Add JDBC Catalog auto create table #4917

Merged
merged 62 commits into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from 59 commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
0719d5f
[feature] Add JDBC Catalog auto create table
XiaoJiang521 Jun 13, 2023
456d5df
[feature] Add license
XiaoJiang521 Jun 13, 2023
99a246e
[feature] Add license
XiaoJiang521 Jun 14, 2023
c2fb63b
[feature] Add license
XiaoJiang521 Jun 14, 2023
6e7558d
[feature] Add license
XiaoJiang521 Jun 15, 2023
c14c728
[feature] update e2e
XiaoJiang521 Jun 15, 2023
f9dc33d
[feature] Update e2e
XiaoJiang521 Jun 15, 2023
6d55f19
[feature] Update e2e disable spark and flink
XiaoJiang521 Jun 15, 2023
2fed962
[feature] Update catalog test
XiaoJiang521 Jun 15, 2023
03aaece
[feature] Add license
XiaoJiang521 Jun 15, 2023
57e6fb2
[feature] Add license
XiaoJiang521 Jun 15, 2023
d50186a
[feature] Add license
XiaoJiang521 Jun 15, 2023
020f4a3
fix
ic4y Jun 15, 2023
ead392f
[feature] Add license
XiaoJiang521 Jun 15, 2023
86231c0
[feature] update ete catalog create table
ic4y Jun 16, 2023
36cbcc2
[feature] update ete catalog create table
ic4y Jun 16, 2023
4bd9f25
[feature] spotless
XiaoJiang521 Jun 16, 2023
8e5b808
[feature] update port
XiaoJiang521 Jun 16, 2023
3ab0b36
[feature] spotless
XiaoJiang521 Jun 19, 2023
0882051
[feature] add part4
XiaoJiang521 Jun 19, 2023
690fe6c
[feature] null point
XiaoJiang521 Jun 20, 2023
3885e60
[feature] update cdc create table conf file
XiaoJiang521 Jun 20, 2023
b8e4633
[bugfix] Log level
XiaoJiang521 Jun 29, 2023
8d5844d
[bugfix] Catalog bug
XiaoJiang521 Jul 3, 2023
c0c7383
[bugfix] MySqlCatalog bug
XiaoJiang521 Jul 3, 2023
49fb9b4
[feature] Add JDBC Catalog auto create table
XiaoJiang521 Jun 13, 2023
40a57f1
[feature] Add license
XiaoJiang521 Jun 13, 2023
f0854fa
[feature] Add license
XiaoJiang521 Jun 14, 2023
4d0bd09
[feature] Add license
XiaoJiang521 Jun 14, 2023
eac82d8
[feature] Add license
XiaoJiang521 Jun 15, 2023
337d112
[feature] update e2e
XiaoJiang521 Jun 15, 2023
9481fa0
[feature] Update e2e
XiaoJiang521 Jun 15, 2023
5e8c9d1
[feature] Update e2e disable spark and flink
XiaoJiang521 Jun 15, 2023
c1dda9f
[feature] Update catalog test
XiaoJiang521 Jun 15, 2023
2d0e242
[feature] Add license
XiaoJiang521 Jun 15, 2023
47b97d1
[feature] Add license
XiaoJiang521 Jun 15, 2023
666c6cf
[feature] Add license
XiaoJiang521 Jun 15, 2023
4a7173c
fix
ic4y Jun 15, 2023
f6132cb
[feature] Add license
XiaoJiang521 Jun 15, 2023
61c3c76
[feature] update ete catalog create table
ic4y Jun 16, 2023
5534b31
[feature] update ete catalog create table
ic4y Jun 16, 2023
ba30ca1
[feature] spotless
XiaoJiang521 Jun 16, 2023
6a3b913
[feature] update port
XiaoJiang521 Jun 16, 2023
d08c15f
[feature] spotless
XiaoJiang521 Jun 19, 2023
69fdde6
[feature] add part4
XiaoJiang521 Jun 19, 2023
8b742eb
[feature] null point
XiaoJiang521 Jun 20, 2023
6900812
[feature] update cdc create table conf file
XiaoJiang521 Jun 20, 2023
33f7f89
[bugfix] Log level
XiaoJiang521 Jun 29, 2023
c35fc94
[bugfix] Catalog bug
XiaoJiang521 Jul 3, 2023
5001f69
[bugfix] MySqlCatalog bug
XiaoJiang521 Jul 3, 2023
82594b4
Merge remote-tracking branch 'xj-apache-seatunnel/dev_catalog_auto_cr…
XiaoJiang521 Jul 10, 2023
dd28afd
[feature][doris] merge
XiaoJiang521 Jul 11, 2023
62ba257
[bugfix][jdbc] mysql e2e
XiaoJiang521 Jul 18, 2023
a6a99c9
[bugfix][jdbc] update pg sink conf ,because flink cdc error
XiaoJiang521 Jul 18, 2023
edaaa07
[bugfix][jdbc] test flink
XiaoJiang521 Jul 19, 2023
a395b1f
[bugfix][jdbc] truncate table sink;
XiaoJiang521 Jul 19, 2023
b20457f
[bugfix][jdbc] truncate table sink;
XiaoJiang521 Jul 19, 2023
16f49f0
[bugfix][jdbc] mongodb e2e update
XiaoJiang521 Jul 19, 2023
29ec3f2
[bugfix][jdbc] mongodb options update,because e2e error
XiaoJiang521 Jul 19, 2023
956171a
[licensed] licensed
XiaoJiang521 Jul 20, 2023
a43a8c9
[licensed] spot
XiaoJiang521 Jul 20, 2023
e29c9ad
[bugfix][e2e] mongodb cdc e2e error
XiaoJiang521 Jul 20, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions .github/workflows/backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,30 @@ jobs:
env:
MAVEN_OPTS: -Xmx4096m

jdbc-connectors-it-part-4:
needs: [ changes, sanity-check ]
if: needs.changes.outputs.api == 'true'
runs-on: ${{ matrix.os }}
strategy:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
timeout-minutes: 90
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
uses: actions/setup-java@v3
with:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: run jdbc connectors integration test (part-4)
if: needs.changes.outputs.api == 'true'
run: |
./mvnw -B -T 1C verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-jdbc-e2e-part-4 -am -Pci
env:
MAVEN_OPTS: -Xmx4096m

kafka-connector-it:
needs: [ changes, sanity-check ]
if: needs.changes.outputs.api == 'true'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public final class CatalogTable implements Serializable {

private final String comment;

private final String catalogName;

public static CatalogTable of(
TableIdentifier tableId,
TableSchema tableSchema,
Expand All @@ -47,17 +49,38 @@ public static CatalogTable of(
return new CatalogTable(tableId, tableSchema, options, partitionKeys, comment);
}

public static CatalogTable of(
TableIdentifier tableId,
TableSchema tableSchema,
Map<String, String> options,
List<String> partitionKeys,
String comment,
String catalogName) {
return new CatalogTable(tableId, tableSchema, options, partitionKeys, comment, catalogName);
}

private CatalogTable(
TableIdentifier tableId,
TableSchema tableSchema,
Map<String, String> options,
List<String> partitionKeys,
String comment) {
this(tableId, tableSchema, options, partitionKeys, comment, "");
}

private CatalogTable(
TableIdentifier tableId,
TableSchema tableSchema,
Map<String, String> options,
List<String> partitionKeys,
String comment,
String catalogName) {
this.tableId = tableId;
this.tableSchema = tableSchema;
this.options = options;
this.partitionKeys = partitionKeys;
this.comment = comment;
this.catalogName = catalogName;
}

public TableIdentifier getTableId() {
Expand All @@ -80,6 +103,10 @@ public String getComment() {
return comment;
}

public String getCatalogName() {
return catalogName;
}

@Override
public String toString() {
return "CatalogTable{"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one or more
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why change the license header

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error resolved and fixed

* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -23,6 +22,7 @@
import lombok.Data;

import java.io.Serializable;
import java.util.Map;

/**
* Represent the column of {@link TableSchema}.
Expand Down Expand Up @@ -54,19 +54,71 @@ public abstract class Column implements Serializable {

protected final String comment;

/** Field type in the database * */
protected final String sourceType;

/** Unsigned bit * */
protected final boolean isUnsigned;

/** Whether to use the 0 bit * */
protected final boolean isZeroFill;

/** Bit length * */
protected final Long bitLen;

/** integer may be cross the border * */
protected final Long longColumnLength;

/** your options * */
protected final Map<String, Object> options;

protected Column(
String name,
SeaTunnelDataType<?> dataType,
Integer columnLength,
boolean nullable,
Object defaultValue,
String comment) {
this(
name,
dataType,
columnLength,
nullable,
defaultValue,
comment,
null,
false,
false,
null,
0L,
null);
}

protected Column(
String name,
SeaTunnelDataType<?> dataType,
Integer columnLength,
boolean nullable,
Object defaultValue,
String comment,
String sourceType,
boolean isUnsigned,
boolean isZeroFill,
Long bitLen,
Long longColumnLength,
Map<String, Object> options) {
this.name = name;
this.dataType = dataType;
this.columnLength = columnLength;
this.nullable = nullable;
this.defaultValue = defaultValue;
this.comment = comment;
this.sourceType = sourceType;
this.isUnsigned = isUnsigned;
this.isZeroFill = isZeroFill;
this.bitLen = bitLen;
this.longColumnLength = longColumnLength;
this.options = options;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -23,6 +22,8 @@
import lombok.EqualsAndHashCode;
import lombok.ToString;

import java.util.Map;

/** Representation of a physical column. */
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
Expand All @@ -38,6 +39,34 @@ protected PhysicalColumn(
super(name, dataType, columnLength, nullable, defaultValue, comment);
}

protected PhysicalColumn(
String name,
SeaTunnelDataType<?> dataType,
Integer columnLength,
boolean nullable,
Object defaultValue,
String comment,
String sourceType,
boolean isUnsigned,
boolean isZeroFill,
Long bitLen,
Long longColumnLength,
Map<String, Object> options) {
super(
name,
dataType,
columnLength,
nullable,
defaultValue,
comment,
sourceType,
isUnsigned,
isZeroFill,
bitLen,
longColumnLength,
options);
}

public static PhysicalColumn of(
String name,
SeaTunnelDataType<?> dataType,
Expand All @@ -48,18 +77,70 @@ public static PhysicalColumn of(
return new PhysicalColumn(name, dataType, columnLength, nullable, defaultValue, comment);
}

public static PhysicalColumn of(
String name,
SeaTunnelDataType<?> dataType,
Integer columnLength,
boolean nullable,
Object defaultValue,
String comment,
String sourceType,
boolean isUnsigned,
boolean isZeroFill,
Long bitLen,
Map<String, Object> options,
Long longColumnLength) {
return new PhysicalColumn(
name,
dataType,
columnLength,
nullable,
defaultValue,
comment,
sourceType,
isUnsigned,
isZeroFill,
bitLen,
longColumnLength,
options);
}

@Override
public boolean isPhysical() {
return true;
}

@Override
public Column copy(SeaTunnelDataType<?> newType) {
return PhysicalColumn.of(name, newType, columnLength, nullable, defaultValue, comment);
return PhysicalColumn.of(
name,
newType,
columnLength,
nullable,
defaultValue,
comment,
sourceType,
isUnsigned,
isZeroFill,
bitLen,
options,
longColumnLength);
}

@Override
public Column copy() {
return PhysicalColumn.of(name, dataType, columnLength, nullable, defaultValue, comment);
return PhysicalColumn.of(
name,
dataType,
columnLength,
nullable,
defaultValue,
comment,
sourceType,
isUnsigned,
isZeroFill,
bitLen,
options,
longColumnLength);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import org.bson.json.JsonMode;
import org.bson.json.JsonWriterSettings;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -234,20 +232,39 @@ public class MongodbSourceOptions extends SourceOptions {
.withDescription(
"Decides if the table options contains Debezium client properties that start with prefix 'debezium'.");

// public static final Option<StartupMode> STARTUP_MODE =
// Options.key(SourceOptions.STARTUP_MODE_KEY)
// .singleChoice(
// StartupMode.class,
// Arrays.asList(
// StartupMode.INITIAL, StartupMode.EARLIEST,
// StartupMode.LATEST))
// .defaultValue(StartupMode.INITIAL)
// .withDescription(
// "Optional startup mode for CDC source, valid enumerations are "
// + "\"initial\", \"earliest\", \"latest\", \"timestamp\"\n
// or \"specific\"");

public static final Option<StartupMode> STARTUP_MODE =
Options.key(SourceOptions.STARTUP_MODE_KEY)
.singleChoice(
StartupMode.class,
Arrays.asList(
StartupMode.INITIAL, StartupMode.EARLIEST, StartupMode.LATEST))
.enumType(StartupMode.class)
.defaultValue(StartupMode.INITIAL)
.withDescription(
"Optional startup mode for CDC source, valid enumerations are "
+ "\"initial\", \"earliest\", \"latest\", \"timestamp\"\n or \"specific\"");

// public static final Option<StopMode> STOP_MODE =
// Options.key(SourceOptions.STOP_MODE_KEY)
// .singleChoice(StopMode.class, Collections.singletonList(StopMode.NEVER))
// .defaultValue(StopMode.NEVER)
// .withDescription(
// "Optional stop mode for CDC source, valid enumerations are "
// + "\"never\", \"latest\", \"timestamp\"\n or
// \"specific\"");

public static final Option<StopMode> STOP_MODE =
Options.key(SourceOptions.STOP_MODE_KEY)
.singleChoice(StopMode.class, Collections.singletonList(StopMode.NEVER))
.enumType(StopMode.class)
.defaultValue(StopMode.NEVER)
.withDescription(
"Optional stop mode for CDC source, valid enumerations are "
Expand Down
Loading