-
Notifications
You must be signed in to change notification settings - Fork 411
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix TiFlash hang issue after #9072 #9424
Changes from 11 commits
f9a3206
55c6711
095eb29
c723aa1
c6e8279
6e7c6ee
c20aeb5
9746690
4d1fc6b
98e797f
25cbb7b
3b7541a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,7 +29,15 @@ class DAGResponseWriter | |
DAGResponseWriter(Int64 records_per_chunk_, DAGContext & dag_context_); | ||
/// prepared with sample block | ||
virtual void prepare(const Block &){}; | ||
virtual void write(const Block & block) = 0; | ||
// return true if write is actually write the data | ||
virtual bool doWrite(const Block & block) = 0; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how about moving to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK |
||
void write(const Block & block) | ||
{ | ||
if (!doWrite(block)) | ||
{ | ||
notifyNextPipelineWriter(); | ||
} | ||
} | ||
|
||
// For async writer, `waitForWritable` need to be called before calling `write`. | ||
// ``` | ||
|
@@ -40,7 +48,17 @@ class DAGResponseWriter | |
virtual WaitResult waitForWritable() const { throw Exception("Unsupport"); } | ||
|
||
/// flush cached blocks for batch writer | ||
virtual void flush() = 0; | ||
void flush() | ||
{ | ||
if (!doFlush()) | ||
{ | ||
notifyNextPipelineWriter(); | ||
} | ||
} | ||
|
||
// return true if flush is actually flush data | ||
virtual bool doFlush() = 0; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK |
||
virtual void notifyNextPipelineWriter() = 0; | ||
virtual ~DAGResponseWriter() = default; | ||
|
||
protected: | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -94,6 +94,7 @@ struct MockStreamWriter | |||||
|
||||||
void write(tipb::SelectResponse & response) { checker(response); } | ||||||
static WaitResult waitForWritable() { throw Exception("Unsupport async write"); } | ||||||
static void notifyNextPipelineWriter() {} | ||||||
|
||||||
private: | ||||||
MockStreamWriterChecker checker; | ||||||
|
@@ -137,7 +138,7 @@ try | |||||
batch_send_min_limit, | ||||||
*dag_context_ptr); | ||||||
for (const auto & block : blocks) | ||||||
dag_writer->write(block); | ||||||
dag_writer->doWrite(block); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
seems useless change |
||||||
dag_writer->flush(); | ||||||
|
||||||
// 4. Start to check write_report. | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -148,6 +148,7 @@ struct MockWriter | |
} | ||
static uint16_t getPartitionNum() { return 1; } | ||
static WaitResult waitForWritable() { throw Exception("Unsupport async write"); } | ||
static void notifyNextPipelineWriter() {} | ||
|
||
std::vector<tipb::FieldType> result_field_types; | ||
|
||
|
@@ -352,7 +353,7 @@ class TestTiRemoteBlockInputStream : public testing::Test | |
|
||
// 2. encode all blocks | ||
for (const auto & block : source_blocks) | ||
dag_writer->write(block); | ||
dag_writer->doWrite(block); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. seems useless change |
||
dag_writer->flush(); | ||
|
||
// 3. send execution summary | ||
|
@@ -378,7 +379,7 @@ class TestTiRemoteBlockInputStream : public testing::Test | |
|
||
// 2. encode all blocks | ||
for (const auto & block : source_blocks) | ||
dag_writer->write(block); | ||
dag_writer->doWrite(block); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto |
||
dag_writer->flush(); | ||
|
||
// 3. send execution summary | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about just
pipe_writer_cv.notifyOne()
?