Skip to content

Commit

Permalink
Merge branch 'master' into refine_asttoexecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
ywqzzy authored Sep 15, 2022
2 parents bb96af5 + f4e976b commit 8c7c7a8
Show file tree
Hide file tree
Showing 12 changed files with 112 additions and 90 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,6 @@
[submodule "contrib/arm-optimized-routines"]
path = contrib/arm-optimized-routines
url = https://github.com/ARM-software/optimized-routines
[submodule "contrib/magic_enum"]
path = contrib/magic_enum
url = https://github.com/Neargye/magic_enum.git
2 changes: 2 additions & 0 deletions contrib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,5 @@ endif()
if (ARCH_AARCH64 AND ARCH_LINUX)
add_subdirectory(arm-optimized-routines-cmake)
endif ()

add_subdirectory(magic_enum)
1 change: 1 addition & 0 deletions contrib/magic_enum
Submodule magic_enum added at 43a927
1 change: 1 addition & 0 deletions dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ target_link_libraries (clickhouse_common_io
prometheus-cpp::push
prometheus-cpp::pull
cpptoml
magic_enum
libsymbolization
)
target_include_directories (clickhouse_common_io BEFORE PRIVATE ${kvClient_SOURCE_DIR}/include)
Expand Down
59 changes: 59 additions & 0 deletions dbms/src/Common/tests/gtest_magic_enum.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <common/crc64.h>
#include <gtest/gtest.h>

#include <magic_enum.hpp>

namespace DB::tests
{
TEST(MagicEnumTest, EnumConversion)
{
using crc64::Mode;
// mode_entries -> {{Mode::Table, "Table"}, {Mode::Auto, "Auto"}, {Mode::SIMD_128, "SIMD_128"}...}
// mode_entries[0].first -> Mode::Table
// mode_entries[0].second -> "Table"
constexpr auto mode_entries = magic_enum::enum_entries<Mode>();
ASSERT_EQ(mode_entries.size(), magic_enum::enum_names<Mode>().size());
ASSERT_EQ(mode_entries.size(), magic_enum::enum_values<Mode>().size());
ASSERT_EQ(mode_entries.size(), magic_enum::enum_count<Mode>());

for (const auto & entry : mode_entries)
{
// enum value to string
ASSERT_EQ(magic_enum::enum_name(entry.first), entry.second);
// string to enum value
auto mode = magic_enum::enum_cast<Mode>(entry.second);
ASSERT_TRUE(mode.has_value());
ASSERT_EQ(entry.first, mode);
}

// enum value to integer
int mode_integer = 2;
auto mode_from_int = magic_enum::enum_cast<Mode>(mode_integer);
ASSERT_TRUE(mode_from_int.has_value());
ASSERT_EQ(mode_from_int.value(), Mode::SIMD_128);

// indexed access to enum value
std::size_t index = 1;
ASSERT_EQ(magic_enum::enum_value<Mode>(index), Mode::Auto);

// edge cases
ASSERT_FALSE(magic_enum::enum_cast<Mode>("table").has_value());
ASSERT_FALSE(magic_enum::enum_cast<Mode>("auto").has_value());
ASSERT_FALSE(magic_enum::enum_cast<Mode>(-1).has_value());
ASSERT_FALSE(magic_enum::enum_cast<Mode>(99999).has_value());
}
} // namespace DB::tests
9 changes: 5 additions & 4 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

#include <atomic>
#include <ext/scope_guard.h>
#include <magic_enum.hpp>
#include <memory>

namespace ProfileEvents
Expand Down Expand Up @@ -123,15 +124,15 @@ std::pair<bool, bool> DeltaMergeStore::MergeDeltaTaskPool::tryAddTask(const Back
light_tasks.push(task);
break;
default:
throw Exception(fmt::format("Unsupported task type: {}", toString(task.type)));
throw Exception(fmt::format("Unsupported task type: {}", magic_enum::enum_name(task.type)));
}

