Skip to content

Commit

Permalink
Merge e677ae3 into a179d17
Browse files Browse the repository at this point in the history
  • Loading branch information
vlad-gogov authored Sep 16, 2024
2 parents a179d17 + e677ae3 commit 5f29865
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 78 deletions.
21 changes: 20 additions & 1 deletion ydb/core/kqp/ut/common/columnshard.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#include "columnshard.h"
#include <ydb/core/testlib/cs_helper.h>

#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/formats/arrow/serializer/parsing.h>
#include <ydb/core/testlib/cs_helper.h>

extern "C" {
#include <ydb/library/yql/parser/pg_wrapper/postgresql/src/include/catalog/pg_type_d.h>
Expand Down Expand Up @@ -143,6 +145,13 @@ namespace NKqp {
}
}

void TTestHelper::SetCompression(
const TColumnTableBase& columnTable, const TString& columnName, const TCompression& compression, const NYdb::EStatus expectedStatus) {
auto alterQuery = columnTable.BuildAlterCompressionQuery(columnName, compression);
auto result = GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), expectedStatus, result.GetIssues().ToString());
}

TString TTestHelper::TColumnSchema::BuildQuery() const {
TStringBuilder str;
str << Name << ' ';
Expand Down Expand Up @@ -185,6 +194,16 @@ namespace NKqp {
return str;
}

TString TTestHelper::TColumnTableBase::BuildAlterCompressionQuery(const TString& columnName, const TCompression& compression) const {
auto str = TStringBuilder() << "ALTER OBJECT `" << Name << "` (TYPE " << GetObjectType() << ") SET";
str << " (ACTION=ALTER_COLUMN, NAME=" << columnName << ", `SERIALIZER.CLASS_NAME`=`" << compression.GetSerializerName() << "`,";
str << " `COMPRESSION.TYPE`=`" << NArrow::CompressionToString(compression.GetType()) << "`";
if (compression.GetCompressionLevel() != Max<i32>()) {
str << "`COMPRESSION.LEVEL`=" << compression.GetCompressionLevel();
}
str << ");";
return str;
}

std::shared_ptr<arrow::Schema> TTestHelper::TColumnTableBase::GetArrowSchema(const TVector<TColumnSchema>& columns) {
std::vector<std::shared_ptr<arrow::Field>> result;
Expand Down
146 changes: 80 additions & 66 deletions ydb/core/kqp/ut/common/columnshard.h
Original file line number Diff line number Diff line change
@@ -1,88 +1,102 @@
#pragma once

#include "kqp_ut_common.h"

#include <ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h>

#include <ydb/library/accessor/accessor.h>
#include <ydb/library/formats/arrow/simple_builder/filler.h>
#include <ydb/library/formats/arrow/simple_builder/array.h>
#include <ydb/library/formats/arrow/simple_builder/batch.h>
#include <ydb/library/formats/arrow/simple_builder/filler.h>
#include <ydb/public/lib/scheme_types/scheme_type_id.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h>
#include <ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h>

#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h>

namespace NKikimr {
namespace NKqp {
class TTestHelper {
class TTestHelper {
public:
class TCompression {
YDB_ACCESSOR(TString, SerializerName, "ARROW_SERIALIZER");
YDB_ACCESSOR(arrow::Compression::type, Type, arrow::Compression::type::UNCOMPRESSED);
YDB_ACCESSOR(i32, CompressionLevel, Max<i32>());
};

class TColumnSchema {
YDB_ACCESSOR_DEF(TString, Name);
YDB_ACCESSOR_DEF(NScheme::TTypeInfo, TypeInfo);
YDB_FLAG_ACCESSOR(Nullable, true);

public:
class TColumnSchema {
YDB_ACCESSOR_DEF(TString, Name);
YDB_ACCESSOR_DEF(NScheme::TTypeInfo, TypeInfo);
YDB_FLAG_ACCESSOR(Nullable, true);
public:
TString BuildQuery() const;

TColumnSchema& SetType(NScheme::TTypeId typeId);
};

using TUpdatesBuilder = NColumnShard::TTableUpdatesBuilder;

class TColumnTableBase {
YDB_ACCESSOR_DEF(TString, Name);
YDB_ACCESSOR_DEF(TVector<TColumnSchema>, Schema);
YDB_ACCESSOR_DEF(TVector<TString>, PrimaryKey);
YDB_ACCESSOR_DEF(TVector<TString>, Sharding);
YDB_ACCESSOR(ui32, MinPartitionsCount, 1);

std::optional<std::pair<TString, TString>> TTLConf;
public:
TString BuildQuery() const;
std::shared_ptr<arrow::Schema> GetArrowSchema(const TVector<TColumnSchema>& columns);

TColumnTableBase& SetTTL(const TString& columnName, const TString& ttlConf) {
TTLConf = std::make_pair(columnName, ttlConf);
return *this;
}

private:
virtual TString GetObjectType() const = 0;
TString BuildColumnsStr(const TVector<TColumnSchema>& clumns) const;
std::shared_ptr<arrow::Field> BuildField(const TString name, const NScheme::TTypeInfo& typeInfo, bool nullable) const;
};

class TColumnTable : public TColumnTableBase {
private:
TString GetObjectType() const override;
};

class TColumnTableStore : public TColumnTableBase {
private:
TString GetObjectType() const override;
};
TString BuildQuery() const;

private:
std::unique_ptr<TKikimrRunner> Kikimr;
std::unique_ptr<NYdb::NTable::TTableClient> TableClient;
std::unique_ptr<NYdb::NTable::TSession> Session;
TColumnSchema& SetType(NScheme::TTypeId typeId);
};

using TUpdatesBuilder = NColumnShard::TTableUpdatesBuilder;

class TColumnTableBase {
YDB_ACCESSOR_DEF(TString, Name);
YDB_ACCESSOR_DEF(TVector<TColumnSchema>, Schema);
YDB_ACCESSOR_DEF(TVector<TString>, PrimaryKey);
YDB_ACCESSOR_DEF(TVector<TString>, Sharding);
YDB_ACCESSOR(ui32, MinPartitionsCount, 1);

std::optional<std::pair<TString, TString>> TTLConf;

public:
TTestHelper(const TKikimrSettings& settings);
TKikimrRunner& GetKikimr();
TTestActorRuntime& GetRuntime();
NYdb::NTable::TSession& GetSession();
void CreateTable(const TColumnTableBase& table, const NYdb::EStatus expectedStatus = NYdb::EStatus::SUCCESS);
void DropTable(const TString& tableName);
void CreateTier(const TString& tierName);
TString CreateTieringRule(const TString& tierName, const TString& columnName);
void SetTiering(const TString& tableName, const TString& ruleName);
void ResetTiering(const TString& tableName);
void BulkUpsert(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const Ydb::StatusIds_StatusCode& opStatus = Ydb::StatusIds::SUCCESS);
void BulkUpsert(const TColumnTable& table, std::shared_ptr<arrow::RecordBatch> batch, const Ydb::StatusIds_StatusCode& opStatus = Ydb::StatusIds::SUCCESS);
void ReadData(const TString& query, const TString& expected, const NYdb::EStatus opStatus = NYdb::EStatus::SUCCESS);
void RebootTablets(const TString& tableName);
void WaitTabletDeletionInHive(ui64 tabletId, TDuration duration);
TString BuildQuery() const;
TString BuildAlterCompressionQuery(const TString& columnName, const TCompression& compression) const;
std::shared_ptr<arrow::Schema> GetArrowSchema(const TVector<TColumnSchema>& columns);

TColumnTableBase& SetTTL(const TString& columnName, const TString& ttlConf) {
TTLConf = std::make_pair(columnName, ttlConf);
return *this;
}

private:
virtual TString GetObjectType() const = 0;
TString BuildColumnsStr(const TVector<TColumnSchema>& clumns) const;
std::shared_ptr<arrow::Field> BuildField(const TString name, const NScheme::TTypeInfo& typeInfo, bool nullable) const;
};

class TColumnTable: public TColumnTableBase {
private:
TString GetObjectType() const override;
};

class TColumnTableStore: public TColumnTableBase {
private:
TString GetObjectType() const override;
};

private:
std::unique_ptr<TKikimrRunner> Kikimr;
std::unique_ptr<NYdb::NTable::TTableClient> TableClient;
std::unique_ptr<NYdb::NTable::TSession> Session;

public:
TTestHelper(const TKikimrSettings& settings);
TKikimrRunner& GetKikimr();
TTestActorRuntime& GetRuntime();
NYdb::NTable::TSession& GetSession();
void CreateTable(const TColumnTableBase& table, const NYdb::EStatus expectedStatus = NYdb::EStatus::SUCCESS);
void DropTable(const TString& tableName);
void CreateTier(const TString& tierName);
TString CreateTieringRule(const TString& tierName, const TString& columnName);
void SetTiering(const TString& tableName, const TString& ruleName);
void ResetTiering(const TString& tableName);
void BulkUpsert(
const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const Ydb::StatusIds_StatusCode& opStatus = Ydb::StatusIds::SUCCESS);
void BulkUpsert(const TColumnTable& table, std::shared_ptr<arrow::RecordBatch> batch,
const Ydb::StatusIds_StatusCode& opStatus = Ydb::StatusIds::SUCCESS);
void ReadData(const TString& query, const TString& expected, const NYdb::EStatus opStatus = NYdb::EStatus::SUCCESS);
void RebootTablets(const TString& tableName);
void WaitTabletDeletionInHive(ui64 tabletId, TDuration duration);
void SetCompression(const TColumnTableBase& columnTable, const TString& columnName, const TCompression& compression,
const NYdb::EStatus expectedStatus = NYdb::EStatus::SUCCESS);
};
}
}
30 changes: 30 additions & 0 deletions ydb/core/kqp/ut/olap/compression_ut.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#include <ydb/core/kqp/ut/common/columnshard.h>

namespace NKikimr::NKqp {

Y_UNIT_TEST_SUITE(KqpOlapCompression) {
Y_UNIT_TEST(DisabledAlterCompression) {
TKikimrSettings settings = TKikimrSettings().SetWithSampleTables(false).SetEnableOlapCompression(false);
TTestHelper testHelper(settings);
TVector<TTestHelper::TColumnSchema> schema = {
TTestHelper::TColumnSchema().SetName("pk_int").SetType(NScheme::NTypeIds::Uint64).SetNullable(false)
};
TTestHelper::TCompression compression = TTestHelper::TCompression().SetType(arrow::Compression::type::ZSTD);

TTestHelper::TColumnTable standaloneTable;
standaloneTable.SetName("/Root/StandaloneTable").SetPrimaryKey({ "pk_int" }).SetSharding({ "pk_int" }).SetSchema(schema);
testHelper.CreateTable(standaloneTable);
testHelper.SetCompression(standaloneTable, "pk_int", compression, NYdb::EStatus::SCHEME_ERROR);

TTestHelper::TColumnTableStore testTableStore;
testTableStore.SetName("/Root/TableStoreTest").SetPrimaryKey({ "pk_int" }).SetSchema(schema);
testHelper.CreateTable(testTableStore);
testHelper.SetCompression(testTableStore, "pk_int", compression, NYdb::EStatus::PRECONDITION_FAILED);

TTestHelper::TColumnTable testTable;
testTable.SetName("/Root/TableStoreTest/ColumnTableTest").SetPrimaryKey({ "pk_int" }).SetSharding({ "pk_int" }).SetSchema(schema);
testHelper.CreateTable(testTable);
testHelper.SetCompression(testTable, "pk_int", compression, NYdb::EStatus::SCHEME_ERROR);
}
}
}
11 changes: 0 additions & 11 deletions ydb/core/kqp/ut/olap/sys_view_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,17 +219,6 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
}
}

Y_UNIT_TEST(DisabledAlterCompression) {
TKikimrSettings settings = TKikimrSettings().SetWithSampleTables(false).SetEnableOlapCompression(false);
TKikimrRunner kikimr(settings);
TTypedLocalHelper helper("", kikimr, "olapTable", "olapStore");
helper.CreateTestOlapTable();
helper.FillPKOnly(0, 1);
helper.ExecuteSchemeQuery(
"ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=ALTER_COLUMN, NAME=pk_int, "
"`SERIALIZER.CLASS_NAME`=`ARROW_SERIALIZER`, `COMPRESSION.TYPE`=`zstd`);", NYdb::EStatus::PRECONDITION_FAILED);
}

Y_UNIT_TEST(StatsSysViewBytesColumnActualization) {
ui64 rawBytes1;
ui64 bytes1;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/olap/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ SRCS(
sparsed_ut.cpp
tiering_ut.cpp
decimal_ut.cpp
compression_ut.cpp
)

PEERDIR(
Expand Down

0 comments on commit 5f29865

Please sign in to comment.