Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix TiFlash hang issue after #9072 #9424

Merged
merged 12 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dbms/src/Common/GRPCQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ class GRPCSendQueue
}

bool isWritable() const { return send_queue.isWritable(); }
void notifyNextPipelineWriter() { send_queue.notifyNextPipelineWriter(); }

void registerPipeReadTask(TaskPtr && task) { send_queue.registerPipeReadTask(std::move(task)); }
void registerPipeWriteTask(TaskPtr && task) { send_queue.registerPipeWriteTask(std::move(task)); }
Expand Down Expand Up @@ -299,6 +300,7 @@ class GRPCRecvQueue
}

bool isWritable() const { return recv_queue.isWritable(); }
void notifyNextPipelineWriter() { return recv_queue.notifyNextPipelineWriter(); }

void registerPipeReadTask(TaskPtr && task) { recv_queue.registerPipeReadTask(std::move(task)); }
void registerPipeWriteTask(TaskPtr && task) { recv_queue.registerPipeWriteTask(std::move(task)); }
Expand Down
11 changes: 11 additions & 0 deletions dbms/src/Common/LooseBoundedMPMCQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,17 @@ class LooseBoundedMPMCQueue
return !isFullWithoutLock();
}

void notifyNextPipelineWriter()
{
auto should_notify = false;
{
std::lock_guard lock(mu);
should_notify = status != MPMCQueueStatus::CANCELLED && !isFullWithoutLock();
}
if (should_notify)
pipe_writer_cv.notifyOne();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about just pipe_writer_cv.notifyOne()?

}

MPMCQueueStatus getStatus() const
{
std::lock_guard lock(mu);
Expand Down
22 changes: 20 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGResponseWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,15 @@ class DAGResponseWriter
DAGResponseWriter(Int64 records_per_chunk_, DAGContext & dag_context_);
/// prepared with sample block
virtual void prepare(const Block &){};
virtual void write(const Block & block) = 0;
// return true if write is actually write the data
virtual bool doWrite(const Block & block) = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about moving to protected?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

void write(const Block & block)
{
if (!doWrite(block))
{
notifyNextPipelineWriter();
}
}

// For async writer, `waitForWritable` need to be called before calling `write`.
// ```
Expand All @@ -40,7 +48,17 @@ class DAGResponseWriter
virtual WaitResult waitForWritable() const { throw Exception("Unsupport"); }

/// flush cached blocks for batch writer
virtual void flush() = 0;
void flush()
{
if (!doFlush())
{
notifyNextPipelineWriter();
}
}

// return true if flush is actually flush data
virtual bool doFlush() = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

virtual void notifyNextPipelineWriter() = 0;
virtual ~DAGResponseWriter() = default;

protected:
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/StreamWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ struct CopStreamWriter
throw Exception("Failed to write resp");
}
static WaitResult waitForWritable() { throw Exception("Unsupport async write"); }
static void notifyNextPipelineWriter() {}
};

struct BatchCopStreamWriter
Expand All @@ -83,6 +84,7 @@ struct BatchCopStreamWriter
throw Exception("Failed to write resp");
}
static WaitResult waitForWritable() { throw Exception("Unsupport async write"); }
static void notifyNextPipelineWriter() {}
};

using CopStreamWriterPtr = std::shared_ptr<CopStreamWriter>;
Expand Down
21 changes: 17 additions & 4 deletions dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,14 @@ StreamingDAGResponseWriter<StreamWriterPtr>::StreamingDAGResponseWriter(
}

template <class StreamWriterPtr>
void StreamingDAGResponseWriter<StreamWriterPtr>::flush()
bool StreamingDAGResponseWriter<StreamWriterPtr>::doFlush()
{
if (rows_in_blocks > 0)
{
encodeThenWriteBlocks();
return true;
}
return false;
}

template <class StreamWriterPtr>
Expand All @@ -74,7 +78,13 @@ WaitResult StreamingDAGResponseWriter<StreamWriterPtr>::waitForWritable() const
}

template <class StreamWriterPtr>
void StreamingDAGResponseWriter<StreamWriterPtr>::write(const Block & block)
void StreamingDAGResponseWriter<StreamWriterPtr>::notifyNextPipelineWriter()
{
return writer->notifyNextPipelineWriter();
}

template <class StreamWriterPtr>
bool StreamingDAGResponseWriter<StreamWriterPtr>::doWrite(const Block & block)
{
RUNTIME_CHECK_MSG(
block.columns() == dag_context.result_field_types.size(),
Expand All @@ -87,14 +97,17 @@ void StreamingDAGResponseWriter<StreamWriterPtr>::write(const Block & block)
}

if (static_cast<Int64>(rows_in_blocks) > batch_send_min_limit)
{
encodeThenWriteBlocks();
return true;
}
return false;
}