LOG_FMT_DEBUG(
log_,
"Segment task add to background task pool, segment={} task={} by_whom={}",
task.segment->simpleInfo(),
toString(task.type),
toString(whom));
magic_enum::enum_name(task.type),
magic_enum::enum_name(whom));
return std::make_pair(true, is_heavy);
}

Expand All @@ -145,7 +146,7 @@ DeltaMergeStore::BackgroundTask DeltaMergeStore::MergeDeltaTaskPool::nextTask(bo
auto task = tasks.front();
tasks.pop();

LOG_FMT_DEBUG(log_, "Segment task pop from background task pool, segment={} task={}", task.segment->simpleInfo(), toString(task.type));
LOG_FMT_DEBUG(log_, "Segment task pop from background task pool, segment={} task={}", task.segment->simpleInfo(), magic_enum::enum_name(task.type));

return task;
}
Expand Down
69 changes: 2 additions & 67 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,71 +188,6 @@ class DeltaMergeStore : private boost::noncopyable
Manual,
};

static std::string toString(ThreadType type)
{
switch (type)
{
case Init:
return "Init";
case Write:
return "Write";
case Read:
return "Read";
case BG_Split:
return "BG_Split";
case BG_Merge:
return "BG_Merge";
case BG_MergeDelta:
return "BG_MergeDelta";
case BG_Compact:
return "BG_Compact";
case BG_Flush:
return "BG_Flush";
case BG_GC:
return "BG_GC";
default:
return "Unknown";
}
}

static std::string toString(TaskType type)
{
switch (type)
{
case Split:
return "Split";
case Merge:
return "Merge";
case MergeDelta:
return "MergeDelta";
case Compact:
return "Compact";
case Flush:
return "Flush";
case PlaceIndex:
return "PlaceIndex";
default:
return "Unknown";
}
}

static std::string toString(MergeDeltaReason type)
{
switch (type)
{
case BackgroundThreadPool:
return "BackgroundThreadPool";
case BackgroundGCThread:
return "BackgroundGCThread";
case ForegroundWrite:
return "ForegroundWrite";
case Manual:
return "Manual";
default:
return "Unknown";
}
}

