Skip to content

Commit

Permalink
fix: hydrate infra object in the sql registry proto() method
Browse files Browse the repository at this point in the history
Signed-off-by: Achal Shah <[email protected]>
  • Loading branch information
achals committed Jun 10, 2022
1 parent 52a989b commit 680c5d6
Showing 1 changed file with 65 additions and 11 deletions.
76 changes: 65 additions & 11 deletions sdk/python/feast/infra/registry_stores/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
from feast.feature_service import FeatureService
from feast.feature_view import FeatureView
from feast.infra.infra_object import Infra
from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
from feast.protos.feast.core.Entity_pb2 import Entity as EntityProto
Expand Down Expand Up @@ -149,6 +148,15 @@
)


feast_metadata = Table(
"feast_metadata",
metadata,
Column("metadata_key", String(50), primary_key=True),
Column("metadata_value", String(50), nullable=False),
Column("last_updated_timestamp", BigInteger, nullable=False),
)


class SqlRegistry(BaseRegistry):
def __init__(
self, registry_config: Optional[RegistryConfig], repo_path: Optional[Path]
Expand Down Expand Up @@ -466,11 +474,16 @@ def apply_materialization(
raise ValueError(
f"Cannot apply materialization for feature {feature_view.name} of type {python_class}"
)
fv: Union[FeatureView, StreamFeatureView] = self._get_object(table, feature_view.name, project, proto_class,
python_class,
"feature_view_name",
"feature_view_proto",
FeatureViewNotFoundException)
fv: Union[FeatureView, StreamFeatureView] = self._get_object(
table,
feature_view.name,
project,
proto_class,
python_class,
"feature_view_name",
"feature_view_proto",
FeatureViewNotFoundException,
)
fv.materialization_intervals.append((start_date, end_date))
self._apply_object(table, "feature_view_name", fv, "feature_view_proto")

Expand Down Expand Up @@ -571,7 +584,7 @@ def get_user_metadata(
def proto(self) -> RegistryProto:
r = RegistryProto()
project = ""
# TODO(achal): Support Infra object, and last_updated_timestamp.
# TODO(achal): Support last_updated_timestamp.
for lister, registry_proto_field in [
(self.list_entities, r.entities),
(self.list_feature_views, r.feature_views),
Expand All @@ -587,16 +600,18 @@ def proto(self) -> RegistryProto:
if objs:
registry_proto_field.extend([obj.to_proto() for obj in objs])

r.infra.CopyFrom(self.get_infra(project).to_proto())
last_update_timestamp = self._get_last_updated_metadata()
if last_update_timestamp:
r.last_updated.FromDatetime(last_update_timestamp)

return r

def commit(self):
# This method is a no-op since we're always writing values eagerly to the db.
pass

def _apply_object(
self, table, id_field_name, obj, proto_field_name,
name=None
):
def _apply_object(self, table, id_field_name, obj, proto_field_name, name=None):
name = name or obj.name
with self.engine.connect() as conn:
stmt = select(table).where(getattr(table.c, id_field_name) == name)
Expand Down Expand Up @@ -625,13 +640,15 @@ def _apply_object(
}
insert_stmt = insert(table).values(values,)
conn.execute(insert_stmt)
self._set_last_updated_metadata(update_datetime)

def _delete_object(self, table, name, project, id_field_name, not_found_exception):
with self.engine.connect() as conn:
stmt = delete(table).where(getattr(table.c, id_field_name) == name)
rows = conn.execute(stmt)
if rows.rowcount < 1 and not_found_exception:
raise not_found_exception(name, project)
self._set_last_updated_metadata(datetime.utcnow())
return rows.rowcount

def _get_object(
Expand Down Expand Up @@ -665,3 +682,40 @@ def _list_objects(self, table, proto_class, python_class, proto_field_name):
for row in rows
]
return []

def _set_last_updated_metadata(self, last_updated: datetime):
with self.engine.connect() as conn:
stmt = select(feast_metadata).where(
feast_metadata.c.metadata_key == "last_updated_timestamp"
)
row = conn.execute(stmt).first()

update_time = int(last_updated.timestamp())

values = {
"metadata_key": "last_updated_timestamp",
"metadata_value": f"{update_time}",
"last_updated_timestamp": update_time,
}
if row:
update_stmt = (
update(feast_metadata)
.where(feast_metadata.c.metadata_key == "last_updated_timestamp")
.values(values)
)
conn.execute(update_stmt)
else:
insert_stmt = insert(feast_metadata).values(values,)
conn.execute(insert_stmt)

def _get_last_updated_metadata(self):
with self.engine.connect() as conn:
stmt = select(feast_metadata).where(
feast_metadata.c.metadata_key == "last_updated_timestamp"
)
row = conn.execute(stmt).first()
if not row:
return None
update_time = int(row["last_updated_timestamp"])

return datetime.utcfromtimestamp(update_time)

0 comments on commit 680c5d6

Please sign in to comment.