template <class StreamWriterPtr>
void StreamingDAGResponseWriter<StreamWriterPtr>::encodeThenWriteBlocks()
{
if (unlikely(blocks.empty()))
return;
assert(!blocks.empty());

TrackedSelectResp response;
response.setEncodeType(dag_context.encode_type);
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ class StreamingDAGResponseWriter : public DAGResponseWriter
Int64 records_per_chunk_,
Int64 batch_send_min_limit_,
DAGContext & dag_context_);
void write(const Block & block) override;
bool doWrite(const Block & block) override;
WaitResult waitForWritable() const override;
void flush() override;
bool doFlush() override;
void notifyNextPipelineWriter() override;

private:
void encodeThenWriteBlocks();
Expand Down
6 changes: 4 additions & 2 deletions dbms/src/Flash/Coprocessor/UnaryDAGResponseWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ void UnaryDAGResponseWriter::appendWarningsToDAGResponse()
dag_response->set_warning_count(dag_context.getWarningCount());
}

void UnaryDAGResponseWriter::flush()
bool UnaryDAGResponseWriter::doFlush()
{
if (current_records_num > 0)
{
Expand All @@ -86,9 +86,10 @@ void UnaryDAGResponseWriter::flush()
throw TiFlashException(
"DAG response is too big, please check config about region size or region merge scheduler",
Errors::Coprocessor::Internal);
return true;
}

void UnaryDAGResponseWriter::write(const Block & block)
bool UnaryDAGResponseWriter::doWrite(const Block & block)
{
if (block.columns() != dag_context.result_field_types.size())
throw TiFlashException("Output column size mismatch with field type size", Errors::Coprocessor::Internal);
Expand Down Expand Up @@ -116,5 +117,6 @@ void UnaryDAGResponseWriter::write(const Block & block)
row_index = upper;
}
}
return true;
}
} // namespace DB
5 changes: 3 additions & 2 deletions dbms/src/Flash/Coprocessor/UnaryDAGResponseWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ class UnaryDAGResponseWriter : public DAGResponseWriter
public:
UnaryDAGResponseWriter(tipb::SelectResponse * response_, Int64 records_per_chunk_, DAGContext & dag_context_);

void write(const Block & block) override;
void flush() override;
bool doWrite(const Block & block) override;
bool doFlush() override;
void notifyNextPipelineWriter() override{};
void encodeChunkToDAGResponse();
void appendWarningsToDAGResponse();

Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/Coprocessor/tests/gtest_streaming_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ struct MockStreamWriter

void write(tipb::SelectResponse & response) { checker(response); }
static WaitResult waitForWritable() { throw Exception("Unsupport async write"); }
static void notifyNextPipelineWriter() {}

private:
MockStreamWriterChecker checker;
Expand Down Expand Up @@ -137,7 +138,7 @@ try
batch_send_min_limit,
*dag_context_ptr);
for (const auto & block : blocks)
dag_writer->write(block);
dag_writer->doWrite(block);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
dag_writer->doWrite(block);
dag_writer->write(block);

seems useless change

dag_writer->flush();

// 4. Start to check write_report.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ struct MockWriter
}
static uint16_t getPartitionNum() { return 1; }
static WaitResult waitForWritable() { throw Exception("Unsupport async write"); }
static void notifyNextPipelineWriter() {}

std::vector<tipb::FieldType> result_field_types;

Expand Down Expand Up @@ -352,7 +353,7 @@ class TestTiRemoteBlockInputStream : public testing::Test

// 2. encode all blocks
for (const auto & block : source_blocks)
dag_writer->write(block);
dag_writer->doWrite(block);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems useless change

dag_writer->flush();

// 3. send execution summary
Expand All @@ -378,7 +379,7 @@ class TestTiRemoteBlockInputStream : public testing::Test

// 2. encode all blocks
for (const auto & block : source_blocks)
dag_writer->write(block);
dag_writer->doWrite(block);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

dag_writer->flush();

