Skip to content

Commit

Permalink
add error reporting in kafka callback
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 committed Dec 1, 2022
1 parent e14784d commit c1c2b5d
Showing 1 changed file with 5 additions and 4 deletions.
9 changes: 5 additions & 4 deletions metadata-ingestion/src/datahub/emitter/kafka_emitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,9 @@ def emit(
callback: Optional[Callable[[Exception, str], None]] = None,
) -> None:
if isinstance(item, (MetadataChangeProposal, MetadataChangeProposalWrapper)):
return self.emit_mcp_async(item, callback or _noop_callback)
return self.emit_mcp_async(item, callback or _error_reporting_callback)
else:
return self.emit_mce_async(item, callback or _noop_callback)
return self.emit_mce_async(item, callback or _error_reporting_callback)

def emit_mce_async(
self,
Expand Down Expand Up @@ -155,5 +155,6 @@ def flush(self) -> None:
producer.flush()


def _noop_callback(err: Exception, msg: str) -> None:
pass
def _error_reporting_callback(err: Exception, msg: str) -> None:
if err:
logger.error(f"Failed to emit to kafka: {err} {msg}")

0 comments on commit c1c2b5d

Please sign in to comment.