Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
vlad-gogov authored and zverevgeny committed Jan 5, 2025
1 parent 5dfe000 commit cbc6cee
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 18 deletions.
1 change: 0 additions & 1 deletion .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ ydb/core/kqp/ut/query KqpQuery.QueryTimeout
ydb/core/kqp/ut/service KqpQueryService.TableSink_OltpReplace+HasSecondaryIndex
ydb/core/kqp/ut/scan KqpRequestContext.TraceIdInErrorMessage
ydb/core/kqp/ut/scheme KqpOlapScheme.TenThousandColumns
ydb/core/kqp/ut/scheme KqpOlapScheme.DropColumnAfterInsert
ydb/core/kqp/ut/scheme KqpScheme.AlterAsyncReplication
ydb/core/kqp/ut/scheme KqpScheme.QueryWithAlter
ydb/core/kqp/ut/scheme [14/50]*
Expand Down
141 changes: 140 additions & 1 deletion ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8060,13 +8060,152 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
testHelper.BulkUpsert(testTable, batch);

auto alterQueryAdd = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "` DROP COLUMN int_column;";
Cerr << alterQueryAdd << Endl;
auto alterAddResult = testHelper.GetSession().ExecuteSchemeQuery(alterQueryAdd).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterAddResult.GetStatus(), EStatus::SUCCESS, alterAddResult.GetIssues().ToString());

csController->EnableBackground(NYDBTest::ICSController::EBackground::Indexation);
csController->WaitIndexation(TDuration::Seconds(5));
}

void TestInsertAddInsertDrop(
bool autoIndexation, bool indexationAfterInsertAddColumn, bool indexationAfterInsertDropColumn, bool indexationInEnd) {
using namespace NArrow;

auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
if (!autoIndexation) {
csController->DisableBackground(NYDBTest::ICSController::EBackground::Indexation);
}

TKikimrSettings runnerSettings;
runnerSettings.WithSampleTables = false;
TTestHelper testHelper(runnerSettings);

TVector<TTestHelper::TColumnSchema> schema = {
TTestHelper::TColumnSchema().SetName("id").SetType(NScheme::NTypeIds::Uint64).SetNullable(false),
TTestHelper::TColumnSchema().SetName("int_column").SetType(NScheme::NTypeIds::Int32).SetNullable(true)
};

TTestHelper::TColumnTable testTable;
testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({ "id" }).SetSchema(schema);
testHelper.CreateTable(testTable);

TVector<NConstruction::IArrayBuilder::TPtr> dataBuilders;
dataBuilders.push_back(
NConstruction::TSimpleArrayConstructor<NConstruction::TIntSeqFiller<arrow::UInt64Type>>::BuildNotNullable("id", false));
dataBuilders.push_back(
std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TIntSeqFiller<arrow::Int32Type>>>("int_column"));
auto batch = NConstruction::TRecordBatchConstructor(dataBuilders).BuildBatch(100);

for (ui32 i = 0; i < 5; i++) {
testHelper.BulkUpsert(testTable, batch);
auto alterQueryAdd = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "` ADD COLUMN column" << i << " Uint64;";
auto alterAddResult = testHelper.GetSession().ExecuteSchemeQuery(alterQueryAdd).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterAddResult.GetStatus(), EStatus::SUCCESS, alterAddResult.GetIssues().ToString());

if (!autoIndexation && indexationAfterInsertAddColumn) {
csController->EnableBackground(NYDBTest::ICSController::EBackground::Indexation);
csController->WaitIndexation(TDuration::Seconds(5));
csController->DisableBackground(NYDBTest::ICSController::EBackground::Indexation);
}

testHelper.BulkUpsert(testTable, batch);
auto alterQueryDrop = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "` DROP COLUMN column" << i << ";";
auto alterDropResult = testHelper.GetSession().ExecuteSchemeQuery(alterQueryDrop).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterDropResult.GetStatus(), EStatus::SUCCESS, alterDropResult.GetIssues().ToString());

if (!autoIndexation && indexationAfterInsertDropColumn) {
csController->EnableBackground(NYDBTest::ICSController::EBackground::Indexation);
csController->WaitIndexation(TDuration::Seconds(5));
csController->DisableBackground(NYDBTest::ICSController::EBackground::Indexation);
}
}

if (!autoIndexation && indexationInEnd) {
csController->EnableBackground(NYDBTest::ICSController::EBackground::Indexation);
csController->WaitIndexation(TDuration::Seconds(5));
}
}

Y_UNIT_TEST(InsertAddInsertDrop) {
TestInsertAddInsertDrop(true, false, false, false);
for (i32 i = 0; i < 8; i++) {
TestInsertAddInsertDrop(false, i & 1, i & 2, i & 3);
}
}

