Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test "DisabledAlterCompression" moved in KqpOlapCompression #9114

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion ydb/core/kqp/ut/common/columnshard.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#include "columnshard.h"
#include <ydb/core/testlib/cs_helper.h>

#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/formats/arrow/serializer/parsing.h>
#include <ydb/core/testlib/cs_helper.h>

extern "C" {
#include <ydb/library/yql/parser/pg_wrapper/postgresql/src/include/catalog/pg_type_d.h>
Expand Down Expand Up @@ -143,6 +145,13 @@ namespace NKqp {
}
}

void TTestHelper::SetCompression(
const TColumnTableBase& columnTable, const TString& columnName, const TCompression& compression, const NYdb::EStatus expectedStatus) {
auto alterQuery = columnTable.BuildAlterCompressionQuery(columnName, compression);
auto result = GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), expectedStatus, result.GetIssues().ToString());
}

TString TTestHelper::TColumnSchema::BuildQuery() const {
TStringBuilder str;
str << Name << ' ';
Expand Down Expand Up @@ -185,6 +194,16 @@ namespace NKqp {
return str;
}

TString TTestHelper::TColumnTableBase::BuildAlterCompressionQuery(const TString& columnName, const TCompression& compression) const {
auto str = TStringBuilder() << "ALTER OBJECT `" << Name << "` (TYPE " << GetObjectType() << ") SET";
str << " (ACTION=ALTER_COLUMN, NAME=" << columnName << ", `SERIALIZER.CLASS_NAME`=`" << compression.GetSerializerName() << "`,";
str << " `COMPRESSION.TYPE`=`" << NArrow::CompressionToString(compression.GetType()) << "`";
if (compression.GetCompressionLevel() != Max<i32>()) {
str << "`COMPRESSION.LEVEL`=" << compression.GetCompressionLevel();
}
str << ");";
return str;
}

std::shared_ptr<arrow::Schema> TTestHelper::TColumnTableBase::GetArrowSchema(const TVector<TColumnSchema>& columns) {
std::vector<std::shared_ptr<arrow::Field>> result;
Expand Down
146 changes: 80 additions & 66 deletions ydb/core/kqp/ut/common/columnshard.h
Original file line number Diff line number Diff line change
@@ -1,88 +1,102 @@
#pragma once

#include "kqp_ut_common.h"

#include <ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h>

#include <ydb/library/accessor/accessor.h>
#include <ydb/library/formats/arrow/simple_builder/filler.h>
#include <ydb/library/formats/arrow/simple_builder/array.h>
#include <ydb/library/formats/arrow/simple_builder/batch.h>
#include <ydb/library/formats/arrow/simple_builder/filler.h>
#include <ydb/public/lib/scheme_types/scheme_type_id.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h>
#include <ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h>

#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h>

namespace NKikimr {
namespace NKqp {
class TTestHelper {
class TTestHelper {
public:
class TCompression {
YDB_ACCESSOR(TString, SerializerName, "ARROW_SERIALIZER");
YDB_ACCESSOR(arrow::Compression::type, Type, arrow::Compression::type::UNCOMPRESSED);
YDB_ACCESSOR(i32, CompressionLevel, Max<i32>());
};

class TColumnSchema {
YDB_ACCESSOR_DEF(TString, Name);
YDB_ACCESSOR_DEF(NScheme::TTypeInfo, TypeInfo);
YDB_FLAG_ACCESSOR(Nullable, true);

public:
class TColumnSchema {
YDB_ACCESSOR_DEF(TString, Name);
YDB_ACCESSOR_DEF(NScheme::TTypeInfo, TypeInfo);
YDB_FLAG_ACCESSOR(Nullable, true);
public:
TString BuildQuery() const;

TColumnSchema& SetType(NScheme::TTypeId typeId);
};

using TUpdatesBuilder = NColumnShard::TTableUpdatesBuilder;

class TColumnTableBase {
YDB_ACCESSOR_DEF(TString, Name);
YDB_ACCESSOR_DEF(TVector<TColumnSchema>, Schema);
YDB_ACCESSOR_DEF(TVector<TString>, PrimaryKey);
YDB_ACCESSOR_DEF(TVector<TString>, Sharding);
YDB_ACCESSOR(ui32, MinPartitionsCount, 1);

std::optional<std::pair<TString, TString>> TTLConf;
public:
TString BuildQuery() const;
std::shared_ptr<arrow::Schema> GetArrowSchema(const TVector<TColumnSchema>& columns);

TColumnTableBase& SetTTL(const TString& columnName, const TString& ttlConf) {
TTLConf = std::make_pair(columnName, ttlConf);
return *this;
}

private:
virtual TString GetObjectType() const = 0;
TString BuildColumnsStr(const TVector<TColumnSchema>& clumns) const;
std::shared_ptr<arrow::Field> BuildField(const TString name, const NScheme::TTypeInfo& typeInfo, bool nullable) const;
};

class TColumnTable : public TColumnTableBase {
private:
TString GetObjectType() const override;
};

class TColumnTableStore : public TColumnTableBase {
private:
TString GetObjectType() const override;
};
TString BuildQuery() const;

private:
std::unique_ptr<TKikimrRunner> Kikimr;
std::unique_ptr<NYdb::NTable::TTableClient> TableClient;
std::unique_ptr<NYdb::NTable::TSession> Session;
TColumnSchema& SetType(NScheme::TTypeId typeId);
};

using TUpdatesBuilder = NColumnShard::TTableUpdatesBuilder;

class TColumnTableBase {
YDB_ACCESSOR_DEF(TString, Name);
YDB_ACCESSOR_DEF(TVector<TColumnSchema>, Schema);
YDB_ACCESSOR_DEF(TVector<TString>, PrimaryKey);
YDB_ACCESSOR_DEF(TVector<TString>, Sharding);
YDB_ACCESSOR(ui32, MinPartitionsCount, 1);

std::optional<std::pair<TString, TString>> TTLConf;

public:
TTestHelper(const TKikimrSettings& settings);
TKikimrRunner& GetKikimr();
TTestActorRuntime& GetRuntime();
NYdb::NTable::TSession& GetSession();
void CreateTable(const TColumnTableBase& table, const NYdb::EStatus expectedStatus = NYdb::EStatus::SUCCESS);
void DropTable(const TString& tableName);
void CreateTier(const TString& tierName);
TString CreateTieringRule(const TString& tierName, const TString& columnName);
void SetTiering(const TString& tableName, const TString& ruleName);
void ResetTiering(const TString& tableName);
void BulkUpsert(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const Ydb::StatusIds_StatusCode& opStatus = Ydb::StatusIds::SUCCESS);
void BulkUpsert(const TColumnTable& table, std::shared_ptr<arrow::RecordBatch> batch, const Ydb::StatusIds_StatusCode& opStatus = Ydb::StatusIds::SUCCESS);
void ReadData(const TString& query, const TString& expected, const NYdb::EStatus opStatus = NYdb::EStatus::SUCCESS);
void RebootTablets(const TString& tableName);
void WaitTabletDeletionInHive(ui64 tabletId, TDuration duration);
TString BuildQuery() const;
TString BuildAlterCompressionQuery(const TString& columnName, const TCompression& compression) const;
std::shared_ptr<arrow::Schema> GetArrowSchema(const TVector<TColumnSchema>& columns);

TColumnTableBase& SetTTL(const TString& columnName, const TString& ttlConf) {
TTLConf = std::make_pair(columnName, ttlConf);
return *this;
}

private:
virtual TString GetObjectType() const = 0;
TString BuildColumnsStr(const TVector<TColumnSchema>& clumns) const;
std::shared_ptr<arrow::Field> BuildField(const TString name, const NScheme::TTypeInfo& typeInfo, bool nullable) const;
};

class TColumnTable: public TColumnTableBase {
private:
TString GetObjectType() const override;
};

class TColumnTableStore: public TColumnTableBase {
private:
TString GetObjectType() const override;
};

private:
std::unique_ptr<TKikimrRunner> Kikimr;
std::unique_ptr<NYdb::NTable::TTableClient> TableClient;
std::unique_ptr<NYdb::NTable::TSession> Session;

public:
TTestHelper(const TKikimrSettings& settings);
TKikimrRunner& GetKikimr();
TTestActorRuntime& GetRuntime();
NYdb::NTable::TSession& GetSession();
void CreateTable(const TColumnTableBase& table, const NYdb::EStatus expectedStatus = NYdb::EStatus::SUCCESS);
void DropTable(const TString& tableName);
void CreateTier(const TString& tierName);
TString CreateTieringRule(const TString& tierName, const TString& columnName);
void SetTiering(const TString& tableName, const TString& ruleName);
void ResetTiering(const TString& tableName);
void BulkUpsert(
const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const Ydb::StatusIds_StatusCode& opStatus = Ydb::StatusIds::SUCCESS);
void BulkUpsert(const TColumnTable& table, std::shared_ptr<arrow::RecordBatch> batch,
const Ydb::StatusIds_StatusCode& opStatus = Ydb::StatusIds::SUCCESS);
void ReadData(const TString& query, const TString& expected, const NYdb::EStatus opStatus = NYdb::EStatus::SUCCESS);
void RebootTablets(const TString& tableName);
void WaitTabletDeletionInHive(ui64 tabletId, TDuration duration);
void SetCompression(const TColumnTableBase& columnTable, const TString& columnName, const TCompression& compression,
const NYdb::EStatus expectedStatus = NYdb::EStatus::SUCCESS);
};
}
}
30 changes: 30 additions & 0 deletions ydb/core/kqp/ut/olap/compression_ut.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#include <ydb/core/kqp/ut/common/columnshard.h>

namespace NKikimr::NKqp {

Y_UNIT_TEST_SUITE(KqpOlapCompression) {
Y_UNIT_TEST(DisabledAlterCompression) {
TKikimrSettings settings = TKikimrSettings().SetWithSampleTables(false).SetEnableOlapCompression(false);
TTestHelper testHelper(settings);
TVector<TTestHelper::TColumnSchema> schema = {
TTestHelper::TColumnSchema().SetName("pk_int").SetType(NScheme::NTypeIds::Uint64).SetNullable(false)
};
TTestHelper::TCompression compression = TTestHelper::TCompression().SetType(arrow::Compression::type::ZSTD);

TTestHelper::TColumnTable standaloneTable;
standaloneTable.SetName("/Root/StandaloneTable").SetPrimaryKey({ "pk_int" }).SetSharding({ "pk_int" }).SetSchema(schema);
testHelper.CreateTable(standaloneTable);
testHelper.SetCompression(standaloneTable, "pk_int", compression, NYdb::EStatus::SCHEME_ERROR);

TTestHelper::TColumnTableStore testTableStore;
testTableStore.SetName("/Root/TableStoreTest").SetPrimaryKey({ "pk_int" }).SetSchema(schema);
testHelper.CreateTable(testTableStore);
testHelper.SetCompression(testTableStore, "pk_int", compression, NYdb::EStatus::PRECONDITION_FAILED);

TTestHelper::TColumnTable testTable;
testTable.SetName("/Root/TableStoreTest/ColumnTableTest").SetPrimaryKey({ "pk_int" }).SetSharding({ "pk_int" }).SetSchema(schema);
testHelper.CreateTable(testTable);
testHelper.SetCompression(testTable, "pk_int", compression, NYdb::EStatus::SCHEME_ERROR);
}
}
}
11 changes: 0 additions & 11 deletions ydb/core/kqp/ut/olap/sys_view_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,17 +219,6 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
}
}

Y_UNIT_TEST(DisabledAlterCompression) {
TKikimrSettings settings = TKikimrSettings().SetWithSampleTables(false).SetEnableOlapCompression(false);
TKikimrRunner kikimr(settings);
TTypedLocalHelper helper("", kikimr, "olapTable", "olapStore");
helper.CreateTestOlapTable();
helper.FillPKOnly(0, 1);
helper.ExecuteSchemeQuery(
"ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=ALTER_COLUMN, NAME=pk_int, "
"`SERIALIZER.CLASS_NAME`=`ARROW_SERIALIZER`, `COMPRESSION.TYPE`=`zstd`);", NYdb::EStatus::PRECONDITION_FAILED);
}

Y_UNIT_TEST(StatsSysViewBytesColumnActualization) {
ui64 rawBytes1;
ui64 bytes1;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/olap/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ SRCS(
sparsed_ut.cpp
tiering_ut.cpp
decimal_ut.cpp
compression_ut.cpp
)

PEERDIR(
Expand Down
Loading