Skip to content

Commit

Permalink
refer to flink pr(apache/flink-cdc#3065 and apache/flink-cdc#2220 )fi…
Browse files Browse the repository at this point in the history
…x method
  • Loading branch information
jw-itq committed Jan 19, 2025
1 parent f23c6da commit d4f04b2
Showing 1 changed file with 52 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,23 @@
/*
* Copyright Debezium Authors.
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.debezium.connector.mysql;

import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.ErrorMessageUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
Expand Down Expand Up @@ -82,7 +95,17 @@

import static io.debezium.util.Strings.isNullOrEmpty;

/** @author Jiri Pechanec */
/**
* Copied from Debezium project(1.9.8.Final) to fix
* https://github.com/ververica/flink-cdc-connectors/issues/1944.
*
* <p>Line 1427-1433 : Adjust GTID merging logic to support recovering from a job which previously
* specified a starting offset on start.
*
* <p>Line 1485 : Add more error details for some exceptions.
*
* @author Jiri Pechanec
*/
public class MySqlStreamingChangeEventSource
implements StreamingChangeEventSource<MySqlPartition, MySqlOffsetContext> {

Expand Down Expand Up @@ -118,6 +141,7 @@ public class MySqlStreamingChangeEventSource
@SingleThreadAccess("binlog client thread")
private Instant eventTimestamp;

/** Describe binlog position. */
public static class BinlogPosition {
final String filename;
final long position;
Expand Down Expand Up @@ -174,7 +198,7 @@ public boolean equals(Object obj) {
}

@FunctionalInterface
private static interface BinlogChangeEmitter<T> {
private interface BinlogChangeEmitter<T> {
void emit(TableId tableId, T data) throws InterruptedException;
}

Expand Down Expand Up @@ -258,7 +282,8 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {

// DBZ-5126 Clean cache on rotate event to prevent it from growing
// indefinitely.
if (event.getHeader().getEventType() == EventType.ROTATE) {
if (event.getHeader().getEventType() == EventType.ROTATE
&& event.getHeader().getTimestamp() != 0) {
tableMapEventByTableId.clear();
}
return event;
Expand Down Expand Up @@ -536,10 +561,10 @@ protected void handleRotateLogsEvent(MySqlOffsetContext offsetContext, Event eve
* processed the binlog to this point.
*
* <p>Note that this captures the current GTID and complete GTID set, regardless of whether the
* connector is {@link MySqlTaskContext#gtidSourceFilter() filtering} the GTID set upon
* connection. We do this because we actually want to capture all GTID set values found in the
* binlog, whether or not we process them. However, only when we connect do we actually want to
* pass to MySQL only those GTID ranges that are applicable per the configuration.
* connector is filtering the GTID set upon connection. We do this because we actually want to
* capture all GTID set values found in the binlog, whether or not we process them. However, only
* when we connect do we actually want to pass to MySQL only those GTID ranges that are applicable
* per the configuration.
*
* @param event the GTID event to be processed; may not be null
*/
Expand Down Expand Up @@ -1404,11 +1429,23 @@ public GtidSet filterGtidSet(
LOGGER.info(
"Relevant GTID set available on server: {}", relevantAvailableServerGtidSet);

// Since the GTID recorded in the checkpoint represents the CDC-executed records, in
// certain scenarios (such as when the startup mode is earliest/timestamp/binlogfile),
// the recorded GTID may not start from the beginning, for example A:300-500. However,
// during job recovery we usually only need to focus on the last consumed point instead
// of consuming A:1-299. Therefore, some adjustments need to be made to the recorded
// offset in the checkpoint, and the available GTID for other MySQL instances should be
// completed.
mergedGtidSet =
relevantAvailableServerGtidSet
.retainAll(uuid -> knownGtidSet.forServerWithId(uuid) != null)
.with(purgedServerGtid)
.with(filteredGtidSet);
GtidUtils.fixRestoredGtidSet(
GtidUtils.mergeGtidSetInto(
relevantAvailableServerGtidSet.retainAll(
uuid -> knownGtidSet.forServerWithId(uuid) != null),
purgedServerGtid),
filteredGtidSet);
} else {
mergedGtidSet = availableServerGtidSet.with(filteredGtidSet);
}
Expand Down Expand Up @@ -1462,9 +1499,11 @@ protected DebeziumException wrap(Throwable error) {
+ e.getSQLState()
+ ".";
}
msg = ErrorMessageUtils.optimizeErrorMessage(msg);
return new DebeziumException(msg, error);
}

/** LifecycleListener for Reader Thread. */
protected final class ReaderThreadLifecycleListener implements LifecycleListener {
private final MySqlOffsetContext offsetContext;

Expand Down

0 comments on commit d4f04b2

Please sign in to comment.