Y_UNIT_TEST(DropTableAfterInsert) {
using namespace NArrow;

auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
csController->DisableBackground(NYDBTest::ICSController::EBackground::Indexation);

TKikimrSettings runnerSettings;
runnerSettings.WithSampleTables = false;
TTestHelper testHelper(runnerSettings);

TVector<TTestHelper::TColumnSchema> schema = {
TTestHelper::TColumnSchema().SetName("id").SetType(NScheme::NTypeIds::Uint64).SetNullable(false),
TTestHelper::TColumnSchema().SetName("int_column").SetType(NScheme::NTypeIds::Int32).SetNullable(true)
};

TTestHelper::TColumnTable testTable;
testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({ "id" }).SetSchema(schema);
testHelper.CreateTable(testTable);

TVector<NConstruction::IArrayBuilder::TPtr> dataBuilders;
dataBuilders.push_back(
NConstruction::TSimpleArrayConstructor<NConstruction::TIntSeqFiller<arrow::UInt64Type>>::BuildNotNullable("id", false));
dataBuilders.push_back(
std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TIntSeqFiller<arrow::Int32Type>>>("int_column"));
auto batch = NConstruction::TRecordBatchConstructor(dataBuilders).BuildBatch(100);

testHelper.BulkUpsert(testTable, batch);

auto alterQueryDrop = TStringBuilder() << "DROP TABLE `" << testTable.GetName() << "`;";
auto alterDropResult = testHelper.GetSession().ExecuteSchemeQuery(alterQueryDrop).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterDropResult.GetStatus(), EStatus::SUCCESS, alterDropResult.GetIssues().ToString());

csController->EnableBackground(NYDBTest::ICSController::EBackground::Indexation);
csController->WaitIndexation(TDuration::Seconds(5));
}

Y_UNIT_TEST(InsertDropAddColumn) {
using namespace NArrow;

auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
csController->DisableBackground(NYDBTest::ICSController::EBackground::Indexation);

TKikimrSettings runnerSettings;
runnerSettings.WithSampleTables = false;
TTestHelper testHelper(runnerSettings);

TVector<TTestHelper::TColumnSchema> schema = {
TTestHelper::TColumnSchema().SetName("id").SetType(NScheme::NTypeIds::Uint64).SetNullable(false),
TTestHelper::TColumnSchema().SetName("int_column").SetType(NScheme::NTypeIds::Int32).SetNullable(true)
};

TTestHelper::TColumnTable testTable;
testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({ "id" }).SetSchema(schema);
testHelper.CreateTable(testTable);

TVector<NConstruction::IArrayBuilder::TPtr> dataBuilders;
dataBuilders.push_back(
NConstruction::TSimpleArrayConstructor<NConstruction::TIntSeqFiller<arrow::UInt64Type>>::BuildNotNullable("id", false));
dataBuilders.push_back(
std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TIntSeqFiller<arrow::Int32Type>>>("int_column"));
auto batch = NConstruction::TRecordBatchConstructor(dataBuilders).BuildBatch(100);

testHelper.BulkUpsert(testTable, batch);

auto alterQueryDrop = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "` DROP COLUMN int_column;";
auto alterDropResult = testHelper.GetSession().ExecuteSchemeQuery(alterQueryDrop).GetValueSync();

auto alterQueryAdd = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "` ADD COLUMN int_column Int32;";
auto alterAddResult = testHelper.GetSession().ExecuteSchemeQuery(alterQueryAdd).GetValueSync();

csController->EnableBackground(NYDBTest::ICSController::EBackground::Indexation);
csController->WaitIndexation(TDuration::Seconds(5));
}
}

Y_UNIT_TEST_SUITE(KqpOlapTypes) {
Expand Down
12 changes: 10 additions & 2 deletions ydb/core/tx/columnshard/engines/changes/indexation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,19 @@ class TPathFieldsInfo {
return;
}
auto blobSchema = context.SchemaVersions.GetSchemaVerified(data.GetSchemaVersion());
std::set<ui32> columnIdsToDelete = blobSchema->GetColumnIdsToDelete(ResultSchema);
if (!Schemas.contains(data.GetSchemaVersion())) {
Schemas.emplace(data.GetSchemaVersion(), blobSchema);
}
std::vector<ui32> filteredIds = data.GetMeta().GetSchemaSubset().Apply(blobSchema->GetIndexInfo().GetColumnIds(false));
if (data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete) {
filteredIds.emplace_back((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG);
}
UsageColumnIds.insert(filteredIds.begin(), filteredIds.end());
for (const auto& filteredId : filteredIds) {
if (!columnIdsToDelete.contains(filteredId)) {
UsageColumnIds.insert(filteredId);
}
}
}
};

