
[Bug] [Oracle-CDC] cdb+pdb mode, Unable to create a source for identifier 'Oracle-CDC', Can not find catalog table with factoryId [Oracle] #8463

chenyz1984 opened this issue Jan 6, 2025 · 10 comments
chenyz1984 commented Jan 6, 2025

Search before asking

  • I had searched in the issues and found no similar issues.

What happened


  1. The source (10.0.6.53) is Oracle 19c with CDB enabled and contains the business database PDB1. PDB1 contains the business user province_lnjy and the query user lndata; the query user can query all tables owned by province_lnjy.
  2. The target (10.0.6.53) is MySQL 8.0 Community Edition.
  3. Apache SeaTunnel 2.3.8 (on 10.0.6.131) is deployed in standalone mode. An Oracle CDC Source extracts table data from province_lnjy in Oracle PDB1, and a MySQL Sink writes it into the target MySQL database (lnjy_frontdb).
  4. To simplify debugging, the sink does not write to MySQL for now; instead the source data is printed to the console.

SeaTunnel Version

2.3.8

SeaTunnel Config

1. Config 1

env {
  # You can set engine configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  Oracle-CDC {
    driver = "oracle.jdbc.driver.OracleDriver"
    base-url = "jdbc:oracle:thin:@//10.0.6.53:1521/pdb1" # connect directly to the PDB
    username = "c##logminer"
    password = "logminer"
    database-names = ["PDB1"]           # databases to monitor
    schema-names = ["PROVINCE_LNJY"]    # schemas to monitor
    table-names = ["PDB1.PROVINCE_LNJY.MY_OBJECTS"] # tables to monitor; the database-name prefix is required
    use_select_count = true
    result_table_name = "customers"
    source.reader.close.timeout = 120000
    connection.pool.size = 1
    schema-changes.enabled = true       # enable schema evolution
  }
}
transform {
}
sink {
    Console { # print the upstream data to the console for easier troubleshooting
        source_table_name = "customers"
      }
}

With this config the DSN connects directly to the PDB. The snapshot phase can read table data, but the incremental phase fails with `ORA-65040: operation not allowed from within a pluggable database`. Analysis shows the `sys.dbms_logmnr` package cannot be executed inside a PDB; it can only be executed in the CDB.
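The ORA-65040 behaviour is easy to confirm in sqlplus (a diagnostic sketch; the redo-log path is a placeholder, and any DBMS_LOGMNR call should fail the same way while the session is attached to the PDB):

```sql
SQL> -- connected to PDB1, any LogMiner call raises ORA-65040
SQL> EXECUTE DBMS_LOGMNR.ADD_LOGFILE(LOGFILENAME => '/path/to/redo01.log', OPTIONS => DBMS_LOGMNR.NEW);
```

Switching the session to the root container first (`ALTER SESSION SET CONTAINER = CDB$ROOT`) makes the call legal, which is why mining has to go through the CDB.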

2. Config 2
```yml
env {
  # You can set engine configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  # This is a example source plugin **only for test and demonstrate the feature source plugin**
  Oracle-CDC {
    driver = "oracle.jdbc.driver.OracleDriver"
    base-url = "jdbc:oracle:thin:@//10.0.6.53:1521/ora19c"
    username = "c##logminer"
    password = "logminer"
    database-names = ["PDB1"]           # databases to monitor
    schema-names = ["PROVINCE_LNJY"]    # schemas to monitor
    table-names = ["PDB1.PROVINCE_LNJY.MY_OBJECTS"] # tables to monitor; the database-name prefix is required
    use_select_count = true
    result_table_name = "customers"
    source.reader.close.timeout = 120000
    connection.pool.size = 1
    schema-changes.enabled = true       # enable schema evolution
  }
}
transform {
}
sink {
    Console {  # print the upstream data to the console for easier troubleshooting
        source_table_name = "customers"
      }
}
```

With this config the DSN connects to the CDB root container, and the job fails with ErrorCode:[API-06], ErrorDescription:[Factory initialize failed] - Unable to create a source for identifier 'Oracle-CDC', caused by org.apache.seatunnel.common.utils.SeaTunnelException: Can not find catalog table with factoryId [Oracle].



### Running Command

```shell
${SEATUNNEL_HOME}/bin/seatunnel.sh -m local --config ${SEATUNNEL_HOME}/jobs_config/stream_ora2fake.conf
```

Error Exception

1. Error for Config 1 (connecting to the PDB)


2025-01-06 16:45:25,653 ERROR [i.d.c.o.l.LogMinerHelper      ] [debezium-reader-0] - Mining session stopped due to the java.sql.SQLException: ORA-65040: operation not allowed from within a pluggable database
ORA-06512: at "SYS.DBMS_LOGMNR", line 82
ORA-06512: at line 1

2025-01-06 16:45:25,653 ERROR [i.d.p.ErrorHandler            ] [debezium-reader-0] - Producer failure
java.sql.SQLException: ORA-65040: operation not allowed from within a pluggable database
ORA-06512: at "SYS.DBMS_LOGMNR", line 82
ORA-06512: at line 1

	at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:494) ~[ojdbc8.jar:18.3.0.0.0]
	at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:446) ~[ojdbc8.jar:18.3.0.0.0]
	at oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:1052) ~[ojdbc8.jar:18.3.0.0.0]
	at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:537) ~[ojdbc8.jar:18.3.0.0.0]
	at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:255) ~[ojdbc8.jar:18.3.0.0.0]
	at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:610) ~[ojdbc8.jar:18.3.0.0.0]
	at oracle.jdbc.driver.T4CCallableStatement.doOall8(T4CCallableStatement.java:249) ~[ojdbc8.jar:18.3.0.0.0]
	at oracle.jdbc.driver.T4CCallableStatement.doOall8(T4CCallableStatement.java:82) ~[ojdbc8.jar:18.3.0.0.0]
	at oracle.jdbc.driver.T4CCallableStatement.executeForRows(T4CCallableStatement.java:924) ~[ojdbc8.jar:18.3.0.0.0]
	at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1136) ~[ojdbc8.jar:18.3.0.0.0]
	at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3640) ~[ojdbc8.jar:18.3.0.0.0]
	at oracle.jdbc.driver.T4CCallableStatement.executeInternal(T4CCallableStatement.java:1318) ~[ojdbc8.jar:18.3.0.0.0]
	at oracle.jdbc.driver.OraclePreparedStatement.execute(OraclePreparedStatement.java:3752) ~[ojdbc8.jar:18.3.0.0.0]
	at oracle.jdbc.driver.OracleCallableStatement.execute(OracleCallableStatement.java:4242) ~[ojdbc8.jar:18.3.0.0.0]
	at oracle.jdbc.driver.OraclePreparedStatementWrapper.execute(OraclePreparedStatementWrapper.java:1079) ~[ojdbc8.jar:18.3.0.0.0]
	at io.debezium.connector.oracle.logminer.LogMinerHelper.executeCallableStatement(LogMinerHelper.java:218) ~[?:?]
	at io.debezium.connector.oracle.logminer.LogMinerHelper.setLogFilesForMining(LogMinerHelper.java:87) ~[?:?]
	at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.initializeRedoLogsForMining(LogMinerStreamingChangeEventSource.java:412) ~[?:?]
	at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:184) ~[?:?]
	at org.apache.seatunnel.connectors.seatunnel.cdc.oracle.source.reader.fetch.logminer.OracleRedoLogFetchTask$RedoLogSplitReadTask.execute(OracleRedoLogFetchTask.java:147) ~[?:?]
	at org.apache.seatunnel.connectors.seatunnel.cdc.oracle.source.reader.fetch.logminer.OracleRedoLogFetchTask.execute(OracleRedoLogFetchTask.java:73) ~[?:?]
	at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:106) ~[?:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_422-422]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_422-422]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_422-422]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_422-422]
	at java.lang.Thread.run(Thread.java:750) [?:1.8.0_422-422]
Caused by: oracle.jdbc.OracleDatabaseException: ORA-65040: operation not allowed from within a pluggable database
ORA-06512: at "SYS.DBMS_LOGMNR", line 82
ORA-06512: at line 1


2. Error for Config 2 (connecting to the CDB root)

```log
Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:213)
	at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
	at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.api.table.factory.FactoryException: ErrorCode:[API-06], ErrorDescription:[Factory initialize failed] - Unable to create a source for identifier 'Oracle-CDC'.
	at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSource(FactoryUtil.java:101)
	at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parseSource(MultipleTableJobConfigParser.java:375)
	at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parse(MultipleTableJobConfigParser.java:209)
	at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.getLogicalDag(ClientJobExecutionEnvironment.java:114)
	at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.execute(ClientJobExecutionEnvironment.java:182)
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:160)
	... 2 more
Caused by: org.apache.seatunnel.common.utils.SeaTunnelException: Can not find catalog table with factoryId [Oracle]
	at org.apache.seatunnel.api.table.catalog.CatalogTableUtil.lambda$getCatalogTables$0(CatalogTableUtil.java:131)
	at java.util.Optional.map(Optional.java:215)
	at org.apache.seatunnel.api.table.catalog.CatalogTableUtil.getCatalogTables(CatalogTableUtil.java:118)
	at org.apache.seatunnel.api.table.catalog.CatalogTableUtil.getCatalogTables(CatalogTableUtil.java:98)
	at org.apache.seatunnel.connectors.seatunnel.cdc.oracle.source.OracleIncrementalSourceFactory.lambda$createSource$1(OracleIncrementalSourceFactory.java:108)
	at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSource(FactoryUtil.java:113)
	at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSource(FactoryUtil.java:74)
```



### Zeta or Flink or Spark Version

Zeta

### Java or Scala Version

openjdk version "1.8.0_422-422"

### Screenshots

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
@chenyz1984 chenyz1984 added the bug label Jan 6, 2025
@chenyz1984 chenyz1984 changed the title [Bug] [Oracle-CDC] Unable to create a source for identifier 'Oracle-CDC', Can not find catalog table with factoryId [Oracle] [Bug] [Oracle-CDC] cdb+pdb 模式, Unable to create a source for identifier 'Oracle-CDC', Can not find catalog table with factoryId [Oracle] Jan 6, 2025
chenyz1984 (author):

Can’t anyone solve this problem?

chenyz1984 (author):

Is no one watching the issue list?

NookVoive:

Your problem description is very detailed. I've run into a similar problem, but my scenario is Oracle 12c without CDB mode enabled.
I'm still investigating...

NookVoive commented Jan 8, 2025

I don't have a database instance in CDB+PDB mode at hand, so I can't verify your problem.
Based on my understanding, here is one change you can try:
change the database name in the table-name prefix to the CDB name, ORA19C (it must be uppercase).
The reason: in the source code, the query that lists databases is:

    @Override
    protected String getListDatabaseSql() {
        return "SELECT name FROM v$database";
    }

This query only returns the CDB name.

Also, in the base-url, change the '/' to ':':

base-url = "jdbc:oracle:thin:@//10.0.6.53:1521:ora19c"

The reason: when the source code assembles the connection string, it automatically appends '/':

    protected String getUrlFromDatabaseName(String databaseName) {
        String url = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
        return url + databaseName + suffix;
    }

The error may be related to this.

Config:

source {
  Oracle-CDC {
    driver = "oracle.jdbc.driver.OracleDriver"
    base-url = "jdbc:oracle:thin:@//10.0.6.53:1521:ora19c"  # case-insensitive here
    username = "c##logminer"
    password = "logminer"
    database-names = ["PDB1"]           # databases to monitor
    schema-names = ["PROVINCE_LNJY"]    # schemas to monitor
    table-names = ["ORA19C.PROVINCE_LNJY.MY_OBJECTS"] # tables to monitor; the database-name prefix is required and uses the CDB name
    use_select_count = true
    result_table_name = "customers"
    source.reader.close.timeout = 120000
    connection.pool.size = 1
    schema-changes.enabled = true       # enable schema evolution
  }
}

chenyz1984 (author):


My problem is solved. Tomorrow at the office I'll post the final config I used, and list the other pitfalls I hit along the way. Having been through it, this home-grown open-source product is nowhere near as strong as advertised; the docs are full of traps.


chenyz1984 commented Jan 9, 2025

1. CDC sync config files

1.1. CDC single table (Oracle -> MySQL)

env {
  # stream_ora2mysql.conf
  parallelism = 2
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  # For CDC sync, defining a separate Oracle-CDC block per table is recommended for stability
  Oracle-CDC {
    driver = "oracle.jdbc.driver.OracleDriver"
    base-url = "jdbc:oracle:thin:@//10.0.6.53:1521/pdb1"  # must be the PDB connection string
    username = "c##logminer"            # common user
    password = "logminer"
    database-names = ["PDB1"]           # PDB to monitor (uppercase)
    schema-names = ["PROVINCE_LNJY"]    # schemas to monitor (uppercase)
    table-names = [                     # tables to monitor, format <PDB>.<SCHEMA>.<TNAME>; separate multiple tables with commas
        "PDB1.PROVINCE_LNJY.MY_OBJECTS"
    ]
    table-names-config = [              # primary-key columns can be specified explicitly for particular tables
      {
        table = "PDB1.PROVINCE_LNJY.MY_OBJECTS"
        primaryKeys = ["OBJECT_ID"]
      }
    ]
    result_table_name = "ORA_MY_OBJECTS"
    use_select_count = false             # snapshot phase: whether to get row counts with select count(*); set true when count(*) is faster than analyze table
    skip_analyze = false                 # snapshot phase: whether to skip analyze table; can be true if the table changes infrequently
    source.reader.close.timeout = 120000
    connection.pool.size = 1
    schema-changes.enabled = true        # enable schema evolution
    exactly_once = true                  # default false; exactly-once semantics ensure data is not duplicated
    debezium {
      database.pdb.name = "PDB1"         # the PDB name must be specified
    }
  }
}

transform {
    Sql {  # add timestamp columns to the target table to record when each row changed
        source_table_name = "ORA_MY_OBJECTS"
        result_table_name = "MYSQL_MY_OBJECTS"
        query = "SELECT  *, NOW() AS CREATE_TIME, NOW() AS UPDATE_TIME  FROM ORA_MY_OBJECTS;"
    }
}

sink {
    jdbc {
        url = "jdbc:mysql://10.0.6.53:3306/lnjy_frontdb?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "root"
        password = "sysz2024"
        database = "lnjy_frontdb"
        table = "${table_name}"      # target table name matches the source
        generate_sink_sql = true     # default false; whether to auto-generate SQL for the target tables
        primary_keys = ["OBJECT_ID"] # primary keys are required when generate_sink_sql is true

        source_table_name = "MYSQL_MY_OBJECTS"
        schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" # how to handle the target table before the job starts
        data_save_mode = "APPEND_DATA"  # if data already exists on the target before the job starts, keep the structure and data
        field_ide = "ORIGINAL"       # field case conversion: ORIGINAL keeps case, UPPERCASE converts to upper, LOWERCASE to lower

        is_exactly_once = true       # default false; when exactly-once is enabled, xa_data_source_class_name must be set
        xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource"
    }
}
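The configs above assume the common user c##logminer already exists in the source database. A minimal sketch of creating it from the CDB root (the privilege list here is abbreviated and illustrative; the full set of grants required for LogMiner-based CDC is documented by the Debezium Oracle connector):

```sql
SQL> CREATE USER c##logminer IDENTIFIED BY logminer CONTAINER=ALL;
SQL> GRANT CREATE SESSION TO c##logminer CONTAINER=ALL;
SQL> GRANT SET CONTAINER TO c##logminer CONTAINER=ALL;
SQL> GRANT LOGMINING TO c##logminer CONTAINER=ALL;
SQL> GRANT SELECT ANY TABLE TO c##logminer CONTAINER=ALL;
```

CONTAINER=ALL is what makes it a common user, visible in both the CDB root and the PDBs.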

1.2. CDC multi-table (Oracle -> MySQL)

env {
  # stream_ora2mysql_multi.conf
  parallelism = 4
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
    # For CDC sync, defining a separate Oracle-CDC block per table is recommended for stability
    Oracle-CDC {
        driver = "oracle.jdbc.driver.OracleDriver" # fixed as oracle.jdbc.driver.OracleDriver
        base-url = "jdbc:oracle:thin:@//10.0.6.53:1521/pdb1"  # must be the PDB connection string
        username = "c##logminer"                # must be created on the source; a common user
        password = "logminer"
        database-names = ["PDB1"]               # PDB to monitor (uppercase)
        schema-names = ["PROVINCE_LNJY"]        # schemas to monitor (uppercase)
        table-names = [                         # tables to monitor, format <PDB>.<SCHEMA>.<TNAME>; separate multiple tables with commas
            "PDB1.PROVINCE_LNJY.T_MY_TABLES"
        ]
        table-names-config = [
            {
                table = "PDB1.PROVINCE_LNJY.T_MY_TABLES"
                primaryKeys = ["ID"]     # explicitly specify the table's primary-key columns
            }
        ]
        use_select_count = false                # snapshot phase: whether to get row counts with select count(*); set true when count(*) is faster than analyze table
        skip_analyze = false                    # snapshot phase: whether to skip analyze table; can be true if the table changes infrequently
        result_table_name = "ORA_MY_TABLES"     # register this stage's output as a dataset (a temporary table)
        source.reader.close.timeout = 120000
        connection.pool.size = 1
        schema-changes.enabled = true           # default true; enable schema evolution, i.e. DDL is inferred automatically
        exactly_once = true                     # default false; exactly-once semantics ensure data is not duplicated
        debezium {
            database.pdb.name = "PDB1"          # the PDB name must be specified
        }
    }

    Oracle-CDC {
        driver = "oracle.jdbc.driver.OracleDriver" # fixed as oracle.jdbc.driver.OracleDriver
        base-url = "jdbc:oracle:thin:@//10.0.6.53:1521/pdb1"  # must be the PDB connection string
        username = "c##logminer"                # must be created on the source; a common user
        password = "logminer"
        database-names = ["PDB1"]               # PDB to monitor (uppercase)
        schema-names = ["PROVINCE_LNJY"]        # schemas to monitor (uppercase)
        table-names = [                         # tables to monitor, format <PDB>.<SCHEMA>.<TNAME>; separate multiple tables with commas
            "PDB1.PROVINCE_LNJY.T_ALL_TABLES"
        ]
        table-names-config = [
            {
                table = "PDB1.PROVINCE_LNJY.T_ALL_TABLES"
                primaryKeys = ["ID"]     # explicitly specify the table's primary-key columns
            }
        ]
        use_select_count = false                # snapshot phase: whether to get row counts with select count(*); set true when count(*) is faster than analyze table
        skip_analyze = false                    # snapshot phase: whether to skip analyze table; can be true if the table changes infrequently
        result_table_name = "ORA_ALL_TABLES"    # register this stage's output as a dataset (a temporary table)
        source.reader.close.timeout = 120000
        connection.pool.size = 1
        schema-changes.enabled = true           # default true; enable schema evolution, i.e. DDL is inferred automatically
        exactly_once = true                     # default false; exactly-once semantics ensure data is not duplicated
        debezium {
            database.pdb.name = "PDB1"          # the PDB name must be specified
        }
    }
}

transform {
    Sql {  # add timestamp columns to the target table to record when each row changed
        source_table_name = "ORA_MY_TABLES"
        result_table_name = "TRANS_MY_TABLES"
        query = "SELECT  *, NOW() AS CREATE_TIME, NOW() AS UPDATE_TIME  FROM ORA_MY_TABLES;"
    }
    Sql {  # add timestamp columns to the target table to record when each row changed
        source_table_name = "ORA_ALL_TABLES"
        result_table_name = "TRANS_ALL_TABLES"
        query = "SELECT  *, NOW() AS CREATE_TIME, NOW() AS UPDATE_TIME  FROM ORA_ALL_TABLES;"
    }
}

sink {
    jdbc {
        url = "jdbc:mysql://10.0.6.53:3306/lnjy_frontdb?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "root"
        password = "sysz2024"
        database = "lnjy_frontdb"
        table = "${table_name}"      # target table name matches the source
        generate_sink_sql = true     # default false; whether to auto-generate SQL for the target tables
        primary_keys = ["ID"]        # primary keys are required when generate_sink_sql is true

        source_table_name = "TRANS_MY_TABLES"
        schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" # how to handle the target table before the job starts
        data_save_mode = "APPEND_DATA"  # if data already exists on the target before the job starts, keep the structure and data
        field_ide = "ORIGINAL"       # field case conversion: ORIGINAL keeps case, UPPERCASE converts to upper, LOWERCASE to lower

        is_exactly_once = true       # default false; when exactly-once is enabled, xa_data_source_class_name must be set
        xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource"
    }

    jdbc {
        url = "jdbc:mysql://10.0.6.53:3306/lnjy_frontdb?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "root"
        password = "sysz2024"
        database = "lnjy_frontdb"
        table = "${table_name}"      # target table name matches the source
        generate_sink_sql = true     # default false; whether to auto-generate SQL for the target tables
        primary_keys = ["ID"]        # primary keys are required when generate_sink_sql is true

        source_table_name = "TRANS_ALL_TABLES"
        schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" # how to handle the target table before the job starts
        data_save_mode = "APPEND_DATA"  # if data already exists on the target before the job starts, keep the structure and data
        field_ide = "ORIGINAL"       # field case conversion: ORIGINAL keeps case, UPPERCASE converts to upper, LOWERCASE to lower

        is_exactly_once = true       # default false; when exactly-once is enabled, xa_data_source_class_name must be set
        xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource"
    }
}

2. Pitfalls encountered

2.1. ORA-00942: table or view does not exist

  1. Symptom

After the snapshot phase finished and incremental sync began, the job failed with ORA-00942: table or view does not exist; the log also contained an UPDATE statement against the LOG_MINING_FLUSH table. Investigation showed that c##logminer has a LOG_MINING_FLUSH table in the PDB, but the table does not exist in the CDB.

  2. Solution

Manually create the LOG_MINING_FLUSH table under the c##logminer user in the CDB:

SQL> CREATE TABLE LOG_MINING_FLUSH(LAST_SCN NUMBER(19,0));
SQL> INSERT INTO LOG_MINING_FLUSH VALUES (0);
SQL> commit;

  3. Question

Presumably the LOG_MINING_FLUSH table records redo-related offsets. But why was it auto-created in the PDB and not in the CDB? Is this related to the base-url configured for the CDC job?
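A quick way to see which containers actually hold the table is to query CDB_TABLES from the root container (a diagnostic sketch; run as a suitably privileged user in CDB$ROOT):

```sql
SQL> SELECT con_id, owner, table_name FROM cdb_tables WHERE table_name = 'LOG_MINING_FLUSH';
```

A row with the PDB's CON_ID but none for the root (CON_ID 1) would match the symptom above.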

2.2. ORA-65040: operation not allowed from within a pluggable database

  1. Symptom

When base-url points at the PDB, the snapshot phase reads data normally, but on entering the incremental phase the job fails with ORA-65040: operation not allowed from within a pluggable database.

  2. Solution

Add the following to the source block:

debezium {
    database.name = "CDB$ROOT"     # the CDB name must be specified
    database.pdb.name = "PDB1"     # the PDB name must be specified
}

Incremental sync then works.

  3. Question

Does the sync process use these debezium settings to switch between the CDB and the PDB, and then execute the stored procedures in SYS.DBMS_LOGMNR?
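If that is the mechanism, the session-level equivalent would look roughly like this (a guess based on the observed behaviour, not verified against the connector source; the LogMiner options shown are illustrative):

```sql
SQL> -- mining has to run from the root container
SQL> ALTER SESSION SET CONTAINER = CDB$ROOT;
SQL> EXECUTE DBMS_LOGMNR.START_LOGMNR(OPTIONS => DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG);
```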

W-dragan commented Jan 9, 2025

I've run into a similar problem. My current solution: if the Oracle database is a PDB-style database, add the following on the source side:

"debezium": {
    "database.pdb.name": "${your pdb database name}",
    "debezium.log.mining.strategy": "online_catalog",
    "database.oracle.jdbc.timezoneAsRegion": "false",
    "debezium.log.mining.continuous.mine": "true",
    "debezium.database.tablename.case.insensitive": "false"
}

If it is a CDB, add the following on the source side:

"debezium": {
    "database.dbname": "${your cdb database name}",
    "debezium.log.mining.strategy": "online_catalog",
    "database.oracle.jdbc.timezoneAsRegion": "false",
    "debezium.log.mining.continuous.mine": "true",
    "debezium.database.tablename.case.insensitive": "false"
}

But in my scenario these are two different Oracle instances, one CDB and one PDB; I have not had both kinds of database inside a single Oracle instance.
@chenyz1984, the key is the difference in the first parameter; the other parameters shouldn't matter much. Hope this helps.

chenyz1984 (author):

@W-dragan Thanks so much! One more question: incremental CDC sync needs the LOG_MINING_FLUSH table. In my tests I found the table had already been created in the PDB, but the sync job apparently needs the LOG_MINING_FLUSH table in the CDB, hence the table-not-found error. I don't understand where the PDB copy came from, or why it was created but never used.

pkhsu commented Jan 11, 2025

I hit the same problem and spent most of a day checking privileges and configuration on the Oracle side without solving it. So the "debezium" block was what was needed... Where did you two discover that? Thanks for sharing.

chenyz1984 (author):

It's mentioned in the Flink CDC documentation.
