diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/MongoTask.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/MongoTask.java index f0440feb8d..a8c183c62f 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/MongoTask.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/MongoTask.java @@ -29,24 +29,8 @@ public class MongoTask { private String user; private String password; private String databaseIncludeList; - private String databaseExcludeList; private String collectionIncludeList; - private String collectionExcludeList; - private String fieldExcludeList; - private String connectTimeoutInMs; - private String queueSize; - private String cursorMaxAwaitTimeInMs; - private String socketTimeoutInMs; - private String selectionTimeoutInMs; - private String fieldRenames; - private String membersAutoDiscover; - private String connectMaxAttempts; - private String connectBackoffMaxDelayInMs; - private String connectBackoffInitialDelayInMs; - private String initialSyncMaxThreads; - private String sslInvalidHostnameAllowed; - private String sslEnabled; - private String pollIntervalInMs; + private String snapshotMode; private Snapshot snapshot; private Capture capture; private Offset offset; @@ -87,25 +71,8 @@ public static class MongoTaskConfig { private String username; private String password; - private String databaseIncludeList; - private String databaseExcludeList; - private String collectionIncludeList; - private String collectionExcludeList; - private String fieldExcludeList; - private String connectTimeoutInMs; - private String queueSize; - private String cursorMaxAwaitTimeInMs; - private String socketTimeoutInMs; - private String selectionTimeoutInMs; - private String fieldRenames; - private String membersAutoDiscover; - private String connectMaxAttempts; - private String connectBackoffMaxDelayInMs; - private String connectBackoffInitialDelayInMs; - private String initialSyncMaxThreads; - private String sslInvalidHostnameAllowed; - private String sslEnabled; - private String pollIntervalInMs; + private String database; + private String collection; private String snapshotMode; private String captureMode; diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java index baf9e0c64b..0b6d5bcc2a 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java @@ -49,6 +49,7 @@ public class TaskProfileDto { public static final String DEFAULT_FILE_TASK = "org.apache.inlong.agent.plugin.task.file.LogFileTask"; public static final String DEFAULT_KAFKA_TASK = "org.apache.inlong.agent.plugin.task.KafkaTask"; public static final String DEFAULT_PULSAR_TASK = "org.apache.inlong.agent.plugin.task.PulsarTask"; + public static final String DEFAULT_MONGODB_TASK = "org.apache.inlong.agent.plugin.task.MongoDBTask"; public static final String DEFAULT_CHANNEL = "org.apache.inlong.agent.plugin.channel.MemoryChannel"; public static final String MANAGER_JOB = "MANAGER_JOB"; public static final String DEFAULT_DATA_PROXY_SINK = "org.apache.inlong.agent.plugin.sinks.ProxySink"; @@ -283,25 +284,9 @@ private static MongoTask getMongoTask(DataConfig dataConfigs) { mongoTask.setHosts(config.getHosts()); mongoTask.setUser(config.getUsername()); mongoTask.setPassword(config.getPassword()); - mongoTask.setDatabaseIncludeList(config.getDatabaseIncludeList()); - mongoTask.setDatabaseExcludeList(config.getDatabaseExcludeList()); - mongoTask.setCollectionIncludeList(config.getCollectionIncludeList()); - mongoTask.setCollectionExcludeList(config.getCollectionExcludeList()); - mongoTask.setFieldExcludeList(config.getFieldExcludeList()); - mongoTask.setConnectTimeoutInMs(config.getConnectTimeoutInMs()); - mongoTask.setQueueSize(config.getQueueSize()); - mongoTask.setCursorMaxAwaitTimeInMs(config.getCursorMaxAwaitTimeInMs()); - mongoTask.setSocketTimeoutInMs(config.getSocketTimeoutInMs()); - mongoTask.setSelectionTimeoutInMs(config.getSelectionTimeoutInMs()); - mongoTask.setFieldRenames(config.getFieldRenames()); - mongoTask.setMembersAutoDiscover(config.getMembersAutoDiscover()); - mongoTask.setConnectMaxAttempts(config.getConnectMaxAttempts()); - mongoTask.setConnectBackoffMaxDelayInMs(config.getConnectBackoffMaxDelayInMs()); - mongoTask.setConnectBackoffInitialDelayInMs(config.getConnectBackoffInitialDelayInMs()); - mongoTask.setInitialSyncMaxThreads(config.getInitialSyncMaxThreads()); - mongoTask.setSslInvalidHostnameAllowed(config.getSslInvalidHostnameAllowed()); - mongoTask.setSslEnabled(config.getSslEnabled()); - mongoTask.setPollIntervalInMs(config.getPollIntervalInMs()); + mongoTask.setDatabaseIncludeList(config.getDatabase()); + mongoTask.setCollectionIncludeList(config.getCollection()); + mongoTask.setSnapshotMode(config.getSnapshotMode()); MongoTask.Offset offset = new MongoTask.Offset(); offset.setFilename(config.getOffsetFilename()); @@ -511,6 +496,7 @@ public static TaskProfile convertToTaskProfile(DataConfig dataConfig) { profileDto.setTask(task); break; case MONGODB: + task.setTaskClass(DEFAULT_MONGODB_TASK); MongoTask mongoTask = getMongoTask(dataConfig); task.setMongoTask(mongoTask); task.setSource(MONGO_SOURCE); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/MongoDBInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/MongoDBInstance.java new file mode 100644 index 0000000000..d393d1e0b7 --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/MongoDBInstance.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.instance; + +import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.constant.TaskConstants; + +public class MongoDBInstance extends CommonInstance { + + @Override + public void setInodeInfo(InstanceProfile profile) { + profile.set(TaskConstants.INODE_INFO, ""); + } +} diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java index 555d87b412..74fc50e111 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java @@ -17,56 +17,159 @@ package org.apache.inlong.agent.plugin.sources; +import org.apache.inlong.agent.common.AgentThreadFactory; +import org.apache.inlong.agent.conf.AgentConfiguration; +import org.apache.inlong.agent.conf.InstanceProfile; import org.apache.inlong.agent.conf.TaskProfile; -import org.apache.inlong.agent.plugin.Message; +import org.apache.inlong.agent.constant.AgentConstants; +import org.apache.inlong.agent.constant.TaskConstants; +import org.apache.inlong.agent.except.FileException; import org.apache.inlong.agent.plugin.file.Reader; import org.apache.inlong.agent.plugin.sources.file.AbstractSource; -import org.apache.inlong.agent.plugin.sources.reader.MongoDBReader; +import io.debezium.connector.mongodb.MongoDbConnector; +import io.debezium.connector.mongodb.MongoDbConnectorConfig; +import io.debezium.engine.ChangeEvent; +import io.debezium.engine.DebeziumEngine; +import io.debezium.engine.DebeziumEngine.RecordCommitter; +import io.debezium.engine.format.Json; +import io.debezium.engine.spi.OffsetCommitPolicy; +import org.apache.kafka.connect.storage.FileOffsetBackingStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collections; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.List; +import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; -/** - * MongoDBSource : mongo source, split mongo source job into multi readers - */ public class MongoDBSource extends AbstractSource { private static final Logger LOGGER = LoggerFactory.getLogger(MongoDBSource.class); + private static final Integer DEBEZIUM_QUEUE_SIZE = 100; + private ExecutorService executor; + public InstanceProfile profile; + private BlockingQueue debeziumQueue; + private final Properties props = new Properties(); + private String database; + private String collection; + private String snapshotMode; + + private boolean isRestoreFromDB = false; + + public MongoDBSource() { + } + + @Override + public void init(InstanceProfile profile) { + try { + LOGGER.info("MongoDBSource init: {}", profile.toJsonStr()); + this.profile = profile; + super.init(profile); + debeziumQueue = new LinkedBlockingQueue<>(DEBEZIUM_QUEUE_SIZE); + database = profile.get(TaskConstants.TASK_MONGO_DATABASE_INCLUDE_LIST); + collection = profile.get(TaskConstants.TASK_MONGO_COLLECTION_INCLUDE_LIST); + snapshotMode = profile.get(TaskConstants.TASK_MONGO_SNAPSHOT_MODE, "initial"); + + props.setProperty("name", "MongoDB-" + instanceId); + props.setProperty("connector.class", MongoDbConnector.class.getName()); + props.setProperty("offset.storage", FileOffsetBackingStore.class.getName()); + String agentPath = AgentConfiguration.getAgentConf() + .get(AgentConstants.AGENT_HOME, AgentConstants.DEFAULT_AGENT_HOME); + String offsetPath = agentPath + "/" + getThreadName() + "offset.dat"; + props.setProperty("offset.storage.file.filename", offsetPath); + + props.setProperty(String.valueOf(MongoDbConnectorConfig.LOGICAL_NAME), "agent-mongoDB-" + instanceId); + props.setProperty(String.valueOf(MongoDbConnectorConfig.HOSTS), + profile.get(TaskConstants.TASK_MONGO_HOSTS)); + props.setProperty(String.valueOf(MongoDbConnectorConfig.USER), profile.get(TaskConstants.TASK_MONGO_USER)); + props.setProperty(String.valueOf(MongoDbConnectorConfig.PASSWORD), + profile.get(TaskConstants.TASK_MONGO_PASSWORD)); + props.setProperty(String.valueOf(MongoDbConnectorConfig.DATABASE_INCLUDE_LIST), database); + props.setProperty(String.valueOf(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST), collection); + props.setProperty(String.valueOf(MongoDbConnectorConfig.SNAPSHOT_MODE), snapshotMode); + + executor = Executors.newSingleThreadExecutor(); + executor.execute(startDebeziumEngine()); + + } catch (Exception ex) { + stopRunning(); + throw new FileException("error init stream for " + collection, ex); + } + } + + private Runnable startDebeziumEngine() { + return () -> { + AgentThreadFactory.nameThread(getThreadName() + "debezium"); + try (DebeziumEngine> debeziumEngine = DebeziumEngine.create(Json.class) + .using(props) + .using(OffsetCommitPolicy.always()) + .notifying(this::handleConsumerEvent) + .build()) { + + debeziumEngine.run(); + } catch (Throwable e) { + LOGGER.error("do run error in mongoDB debezium: ", e); + } + }; + } + + private void handleConsumerEvent(List> records, + RecordCommitter> committer) throws InterruptedException { + boolean offerSuc = false; + for (ChangeEvent record : records) { + SourceData sourceData = new SourceData(record.value().getBytes(StandardCharsets.UTF_8), 0L); + while (isRunnable() && !offerSuc) { + offerSuc = debeziumQueue.offer(sourceData, 1, TimeUnit.SECONDS); + } + committer.markProcessed(record); + } + committer.markBatchFinished(); + } @Override public List split(TaskProfile conf) { - MongoDBReader mongoDBReader = new MongoDBReader(); - List readerList = Collections.singletonList(mongoDBReader); - sourceMetric.sourceSuccessCount.incrementAndGet(); - return readerList; + return null; } @Override protected String getThreadName() { - return null; + return "mongo-source-" + taskId + "-" + instanceId; } @Override protected void printCurrentState() { - + LOGGER.info("mongo collection is {}", collection); } @Override protected boolean doPrepareToRead() { - return false; + return true; } @Override protected List readFromSource() { - return null; - } - - @Override - public Message read() { - return null; + List dataList = new ArrayList<>(); + try { + int size = 0; + while (size < BATCH_READ_LINE_TOTAL_LEN) { + SourceData sourceData = debeziumQueue.poll(1, TimeUnit.SECONDS); + if (sourceData != null) { + size += sourceData.getData().length; + dataList.add(sourceData); + } else { + break; + } + } + } catch (InterruptedException e) { + LOGGER.error("poll {} data from debezium queue interrupted.", instanceId); + } + return dataList; } @Override @@ -76,16 +179,12 @@ protected boolean isRunnable() { @Override protected void releaseSource() { - - } - - @Override - public boolean sourceFinish() { - return false; + LOGGER.info("release mongo source"); + executor.shutdownNow(); } @Override public boolean sourceExist() { - return false; + return true; } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java new file mode 100644 index 0000000000..6081cc388c --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.task; + +import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.conf.TaskProfile; +import org.apache.inlong.agent.constant.CycleUnitType; +import org.apache.inlong.agent.constant.TaskConstants; +import org.apache.inlong.agent.core.instance.ActionType; +import org.apache.inlong.agent.core.instance.InstanceAction; +import org.apache.inlong.agent.core.instance.InstanceManager; +import org.apache.inlong.agent.core.task.TaskManager; +import org.apache.inlong.agent.db.Db; +import org.apache.inlong.agent.metrics.audit.AuditUtils; +import org.apache.inlong.agent.plugin.file.Task; +import org.apache.inlong.agent.state.State; +import org.apache.inlong.agent.utils.AgentUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; + +import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB; + +public class MongoDBTask extends Task { + + private static final Logger LOGGER = LoggerFactory.getLogger(MongoDBTask.class); + public static final String DEFAULT_MONGODB_INSTANCE = "org.apache.inlong.agent.plugin.instance.MongoDBInstance"; + public static final int CORE_THREAD_SLEEP_TIME = 5000; + public static final int CORE_THREAD_PRINT_TIME = 10000; + + private TaskProfile taskProfile; + private Db basicDb; + private TaskManager taskManager; + private InstanceManager instanceManager; + private long lastPrintTime = 0; + private boolean initOK = false; + private volatile boolean running = false; + private boolean isAdded = false; + private boolean isRestoreFromDB = false; + + private String database; + private String collection; + + @Override + public void init(Object srcManager, TaskProfile taskProfile, Db basicDb) throws IOException { + taskManager = (TaskManager) srcManager; + commonInit(taskProfile, basicDb); + initOK = true; + } + + private void commonInit(TaskProfile taskProfile, Db basicDb) { + LOGGER.info("mongoDB commonInit: {}", taskProfile.toJsonStr()); + this.taskProfile = taskProfile; + this.basicDb = basicDb; + this.database = taskProfile.get(TaskConstants.TASK_MONGO_DATABASE_INCLUDE_LIST); + this.collection = taskProfile.get(TaskConstants.TASK_MONGO_COLLECTION_INCLUDE_LIST); + this.isRestoreFromDB = taskProfile.getBoolean(RESTORE_FROM_DB, false); + instanceManager = new InstanceManager(taskProfile.getTaskId(), 1, + basicDb, taskManager.getTaskDb()); + try { + instanceManager.start(); + } catch (Exception e) { + LOGGER.error("start instance manager error: ", e); + } + } + + @Override + public void destroy() { + doChangeState(State.SUCCEEDED); + if (instanceManager != null) { + instanceManager.stop(); + } + } + + @Override + public TaskProfile getProfile() { + return taskProfile; + } + + @Override + public String getTaskId() { + if (taskProfile == null) { + return ""; + } + return taskProfile.getTaskId(); + } + + @Override + public boolean isProfileValid(TaskProfile profile) { + if (!profile.allRequiredKeyExist()) { + LOGGER.error("task profile needs all required key"); + return false; + } + + return true; + } + + @Override + public void addCallbacks() { + + } + + @Override + public void run() { + Thread.currentThread().setName("mongoDB-task-core-" + getTaskId()); + running = true; + try { + doRun(); + } catch (Throwable e) { + LOGGER.error("do run error: ", e); + } + running = false; + } + + private void doRun() { + while (!isFinished()) { + if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_TIME) { + LOGGER.info("mongoDB task running! taskId {}", getTaskId()); + lastPrintTime = AgentUtils.getCurrentTime(); + } + AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME); + if (!initOK) { + continue; + } + + // Add instance profile to instance manager + addInstanceProfile(); + + String inlongGroupId = taskProfile.getInlongGroupId(); + String inlongStreamId = taskProfile.getInlongStreamId(); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_HEARTBEAT, inlongGroupId, inlongStreamId, + AgentUtils.getCurrentTime(), 1, 1); + } + } + + private void addInstanceProfile() { + if (isAdded) { + return; + } + String dataTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHH")); + InstanceProfile instanceProfile = taskProfile.createInstanceProfile(DEFAULT_MONGODB_INSTANCE, collection, + CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime()); + LOGGER.info("taskProfile.createInstanceProfile: {}", instanceProfile.toJsonStr()); + InstanceAction action = new InstanceAction(ActionType.ADD, instanceProfile); + while (!isFinished() && !instanceManager.submitAction(action)) { + LOGGER.error("instance manager action queue is full: taskId {}", instanceManager.getTaskId()); + AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME); + } + this.isAdded = true; + } +}