Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve LogWithPrefix of mpp #4209

Merged
merged 16 commits into from
Mar 21, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ ParallelAggregatingBlockInputStream::ParallelAggregatingBlockInputStream(
, keys_size(params.keys_size)
, aggregates_size(params.aggregates_size)
, handler(*this)
, processor(inputs, additional_input_at_end, max_threads, handler)
, processor(inputs, additional_input_at_end, max_threads, handler, log)
{
children = inputs;
if (additional_input_at_end)
Expand Down
8 changes: 5 additions & 3 deletions dbms/src/DataStreams/ParallelInputsProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <Common/ThreadManager.h>
#include <Common/setThreadName.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Flash/Mpp/getMPPTaskLog.h>
#include <common/logger_useful.h>

#include <atomic>
Expand Down Expand Up @@ -81,11 +82,12 @@ class ParallelInputsProcessor
* - where you must first make JOIN in parallel, while noting which keys are not found,
* and only after the completion of this work, create blocks of keys that are not found.
*/
ParallelInputsProcessor(const BlockInputStreams & inputs_, const BlockInputStreamPtr & additional_input_at_end_, size_t max_threads_, Handler & handler_)
ParallelInputsProcessor(const BlockInputStreams & inputs_, const BlockInputStreamPtr & additional_input_at_end_, size_t max_threads_, Handler & handler_, const LogWithPrefixPtr & log_)
: inputs(inputs_)
, additional_input_at_end(additional_input_at_end_)
, max_threads(std::min(inputs_.size(), max_threads_))
, handler(handler_)
, log(getMPPTaskLog(log_, "ParallelInputsProcessor"))
SeaRise marked this conversation as resolved.
Show resolved Hide resolved
{
for (size_t i = 0; i < inputs_.size(); ++i)
unprepared_inputs.emplace(inputs_[i], i);
Expand All @@ -99,7 +101,7 @@ class ParallelInputsProcessor
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
}

Expand Down Expand Up @@ -352,7 +354,7 @@ class ParallelInputsProcessor
/// Wait for the completion of all threads.
std::atomic<bool> joined_threads{false};

Poco::Logger * log = &Poco::Logger::get("ParallelInputsProcessor");
const LogWithPrefixPtr log;
};


Expand Down
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/SharedQueryBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class SharedQueryBlockInputStream : public IProfilingBlockInputStream
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
}

Expand Down
10 changes: 5 additions & 5 deletions dbms/src/DataStreams/UnionBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ class UnionBlockInputStream final : public IProfilingBlockInputStream
const LogWithPrefixPtr & log_,
ExceptionCallback exception_callback_ = ExceptionCallback())
: output_queue(std::min(inputs.size(), max_threads))
, log(getMPPTaskLog(log_, NAME))
, handler(*this)
, processor(inputs, additional_input_at_end, max_threads, handler)
, processor(inputs, additional_input_at_end, max_threads, handler, log)
, exception_callback(exception_callback_)
, log(getMPPTaskLog(log_, NAME))
{
children = inputs;
if (additional_input_at_end)
Expand Down Expand Up @@ -117,7 +117,7 @@ class UnionBlockInputStream final : public IProfilingBlockInputStream
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
}

Expand Down Expand Up @@ -312,6 +312,8 @@ class UnionBlockInputStream final : public IProfilingBlockInputStream
Self & parent;
};

LogWithPrefixPtr log;

Handler handler;
ParallelInputsProcessor<Handler, mode> processor;

Expand All @@ -321,8 +323,6 @@ class UnionBlockInputStream final : public IProfilingBlockInputStream

bool started = false;
bool all_read = false;

LogWithPrefixPtr log;
};

} // namespace DB
4 changes: 3 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -932,9 +932,11 @@ void DAGQueryBlockInterpreter::handleExchangeReceiver(DAGPipeline & pipeline)
throw Exception("Can not find exchange receiver for " + query_block.source_name, ErrorCodes::LOGICAL_ERROR);
// todo choose a more reasonable stream number
auto & exchange_receiver_io_input_streams = dagContext().getInBoundIOInputStreamsMap()[query_block.source_name];
// In order to distinguish different exchange receivers.
auto executor_id_prefix_log = getMPPTaskLog(taskLogger(), query_block.source_name);
for (size_t i = 0; i < max_streams; ++i)
{
BlockInputStreamPtr stream = std::make_shared<ExchangeReceiverInputStream>(it->second, taskLogger());
BlockInputStreamPtr stream = std::make_shared<ExchangeReceiverInputStream>(it->second, executor_id_prefix_log);
exchange_receiver_io_input_streams.push_back(stream);
stream = std::make_shared<SquashingBlockInputStream>(stream, 8192, 0, taskLogger());
pipeline.streams.push_back(stream);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/Aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ AggregatedDataVariants::~AggregatedDataVariants()
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(aggregator->log, __PRETTY_FUNCTION__);
fuzhe1989 marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand Down