
[Bug] Caused by: com.github.shyiko.mysql.binlog.event.deserialization.MissingTableMapEventException and lost data #2217

Closed
dylenWu opened this issue Jun 15, 2023 · 13 comments

dylenWu commented Jun 15, 2023

Search before asking

  • I searched in the issues and found nothing similar.

Flink version

1.14.6

Flink CDC version

2.3

Database and its version

mysql

Minimal reproduce step

Caused by: com.github.shyiko.mysql.binlog.event.deserialization.MissingTableMapEventException: No TableMapEventData has been found for table id:15987. Usually that means that you have started reading binary log 'within the logical event group' (e.g. from WRITE_ROWS and not proceeding TABLE_MAP

What did you expect to see?

The Flink job should be able to restart from the checkpoint.

What did you see instead?

The job did not restart; it kept running and lost data.

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
dylenWu added the bug label Jun 15, 2023
dylenWu (Author) commented Jun 15, 2023

Please assign this issue to me @leonardBang

dylenWu (Author) commented Jun 15, 2023

I found the cause: our binlog client was restarted during consumption, but the position it restarted from pointed at a WRITE_ROWS binlog event. As a result, when that event was deserialized, the table information it refers to could not be found, because a WRITE_ROWS event can only be parsed after the corresponding TABLE_MAP event has been parsed first.
For example, when consumption restarts from position 9510298, MissingTableMapEventException is thrown: to consume a WRITE_ROWS event, the TABLE_MAP event must be read first so that the table schema is available for parsing. A minimal sketch of such a restart is shown below.
[screenshot: binlog events around position 9510298]
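To make the failure mode concrete, here is a minimal sketch (my own illustration, not code from the issue) of restarting a bare binlog client from a position that points at a WRITE_ROWS event. The host, credentials, and file name are placeholders; position 9510298 reuses the value from the example above.

    import com.github.shyiko.mysql.binlog.BinaryLogClient;
    import com.github.shyiko.mysql.binlog.event.Event;

    public class MidGroupRestartRepro {
        public static void main(String[] args) throws Exception {
            // Placeholder connection details; 9510298 stands for an offset that points
            // at a WRITE_ROWS event, i.e. inside a logical event group.
            BinaryLogClient client = new BinaryLogClient("localhost", 3306, "repl", "repl-pw");
            client.setBinlogFilename("mysql-bin.000003");
            client.setBinlogPosition(9510298L);

            client.registerEventListener(
                    (Event event) -> System.out.println("received " + event.getHeader().getEventType()));

            // Because no TABLE_MAP event precedes the WRITE_ROWS event at this position, the
            // deserializer raises MissingTableMapEventException; as the analysis below shows,
            // it is reported to lifecycle listeners instead of being rethrown.
            client.connect();
        }
    }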

dylenWu (Author) commented Jun 15, 2023

1. We can see the problem in the code below: when a MissingTableMapEventException occurs, the program does not throw it but keeps running, so WRITE_ROWS events along the way are silently dropped, which ultimately causes data loss.
In the BinaryLogClient class I found that MissingTableMapEventException is caught but, unlike EOFException and SocketException, it is not rethrown. That means if the upstream listener's onEventDeserializationFailure implementation does nothing with it, the exception is never handled again.
    private void listenForEventPackets() throws IOException {
        // ...
            } catch (Exception e) {
                Throwable cause = e instanceof EventDataDeserializationException ? e.getCause() : e;
                if (cause instanceof EOFException || cause instanceof SocketException) {
                    throw e;
                }
                // MissingTableMapEventException lands here: it is only reported to the
                // lifecycle listeners, then the loop continues with the next packet.
                if (isConnected()) {
                    for (LifecycleListener lifecycleListener : lifecycleListeners) {
                        lifecycleListener.onEventDeserializationFailure(this, e);
                    }
                }
                continue;
            }
            // ...
        } finally {
            if (isConnected()) {
                if (completeShutdown) {
                    disconnect(); // initiate complete shutdown sequence (which includes keep alive thread)
                } else {
                    disconnectChannel();
                }
                // ...

2. Since BinaryLogClient is zendesk code that we cannot modify, the only option is to handle the failure in the upper layer through a LifecycleListener. Reading on, I found the application-layer listener implementation in the MySqlStreamingChangeEventSource class (a note on how this handling mode is configured follows the snippet):
    public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) {
        if (eventDeserializationFailureHandlingMode
                == EventProcessingFailureHandlingMode.FAIL) {
            // ...
            logStreamingSourceState();
            // Even FAIL mode does not throw here; it only hands the wrapped
            // exception to the ErrorHandler (see step 3).
            errorHandler.setProducerThrowable(wrap(ex));
        } else if (eventDeserializationFailureHandlingMode /* ... */) {
            // ...
        } else {
            // ...
        }
    }
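For context (my addition, not part of the original comment): the eventDeserializationFailureHandlingMode checked above is derived from Debezium's event.processing.failure.handling.mode setting, which Flink CDC forwards to Debezium. A minimal sketch of setting it on the source, assuming the MySqlSource builder exposes debeziumProperties as in Flink CDC 2.x; the connection details are placeholders:

    import java.util.Properties;

    import com.ververica.cdc.connectors.mysql.source.MySqlSource;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

    public class FailFastSourceSketch {
        public static MySqlSource<String> build() {
            // Ask Debezium to treat deserialization failures as fatal (the FAIL branch above).
            Properties dbzProps = new Properties();
            dbzProps.setProperty("event.processing.failure.handling.mode", "fail");

            return MySqlSource.<String>builder()
                    .hostname("localhost")          // placeholder connection details
                    .port(3306)
                    .databaseList("mydb")
                    .tableList("mydb.orders")
                    .username("flinkuser")
                    .password("flinkpw")
                    .deserializer(new JsonDebeziumDeserializationSchema())
                    .debeziumProperties(dbzProps)
                    .build();
        }
    }

Even in FAIL mode the error still travels through the queue shown in the next steps, which is where the delay described in this issue comes from.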

3. Next, look at the ErrorHandler implementation; you can see that the error is passed to the queue:
    public void setProducerThrowable(Throwable producerThrowable) {
        // ...
        if (first) {
            if (retriable) {
                // The error is only stored on the queue; nothing is thrown yet.
                queue.producerException(
                        new RetriableException(
                                "An exception occurred in the change event producer. This connector will be restarted.",
                                producerThrowable));
            }
            // ...
        }
4. After the error is passed to the queue, I found that the queue's poll() method only throws it under one condition: the drained batch must come back empty within the timeout window. As long as binlog data keeps arriving, the error is not thrown:
    public List poll() throws InterruptedException {
        // ...
        // The stored producer exception is only checked while drainTo(...) returns
        // zero records, i.e. while no binlog data is arriving.
        while (!timeout.expired() && queue.drainTo(records, maxBatchSize) == 0) {
            throwProducerExceptionIfPresent();
            // ...
        }
5. The resulting problem is that as long as data keeps flowing, the error is never thrown; it is logged, but the job keeps running as if nothing happened. If a checkpoint completes before the error finally surfaces, the events dropped in the meantime are lost. A simplified model of this polling behaviour is sketched below.
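To illustrate points 4 and 5, here is a simplified self-contained model (my own sketch, not Debezium code) of that polling behaviour: the stored producer exception is checked only while the drained batch is empty, so a steady stream of records keeps masking it.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.atomic.AtomicReference;

    public class QueueDrainModel {
        static final BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024);
        static final AtomicReference<RuntimeException> producerException = new AtomicReference<>();

        // Mirrors the shape of the real loop: the exception check only runs while
        // drainTo(...) returns zero records (timeout handling omitted for brevity).
        static List<String> poll() {
            List<String> records = new ArrayList<>();
            while (queue.drainTo(records, 256) == 0) {
                RuntimeException e = producerException.get();
                if (e != null) {
                    throw e; // only reachable once the binlog stream goes quiet
                }
            }
            return records;
        }

        public static void main(String[] args) {
            producerException.set(new RuntimeException("deserialization failed earlier"));
            queue.add("change event that arrived after the failure");
            // The batch is returned and the stored exception is NOT thrown on this call;
            // a checkpoint taken now would commit offsets past the dropped events.
            System.out.println(poll());
        }
    }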

leonardBang (Contributor) commented:

Please assign this issue to me @leonardBang

Assigned to you @dylenWu, please keep going, looking forward to your analysis.

dylenWu (Author) commented Jun 16, 2023

I think we should add logic to this method that makes the client stop receiving messages, so that the queue drains and its poll() method can throw the exception. A sketch of the idea is shown after this comment.
[screenshot: proposed code change]
Please review it @leonardBang
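A minimal sketch of that idea (my own illustration, not the actual patch): a LifecycleListener that disconnects the client when the failure is a MissingTableMapEventException, so nothing new enters the queue and poll() can drain it and throw the stored exception.

    import java.io.IOException;

    import com.github.shyiko.mysql.binlog.BinaryLogClient;
    import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException;
    import com.github.shyiko.mysql.binlog.event.deserialization.MissingTableMapEventException;

    public class StopOnMissingTableMap extends BinaryLogClient.AbstractLifecycleListener {
        @Override
        public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) {
            Throwable cause =
                    ex instanceof EventDataDeserializationException ? ex.getCause() : ex;
            if (cause instanceof MissingTableMapEventException) {
                try {
                    // Stop pulling further binlog events; with nothing new entering the
                    // queue, poll() will drain it and throw the producer exception.
                    client.disconnect();
                } catch (IOException e) {
                    throw new IllegalStateException("Failed to stop the binlog client", e);
                }
            }
        }
    }

Such a listener would be registered via client.registerLifecycleListener(new StopOnMissingTableMap()); the real fix inside the connector may of course look different.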

qidian99 (Contributor) commented:

@dylenWu The option to just ignore the exception is useful. I encountered a weird case where consuming from a later offset throws an exception that the binlog file does not exist. I tried to manually specify an earlier offset, and this error showed up. In my case I rely on the upsert semantics of the sink operators, so some data duplication is tolerable.

dylenWu (Author) commented Jun 19, 2023

@dylenWu The option to just ignore the exception is useful. I encountered a weird case where consuming from a later offset throws an exception that the binlog file does not exist. I tried to manually specify an earlier offset, and this error showed up. In my case I rely on the upsert semantics of the sink operators, so some data duplication is tolerable.

We cannot ignore the exception, otherwise it will cause data loss

zhenyimo commented:

@dylenWu
[screenshots: BinlogClient reconnect triggering a ROTATE event that clears tableMapEventByTableId]
I found that when the BinlogClient reconnects after a heartbeat timeout, the entries previously kept in tableMapEventByTableId are wiped out, because the ROTATE event clears tableMapEventByTableId, and the next event then fails to parse. Could this be the cause of the problem?

dylenWu (Author) commented Jun 20, 2023

When consuming WRITE_ROWS binlog events, the TABLE_MAP event must be consumed first to obtain the table information; otherwise the WRITE_ROWS event cannot be parsed. But when the BinlogClient reconnects because of a heartbeat timeout, the position it resumes from may be in the middle of a transaction, so it consumes a WRITE_ROWS event directly without having consumed the TABLE_MAP event first. See the screenshots I posted above for details. A rough sketch of the event-group layout is shown below.
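For reference (my addition): in row-based replication a transaction is written to the binlog as one logical event group, roughly like this:

    GTID
    QUERY (BEGIN)
    TABLE_MAP    maps a table id to the table's schema   <- must be read first
    WRITE_ROWS   refers to that table id
    ...
    XID (COMMIT)

Resuming from the offset of the WRITE_ROWS event skips the TABLE_MAP entry for its table id, which is exactly the MissingTableMapEventException case described above.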

ruanhang1993 modified the milestones: V2.4.1, V2.5.0 Jun 28, 2023
GuXianWei commented:

This problem also occurs in 2.4.1. Is there a plan to fix it in 2.4.2?

ldwnt commented Dec 26, 2023

When consuming WRITE_ROWS binlog events, the TABLE_MAP event must be consumed first to obtain the table information; otherwise the WRITE_ROWS event cannot be parsed. But when the BinlogClient reconnects because of a heartbeat timeout, the position it resumes from may be in the middle of a transaction, so it consumes a WRITE_ROWS event directly without having consumed the TABLE_MAP event first. See the screenshots I posted above for details.

Thanks for the explanation. I want to upgrade the CDC in our production environment from 2.1 to 2.4, but because of the compatibility issue #1795 (comment) I can only start a new 2.4 job that does not restore from the old job's checkpoint and instead starts from the last binlog position the old job consumed. The problem is that this position is usually a WRITE_ROWS event, which carries no schema data. So how is the "start from a specified binlog position" feature supposed to be used? Should I search the binlog myself for the last record that is not a WRITE_ROWS event? And then another question arises: if my source database has 100+ tables, how do I make sure the position I pick has the schema for every table?

shikai93 (Contributor) commented:

@dylenWu do you think the PR #3065 would help with this issue? It makes sure that tableMapEventByTableId is not cleared on the ROTATE event created by a binlog client restart.

PatrickRen (Contributor) commented:

Considering collaboration with developers around the world, please re-create your issue in English on Apache Jira under project Flink with component tag Flink CDC. Thank you!

PatrickRen pushed a commit that referenced this issue Apr 10, 2024
wuzhenhua01 pushed a commit to wuzhenhua01/flink-cdc-connectors that referenced this issue Aug 4, 2024
ChaomingZhangCN pushed a commit to ChaomingZhangCN/flink-cdc that referenced this issue Jan 13, 2025