From 64863216af294b8afdf2641b06bb1e758582ac96 Mon Sep 17 00:00:00 2001 From: cmuhao Date: Sun, 16 Jun 2024 13:37:28 -0700 Subject: [PATCH 1/7] Add new version of serialization and desrialization Signed-off-by: cmuhao --- sdk/python/feast/infra/key_encoding_utils.py | 72 ++++++++++++++++++- .../infra/online_stores/contrib/postgres.py | 2 +- sdk/python/feast/repo_config.py | 6 +- .../unit/infra/test_key_encoding_utils.py | 22 ++++-- 4 files changed, 93 insertions(+), 9 deletions(-) diff --git a/sdk/python/feast/infra/key_encoding_utils.py b/sdk/python/feast/infra/key_encoding_utils.py index ca834f1917..c917633ca5 100644 --- a/sdk/python/feast/infra/key_encoding_utils.py +++ b/sdk/python/feast/infra/key_encoding_utils.py @@ -7,7 +7,7 @@ def _serialize_val( - value_type, v: ValueProto, entity_key_serialization_version=1 + value_type, v: ValueProto, entity_key_serialization_version=1 ) -> Tuple[bytes, int]: if value_type == "string_val": return v.string_val.encode("utf8"), ValueType.STRING @@ -23,6 +23,22 @@ def _serialize_val( raise ValueError(f"Value type not supported for Firestore: {v}") +def _deserialize_value(value_type, value_bytes) -> ValueProto: + if value_type == ValueType.INT64: + value = struct.unpack(" bytes: """ Serialize keys to a bytestring, so it can be used to prefix-scan through items stored in the online store @@ -40,7 +56,7 @@ def serialize_entity_key_prefix(entity_keys: List[str]) -> bytes: def serialize_entity_key( - entity_key: EntityKeyProto, entity_key_serialization_version=1 + entity_key: EntityKeyProto, entity_key_serialization_version=1 ) -> bytes: """ Serialize entity key to a bytestring so it can be used as a lookup key in a hash table. @@ -58,6 +74,8 @@ def serialize_entity_key( output: List[bytes] = [] for k in sorted_keys: output.append(struct.pack(" 2: + output.append(struct.pack(" EntityKeyProto: + """ + Deserialize entity key from a bytestring. This function can only be used with entity_key_serialization_version > 2. + Args: + entity_key_serialization_version: version of the entity key serialization + serialized_entity_key: serialized entity key bytes + + Returns: EntityKeyProto + + """ + if entity_key_serialization_version <= 2: + raise ValueError( + "Deserialization of entity key with version <= 2 is not supported. Please use version > 2." + ) + offset = 0 + keys = [] + values = [] + while offset < len(serialized_entity_key): + key_type = struct.unpack_from(" %s as distance, event_ts FROM {table_name} WHERE feature_name = {feature_name} ORDER BY distance diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index d5b3160b56..a9ed2ee30a 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -175,10 +175,12 @@ class RepoConfig(FeastBaseModel): used when writing data to the online store. A value <= 1 uses the serialization scheme used by feast up to Feast 0.22. A value of 2 uses a newer serialization scheme, supported as of Feast 0.23. - The main difference between the two scheme is that the serialization scheme v1 stored `long` values as `int`s, - which would result in errors trying to serialize a range of values. + A value of 3 uses the latest serialization scheme, supported as of Feast 0.38. + The main difference between the three schema is that + v1. the serialization scheme v1 stored `long` values as `int`s, which would result in errors trying to serialize a range of values. v2 fixes this error, but v1 is kept around to ensure backwards compatibility - specifically the ability to read feature values for entities that have already been written into the online store. + v3 add entity_key value length to serialized bytes to enable deserialization, which can be used in retrieval of entity_key in document retrieval. """ coerce_tz_aware: Optional[bool] = True diff --git a/sdk/python/tests/unit/infra/test_key_encoding_utils.py b/sdk/python/tests/unit/infra/test_key_encoding_utils.py index df691ea21e..d375a1d924 100644 --- a/sdk/python/tests/unit/infra/test_key_encoding_utils.py +++ b/sdk/python/tests/unit/infra/test_key_encoding_utils.py @@ -1,6 +1,6 @@ import pytest -from feast.infra.key_encoding_utils import serialize_entity_key +from feast.infra.key_encoding_utils import serialize_entity_key, deserialize_entity_key from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto @@ -9,14 +9,14 @@ def test_serialize_entity_key(): # Should be fine serialize_entity_key( EntityKeyProto( - join_keys=["user"], entity_values=[ValueProto(int64_val=int(2**15))] + join_keys=["user"], entity_values=[ValueProto(int64_val=int(2 ** 15))] ), entity_key_serialization_version=2, ) # True int64, but should also be fine. serialize_entity_key( EntityKeyProto( - join_keys=["user"], entity_values=[ValueProto(int64_val=int(2**31))] + join_keys=["user"], entity_values=[ValueProto(int64_val=int(2 ** 31))] ), entity_key_serialization_version=2, ) @@ -25,6 +25,20 @@ def test_serialize_entity_key(): with pytest.raises(BaseException): serialize_entity_key( EntityKeyProto( - join_keys=["user"], entity_values=[ValueProto(int64_val=int(2**31))] + join_keys=["user"], entity_values=[ValueProto(int64_val=int(2 ** 31))] ), ) + + +def test_deserialize_entity_key(): + serialized_entity_key = serialize_entity_key( + EntityKeyProto( + join_keys=["user"], entity_values=[ValueProto(int64_val=int(2 ** 15))] + ), + entity_key_serialization_version=3, + ) + + deserialized_entity_key = deserialize_entity_key(serialized_entity_key, entity_key_serialization_version=3) + assert deserialized_entity_key == EntityKeyProto( + join_keys=["user"], entity_values=[ValueProto(int64_val=int(2 ** 15))] + ) From c8f6c2bf4b5569c635bd4ae8b2178b0b08a183ba Mon Sep 17 00:00:00 2001 From: cmuhao Date: Sun, 16 Jun 2024 13:38:18 -0700 Subject: [PATCH 2/7] Add new version of serialization and desrialization Signed-off-by: cmuhao --- sdk/python/feast/infra/key_encoding_utils.py | 29 ++++++++++--------- .../unit/infra/test_key_encoding_utils.py | 16 +++++----- 2 files changed, 24 insertions(+), 21 deletions(-) diff --git a/sdk/python/feast/infra/key_encoding_utils.py b/sdk/python/feast/infra/key_encoding_utils.py index c917633ca5..b4f9864d63 100644 --- a/sdk/python/feast/infra/key_encoding_utils.py +++ b/sdk/python/feast/infra/key_encoding_utils.py @@ -7,7 +7,7 @@ def _serialize_val( - value_type, v: ValueProto, entity_key_serialization_version=1 + value_type, v: ValueProto, entity_key_serialization_version=1 ) -> Tuple[bytes, int]: if value_type == "string_val": return v.string_val.encode("utf8"), ValueType.STRING @@ -31,7 +31,7 @@ def _deserialize_value(value_type, value_bytes) -> ValueProto: value = struct.unpack(" bytes: def serialize_entity_key( - entity_key: EntityKeyProto, entity_key_serialization_version=1 + entity_key: EntityKeyProto, entity_key_serialization_version=1 ) -> bytes: """ Serialize entity key to a bytestring so it can be used as a lookup key in a hash table. @@ -92,7 +92,9 @@ def serialize_entity_key( return b"".join(output) -def deserialize_entity_key(serialized_entity_key: bytes, entity_key_serialization_version=3) -> EntityKeyProto: +def deserialize_entity_key( + serialized_entity_key: bytes, entity_key_serialization_version=3 +) -> EntityKeyProto: """ Deserialize entity key from a bytestring. This function can only be used with entity_key_serialization_version > 2. Args: @@ -114,32 +116,31 @@ def deserialize_entity_key(serialized_entity_key: bytes, entity_key_serializatio offset += 4 # Read the length of the key - key_length = struct.unpack_from(' Date: Sun, 16 Jun 2024 13:40:46 -0700 Subject: [PATCH 3/7] fix test Signed-off-by: cmuhao --- sdk/python/feast/infra/online_stores/contrib/postgres.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/postgres.py b/sdk/python/feast/infra/online_stores/contrib/postgres.py index ae40f7f12c..3eddd8ba20 100644 --- a/sdk/python/feast/infra/online_stores/contrib/postgres.py +++ b/sdk/python/feast/infra/online_stores/contrib/postgres.py @@ -338,7 +338,7 @@ def retrieve_online_documents( feature_name, value, vector_value, - vector_value <-> %s as distance, + vector_value {distance_metric_sql} %s as distance, event_ts FROM {table_name} WHERE feature_name = {feature_name} ORDER BY distance From 0578c30054892237ecdf2133273342f571d3e589 Mon Sep 17 00:00:00 2001 From: cmuhao Date: Sun, 16 Jun 2024 13:42:48 -0700 Subject: [PATCH 4/7] fix test Signed-off-by: cmuhao --- sdk/python/feast/repo_config.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index a9ed2ee30a..aacb95f420 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -177,10 +177,10 @@ class RepoConfig(FeastBaseModel): A value of 2 uses a newer serialization scheme, supported as of Feast 0.23. A value of 3 uses the latest serialization scheme, supported as of Feast 0.38. The main difference between the three schema is that - v1. the serialization scheme v1 stored `long` values as `int`s, which would result in errors trying to serialize a range of values. - v2 fixes this error, but v1 is kept around to ensure backwards compatibility - specifically the ability to read + v1: the serialization scheme v1 stored `long` values as `int`s, which would result in errors trying to serialize a range of values. + v2: fixes this error, but v1 is kept around to ensure backwards compatibility - specifically the ability to read feature values for entities that have already been written into the online store. - v3 add entity_key value length to serialized bytes to enable deserialization, which can be used in retrieval of entity_key in document retrieval. + v3: add entity_key value length to serialized bytes to enable deserialization, which can be used in retrieval of entity_key in document retrieval. """ coerce_tz_aware: Optional[bool] = True From 3e124c6c7f53a7ab3401e685a52107526ca659e6 Mon Sep 17 00:00:00 2001 From: cmuhao Date: Tue, 18 Jun 2024 11:08:54 -0700 Subject: [PATCH 5/7] add test Signed-off-by: cmuhao --- sdk/python/feast/infra/key_encoding_utils.py | 2 +- .../unit/infra/test_key_encoding_utils.py | 57 +++++++++++++++++-- 2 files changed, 52 insertions(+), 7 deletions(-) diff --git a/sdk/python/feast/infra/key_encoding_utils.py b/sdk/python/feast/infra/key_encoding_utils.py index b4f9864d63..f9ead383fb 100644 --- a/sdk/python/feast/infra/key_encoding_utils.py +++ b/sdk/python/feast/infra/key_encoding_utils.py @@ -20,7 +20,7 @@ def _serialize_val( return struct.pack(" ValueProto: diff --git a/sdk/python/tests/unit/infra/test_key_encoding_utils.py b/sdk/python/tests/unit/infra/test_key_encoding_utils.py index 6dd3f0a8b7..9f2153d5f7 100644 --- a/sdk/python/tests/unit/infra/test_key_encoding_utils.py +++ b/sdk/python/tests/unit/infra/test_key_encoding_utils.py @@ -1,22 +1,24 @@ import pytest -from feast.infra.key_encoding_utils import deserialize_entity_key, serialize_entity_key +from feast.infra.key_encoding_utils import deserialize_entity_key, serialize_entity_key, _serialize_val, \ + _deserialize_value from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.protos.feast.types.Value_pb2 import ValueType def test_serialize_entity_key(): # Should be fine serialize_entity_key( EntityKeyProto( - join_keys=["user"], entity_values=[ValueProto(int64_val=int(2**15))] + join_keys=["user"], entity_values=[ValueProto(int64_val=int(2 ** 15))] ), entity_key_serialization_version=2, ) # True int64, but should also be fine. serialize_entity_key( EntityKeyProto( - join_keys=["user"], entity_values=[ValueProto(int64_val=int(2**31))] + join_keys=["user"], entity_values=[ValueProto(int64_val=int(2 ** 31))] ), entity_key_serialization_version=2, ) @@ -25,7 +27,7 @@ def test_serialize_entity_key(): with pytest.raises(BaseException): serialize_entity_key( EntityKeyProto( - join_keys=["user"], entity_values=[ValueProto(int64_val=int(2**31))] + join_keys=["user"], entity_values=[ValueProto(int64_val=int(2 ** 31))] ), ) @@ -33,7 +35,7 @@ def test_serialize_entity_key(): def test_deserialize_entity_key(): serialized_entity_key = serialize_entity_key( EntityKeyProto( - join_keys=["user"], entity_values=[ValueProto(int64_val=int(2**15))] + join_keys=["user"], entity_values=[ValueProto(int64_val=int(2 ** 15))] ), entity_key_serialization_version=3, ) @@ -42,5 +44,48 @@ def test_deserialize_entity_key(): serialized_entity_key, entity_key_serialization_version=3 ) assert deserialized_entity_key == EntityKeyProto( - join_keys=["user"], entity_values=[ValueProto(int64_val=int(2**15))] + join_keys=["user"], entity_values=[ValueProto(int64_val=int(2 ** 15))] ) + + +def test_serialize_value(): + v, t = _serialize_val("string_val", ValueProto(string_val="test")) + assert t == ValueType.STRING + assert v == b"test" + + v, t = _serialize_val("bytes_val", ValueProto(bytes_val=b"test")) + assert t == ValueType.BYTES + assert v == b"test" + + v, t = _serialize_val("int32_val", ValueProto(int32_val=1)) + assert t == ValueType.INT32 + assert v == b"\x01\x00\x00\x00" + + # default entity_key_serialization_version is 1, so the result should be 4 bytes + v, t = _serialize_val("int64_val", ValueProto(int64_val=1)) + assert t == ValueType.INT64 + assert v == b"\x01\x00\x00\x00" + + # current entity_key_serialization_version is 2, so the result should be 8 bytes + v, t = _serialize_val("int64_val", ValueProto(int64_val=1), entity_key_serialization_version=2) + assert t == ValueType.INT64 + assert v == b"\x01\x00\x00\x00\x00\x00\x00\x00" + + # new entity_key_serialization_version is 3, the result should be same as version 2 + v, t = _serialize_val("int64_val", ValueProto(int64_val=1), entity_key_serialization_version=3) + assert t == ValueType.INT64 + assert v == b"\x01\x00\x00\x00\x00\x00\x00\x00" + + +def test_deserialize_value(): + v = _deserialize_value(ValueType.STRING, b"test") + assert v.string_val == "test" + + v = _deserialize_value(ValueType.BYTES, b"test") + assert v.bytes_val == b"test" + + v = _deserialize_value(ValueType.INT32, b"\x01\x00\x00\x00") + assert v.int32_val == 1 + + v = _deserialize_value(ValueType.INT64, b"\x01\x00\x00\x00\x00\x00\x00\x00") + assert v.int64_val == 1 From 34b987e7fc13aa31f239caef3753aafa308adff5 Mon Sep 17 00:00:00 2001 From: cmuhao Date: Tue, 18 Jun 2024 11:09:48 -0700 Subject: [PATCH 6/7] add test Signed-off-by: cmuhao --- .../unit/infra/test_key_encoding_utils.py | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/sdk/python/tests/unit/infra/test_key_encoding_utils.py b/sdk/python/tests/unit/infra/test_key_encoding_utils.py index 9f2153d5f7..658c04c358 100644 --- a/sdk/python/tests/unit/infra/test_key_encoding_utils.py +++ b/sdk/python/tests/unit/infra/test_key_encoding_utils.py @@ -1,7 +1,11 @@ import pytest -from feast.infra.key_encoding_utils import deserialize_entity_key, serialize_entity_key, _serialize_val, \ - _deserialize_value +from feast.infra.key_encoding_utils import ( + _deserialize_value, + _serialize_val, + deserialize_entity_key, + serialize_entity_key, +) from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.protos.feast.types.Value_pb2 import ValueType @@ -11,14 +15,14 @@ def test_serialize_entity_key(): # Should be fine serialize_entity_key( EntityKeyProto( - join_keys=["user"], entity_values=[ValueProto(int64_val=int(2 ** 15))] + join_keys=["user"], entity_values=[ValueProto(int64_val=int(2**15))] ), entity_key_serialization_version=2, ) # True int64, but should also be fine. serialize_entity_key( EntityKeyProto( - join_keys=["user"], entity_values=[ValueProto(int64_val=int(2 ** 31))] + join_keys=["user"], entity_values=[ValueProto(int64_val=int(2**31))] ), entity_key_serialization_version=2, ) @@ -27,7 +31,7 @@ def test_serialize_entity_key(): with pytest.raises(BaseException): serialize_entity_key( EntityKeyProto( - join_keys=["user"], entity_values=[ValueProto(int64_val=int(2 ** 31))] + join_keys=["user"], entity_values=[ValueProto(int64_val=int(2**31))] ), ) @@ -35,7 +39,7 @@ def test_serialize_entity_key(): def test_deserialize_entity_key(): serialized_entity_key = serialize_entity_key( EntityKeyProto( - join_keys=["user"], entity_values=[ValueProto(int64_val=int(2 ** 15))] + join_keys=["user"], entity_values=[ValueProto(int64_val=int(2**15))] ), entity_key_serialization_version=3, ) @@ -44,7 +48,7 @@ def test_deserialize_entity_key(): serialized_entity_key, entity_key_serialization_version=3 ) assert deserialized_entity_key == EntityKeyProto( - join_keys=["user"], entity_values=[ValueProto(int64_val=int(2 ** 15))] + join_keys=["user"], entity_values=[ValueProto(int64_val=int(2**15))] ) @@ -67,12 +71,16 @@ def test_serialize_value(): assert v == b"\x01\x00\x00\x00" # current entity_key_serialization_version is 2, so the result should be 8 bytes - v, t = _serialize_val("int64_val", ValueProto(int64_val=1), entity_key_serialization_version=2) + v, t = _serialize_val( + "int64_val", ValueProto(int64_val=1), entity_key_serialization_version=2 + ) assert t == ValueType.INT64 assert v == b"\x01\x00\x00\x00\x00\x00\x00\x00" # new entity_key_serialization_version is 3, the result should be same as version 2 - v, t = _serialize_val("int64_val", ValueProto(int64_val=1), entity_key_serialization_version=3) + v, t = _serialize_val( + "int64_val", ValueProto(int64_val=1), entity_key_serialization_version=3 + ) assert t == ValueType.INT64 assert v == b"\x01\x00\x00\x00\x00\x00\x00\x00" From 9a937d0c1eb99d5c4ae822bd3890f51cbf3ec190 Mon Sep 17 00:00:00 2001 From: cmuhao Date: Tue, 18 Jun 2024 11:13:00 -0700 Subject: [PATCH 7/7] update doc Signed-off-by: cmuhao --- sdk/python/feast/infra/key_encoding_utils.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/key_encoding_utils.py b/sdk/python/feast/infra/key_encoding_utils.py index f9ead383fb..1f9ffeef14 100644 --- a/sdk/python/feast/infra/key_encoding_utils.py +++ b/sdk/python/feast/infra/key_encoding_utils.py @@ -66,6 +66,15 @@ def serialize_entity_key( serialize to the same byte string[1]. [1] https://developers.google.com/protocol-buffers/docs/encoding + + Args: + entity_key_serialization_version: version of the entity key serialization + version 1: int64 values are serialized as 4 bytes + version 2: int64 values are serialized as 8 bytes + version 3: entity_key size is added to the serialization for deserialization purposes + entity_key: EntityKeyProto + + Returns: bytes of the serialized entity key """ sorted_keys, sorted_values = zip( *sorted(zip(entity_key.join_keys, entity_key.entity_values)) @@ -106,7 +115,7 @@ def deserialize_entity_key( """ if entity_key_serialization_version <= 2: raise ValueError( - "Deserialization of entity key with version <= 2 is not supported. Please use version > 2." + "Deserialization of entity key with version <= 2 is not supported. Please use version > 2 by setting entity_key_serialization_version=3" ) offset = 0 keys = []