Skip to content

Commit

Permalink
fix: Remove hardcoded client IDs and enhance logging in CassandraOnli…
Browse files Browse the repository at this point in the history
…neStore methods
  • Loading branch information
Bhargav Dodla committed Jan 21, 2025
1 parent 27d17bd commit 594c675
Showing 1 changed file with 16 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,6 @@ def _get_cluster(self, config: RepoConfig):
auth_provider=auth_provider,
idle_heartbeat_interval=0,
idle_heartbeat_timeout=0,
client_id="2a3be3e4-2bf1-45f2-a486-abababababab",
**cluster_kwargs,
)
else:
Expand Down Expand Up @@ -418,7 +417,6 @@ def _get_session(self, config: RepoConfig):
auth_provider=auth_provider,
idle_heartbeat_interval=0,
idle_heartbeat_timeout=0,
client_id="2a3be3e4-2bf1-45f2-a486-ssssssssssss",
**cluster_kwargs,
)
else:
Expand Down Expand Up @@ -447,6 +445,13 @@ def __del__(self):
You'd get a RuntimeError "cannot schedule new futures after shutdown".
"""
print("Calling CassandraOnlineStore __del__() method")
if self._cluster:
if not self._cluster.is_shutdown:
print(f"{self._cluster.client_id}: Cluster is still active")
else:
print(f"{self._cluster.client_id}: Cluster is not active")
else:
print("Cluster object doesn't exists.")
# pass

def online_write_batch(
Expand All @@ -472,24 +477,21 @@ def online_write_batch(
rows is written to the online store. Can be used to
display progress.
"""
print("Called CassandraOnlineStore online_write_batch method")
project = config.project
cluster: Cluster = self._get_cluster(config)
print(
"{cluster.client_id}: Called CassandraOnlineStore online_write_batch method"
)
# session: Session = self._get_session(config)
keyspace: str = self._keyspace
fqtable = CassandraOnlineStore._fq_table_name(keyspace, project, table)

futures = []
with cluster.connect(keyspace) as session:
logger.info(
f"Cluster Client ID: {cluster.client_id} and Session id: {session.session_id}"
)
print(
f"Cluster Client ID: {cluster.client_id} and Session id: {session.session_id}"
)

for connection in cluster.get_connection_holders():
print(f"Connection ID: {connection.get_connections()}")
print(
f"{cluster.client_id}: {session.session_id}: Connection ID: {connection.get_connections()}"
)

insert_cql = self._get_cql_statement(
config, "insert4", fqtable=fqtable, session=session
Expand Down Expand Up @@ -533,6 +535,9 @@ def online_write_batch(
logger.error(f"Error writing a batch: {exc}")
print(f"Error writing a batch: {exc}")
raise Exception("Error writing a batch") from exc
print(
"{cluster.client_id}: Done Calling CassandraOnlineStore online_write_batch method"
)
# correction for the last missing call to `progress`:
if progress:
progress(1)
Expand Down

0 comments on commit 594c675

Please sign in to comment.