Skip to content

Commit

Permalink
Merge d10ed77 into 2edc9b5
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Dec 25, 2024
2 parents 2edc9b5 + d10ed77 commit 06cd4cd
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 80 deletions.
12 changes: 6 additions & 6 deletions ydb/core/kqp/ut/olap/indexes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
}
{
ResetZeroLevel(csController);
ui32 requestsCount = 100;
ui32 requestsCount = 500;
for (ui32 i = 0; i < requestsCount; ++i) {
const ui32 idx = RandomNumber<ui32>(uids.size());
const auto query = [](const TString& res, const TString& uid, const ui32 level) {
Expand All @@ -494,12 +494,12 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
};
ExecuteSQL(query(resourceIds[idx], uids[idx], levels[idx]), "[[1u;]]");
}
AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() - SkipStart > 1)("approved", csController->GetIndexesApprovedOnSelect().Val() - ApproveStart)(
AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() - SkipStart)("approved", csController->GetIndexesApprovedOnSelect().Val() - ApproveStart)(
"skipped", csController->GetIndexesSkippingOnSelect().Val() - SkipStart);
}
{
ResetZeroLevel(csController);
ui32 requestsCount = 100;
ui32 requestsCount = 500;
for (ui32 i = 0; i < requestsCount; ++i) {
const ui32 idx = RandomNumber<ui32>(uids.size());
const auto query = [](const TString& res, const TString& uid, const ui32 level) {
Expand All @@ -511,13 +511,13 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
};
ExecuteSQL(query(resourceIds[idx], uids[idx], levels[idx]), "[[1u;]]");
}
AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() - SkipStart > 1)(
AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() - SkipStart)(
"approved", csController->GetIndexesApprovedOnSelect().Val() - ApproveStart)(
"skipped", csController->GetIndexesSkippingOnSelect().Val() - SkipStart);
}
{
ResetZeroLevel(csController);
ui32 requestsCount = 100;
ui32 requestsCount = 500;
for (ui32 i = 0; i < requestsCount; ++i) {
const ui32 idx = RandomNumber<ui32>(uids.size());
const auto query = [](const TString& res, const TString& uid, const ui32 level) {
Expand All @@ -529,7 +529,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
};
ExecuteSQL(query(resourceIds[idx], uids[idx], levels[idx]), "[[1u;]]");
}
AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() - SkipStart > 1)(
AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() - SkipStart)(
"approved", csController->GetIndexesApprovedOnSelect().Val() - ApproveStart)(
"skipped", csController->GetIndexesSkippingOnSelect().Val() - SkipStart);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ class TFetchingStepSignals: public NColumnShard::TCommonCountersOwner {
public:
TFetchingStepSignals(NColumnShard::TCommonCountersOwner&& owner)
: TBase(std::move(owner))
, DurationCounter(TBase::GetDeriviative("duration_ms"))
, BytesCounter(TBase::GetDeriviative("bytes_ms")) {
, DurationCounter(TBase::GetDeriviative("Duration/Us"))
, BytesCounter(TBase::GetDeriviative("Bytes/Count")) {
}

void AddDuration(const TDuration d) const {
DurationCounter->Add(d.MilliSeconds());
DurationCounter->Add(d.MicroSeconds());
}

void AddBytes(const ui32 v) const {
Expand All @@ -56,7 +56,7 @@ class TFetchingStepsSignalsCollection: public NColumnShard::TCommonCountersOwner

public:
TFetchingStepsSignalsCollection()
: TBase("scan_steps") {
: TBase("ScanSteps") {
}

static TFetchingStepSignals GetSignals(const TString& name) {
Expand Down
22 changes: 22 additions & 0 deletions ydb/core/tx/columnshard/engines/storage/indexes/bloom/checker.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,28 @@ class TFixStringBitsStorage {
: Data(data)
{}

// Packs a bit vector into the byte storage: element i of `bitsVector` becomes bit (i % 8),
// counted from the least significant bit, of byte (i / 8).
//
// Storage is allocated by delegating to the size-taking constructor.
// NOTE(review): this relies on that constructor producing zero-filled `Data` large enough for
// bitsVector.size() bits (a trailing all-zero byte is never written here) - confirm.
TFixStringBitsStorage(const std::vector<bool>& bitsVector)
    : TFixStringBitsStorage(bitsVector.size()) {
    ui32 byteIdx = 0;
    ui8 byteCurrent = 0;
    // The mask has to start at the lowest bit. Starting from 0 would keep it at 0 for
    // indices 0..7 (0 << 1 == 0), silently dropping the first eight bits of the filter.
    ui8 shiftCurrent = 1;
    for (ui32 i = 0; i < bitsVector.size(); ++i) {
        if (i && i % 8 == 0) {
            // A full byte has been accumulated: flush it and restart from the lowest bit.
            Data[byteIdx] = static_cast<char>(byteCurrent);
            byteCurrent = 0;
            shiftCurrent = 1;
            ++byteIdx;
        }
        if (bitsVector[i]) {
            byteCurrent += shiftCurrent;
        }
        shiftCurrent = (shiftCurrent << 1);
    }
    // Flush the last (possibly partial) byte; skipped when it carries no set bits.
    if (byteCurrent) {
        Data[byteIdx] = static_cast<char>(byteCurrent);
    }
}

// Returns the number of addressable bits: eight per byte held in `Data`.
ui32 GetSizeBits() const {
    const auto bytesCount = Data.size();
    return bytesCount * 8;
}
Expand Down
154 changes: 104 additions & 50 deletions ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,111 @@ namespace NKikimr::NOlap::NIndexes::NBloomNGramm {

class TNGrammBuilder {
private:
NArrow::NHash::NXX64::TStreamStringHashCalcer HashCalcer;
TBuffer Zeros;
const ui32 HashesCount;

static const ui64 HashesConstructorP = 9223372036854775783;
static const ui64 HashesConstructorA = 1;

// Compile-time unrolled generator of the derived hashes for a fixed hashes count.
// THashesBuilder<N>::Build calls `actor` with
//     (HashesConstructorA * originalHash + b) % HashesConstructorP
// for every b in 1..N (ascending), i.e. the same sequence the runtime fallback loop in
// BuildHashesSet produces for HashesCount == N.
//
// NOTE(review): without the recursive step only the single hash for b == N was emitted.
// Any index data built with that behaviour has fewer bits set and must be rebuilt.
template <int HashIdx>
class THashesBuilder {
public:
    template <class TActor>
    static void Build(const ui64 originalHash, const TActor& actor) {
        static_assert(HashIdx > 0, "hash index must be positive");
        // Emit hashes 1..HashIdx-1 first; the chain stops at the THashesBuilder<0> specialization.
        THashesBuilder<HashIdx - 1>::Build(originalHash, actor);
        actor((HashesConstructorA * originalHash + HashIdx) % HashesConstructorP);
    }
};

// Recursion terminator: zero hashes requested, nothing to emit.
template <>
class THashesBuilder<0> {
public:
    template <class TActor>
    static void Build(const ui64 /*originalHash*/, const TActor& /*actor*/) {
    }
};

template <class TActor>
void BuildHashesSet(const ui64 originalHash, const TActor& actor) const {
if (HashesCount == 1) {
THashesBuilder<1>::Build(originalHash, actor);
} else if (HashesCount == 2) {
THashesBuilder<2>::Build(originalHash, actor);
} else if (HashesCount == 3) {
THashesBuilder<3>::Build(originalHash, actor);
} else if (HashesCount == 4) {
THashesBuilder<4>::Build(originalHash, actor);
} else if (HashesCount == 5) {
THashesBuilder<5>::Build(originalHash, actor);
} else if (HashesCount == 6) {
THashesBuilder<6>::Build(originalHash, actor);
} else if (HashesCount == 7) {
THashesBuilder<7>::Build(originalHash, actor);
} else if (HashesCount == 8) {
THashesBuilder<8>::Build(originalHash, actor);
} else {
for (ui32 b = 1; b <= HashesCount; ++b) {
const ui64 hash = (HashesConstructorA * originalHash + b) % HashesConstructorP;
actor(hash);
}
}
}

// Computes the base hash of one n-gramm occupying exactly `size` bytes starting at `data`.
//
// - size 3 and 4: the bytes themselves are packed into the result, data[0] being the least
//   significant byte. On little-endian targets this yields the same values as loading the
//   bytes through a ui32 pointer, but it never touches memory outside [data, data + size)
//   and does not depend on the alignment of `data`.
// - any other size: FNV-style multiply/xor loop over the bytes.
//
// Exactly `size` bytes are read; `data` does not have to be null-terminated.
ui64 CalcHash(const char* data, const ui32 size) const {
    if (size == 3) {
        // Byte-wise packing: a 32-bit load here would read one byte past the n-gramm,
        // which is out of bounds when the n-gramm ends the buffer.
        return static_cast<ui64>(static_cast<ui8>(data[0])) | (static_cast<ui64>(static_cast<ui8>(data[1])) << 8) |
               (static_cast<ui64>(static_cast<ui8>(data[2])) << 16);
    } else if (size == 4) {
        return static_cast<ui64>(static_cast<ui8>(data[0])) | (static_cast<ui64>(static_cast<ui8>(data[1])) << 8) |
               (static_cast<ui64>(static_cast<ui8>(data[2])) << 16) | (static_cast<ui64>(static_cast<ui8>(data[3])) << 24);
    } else {
        uint64_t h = 2166136261;
        for (size_t i = 0; i < size; i++) {
            // NOTE(review): `char` is converted directly, so where `char` is signed, bytes >= 0x80
            // are sign-extended before the xor. Kept as is to leave the produced hash values unchanged.
            h = h ^ uint64_t(data[i]);
            h = h * 16777619;
        }
        return h;
    }
}

template <class TAction>
void BuildNGramms(const char* data, const ui32 dataSize, const std::optional<NRequest::TLikePart::EOperation> op, const ui32 nGrammSize,
const TAction& pred) const {
TBuffer fakeString;
AFL_VERIFY(nGrammSize >= 3)("value", nGrammSize);
if (!op || op == NRequest::TLikePart::EOperation::StartsWith) {
for (ui32 c = 1; c <= nGrammSize; ++c) {
TBuffer fakeStart;
fakeStart.Fill('\0', nGrammSize - c);
fakeStart.Append(data, std::min(c, dataSize));
if (fakeStart.size() < nGrammSize) {
fakeStart.Append(Zeros.data(), nGrammSize - fakeStart.size());
fakeString.Clear();
fakeString.Fill('\0', nGrammSize - c);
fakeString.Append(data, std::min(c, dataSize));
if (fakeString.size() < nGrammSize) {
fakeString.Fill('\0', nGrammSize - fakeString.size());
}
pred(fakeStart.data());
BuildHashesSet(CalcHash(fakeString.data(), nGrammSize), pred);
}
}
for (ui32 c = 0; c < dataSize; ++c) {
if (c + nGrammSize <= dataSize) {
pred(data + c);
} else if (!op || op == NRequest::TLikePart::EOperation::EndsWith) {
TBuffer fakeStart;
fakeStart.Append(data + c, dataSize - c);
fakeStart.Append(Zeros.data(), nGrammSize - fakeStart.size());
pred(fakeStart.data());
ui32 c = 0;
for (; c + nGrammSize <= dataSize; ++c) {
pred(CalcHash(data + c, nGrammSize));
}

if (!op || op == NRequest::TLikePart::EOperation::EndsWith) {
for (; c < dataSize; ++c) {
fakeString.Clear();
fakeString.Append(data + c, dataSize - c);
fakeString.Fill('\0', nGrammSize - fakeString.size());
BuildHashesSet(CalcHash(fakeString.data(), nGrammSize), pred);
}
}
}

public:
TNGrammBuilder()
: HashCalcer(0) {
TNGrammBuilder(const ui32 hashesCount)
: HashesCount(hashesCount)
{
AFL_VERIFY((ui64)HashesCount < HashesConstructorP);
Zeros.Fill('\0', 1024);
}

Expand All @@ -64,15 +138,7 @@ class TNGrammBuilder {
}
if constexpr (arrow::has_string_view<T>()) {
auto value = typedArray.GetView(row);
if (value.size() < nGrammSize) {
continue;
}
const auto pred = [&](const char* data) {
HashCalcer.Start();
HashCalcer.Update((const ui8*)data, nGrammSize);
fillData(HashCalcer.Finish());
};
BuildNGramms(value.data(), value.size(), {}, nGrammSize, pred);
BuildNGramms(value.data(), value.size(), {}, nGrammSize, fillData);
} else {
AFL_VERIFY(false);
}
Expand All @@ -83,33 +149,24 @@ class TNGrammBuilder {

template <class TFiller>
void FillNGrammHashes(const ui32 nGrammSize, const NRequest::TLikePart::EOperation op, const TString& userReq, const TFiller& fillData) {
const auto pred = [&](const char* value) {
HashCalcer.Start();
HashCalcer.Update((const ui8*)value, nGrammSize);
fillData(HashCalcer.Finish());
};
BuildNGramms(userReq.data(), userReq.size(), op, nGrammSize, pred);
BuildNGramms(userReq.data(), userReq.size(), op, nGrammSize, fillData);
}
};

TString TIndexMeta::DoBuildIndexImpl(TChunkedBatchReader& reader) const {
AFL_VERIFY(reader.GetColumnsCount() == 1)("count", reader.GetColumnsCount());
TNGrammBuilder builder;

TFixStringBitsStorage bits(FilterSizeBytes * 8);
TNGrammBuilder builder(HashesCount);

const auto pred = [&](const ui64 hash) {
const auto predSet = [&](const ui64 hashSecondary) {
bits.Set(true, hashSecondary % bits.GetSizeBits());
};
BuildHashesSet(hash, predSet);
std::vector<bool> bitsVector(FilterSizeBytes * 8, false);
bool* memAccessor = &bitsVector[0];
const auto predSet = [&](const ui64 hashSecondary) {
memAccessor[hashSecondary % (FilterSizeBytes * 8)] = true;
};
for (reader.Start(); reader.IsCorrect();) {
builder.FillNGrammHashes(NGrammSize, reader.begin()->GetCurrentChunk(), pred);
builder.FillNGrammHashes(NGrammSize, reader.begin()->GetCurrentChunk(), predSet);
reader.ReadNext(reader.begin()->GetCurrentChunk()->length());
}

return bits.GetData();
return TFixStringBitsStorage(bitsVector).GetData();
}

void TIndexMeta::DoFillIndexCheckers(
Expand All @@ -133,16 +190,13 @@ void TIndexMeta::DoFillIndexCheckers(
}

std::set<ui64> hashes;
const auto pred = [&](const ui64 hash) {
const auto predSet = [&](const ui64 hashSecondary) {
hashes.emplace(hashSecondary);
};
BuildHashesSet(hash, predSet);
const auto predSet = [&](const ui64 hashSecondary) {
hashes.emplace(hashSecondary);
};
TNGrammBuilder builder;
TNGrammBuilder builder(HashesCount);
for (auto&& c : foundColumns) {
for (auto&& ls : c.second.GetLikeSequences()) {
builder.FillNGrammHashes(NGrammSize, ls.second.GetOperation(), ls.second.GetValue(), pred);
builder.FillNGrammHashes(NGrammSize, ls.second.GetOperation(), ls.second.GetValue(), predSet);
}
}
branch->MutableIndexes().emplace_back(std::make_shared<TFilterChecker>(GetIndexId(), std::move(hashes)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,6 @@ class TIndexMeta: public TIndexByColumns {
AFL_VERIFY(NGrammSize > 2);
}

static const ui64 HashesConstructorP = ((ui64)2 << 31) - 1;
static const ui64 HashesConstructorA = (ui64)2 << 16;

template <class TActor>
void BuildHashesSet(const ui64 originalHash, const TActor& actor) const {
AFL_VERIFY(HashesCount < HashesConstructorP);
for (ui32 b = 1; b <= HashesCount; ++b) {
const ui64 hash = (HashesConstructorA * originalHash + b) % HashesConstructorP;
actor(hash);
}
}

template <class TContainer, class TActor>
void BuildHashesSet(const TContainer& originalHashes, const TActor& actor) const {
AFL_VERIFY(HashesCount < HashesConstructorP);
for (auto&& hOriginal : originalHashes) {
BuildHashesSet(hOriginal, actor);
}
}

protected:
virtual TConclusionStatus DoCheckModificationCompatibility(const IIndexMeta& /*newMeta*/) const override {
return TConclusionStatus::Fail("not supported");
Expand Down

0 comments on commit 06cd4cd

Please sign in to comment.