diff --git a/sdk/python/feast/infra/registry_stores/sql.py b/sdk/python/feast/infra/registry_stores/sql.py index f793ef7376..af988edd59 100644 --- a/sdk/python/feast/infra/registry_stores/sql.py +++ b/sdk/python/feast/infra/registry_stores/sql.py @@ -148,6 +148,15 @@ ) +feast_metadata = Table( + "feast_metadata", + metadata, + Column("metadata_key", String(50), primary_key=True), + Column("metadata_value", String(50), nullable=False), + Column("last_updated_timestamp", BigInteger, nullable=False), +) + + class SqlRegistry(BaseRegistry): def __init__( self, registry_config: Optional[RegistryConfig], repo_path: Optional[Path] @@ -575,7 +584,6 @@ def get_user_metadata( def proto(self) -> RegistryProto: r = RegistryProto() project = "" - # TODO(achal): Support Infra object, and last_updated_timestamp. for lister, registry_proto_field in [ (self.list_entities, r.entities), (self.list_feature_views, r.feature_views), @@ -591,6 +599,11 @@ def proto(self) -> RegistryProto: if objs: registry_proto_field.extend([obj.to_proto() for obj in objs]) + r.infra.CopyFrom(self.get_infra(project).to_proto()) + last_updated_timestamp = self._get_last_updated_metadata() + if last_updated_timestamp: + r.last_updated.FromDatetime(last_updated_timestamp) + return r def commit(self): @@ -626,6 +639,7 @@ def _apply_object(self, table, id_field_name, obj, proto_field_name, name=None): } insert_stmt = insert(table).values(values,) conn.execute(insert_stmt) + self._set_last_updated_metadata(update_datetime) def _delete_object(self, table, name, project, id_field_name, not_found_exception): with self.engine.connect() as conn: @@ -633,6 +647,7 @@ def _delete_object(self, table, name, project, id_field_name, not_found_exceptio rows = conn.execute(stmt) if rows.rowcount < 1 and not_found_exception: raise not_found_exception(name, project) + self._set_last_updated_metadata(datetime.utcnow()) return rows.rowcount def _get_object( @@ -666,3 +681,40 @@ def _list_objects(self, table, proto_class, python_class, proto_field_name): for row in rows ] return [] + + def _set_last_updated_metadata(self, last_updated: datetime): + with self.engine.connect() as conn: + stmt = select(feast_metadata).where( + feast_metadata.c.metadata_key == "last_updated_timestamp" + ) + row = conn.execute(stmt).first() + + update_time = int(last_updated.timestamp()) + + values = { + "metadata_key": "last_updated_timestamp", + "metadata_value": f"{update_time}", + "last_updated_timestamp": update_time, + } + if row: + update_stmt = ( + update(feast_metadata) + .where(feast_metadata.c.metadata_key == "last_updated_timestamp") + .values(values) + ) + conn.execute(update_stmt) + else: + insert_stmt = insert(feast_metadata).values(values,) + conn.execute(insert_stmt) + + def _get_last_updated_metadata(self): + with self.engine.connect() as conn: + stmt = select(feast_metadata).where( + feast_metadata.c.metadata_key == "last_updated_timestamp" + ) + row = conn.execute(stmt).first() + if not row: + return None + update_time = int(row["last_updated_timestamp"]) + + return datetime.utcfromtimestamp(update_time)