From 594c6757257ee16985ef92f75304450add662ed2 Mon Sep 17 00:00:00 2001 From: Bhargav Dodla Date: Tue, 21 Jan 2025 19:56:45 +0530 Subject: [PATCH] fix: Remove hardcoded client IDs and enhance logging in CassandraOnlineStore methods --- .../cassandra_online_store.py | 27 +++++++++++-------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py index 816e4d88a5c..b8c9377622d 100644 --- a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py @@ -310,7 +310,6 @@ def _get_cluster(self, config: RepoConfig): auth_provider=auth_provider, idle_heartbeat_interval=0, idle_heartbeat_timeout=0, - client_id="2a3be3e4-2bf1-45f2-a486-abababababab", **cluster_kwargs, ) else: @@ -418,7 +417,6 @@ def _get_session(self, config: RepoConfig): auth_provider=auth_provider, idle_heartbeat_interval=0, idle_heartbeat_timeout=0, - client_id="2a3be3e4-2bf1-45f2-a486-ssssssssssss", **cluster_kwargs, ) else: @@ -447,6 +445,13 @@ def __del__(self): You'd get a RuntimeError "cannot schedule new futures after shutdown". """ print("Calling CassandraOnlineStore __del__() method") + if self._cluster: + if not self._cluster.is_shutdown: + print(f"{self._cluster.client_id}: Cluster is still active") + else: + print(f"{self._cluster.client_id}: Cluster is not active") + else: + print("Cluster object doesn't exists.") # pass def online_write_batch( @@ -472,24 +477,21 @@ def online_write_batch( rows is written to the online store. Can be used to display progress. """ - print("Called CassandraOnlineStore online_write_batch method") project = config.project cluster: Cluster = self._get_cluster(config) + print( + "{cluster.client_id}: Called CassandraOnlineStore online_write_batch method" + ) # session: Session = self._get_session(config) keyspace: str = self._keyspace fqtable = CassandraOnlineStore._fq_table_name(keyspace, project, table) futures = [] with cluster.connect(keyspace) as session: - logger.info( - f"Cluster Client ID: {cluster.client_id} and Session id: {session.session_id}" - ) - print( - f"Cluster Client ID: {cluster.client_id} and Session id: {session.session_id}" - ) - for connection in cluster.get_connection_holders(): - print(f"Connection ID: {connection.get_connections()}") + print( + f"{cluster.client_id}: {session.session_id}: Connection ID: {connection.get_connections()}" + ) insert_cql = self._get_cql_statement( config, "insert4", fqtable=fqtable, session=session @@ -533,6 +535,9 @@ def online_write_batch( logger.error(f"Error writing a batch: {exc}") print(f"Error writing a batch: {exc}") raise Exception("Error writing a batch") from exc + print( + "{cluster.client_id}: Done Calling CassandraOnlineStore online_write_batch method" + ) # correction for the last missing call to `progress`: if progress: progress(1)