struct BackgroundTask
{
TaskType type;
Expand Down Expand Up @@ -512,11 +447,11 @@ class DeltaMergeStore : private boost::noncopyable
bool handleBackgroundTask(bool heavy);

// isSegmentValid should be protected by lock on `read_write_mutex`
inline bool isSegmentValid(std::shared_lock<std::shared_mutex> &, const SegmentPtr & segment)
inline bool isSegmentValid(const std::shared_lock<std::shared_mutex> &, const SegmentPtr & segment)
{
return doIsSegmentValid(segment);
}
inline bool isSegmentValid(std::unique_lock<std::shared_mutex> &, const SegmentPtr & segment)
inline bool isSegmentValid(const std::unique_lock<std::shared_mutex> &, const SegmentPtr & segment)
{
return doIsSegmentValid(segment);
}
Expand Down
8 changes: 5 additions & 3 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include <Storages/DeltaMerge/Segment.h>
#include <Storages/Transaction/TMTContext.h>

#include <magic_enum.hpp>

namespace CurrentMetrics
{
extern const Metric DT_SnapshotOfDeltaMerge;
Expand Down Expand Up @@ -174,7 +176,7 @@ bool DeltaMergeStore::handleBackgroundTask(bool heavy)
{
/// Note that `task.dm_context->db_context` will be free after query is finish. We should not use that in background task.
task.dm_context->min_version = latest_gc_safe_point.load(std::memory_order_relaxed);
LOG_FMT_DEBUG(log, "Task {} GC safe point: {}", toString(task.type), task.dm_context->min_version);
LOG_FMT_DEBUG(log, "Task {} GC safe point: {}", magic_enum::enum_name(task.type), task.dm_context->min_version);
}

SegmentPtr left, right;
Expand Down Expand Up @@ -216,15 +218,15 @@ bool DeltaMergeStore::handleBackgroundTask(bool heavy)
task.segment->placeDeltaIndex(*task.dm_context);
break;
default:
throw Exception(fmt::format("Unsupported task type: {}", toString(task.type)));
throw Exception(fmt::format("Unsupported task type: {}", magic_enum::enum_name(task.type)));
}
}
catch (const Exception & e)
{
LOG_FMT_ERROR(
log,
"Execute task on segment failed, task={} segment={}{} err={}",
DeltaMergeStore::toString(task.type),
magic_enum::enum_name(task.type),
task.segment->simpleInfo(),
((bool)task.next_segment ? (fmt::format(" next_segment={}", task.next_segment->simpleInfo())) : ""),
e.message());
Expand Down
10 changes: 6 additions & 4 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/DeltaMerge/Segment.h>

#include <magic_enum.hpp>

namespace CurrentMetrics
{
extern const Metric DT_DeltaMerge;
Expand Down Expand Up @@ -116,7 +118,7 @@ SegmentPair DeltaMergeStore::segmentSplit(DMContext & dm_context, const SegmentP

auto segment_lock = segment->mustGetUpdateLock();

std::tie(new_left, new_right) = segment->applySplit(dm_context, segment_snap, wbs, split_info);
std::tie(new_left, new_right) = segment->applySplit(segment_lock, dm_context, segment_snap, wbs, split_info);

wbs.writeMeta();

Expand Down Expand Up @@ -278,7 +280,7 @@ SegmentPtr DeltaMergeStore::segmentMerge(DMContext & dm_context, const std::vect
for (const auto & seg : ordered_segments)
locks.emplace_back(seg->mustGetUpdateLock());

merged = Segment::applyMerge(dm_context, ordered_segments, ordered_snapshots, wbs, merged_stable);
merged = Segment::applyMerge(locks, dm_context, ordered_segments, ordered_snapshots, wbs, merged_stable);

wbs.writeMeta();

Expand Down Expand Up @@ -321,7 +323,7 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta(
const MergeDeltaReason reason,
SegmentSnapshotPtr segment_snap)
{
LOG_FMT_INFO(log, "MergeDelta - Begin, reason={} safe_point={} segment={}", toString(reason), dm_context.min_version, segment->info());
LOG_FMT_INFO(log, "MergeDelta - Begin, reason={} safe_point={} segment={}", magic_enum::enum_name(reason), dm_context.min_version, segment->info());

ColumnDefinesPtr schema_snap;

Expand Down Expand Up @@ -412,7 +414,7 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta(

auto segment_lock = segment->mustGetUpdateLock();

new_segment = segment->applyMergeDelta(dm_context, segment_snap, wbs, new_stable);
new_segment = segment->applyMergeDelta(segment_lock, dm_context, segment_snap, wbs, new_stable);

wbs.writeMeta();

Expand Down
15 changes: 9 additions & 6 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ SegmentPtr Segment::mergeDelta(DMContext & dm_context, const ColumnDefinesPtr &
SYNC_FOR("before_Segment::applyMergeDelta"); // pause without holding the lock on the segment

auto lock = mustGetUpdateLock();
auto new_segment = applyMergeDelta(dm_context, segment_snap, wbs, new_stable);
auto new_segment = applyMergeDelta(lock, dm_context, segment_snap, wbs, new_stable);

wbs.writeAll();
return new_segment;
Expand Down Expand Up @@ -674,7 +674,8 @@ StableValueSpacePtr Segment::prepareMergeDelta(DMContext & dm_context,
return new_stable;
}

SegmentPtr Segment::applyMergeDelta(DMContext & context,
SegmentPtr Segment::applyMergeDelta(const Segment::Lock &, //
DMContext & context,
const SegmentSnapshotPtr & segment_snap,
WriteBatches & wbs,
const StableValueSpacePtr & new_stable) const
Expand Down Expand Up @@ -731,7 +732,7 @@ SegmentPair Segment::split(DMContext & dm_context, const ColumnDefinesPtr & sche
SYNC_FOR("before_Segment::applySplit"); // pause without holding the lock on the segment

auto lock = mustGetUpdateLock();
auto segment_pair = applySplit(dm_context, segment_snap, wbs, split_info);
auto segment_pair = applySplit(lock, dm_context, segment_snap, wbs, split_info);

wbs.writeAll();

Expand Down Expand Up @@ -1141,7 +1142,8 @@ std::optional<Segment::SplitInfo> Segment::prepareSplitPhysical(DMContext & dm_c
return {SplitInfo{false, split_point, my_new_stable, other_stable}};
}

SegmentPair Segment::applySplit(DMContext & dm_context, //
SegmentPair Segment::applySplit(const Segment::Lock &, //
DMContext & dm_context,
const SegmentSnapshotPtr & segment_snap,
WriteBatches & wbs,
SplitInfo & split_info) const
Expand Down Expand Up @@ -1241,7 +1243,7 @@ SegmentPtr Segment::merge(DMContext & dm_context, const ColumnDefinesPtr & schem
for (const auto & seg : ordered_segments)
locks.emplace_back(seg->mustGetUpdateLock());

auto merged = applyMerge(dm_context, ordered_segments, ordered_snapshots, wbs, merged_stable);
auto merged = applyMerge(locks, dm_context, ordered_segments, ordered_snapshots, wbs, merged_stable);

wbs.writeAll();
return merged;
Expand Down Expand Up @@ -1326,7 +1328,8 @@ StableValueSpacePtr Segment::prepareMerge(DMContext & dm_context, //
return merged_stable;
}

SegmentPtr Segment::applyMerge(DMContext & dm_context, //
SegmentPtr Segment::applyMerge(const std::vector<Segment::Lock> &, //
DMContext & dm_context,
const std::vector<SegmentPtr> & ordered_segments,
const std::vector<SegmentSnapshotPtr> & ordered_snapshots,
WriteBatches & wbs,
Expand Down
15 changes: 14 additions & 1 deletion dbms/src/Storages/DeltaMerge/Segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class Segment : private boost::noncopyable
{
public:
using DeltaTree = DefaultDeltaTree;
using Lock = DeltaValueSpace::Lock;

struct ReadInfo
{
Expand Down Expand Up @@ -207,7 +208,11 @@ class Segment : private boost::noncopyable
const SegmentSnapshotPtr & segment_snap,
WriteBatches & wbs) const;

/**
* Should be protected behind the Segment update lock.
*/
[[nodiscard]] SegmentPair applySplit(
const Lock &,
DMContext & dm_context,
const SegmentSnapshotPtr & segment_snap,
WriteBatches & wbs,
Expand All @@ -229,7 +234,11 @@ class Segment : private boost::noncopyable
const std::vector<SegmentSnapshotPtr> & ordered_snapshots,
WriteBatches & wbs);

/**
* Should be protected behind the update lock for all related segments.
*/
[[nodiscard]] static SegmentPtr applyMerge(
const std::vector<Lock> &,
DMContext & dm_context,
const std::vector<SegmentPtr> & ordered_segments,
const std::vector<SegmentSnapshotPtr> & ordered_snapshots,
Expand All @@ -247,7 +256,12 @@ class Segment : private boost::noncopyable
const ColumnDefinesPtr & schema_snap,
const SegmentSnapshotPtr & segment_snap,
WriteBatches & wbs) const;

/**
* Should be protected behind the Segment update lock.
*/
[[nodiscard]] SegmentPtr applyMergeDelta(
const Lock &,
DMContext & dm_context,
const SegmentSnapshotPtr & segment_snap,
WriteBatches & wbs,
Expand Down Expand Up @@ -283,7 +297,6 @@ class Segment : private boost::noncopyable
static String simpleInfo(const std::vector<SegmentPtr> & segments);
static String info(const std::vector<SegmentPtr> & segments);

using Lock = DeltaValueSpace::Lock;
bool getUpdateLock(Lock & lock) const { return delta->getLock(lock); }

Lock mustGetUpdateLock() const
Expand Down
Loading

0 comments on commit 8c7c7a8

Please sign in to comment.