# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.


class ValidationException(Exception):
    """Raised when an Iceberg schema or partition spec fails validation."""
See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations +from typing import List, Tuple, Dict + +from iceberg.types import NestedField, Schema, index_by_id +from iceberg.exceptions import ValidationException + + +class PartitionField: + def __init__(self, source_id: int, field_id: int, name: str): + self._source_id = source_id + self._field_id = field_id + self._name = name + # TODO: add transform field + + @property + def source_id(self) -> int: + return self._source_id + + @property + def field_id(self) -> int: + return self._field_id + + @property + def name(self) -> str: + return self._name + + # TODO: add property for transform + + def __eq__(self, other): + if isinstance(other, PartitionField): + return ( + self.source_id == other.source_id + and self.field_id == other.field_id + and self.name == other.name + ) + return False + + def __str__(self): + # TODO: add transform to str representation + return f"{self.field_id}: {self.name} ({self._source_id})" + + def __repr__(self): + return f"PartitionField(field_id={self.field_id}, name={self.name}, source_id={self._source_id})" + + +def check_compatibility(spec, schema: Schema): + for field in spec.fields: + source_type = schema.find_type(field.source_id) + if source_type is None: + raise ValidationException( + f"Cannot find source column for partition field: {field}" + ) + if not source_type.is_primitive: + raise ValidationException( + f"Cannot partition by non-primitive source field: {source_type}" + ) + # TODO: Add transform check + + +PARTITION_DATA_ID_START = 1000 + + +class PartitionSpec: + def __init__( + self, + schema: Schema, + spec_id: int, + part_fields: List[PartitionField], + last_assigned_field_id: int, + ): + self._schema = schema + self._spec_id = spec_id + self._fields = tuple(part_fields) + self._last_assigned_field_id = last_assigned_field_id + self.fields_by_source_id: Dict[int, List[PartitionField]] = {} 
+ + @property + def schema(self) -> Schema: + return self._schema + + @property + def spec_id(self) -> int: + return self._spec_id + + @property + def fields(self) -> Tuple[PartitionField]: + return self._fields + + @property + def last_assigned_field_id(self) -> int: + return self._last_assigned_field_id + + def is_partitioned(self): + return len(self.fields) < 1 + + def get_fields_by_source_id(self, field_id: int) -> List[PartitionField]: + if not self.fields_by_source_id: + for field in self.fields: + self.fields_by_source_id[field.source_id] = [field] + return self.fields_by_source_id.get(field_id) + + def __eq__(self, other): + if isinstance(other, PartitionSpec): + if self.spec_id != other.spec_id: + return False + return self.fields == other.fields + return False + + def __str__(self): + partition_spec_str = "[" + for field in self.fields: + partition_spec_str += "\n" + partition_spec_str += " " + str(field) + partition_spec_str += "]" if len(self.fields) > 0 else "\n" + return partition_spec_str + + def __repr__(self): + return "PartitionSpec" + self.__str__() + + def compatible_with(self, other): + if self.__eq__(other): + return True + + if len(self.fields) != len(other.fields): + return False + + index = 0 + # TODO: Need to fix with transforms + for field in self.fields: + other_field = other.fields[index] + if ( + field.source_id != other_field.source_id + or field.name != other_field.name + ): + # TODO: Add transform check + return False + index += 1 + return True + + def partition_type(self): + struct_fields = [] + # TODO: Needs transform + pass + + def partition_to_path(self): + # TODO: Needs transform + pass + + def unpartitioned(self): + return PartitionSpec( + schema=Schema, + spec_id=0, + part_fields=[], + last_assigned_field_id=PARTITION_DATA_ID_START - 1, + ) + + def has_sequential_ids(self, spec): + index = 0 + for field in spec.fields: + if field.field_id != PARTITION_DATA_ID_START + index: + return False + index += 1 + return True + + +class 
class Builder:
    """Fluent builder for PartitionSpec instances.

    Python has no method overloading, so the Java-style overloaded pairs
    (identity/year/month/day/hour/bucket/truncate/always_null and
    check_and_add_partition_name) are merged into single methods with an
    optional target-name argument; both original call shapes still work.
    Most transform-based methods remain placeholders (TODO) until transforms
    are implemented.
    """

    def __init__(self, schema: Schema):
        self._schema = schema
        self.fields: List[PartitionField] = []
        self.partition_names = set()
        self.dedup_fields = dict()
        self.spec_id = 0
        # Last id handed out; the first next_field_id() call returns
        # PARTITION_DATA_ID_START.
        self.last_assigned_field_id = PARTITION_DATA_ID_START - 1

    @staticmethod
    def builder_for(schema) -> Builder:
        """Alternate constructor mirroring the Java API (usable on the class
        or on an instance)."""
        return Builder(schema=schema)

    def next_field_id(self) -> int:
        # Bug fix: ints have no .inc() (Java AtomicInteger leftover);
        # increment and return the new id, like incrementAndGet().
        self.last_assigned_field_id += 1
        return self.last_assigned_field_id

    def check_and_add_partition_name(self, name: str, source_column_id: int = None):
        # Merged the two overloaded forms; source_column_id is optional.
        # TODO: needs more schema methods
        pass

    def check_for_redundant_partitions(self, field: PartitionField):
        # TODO: needs transforms
        pass

    def with_spec_id(self, new_spec_id: int) -> Builder:
        self.spec_id = new_spec_id
        return self

    def find_source_column(self, source_name: str) -> NestedField:
        # TODO: needs schema.find_field
        pass

    def identity(self, source_name: str, target_name: str = None) -> Builder:
        # Bug fix: the duplicated def shadowed its sibling and recursed with
        # the wrong arity; target_name defaults to the source name.
        if target_name is None:
            target_name = source_name
        # TODO: needs check_for_redundant_partitions
        pass

    def year(self, source_name: str, target_name: str = None) -> Builder:
        if target_name is None:
            target_name = source_name + "_year"
        # TODO: needs check_for_redundant_partitions
        pass

    def month(self, source_name: str, target_name: str = None) -> Builder:
        if target_name is None:
            target_name = source_name + "_month"
        # TODO: needs check_for_redundant_partitions
        pass

    def day(self, source_name: str, target_name: str = None) -> Builder:
        if target_name is None:
            target_name = source_name + "_day"
        # TODO: needs check_for_redundant_partitions
        pass

    def hour(self, source_name: str, target_name: str = None) -> Builder:
        if target_name is None:
            target_name = source_name + "_hour"
        # TODO: needs check_for_redundant_partitions
        pass

    def bucket(self, source_name: str, num_buckets: int, target_name: str = None) -> Builder:
        if target_name is None:
            target_name = source_name + "_bucket"
        # TODO: needs transforms
        pass

    def truncate(self, source_name: str, width: int, target_name: str = None) -> Builder:
        if target_name is None:
            target_name = source_name + "_trunc"
        # TODO: needs check_and_add_partition_name and transforms
        pass

    def always_null(self, source_name: str, target_name: str = None) -> Builder:
        if target_name is None:
            target_name = source_name + "_null"
        # TODO: needs check_and_add_partition_name and transforms
        pass

    def add(self, source_id: int, field_id: int):
        # TODO: needs transforms
        # TODO: Two more add methods are needed
        pass

    def build(self) -> PartitionSpec:
        """Validate and return the assembled PartitionSpec."""
        # Bug fix: the builder stores its schema as self._schema; there is no
        # public .schema attribute.
        spec = PartitionSpec(
            self._schema, self.spec_id, self.fields, self.last_assigned_field_id
        )
        check_compatibility(spec, self._schema)
        return spec
