From 590e46ebe3471a8f72acc199cb8cad120b49fe42 Mon Sep 17 00:00:00 2001 From: zoy0 <105140381+zoy0@users.noreply.github.com> Date: Fri, 2 Aug 2024 09:48:25 +0800 Subject: [PATCH] [INLONG-10289][Agent] Update the SQLServer Source (#10735) --- .../inlong/agent/constant/TaskConstants.java | 11 + .../inlong/agent/pojo/SqlServerTask.java | 5 + .../inlong/agent/pojo/TaskProfileDto.java | 5 + .../plugin/instance/SQLServerInstance.java | 31 +++ .../agent/plugin/sources/SQLServerSource.java | 172 +++++++++++++--- .../agent/plugin/task/SQLServerTask.java | 111 +++++++++++ .../plugin/utils/SQLServerTimeConverter.java | 124 ++++++++++++ .../plugin/sources/TestSQLServerSource.java | 188 +++++++++++++++--- 8 files changed, 585 insertions(+), 62 deletions(-) create mode 100644 inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/SQLServerInstance.java create mode 100644 inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/SQLServerTask.java create mode 100644 inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/SQLServerTimeConverter.java diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java index 4cd6ac56ed4..1f142f839e2 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java @@ -153,6 +153,17 @@ public class TaskConstants extends CommonConstants { public static final String TASK_MQTT_AUTOMATIC_RECONNECT = "task.mqttTask.automaticReconnect"; public static final String TASK_MQTT_VERSION = "task.mqttTask.mqttVersion"; + // SQLServer task + public static final String TASK_SQLSERVER_HOSTNAME = "task.sqlserverTask.hostname"; + public static final String TASK_SQLSERVER_PORT = "task.sqlserverTask.port"; + public static final String TASK_SQLSERVER_USER = "task.sqlserverTask.user"; + public static final String TASK_SQLSERVER_PASSWORD = "task.sqlserverTask.password"; + public static final String TASK_SQLSERVER_DB_NAME = "task.sqlserverTask.dbname"; + public static final String TASK_SQLSERVER_SNAPSHOT_MODE = "task.sqlserverTask.snapshot.mode"; + public static final String TASK_SQLSERVER_SERVER_NAME = "task.sqlserverTask.serverName"; + public static final String TASK_SQLSERVER_SCHEMA_NAME = "task.sqlserverTask.schemaName"; + public static final String TASK_SQLSERVER_TABLE_NAME = "task.sqlserverTask.tableName"; + public static final String TASK_STATE = "task.state"; public static final String INSTANCE_STATE = "instance.state"; diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/SqlServerTask.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/SqlServerTask.java index 56e4a9b9200..9240e8366a6 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/SqlServerTask.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/SqlServerTask.java @@ -28,6 +28,9 @@ public class SqlServerTask { private String port; private String serverName; private String dbname; + private String schemaName; + private String tableName; + private String serverTimezone; private SqlServerTask.Snapshot snapshot; private SqlServerTask.Offset offset; @@ -63,6 +66,8 @@ public static class SqlserverTaskConfig { private String port; private String database; private String schemaName; + private String tableName; + private String serverTimezone; private String snapshotMode; private String intervalMs; diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java index cc6cfe82446..039acea32d6 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java @@ -54,6 +54,7 @@ public class TaskProfileDto { public static final String DEFAULT_MONGODB_TASK = "org.apache.inlong.agent.plugin.task.MongoDBTask"; public static final String DEFAULT_POSTGRESQL_TASK = "org.apache.inlong.agent.plugin.task.PostgreSQLTask"; public static final String DEFAULT_MQTT_TASK = "org.apache.inlong.agent.plugin.task.MqttTask"; + public static final String DEFAULT_SQLSERVER_TASK = "org.apache.inlong.agent.plugin.task.SQLServerTask"; public static final String DEFAULT_CHANNEL = "org.apache.inlong.agent.plugin.channel.MemoryChannel"; public static final String MANAGER_JOB = "MANAGER_JOB"; public static final String DEFAULT_DATA_PROXY_SINK = "org.apache.inlong.agent.plugin.sinks.ProxySink"; @@ -343,6 +344,9 @@ private static SqlServerTask getSqlServerTask(DataConfig dataConfigs) { sqlServerTask.setPort(config.getPort()); sqlServerTask.setServerName(config.getSchemaName()); sqlServerTask.setDbname(config.getDatabase()); + sqlServerTask.setSchemaName(config.getSchemaName()); + sqlServerTask.setTableName(config.getSchemaName() + "." + config.getTableName()); + sqlServerTask.setServerTimezone(config.getServerTimezone()); SqlServerTask.Offset offset = new SqlServerTask.Offset(); offset.setFilename(config.getOffsetFilename()); @@ -495,6 +499,7 @@ public static TaskProfile convertToTaskProfile(DataConfig dataConfig) { profileDto.setTask(task); break; case SQLSERVER: + task.setTaskClass(DEFAULT_SQLSERVER_TASK); SqlServerTask sqlserverTask = getSqlServerTask(dataConfig); task.setSqlserverTask(sqlserverTask); task.setSource(SQLSERVER_SOURCE); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/SQLServerInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/SQLServerInstance.java new file mode 100644 index 00000000000..735f416ff94 --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/SQLServerInstance.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.instance; + +import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.constant.TaskConstants; + +import java.io.IOException; + +public class SQLServerInstance extends CommonInstance { + + @Override + public void setInodeInfo(InstanceProfile profile) throws IOException { + profile.set(TaskConstants.INODE_INFO, ""); + } +} diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java index e067833c6b9..01e61a99bd9 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java @@ -17,66 +17,184 @@ package org.apache.inlong.agent.plugin.sources; +import org.apache.inlong.agent.common.AgentThreadFactory; +import org.apache.inlong.agent.conf.AgentConfiguration; import org.apache.inlong.agent.conf.InstanceProfile; import org.apache.inlong.agent.conf.TaskProfile; -import org.apache.inlong.agent.plugin.Message; +import org.apache.inlong.agent.constant.AgentConstants; +import org.apache.inlong.agent.constant.SqlServerConstants; +import org.apache.inlong.agent.constant.TaskConstants; +import org.apache.inlong.agent.except.FileException; import org.apache.inlong.agent.plugin.file.Reader; import org.apache.inlong.agent.plugin.sources.file.AbstractSource; -import org.apache.inlong.agent.plugin.sources.reader.SQLServerReader; +import io.debezium.connector.sqlserver.SqlServerConnector; +import io.debezium.connector.sqlserver.SqlServerConnectorConfig; +import io.debezium.engine.ChangeEvent; +import io.debezium.engine.DebeziumEngine; +import io.debezium.engine.format.Json; +import io.debezium.engine.spi.OffsetCommitPolicy; +import io.debezium.relational.history.FileDatabaseHistory; +import org.apache.kafka.connect.storage.FileOffsetBackingStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; /** * SQLServer source */ public class SQLServerSource extends AbstractSource { - private static final Logger logger = LoggerFactory.getLogger(SQLServerSource.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SQLServerSource.class); + private static final Integer DEBEZIUM_QUEUE_SIZE = 100; + private ExecutorService executor; + public InstanceProfile profile; + private BlockingQueue debeziumQueue; + private final Properties props = new Properties(); + private String dbName; + private String schemaName; + private String tableName; public SQLServerSource() { } - @Override - public List split(TaskProfile conf) { - SQLServerReader sqlServerReader = new SQLServerReader(); - List readerList = new ArrayList<>(); - readerList.add(sqlServerReader); - sourceMetric.sourceSuccessCount.incrementAndGet(); - return readerList; + protected void initSource(InstanceProfile profile) { + try { + LOGGER.info("SQLServerSource init: {}", profile.toJsonStr()); + debeziumQueue = new LinkedBlockingQueue<>(DEBEZIUM_QUEUE_SIZE); + + dbName = profile.get(TaskConstants.TASK_SQLSERVER_DB_NAME); + schemaName = profile.get(TaskConstants.TASK_SQLSERVER_SCHEMA_NAME); + tableName = profile.get(TaskConstants.TASK_SQLSERVER_TABLE_NAME); + + props.setProperty("name", "SQLServer-" + instanceId); + props.setProperty("connector.class", SqlServerConnector.class.getName()); + props.setProperty("offset.storage", FileOffsetBackingStore.class.getName()); + String agentPath = AgentConfiguration.getAgentConf() + .get(AgentConstants.AGENT_HOME, AgentConstants.DEFAULT_AGENT_HOME); + String offsetPath = agentPath + "/" + getThreadName() + "/offset.dat"; + props.setProperty("offset.storage.file.filename", offsetPath); + props.setProperty("offset.flush.interval.ms", "10000"); + props.setProperty("database.history", FileDatabaseHistory.class.getCanonicalName()); + props.setProperty("database.history.file.filename", agentPath + "/" + getThreadName() + "/history.dat"); + // ignore "schema" and extract data from "payload" + props.setProperty("key.converter.schemas.enable", "false"); + props.setProperty("value.converter.schemas.enable", "false"); + // ignore ddl + props.setProperty("include.schema.changes", "false"); + // convert time to formatted string + props.setProperty("converters", "datetime"); + props.setProperty("datetime.type", "org.apache.inlong.agent.plugin.utils.SQLServerTimeConverter"); + props.setProperty("datetime.format.date", "yyyy-MM-dd"); + props.setProperty("datetime.format.time", "HH:mm:ss"); + props.setProperty("datetime.format.datetime", "yyyy-MM-dd HH:mm:ss"); + props.setProperty("datetime.format.timestamp", "yyyy-MM-dd HH:mm:ss"); + + props.setProperty(String.valueOf(SqlServerConnectorConfig.HOSTNAME), + profile.get(TaskConstants.TASK_SQLSERVER_HOSTNAME)); + props.setProperty(String.valueOf(SqlServerConnectorConfig.PORT), + profile.get(TaskConstants.TASK_SQLSERVER_PORT)); + props.setProperty(String.valueOf(SqlServerConnectorConfig.USER), + profile.get(TaskConstants.TASK_SQLSERVER_USER)); + props.setProperty(String.valueOf(SqlServerConnectorConfig.PASSWORD), + profile.get(TaskConstants.TASK_SQLSERVER_PASSWORD)); + props.setProperty(String.valueOf(SqlServerConnectorConfig.DATABASE_NAME), + profile.get(TaskConstants.TASK_SQLSERVER_DB_NAME)); + props.setProperty(String.valueOf(SqlServerConnectorConfig.SNAPSHOT_MODE), + profile.get(TaskConstants.TASK_SQLSERVER_SNAPSHOT_MODE, SqlServerConstants.INITIAL)); + props.setProperty(String.valueOf(SqlServerConnectorConfig.SERVER_NAME), + profile.get(TaskConstants.TASK_SQLSERVER_SERVER_NAME)); + props.setProperty(String.valueOf(SqlServerConnectorConfig.SCHEMA_INCLUDE_LIST), + profile.get(TaskConstants.TASK_SQLSERVER_SCHEMA_NAME)); + props.setProperty(String.valueOf(SqlServerConnectorConfig.TABLE_INCLUDE_LIST), + profile.get(TaskConstants.TASK_SQLSERVER_TABLE_NAME)); + + executor = Executors.newSingleThreadExecutor(); + executor.execute(startDebeziumEngine()); + } catch (Exception ex) { + stopRunning(); + throw new FileException("error init stream for " + instanceId, ex); + } + } + + private Runnable startDebeziumEngine() { + return () -> { + AgentThreadFactory.nameThread(getThreadName() + "debezium"); + try (DebeziumEngine> debeziumEngine = DebeziumEngine.create(Json.class) + .using(props) + .using(OffsetCommitPolicy.always()) + .notifying(this::handleConsumerEvent) + .build()) { + + debeziumEngine.run(); + } catch (Throwable e) { + LOGGER.error("do run error in SQLServer debezium: ", e); + } + }; + } + + private void handleConsumerEvent(List> records, + DebeziumEngine.RecordCommitter> committer) throws InterruptedException { + for (ChangeEvent record : records) { + boolean offerSuc = false; + SourceData sourceData = new SourceData(record.value().getBytes(StandardCharsets.UTF_8), "0"); + while (isRunnable() && !offerSuc) { + offerSuc = debeziumQueue.offer(sourceData, 1, TimeUnit.SECONDS); + } + committer.markProcessed(record); + } + committer.markBatchFinished(); } @Override - protected String getThreadName() { + public List split(TaskProfile conf) { return null; } @Override - protected void initSource(InstanceProfile profile) { - + protected String getThreadName() { + return "SQLServer-source-" + taskId + "-" + instanceId; } @Override protected void printCurrentState() { - + LOGGER.info("sqlserver databases is {} and schema is {} and table is {}", dbName, schemaName, tableName); } @Override protected boolean doPrepareToRead() { - return false; + return true; } @Override protected List readFromSource() { - return null; - } - - @Override - public Message read() { - return null; + List dataList = new ArrayList<>(); + try { + int size = 0; + while (size < BATCH_READ_LINE_TOTAL_LEN) { + SourceData sourceData = debeziumQueue.poll(1, TimeUnit.SECONDS); + if (sourceData != null) { + LOGGER.info("readFromSource: {}", sourceData.getData()); + size += sourceData.getData().length; + dataList.add(sourceData); + } else { + break; + } + } + + } catch (InterruptedException e) { + LOGGER.error("poll {} data from debezium queue interrupted.", instanceId); + } + return dataList; } @Override @@ -86,16 +204,12 @@ protected boolean isRunnable() { @Override protected void releaseSource() { - - } - - @Override - public boolean sourceFinish() { - return false; + LOGGER.info("release sqlserver source"); + executor.shutdownNow(); } @Override public boolean sourceExist() { - return false; + return true; } -} +} \ No newline at end of file diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/SQLServerTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/SQLServerTask.java new file mode 100644 index 00000000000..dd1446b301e --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/SQLServerTask.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.task; + +import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.conf.TaskProfile; +import org.apache.inlong.agent.constant.CycleUnitType; +import org.apache.inlong.agent.constant.TaskConstants; +import org.apache.inlong.agent.utils.AgentUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; + +public class SQLServerTask extends AbstractTask { + + private static final Logger LOGGER = LoggerFactory.getLogger(SQLServerTask.class); + public static final String DEFAULT_SQLSERVER_INSTANCE = "org.apache.inlong.agent.plugin.instance.SQLServerInstance"; + private boolean isAdded = false; + + private String dbName; + private String schemaName; + private String tableName; + private String instanceId; + + private final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyyMMddHH"); + + @Override + protected int getInstanceLimit() { + return DEFAULT_INSTANCE_LIMIT; + } + + @Override + protected void initTask() { + LOGGER.info("SQLServer commonInit: {}", taskProfile.toJsonStr()); + dbName = taskProfile.get(TaskConstants.TASK_SQLSERVER_DB_NAME); + tableName = taskProfile.get(TaskConstants.TASK_SQLSERVER_TABLE_NAME); + schemaName = taskProfile.get(TaskConstants.TASK_SQLSERVER_SCHEMA_NAME); + instanceId = dbName + "-" + tableName; + } + + @Override + public boolean isProfileValid(TaskProfile profile) { + if (!profile.allRequiredKeyExist()) { + LOGGER.error("task profile needs all required key"); + return false; + } + if (!profile.hasKey(TaskConstants.TASK_SQLSERVER_HOSTNAME)) { + LOGGER.error("task profile needs hostname"); + return false; + } + if (!profile.hasKey(TaskConstants.TASK_SQLSERVER_PORT)) { + LOGGER.error("task profile needs port"); + return false; + } + if (!profile.hasKey(TaskConstants.TASK_SQLSERVER_USER)) { + LOGGER.error("task profile needs username"); + return false; + } + if (!profile.hasKey(TaskConstants.TASK_SQLSERVER_PASSWORD)) { + LOGGER.error("task profile needs password"); + return false; + } + if (!profile.hasKey(TaskConstants.TASK_SQLSERVER_DB_NAME)) { + LOGGER.error("task profile needs DB name"); + return false; + } + if (!profile.hasKey(TaskConstants.TASK_SQLSERVER_SCHEMA_NAME)) { + LOGGER.error("task profile needs schema name"); + return false; + } + if (!profile.hasKey(TaskConstants.TASK_SQLSERVER_TABLE_NAME)) { + LOGGER.error("task profile needs table name"); + return false; + } + return true; + } + + @Override + protected List getNewInstanceList() { + List list = new ArrayList<>(); + if (isAdded) { + return list; + } + String dataTime = LocalDateTime.now().format(dateTimeFormatter); + InstanceProfile instanceProfile = taskProfile.createInstanceProfile(DEFAULT_SQLSERVER_INSTANCE, instanceId, + CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime()); + list.add(instanceProfile); + this.isAdded = true; + return list; + } +} diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/SQLServerTimeConverter.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/SQLServerTimeConverter.java new file mode 100644 index 00000000000..008dbda4eaa --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/SQLServerTimeConverter.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.utils; + +import io.debezium.spi.converter.CustomConverter; +import io.debezium.spi.converter.RelationalColumn; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.DateTimeException; +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.Properties; +import java.util.function.Consumer; + +public class SQLServerTimeConverter implements CustomConverter { + + private static final Logger LOGGER = LoggerFactory.getLogger(SQLServerTimeConverter.class); + + private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE; + private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME; + private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME; + + private ZoneOffset defalutZoneOffset = ZoneOffset.systemDefault().getRules().getOffset(Instant.now()); + + @Override + public void configure(Properties props) { + readProps(props, "format.date", p -> dateFormatter = DateTimeFormatter.ofPattern(p)); + readProps(props, "format.time", p -> timeFormatter = DateTimeFormatter.ofPattern(p)); + readProps(props, "format.datetime", p -> datetimeFormatter = DateTimeFormatter.ofPattern(p)); + readProps(props, "format.timestamp.zone", z -> defalutZoneOffset = ZoneOffset.of(z)); + } + + private void readProps(Properties properties, String settingKey, Consumer callback) { + String settingValue = (String) properties.get(settingKey); + if (settingValue == null || settingValue.length() == 0) { + return; + } + try { + callback.accept(settingValue.trim()); + } catch (IllegalArgumentException | DateTimeException e) { + LOGGER.error("The {} setting is illegal:{}", settingKey, settingValue); + throw e; + } + } + + @Override + public void converterFor(RelationalColumn column, ConverterRegistration registration) { + String sqlType = column.typeName().toUpperCase(); + SchemaBuilder schemaBuilder = null; + Converter converter = null; + if ("DATE".equals(sqlType)) { + schemaBuilder = SchemaBuilder.string().optional().name("org.apache.inlong.agent.date.string"); + converter = this::convertDate; + } + if ("TIME".equals(sqlType)) { + schemaBuilder = SchemaBuilder.string().optional().name("org.apache.inlong.agent.time.string"); + converter = this::convertTime; + } + if ("DATETIME".equals(sqlType) || + "DATETIME2".equals(sqlType) || + "SMALLDATETIME".equals(sqlType)) { + schemaBuilder = SchemaBuilder.string().optional().name("org.apache.inlong.agent.datetime.string"); + converter = this::convertDateTime; + } + if ("DATETIMEOFFSET".equals(sqlType)) { + schemaBuilder = SchemaBuilder.string().optional().name("org.apache.inlong.agent.datetimeoffset.string"); + converter = this::convertDateTimeOffset; + } + if (schemaBuilder != null) { + registration.register(schemaBuilder, converter); + LOGGER.info("register converter for sqlType {} to schema {}", sqlType, schemaBuilder.name()); + } + } + + private String convertDate(Object input) { + if (input instanceof java.sql.Date) { + return dateFormatter.format(((java.sql.Date) input).toLocalDate()); + } + return input == null ? null : input.toString(); + } + + private String convertTime(Object input) { + if (input instanceof java.sql.Time) { + return timeFormatter.format(((java.sql.Time) input).toLocalTime()); + } else if (input instanceof java.sql.Timestamp) { + return timeFormatter.format(((java.sql.Timestamp) input).toLocalDateTime().toLocalTime()); + } + return input == null ? null : input.toString(); + } + + private String convertDateTime(Object input) { + if (input instanceof java.sql.Timestamp) { + return datetimeFormatter.format(((java.sql.Timestamp) input).toLocalDateTime()); + } + return input == null ? null : input.toString(); + } + + private String convertDateTimeOffset(Object input) { + if (input instanceof microsoft.sql.DateTimeOffset) { + microsoft.sql.DateTimeOffset dateTimeOffset = (microsoft.sql.DateTimeOffset) input; + return datetimeFormatter.format( + dateTimeOffset.getOffsetDateTime().withOffsetSameInstant(defalutZoneOffset).toLocalDateTime()); + } + return input == null ? null : input.toString(); + } +} diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java index 778c2710c44..0dc8b71be48 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java @@ -17,74 +17,196 @@ package org.apache.inlong.agent.plugin.sources; +import org.apache.inlong.agent.conf.InstanceProfile; import org.apache.inlong.agent.conf.TaskProfile; -import org.apache.inlong.agent.metrics.AgentMetricItem; -import org.apache.inlong.agent.metrics.AgentMetricItemSet; -import org.apache.inlong.common.metric.MetricItem; +import org.apache.inlong.agent.constant.AgentConstants; +import org.apache.inlong.agent.constant.CommonConstants; +import org.apache.inlong.agent.constant.TaskConstants; +import org.apache.inlong.agent.core.task.OffsetManager; +import org.apache.inlong.agent.core.task.TaskManager; +import org.apache.inlong.agent.plugin.AgentBaseTestsHelper; +import org.apache.inlong.agent.store.Store; +import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.common.enums.TaskStateEnum; import org.apache.inlong.common.metric.MetricRegister; +import io.debezium.engine.ChangeEvent; +import io.debezium.engine.DebeziumEngine; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; -import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; -import java.util.concurrent.atomic.AtomicLong; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import static org.junit.Assert.*; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doNothing; +import static org.powermock.api.mockito.PowerMockito.mockStatic; import static org.powermock.api.mockito.PowerMockito.when; -import static org.powermock.api.mockito.PowerMockito.whenNew; -import static org.powermock.api.support.membermodification.MemberMatcher.field; /** * Test cases for {@link SQLServerSource}. */ @RunWith(PowerMockRunner.class) -@PrepareForTest({SQLServerSource.class, MetricRegister.class}) +@PrepareForTest({DebeziumEngine.class, Executors.class, SQLServerSource.class, MetricRegister.class}) @PowerMockIgnore({"javax.management.*"}) public class TestSQLServerSource { + private SQLServerSource source; + + private static AgentBaseTestsHelper helper; + // task basic store + private static Store taskBasicStore; + // instance basic store + private static Store instanceBasicStore; + // offset basic store + private static Store offsetBasicStore; + + InstanceProfile instanceProfile; + @Mock - TaskProfile jobProfile; + private DebeziumEngine.Builder builder; @Mock - private AgentMetricItemSet agentMetricItemSet; + private ExecutorService executorService; @Mock - private AgentMetricItem agentMetricItem; + DebeziumEngine.RecordCommitter> committer; - private AtomicLong sourceSuccessCount; + @Mock + private DebeziumEngine> engine; + + private BlockingQueue queue; - private AtomicLong sourceFailCount; + private final String instanceId = "s4bc475560b4444dbd4e9812ab1fd64d"; @Before public void setup() throws Exception { - sourceSuccessCount = new AtomicLong(0); - sourceFailCount = new AtomicLong(0); - - // mock metrics - whenNew(AgentMetricItemSet.class).withArguments(anyString()).thenReturn(agentMetricItemSet); - when(agentMetricItemSet.findMetricItem(any())).thenReturn(agentMetricItem); - field(AgentMetricItem.class, "sourceSuccessCount").set(agentMetricItem, sourceSuccessCount); - field(AgentMetricItem.class, "sourceFailCount").set(agentMetricItem, sourceFailCount); - PowerMockito.mockStatic(MetricRegister.class); - PowerMockito.doNothing().when( - MetricRegister.class, "register", any(MetricItem.class)); + + helper = new AgentBaseTestsHelper(TestSQLServerSource.class.getName()).setupAgentHome(); + taskBasicStore = TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_TASK); + instanceBasicStore = TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_INSTANCE); + offsetBasicStore = + TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_OFFSET); + OffsetManager.init(taskBasicStore, instanceBasicStore, offsetBasicStore); + // mock DebeziumEngine + mockStatic(DebeziumEngine.class); + when(DebeziumEngine.create(io.debezium.engine.format.Json.class)).thenReturn(builder); + when(builder.using(any(Properties.class))).thenReturn(builder); + when(builder.notifying(any(DebeziumEngine.ChangeConsumer.class))).thenReturn(builder); + when(builder.using(any(DebeziumEngine.CompletionCallback.class))).thenReturn(builder); + when(builder.build()).thenReturn(engine); + + doNothing().when(committer).markProcessed(any(ChangeEvent.class)); + doNothing().when(committer).markBatchFinished(); + + // mock executorService + mockStatic(Executors.class); + when(Executors.newSingleThreadExecutor()).thenReturn(executorService); + + getSource(); + // init source debeziumQueue + Field field = SQLServerSource.class.getDeclaredField("debeziumQueue"); + field.setAccessible(true); + queue = (BlockingQueue) field.get(source); + } + + private SQLServerSource getSource() { + final String username = "SA"; + final String password = "123456"; + final String hostname = "127.0.0.1"; + final String port = "1434"; + final String groupId = "group01"; + final String streamId = "stream01"; + final String dbName = "inlong"; + final String schemaName = "dbo"; + final String tableName = "test_source"; + final String serverName = "server-01"; + + TaskProfile taskProfile = helper.getTaskProfile(1, "", false, 0L, 0L, TaskStateEnum.RUNNING, "D", + "GMT+8:00"); + instanceProfile = taskProfile.createInstanceProfile("", + "", taskProfile.getCycleUnit(), "20240725", AgentUtils.getCurrentTime()); + instanceProfile.set(CommonConstants.PROXY_INLONG_GROUP_ID, groupId); + instanceProfile.set(CommonConstants.PROXY_INLONG_STREAM_ID, streamId); + instanceProfile.set(TaskConstants.TASK_SQLSERVER_USER, username); + instanceProfile.set(TaskConstants.TASK_SQLSERVER_PASSWORD, password); + instanceProfile.set(TaskConstants.TASK_SQLSERVER_HOSTNAME, hostname); + instanceProfile.set(TaskConstants.TASK_SQLSERVER_PORT, port); + instanceProfile.set(TaskConstants.TASK_SQLSERVER_DB_NAME, dbName); + instanceProfile.set(TaskConstants.TASK_SQLSERVER_SCHEMA_NAME, schemaName); + instanceProfile.set(TaskConstants.TASK_SQLSERVER_TABLE_NAME, tableName); + instanceProfile.set(TaskConstants.TASK_SQLSERVER_SERVER_NAME, serverName); + instanceProfile.set(TaskConstants.TASK_AUDIT_VERSION, "0"); + instanceProfile.setInstanceId(instanceId); + + (source = new SQLServerSource()).init(instanceProfile); + return source; } - /** - * Test cases for . - */ @Test - public void testSplit() { + public void testSQLServerSource() throws Exception { + testHandleConsumerEvent(); + TestReadDataFromSource(); + TestReadEmptyFromSource(); + } + + // test DebeziumEngine get one recode from SQLServer + private void testHandleConsumerEvent() throws Exception { + List> records = new ArrayList<>(); + records.add(new ChangeEvent() { + + @Override + public String key() { + return "KEY"; + } + + @Override + public String value() { + return "VALUE"; + } + + @Override + public String destination() { + return null; + } + }); + Method handleConsumerEvent = SQLServerSource.class.getDeclaredMethod("handleConsumerEvent", List.class, + DebeziumEngine.RecordCommitter.class); + handleConsumerEvent.setAccessible(true); + handleConsumerEvent.invoke(source, records, committer); + assertEquals(1, queue.size()); + } + + // test read one source data from queue + private void TestReadDataFromSource() throws Exception { + Method handleConsumerEvent = SQLServerSource.class.getDeclaredMethod("readFromSource"); + handleConsumerEvent.setAccessible(true); + + List result = (List) handleConsumerEvent.invoke(source); + assertFalse(result.isEmpty()); + assertTrue(queue.isEmpty()); + } + + // test read + private void TestReadEmptyFromSource() throws Exception { + Method handleConsumerEvent = SQLServerSource.class.getDeclaredMethod("readFromSource"); + handleConsumerEvent.setAccessible(true); - // build mock - final SQLServerSource source = new SQLServerSource(); - // assert - // assertEquals(1, source.split(jobProfile).size()); + queue.clear(); + List result = (List) handleConsumerEvent.invoke(source); + assertTrue(result.isEmpty()); + assertTrue(queue.isEmpty()); } }