Skip to content

Commit

Permalink
[INLONG-10289][Agent] Update the SQLServer Source (#10735)
Browse files Browse the repository at this point in the history
  • Loading branch information
zoy0 authored Aug 2, 2024
1 parent bd4c3db commit 590e46e
Show file tree
Hide file tree
Showing 8 changed files with 585 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,17 @@ public class TaskConstants extends CommonConstants {
public static final String TASK_MQTT_AUTOMATIC_RECONNECT = "task.mqttTask.automaticReconnect";
public static final String TASK_MQTT_VERSION = "task.mqttTask.mqttVersion";

// SQLServer task: profile keys that SQLServerSource#initSource reads to configure its Debezium connector
// host name, passed to SqlServerConnectorConfig.HOSTNAME
public static final String TASK_SQLSERVER_HOSTNAME = "task.sqlserverTask.hostname";
// port, passed to SqlServerConnectorConfig.PORT
public static final String TASK_SQLSERVER_PORT = "task.sqlserverTask.port";
// login user, passed to SqlServerConnectorConfig.USER
public static final String TASK_SQLSERVER_USER = "task.sqlserverTask.user";
// login password, passed to SqlServerConnectorConfig.PASSWORD
public static final String TASK_SQLSERVER_PASSWORD = "task.sqlserverTask.password";
// database name, passed to SqlServerConnectorConfig.DATABASE_NAME
public static final String TASK_SQLSERVER_DB_NAME = "task.sqlserverTask.dbname";
// snapshot mode, passed to SqlServerConnectorConfig.SNAPSHOT_MODE; the source falls back to
// SqlServerConstants.INITIAL when the key is absent
public static final String TASK_SQLSERVER_SNAPSHOT_MODE = "task.sqlserverTask.snapshot.mode";
// logical server name, passed to SqlServerConnectorConfig.SERVER_NAME
public static final String TASK_SQLSERVER_SERVER_NAME = "task.sqlserverTask.serverName";
// schema filter, passed to SqlServerConnectorConfig.SCHEMA_INCLUDE_LIST
public static final String TASK_SQLSERVER_SCHEMA_NAME = "task.sqlserverTask.schemaName";
// table filter, passed to SqlServerConnectorConfig.TABLE_INCLUDE_LIST
public static final String TASK_SQLSERVER_TABLE_NAME = "task.sqlserverTask.tableName";

public static final String TASK_STATE = "task.state";

public static final String INSTANCE_STATE = "instance.state";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ public class SqlServerTask {
// port of the SQL Server instance
private String port;
// NOTE(review): TaskProfileDto#getSqlServerTask fills this from config.getSchemaName() — confirm that is intended
private String serverName;
// database to capture from
private String dbname;
// schema to capture from
private String schemaName;
// table to capture; TaskProfileDto#getSqlServerTask stores it as "<schemaName>.<tableName>"
private String tableName;
// copied from SqlserverTaskConfig#serverTimezone by TaskProfileDto#getSqlServerTask
private String serverTimezone;

// nested snapshot settings
private SqlServerTask.Snapshot snapshot;
// nested offset settings
private SqlServerTask.Offset offset;
Expand Down Expand Up @@ -63,6 +66,8 @@ public static class SqlserverTaskConfig {
// port of the SQL Server instance
private String port;
// database name; copied to SqlServerTask#dbname
private String database;
// schema name; also used as the prefix of SqlServerTask#tableName
private String schemaName;
// bare table name, without the schema prefix
private String tableName;
// server time zone; copied to SqlServerTask#serverTimezone
private String serverTimezone;

// snapshot mode setting
private String snapshotMode;
// interval setting — presumably milliseconds, judging by the name; TODO confirm
private String intervalMs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class TaskProfileDto {
// Fully qualified class names of the default task implementations, one per source type
public static final String DEFAULT_MONGODB_TASK = "org.apache.inlong.agent.plugin.task.MongoDBTask";
public static final String DEFAULT_POSTGRESQL_TASK = "org.apache.inlong.agent.plugin.task.PostgreSQLTask";
public static final String DEFAULT_MQTT_TASK = "org.apache.inlong.agent.plugin.task.MqttTask";
// set as the task class for the SQLSERVER case in convertToTaskProfile
public static final String DEFAULT_SQLSERVER_TASK = "org.apache.inlong.agent.plugin.task.SQLServerTask";
// Fully qualified class name of the default channel implementation
public static final String DEFAULT_CHANNEL = "org.apache.inlong.agent.plugin.channel.MemoryChannel";
public static final String MANAGER_JOB = "MANAGER_JOB";
// Fully qualified class name of the default data proxy sink implementation
public static final String DEFAULT_DATA_PROXY_SINK = "org.apache.inlong.agent.plugin.sinks.ProxySink";
Expand Down Expand Up @@ -343,6 +344,9 @@ private static SqlServerTask getSqlServerTask(DataConfig dataConfigs) {
sqlServerTask.setPort(config.getPort());
sqlServerTask.setServerName(config.getSchemaName());
sqlServerTask.setDbname(config.getDatabase());
sqlServerTask.setSchemaName(config.getSchemaName());
sqlServerTask.setTableName(config.getSchemaName() + "." + config.getTableName());
sqlServerTask.setServerTimezone(config.getServerTimezone());

SqlServerTask.Offset offset = new SqlServerTask.Offset();
offset.setFilename(config.getOffsetFilename());
Expand Down Expand Up @@ -495,6 +499,7 @@ public static TaskProfile convertToTaskProfile(DataConfig dataConfig) {
profileDto.setTask(task);
break;
case SQLSERVER:
task.setTaskClass(DEFAULT_SQLSERVER_TASK);
SqlServerTask sqlserverTask = getSqlServerTask(dataConfig);
task.setSqlserverTask(sqlserverTask);
task.setSource(SQLSERVER_SOURCE);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.agent.plugin.instance;

import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.constant.TaskConstants;

import java.io.IOException;

/**
 * Instance implementation used by SQLServer collection tasks.
 *
 * <p>All behaviour is inherited from {@link CommonInstance}; only the inode information is
 * overridden here.
 */
public class SQLServerInstance extends CommonInstance {

    /**
     * Stores an empty string under {@link TaskConstants#INODE_INFO} in the given profile.
     *
     * @param profile instance profile to update
     * @throws IOException declared by the overridden method; not thrown by this implementation
     */
    @Override
    public void setInodeInfo(InstanceProfile profile) throws IOException {
        final String emptyInodeInfo = "";
        profile.set(TaskConstants.INODE_INFO, emptyInodeInfo);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,66 +17,184 @@

package org.apache.inlong.agent.plugin.sources;

import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.constant.SqlServerConstants;
import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.except.FileException;
import org.apache.inlong.agent.plugin.file.Reader;
import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
import org.apache.inlong.agent.plugin.sources.reader.SQLServerReader;

import io.debezium.connector.sqlserver.SqlServerConnector;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.relational.history.FileDatabaseHistory;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

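/**
 * SQLServer source: runs an embedded Debezium SQL Server connector on its own thread and hands
 * the captured change events to the agent through a bounded in-memory queue.
 */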
public class SQLServerSource extends AbstractSource {

private static final Logger logger = LoggerFactory.getLogger(SQLServerSource.class);
private static final Logger LOGGER = LoggerFactory.getLogger(SQLServerSource.class);
private static final Integer DEBEZIUM_QUEUE_SIZE = 100;
private ExecutorService executor;
public InstanceProfile profile;
private BlockingQueue<SourceData> debeziumQueue;
private final Properties props = new Properties();
private String dbName;
private String schemaName;
private String tableName;

public SQLServerSource() {
}

@Override
public List<Reader> split(TaskProfile conf) {
SQLServerReader sqlServerReader = new SQLServerReader();
List<Reader> readerList = new ArrayList<>();
readerList.add(sqlServerReader);
sourceMetric.sourceSuccessCount.incrementAndGet();
return readerList;
/**
 * Configures and starts the embedded Debezium engine that captures changes from SQL Server.
 *
 * <p>Reads the connection and filter settings from the instance profile, fills {@code props}
 * with the Debezium connector configuration, creates the bounded hand-off queue and submits
 * the engine to a single-thread executor.
 *
 * @param profile instance profile holding the {@code task.sqlserverTask.*} settings
 * @throws FileException if any step of the initialisation fails; {@code stopRunning()} is
 *         called before the exception is thrown
 */
protected void initSource(InstanceProfile profile) {
    try {
        dbName = profile.get(TaskConstants.TASK_SQLSERVER_DB_NAME);
        schemaName = profile.get(TaskConstants.TASK_SQLSERVER_SCHEMA_NAME);
        tableName = profile.get(TaskConstants.TASK_SQLSERVER_TABLE_NAME);
        // Log identifying fields only: the full profile JSON contains the database password.
        LOGGER.info("SQLServerSource init: instanceId {}, database {}, schema {}, table {}",
                instanceId, dbName, schemaName, tableName);
        debeziumQueue = new LinkedBlockingQueue<>(DEBEZIUM_QUEUE_SIZE);

        props.setProperty("name", "SQLServer-" + instanceId);
        props.setProperty("connector.class", SqlServerConnector.class.getName());

        // Offset and schema-history files live in a per-source directory under the agent home.
        String agentPath = AgentConfiguration.getAgentConf()
                .get(AgentConstants.AGENT_HOME, AgentConstants.DEFAULT_AGENT_HOME);
        String storageDir = agentPath + "/" + getThreadName();
        props.setProperty("offset.storage", FileOffsetBackingStore.class.getName());
        props.setProperty("offset.storage.file.filename", storageDir + "/offset.dat");
        props.setProperty("offset.flush.interval.ms", "10000");
        props.setProperty("database.history", FileDatabaseHistory.class.getCanonicalName());
        props.setProperty("database.history.file.filename", storageDir + "/history.dat");

        // ignore "schema" and extract data from "payload"
        props.setProperty("key.converter.schemas.enable", "false");
        props.setProperty("value.converter.schemas.enable", "false");
        // ignore ddl
        props.setProperty("include.schema.changes", "false");
        // convert time to formatted string
        props.setProperty("converters", "datetime");
        props.setProperty("datetime.type", "org.apache.inlong.agent.plugin.utils.SQLServerTimeConverter");
        props.setProperty("datetime.format.date", "yyyy-MM-dd");
        props.setProperty("datetime.format.time", "HH:mm:ss");
        props.setProperty("datetime.format.datetime", "yyyy-MM-dd HH:mm:ss");
        props.setProperty("datetime.format.timestamp", "yyyy-MM-dd HH:mm:ss");

        // Connection settings; Field#name() yields the Debezium property key.
        props.setProperty(SqlServerConnectorConfig.HOSTNAME.name(),
                profile.get(TaskConstants.TASK_SQLSERVER_HOSTNAME));
        props.setProperty(SqlServerConnectorConfig.PORT.name(),
                profile.get(TaskConstants.TASK_SQLSERVER_PORT));
        props.setProperty(SqlServerConnectorConfig.USER.name(),
                profile.get(TaskConstants.TASK_SQLSERVER_USER));
        props.setProperty(SqlServerConnectorConfig.PASSWORD.name(),
                profile.get(TaskConstants.TASK_SQLSERVER_PASSWORD));
        props.setProperty(SqlServerConnectorConfig.DATABASE_NAME.name(), dbName);
        props.setProperty(SqlServerConnectorConfig.SNAPSHOT_MODE.name(),
                profile.get(TaskConstants.TASK_SQLSERVER_SNAPSHOT_MODE, SqlServerConstants.INITIAL));
        props.setProperty(SqlServerConnectorConfig.SERVER_NAME.name(),
                profile.get(TaskConstants.TASK_SQLSERVER_SERVER_NAME));
        // Restrict capture to the configured schema and table.
        props.setProperty(SqlServerConnectorConfig.SCHEMA_INCLUDE_LIST.name(), schemaName);
        props.setProperty(SqlServerConnectorConfig.TABLE_INCLUDE_LIST.name(), tableName);

        executor = Executors.newSingleThreadExecutor();
        executor.execute(startDebeziumEngine());
    } catch (Exception ex) {
        stopRunning();
        throw new FileException("error init stream for " + instanceId, ex);
    }
}

/**
 * Creates the task that builds and runs the embedded Debezium engine.
 *
 * <p>The task names its thread, builds a JSON-format engine from {@code props} that commits
 * offsets under {@link OffsetCommitPolicy#always()} and delivers batches to
 * {@code handleConsumerEvent}, then runs it. The engine is closed by try-with-resources; any
 * {@link Throwable} raised while building, running or closing it is logged and not rethrown.
 *
 * @return the runnable to submit to the executor
 */
private Runnable startDebeziumEngine() {
    Runnable engineTask = () -> {
        AgentThreadFactory.nameThread(getThreadName() + "debezium");
        try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
                .using(props)
                .using(OffsetCommitPolicy.always())
                .notifying(this::handleConsumerEvent)
                .build()) {
            engine.run();
        } catch (Throwable failure) {
            LOGGER.error("do run error in SQLServer debezium: ", failure);
        }
    };
    return engineTask;
}

/**
 * Batch callback invoked by the Debezium engine: moves each change event into
 * {@code debeziumQueue} and acknowledges it to the committer.
 *
 * <p>Events with a {@code null} value are acknowledged without being queued. If the source
 * stops running before an event can be queued, that event and the rest of the batch are left
 * unacknowledged, so their offsets are not committed.
 *
 * @param records batch of change events delivered by the engine
 * @param committer committer used to acknowledge single records and the batch
 * @throws InterruptedException if the thread is interrupted while waiting for queue space or
 *         while committing
 */
private void handleConsumerEvent(List<ChangeEvent<String, String>> records,
        DebeziumEngine.RecordCommitter<ChangeEvent<String, String>> committer) throws InterruptedException {
    for (ChangeEvent<String, String> record : records) {
        String value = record.value();
        if (value == null) {
            // A change event may carry no value (e.g. the tombstone Debezium emits after a
            // delete); there is nothing to forward, so just acknowledge it.
            committer.markProcessed(record);
            continue;
        }
        SourceData sourceData = new SourceData(value.getBytes(StandardCharsets.UTF_8), "0");
        boolean offerSuc = false;
        // Retry with a timeout so a full queue does not block past the source being stopped.
        while (isRunnable() && !offerSuc) {
            offerSuc = debeziumQueue.offer(sourceData, 1, TimeUnit.SECONDS);
        }
        if (!offerSuc) {
            // The source stopped before the event was queued: do not acknowledge it, otherwise
            // its offset would be committed although the data was never handed over.
            break;
        }
        committer.markProcessed(record);
    }
    // Commits the offsets of the records acknowledged above.
    committer.markBatchFinished();
}

@Override
protected String getThreadName() {
public List<Reader> split(TaskProfile conf) {
return null;
}

@Override
protected void initSource(InstanceProfile profile) {

protected String getThreadName() {
    // Fixed prefix followed by the task id and the instance id, separated by "-".
    StringBuilder threadName = new StringBuilder("SQLServer-source-");
    threadName.append(taskId).append("-").append(instanceId);
    return threadName.toString();
}

/**
 * Logs the database, schema and table this source is configured for.
 */
@Override
protected void printCurrentState() {
    LOGGER.info(
            "sqlserver databases is {} and schema is {} and table is {}",
            dbName,
            schemaName,
            tableName);
}

@Override
protected boolean doPrepareToRead() {
return false;
return true;
}

@Override
protected List<SourceData> readFromSource() {
return null;
}

@Override
public Message read() {
return null;
List<SourceData> dataList = new ArrayList<>();
try {
int size = 0;
while (size < BATCH_READ_LINE_TOTAL_LEN) {
SourceData sourceData = debeziumQueue.poll(1, TimeUnit.SECONDS);
if (sourceData != null) {
LOGGER.info("readFromSource: {}", sourceData.getData());
size += sourceData.getData().length;
dataList.add(sourceData);
} else {
break;
}
}

} catch (InterruptedException e) {
LOGGER.error("poll {} data from debezium queue interrupted.", instanceId);
}
return dataList;
}

@Override
Expand All @@ -86,16 +204,12 @@ protected boolean isRunnable() {

@Override
protected void releaseSource() {

}

@Override
public boolean sourceFinish() {
return false;
LOGGER.info("release sqlserver source");
executor.shutdownNow();
}

@Override
public boolean sourceExist() {
return false;
return true;
}
}
}
Loading

0 comments on commit 590e46e

Please sign in to comment.