Skip to content

Commit

Permalink
ready for CI, pending query.name fix
Browse files Browse the repository at this point in the history
  • Loading branch information
WweiL committed Jun 8, 2024
1 parent d967119 commit ef7a116
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 6 deletions.
8 changes: 7 additions & 1 deletion python/pyspark/sql/streaming/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,13 @@ def fromJson(cls, j: Dict[str, Any]) -> "StreamingQueryProgress":
)

def __getitem__(self, key):
return getattr(self, key)
# Before Spark 4.0, StreamingQuery.lastProgress returns a dict, which casts id and runId
# to string. To prevent breaking change, also cast them to string when accessed with
# __getitem__.
if key == "id" or key == "runId":
return str(getattr(self, key))
else:
return getattr(self, key)

def __setitem__(self, key, value):
internal_key = "_" + key
Expand Down
2 changes: 1 addition & 1 deletion python/pyspark/sql/tests/streaming/test_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def test_streaming_progress(self):
self.assertEqual(lastProgress["id"], query.id)
# SPARK-48567 Use attribute to access fields in q.lastProgress
self.assertEqual(lastProgress.name, query.name)
self.assertEqual(lastProgress.id, query.id)
self.assertEqual(str(lastProgress.id), query.id)
new_name = "myNewQuery"
lastProgress["name"] = new_name
self.assertEqual(lastProgress.name, new_name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def check_streaming_query_progress(self, progress, is_stateful):

self.assertTrue(isinstance(progress.sink, SinkProgress))
self.check_sink_progress(progress.sink)
self.assertTrue(isinstance(progress.observedMetrics, dict))
self.assertTrue(isinstance(progress.observedMetrics, Row))

def check_state_operator_progress(self, progress):
"""Check StateOperatorProgress"""
Expand Down Expand Up @@ -264,9 +264,6 @@ def test_streaming_progress(self):
for p in q.recentProgress:
self.check_streaming_query_progress(p, True)

row = q.lastProgress.observedMetrics.get("my_event")
self.assertTrue(row["rc"] > 0)
self.assertTrue(row["erc"] > 0)
finally:
q.stop()

Expand Down

0 comments on commit ef7a116

Please sign in to comment.