-
Notifications
You must be signed in to change notification settings - Fork 413
2.3_Rdbms Writer
Rdbms-Writer插件主要用于将其他类型数据源的增量数据进行处理和转换,最后写入关系型数据源,主要包括MYSQL、SQLSERVER、POSTGRESQL几种。目前的应用场景主要有两种:一种是线上各个系统间的基础参数表同步,另外一种是线上数据同步到OLAP系统(主要是BI)。
Rdbms-Writer插件自定义的RdbEventRecordHandler继承自com.ucar.datalink.worker.api.handle.AbstractHandler,使用了系统提供的默认流程(详见深入领域),并根据需要自行扩展了一些功能,执行最终的目标端写入操作。具体的数据写入流程如下图所示:
-
【构建sql】
> 首先,Rdbms-Writer将Records按批量拆分,根据关系型数据库方言的sql构造模板,为每个Record构造相应的sql。默认批量大小为100,各批并发进行。
> 对于EventType为INSERT的Record,若目标端数据库方言支持MergeSql(如mysql、sqlserver、oracle),则为Record构造对应的MergeSql,处理主键冲突的情况(改为update),若不支持MergeSql则构造InsertSql;对于EventType为UPDATE和DELETE的Record,分别为Record构造对应的UpdateSql、DeleteSql。 -
【Records分组】
Rdbms-Writer的同步模式分为全局有序、局部有序、充分并发三种。
> 全局有序:当同步模式为DbGlobalOrdered时,不进行分组,直接按照全局顺序加载RdbEventRecords,完全按照源端日志的执行顺序进行重放。
> 局部有序:当同步模式为TablePartlyOrdered时,为每个RdbEventRecord按表进行分组,保证单表内的局部有序。当Rdbms-Writer未启用压缩合并时,因为没有根据相同sql进行batch组合,所以自动禁用批量写入,按一张表一个分组来构建batchRecords。
> 充分并发:当Rdbms-Writer启用了压缩合并功能的时候,会在表级别基础上根据INSERT/UPDATE/DELETE进行更细粒度的分组。在合并完之后,能保证同一张表的同一条数据只有一条binlog事件,此时可以完全打乱顺序,保证最终一致即可。若同时启用了批量写入,则将原DELETE、INSERT/UPDATE分组的Records按照sql相同进行batch组合,为各组分别构建若干个大小为batchSize的batchRecords;若未启用批量写入,则将DELETE、INSERT/UPDATE分组按原样构建batchRecords。
注:压缩合并情况下,优先执行DELETE语句,针对uniqe更新,一般会进行DELETE+INSERT的处理模式,避免并发更新。 -
【Records加载】
> 数据写入的最后一个环节,执行数据持久化操作,各分组的batchRecords并行加载。若Rdbms-Writer启用了批量写入,则根据batchSize进行批量更新,否则进行单条更新。
注:若批量模式出错,则转为single模式,以保证尽可能把不报错的数据写入到目标库,并且改为single模式,才能知道具体出错的record是哪一条。
> 根据目标端DbDialect进行相应的数据处理,并通过构建预编译对象PreparedStatement执行最终的数据写入,以提高效率。
注:insert为更新所有字段,update只更新带有isUpdate=true的字段,delete根据主键删除。
-
【HA机制】
> 目标端依靠自己的HA机制实现高可用,如:向mysql同步时指定的是vip,mysql通过pxc或mha实现HA。 -
【有序消费】
> Rdbms-Writer支持Binlog局部有序消费和全局有序消费。
1)当同步模式为TablePartlyOrdered时,按表分组保证局部有序,即单表的执行顺序和Binlog一致;
2)当同步模式为DbGlobalOrdered时,完全按照Binlog的顺序进行replay。 -
【并发消费】
> Rdbms-Writer支持并发消费,以提升消费速度。
1)当按表分组时,是全局并发,即先并发,然后每个并发线程再有序运行。若未开启批量更新,并发数=分组数,若开启了批量更新,并发数=分组的批次总数(分组的Records.size()/batchSize);
2)当未按表分组时,是局部并发,即先保证有序,运行到每条数据的时候再考虑并发。 -
【批量更新】
> Rdbms-Writer向目标端进行写入的时候,可以启用压缩合并与批量更新,并设置每次写入的batchSize,以增大吞吐率,提高系统性能。
系统默认情况下是单条写入,当需要同步的数据量比较大时,可以开启批量更新模式。 -
【主键冲突处理】
> 当插入出现主键冲突时,Rdbms-Writer有两种处理方式:
1)若目标端数据库方言支持MergeSql(如mysql、sqlserver、oracle),则构造MergeSql处理,当insert出现主键冲突时自动改为update。
2)若目标端数据库方言不支持MergeSql,当insert出现主键冲突时,会忽略该Record。 -
【ddl拦截器】
> Rdbms-Writer在AbstractHandler内置的基础拦截器前面增加自定义拦截器DdlEventInterceptor,对ddl类型的Record进行sql解析。
1)当配置全库同步时,自动同步sql类型为CreateTable的Record;
2)当目标表的数据量不超过500万时,自动同步sql类型为AlterTable的Record。
-
【自动加字段】
> 当发现目标表缺字段时,可以设置Rdbms-Writer自动构造sql补全所缺字段。
目前源端只支持mysql,目标端支持mysql、sqlserver、postgresql,且目标表记录总数不超过500万。 -
【过滤机制】
> 过滤字段:每张表可以设置列映射模式,通过黑白名单过滤特定的字段,白名单时可以配置字段别名。
> 过滤Records:可以通过内置拦截器,设置主键黑名单,跳过指定主键的Records;也可以自定义拦截器,对某些Records进行过滤或特殊处理,来满足特定场景下的同步需求。
目前Rdbms-Writer用到的拦截器举例:
OrderEntRecordInterceptor,用于拦截非企业订单,只同步企业订单。
UcarincGpsInterceptor、ZucheGpsInterceptor,用于不同库的不同表数据同步到同一个目标库的同一个表中时,对一些Records的特殊处理逻辑。
ObdRecordInterceptor,用于拦截一些表的字段满足特定条件的Records。
在继承Writer插件通用参数基类(PluginWriterParameter,详见深入Task)的基础上,RdbmsWriterParameter还针对Rdbms类型数据库的特点扩展了自己的参数类,用户可以根据需求在页面更改其参数配置。
RdbmsWriterParameter扩展参数 | 参数描述 | 默认值 | 备注 |
---|---|---|---|
SyncMode |
同步模式 |
TablePartlyOrdered |
Rdbms-Writer同步模式分为两种: DbGlobalOrdered,//库级别,全局有序 TablePartlyOrdered;//表级别,局部有序 |
一个Mapping代表同步方向的同时也定义了同步行为,即同步过程中需要经过哪些特殊处理。Rdbms-Writer插件的相关Mapping配置参数如下:
Mapping参数 | 参数描述 | 默认值 | 备注 |
---|---|---|---|
targetMediaSourceId |
目标端数据源的id |
无 | 目标数据源,这里指要同步到的RDBMS类型数据库 |
targetMediaName |
目标端表的名字 |
无 |
支持目标端有表别名,用于源端和目标端表名不一致的情况 |
targetMediaNamespace |
目标端数据源的namespace |
无 |
目标端数据源schema |
ColumnMappingMode |
列映射模式 |
NONE |
支持列名黑白名单与列别名: NONE,//所有列均同步到目标端 INCLUDE,//只同步白名单中的列,可以设置列别名 EXCLUDE;//黑名单中的列不同步 |
writePriority |
同步优先级 |
5 |
数值越小优先级越高 |
interceptorId |
拦截器id |
无 |
拦截器可以对Records进行特殊处理,满足少数特定功能的需求 |
skipIds |
主键黑名单 |
无 |
可以通过指定主键id来过滤源端的某些异常Records |
valid |
是否有效 |
是 |
同步映射有效时,才进行同步 |
关联技术 | 稳定版本 | 待测版本 |
---|---|---|
mysql |
5.1.35 |
其他 |
sqlserver |
4.0 |
其他 |
postgresql |
42.1.4 |
其他 |
Rdbms-Writer的内置拦截器DdlEventInterceptor对ddl同步的一些限制:
对于sql类型为CreateTable的Record,在全库同步的情况下自动同步,若不是全库同步,则直接忽略。
对于sql类型为AlterTable的Record,当目标表的数据量不超过500万时自动同步,若超过了500万,则提示人工介入。
对于sql类型为DropTable或者其他类型的Record,直接忽略,暂时只支持CreateTable和AlterTable类型的ddl自动同步。
对于异构数据源之间的ddl类型的Record,直接忽略,暂时只支持同构数据源之间的ddl自动同步,例如mysql同步到mysql。
对于配置了白名单和列别名的情况,暂时不做更细粒度的判断,直接忽略ddl语句。
对于配置了拦截器的情况,暂时不支持ddl自动同步。
sqlserver merge sql 参考文档:https://www.mssqltips.com/sqlservertip/3074/use-caution-with-sql-servers-merge-statement/