Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Use C-type long long (8 bytes) type format to pack int64 values #2609

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/build_wheels.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,15 @@ jobs:
node-version: '17.x'
registry-url: 'https://registry.npmjs.org'
- name: Build and install dependencies
# There's a `git restore` in here because `make install-go-ci-dependencies` is actually messing up go.mod & go.sum.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

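Please remove the lines added here (the explanatory comment and the `git status` / `git restore go.mod go.sum` steps).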

run: |
pip install -U pip setuptools wheel twine
make install-protoc-dependencies
make install-go-proto-dependencies
make install-go-ci-dependencies
make build-ui
git status
git restore go.mod go.sum
- name: Build
run: |
python3 setup.py sdist
Expand Down
3 changes: 3 additions & 0 deletions protos/feast/core/FeatureView.proto
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ message FeatureViewSpec {

// Whether these features should be served online or not
bool online = 8;

// Needed for backwards compatible behaviour when fixing int64 entity key serialization
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please finish this comment; the sentence is incomplete after "when fixing".

int32 entity_key_serialization_version = 13;
}

message FeatureViewMeta {
Expand Down
3 changes: 3 additions & 0 deletions protos/feast/core/OnDemandFeatureView.proto
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ message OnDemandFeatureViewSpec {

// Owner of the on demand feature view.
string owner = 8;

// Needed for backwards compatible behaviour when fixing int64 entity key serialization
int32 entity_key_serialization_version = 9;
}

message OnDemandFeatureViewMeta {
Expand Down
3 changes: 3 additions & 0 deletions protos/feast/core/RequestFeatureView.proto
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,7 @@ message RequestFeatureViewSpec {

// Owner of the request feature view.
string owner = 6;

// Needed for backwards compatible behaviour when fixing int64 entity key serialization
int32 entity_key_serialization_version = 7;
}
4 changes: 4 additions & 0 deletions sdk/python/feast/base_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class BaseFeatureView(ABC):
created_timestamp: Optional[datetime]
last_updated_timestamp: Optional[datetime]

entity_key_serialization_version: int

@abstractmethod
def __init__(
self,
Expand All @@ -58,6 +60,7 @@ def __init__(
description: str = "",
tags: Optional[Dict[str, str]] = None,
owner: str = "",
entity_key_serialization_version: int = 1,
):
"""
Creates a BaseFeatureView object.
Expand All @@ -82,6 +85,7 @@ def __init__(
self.projection = FeatureViewProjection.from_definition(self)
self.created_timestamp = None
self.last_updated_timestamp = None
self.entity_key_serialization_version = entity_key_serialization_version

@property
@abstractmethod
Expand Down
2 changes: 2 additions & 0 deletions sdk/python/feast/batch_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def __init__(
owner: str = "",
schema: Optional[List[Field]] = None,
source: Optional[DataSource] = None,
entity_key_serialization_version: int = 1,
):

if source is None:
Expand All @@ -55,4 +56,5 @@ def __init__(
owner=owner,
schema=schema,
source=source,
entity_key_serialization_version=entity_key_serialization_version,
)
7 changes: 7 additions & 0 deletions sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ def __init__(
owner: str = "",
schema: Optional[List[Field]] = None,
source: Optional[DataSource] = None,
entity_key_serialization_version: int = 1,
):
"""
Creates a FeatureView object.
Expand Down Expand Up @@ -260,6 +261,7 @@ def __init__(
description=description,
tags=tags,
owner=owner,
entity_key_serialization_version=entity_key_serialization_version,
)
self.online = online
self.materialization_intervals = []
Expand Down Expand Up @@ -443,6 +445,7 @@ def to_proto(self) -> FeatureViewProto:
online=self.online,
batch_source=batch_source_proto,
stream_source=stream_source_proto,
entity_key_serialization_version=self.entity_key_serialization_version,
)

return FeatureViewProto(spec=spec, meta=meta)
Expand Down Expand Up @@ -514,6 +517,10 @@ def from_proto(cls, feature_view_proto: FeatureViewProto):
)
)

feature_view.entity_key_serialization_version = (
feature_view_proto.spec.entity_key_serialization_version
)

return feature_view

@property
Expand Down
16 changes: 12 additions & 4 deletions sdk/python/feast/infra/key_encoding_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@
from feast.protos.feast.types.Value_pb2 import ValueType


def _serialize_val(value_type, v: ValueProto) -> Tuple[bytes, int]:
def _serialize_val(
value_type, v: ValueProto, entity_key_serialization_version=1
) -> Tuple[bytes, int]:
if value_type == "string_val":
return v.string_val.encode("utf8"), ValueType.STRING
elif value_type == "bytes_val":
return v.bytes_val, ValueType.BYTES
elif value_type == "int32_val":
return struct.pack("<i", v.int32_val), ValueType.INT32
elif value_type == "int64_val":
return struct.pack("<l", v.int64_val), ValueType.INT64
return struct.pack("<q", v.int64_val), ValueType.INT64
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure I understand the compatibility approach you're taking here - can you explain?

what do the various values of entity_key_serialization_version represent? and there needs to be some fork in the behavior here depending on entity_key_serialization_version right?

I was imagining we should do something like the following:

  • add entity_key_serialization_version to the protos, as you've done, where 0 will represent the current (broken) serialization method and 1 will represent the new (correct) serialization method
  • ensure that existing feature view protos do not magically get entity_key_serialization_version switched from 0 (the default value since the field doesn't exist) to 1, which requires special logic in the registry (as I think you've done)
  • default all new feature views to use serialization method 1

else:
raise ValueError(f"Value type not supported for Firestore: {v}")

Expand All @@ -35,7 +37,9 @@ def serialize_entity_key_prefix(entity_keys: List[str]) -> bytes:
return b"".join(output)


def serialize_entity_key(entity_key: EntityKeyProto) -> bytes:
def serialize_entity_key(
entity_key: EntityKeyProto, entity_key_serialization_version=1
) -> bytes:
"""
Serialize entity key to a bytestring so it can be used as a lookup key in a hash table.

Expand All @@ -54,7 +58,11 @@ def serialize_entity_key(entity_key: EntityKeyProto) -> bytes:
output.append(struct.pack("<I", ValueType.STRING))
output.append(k.encode("utf8"))
for v in sorted_values:
val_bytes, value_type = _serialize_val(v.WhichOneof("val"), v)
val_bytes, value_type = _serialize_val(
v.WhichOneof("val"),
v,
entity_key_serialization_version=entity_key_serialization_version,
)

output.append(struct.pack("<I", value_type))

Expand Down
12 changes: 10 additions & 2 deletions sdk/python/feast/infra/online_stores/contrib/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ def online_write_batch(
with self._get_conn(config) as conn, conn.cursor() as cur:
insert_values = []
for entity_key, values, timestamp, created_ts in data:
entity_key_bin = serialize_entity_key(entity_key)
entity_key_bin = serialize_entity_key(
entity_key,
entity_key_serialization_version=table.entity_key_serialization_version,
)
timestamp = _to_naive_utc(timestamp)
if created_ts is not None:
created_ts = _to_naive_utc(created_ts)
Expand Down Expand Up @@ -104,7 +107,12 @@ def online_read(
# to PostgreSQL
keys = []
for entity_key in entity_keys:
keys.append(serialize_entity_key(entity_key))
keys.append(
serialize_entity_key(
entity_key,
entity_key_serialization_version=table.entity_key_serialization_version,
)
)

cur.execute(
sql.SQL(
Expand Down
23 changes: 19 additions & 4 deletions sdk/python/feast/infra/online_stores/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,16 @@ def get_online_store_from_config(online_store_config: Any) -> OnlineStore:
return online_store_class()


def _redis_key(project: str, entity_key: EntityKeyProto) -> bytes:
key: List[bytes] = [serialize_entity_key(entity_key), project.encode("utf-8")]
def _redis_key(
project: str, entity_key: EntityKeyProto, entity_key_serialization_version=1
) -> bytes:
key: List[bytes] = [
serialize_entity_key(
entity_key,
entity_key_serialization_version=entity_key_serialization_version,
),
project.encode("utf-8"),
]
return b"".join(key)


Expand All @@ -40,10 +48,17 @@ def _mmh3(key: str):
return bytes.fromhex(struct.pack("<Q", key_hash).hex()[:8])


def compute_entity_id(entity_key: EntityKeyProto) -> str:
def compute_entity_id(
entity_key: EntityKeyProto, entity_key_serialization_version=1
) -> str:
"""
Compute Entity id given Feast Entity Key for online stores.
Remember that Entity here refers to `EntityKeyProto` which is used in some online stores to encode the keys.
It has nothing to do with the Entity concept we have in Feast.
"""
return mmh3.hash_bytes(serialize_entity_key(entity_key)).hex()
return mmh3.hash_bytes(
serialize_entity_key(
entity_key,
entity_key_serialization_version=entity_key_serialization_version,
)
).hex()
10 changes: 8 additions & 2 deletions sdk/python/feast/infra/online_stores/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,10 @@ def online_write_batch(

with conn:
for entity_key, values, timestamp, created_ts in data:
entity_key_bin = serialize_entity_key(entity_key)
entity_key_bin = serialize_entity_key(
entity_key,
entity_key_serialization_version=table.entity_key_serialization_version,
)
timestamp = to_naive_utc(timestamp)
if created_ts is not None:
created_ts = to_naive_utc(created_ts)
Expand Down Expand Up @@ -161,7 +164,10 @@ def online_read(
k: list(group) for k, group in itertools.groupby(rows, key=lambda r: r[0])
}
for entity_key in entity_keys:
entity_key_bin = serialize_entity_key(entity_key)
entity_key_bin = serialize_entity_key(
entity_key,
entity_key_serialization_version=table.entity_key_serialization_version,
)
res = {}
res_ts = None
for _, feature_name, val_bin, ts in rows.get(entity_key_bin, []):
Expand Down
6 changes: 6 additions & 0 deletions sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def __init__( # noqa: C901
description: str = "",
tags: Optional[Dict[str, str]] = None,
owner: str = "",
entity_key_serialization_version=1,
):
"""
Creates an OnDemandFeatureView object.
Expand Down Expand Up @@ -219,6 +220,7 @@ def __init__( # noqa: C901
description=description,
tags=tags,
owner=owner,
entity_key_serialization_version=entity_key_serialization_version,
)
assert _sources is not None
self.source_feature_view_projections: Dict[str, FeatureViewProjection] = {}
Expand Down Expand Up @@ -310,6 +312,7 @@ def to_proto(self) -> OnDemandFeatureViewProto:
description=self.description,
tags=self.tags,
owner=self.owner,
entity_key_serialization_version=self.entity_key_serialization_version,
)

return OnDemandFeatureViewProto(spec=spec, meta=meta)
Expand Down Expand Up @@ -357,6 +360,7 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):
description=on_demand_feature_view_proto.spec.description,
tags=dict(on_demand_feature_view_proto.spec.tags),
owner=on_demand_feature_view_proto.spec.owner,
entity_key_serialization_version=on_demand_feature_view_proto.spec.entity_key_serialization_version,
)

# FeatureViewProjections are not saved in the OnDemandFeatureView proto.
Expand Down Expand Up @@ -524,6 +528,7 @@ def on_demand_feature_view(
description: str = "",
tags: Optional[Dict[str, str]] = None,
owner: str = "",
entity_key_serialization_version=1,
):
"""
Creates an OnDemandFeatureView object with the given user function as udf.
Expand Down Expand Up @@ -650,6 +655,7 @@ def decorator(user_function):
description=description,
tags=tags,
owner=owner,
entity_key_serialization_version=entity_key_serialization_version,
)
functools.update_wrapper(
wrapper=on_demand_feature_view_obj, wrapped=user_function
Expand Down
10 changes: 10 additions & 0 deletions sdk/python/feast/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -1114,6 +1114,7 @@ def apply_feature_view(
else:
raise ValueError(f"Unexpected feature view type: {type(feature_view)}")

saved_entity_key_serialization_version: Optional[int] = None
for idx, existing_feature_view_proto in enumerate(
existing_feature_views_of_same_type
):
Expand All @@ -1127,8 +1128,17 @@ def apply_feature_view(
):
return
else:
saved_entity_key_serialization_version = existing_feature_views_of_same_type[
idx
].spec.entity_key_serialization_version
del existing_feature_views_of_same_type[idx]
break

if saved_entity_key_serialization_version:
feature_view_proto.spec.entity_key_serialization_version = (
saved_entity_key_serialization_version
)

existing_feature_views_of_same_type.append(feature_view_proto)
if commit:
self.commit()
Expand Down
4 changes: 4 additions & 0 deletions sdk/python/feast/request_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def __init__(
description: str = "",
tags: Optional[Dict[str, str]] = None,
owner: str = "",
entity_key_serialization_version=1,
):
"""
Creates a RequestFeatureView object.
Expand Down Expand Up @@ -77,6 +78,7 @@ def __init__(
description=description,
tags=tags,
owner=owner,
entity_key_serialization_version=entity_key_serialization_version,
)
self.request_source = request_data_source

Expand All @@ -97,6 +99,7 @@ def to_proto(self) -> RequestFeatureViewProto:
description=self.description,
tags=self.tags,
owner=self.owner,
entity_key_serialization_version=self.entity_key_serialization_version,
)

return RequestFeatureViewProto(spec=spec)
Expand All @@ -121,6 +124,7 @@ def from_proto(cls, request_feature_view_proto: RequestFeatureViewProto):
description=request_feature_view_proto.spec.description,
tags=dict(request_feature_view_proto.spec.tags),
owner=request_feature_view_proto.spec.owner,
entity_key_serialization_version=request_feature_view_proto.spec.entity_key_serialization_version,
)

# FeatureViewProjections are not saved in the RequestFeatureView proto.
Expand Down
28 changes: 28 additions & 0 deletions sdk/python/tests/unit/infra/test_key_encoding_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import pytest

from feast.infra.key_encoding_utils import serialize_entity_key
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto


@pytest.mark.parametrize(
    "entity_key,expected_contains",
    [
        # int64 value one past the largest signed 32-bit integer.
        (
            EntityKeyProto(
                join_keys=["customer"],
                entity_values=[ValueProto(int64_val=2 ** 31)],
            ),
            b"customer",
        ),
        # Small int32 value.
        (
            EntityKeyProto(
                join_keys=["user"],
                entity_values=[ValueProto(int32_val=2 ** 15)],
            ),
            b"user",
        ),
    ],
)
def test_serialize_entity_key(entity_key, expected_contains):
    """Serialize the entity key and check the expected bytes occur in the result."""
    serialized = serialize_entity_key(entity_key)
    assert expected_contains in serialized