Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-41246: [C++][Python] Simplify nested field encryption configuration #45462

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 194 additions & 4 deletions cpp/src/arrow/dataset/file_parquet_encryption_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
// specific language governing permissions and limitations
// under the License.

#include <arrow/array/builder_binary.h>
#include <arrow/array/builder_nested.h>
#include <arrow/array/builder_primitive.h>
#include <arrow/util/logging.h>
#include <boost/container/container_fwd.hpp>
#include <string_view>

#include "gtest/gtest.h"
Expand Down Expand Up @@ -43,7 +48,7 @@ constexpr std::string_view kFooterKeyMasterKeyId = "footer_key";
constexpr std::string_view kFooterKeyName = "footer_key";
constexpr std::string_view kColumnMasterKey = "1234567890123450";
constexpr std::string_view kColumnMasterKeyId = "col_key";
constexpr std::string_view kColumnKeyMapping = "col_key: a";
constexpr std::string_view kColumnName = "a";
constexpr std::string_view kBaseDir = "";

using arrow::internal::checked_pointer_cast;
Expand Down Expand Up @@ -90,7 +95,9 @@ class DatasetEncryptionTestBase : public ::testing::Test {
auto encryption_config =
std::make_shared<parquet::encryption::EncryptionConfiguration>(
std::string(kFooterKeyName));
encryption_config->column_keys = kColumnKeyMapping;
std::stringstream column_key;
column_key << kColumnMasterKeyId << ": " << ColumnKey();
encryption_config->column_keys = column_key.str();
auto parquet_encryption_config = std::make_shared<ParquetEncryptionConfig>();
// Directly assign shared_ptr objects to ParquetEncryptionConfig members
parquet_encryption_config->crypto_factory = crypto_factory_;
Expand Down Expand Up @@ -118,6 +125,7 @@ class DatasetEncryptionTestBase : public ::testing::Test {
}

virtual void PrepareTableAndPartitioning() = 0;
// Column path written after kColumnMasterKeyId in the column_keys mapping of the
// encryption configuration. Defaults to kColumnName; fixtures may override it.
virtual std::string_view ColumnKey() { return kColumnName; }

void TestScanDataset() {
// Create decryption properties.
Expand Down Expand Up @@ -179,8 +187,9 @@ class DatasetEncryptionTest : public DatasetEncryptionTestBase {
// The dataset is partitioned using a Hive partitioning scheme.
void PrepareTableAndPartitioning() override {
// Prepare table data.
auto table_schema = schema({field("a", int64()), field("c", int64()),
field("e", int64()), field("part", utf8())});
auto table_schema =
schema({field(std::string(kColumnName), int64()), field("c", int64()),
field("e", int64()), field("part", utf8())});
table_ = TableFromJSON(table_schema, {R"([
[ 0, 9, 1, "a" ],
[ 1, 8, 2, "a" ],
Expand Down Expand Up @@ -240,6 +249,187 @@ TEST_F(DatasetEncryptionTest, ReadSingleFile) {
ASSERT_EQ(checked_pointer_cast<Int64Array>(table->column(2)->chunk(0))->GetView(0), 1);
}

// Base fixture for encryption tests over a single nested column named "a".
//
// Derived fixtures fill column_type_ and column_data_ in their constructors. The
// test parameter is the column path used as the column key (see ColumnKey()).
class NestedFieldsEncryptionTest : public DatasetEncryptionTestBase,
                                   public ::testing::WithParamInterface<std::string> {
 public:
  NestedFieldsEncryptionTest() : rand_gen(0) {}

  // Builds a one-column table from column_type_ / column_data_. The dataset uses a
  // DirectoryPartitioning over an empty schema.
  void PrepareTableAndPartitioning() override {
    // Copy the type instead of moving it out of the member, so column_type_ stays
    // valid if this method is called again or the member is read later.
    auto table_schema = schema({field("a", column_type_)});
    table_ = arrow::Table::Make(table_schema, {column_data_});
    partitioning_ = std::make_shared<dataset::DirectoryPartitioning>(arrow::schema({}));
  }

  // The column path under test, taken from the test parameter.
  std::string_view ColumnKey() override { return GetParam(); }

 protected:
  // Type of column "a"; set by the derived fixture's constructor.
  std::shared_ptr<DataType> column_type_;
  // Data of column "a"; set by the derived fixture's constructor.
  std::shared_ptr<Array> column_data_;
  arrow::random::RandomArrayGenerator rand_gen;
};

// Fixture whose column "a" is a list<int32> with the lists [1, 2, 3], [4, 5], [6].
class ListFieldEncryptionTest : public NestedFieldsEncryptionTest {
 public:
  ListFieldEncryptionTest() {
    arrow::MemoryPool* pool = arrow::default_memory_pool();
    auto value_builder = std::make_shared<arrow::Int32Builder>(pool);
    arrow::ListBuilder list_builder(pool, value_builder);

    // Each list_builder.Append() starts a new list; the values that follow belong
    // to it.
    ARROW_CHECK_OK(list_builder.Append());
    ARROW_CHECK_OK(value_builder->Append(1));
    ARROW_CHECK_OK(value_builder->Append(2));
    ARROW_CHECK_OK(value_builder->Append(3));
    ARROW_CHECK_OK(list_builder.Append());
    ARROW_CHECK_OK(value_builder->Append(4));
    ARROW_CHECK_OK(value_builder->Append(5));
    ARROW_CHECK_OK(list_builder.Append());
    ARROW_CHECK_OK(value_builder->Append(6));

    std::shared_ptr<arrow::Array> list_array;
    // Check the status like every other builder call here; previously a failed
    // Finish() was ignored and left column_data_ null.
    ARROW_CHECK_OK(list_builder.Finish(&list_array));

    column_type_ = list(int32());
    column_data_ = list_array;
  }
};

class MapFieldEncryptionTest : public NestedFieldsEncryptionTest {
public:
MapFieldEncryptionTest() : NestedFieldsEncryptionTest() {
arrow::MemoryPool* pool = arrow::default_memory_pool();
auto map_type = map(utf8(), int32());
auto key_builder = std::make_shared<arrow::StringBuilder>(pool);
auto item_builder = std::make_shared<arrow::Int32Builder>(pool);
auto map_builder =
std::make_shared<arrow::MapBuilder>(pool, key_builder, item_builder, map_type);
ARROW_CHECK_OK(map_builder->Append());
ARROW_CHECK_OK(key_builder->Append("one"));
ARROW_CHECK_OK(item_builder->Append(1));
ARROW_CHECK_OK(map_builder->Append());
ARROW_CHECK_OK(key_builder->Append("two"));
ARROW_CHECK_OK(item_builder->Append(2));
ARROW_CHECK_OK(map_builder->Append());
ARROW_CHECK_OK(key_builder->Append("three"));
ARROW_CHECK_OK(item_builder->Append(3));

std::shared_ptr<arrow::Array> map_array;
ARROW_CHECK_OK(map_builder->Finish(&map_array));

column_type_ = map_type;
column_data_ = map_array;
}
};

// Fixture whose column "a" is a struct<f1: int32, f2: utf8> with the rows
// {1, "one"}, {2, "two"} and {3, "three"}.
class StructFieldEncryptionTest : public NestedFieldsEncryptionTest {
 public:
  StructFieldEncryptionTest() : NestedFieldsEncryptionTest() {
    arrow::MemoryPool* pool = arrow::default_memory_pool();
    auto struct_type = struct_({field("f1", int32()), field("f2", utf8())});
    auto f1_builder = std::make_shared<arrow::Int32Builder>(pool);
    auto f2_builder = std::make_shared<arrow::StringBuilder>(pool);
    std::vector<std::shared_ptr<ArrayBuilder>> value_builders = {f1_builder, f2_builder};
    // Pass struct_type by copy: it is assigned to column_type_ below, so it must not
    // be moved from here.
    auto struct_builder =
        std::make_shared<arrow::StructBuilder>(struct_type, pool, value_builders);

    // Each struct_builder->Append() starts a row; the field builders supply its values.
    ARROW_CHECK_OK(struct_builder->Append());
    ARROW_CHECK_OK(f1_builder->Append(1));
    ARROW_CHECK_OK(f2_builder->Append("one"));
    ARROW_CHECK_OK(struct_builder->Append());
    ARROW_CHECK_OK(f1_builder->Append(2));
    ARROW_CHECK_OK(f2_builder->Append("two"));
    ARROW_CHECK_OK(struct_builder->Append());
    ARROW_CHECK_OK(f1_builder->Append(3));
    ARROW_CHECK_OK(f2_builder->Append("three"));

    std::shared_ptr<arrow::Array> struct_array;
    ARROW_CHECK_OK(struct_builder->Finish(&struct_array));

    column_type_ = struct_type;
    column_data_ = struct_array;
  }
};

// Fixture whose column "a" is a list<map<int32, struct<f1: int32, f2: utf8>>>.
//
// The builders are chained: list_builder -> map_builder (alias value_builder) ->
// key_builder + struct_builder (alias item_builder) -> f1_builder / f2_builder.
class DeepNestedFieldEncryptionTest : public NestedFieldsEncryptionTest {
 public:
  DeepNestedFieldEncryptionTest() : NestedFieldsEncryptionTest() {
    arrow::MemoryPool* pool = arrow::default_memory_pool();

    auto struct_type = struct_({field("f1", int32()), field("f2", utf8())});
    auto f1_builder = std::make_shared<arrow::Int32Builder>(pool);
    auto f2_builder = std::make_shared<arrow::StringBuilder>(pool);
    std::vector<std::shared_ptr<ArrayBuilder>> value_builders = {f1_builder, f2_builder};
    // Pass struct_type by copy: it is used again for map_type right below, so it
    // must not be moved from here.
    auto struct_builder =
        std::make_shared<arrow::StructBuilder>(struct_type, pool, value_builders);

    auto map_type = map(int32(), struct_type);
    auto key_builder = std::make_shared<arrow::Int32Builder>(pool);
    auto item_builder = struct_builder;
    auto map_builder =
        std::make_shared<arrow::MapBuilder>(pool, key_builder, item_builder, map_type);

    auto list_type = list(map_type);
    auto value_builder = map_builder;
    arrow::ListBuilder list_builder(pool, value_builder);

    // First list, first map: two entries.
    ARROW_CHECK_OK(list_builder.Append());
    ARROW_CHECK_OK(value_builder->Append());

    ARROW_CHECK_OK(key_builder->Append(1));
    ARROW_CHECK_OK(item_builder->Append());
    ARROW_CHECK_OK(f1_builder->Append(1));
    ARROW_CHECK_OK(f2_builder->Append("one"));

    // NOTE(review): key 1 is appended a second time within the same map; this was
    // presumably meant to be 2 - confirm. Kept as-is to leave the test data unchanged.
    ARROW_CHECK_OK(key_builder->Append(1));
    ARROW_CHECK_OK(item_builder->Append());
    ARROW_CHECK_OK(f1_builder->Append(2));
    ARROW_CHECK_OK(f2_builder->Append("two"));

    // First list, second map: one entry.
    ARROW_CHECK_OK(value_builder->Append());

    ARROW_CHECK_OK(key_builder->Append(3));
    ARROW_CHECK_OK(item_builder->Append());
    ARROW_CHECK_OK(f1_builder->Append(3));
    ARROW_CHECK_OK(f2_builder->Append("three"));

    // Second list, single map with one entry.
    ARROW_CHECK_OK(list_builder.Append());
    ARROW_CHECK_OK(value_builder->Append());

    ARROW_CHECK_OK(key_builder->Append(4));
    ARROW_CHECK_OK(item_builder->Append());
    ARROW_CHECK_OK(f1_builder->Append(4));
    ARROW_CHECK_OK(f2_builder->Append("four"));

    std::shared_ptr<arrow::Array> list_array;
    // Check the status like every other builder call here; previously a failed
    // Finish() was ignored and left column_data_ null.
    ARROW_CHECK_OK(list_builder.Finish(&list_array));

    column_type_ = list_type;
    column_data_ = list_array;
  }
};

// Test writing and reading encrypted nested fields.
//
// Each parameter is the column path that the fixture's ColumnKey() returns: either
// the top-level column "a" or a path to one of its nested fields.

// List column: the whole column, or its element field.
INSTANTIATE_TEST_SUITE_P(List, ListFieldEncryptionTest,
::testing::Values("a", "a.list.element"));
// Map column: the whole column, the short "a.key" / "a.value" forms, and the
// forms that spell out the intermediate "key_value" level.
INSTANTIATE_TEST_SUITE_P(Map, MapFieldEncryptionTest,
::testing::Values("a", "a.key", "a.value", "a.key_value.key",
"a.key_value.value"));
// Struct column: the whole column, or one of its fields f1 / f2.
INSTANTIATE_TEST_SUITE_P(Struct, StructFieldEncryptionTest,
::testing::Values("a", "a.f1", "a.f2"));
// List of maps of structs: paths at every nesting level, from the whole column
// down to the individual struct fields.
INSTANTIATE_TEST_SUITE_P(DeepNested, DeepNestedFieldEncryptionTest,
::testing::Values("a", "a.list.element",
"a.list.element.key_value.key",
"a.list.element.key_value.value",
"a.list.element.key_value.value.f1",
"a.list.element.key_value.value.f2"));

// Every suite runs the shared dataset scan test with its fixture's column data
// and the parameterized column key.
TEST_P(ListFieldEncryptionTest, ColumnKeys) { TestScanDataset(); }

TEST_P(MapFieldEncryptionTest, ColumnKeys) { TestScanDataset(); }

TEST_P(StructFieldEncryptionTest, ColumnKeys) { TestScanDataset(); }

TEST_P(DeepNestedFieldEncryptionTest, ColumnKeys) { TestScanDataset(); }

// GH-39444: This test covers the case where parquet dataset scanner crashes when
// processing encrypted datasets over 2^15 rows in multi-threaded mode.
class LargeRowEncryptionTest : public DatasetEncryptionTestBase {
Expand Down
64 changes: 64 additions & 0 deletions cpp/src/parquet/encryption/encryption.cc
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,70 @@ FileEncryptionProperties::Builder* FileEncryptionProperties::Builder::encrypted_
return this;
}

// Validates encrypted_columns_ against `schema` and expands parent fields.
//
// Every key of encrypted_columns_ must equal the column path or the schema path of a
// column in `schema`, or be a parent field of at least one such column (i.e. the
// column's path starts with the key followed by '.'). Keys that are not themselves
// column paths are removed and replaced by one entry per matched column, each
// carrying the properties configured for the key.
//
// Throws ParquetException if a key matches no column of the schema.
void FileEncryptionProperties::encrypt_schema(const SchemaDescriptor& schema) {
  // Iterate over a snapshot, because encrypted_columns_ itself is modified below.
  const auto encrypted_columns = ColumnPathToEncryptionPropertiesMap(encrypted_columns_);
  // If no column properties are configured, every column in the file schema will be
  // encrypted with the footer key, so there is nothing to check or expand.
  if (encrypted_columns.empty()) {
    return;
  }

  // (lookup path, column path) pairs: every column can be looked up by its column
  // path and, where that differs, by its schema path as well.
  std::vector<std::pair<std::string, std::string>> column_path_vec;
  for (int i = 0; i < schema.num_columns(); i++) {
    auto column = schema.Column(i);
    auto column_path = column->path()->ToDotString();
    auto schema_path = column->schema_path()->ToDotString();
    column_path_vec.emplace_back(column_path, column_path);
    if (schema_path != column_path) {
      column_path_vec.emplace_back(schema_path, column_path);
    }
  }
  // Sort by lookup path so that matches can be found with a binary search.
  std::sort(column_path_vec.begin(), column_path_vec.end());

  const auto lookup_path_less = [](const std::pair<std::string, std::string>& item,
                                   const std::string& term) {
    return item.first < term;
  };

  for (const auto& elem : encrypted_columns) {
    const std::string& encrypted_column = elem.first;
    bool matches = false;

    // Records a match. When the key is not the column path itself (it is a parent
    // field or a schema path), the key is replaced by an entry for the column path.
    const auto encrypt_column = [&](const std::string& column_path) {
      matches = true;
      if (column_path != encrypted_column) {
        encrypted_columns_.erase(encrypted_column);
        encrypted_columns_.emplace(column_path, elem.second);
      }
    };

    // Exact matches and child matches are looked up separately: in sort order, a
    // sibling such as "a-b" lies between "a" and "a.b" ('-' < '.'), so one forward
    // scan starting at "a" would stop before reaching the children of "a".

    // 1) Lookup paths equal to encrypted_column.
    for (auto it = std::lower_bound(column_path_vec.begin(), column_path_vec.end(),
                                    encrypted_column, lookup_path_less);
         it != column_path_vec.end() && it->first == encrypted_column; ++it) {
      encrypt_column(it->second);
    }

    // 2) Lookup paths that start with encrypted_column followed by '.'. All strings
    //    sharing this prefix are contiguous and begin at lower_bound(prefix).
    const std::string child_prefix = encrypted_column + '.';
    for (auto it = std::lower_bound(column_path_vec.begin(), column_path_vec.end(),
                                    child_prefix, lookup_path_less);
         it != column_path_vec.end() &&
         it->first.compare(0, child_prefix.size(), child_prefix) == 0;
         ++it) {
      encrypt_column(it->second);
    }

    // The key must match at least one existing column.
    if (!matches) {
      throw ParquetException("Encrypted column " + encrypted_column +
                             " not in file schema");
    }
  }
}

void FileEncryptionProperties::WipeOutEncryptionKeys() {
footer_key_.clear();
for (const auto& element : encrypted_columns_) {
Expand Down
10 changes: 10 additions & 0 deletions cpp/src/parquet/encryption/encryption.h
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,16 @@ class PARQUET_EXPORT FileEncryptionProperties {
return encrypted_columns_;
}

/// Validate encrypted_columns against the given schema and expand parent fields.
///
/// All columns in encrypted_columns must refer to columns in the given schema.
/// They can also refer to parent fields if schema contains nested fields. Then
/// all those nested fields of a matching parent field are encrypted by the same key.
/// This modifies encrypted_columns to reflect this.
///
/// Columns in encrypted_columns can refer to the parquet column paths as well as the
/// schema paths of columns. Those are usually identical, except for nested fields of
/// lists and maps.
///
/// If encrypted_columns is empty, this does nothing.
///
/// \param[in] schema the file schema that encrypted_columns is checked against
/// \throws ParquetException if an entry of encrypted_columns matches no column
/// in the schema
void encrypt_schema(const SchemaDescriptor& schema);

private:
EncryptionAlgorithm algorithm_;
std::string footer_key_;
Expand Down
Loading
Loading