Skip to content

Commit

Permalink
fixup!: quality
Browse files Browse the repository at this point in the history
  • Loading branch information
Rebecca Graber committed May 4, 2023
1 parent 0535f74 commit a86f90a
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 13 deletions.
31 changes: 23 additions & 8 deletions edx_event_bus_kafka/internal/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
# See https://github.com/openedx/event-bus-kafka/blob/main/docs/decisions/0005-optional-import-of-confluent-kafka.rst
try:
import confluent_kafka
from confluent_kafka import TIMESTAMP_NOT_AVAILABLE, DeserializingConsumer, Consumer
from confluent_kafka import TIMESTAMP_NOT_AVAILABLE, Consumer
from confluent_kafka.error import KafkaError
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.serialization import MessageField, SerializationContext
except ImportError: # pragma: no cover
confluent_kafka = None

Expand Down Expand Up @@ -260,6 +260,8 @@ def _consume_indefinitely(self):
msg = self.consumer.poll(timeout=CONSUMER_POLL_TIMEOUT)
if msg is not None:
with function_trace('_consume_indefinitely_consume_single_message'):
# Before processing, make sure our db connection is still active
_reconnect_to_db_if_needed()
self.consume_single_message(msg, run_context)
consecutive_errors = 0
self._add_message_monitoring(run_context=run_context, message=msg)
Expand Down Expand Up @@ -287,8 +289,13 @@ def _consume_indefinitely(self):
self.consumer.close()

def consume_single_message(self, msg, run_context):
# Before processing, make sure our db connection is still active
_reconnect_to_db_if_needed()
"""
Deserialize a single message and emit the associated signal with the message data
Parameters:
msg: A raw message from Kafka
run_context: A dictionary of information about the consumer, for logging
"""

# determine the event type of the message and use it to create a deserializer
all_msg_headers = msg.headers()
Expand All @@ -297,7 +304,6 @@ def consume_single_message(self, msg, run_context):
try:
signal = OpenEdxPublicSignal.get_signal_by_type(event_type)
except KeyError as ke:
breakpoint()
raise UnusableMessageError(f"Unrecognized event_type {event_type}, cannot determine signal") from ke
run_context['expected_signal'] = signal

Expand Down Expand Up @@ -346,10 +352,13 @@ def consume_indefinitely(self, offset_timestamp=None):
@function_trace('emit_signals_from_deserialized_message')
def emit_signals_from_deserialized_message(self, msg, signal):
"""
Determine the correct signal and send the event from the message.
Send the event from the message.
This method expects that the caller has already determined the correct signal from the message headers
Arguments:
msg (Message): Consumed message with the value deserialized
msg (Message): Deserialized message
signal (OpenEdxPublicSignal): The signal determined by the message headers.
"""
self._log_message_received(msg)

Expand All @@ -369,7 +378,7 @@ def emit_signals_from_deserialized_message(self, msg, signal):
except Exception as e:
raise UnusableMessageError(f"Error determining metadata from message headers: {e}") from e

with function_trace('emit_signals_from_message_send_event_with_custom_metadata'):
with function_trace('emit_signals_from_deserialized_message_send_event_with_custom_metadata'):
send_results = signal.send_event_with_custom_metadata(event_metadata, **msg.value())

# Raise an exception if any receivers errored out. This allows logging of the receivers
Expand Down Expand Up @@ -538,6 +547,12 @@ def _add_message_monitoring(self, run_context, message, error=None):
set_custom_attribute('kafka_monitoring_error', repr(e))

def _determine_event_type(self, headers):
"""
Get event type from message headers
Arguments:
headers: List of key/value tuples. Keys are strings, values are bytestrings.
"""
event_types = get_message_header_values(headers, HEADER_EVENT_TYPE)
if len(event_types) == 0:
raise UnusableMessageError(
Expand Down
2 changes: 1 addition & 1 deletion edx_event_bus_kafka/internal/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
try:
import confluent_kafka
from confluent_kafka import Producer
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.schema_registry import topic_record_subject_name_strategy
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import MessageField, SerializationContext
except ImportError: # pragma: no cover
confluent_kafka = None
Expand Down
6 changes: 3 additions & 3 deletions edx_event_bus_kafka/internal/tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ def setUp(self):
timestamp=(TIMESTAMP_CREATE_TIME, 1675114920123),
)


self.mock_receiver = Mock()
self.signal = SESSION_LOGIN_COMPLETED
self.signal.connect(fake_receiver_returns_quietly)
Expand Down Expand Up @@ -483,9 +482,10 @@ def test_emit_success(self, audit_logging, mock_logger, mock_set_attribute):
@patch('edx_event_bus_kafka.internal.consumer.logger', autospec=True)
def test_emit_success_tolerates_missing_timestamp(self, mock_logger, mock_set_attribute):
self.signal.disconnect(fake_receiver_raises_error) # just successes for this one!
self.deserialized_normal_message._timestamp = (TIMESTAMP_NOT_AVAILABLE, None) # pylint: disable=protected-access
deserialized_message = self.deserialized_normal_message
deserialized_message._timestamp = (TIMESTAMP_NOT_AVAILABLE, None) # pylint: disable=protected-access

self.event_consumer.emit_signals_from_deserialized_message(self.deserialized_normal_message, self.signal)
self.event_consumer.emit_signals_from_deserialized_message(deserialized_message, self.signal)
self.assert_signal_sent_with(self.signal, self.normal_event_data)
# Specifically, not called with 'kafka_logging_error'
mock_set_attribute.assert_not_called()
Expand Down
2 changes: 1 addition & 1 deletion edx_event_bus_kafka/internal/tests/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ def test_serialize_and_produce_to_same_topic(self, mock_context):
headers=ANY,
)


class TestCommand(TestCase):
"""
Test produce_event management command
Expand Down Expand Up @@ -389,4 +390,3 @@ def test_command(self, _, fake_logger):
mocked_producer.produce.assert_called_once_with('dev-test', key=b'bytes-here', value=b'bytes-here',
on_delivery=ANY, headers=ANY,)
fake_logger.exception.assert_not_called()

0 comments on commit a86f90a

Please sign in to comment.