Fix TiFlash hang issue after #9072 #9424
Conversation
Signed-off-by: xufei <[email protected]>
/cc @SeaRise
@windtalker: GitHub didn't allow me to request PR reviews from the following users: SeaRise. Note that only pingcap members and repo collaborators can review this PR, and authors cannot review their own PRs. In response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.
That's amazing!
> have no chance to be notified
How about modifying the `waitForWritable` method of `ExchangeWriter` so that it returns `WaitResult::Ready` even if the tunnel is not ready, as long as the cache has not reached its limit? Then, when `sink->write` detects that the tunnel is not ready, it returns `WaitResult::WAIT_FOR_NOTIFY`. This way, when the sink is notified, it will definitely write to the MPMC queue, preventing any hang-ups.
By "even if the tunnel is not ready", do you mean the tunnel is not connected, or the tunnel is full?
I mean the tunnel is full.
Consider this case:
So the query will hang, right?
The current PR makes sure that the operator will send data to all tunnels, even if there is no actual data to send.
Another possible fix from @SeaRise
```cpp
block.clear();
tracked_packet->addChunk(codec_stream->getString());
codec_stream->clear();
if (block)
```
Why add this condition?
This was added for debugging; there is no need for it. I will remove it.
```cpp
auto should_notify = false;
{
    std::lock_guard lock(mu);
    should_notify = status != MPMCQueueStatus::CANCELLED && !isFullWithoutLock();
}
if (should_notify)
    pipe_writer_cv.notifyOne();
```
How about just `pipe_writer_cv.notifyOne()`?
```diff
@@ -137,7 +138,7 @@ try
         batch_send_min_limit,
         *dag_context_ptr);
     for (const auto & block : blocks)
-        dag_writer->write(block);
+        dag_writer->doWrite(block);
```
```diff
-dag_writer->doWrite(block);
+dag_writer->write(block);
```
Seems like a useless change.
```diff
@@ -29,7 +29,15 @@ class DAGResponseWriter
     DAGResponseWriter(Int64 records_per_chunk_, DAGContext & dag_context_);
     /// prepared with sample block
     virtual void prepare(const Block &){};
-    virtual void write(const Block & block) = 0;
+    // return true if write is actually write the data
+    virtual bool doWrite(const Block & block) = 0;
```
How about moving it to `protected`?
OK
```diff
     }

+    // return true if flush is actually flush data
+    virtual bool doFlush() = 0;
```
ditto
OK
```diff
@@ -352,7 +353,7 @@ class TestTiRemoteBlockInputStream : public testing::Test

     // 2. encode all blocks
     for (const auto & block : source_blocks)
-        dag_writer->write(block);
+        dag_writer->doWrite(block);
```
Seems like a useless change.
```diff
@@ -378,7 +379,7 @@ class TestTiRemoteBlockInputStream : public testing::Test

     // 2. encode all blocks
     for (const auto & block : source_blocks)
-        dag_writer->write(block);
+        dag_writer->doWrite(block);
```
ditto
```cpp
block.clear();
tracked_packet->addChunk(codec_stream->getString());
codec_stream->clear();
if (block)
```
```diff
-if (block)
+if likely (block && block.rows() > 0)
```
No need to add this check, because `write()` makes sure the block has data before it pushes the block into `blocks`.
Others LGTM.
After some offline discussion, we decided to use this PR as a quick fix for the hang issue, and will refine the whole tunnel write process using the above idea.
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: gengliqi, SeaRise The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing
What problem does this PR solve?
Issue Number: close #9413
Problem Summary:
What is changed and how it works?
The root cause of the hang issue introduced by #9072 is:

1. `ExchangeSenderSinkOp` eventually writes data to a `LooseBoundedMPMCQueue`; although `LooseBoundedMPMCQueue` is named with `LooseBounded`, it actually has an upper bound.
2. For each `MPPTunnel` (which holds a `LooseBoundedMPMCQueue`), all the `ExchangeSenderSinkOp`s will try to write data to the same `LooseBoundedMPMCQueue` concurrently.
3. If the write fails (because the `LooseBoundedMPMCQueue` is full), the `ExchangeSenderSinkOp` will register itself to the pipeline notify future (the `tunnelSender`).
4. The pipeline notify future notifies only one task at a time, when the consumer of the `LooseBoundedMPMCQueue` reads a message from it.
5. When an `ExchangeSenderSinkOp` is notified in stage 4, it is not 100% sure that the `ExchangeSenderSinkOp` will actually write data to the `LooseBoundedMPMCQueue`: the operator only calls `writer->write(block);`, and inside `writer->write(block)` the data can be cached in `writer` instead of being written to the `LooseBoundedMPMCQueue`. If `block` is empty, it calls `writer->flush()` to flush the cached data, and if there is no cached data, nothing is written to the `LooseBoundedMPMCQueue` at all.

Now consider M `ExchangeSenderSinkOp`s and a `LooseBoundedMPMCQueue` with a limited size of N, where M > N, and suppose all M `ExchangeSenderSinkOp`s try to write to the full `LooseBoundedMPMCQueue` at the same time. Then all M `ExchangeSenderSinkOp`s are registered to the pipeline notify future. As described in stage 4, the pipeline notify future notifies one task per message the consumer reads from the `LooseBoundedMPMCQueue`, so at most N `ExchangeSenderSinkOp`s will be notified. If none of the notified N `ExchangeSenderSinkOp`s writes data to the `LooseBoundedMPMCQueue`, the `LooseBoundedMPMCQueue` becomes empty while M - N `ExchangeSenderSinkOp`s are still waiting on the pipeline notify future. Since the `LooseBoundedMPMCQueue` is empty, those M - N `ExchangeSenderSinkOp`s have no chance to be notified. Then the whole query hangs.

There are 2 possible fixes:

1. In `LooseBoundedMPMCQueue`, trigger a notification if the current queue is empty.
2. If an `ExchangeSenderSinkOp` is notified, it should either write data to the `LooseBoundedMPMCQueue`, or try to notify another `ExchangeSenderSinkOp`.
The first fix would be easier, but considering that `ExchangeReceiver` will also use the notify mechanism after #9073, the first fix may not work in the future, so this PR uses the second fix.

Check List
Tests
Side effects
Documentation
Release note