Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bloom ngramms speed up #12982

Merged
merged 11 commits into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions ydb/core/formats/arrow/hash/calcer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@

namespace NKikimr::NArrow::NHash {

void TXX64::AppendField(const std::shared_ptr<arrow::Scalar>& scalar, NXX64::TStreamStringHashCalcer& hashCalcer) {
namespace {
template <class TStreamCalcer>
void AppendFieldImpl(const std::shared_ptr<arrow::Scalar>& scalar, TStreamCalcer& hashCalcer) {
AFL_VERIFY(scalar);
NArrow::SwitchType(scalar->type->id(), [&](const auto& type) {
using TWrap = std::decay_t<decltype(type)>;
Expand All @@ -28,7 +30,8 @@ void TXX64::AppendField(const std::shared_ptr<arrow::Scalar>& scalar, NXX64::TSt
});
}

void TXX64::AppendField(const std::shared_ptr<arrow::Array>& array, const int row, NArrow::NHash::NXX64::TStreamStringHashCalcer& hashCalcer) {
template <class TStreamCalcer>
void AppendFieldImpl(const std::shared_ptr<arrow::Array>& array, const int row, TStreamCalcer& hashCalcer) {
NArrow::SwitchType(array->type_id(), [&](const auto& type) {
using TWrap = std::decay_t<decltype(type)>;
using T = typename TWrap::T;
Expand All @@ -49,6 +52,24 @@ void TXX64::AppendField(const std::shared_ptr<arrow::Array>& array, const int ro
});
}

} // namespace

// Non-template entry point for the plain streaming hash calcer; forwards to the
// file-local AppendFieldImpl template, which dispatches on the scalar's Arrow type.
void TXX64::AppendField(const std::shared_ptr<arrow::Scalar>& scalar, NXX64::TStreamStringHashCalcer& hashCalcer) {
AppendFieldImpl(scalar, hashCalcer);
}

// Same as above, but for the TStreamStringHashCalcer_H3 calcer type.
void TXX64::AppendField(const std::shared_ptr<arrow::Scalar>& scalar, NXX64::TStreamStringHashCalcer_H3& hashCalcer) {
AppendFieldImpl(scalar, hashCalcer);
}

// Array overload: forwards (array, row) to the file-local AppendFieldImpl template,
// which dispatches on the array's Arrow type id.
void TXX64::AppendField(const std::shared_ptr<arrow::Array>& array, const int row, NArrow::NHash::NXX64::TStreamStringHashCalcer& hashCalcer) {
AppendFieldImpl(array, row, hashCalcer);
}

// Array overload for the TStreamStringHashCalcer_H3 calcer type.
void TXX64::AppendField(const std::shared_ptr<arrow::Array>& array, const int row, NArrow::NHash::NXX64::TStreamStringHashCalcer_H3& hashCalcer) {
AppendFieldImpl(array, row, hashCalcer);
}

std::optional<std::vector<ui64>> TXX64::Execute(const std::shared_ptr<arrow::RecordBatch>& batch) const {
std::vector<std::shared_ptr<arrow::Array>> columns = GetColumns(batch);
if (columns.empty()) {
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/formats/arrow/hash/calcer.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ class TXX64 {

static void AppendField(const std::shared_ptr<arrow::Array>& array, const int row, NXX64::TStreamStringHashCalcer& hashCalcer);
static void AppendField(const std::shared_ptr<arrow::Scalar>& scalar, NXX64::TStreamStringHashCalcer& hashCalcer);
static void AppendField(const std::shared_ptr<arrow::Array>& array, const int row, NXX64::TStreamStringHashCalcer_H3& hashCalcer);
static void AppendField(const std::shared_ptr<arrow::Scalar>& scalar, NXX64::TStreamStringHashCalcer_H3& hashCalcer);
static ui64 CalcHash(const std::shared_ptr<arrow::Scalar>& scalar);
std::optional<std::vector<ui64>> Execute(const std::shared_ptr<arrow::RecordBatch>& batch) const;

Expand Down
1 change: 0 additions & 1 deletion ydb/core/formats/arrow/permutations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#include <ydb/library/actors/core/log.h>

#include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_primitive.h>
#include <contrib/libs/xxhash/xxhash.h>

namespace NKikimr::NArrow {

Expand Down
12 changes: 6 additions & 6 deletions ydb/core/kqp/ut/olap/indexes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
}
{
ResetZeroLevel(csController);
ui32 requestsCount = 100;
ui32 requestsCount = 300;
for (ui32 i = 0; i < requestsCount; ++i) {
const ui32 idx = RandomNumber<ui32>(uids.size());
const auto query = [](const TString& res, const TString& uid, const ui32 level) {
Expand All @@ -494,12 +494,12 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
};
ExecuteSQL(query(resourceIds[idx], uids[idx], levels[idx]), "[[1u;]]");
}
AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() - SkipStart > 1)("approved", csController->GetIndexesApprovedOnSelect().Val() - ApproveStart)(
AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() - SkipStart)("approved", csController->GetIndexesApprovedOnSelect().Val() - ApproveStart)(
"skipped", csController->GetIndexesSkippingOnSelect().Val() - SkipStart);
}
{
ResetZeroLevel(csController);
ui32 requestsCount = 100;
ui32 requestsCount = 300;
for (ui32 i = 0; i < requestsCount; ++i) {
const ui32 idx = RandomNumber<ui32>(uids.size());
const auto query = [](const TString& res, const TString& uid, const ui32 level) {
Expand All @@ -511,13 +511,13 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
};
ExecuteSQL(query(resourceIds[idx], uids[idx], levels[idx]), "[[1u;]]");
}
AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() - SkipStart > 1)(
AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() - SkipStart)(
"approved", csController->GetIndexesApprovedOnSelect().Val() - ApproveStart)(
"skipped", csController->GetIndexesSkippingOnSelect().Val() - SkipStart);
}
{
ResetZeroLevel(csController);
ui32 requestsCount = 100;
ui32 requestsCount = 300;
for (ui32 i = 0; i < requestsCount; ++i) {
const ui32 idx = RandomNumber<ui32>(uids.size());
const auto query = [](const TString& res, const TString& uid, const ui32 level) {
Expand All @@ -529,7 +529,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
};
ExecuteSQL(query(resourceIds[idx], uids[idx], levels[idx]), "[[1u;]]");
}
AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() - SkipStart > 1)(
AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() - SkipStart)(
"approved", csController->GetIndexesApprovedOnSelect().Val() - ApproveStart)(
"skipped", csController->GetIndexesSkippingOnSelect().Val() - SkipStart);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ std::vector<TWritePortionInfoWithBlobsResult> TMerger::Execute(const std::shared
for (auto&& i : packs) {
TGeneralSerializedSlice slicePrimary(std::move(i));
auto dataWithSecondary = resultFiltered->GetIndexInfo()
.AppendIndexes(slicePrimary.GetPortionChunksToHash(), SaverContext.GetStoragesManager())
.AppendIndexes(slicePrimary.GetPortionChunksToHash(), SaverContext.GetStoragesManager(), slicePrimary.GetRecordsCount())
.DetachResult();
TGeneralSerializedSlice slice(dataWithSecondary.GetExternalData(), schemaDetails, Context.Counters.SplitterCounters);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ std::optional<TWritePortionInfoWithBlobsResult> TReadPortionInfoWithBlobs::SyncP
TIndexInfo::TSecondaryData secondaryData;
secondaryData.MutableExternalData() = entityChunksNew;
for (auto&& i : to->GetIndexInfo().GetIndexes()) {
to->GetIndexInfo().AppendIndex(entityChunksNew, i.first, storages, secondaryData).Validate();
to->GetIndexInfo().AppendIndex(entityChunksNew, i.first, storages, source.PortionInfo.GetPortionInfo().GetRecordsCount(), secondaryData).Validate();
}

const NSplitter::TEntityGroups groups = source.PortionInfo.GetPortionInfo().GetEntityGroupsByStorageId(targetTier, *storages, to->GetIndexInfo());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ class TFetchingStepSignals: public NColumnShard::TCommonCountersOwner {
public:
TFetchingStepSignals(NColumnShard::TCommonCountersOwner&& owner)
: TBase(std::move(owner))
, DurationCounter(TBase::GetDeriviative("duration_ms"))
, BytesCounter(TBase::GetDeriviative("bytes_ms")) {
, DurationCounter(TBase::GetDeriviative("Duration/Us"))
, BytesCounter(TBase::GetDeriviative("Bytes/Count")) {
}

void AddDuration(const TDuration d) const {
DurationCounter->Add(d.MilliSeconds());
DurationCounter->Add(d.MicroSeconds());
}

void AddBytes(const ui32 v) const {
Expand All @@ -56,7 +56,7 @@ class TFetchingStepsSignalsCollection: public NColumnShard::TCommonCountersOwner

public:
TFetchingStepsSignalsCollection()
: TBase("scan_steps") {
: TBase("ScanSteps") {
}

static TFetchingStepSignals GetSignals(const TString& name) {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/engines/scheme/index_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -412,11 +412,11 @@ std::shared_ptr<arrow::Scalar> TIndexInfo::GetColumnExternalDefaultValueVerified
}

NKikimr::TConclusionStatus TIndexInfo::AppendIndex(const THashMap<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& originalData,
const ui32 indexId, const std::shared_ptr<IStoragesManager>& operators, TSecondaryData& result) const {
const ui32 indexId, const std::shared_ptr<IStoragesManager>& operators, const ui32 recordsCount, TSecondaryData& result) const {
auto it = Indexes.find(indexId);
AFL_VERIFY(it != Indexes.end());
auto& index = it->second;
std::shared_ptr<IPortionDataChunk> chunk = index->BuildIndex(originalData, *this);
std::shared_ptr<IPortionDataChunk> chunk = index->BuildIndex(originalData, recordsCount, *this);
auto opStorage = operators->GetOperatorVerified(index->GetStorageId());
if ((i64)chunk->GetPackedSize() > opStorage->GetBlobSplitSettings().GetMaxBlobSize()) {
return TConclusionStatus::Fail("blob size for secondary data (" + ::ToString(indexId) + ") bigger than limit (" +
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/engines/scheme/index_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -313,11 +313,11 @@ struct TIndexInfo: public IIndexInfo {
};

[[nodiscard]] TConclusion<TSecondaryData> AppendIndexes(const THashMap<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& primaryData,
const std::shared_ptr<IStoragesManager>& operators) const {
const std::shared_ptr<IStoragesManager>& operators, const ui32 recordsCount) const {
TSecondaryData result;
result.MutableExternalData() = primaryData;
for (auto&& i : Indexes) {
auto conclusion = AppendIndex(primaryData, i.first, operators, result);
auto conclusion = AppendIndex(primaryData, i.first, operators, recordsCount, result);
if (conclusion.IsFail()) {
return conclusion;
}
Expand All @@ -329,7 +329,7 @@ struct TIndexInfo: public IIndexInfo {
std::shared_ptr<NIndexes::NCountMinSketch::TIndexMeta> GetIndexMetaCountMinSketch(const std::set<ui32>& columnIds) const;

[[nodiscard]] TConclusionStatus AppendIndex(const THashMap<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& originalData,
const ui32 indexId, const std::shared_ptr<IStoragesManager>& operators, TSecondaryData& result) const;
const ui32 indexId, const std::shared_ptr<IStoragesManager>& operators, const ui32 recordsCount, TSecondaryData& result) const;

/// Returns an id of the column located by name. The name should exists in the schema.
ui32 GetColumnIdVerified(const std::string& name) const;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ class IIndexMeta {
YDB_READONLY(ui32, IndexId, 0);
YDB_READONLY(TString, StorageId, IStoragesManager::DefaultStorageId);
protected:
virtual std::shared_ptr<IPortionDataChunk> DoBuildIndex(const THashMap<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& data, const TIndexInfo& indexInfo) const = 0;
virtual std::shared_ptr<IPortionDataChunk> DoBuildIndex(const THashMap<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& data,
const ui32 recordsCount, const TIndexInfo& indexInfo) const = 0;
virtual void DoFillIndexCheckers(const std::shared_ptr<NRequest::TDataForIndexesCheckers>& info, const NSchemeShard::TOlapSchema& schema) const = 0;
virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& proto) = 0;
virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& proto) const = 0;
Expand Down Expand Up @@ -67,8 +68,8 @@ class IIndexMeta {

virtual ~IIndexMeta() = default;

std::shared_ptr<IPortionDataChunk> BuildIndex(const THashMap<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& data, const TIndexInfo& indexInfo) const {
return DoBuildIndex(data, indexInfo);
std::shared_ptr<IPortionDataChunk> BuildIndex(const THashMap<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& data, const ui32 recordsCount, const TIndexInfo& indexInfo) const {
return DoBuildIndex(data, recordsCount, indexInfo);
}

void FillIndexCheckers(const std::shared_ptr<NRequest::TDataForIndexesCheckers>& info, const NSchemeShard::TOlapSchema& schema) const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,39 @@ class TFixStringBitsStorage {
: Data(data)
{}

// Rounds a bit count up to the nearest multiple of 8, i.e. to a whole number of bytes
// expressed in bits (0 -> 0, 1..8 -> 8, 9..16 -> 16, ...).
static ui32 GrowBitsCountToByte(const ui32 bitsCount) {
    ui32 wholeBytes = bitsCount / 8;
    if (bitsCount % 8) {
        // A partially used trailing byte still occupies a full byte.
        ++wholeBytes;
    }
    return wholeBytes * 8;
}

// Builds the byte storage from a bit vector. Bit i of the vector is packed into byte
// i / 8 under mask (1 << (i % 8)), i.e. least-significant bit first within each byte.
// NOTE(review): confirm this bit order matches Set()/Get() of this class.
//
// Fix: the per-byte mask previously started at 0, so it stayed 0 while the first byte
// was accumulated and bits 0..7 of the input were silently dropped. It now starts at 1,
// the same value it is reset to at every later byte boundary.
TFixStringBitsStorage(const std::vector<bool>& bitsVector)
    : TFixStringBitsStorage(bitsVector.size()) {
    // The delegated constructor fills Data with '\0', so only non-zero bytes need writing.
    ui32 byteIdx = 0;
    ui8 byteCurrent = 0;
    ui8 shiftCurrent = 1;
    for (ui32 i = 0; i < bitsVector.size(); ++i) {
        if (i && i % 8 == 0) {
            // Crossed a byte boundary: flush the finished byte and start the next one.
            Data[byteIdx] = (char)byteCurrent;
            byteCurrent = 0;
            shiftCurrent = 1;
            ++byteIdx;
        }
        if (bitsVector[i]) {
            byteCurrent |= shiftCurrent;
        }
        shiftCurrent = (shiftCurrent << 1);
    }
    // Flush the last (possibly partial) byte; a zero byte is already in place.
    if (byteCurrent) {
        Data[byteIdx] = (char)byteCurrent;
    }
}

// Returns the storage capacity in bits: 8 bits for every byte held in Data.
ui32 GetSizeBits() const {
return Data.size() * 8;
}

TFixStringBitsStorage(const ui32 sizeBits)
: Data(sizeBits / 8 + ((sizeBits % 8) ? 1 : 0), '\0') {
: Data(GrowBitsCountToByte(sizeBits) / 8, '\0') {
}

void Set(const bool val, const ui32 idx) {
Expand Down
34 changes: 14 additions & 20 deletions ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,21 @@

namespace NKikimr::NOlap::NIndexes {

TString TBloomIndexMeta::DoBuildIndexImpl(TChunkedBatchReader& reader) const {
std::set<ui64> hashes;
{
NArrow::NHash::NXX64::TStreamStringHashCalcer hashCalcer(0);
TString TBloomIndexMeta::DoBuildIndexImpl(TChunkedBatchReader& reader, const ui32 recordsCount) const {
const ui32 bitsCount = TFixStringBitsStorage::GrowBitsCountToByte(HashesCount * recordsCount / std::log(2));
std::vector<bool> filterBits(bitsCount, false);
for (ui32 i = 0; i < HashesCount; ++i) {
NArrow::NHash::NXX64::TStreamStringHashCalcer_H3 hashCalcer(i);
for (reader.Start(); reader.IsCorrect(); reader.ReadNext()) {
hashCalcer.Start();
for (auto&& i : reader) {
NArrow::NHash::TXX64::AppendField(i.GetCurrentChunk(), i.GetCurrentRecordIndex(), hashCalcer);
}
hashes.emplace(hashCalcer.Finish());
filterBits[hashCalcer.Finish() % bitsCount] = true;
}
}

const ui32 bitsCount = HashesCount * hashes.size() / std::log(2);
TFixStringBitsStorage bits(bitsCount);
const auto pred = [&bits](const ui64 hash) {
bits.Set(true, hash % bits.GetSizeBits());
};
BuildHashesSet(hashes, pred);
return bits.GetData();
return TFixStringBitsStorage(filterBits).GetData();
}

void TBloomIndexMeta::DoFillIndexCheckers(const std::shared_ptr<NRequest::TDataForIndexesCheckers>& info, const NSchemeShard::TOlapSchema& schema) const {
Expand All @@ -51,15 +46,14 @@ void TBloomIndexMeta::DoFillIndexCheckers(const std::shared_ptr<NRequest::TDataF
continue;
}
std::set<ui64> hashes;
const auto pred = [&hashes](const ui64 hash) {
hashes.emplace(hash);
};
NArrow::NHash::NXX64::TStreamStringHashCalcer calcer(0);
calcer.Start();
for (auto&& i : foundColumns) {
NArrow::NHash::TXX64::AppendField(i.second, calcer);
for (ui32 i = 0; i < HashesCount; ++i) {
NArrow::NHash::NXX64::TStreamStringHashCalcer_H3 calcer(i);
calcer.Start();
for (auto&& i : foundColumns) {
NArrow::NHash::TXX64::AppendField(i.second, calcer);
}
hashes.emplace(calcer.Finish());
}
BuildHashesSet(calcer.Finish(), pred);
branch->MutableIndexes().emplace_back(std::make_shared<TBloomFilterChecker>(GetIndexId(), std::move(hashes)));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class TBloomIndexMeta: public TIndexByColumns {
}
virtual void DoFillIndexCheckers(const std::shared_ptr<NRequest::TDataForIndexesCheckers>& info, const NSchemeShard::TOlapSchema& schema) const override;

virtual TString DoBuildIndexImpl(TChunkedBatchReader& reader) const override;
virtual TString DoBuildIndexImpl(TChunkedBatchReader& reader, const ui32 recordsCount) const override;

virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& proto) override {
AFL_VERIFY(TBase::DoDeserializeFromProto(proto));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#include "const.h"

#include <util/string/builder.h>

namespace NKikimr::NOlap::NIndexes::NBloomNGramm {

namespace {

// Renders an inclusive range as "[from, to]".
TString FormatClosedInterval(const ui32 from, const ui32 to) {
    return TStringBuilder() << "[" << from << ", " << to << "]";
}

}   // namespace

// Text form of the allowed hashes-count range, "[MinHashesCount, MaxHashesCount]".
TString TConstants::GetHashesCountIntervalString() {
    return FormatClosedInterval(MinHashesCount, MaxHashesCount);
}

// Text form of the allowed filter-size range in bytes, "[MinFilterSizeBytes, MaxFilterSizeBytes]".
TString TConstants::GetFilterSizeBytesIntervalString() {
    return FormatClosedInterval(MinFilterSizeBytes, MaxFilterSizeBytes);
}

// Text form of the allowed n-gram size range, "[MinNGrammSize, MaxNGrammSize]".
TString TConstants::GetNGrammSizeIntervalString() {
    return FormatClosedInterval(MinNGrammSize, MaxNGrammSize);
}

}   // namespace NKikimr::NOlap::NIndexes::NBloomNGramm
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#pragma once
#include <ydb/core/tx/columnshard/engines/scheme/indexes/abstract/constructor.h>
namespace NKikimr::NOlap::NIndexes::NBloomNGramm {

// Inclusive bounds for the n-gram size, hashes count and filter size (in bytes),
// together with range checks and "[min, max]" text renderings of each range.
class TConstants {
public:
    // N-gram size bounds.
    static constexpr ui32 MinNGrammSize = 3;
    static constexpr ui32 MaxNGrammSize = 8;

    // Hashes count bounds.
    static constexpr ui32 MinHashesCount = 1;
    static constexpr ui32 MaxHashesCount = 8;

    // Filter size bounds, in bytes.
    static constexpr ui32 MinFilterSizeBytes = 128;
    static constexpr ui32 MaxFilterSizeBytes = 1 << 20;

    // True when value lies within [MinNGrammSize, MaxNGrammSize].
    static bool CheckNGrammSize(const ui32 value) {
        return value >= MinNGrammSize && value <= MaxNGrammSize;
    }

    // True when value lies within [MinHashesCount, MaxHashesCount].
    static bool CheckHashesCount(const ui32 value) {
        return value >= MinHashesCount && value <= MaxHashesCount;
    }

    // True when value lies within [MinFilterSizeBytes, MaxFilterSizeBytes].
    static bool CheckFilterSizeBytes(const ui32 value) {
        return value >= MinFilterSizeBytes && value <= MaxFilterSizeBytes;
    }

    // "[min, max]" renderings of the ranges above; defined out of line.
    static TString GetHashesCountIntervalString();
    static TString GetFilterSizeBytesIntervalString();
    static TString GetNGrammSizeIntervalString();
};

}   // namespace NKikimr::NOlap::NIndexes::NBloomNGramm
Loading
Loading