-from typing import Optional + +from typing import TypeVar, Generic, List, Dict, Optional class Type(object): @@ -117,7 +118,7 @@ def __str__(self): class StructType(Type): - def __init__(self, fields: list): + def __init__(self, fields: List[NestedField]): super().__init__( f"struct<{', '.join(map(str, fields))}>", f"StructType(fields={repr(fields)})", @@ -125,7 +126,7 @@ def __init__(self, fields: list): self._fields = fields @property - def fields(self) -> list: + def fields(self) -> List[NestedField]: return self._fields @@ -169,3 +170,249 @@ def value(self) -> NestedField: StringType = Type("string", "StringType", is_primitive=True) UUIDType = Type("uuid", "UUIDType", is_primitive=True) BinaryType = Type("binary", "BinaryType", is_primitive=True) + + +class Schema(object): + """ + Schema of a data table + """ + + def __init__( + self, + columns: List[NestedField], + schema_id: int, + identifier_field_ids: [int], + id_to_field: Dict[int, NestedField], + alias_to_id: Dict[str, int], + name_to_id: Dict[str, int], + lowercase_name_to_id: Dict[str, int], + ): + self._struct = StructType(columns) + self._schema_id = schema_id + self._identifier_field_ids = identifier_field_ids + self._id_to_field = id_to_field + self._alias_to_id = alias_to_id + self._name_to_id = name_to_id + self._lowercase_name_to_id = lowercase_name_to_id + + @property + def columns(self): + return self._struct.fields + + @property + def schema_id(self): + return self._schema_id + + @property + def identifier_field_ids(self): + return self._identifier_field_ids + + def as_struct(self): + return self._struct + + def _id_to_field(self) -> Dict[int, NestedField]: + if not self._id_to_field: + return index_by_id(self._struct) + + def find_type(self, field_id): + field = self._id_to_field.get(field_id) + if field: + return field.type + return None + + def _name_to_id(self) -> Dict[str, int]: + if not self._name_to_id: + return index_by_name(self._struct) + + def _id_to_name(self) -> Dict[int, str]: + 
if not self._id_to_name(): + return index_name_by_id(self._struct) + + def find_field(self, name: str) -> NestedField: + # TODO: needs TypeUtil Methods + pass + + +T = TypeVar("T") + + +class SchemaVisitor(Generic[T]): + def before_field(self, field: NestedField) -> None: + pass + + def after_field(self, field: NestedField) -> None: + pass + + def before_list_element(self, element: NestedField) -> None: + self.before_field(element) + + def after_list_element(self, element: NestedField) -> None: + self.after_field(element) + + def before_map_key(self, key: NestedField) -> None: + self.before_field(key) + + def after_map_key(self, key: NestedField) -> None: + self.after_field(key) + + def before_map_value(self, value: NestedField) -> None: + self.before_field(value) + + def after_map_value(self, value: NestedField) -> None: + self.after_field(value) + + def schema(self, schema: Schema, struct_result: T) -> T: + return None + + def struct(self, struct: StructType, field_results: List[T]) -> T: + return None + + def field(self, field: NestedField, field_result: T) -> T: + return None + + def list(self, list_type: ListType, element_result: T) -> T: + return None + + def map(self, map_type: MapType, key_result: T, value_result: T) -> T: + return None + + def primitive(self, primitive: Type) -> T: + return None + + +def visit(obj, visitor: SchemaVisitor[T]) -> T: + if isinstance(obj, Schema): + return visitor.schema(obj, visit(obj.as_struct(), visitor)) + + elif isinstance(obj, StructType): + results = [] + for field in obj.fields: + visitor.before_field(field) + try: + result = visit(field.type, visitor) + finally: + visitor.after_field(field) + + results.append(visitor.field(field, result)) + + return visitor.struct(obj, results) + + elif isinstance(obj, ListType): + visitor.before_list_element(obj.element) + try: + result = visit(obj.element.type, visitor) + finally: + visitor.after_list_element(obj.element) + + return visitor.list(obj, result) + + elif isinstance(obj, 
MapType): + visitor.before_map_key(obj.key) + try: + key_result = visit(obj.key.type, visitor) + finally: + visitor.after_map_key(obj.key) + + visitor.before_map_value(obj.value) + try: + value_result = visit(obj.value.type, visitor) + finally: + visitor.after_list_element(obj.value) + + return visitor.map(obj, key_result, value_result) + + elif isinstance(obj, Type): + return visitor.primitive(obj) + + else: + raise NotImplementedError("Cannot visit non-type: %s" % obj) + + +def index_by_id(schema_or_type) -> Dict[int, NestedField]: + class IndexById(SchemaVisitor[Dict[int, NestedField]]): + def __init__(self): + self._index: Dict[int, NestedField] = {} + + def schema(self, schema, result): + return self._index + + def struct(self, struct, results): + return self._index + + def field(self, field, result): + self._index[field.field_id] = field + return self._index + + def list(self, list_type, result): + self._index[list_type.element.field_id] = list_type.element + return self._index + + def map(self, map_type, key_result, value_result): + self._index[map_type.key.field_id] = map_type.key + self._index[map_type.value.field_id] = map_type.value + return self._index + + def primitive(self, primitive): + return self._index + + return visit(schema_or_type, IndexById()) + + +def index_by_name(schema_or_type) -> Dict[str, int]: + class IndexByName(SchemaVisitor[Dict[str, int]]): + def __init__(self): + self._index: Dict[str, int] = {} + + def schema(self, schema, result): + return self._index + + def struct(self, struct, results): + return self._index + + def field(self, field, result): + self._index[field.name] = field.field_id + return self._index + + def list(self, list_type, result): + self._index[list_type.element.name] = list_type.element.field_id + return self._index + + def map(self, map_type, key_result, value_result): + self._index[map_type.key.name] = map_type.key.field_id + self._index[map_type.value.name] = map_type.value.field_id + return self._index + + 
def primitive(self, primitive): + return self._index + + return visit(schema_or_type, IndexByName()) + + +def index_name_by_id(schema_or_type) -> Dict[int, str]: + class IndexByNameById(SchemaVisitor[Dict[int, str]]): + def __init__(self): + self._index: Dict[int, str] = {} + + def schema(self, schema, result): + return self._index + + def struct(self, struct, results): + return self._index + + def field(self, field, result): + self._index[field.field_id] = field.name + return self._index + + def list(self, list_type, result): + self._index[list_type.element.field_id] = list_type.element.name + return self._index + + def map(self, map_type, key_result, value_result): + self._index[map_type.key.field_id] = map_type.key.name + self._index[map_type.value.field_id] = map_type.value.name + return self._index + + def primitive(self, primitive): + return self._index + + return visit(schema_or_type, IndexByNameById()) diff --git a/python/src/iceberg/utils.py b/python/src/iceberg/utils.py new file mode 100644 index 000000000000..f52d02ffcdd7 --- /dev/null +++ b/python/src/iceberg/utils.py @@ -0,0 +1,25 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from urllib import parse + + +def escape(self, input_str): + try: + return parse.urlencode(input_str, encoding="utf-8") + except TypeError as e: + raise e diff --git a/python/tests/test_partitioning.py b/python/tests/test_partitioning.py new file mode 100644 index 000000000000..4fff3d718d37 --- /dev/null +++ b/python/tests/test_partitioning.py @@ -0,0 +1,30 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from iceberg.partitioning import PartitionField, PartitionSpec + + +def test_partition_field_equality(): + assert PartitionField(1, 2, "test_1") == PartitionField(1, 2, "test_1") + + +def test_partition_field(): + assert str(PartitionField(1, 2, "test_1")) == "2: test_1 (1)" + assert ( + repr(PartitionField(1, 2, "test_1")) + == "PartitionField(field_id=2, name=test_1, source_id=1)" + )