Skip to content

Commit

Permalink
accelerate dumpTree by cache tree_id inside BlockInputStream and repl…
Browse files Browse the repository at this point in the history
…ace stringstream by FmtBuffer (#4506)

close #4494
  • Loading branch information
SeaRise authored Mar 31, 2022
1 parent cf792d1 commit 4afdc08
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 25 deletions.
43 changes: 24 additions & 19 deletions dbms/src/DataStreams/IBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,26 @@ extern const int TOO_DEEP_PIPELINE;

String IBlockInputStream::getTreeID() const
{
std::stringstream s;
s << getName();

if (!children.empty())
std::lock_guard lock(tree_id_mutex);
if (tree_id.empty())
{
s << "(";
for (BlockInputStreams::const_iterator it = children.begin(); it != children.end(); ++it)
FmtBuffer buffer;
buffer.append(getName());

if (!children.empty())
{
if (it != children.begin())
s << ", ";
s << (*it)->getTreeID();
buffer.append("(");
buffer.joinStr(
children.cbegin(),
children.cend(),
[](const auto & r, FmtBuffer & fb) { fb.append(r->getTreeID()); },
", ");
buffer.append(")");
}
s << ")";
tree_id = buffer.toString();
}

return s.str();
return tree_id;
}


Expand All @@ -60,7 +64,7 @@ size_t IBlockInputStream::checkDepthImpl(size_t max_depth, size_t level) const
return 0;

if (level > max_depth)
throw Exception("Query pipeline is too deep. Maximum: " + toString(max_depth), ErrorCodes::TOO_DEEP_PIPELINE);
throw Exception(fmt::format("Query pipeline is too deep. Maximum: {}", max_depth), ErrorCodes::TOO_DEEP_PIPELINE);

size_t res = 0;
for (const auto & child : children)
Expand All @@ -74,13 +78,14 @@ size_t IBlockInputStream::checkDepthImpl(size_t max_depth, size_t level) const
}


void IBlockInputStream::dumpTree(std::ostream & ostr, size_t indent, size_t multiplier)
void IBlockInputStream::dumpTree(FmtBuffer & buffer, size_t indent, size_t multiplier)
{
ostr << String(indent, ' ') << getName();
if (multiplier > 1)
ostr << " × " << multiplier;
//ostr << ": " << getHeader().dumpStructure();
ostr << std::endl;
// todo append getHeader().dumpStructure()
buffer.fmtAppend(
"{}{}{}\n",
String(indent, ' '),
getName(),
multiplier > 1 ? fmt::format(" x {}", multiplier) : "");
++indent;

/// If the subtree is repeated several times, then we output it once with the multiplier.
Expand All @@ -96,7 +101,7 @@ void IBlockInputStream::dumpTree(std::ostream & ostr, size_t indent, size_t mult
size_t & subtree_multiplier = multipliers[id];
if (subtree_multiplier != 0) /// Already printed subtrees are marked with zero in the array of multipliers.
{
child->dumpTree(ostr, indent, subtree_multiplier);
child->dumpTree(buffer, indent, subtree_multiplier);
subtree_multiplier = 0;
}
}
Expand Down
8 changes: 6 additions & 2 deletions dbms/src/DataStreams/IBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <Common/FmtUtils.h>
#include <Core/Block.h>
#include <Core/SortDescription.h>
#include <Storages/TableLockHolder.h>
Expand Down Expand Up @@ -122,7 +123,7 @@ class IBlockInputStream : private boost::noncopyable

/** Must be called before read, readPrefix.
*/
void dumpTree(std::ostream & ostr, size_t indent = 0, size_t multiplier = 1);
void dumpTree(FmtBuffer & buffer, size_t indent = 0, size_t multiplier = 1);

/** Check the depth of the pipeline.
* If max_depth is specified and the `depth` is greater - throw an exception.
Expand Down Expand Up @@ -184,8 +185,11 @@ class IBlockInputStream : private boost::noncopyable
TableLockHolders table_locks;

size_t checkDepthImpl(size_t max_depth, size_t level) const;
mutable std::mutex tree_id_mutex;
mutable String tree_id;

/// Get text with names of this source and the entire subtree.
/// Get text with names of this source and the entire subtree, this function should only be called after the
/// InputStream tree is constructed.
String getTreeID() const;
};

Expand Down
11 changes: 7 additions & 4 deletions dbms/src/Interpreters/executeQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -386,10 +386,13 @@ std::tuple<ASTPtr, BlockIO> executeQueryImpl(

if (!internal && res.in)
{
std::stringstream log_str;
log_str << "Query pipeline:\n";
res.in->dumpTree(log_str);
LOG_DEBUG(execute_query_logger, log_str.str());
auto pipeline_log_str = [&res]() {
FmtBuffer log_buffer;
log_buffer.append("Query pipeline:\n");
res.in->dumpTree(log_buffer);
return log_buffer.toString();
};
LOG_DEBUG(execute_query_logger, pipeline_log_str());
}
}
}
Expand Down

0 comments on commit 4afdc08

Please sign in to comment.