// 3. send execution summary
Expand Down
25 changes: 20 additions & 5 deletions dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ BroadcastOrPassThroughWriter<ExchangeWriterPtr>::BroadcastOrPassThroughWriter(
switch (data_codec_version)
{
case MPPDataPacketV0:
if (batch_send_min_limit <= 0)
batch_send_min_limit = 1;
break;
case MPPDataPacketV1:
default:
Expand All @@ -64,10 +66,14 @@ BroadcastOrPassThroughWriter<ExchangeWriterPtr>::BroadcastOrPassThroughWriter(
}

template <class ExchangeWriterPtr>
void BroadcastOrPassThroughWriter<ExchangeWriterPtr>::flush()
bool BroadcastOrPassThroughWriter<ExchangeWriterPtr>::doFlush()
{
if (rows_in_blocks > 0)
{
writeBlocks();
return true;
}
return false;
}

template <class ExchangeWriterPtr>
Expand All @@ -77,7 +83,13 @@ WaitResult BroadcastOrPassThroughWriter<ExchangeWriterPtr>::waitForWritable() co
}

template <class ExchangeWriterPtr>
void BroadcastOrPassThroughWriter<ExchangeWriterPtr>::write(const Block & block)
void BroadcastOrPassThroughWriter<ExchangeWriterPtr>::notifyNextPipelineWriter()
{
writer->notifyNextPipelineWriter();
}

template <class ExchangeWriterPtr>
bool BroadcastOrPassThroughWriter<ExchangeWriterPtr>::doWrite(const Block & block)
{
RUNTIME_CHECK(!block.info.selective);
RUNTIME_CHECK_MSG(
Expand All @@ -90,15 +102,18 @@ void BroadcastOrPassThroughWriter<ExchangeWriterPtr>::write(const Block & block)
blocks.push_back(block);
}

if (static_cast<Int64>(rows_in_blocks) > batch_send_min_limit)
if (static_cast<Int64>(rows_in_blocks) >= batch_send_min_limit)
{
writeBlocks();
return true;
}
return false;
}

template <class ExchangeWriterPtr>
void BroadcastOrPassThroughWriter<ExchangeWriterPtr>::writeBlocks()
{
if unlikely (blocks.empty())
return;
assert(!blocks.empty());

// check schema
if (!expected_types.empty())
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Flash/Mpp/BroadcastOrPassThroughWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ class BroadcastOrPassThroughWriter : public DAGResponseWriter
MPPDataPacketVersion data_codec_version_,
tipb::CompressionMode compression_mode_,
tipb::ExchangeType exchange_type_);
void write(const Block & block) override;
bool doWrite(const Block & block) override;
WaitResult waitForWritable() const override;
void flush() override;
bool doFlush() override;
void notifyNextPipelineWriter() override;

private:
void writeBlocks();
Expand Down
21 changes: 17 additions & 4 deletions dbms/src/Flash/Mpp/FineGrainedShuffleWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,20 @@ void FineGrainedShuffleWriter<ExchangeWriterPtr>::prepare(const Block & sample_b
}

template <class ExchangeWriterPtr>
void FineGrainedShuffleWriter<ExchangeWriterPtr>::flush()
bool FineGrainedShuffleWriter<ExchangeWriterPtr>::doFlush()
{
if (rows_in_blocks > 0)
{
batchWriteFineGrainedShuffle();
return true;
}
return false;
}

template <class ExchangeWriterPtr>
void FineGrainedShuffleWriter<ExchangeWriterPtr>::notifyNextPipelineWriter()
{
writer->notifyNextPipelineWriter();
}

template <class ExchangeWriterPtr>
Expand All @@ -103,7 +113,7 @@ WaitResult FineGrainedShuffleWriter<ExchangeWriterPtr>::waitForWritable() const
}

template <class ExchangeWriterPtr>
void FineGrainedShuffleWriter<ExchangeWriterPtr>::write(const Block & block)
bool FineGrainedShuffleWriter<ExchangeWriterPtr>::doWrite(const Block & block)
{
RUNTIME_CHECK_MSG(prepared, "FineGrainedShuffleWriter should be prepared before writing.");
RUNTIME_CHECK_MSG(
Expand All @@ -124,7 +134,11 @@ void FineGrainedShuffleWriter<ExchangeWriterPtr>::write(const Block & block)

if (blocks.size() == fine_grained_shuffle_stream_count
|| static_cast<UInt64>(rows_in_blocks) >= batch_send_row_limit)
{
batchWriteFineGrainedShuffle();
return true;
}
return false;
}

template <class ExchangeWriterPtr>
Expand All @@ -148,8 +162,7 @@ template <class ExchangeWriterPtr>
template <MPPDataPacketVersion version>
void FineGrainedShuffleWriter<ExchangeWriterPtr>::batchWriteFineGrainedShuffleImpl()
{
if (blocks.empty())
return;
assert(!blocks.empty());

{
assert(rows_in_blocks > 0);
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Flash/Mpp/FineGrainedShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ class FineGrainedShuffleWriter : public DAGResponseWriter
MPPDataPacketVersion data_codec_version_,
tipb::CompressionMode compression_mode_);
void prepare(const Block & sample_block) override;
void write(const Block & block) override;
bool doWrite(const Block & block) override;
WaitResult waitForWritable() const override;
void flush() override;
bool doFlush() override;
void notifyNextPipelineWriter() override;

private:
void batchWriteFineGrainedShuffle();
Expand Down
Loading