[oracle][fix] Fix testConsumingAllEvents bug: the incremental phase skips the data that has been read in the snapshot full phase
molsionmo committed Jun 14, 2023
1 parent 859b153 commit a84e7b1
Showing 7 changed files with 459 additions and 6 deletions.
@@ -110,6 +110,8 @@ public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
for (DataChangeEvent event : batch) {
if (shouldEmit(event.getRecord())) {
sourceRecords.add(event.getRecord());
} else {
LOG.info("Data change event {} should not be emitted", event);
}
}
}
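Note on the hunk above: it only adds logging for records that shouldEmit filters out; the Oracle-specific hooks that make the filtering work are added further down in this commit. As background, the rule shouldEmit is meant to enforce can be sketched as follows. This is a simplified stand-in with made-up types (plain long keys and SCNs, a hypothetical FinishedSplit holder), not the framework's real signatures; the key-range check is what isRecordBetween/splitKeyRangeContains implement for Oracle below.

import java.util.List;

// Conceptual sketch only: a redo-log event is dropped when it is covered by a finished
// snapshot split's key range AND happened at or before that split's high watermark,
// because the snapshot full phase has already read it.
public class ShouldEmitSketch {

    /** A finished snapshot split: its key range plus the high watermark of its snapshot read. */
    public static class FinishedSplit {
        final long splitStart;
        final long splitEnd;
        final long highWatermarkScn;

        FinishedSplit(long splitStart, long splitEnd, long highWatermarkScn) {
            this.splitStart = splitStart;
            this.splitEnd = splitEnd;
            this.highWatermarkScn = highWatermarkScn;
        }
    }

    /** Simplified stand-in for shouldEmit(): keys and redo-log positions are plain longs here. */
    public static boolean shouldEmit(long recordKey, long recordScn, List<FinishedSplit> splits) {
        for (FinishedSplit split : splits) {
            boolean coveredByRange = recordKey >= split.splitStart && recordKey < split.splitEnd;
            boolean alreadyReadInSnapshot = recordScn <= split.highWatermarkScn;
            if (coveredByRange && alreadyReadInSnapshot) {
                return false; // already emitted during the snapshot full phase
            }
        }
        return true;
    }
}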
@@ -157,7 +157,7 @@ public CommonConnectorConfig getDbzConnectorConfig() {
}

public SchemaNameAdjuster getSchemaNameAdjuster() {
- return null;
return schemaNameAdjuster;
}

public abstract RelationalDatabaseSchema getDatabaseSchema();
@@ -25,6 +25,7 @@
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext;
import com.ververica.cdc.connectors.base.utils.SourceRecordUtils;
import com.ververica.cdc.connectors.oracle.source.config.OracleSourceConfig;
import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset;
import com.ververica.cdc.connectors.oracle.source.utils.OracleUtils;
@@ -46,19 +47,25 @@
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Collect;
import oracle.sql.ROWID;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.SQLException;
import java.sql.Types;
import java.time.Instant;
import java.util.Map;
import java.util.Objects;

/** The context for the fetch task that fetches snapshot-split data from the Oracle data source. */
public class OracleSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
@@ -186,7 +193,49 @@ public OracleDatabaseSchema getDatabaseSchema() {

@Override
public RowType getSplitType(Table table) {
- return OracleUtils.getSplitType(table);
OracleSourceConfig oracleSourceConfig = getSourceConfig();
// if a chunk key column is configured, derive the split type from that column
if (Objects.nonNull(oracleSourceConfig.getChunkKeyColumn())) {
return OracleUtils.getSplitType(table);
}

// otherwise ROWID is used as the chunk key column by default
return OracleUtils.getSplitType(
Column.editor().jdbcType(Types.VARCHAR).name(ROWID.class.getSimpleName()).create());
}

@Override
public boolean isDataChangeRecord(SourceRecord record) {
return SourceRecordUtils.isDataChangeRecord(record);
}

@Override
public boolean isRecordBetween(SourceRecord record, Object[] splitStart, Object[] splitEnd) {
RowType splitKeyType =
getSplitType(getDatabaseSchema().tableFor(SourceRecordUtils.getTableId(record)));

// ROWID is the chunk key column by default; compare on the ROWID record header
if (splitKeyType.getFieldNames().contains(ROWID.class.getSimpleName())) {
ConnectHeaders headers = (ConnectHeaders) record.headers();
ROWID rowId = null;
try {
rowId = new ROWID(headers.iterator().next().value().toString());
} catch (SQLException e) {
LOG.error("Cannot convert the ROWID header of record {} to ROWID", record);
}
Object[] rowIds = new ROWID[] {rowId};
return SourceRecordUtils.splitKeyRangeContains(rowIds, splitStart, splitEnd);
} else {
// a chunk key column is configured; compare on the configured split key
Object[] key =
SourceRecordUtils.getSplitKey(splitKeyType, record, getSchemaNameAdjuster());
return SourceRecordUtils.splitKeyRangeContains(key, splitStart, splitEnd);
}
}

@Override
public TableId getTableId(SourceRecord record) {
return SourceRecordUtils.getTableId(record);
}
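Note on getSplitType above: when no chunk key column is configured, the split key falls back to a synthetic VARCHAR column named after oracle.sql.ROWID, and isRecordBetween detects that case through getFieldNames(). A minimal sketch of that fallback column follows, using the same Debezium Column editor call as the diff; that OracleUtils.getSplitType turns it into a one-field row type named "ROWID" is an assumption here, not verified against OracleUtils.

import io.debezium.relational.Column;
import oracle.sql.ROWID;

import java.sql.Types;

public class RowIdSplitKeySketch {

    /** The synthetic split-key column assumed when no chunk key column is configured. */
    public static Column defaultSplitKeyColumn() {
        return Column.editor()
                .jdbcType(Types.VARCHAR) // ROWID values are handled as strings
                .name(ROWID.class.getSimpleName()) // field name "ROWID"
                .create();
    }
}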

@@ -0,0 +1,263 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer;

import io.debezium.DebeziumException;
import io.debezium.connector.oracle.BaseChangeRecordEmitter;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry;
import io.debezium.data.Envelope.Operation;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.Table;
import io.debezium.relational.TableSchema;
import io.debezium.schema.DataCollectionSchema;
import io.debezium.util.Clock;
import oracle.sql.ROWID;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;

/**
* Copied from the https://github.com/debezium/debezium project
*
* <p>Emits a change record based on a single {@link LogMinerDmlEntry} event.
*/
public class LogMinerChangeRecordEmitter extends BaseChangeRecordEmitter<Object> {

private static final Logger LOGGER = LoggerFactory.getLogger(LogMinerChangeRecordEmitter.class);

private final int operation;
private final Object[] oldValues;
private final Object[] newValues;
private final String rowId;

public LogMinerChangeRecordEmitter(
OffsetContext offset,
int operation,
Object[] oldValues,
Object[] newValues,
Table table,
Clock clock) {
super(offset, table, clock);
this.operation = operation;
this.oldValues = oldValues;
this.newValues = newValues;
this.rowId = null;
}

public LogMinerChangeRecordEmitter(
OffsetContext offset,
int operation,
Object[] oldValues,
Object[] newValues,
Table table,
Clock clock,
String rowId) {
super(offset, table, clock);
this.operation = operation;
this.oldValues = oldValues;
this.newValues = newValues;
this.rowId = rowId;
}

@Override
protected Operation getOperation() {
switch (operation) {
case RowMapper.INSERT:
return Operation.CREATE;
case RowMapper.UPDATE:
case RowMapper.SELECT_LOB_LOCATOR:
return Operation.UPDATE;
case RowMapper.DELETE:
return Operation.DELETE;
default:
throw new DebeziumException("Unsupported operation type: " + operation);
}
}

@Override
public void emitChangeRecords(DataCollectionSchema schema, Receiver receiver)
throws InterruptedException {
TableSchema tableSchema = (TableSchema) schema;
Operation operation = getOperation();

switch (operation) {
case CREATE:
emitCreateRecord(receiver, tableSchema);
break;
case READ:
emitReadRecord(receiver, tableSchema);
break;
case UPDATE:
emitUpdateRecord(receiver, tableSchema);
break;
case DELETE:
emitDeleteRecord(receiver, tableSchema);
break;
case TRUNCATE:
emitTruncateRecord(receiver, tableSchema);
break;
default:
throw new IllegalArgumentException("Unsupported operation: " + operation);
}
}

@Override
protected void emitCreateRecord(Receiver receiver, TableSchema tableSchema)
throws InterruptedException {
Object[] newColumnValues = getNewColumnValues();
Struct newKey = tableSchema.keyFromColumnData(newColumnValues);
Struct newValue = tableSchema.valueFromColumnData(newColumnValues);
Struct envelope =
tableSchema
.getEnvelopeSchema()
.create(
newValue,
getOffset().getSourceInfo(),
getClock().currentTimeAsInstant());
ConnectHeaders headers = new ConnectHeaders();
headers.add(ROWID.class.getSimpleName(), new SchemaAndValue(null, rowId));

if (skipEmptyMessages() && (newColumnValues == null || newColumnValues.length == 0)) {
// This case can be hit on UPDATE / DELETE when there's no primary key defined while
// using certain decoders
LOGGER.warn(
"no new values found for table '{}' from create message at '{}'; skipping record",
tableSchema,
getOffset().getSourceInfo());
return;
}
receiver.changeRecord(
tableSchema, Operation.CREATE, newKey, envelope, getOffset(), headers);
}

@Override
protected void emitReadRecord(Receiver receiver, TableSchema tableSchema)
throws InterruptedException {
Object[] newColumnValues = getNewColumnValues();
Struct newKey = tableSchema.keyFromColumnData(newColumnValues);
Struct newValue = tableSchema.valueFromColumnData(newColumnValues);
Struct envelope =
tableSchema
.getEnvelopeSchema()
.read(
newValue,
getOffset().getSourceInfo(),
getClock().currentTimeAsInstant());
ConnectHeaders headers = new ConnectHeaders();
headers.add(ROWID.class.getSimpleName(), new SchemaAndValue(null, rowId));

receiver.changeRecord(tableSchema, Operation.READ, newKey, envelope, getOffset(), headers);
}

@Override
protected void emitUpdateRecord(Receiver receiver, TableSchema tableSchema)
throws InterruptedException {
Object[] oldColumnValues = getOldColumnValues();
Object[] newColumnValues = getNewColumnValues();

Struct oldKey = tableSchema.keyFromColumnData(oldColumnValues);
Struct newKey = tableSchema.keyFromColumnData(newColumnValues);

Struct newValue = tableSchema.valueFromColumnData(newColumnValues);
Struct oldValue = tableSchema.valueFromColumnData(oldColumnValues);

if (skipEmptyMessages() && (newColumnValues == null || newColumnValues.length == 0)) {
LOGGER.warn(
"no new values found for table '{}' from update message at '{}'; skipping record",
tableSchema,
getOffset().getSourceInfo());
return;
}
// some configurations do not provide old values in case of updates
// in this case we handle all updates as regular ones
if (oldKey == null || Objects.equals(oldKey, newKey)) {
Struct envelope =
tableSchema
.getEnvelopeSchema()
.update(
oldValue,
newValue,
getOffset().getSourceInfo(),
getClock().currentTimeAsInstant());
ConnectHeaders headers = new ConnectHeaders();
headers.add(ROWID.class.getSimpleName(), new SchemaAndValue(null, rowId));
receiver.changeRecord(
tableSchema, Operation.UPDATE, newKey, envelope, getOffset(), headers);
}
// PK update -> emit as delete and re-insert with new key
else {
ConnectHeaders headers = new ConnectHeaders();
headers.add(PK_UPDATE_NEWKEY_FIELD, newKey, tableSchema.keySchema());

Struct envelope =
tableSchema
.getEnvelopeSchema()
.delete(
oldValue,
getOffset().getSourceInfo(),
getClock().currentTimeAsInstant());
receiver.changeRecord(
tableSchema, Operation.DELETE, oldKey, envelope, getOffset(), headers);

headers = new ConnectHeaders();
headers.add(PK_UPDATE_OLDKEY_FIELD, oldKey, tableSchema.keySchema());

headers.add(ROWID.class.getSimpleName(), new SchemaAndValue(null, rowId));

envelope =
tableSchema
.getEnvelopeSchema()
.create(
newValue,
getOffset().getSourceInfo(),
getClock().currentTimeAsInstant());
receiver.changeRecord(
tableSchema, Operation.CREATE, newKey, envelope, getOffset(), headers);
}
}

@Override
protected void emitDeleteRecord(Receiver receiver, TableSchema tableSchema)
throws InterruptedException {
Object[] oldColumnValues = getOldColumnValues();
Struct oldKey = tableSchema.keyFromColumnData(oldColumnValues);
Struct oldValue = tableSchema.valueFromColumnData(oldColumnValues);
ConnectHeaders headers = new ConnectHeaders();
headers.add(ROWID.class.getSimpleName(), new SchemaAndValue(null, rowId));

if (skipEmptyMessages() && (oldColumnValues == null || oldColumnValues.length == 0)) {
LOGGER.warn(
"no old values found for table '{}' from delete message at '{}'; skipping record",
tableSchema,
getOffset().getSourceInfo());
return;
}

Struct envelope =
tableSchema
.getEnvelopeSchema()
.delete(
oldValue,
getOffset().getSourceInfo(),
getClock().currentTimeAsInstant());
receiver.changeRecord(
tableSchema, Operation.DELETE, oldKey, envelope, getOffset(), headers);
}

@Override
protected Object[] getOldColumnValues() {
return oldValues;
}

@Override
protected Object[] getNewColumnValues() {
return newValues;
}
}
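To connect the two sides of this commit, here is a small self-contained sketch of the ROWID header round trip: the emitter above attaches the row id as a Kafka Connect header, and OracleSourceFetchTaskContext#isRecordBetween (earlier in this diff) reads it back and converts it to oracle.sql.ROWID before the split-range check. The sample ROWID string below is made up for illustration.

import oracle.sql.ROWID;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.header.ConnectHeaders;

import java.sql.SQLException;

public class RowIdHeaderRoundTripSketch {

    public static void main(String[] args) throws SQLException {
        // Producer side (LogMinerChangeRecordEmitter above): attach the row id of the
        // change event as a Kafka Connect header named "ROWID".
        ConnectHeaders headers = new ConnectHeaders();
        headers.add(
                ROWID.class.getSimpleName(),
                new SchemaAndValue(null, "AAAR9jAAFAAAACFAAA")); // made-up sample ROWID

        // Consumer side (OracleSourceFetchTaskContext#isRecordBetween earlier in this
        // diff): read the first header back and convert it to oracle.sql.ROWID so it can
        // be compared against the snapshot split's key range.
        String headerValue = headers.iterator().next().value().toString();
        ROWID rowId = new ROWID(headerValue); // ROWID(String) may throw SQLException
        System.out.println("parsed header " + headerValue + " into " + rowId);
    }
}

Passing the ROWID as a header rather than in the record key presumably leaves the emitted change-event schema untouched while still giving the fetch task a key to compare against split boundaries.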
@@ -410,7 +410,8 @@ boolean commit(
event.getEntry().getOldValues(),
event.getEntry().getNewValues(),
schema.tableFor(event.getTableId()),
- clock));
clock,
event.rowId));
}

lastCommittedScn = Scn.valueOf(scn.longValue());