-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Bug] [Oracle-CDC] cdb+pdb 模式, Unable to create a source for identifier 'Oracle-CDC', Can not find catalog table with factoryId [Oracle] #8463
Comments
Can’t anyone solve this problem? |
issue 列表,没人关注么? |
你的问题描述的很详细。遇到了类似的问题,但我的场景是:Oracle 12C 未开启CDB模式。 |
因为我身边无使用CDB+PDB模式的数据库实例,无法验证你的问题。 @Override
protected String getListDatabaseSql() {
return "SELECT name FROM v$database";
} 该查询只会得到 CDB名称。 同时连接数据库的 base-url 把'/' 改为':' base-url = "jdbc:oracle:thin:@//10.0.6.53:1521:ora19c" 原因是源码中在拼接连接串时,会自动拼接/: protected String getUrlFromDatabaseName(String databaseName) {
String url = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
return url + databaseName + suffix;
} 错误可能和这里有关。 配置项:
|
我的问题解决了,明天到公司给您贴我最终用的配置。以及列一下在各种尝试过程中,踩到的其他坑。经过实践,国产开源产品远没有宣传中所描述的那么强大啊。文档里各种坑。 |
1. CDC 同步配置文件1.1. CDC 单表(Oracle -> MySQL)env {
# stream_ora2mysql.conf
parallelism = 2
job.mode = "STREAMING"
checkpoint.interval = 5000
}
source {
# CDC 同步时,建议为每个表独立定义一个 Oracle-CDC 连接,以保证稳定性
Oracle-CDC {
driver = "oracle.jdbc.driver.OracleDriver"
base-url = "jdbc:oracle:thin:@//10.0.6.53:1521/pdb1" # 需要为 PDB 的连接串
username = "c##logminer" # 公共用户 Common User
password = "logminer"
database-names = ["PDB1"] # 监控的 PDB(大写)
schema-names = ["PROVINCE_LNJY"] # 监控的 schema 列表(大写)
table-names = [ # 监控的 table 列表,格式:<PDB>.<SCHEMA>.<TNAME>,多个表用逗号分隔
"PDB1.PROVINCE_LNJY.MY_OBJECTS"
]
table-names-config = [ # 可以为特定表,显式指定主键列
{
table = "PDB1.PROVINCE_LNJY.MY_OBJECTS"
primaryKeys = ["OBJECT_ID"]
}
]
result_table_name = "ORA_MY_OBJECTS"
use_select_count = false # 全量阶段,是否用 select count(*) 获取数据量;当 count(*) 比 analyze table 速度快时,可设置为 true
skip_analyze = false # 全量阶段,是否跳过 analyze table。如果表数据变更不频繁,可以设为true。
source.reader.close.timeout = 120000
connection.pool.size = 1
schema-changes.enabled = true # 启用 schema evolution 功能
exactly_once = true # 默认为 false。是否启用数据的精确一次性处理。启用 extractly one 语义,可确保数据不会重复
debezium {
database.pdb.name = "PDB1" # 需指定 PDB 名称
}
}
}
transform {
Sql { # 为目标表增加时间戳字段,以记录数据行的变更时间
source_table_name = "ORA_MY_OBJECTS"
result_table_name = "MYSQL_MY_OBJECTS"
query = "SELECT *, NOW() AS CREATE_TIME, NOW() AS UPDATE_TIME FROM ORA_MY_OBJECTS;"
}
}
sink {
jdbc {
url = "jdbc:mysql://10.0.6.53:3306/lnjy_frontdb?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
password = "sysz2024"
database = "lnjy_frontdb"
table = "${table_name}" # 目标端表名与源端一致
generate_sink_sql = true # 默认 false,是否自动生成目标端库表的 sql 语句
primary_keys = ["OBJECT_ID"] # generate_sink_sql 为 true 时,需指定主键
source_table_name = "MYSQL_MY_OBJECTS"
schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" # 任务启动之前,目的表的处理方式
data_save_mode = "APPEND_DATA" # 任务启动之前,若目标端已存在数据,则保留数据库结构与数据;
field_ide = "ORIGINAL" # 字段是否需要大小写转换:ORIGINAL 不转换,UPPERCASE 转大写,LOWERCASE 转小写
is_exactly_once = true # 默认 false。是否启用数据的精确一次性处理,设置为true时,需要设置 xa_data_source_class_name
xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource"
}
} 1.2. CDC 多表(Oracle -> MySQL)env {
# stream_ora2mysql_multi.conf
parallelism = 4
job.mode = "STREAMING"
checkpoint.interval = 5000
}
source {
# CDC 数据同步,建议为每个表独立设置一个 Oracle-CDC,以确保稳定性
Oracle-CDC {
driver = "oracle.jdbc.driver.OracleDriver" # 固定为 oracle.jdbc.driver.OracleDriver
base-url = "jdbc:oracle:thin:@//10.0.6.53:1521/pdb1" # 需要为 PDB 连接串
username = "c##logminer" # 源库需要创建,公共用户 Common User
password = "logminer"
database-names = ["PDB1"] # 监控的 PDB(大写)
schema-names = ["PROVINCE_LNJY"] # 监控的 schema 列表(大写)
table-names = [ # 监控的 table 列表,格式:<PDB>.<SCHEMA>.<TNAME>,多个表用逗号分隔
"PDB1.PROVINCE_LNJY.T_MY_TABLES"
]
table-names-config = [
{
table = "PDB1.PROVINCE_LNJY.T_MY_TABLES"
primaryKeys = ["ID"] # 为表显式指定主键字段
}
]
use_select_count = false # 全量阶段,是否用 select count(*) 获取数据量;当 count(*) 比 analyze table 速度快时,可设置为 true
skip_analyze = false # 全量阶段,是否跳过 analyze table。如果表数据变更不频繁,可以设为true。
result_table_name = "ORA_MY_TABLES" # 将本阶段处理的数据,注册为数据集(或称临时表)
source.reader.close.timeout = 120000
connection.pool.size = 1
schema-changes.enabled = true # 默认为 true。是否启用 schema evolution 功能,即自动推断 DDL 脚本
exactly_once = true # 默认为 false。是否启用数据的精确一次性处理。启用 extractly one 语义,可确保数据不会重复
debezium {
database.pdb.name = "PDB1" # 需指定 PDB 名称
}
}
Oracle-CDC {
driver = "oracle.jdbc.driver.OracleDriver" # 固定为 oracle.jdbc.driver.OracleDriver
base-url = "jdbc:oracle:thin:@//10.0.6.53:1521/pdb1" # 需要为 PDB 连接串
username = "c##logminer" # 源库需要创建,公共用户 Common User
password = "logminer"
database-names = ["PDB1"] # 监控的 PDB(大写)
schema-names = ["PROVINCE_LNJY"] # 监控的 schema 列表(大写)
table-names = [ # 监控的 table 列表,格式:<PDB>.<SCHEMA>.<TNAME>,多个表用逗号分隔
"PDB1.PROVINCE_LNJY.T_ALL_TABLES"
]
table-names-config = [
{
table = "PDB1.PROVINCE_LNJY.T_ALL_TABLES"
primaryKeys = ["ID"] # 为表显式指定主键字段
}
]
use_select_count = false # 全量阶段,是否用 select count(*) 获取数据量;当 count(*) 比 analyze table 速度快时,可设置为 true
skip_analyze = false # 全量阶段,是否跳过 analyze table。如果表数据变更不频繁,可以设为true。
result_table_name = "ORA_ALL_TABLES" # 将本阶段处理的数据,注册为数据集(或称临时表)
source.reader.close.timeout = 120000
connection.pool.size = 1
schema-changes.enabled = true # 默认为 true。是否启用 schema evolution 功能,即自动推断 DDL 脚本
exactly_once = true # 默认为 false。是否启用数据的精确一次性处理。启用 extractly one 语义,可确保数据不会重复
debezium {
database.pdb.name = "PDB1" # 需指定 PDB 名称
}
}
}
transform {
Sql { # 为目标表增加时间戳字段,以记录数据行的变更时间
source_table_name = "ORA_MY_TABLES"
result_table_name = "TRANS_MY_TABLES"
query = "SELECT *, NOW() AS CREATE_TIME, NOW() AS UPDATE_TIME FROM ORA_MY_TABLES;"
}
Sql { # 为目标表增加时间戳字段,以记录数据行的变更时间
source_table_name = "ORA_ALL_TABLES"
result_table_name = "TRANS_ALL_TABLES"
query = "SELECT *, NOW() AS CREATE_TIME, NOW() AS UPDATE_TIME FROM ORA_ALL_TABLES;"
}
}
sink {
jdbc {
url = "jdbc:mysql://10.0.6.53:3306/lnjy_frontdb?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
password = "sysz2024"
database = "lnjy_frontdb"
table = "${table_name}" # 目标端表名与源端一致
generate_sink_sql = true # 默认 false,是否自动生成目标端库表的 sql 语句
primary_keys = ["ID"] # generate_sink_sql 为 true 时,需指定主键
source_table_name = "TRANS_MY_TABLES"
schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" # 任务启动之前,目的表的处理方式
data_save_mode = "APPEND_DATA" # 任务启动之前,若目标端已存在数据,则保留数据库结构与数据;
field_ide = "ORIGINAL" # 字段是否需要大小写转换:ORIGINAL 不转换,UPPERCASE 转大写,LOWERCASE 转小写
is_exactly_once = true # 默认 false。是否启用数据的精确一次性处理,设置为true时,需要设置 xa_data_source_class_name
xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource"
}
jdbc {
url = "jdbc:mysql://10.0.6.53:3306/lnjy_frontdb?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
password = "sysz2024"
database = "lnjy_frontdb"
table = "${table_name}" # 目标端表名与源端一致
generate_sink_sql = true # 默认 false,是否自动生成目标端库表的 sql 语句
primary_keys = ["ID"] # generate_sink_sql 为 true 时,需指定主键
source_table_name = "TRANS_ALL_TABLES"
schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" # 任务启动之前,目的表的处理方式
data_save_mode = "APPEND_DATA" # 任务启动之前,若目标端已存在数据,则保留数据库结构与数据;
field_ide = "ORIGINAL" # 字段是否需要大小写转换:ORIGINAL 不转换,UPPERCASE 转大写,LOWERCASE 转小写
is_exactly_once = true # 默认 false。是否启用数据的精确一次性处理,设置为true时,需要设置 xa_data_source_class_name
xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource"
}
} 2. 踩到的坑2.1. ORA-00942: table or view does not exist
批量同步阶段结束,在进入增量同步时,报错
在 CDB 的 SQL> CREATE TABLE LOG_MINING_FLUSH(LAST_SCN NUMBER(19,0));
SQL> INSERT INTO LOG_MINING_FLUSH VALUES (0);
SQL> commit;
猜测 2.2. ORA-65040: operation not allowed from within a pluggable database
为 debezium {
database.name = "CDB$ROOT" # 需指定 CDB 名称
database.pdb.name = "PDB1" # 需指定 PDB 名称
} 增量同步可正常工作。
同步进程根据 |
我遇到过类似的问题,目前我的解决方案是,如果ORACLE是pdb类型的数据库,需要在source端 增加 |
@W-dragan 太感谢了!另外,有个疑问:在 cdc 增量同步时,需要用到LOG_MINING_FLUSH这个表。我测试时,发现在 pdb 中已经创建了此表。但是,同步作业似乎要需要的是 cdb 中的LOG_MINING_FLUSH表。所以,报错找不到表。不清楚 pdb 中的这个表是怎么来的,既然创建了,为何又不使用。 |
遇到一样的问题,在 Oracle 端检查权限和配置花大半天没解决。原来要增加 "debezium" 字段... 不知两位是从哪里发现的呢? 感谢分享。 |
flink cdc 文档里有提到 |
Search before asking
What happened
10.0.6.53
)为Oracle 19c
(启用CDB
),包含业务库PDB1
。PDB1
中包含业务用户province_lnjy
与查询用户lndata
,查询用户可查询province_lnjy
中的所有表。10.0.6.53
)为MySQL 8.0
社区版Apache SeaTunnel 2.3.8
(10.0.6.131
) 以单机模式部署。通过Oracle CDC Source
抽取Oracle PDB1
中province_lnjy
的表数据,并通过MySQL Sink
写入目标端MySQL
库(lnjy_frontdb
)中。Sink
部分暂时为向MySQL
端写入数据,而是直接将源端数据输出到console
控制台。SeaTunnel Version
2.3.8
SeaTunnel Config
此配置的DSN连接到CDB根容器,报错内容为
ErrorCode:[API-06], ErrorDescription:[Factory initialize failed] - Unable to create a source for identifier 'Oracle-CDC'
与org.apache.seatunnel.common.utils.SeaTunnelException: Can not find catalog table with factoryId [Oracle]
Error Exception
The text was updated successfully, but these errors were encountered: