Skip to content

Commit

Permalink
some refines (pingcap#7)
Browse files Browse the repository at this point in the history
* save work

Signed-off-by: xufei <[email protected]>

* Update dbms/src/Interpreters/Join.cpp

Co-authored-by: Meng Xin <[email protected]>

---------

Signed-off-by: xufei <[email protected]>
Co-authored-by: Meng Xin <[email protected]>
  • Loading branch information
windtalker and mengxin9014 authored Mar 6, 2023
1 parent 04744f6 commit 0911265
Show file tree
Hide file tree
Showing 4 changed files with 225 additions and 199 deletions.
5 changes: 4 additions & 1 deletion dbms/src/Common/HashTable/HashTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -1497,15 +1497,18 @@ class ConcurrentHashTable : private boost::noncopyable
return segments[segment_index]->getHashTable();
}

void resetSegmentTable(size_t segment_index)
size_t resetSegmentTable(size_t segment_index)
{
size_t ret = 0;
std::unique_ptr<SegmentType> segment_ptr = nullptr;
{
/// release the lock before destruct related segment
std::unique_lock lock(segments[segment_index]->getMutex());
ret = segments[segment_index]->getBufferSizeInBytes();
segment_ptr.swap(segments[segment_index]);
segments[segment_index].reset();
}
return ret;
}

std::mutex & getSegmentMutex(size_t segment_index)
Expand Down
9 changes: 5 additions & 4 deletions dbms/src/DataStreams/NonJoinedBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,14 @@ Block NonJoinedBlockInputStream::readImpl()
if (std::all_of(
std::begin(parent.partitions),
std::end(parent.partitions),
[](const Join::JoinPartition & partition) { return partition.build_partition.blocks.empty(); }))
[](const std::unique_ptr<Join::JoinPartition> & partition) { return partition->build_partition.blocks.empty(); }))
{
return Block();
}
}


/// todo read data based on JoinPartition
if (add_not_mapped_rows)
{
setNextCurrentNotMappedRow();
Expand Down Expand Up @@ -258,7 +259,7 @@ size_t NonJoinedBlockInputStream::fillColumns(const Map & map,
{
current_segment = index;

while (parent.partitions[current_segment].spill)
while (parent.partitions[current_segment]->spill)
{
current_segment += step;
if (current_segment >= map.getSegmentSize())
Expand All @@ -282,10 +283,10 @@ size_t NonJoinedBlockInputStream::fillColumns(const Map & map,

for (; *it != end || current_segment + step < map.getSegmentSize();)
{
if (*it == end || (parent.max_bytes_before_external_join && parent.partitions[current_segment].spill))
if (*it == end || (parent.max_bytes_before_external_join && parent.partitions[current_segment]->spill))
{
current_segment += step;
while (parent.partitions[current_segment].spill)
while (parent.partitions[current_segment]->spill)
{
current_segment += step;
if (current_segment >= map.getSegmentSize())
Expand Down
Loading

0 comments on commit 0911265

Please sign in to comment.