Skip to content

Commit

Permalink
correction
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 committed Oct 8, 2024
1 parent d892737 commit 759cdb7
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 29 deletions.
10 changes: 10 additions & 0 deletions ydb/core/formats/arrow/reader/position.h
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,16 @@ class TIntervalPositions {
return Positions.begin();
}

// Renders every stored interval position as one bracketed, semicolon-separated string,
// e.g. "[{...};{...};]" — intended for logging/diagnostics only.
TString DebugString() const {
    TString result = "[";
    for (const auto& position : Positions) {
        result += position.DebugJson().GetStringRobust();
        result += ";";
    }
    result += "]";
    return result;
}

std::vector<TIntervalPosition>::const_iterator end() const {
return Positions.end();
}
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/kqp/ut/olap/indexes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,8 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
Cerr << "WAIT_COMPACTION: " << csController->GetCompactionStartedCounter().Val() << Endl;
Sleep(TDuration::Seconds(1));
}
AFL_VERIFY(csController->GetCompactionStartedCounter().Val());
// Important check: exactly 21 compactions must have started — the upper bound (<= 21)
// guards against unexpected extra compactions, while the lower bound (>= 21) confirms
// that the indexes were actually constructed.
AFL_VERIFY(csController->GetCompactionStartedCounter().Val() == 21)("count", csController->GetCompactionStartedCounter().Val());

{
auto it = tableClient
Expand Down Expand Up @@ -460,7 +461,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
CompareYson(result, R"([[1u;]])");
}

AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() < csController->GetIndexesSkippingOnSelect().Val())
AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() * 5 < csController->GetIndexesSkippingOnSelect().Val())
("approved", csController->GetIndexesApprovedOnSelect().Val())("skipped", csController->GetIndexesSkippingOnSelect().Val());
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ std::shared_ptr<NKikimr::NOlap::TGranuleMeta> TGranulesStorage::GetGranuleForCom
std::optional<NStorageOptimizer::TOptimizationPriority> priorityChecker;
const TDuration actualizationLag = NYDBTest::TControllers::GetColumnShardController()->GetCompactionActualizationLag();
for (auto&& i : Tables) {
NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("path_id", i.first);
// NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("path_id", i.first);
i.second->ActualizeOptimizer(now, actualizationLag);
auto gPriority = i.second->GetCompactionPriority();
if (gPriority.IsZero() || (priorityChecker && gPriority < *priorityChecker)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -963,11 +963,18 @@ class TPortionsBucket: public TMoveOnly {

class TPortionBuckets {
private:

// Comparator that inverts the natural integer ordering, so a std::map keyed
// with it stores the largest weight first (begin() == maximum).
struct TReverseComparator {
    bool operator()(const i64 lhs, const i64 rhs) const {
        return lhs > rhs;
    }
};

const std::shared_ptr<arrow::Schema> PrimaryKeysSchema;
const std::shared_ptr<IStoragesManager> StoragesManager;
std::shared_ptr<TPortionsBucket> LeftBucket;
std::map<NArrow::TReplaceKey, std::shared_ptr<TPortionsBucket>> Buckets;
std::map<i64, THashSet<TPortionsBucket*>> BucketsByWeight;
std::map<i64, THashSet<TPortionsBucket*>, TReverseComparator> BucketsByWeight;
std::shared_ptr<TCounters> Counters;
std::vector<std::shared_ptr<TPortionsBucket>> GetAffectedBuckets(const NArrow::TReplaceKey& fromInclude, const NArrow::TReplaceKey& toInclude) {
std::vector<std::shared_ptr<TPortionsBucket>> result;
Expand Down Expand Up @@ -1073,8 +1080,8 @@ class TPortionBuckets {
if (BucketsByWeight.empty()) {
return false;
}
AFL_VERIFY(BucketsByWeight.rbegin()->second.size());
const TPortionsBucket* bucketForOptimization = *BucketsByWeight.rbegin()->second.begin();
AFL_VERIFY(BucketsByWeight.begin()->second.size());
const TPortionsBucket* bucketForOptimization = *BucketsByWeight.begin()->second.begin();
return bucketForOptimization->IsLocked(dataLocksManager);
}

Expand Down Expand Up @@ -1103,7 +1110,7 @@ class TPortionBuckets {

// Returns the weight of the heaviest bucket.
// BucketsByWeight is ordered descending via TReverseComparator, so begin()
// points at the maximum weight. Requires at least one bucket to be present.
// NOTE: the stale `rbegin()->first` duplicate line (left over from the old
// ascending-map revision of this diff) has been removed — it was unreachable.
i64 GetWeight() const {
    AFL_VERIFY(BucketsByWeight.size());
    return BucketsByWeight.begin()->first;
}

void RemovePortion(const std::shared_ptr<TPortionInfo>& portion) {
Expand All @@ -1117,11 +1124,11 @@ class TPortionBuckets {

std::shared_ptr<TColumnEngineChanges> BuildOptimizationTask(std::shared_ptr<TGranuleMeta> granule, const std::shared_ptr<NDataLocks::TManager>& locksManager) const {
AFL_VERIFY(BucketsByWeight.size());
if (!BucketsByWeight.rbegin()->first) {
if (!BucketsByWeight.begin()->first) {
return nullptr;
}
AFL_VERIFY(BucketsByWeight.rbegin()->second.size());
const TPortionsBucket* bucketForOptimization = *BucketsByWeight.rbegin()->second.begin();
AFL_VERIFY(BucketsByWeight.begin()->second.size());
const TPortionsBucket* bucketForOptimization = *BucketsByWeight.begin()->second.begin();
if (bucketForOptimization == LeftBucket.get()) {
if (Buckets.size()) {
return bucketForOptimization->BuildOptimizationTask(granule, locksManager, &Buckets.begin()->first, PrimaryKeysSchema, StoragesManager);
Expand Down Expand Up @@ -1190,10 +1197,6 @@ class TPortionBuckets {
AFL_VERIFY(i.second->GetStartPos());
result.AddPosition(*i.second->GetStartPos(), false);
}
if (Buckets.size() && Buckets.rbegin()->second->GetPortion()->GetRecordsCount() > 1) {
NArrow::NMerger::TSortableBatchPosition pos(Buckets.rbegin()->second->GetPortion()->IndexKeyEnd().ToBatch(PrimaryKeysSchema), 0, PrimaryKeysSchema->field_names(), {}, false);
result.AddPosition(std::move(pos), false);
}
return result;
}
};
Expand Down
20 changes: 12 additions & 8 deletions ydb/core/tx/data_events/columnshard_splitter.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
#include "columnshard_splitter.h"

#include <ydb/core/tx/columnshard/splitter/settings.h>

namespace NKikimr::NEvWrite {

NKikimr::NEvWrite::IShardsSplitter::TYdbConclusionStatus TColumnShardShardsSplitter::DoSplitData(const NSchemeCache::TSchemeCacheNavigate::TEntry& schemeEntry, const IEvWriteDataAccessor& data) {
NKikimr::NEvWrite::IShardsSplitter::TYdbConclusionStatus TColumnShardShardsSplitter::DoSplitData(
const NSchemeCache::TSchemeCacheNavigate::TEntry& schemeEntry, const IEvWriteDataAccessor& data) {
if (schemeEntry.Kind != NSchemeCache::TSchemeCacheNavigate::KindColumnTable) {
return TYdbConclusionStatus::Fail(Ydb::StatusIds::SCHEME_ERROR, "The specified path is not an column table");
}
Expand Down Expand Up @@ -36,7 +39,8 @@ NKikimr::NEvWrite::IShardsSplitter::TYdbConclusionStatus TColumnShardShardsSplit
std::shared_ptr<arrow::Schema> arrowScheme = ExtractArrowSchema(scheme);
batch = NArrow::DeserializeBatch(data.GetSerializedData(), arrowScheme);
if (!batch) {
return TYdbConclusionStatus::Fail(Ydb::StatusIds::SCHEME_ERROR, TString("cannot deserialize batch with schema ") + arrowScheme->ToString());
return TYdbConclusionStatus::Fail(
Ydb::StatusIds::SCHEME_ERROR, TString("cannot deserialize batch with schema ") + arrowScheme->ToString());
}

auto res = batch->ValidateFull();
Expand All @@ -55,12 +59,11 @@ NKikimr::NEvWrite::IShardsSplitter::TYdbConclusionStatus TColumnShardShardsSplit
return SplitImpl(batch, shardingConclusion.DetachResult());
}

NKikimr::NEvWrite::IShardsSplitter::TYdbConclusionStatus TColumnShardShardsSplitter::SplitImpl(const std::shared_ptr<arrow::RecordBatch>& batch,
const std::shared_ptr<NSharding::IShardingBase>& sharding)
{
NKikimr::NEvWrite::IShardsSplitter::TYdbConclusionStatus TColumnShardShardsSplitter::SplitImpl(
const std::shared_ptr<arrow::RecordBatch>& batch, const std::shared_ptr<NSharding::IShardingBase>& sharding) {
Y_ABORT_UNLESS(batch);

auto split = sharding->SplitByShards(batch, NColumnShard::TLimits::GetMaxBlobSize() * 0.875);
auto split = sharding->SplitByShards(batch, NOlap::NSplitter::TSplitSettings().GetExpectedPortionSize());
if (split.IsFail()) {
return TYdbConclusionStatus::Fail(Ydb::StatusIds::SCHEME_ERROR, split.GetErrorMessage());
}
Expand All @@ -69,7 +72,8 @@ NKikimr::NEvWrite::IShardsSplitter::TYdbConclusionStatus TColumnShardShardsSplit
const TString schemaString = NArrow::SerializeSchema(*batch->schema());
for (auto&& [shardId, chunks] : split.GetResult()) {
for (auto&& c : chunks) {
result.AddShardInfo(shardId, std::make_shared<TShardInfo>(schemaString, c.GetData(), c.GetRowsCount(), sharding->GetShardInfoVerified(shardId).GetShardingVersion()));
result.AddShardInfo(shardId, std::make_shared<TShardInfo>(schemaString, c.GetData(), c.GetRowsCount(),
sharding->GetShardInfoVerified(shardId).GetShardingVersion()));
}
}

Expand All @@ -88,4 +92,4 @@ std::shared_ptr<arrow::Schema> TColumnShardShardsSplitter::ExtractArrowSchema(co
return NArrow::TStatusValidator::GetValid(NArrow::MakeArrowSchema(columns));
}

}
} // namespace NKikimr::NEvWrite
21 changes: 14 additions & 7 deletions ydb/core/tx/data_events/write_data.h
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
#pragma once
#include "common/modification_type.h"

#include <ydb/core/tx/long_tx_service/public/types.h>
#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <ydb/library/formats/arrow/modifier/subset.h>
#include <ydb/library/accessor/accessor.h>
#include <ydb/core/formats/arrow/reader/position.h>
#include <ydb/core/tx/long_tx_service/public/types.h>

#include <ydb/library/accessor/accessor.h>
#include <ydb/library/actors/core/monotonic.h>
#include <ydb/library/conclusion/result.h>
#include <ydb/library/formats/arrow/modifier/subset.h>

#include <util/generic/guid.h>

namespace NKikimr::NOlap {
Expand All @@ -17,9 +19,13 @@ class IBlobsWritingAction;
namespace NKikimr::NEvWrite {

class IDataContainer {
private:
YDB_ACCESSOR_DEF(NArrow::NMerger::TIntervalPositions, SeparationPoints);

public:
using TPtr = std::shared_ptr<IDataContainer>;
virtual ~IDataContainer() {}
virtual ~IDataContainer() {
}
virtual TConclusion<std::shared_ptr<arrow::RecordBatch>> ExtractBatch() = 0;
virtual ui64 GetSchemaVersion() const = 0;
virtual ui64 GetSize() const = 0;
Expand Down Expand Up @@ -47,6 +53,7 @@ class TWriteMeta {
YDB_ACCESSOR(TMonotonic, WriteMiddle5StartInstant, TMonotonic::Now());
YDB_ACCESSOR(TMonotonic, WriteMiddle6StartInstant, TMonotonic::Now());
std::optional<ui64> LockId;

public:
void SetLockId(const ui64 lockId) {
LockId = lockId;
Expand Down Expand Up @@ -77,8 +84,8 @@ class TWriteMeta {
: WriteId(writeId)
, TableId(tableId)
, Source(source)
, GranuleShardingVersion(granuleShardingVersion)
{}
, GranuleShardingVersion(granuleShardingVersion) {
}
};

class TWriteData {
Expand Down Expand Up @@ -112,4 +119,4 @@ class TWriteData {
}
};

}
} // namespace NKikimr::NEvWrite

0 comments on commit 759cdb7

Please sign in to comment.