Expand Down Expand Up @@ -242,7 +247,10 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont
auto batchSchema =
std::make_shared<arrow::Schema>(inserted.GetMeta().GetSchemaSubset().Apply(blobSchema->GetIndexInfo().ArrowSchema()->fields()));
batch = std::make_shared<NArrow::TGeneralContainer>(NArrow::DeserializeBatch(blobData, batchSchema));
blobSchema->AdaptBatchToSchema(*batch, resultSchema);
std::set<ui32> columnIdsToDelete = blobSchema->GetColumnIdsToDelete(resultSchema);
if (!columnIdsToDelete.empty()) {
batch->DeleteFieldsByIndex(blobSchema->ConvertColumnIdsToIndexes(columnIdsToDelete));
}
}
IIndexInfo::AddSnapshotColumns(*batch, inserted.GetSnapshot(), (ui64)inserted.GetInsertWriteId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,10 @@ void TCommittedDataSource::DoAssembleColumns(const std::shared_ptr<TColumnsSet>&
auto rBatch = NArrow::DeserializeBatch(bData, std::make_shared<arrow::Schema>(CommittedBlob.GetSchemaSubset().Apply(schema->fields())));
AFL_VERIFY(rBatch)("schema", schema->ToString());
auto batch = std::make_shared<NArrow::TGeneralContainer>(rBatch);
batchSchema->AdaptBatchToSchema(*batch, resultSchema);
std::set<ui32> columnIdsToDelete = batchSchema->GetColumnIdsToDelete(resultSchema);
if (!columnIdsToDelete.empty()) {
batch->DeleteFieldsByIndex(batchSchema->ConvertColumnIdsToIndexes(columnIdsToDelete));
}
TSnapshot ss = TSnapshot::Zero();
if (CommittedBlob.IsCommitted()) {
ss = CommittedBlob.GetCommittedSnapshotVerified();
Expand Down
31 changes: 19 additions & 12 deletions ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,20 +133,27 @@ TConclusion<std::shared_ptr<arrow::RecordBatch>> ISnapshotSchema::PrepareForModi
return batch;
}

void ISnapshotSchema::AdaptBatchToSchema(NArrow::TGeneralContainer& batch, const ISnapshotSchema::TPtr& targetSchema) const {
if (targetSchema->GetVersion() != GetVersion()) {
std::vector<ui32> columnIdxToDelete;
for (size_t columnIdx = 0; columnIdx < batch.GetSchema()->GetFields().size(); ++columnIdx) {
const std::optional<ui32> targetColumnId = targetSchema->GetColumnIdOptional(batch.GetSchema()->field(columnIdx)->name());
const ui32 batchColumnId = GetColumnIdVerified(GetFieldByIndex(columnIdx)->name());
if (!targetColumnId || *targetColumnId != batchColumnId) {
columnIdxToDelete.emplace_back(columnIdx);
}
}
if (!columnIdxToDelete.empty()) {
batch.DeleteFieldsByIndex(columnIdxToDelete);
std::set<ui32> ISnapshotSchema::GetColumnIdsToDelete(const ISnapshotSchema::TPtr& targetSchema) const {
if (targetSchema->GetVersion() == GetVersion()) {
return {};
}
std::set<ui32> columnIdxsToDelete;
for (const auto& columnIdx : GetColumnIds()) {
const std::optional<ui32> targetColumnId = targetSchema->GetColumnIdOptional(GetFieldByColumnIdOptional(columnIdx)->name());
if (!targetColumnId || *targetColumnId != columnIdx) {
columnIdxsToDelete.emplace(columnIdx);
}
}
return columnIdxsToDelete;
}

std::vector<ui32> ISnapshotSchema::ConvertColumnIdsToIndexes(const std::set<ui32>& idxs) const {
std::vector<ui32> columnIndexes;
for (const auto& id : idxs) {
AFL_VERIFY(HasColumnId(id));
columnIndexes.emplace_back(GetFieldIndex(id));
}
return columnIndexes;
}

ui32 ISnapshotSchema::GetColumnId(const std::string& columnName) const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ class ISnapshotSchema {
const ISnapshotSchema& dataSchema, const std::shared_ptr<NArrow::TGeneralContainer>& batch, const std::set<ui32>& restoreColumnIds) const;
[[nodiscard]] TConclusion<std::shared_ptr<arrow::RecordBatch>> PrepareForModification(
const std::shared_ptr<arrow::RecordBatch>& incomingBatch, const NEvWrite::EModificationType mType) const;
void AdaptBatchToSchema(NArrow::TGeneralContainer& batch, const ISnapshotSchema::TPtr& targetSchema) const;
std::set<ui32> GetColumnIdsToDelete(const ISnapshotSchema::TPtr& targetSchema) const;
std::vector<ui32> ConvertColumnIdsToIndexes(const std::set<ui32>& idxs) const;
};

} // namespace NKikimr::NOlap

0 comments on commit cbc6cee

Please sign in to comment.