Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-10703][Manager] Manager Add Oceanbase Support #10701

Merged
merged 8 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class DataNodeType {
public static final String SQLSERVER = "SQLSERVER";
public static final String MONGODB = "MONGODB";
public static final String DORIS = "DORIS";
public static final String OCEANBASE = "OCEANBASE";

/**
* Tencent cloud log service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ public class SinkType extends StreamType {
@SupportSortType(sortType = SortType.SORT_FLINK)
public static final String TUBEMQ = "TUBEMQ";

@SupportSortType(sortType = SortType.SORT_FLINK)
public static final String OCEANBASE = "OCEANBASE";

/**
* Tencent cloud log service
* Details: <a href="https://www.tencentcloud.com/products/cls">CLS</a>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class SourceType extends StreamType {
public static final String MONGODB = "MONGODB";
public static final String REDIS = "REDIS";
public static final String MQTT = "MQTT";
public static final String OCEANBASE = "OCEANBASE";

public static final Map<String, TaskTypeEnum> SOURCE_TASK_MAP = new HashMap<String, TaskTypeEnum>() {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.common.fieldtype.strategy;

import org.apache.inlong.manager.common.consts.DataNodeType;
import org.apache.inlong.manager.common.fieldtype.FieldTypeMappingReader;

import org.springframework.stereotype.Service;

/**
* The oceanbase field type mapping strategy
*/
@Service
public class OceanBaseFieldTypeStrategy extends DefaultFieldTypeStrategy {

public OceanBaseFieldTypeStrategy() {
this.reader = new FieldTypeMappingReader(DataNodeType.OCEANBASE);
}

@Override
public Boolean accept(String type) {
return DataNodeType.OCEANBASE.equals(type);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

source.type.to.target.type.converter:

- source.type: TINYINT
target.type: TINYINT

- source.type: SMALLINT
target.type: SMALLINT

- source.type: TINYINT UNSIGNED
target.type: SMALLINT

- source.type: TINYINT UNSIGNED ZEROFILL
target.type: SMALLINT

- source.type: INT
target.type: INT

- source.type: INTEGER
target.type: INT

- source.type: YEAR
target.type: INT

- source.type: SHORT
target.type: SHORT

- source.type: MEDIUMINT
target.type: INT

- source.type: SMALLINT UNSIGNED
target.type: INT

- source.type: SMALLINT UNSIGNED ZEROFILL
target.type: INT

- source.type: BIGINT
target.type: LONG

- source.type: INT UNSIGNED
target.type: LONG

- source.type: MEDIUMINT UNSIGNED
target.type: LONG

- source.type: MEDIUMINT UNSIGNED ZEROFILL
target.type: LONG

- source.type: INT UNSIGNED ZEROFILL
target.type: LONG

- source.type: BIGINT UNSIGNED
target.type: DECIMAL

- source.type: BIGINT UNSIGNED ZEROFILL
target.type: DECIMAL

- source.type: SERIAL
target.type: DECIMAL

- source.type: FLOAT
target.type: FLOAT

- source.type: FLOAT UNSIGNED
target.type: FLOAT

- source.type: FLOAT UNSIGNED ZEROFILL
target.type: FLOAT

- source.type: DOUBLE
target.type: DOUBLE

- source.type: DOUBLE UNSIGNED
target.type: DOUBLE

- source.type: DOUBLE UNSIGNED ZEROFILL
target.type: DOUBLE

- source.type: DOUBLE PRECISION
target.type: DOUBLE

- source.type: DOUBLE PRECISION UNSIGNED
target.type: DOUBLE

- source.type: ZEROFILL
target.type: DOUBLE

- source.type: REAL
target.type: DOUBLE

- source.type: REAL UNSIGNED
target.type: DOUBLE

- source.type: REAL UNSIGNED ZEROFILL
target.type: DOUBLE

- source.type: NUMERIC
target.type: DECIMAL

- source.type: NUMERIC UNSIGNED
target.type: DECIMAL

- source.type: NUMERIC UNSIGNED ZEROFILL
target.type: DECIMAL

- source.type: DECIMAL
target.type: DECIMAL

- source.type: DECIMAL UNSIGNED
target.type: DECIMAL

- source.type: DECIMAL UNSIGNED ZEROFILL
target.type: DECIMAL

- source.type: FIXED
target.type: DECIMAL

- source.type: FIXED UNSIGNED
target.type: DECIMAL

- source.type: FIXED UNSIGNED ZEROFILL
target.type: DECIMAL

- source.type: BOOLEAN
target.type: BOOLEAN

- source.type: DATE
target.type: DATE

- source.type: TIME
target.type: TIME

- source.type: DATETIME
target.type: TIMESTAMP

- source.type: TIMESTAMP
target.type: TIMESTAMP

- source.type: CHAR
target.type: STRING

- source.type: JSON
target.type: STRING

- source.type: BIT
target.type: STRING

- source.type: VARCHAR
target.type: STRING

- source.type: TEXT
target.type: STRING

- source.type: BLOB
target.type: STRING

- source.type: TINYBLOB
target.type: STRING

- source.type: TINYTEXT
target.type: STRING

- source.type: MEDIUMBLOB
target.type: STRING

- source.type: MEDIUMTEXT
target.type: STRING

- source.type: LONGBLOB
target.type: STRING

- source.type: LONGTEXT
target.type: STRING

- source.type: VARBINARY
target.type: STRING

- source.type: GEOMETRY
target.type: STRING

- source.type: POINT
target.type: STRING

- source.type: LINESTRING
target.type: STRING

- source.type: POLYGON
target.type: STRING

- source.type: MULTIPOINT
target.type: STRING

- source.type: MULTILINESTRING
target.type: STRING

- source.type: MULTIPOLYGON
target.type: STRING

- source.type: GEOMETRYCOLLECTION
target.type: STRING

- source.type: ENUM
target.type: STRING

- source.type: STRING
target.type: STRING

- source.type: BINARY
target.type: BINARY

- source.type: BYTE
target.type: BYTE
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.node.oceanbase;

import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.validation.constraints.NotNull;

/**
* OceanBase data node info
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@ApiModel("OceanBase data node info")
public class OceanBaseDataNodeDTO {

private static final Logger LOGGER = LoggerFactory.getLogger(OceanBaseDataNodeDTO.class);
private static final String OCEANBASE_JDBC_PREFIX = "jdbc:oceanbase://";

@ApiModelProperty("URL of backup DB server")
private String backupUrl;

/**
* Get the dto instance from the request
*/
public static OceanBaseDataNodeDTO getFromRequest(OceanBaseDataNodeRequest request, String extParams) {
OceanBaseDataNodeDTO dto = StringUtils.isNotBlank(extParams)
? OceanBaseDataNodeDTO.getFromJson(extParams)
: new OceanBaseDataNodeDTO();
return CommonBeanUtils.copyProperties(request, dto, true);
}

/**
* Get the dto instance from the JSON string.
*/
public static OceanBaseDataNodeDTO getFromJson(@NotNull String extParams) {
try {
return JsonUtils.parseObject(extParams, OceanBaseDataNodeDTO.class);
} catch (Exception e) {
throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT,
String.format("Failed to parse extParams for OceanBase node: %s", e.getMessage()));
}
}

/**
* Convert ip:post to jdbcurl.
*/
public static String convertToJdbcurl(String url) {
String jdbcUrl = url;
if (StringUtils.isNotBlank(jdbcUrl) && !jdbcUrl.startsWith(OCEANBASE_JDBC_PREFIX)) {
jdbcUrl = OCEANBASE_JDBC_PREFIX + jdbcUrl;
}
return jdbcUrl;
}
}
Loading
Loading