Skip to content

Commit

Permalink
dont use insert table totally (#10088)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Oct 9, 2024
1 parent 47748f5 commit 7258b98
Show file tree
Hide file tree
Showing 88 changed files with 1,749 additions and 662 deletions.
4 changes: 4 additions & 0 deletions ydb/core/formats/arrow/reader/merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,16 @@ void TMergePartialStream::DrainCurrentPosition(TRecordBatchBuilder* builder, std
Y_ABORT_UNLESS(SortHeap.Size());
Y_ABORT_UNLESS(!SortHeap.Current().IsControlPoint());
if (!SortHeap.Current().IsDeleted()) {
// AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("key_add", SortHeap.Current().GetKeyColumns().DebugJson().GetStringRobust());
if (builder) {
builder->AddRecord(SortHeap.Current().GetKeyColumns());
}
if (resultScanData && resultPosition) {
*resultScanData = SortHeap.Current().GetKeyColumns().GetSorting();
*resultPosition = SortHeap.Current().GetKeyColumns().GetPosition();
}
} else {
// AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("key_skip", SortHeap.Current().GetKeyColumns().DebugJson().GetStringRobust());
}
CheckSequenceInDebug(SortHeap.Current().GetKeyColumns());
const ui64 startPosition = SortHeap.Current().GetKeyColumns().GetPosition();
Expand All @@ -169,6 +172,7 @@ void TMergePartialStream::DrainCurrentPosition(TRecordBatchBuilder* builder, std
bool isFirst = true;
while (SortHeap.Size() && (isFirst || SortHeap.Current().GetKeyColumns().Compare(*startSorting, startPosition) == std::partial_ordering::equivalent)) {
if (!isFirst) {
// AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("key_skip1", SortHeap.Current().GetKeyColumns().DebugJson().GetStringRobust());
auto& anotherIterator = SortHeap.Current();
if (PossibleSameVersionFlag) {
AFL_VERIFY(anotherIterator.GetVersionColumns().Compare(*startVersion, startPosition) != std::partial_ordering::greater)
Expand Down
41 changes: 41 additions & 0 deletions ydb/core/formats/arrow/reader/position.h
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,8 @@ class TIntervalPositions {
private:
std::vector<TIntervalPosition> Positions;
public:
using const_iterator = std::vector<TIntervalPosition>::const_iterator;

bool IsEmpty() const {
return Positions.empty();
}
Expand All @@ -459,6 +461,16 @@ class TIntervalPositions {
return Positions.begin();
}

TString DebugString() const {
TStringBuilder sb;
sb << "[";
for (auto&& p : Positions) {
sb << p.DebugJson().GetStringRobust() << ";";
}
sb << "]";
return sb;
}

std::vector<TIntervalPosition>::const_iterator end() const {
return Positions.end();
}
Expand Down Expand Up @@ -662,6 +674,35 @@ class TRWSortableBatchPosition: public TSortableBatchPosition, public TMoveOnly
return SplitByBorders(batch, columnNames, it);
}

class TIntervalPointsIterator {
private:
typename TIntervalPositions::const_iterator Current;
typename TIntervalPositions::const_iterator End;

public:
TIntervalPointsIterator(const TIntervalPositions& container)
: Current(container.begin())
, End(container.end()) {
}

bool IsValid() const {
return Current != End;
}

void Next() {
++Current;
}

const auto& CurrentPosition() const {
return Current->GetPosition();
}
};

static std::vector<std::shared_ptr<arrow::RecordBatch>> SplitByBordersInIntervalPositions(
const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::string>& columnNames, const TIntervalPositions& container) {
TIntervalPointsIterator it(container);
return SplitByBorders(batch, columnNames, it);
}
};

}
2 changes: 1 addition & 1 deletion ydb/core/formats/arrow/serializer/native.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ TString TNativeSerializer::DoSerializePayload(const std::shared_ptr<arrow::Recor
// Write prepared payload into the resultant string. No extra allocation will be made.
TStatusValidator::Validate(arrow::ipc::WriteIpcPayload(payload, Options, &out, &metadata_length));
Y_ABORT_UNLESS(out.GetPosition() == str.size());
Y_DEBUG_ABORT_UNLESS(Deserialize(str, batch->schema()).ok());
AFL_VERIFY_DEBUG(Deserialize(str, batch->schema()).ok());
AFL_DEBUG(NKikimrServices::ARROW_HELPER)("event", "serialize")("size", str.size())("columns", batch->schema()->num_fields());
return str;
}
Expand Down
12 changes: 12 additions & 0 deletions ydb/core/formats/arrow/serializer/native.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,18 @@ class TNativeSerializer: public ISerializer {
virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapColumn::TSerializer& proto) const override;

public:
static std::shared_ptr<ISerializer> GetUncompressed() {
static std::shared_ptr<ISerializer> result =
std::make_shared<NArrow::NSerialization::TNativeSerializer>(arrow::Compression::UNCOMPRESSED);
return result;
}

static std::shared_ptr<ISerializer> GetFast() {
static std::shared_ptr<ISerializer> result =
std::make_shared<NArrow::NSerialization::TNativeSerializer>(arrow::Compression::LZ4_FRAME);
return result;
}

virtual TString GetClassName() const override {
return GetClassNameStatic();
}
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/ut/common/kqp_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ struct TKikimrSettings: public TTestFeatureFlagsHolder<TKikimrSettings> {
exchangerSettings->SetMaxDelayMs(10);
AppConfig.MutableColumnShardConfig()->SetDisabledOnSchemeShard(false);
FeatureFlags.SetEnableSparsedColumns(true);
FeatureFlags.SetEnableImmediateWritingOnBulkUpsert(true);
FeatureFlags.SetEnableWritePortionsOnInsert(true);
FeatureFlags.SetEnableParameterizedDecimal(true);
FeatureFlags.SetEnableTopicAutopartitioningForCDC(true);
}
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/kqp/ut/olap/aggregations_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ Y_UNIT_TEST_SUITE(KqpOlapAggregations) {
WriteTestData(kikimr, "/Root/olapStore/olapTable", 20000, 2000000, 7000);
WriteTestData(kikimr, "/Root/olapStore/olapTable", 30000, 1000000, 11000);
}
while (csController->GetInsertFinishedCounter().Val() == 0) {
while (csController->GetCompactionFinishedCounter().Val() == 0) {
Cout << "Wait indexation..." << Endl;
Sleep(TDuration::Seconds(2));
}
Expand Down Expand Up @@ -374,7 +374,7 @@ Y_UNIT_TEST_SUITE(KqpOlapAggregations) {
.AddExpectedPlanOptions("KqpOlapFilter")
#if SSA_RUNTIME_VERSION >= 2U
.AddExpectedPlanOptions("TKqpOlapAgg")
.MutableLimitChecker().SetExpectedResultCount(1)
.MutableLimitChecker().SetExpectedResultCount(2)
#else
.AddExpectedPlanOptions("Condense")
#endif
Expand Down Expand Up @@ -417,7 +417,7 @@ Y_UNIT_TEST_SUITE(KqpOlapAggregations) {
.AddExpectedPlanOptions("KqpOlapFilter")
#if SSA_RUNTIME_VERSION >= 2U
.AddExpectedPlanOptions("TKqpOlapAgg")
.MutableLimitChecker().SetExpectedResultCount(1)
.MutableLimitChecker().SetExpectedResultCount(2)
#else
.AddExpectedPlanOptions("CombineCore")
.AddExpectedPlanOptions("KqpOlapFilter")
Expand Down
Loading

0 comments on commit 7258b98

Please sign in to comment.