DATAGO-90835: add nack (#83)
* feat: add monitoring component

* fix: resolve a bug

* fix: add sleep time

* fix: add sleep time

* feat: add readiness and handle excessive logs

* fix: handle sleep error

* fix: handle sleep error

* feat: gracefully exit

* feat: set the log back

* fix: rename log fields

* fix: disabled monitoring

* fix: resolve log naming

* fix: resolved logging issues

* fix: resolve log

* fix: resolve log

* feat: remove dependency on Langchain

* feat: update monitoring

* feat: drop error messages when the queue is full

* feat: add a text splitter component

* feat: updated docs

* fix: resolve graceful termination issues

* fix: remove payloads from logs

* feat: add the forever retry

* feat: keep connecting

* feat: add monitoring

* feat: replace the reconnection

* feat: refactor monitoring

* feat: add connection metric

* convert connection to async

* get metrics enum

* add types of metrics

* use metrics rather than metric values

* fix bug

* update type

* convert monitoring output to dictionary

* fix bug

* feat: add connection status

* feat: add reconnecting status

* feat: add reconnecting log and handle signals

* fix: update status

* fix: update log

* fix: fix bug

* fix: fix bug

* fix: resolve connection logs

* fix: handle threads

* fix: update connection state machine

* feat: add prefix to the broker logs

* fix: synchronize logs with connection attempts

* fix: remove datadog dependency

* fix: cover an exception

* ref: upgrade to latest pubsub and replace a metric

* feat: add retry and timeout to litellm

* feat: add nack

* fix: replace exception with exception type

* fix: remove useless exceptions

* Create pull_request_template.md

* fix: update the default nack

* ref: replace nack string status with enumerations

* ref: generate docs

* ref: remove default value

* ref: move common imports to a module

* ref: update imports

* ref: update import
alimosaed authored Jan 24, 2025
1 parent 5fb33eb commit 5eacb82
Showing 15 changed files with 310 additions and 74 deletions.
5 changes: 5 additions & 0 deletions .github/pull_request_template.md
@@ -0,0 +1,5 @@
### What is the purpose of this change?

### How is this accomplished?

### Anything reviewers should focus on/be aware of?
10 changes: 6 additions & 4 deletions docs/components/litellm_chat_model.md
@@ -12,14 +12,15 @@ component_config:
embedding_params: <string>
temperature: <string>
set_response_uuid_in_user_properties: <boolean>
timeout: <string>
retry_policy: <string>
allowed_fails_policy: <string>
stream_to_flow: <string>
stream_to_next_component: <string>
llm_mode: <string>
stream_batch_size: <string>
history_max_turns: <string>
history_max_time: <string>
history_max_turns: <string>
history_max_time: <string>
stream_to_flow: <string>
stream_to_next_component: <string>
llm_mode: <string>
@@ -32,14 +33,15 @@ component_config:
| embedding_params | False | | LiteLLM model parameters. The model, api_key and base_url are mandatory. Find more models at https://docs.litellm.ai/docs/providers and more parameters at https://docs.litellm.ai/docs/completion/input |
| temperature | False | 0.7 | Sampling temperature to use |
| set_response_uuid_in_user_properties | False | False | Whether to set the response_uuid in the user_properties of the input_message. This will allow other components to correlate streaming chunks with the full response. |
| timeout | False | 60 | Request timeout in seconds |
| retry_policy | False | | Retry policy for the load balancer. Find more at https://docs.litellm.ai/docs/routing#cooldowns |
| allowed_fails_policy | False | | Allowed fails policy for the load balancer. Find more at https://docs.litellm.ai/docs/routing#cooldowns |
| stream_to_flow | False | | Name the flow to stream the output to - this must be configured for llm_mode='stream'. This is mutually exclusive with stream_to_next_component. |
| stream_to_next_component | False | False | Whether to stream the output to the next component in the flow. This is mutually exclusive with stream_to_flow. |
| llm_mode | False | none | The mode for streaming results: 'none' or 'stream'. 'stream' will just stream the results to the named flow. 'none' will wait for the full response. |
| stream_batch_size | False | 15 | The minimum number of words in a single streaming result. Default: 15. |
| history_max_turns | False | 10 | Maximum number of conversation turns to keep in history |
| history_max_time | False | 3600 | Maximum time to keep conversation history (in seconds) |
| history_max_turns | False | 10 | Maximum number of conversation turns to keep in history |
| history_max_time | False | 3600 | Maximum time to keep conversation history (in seconds) |
| stream_to_flow | False | | Name the flow to stream the output to - this must be configured for llm_mode='stream'. This is mutually exclusive with stream_to_next_component. |
| stream_to_next_component | False | False | Whether to stream the output to the next component in the flow. This is mutually exclusive with stream_to_flow. |
| llm_mode | False | none | The mode for streaming results: 'none' or 'stream'. 'stream' will just stream the results to the named flow. 'none' will wait for the full response. |
10 changes: 6 additions & 4 deletions docs/components/litellm_chat_model_with_history.md
@@ -12,14 +12,15 @@ component_config:
embedding_params: <string>
temperature: <string>
set_response_uuid_in_user_properties: <boolean>
timeout: <string>
retry_policy: <string>
allowed_fails_policy: <string>
stream_to_flow: <string>
stream_to_next_component: <string>
llm_mode: <string>
stream_batch_size: <string>
history_max_turns: <string>
history_max_time: <string>
history_max_turns: <string>
history_max_time: <string>
```
| Parameter | Required | Default | Description |
@@ -28,14 +29,15 @@ component_config:
| embedding_params | False | | LiteLLM model parameters. The model, api_key and base_url are mandatory. Find more models at https://docs.litellm.ai/docs/providers and more parameters at https://docs.litellm.ai/docs/completion/input |
| temperature | False | 0.7 | Sampling temperature to use |
| set_response_uuid_in_user_properties | False | False | Whether to set the response_uuid in the user_properties of the input_message. This will allow other components to correlate streaming chunks with the full response. |
| timeout | False | 60 | Request timeout in seconds |
| retry_policy | False | | Retry policy for the load balancer. Find more at https://docs.litellm.ai/docs/routing#cooldowns |
| allowed_fails_policy | False | | Allowed fails policy for the load balancer. Find more at https://docs.litellm.ai/docs/routing#cooldowns |
| stream_to_flow | False | | Name the flow to stream the output to - this must be configured for llm_mode='stream'. This is mutually exclusive with stream_to_next_component. |
| stream_to_next_component | False | False | Whether to stream the output to the next component in the flow. This is mutually exclusive with stream_to_flow. |
| llm_mode | False | none | The mode for streaming results: 'none' or 'stream'. 'stream' will just stream the results to the named flow. 'none' will wait for the full response. |
| stream_batch_size | False | 15 | The minimum number of words in a single streaming result. Default: 15. |
| history_max_turns | False | 10 | Maximum number of conversation turns to keep in history |
| history_max_time | False | 3600 | Maximum time to keep conversation history (in seconds) |
| history_max_turns | False | 10 | Maximum number of conversation turns to keep in history |
| history_max_time | False | 3600 | Maximum time to keep conversation history (in seconds) |
## Component Input Schema
10 changes: 6 additions & 4 deletions docs/components/litellm_embeddings.md
@@ -12,14 +12,15 @@ component_config:
embedding_params: <string>
temperature: <string>
set_response_uuid_in_user_properties: <boolean>
timeout: <string>
retry_policy: <string>
allowed_fails_policy: <string>
stream_to_flow: <string>
stream_to_next_component: <string>
llm_mode: <string>
stream_batch_size: <string>
history_max_turns: <string>
history_max_time: <string>
history_max_turns: <string>
history_max_time: <string>
stream_to_flow: <string>
stream_to_next_component: <string>
llm_mode: <string>
@@ -32,14 +33,15 @@ component_config:
| embedding_params | False | | LiteLLM model parameters. The model, api_key and base_url are mandatory. Find more models at https://docs.litellm.ai/docs/providers and more parameters at https://docs.litellm.ai/docs/completion/input |
| temperature | False | 0.7 | Sampling temperature to use |
| set_response_uuid_in_user_properties | False | False | Whether to set the response_uuid in the user_properties of the input_message. This will allow other components to correlate streaming chunks with the full response. |
| timeout | False | 60 | Request timeout in seconds |
| retry_policy | False | | Retry policy for the load balancer. Find more at https://docs.litellm.ai/docs/routing#cooldowns |
| allowed_fails_policy | False | | Allowed fails policy for the load balancer. Find more at https://docs.litellm.ai/docs/routing#cooldowns |
| stream_to_flow | False | | Name the flow to stream the output to - this must be configured for llm_mode='stream'. This is mutually exclusive with stream_to_next_component. |
| stream_to_next_component | False | False | Whether to stream the output to the next component in the flow. This is mutually exclusive with stream_to_flow. |
| llm_mode | False | none | The mode for streaming results: 'none' or 'stream'. 'stream' will just stream the results to the named flow. 'none' will wait for the full response. |
| stream_batch_size | False | 15 | The minimum number of words in a single streaming result. Default: 15. |
| history_max_turns | False | 10 | Maximum number of conversation turns to keep in history |
| history_max_time | False | 3600 | Maximum time to keep conversation history (in seconds) |
| history_max_turns | False | 10 | Maximum number of conversation turns to keep in history |
| history_max_time | False | 3600 | Maximum time to keep conversation history (in seconds) |
| stream_to_flow | False | | Name the flow to stream the output to - this must be configured for llm_mode='stream'. This is mutually exclusive with stream_to_next_component. |
| stream_to_next_component | False | False | Whether to stream the output to the next component in the flow. This is mutually exclusive with stream_to_flow. |
| llm_mode | False | none | The mode for streaming results: 'none' or 'stream'. 'stream' will just stream the results to the named flow. 'none' will wait for the full response. |
2 changes: 1 addition & 1 deletion docs/components/mongo_insert.md
@@ -25,7 +25,7 @@ component_config:
| database_password | False | | MongoDB password |
| database_name | True | | Database name |
| database_collection | False | | Collection name - if not provided, all collections will be used |
| data_types | False | | An array of key value pairs to specify the data types for each field in the data. Used for non-JSON types like Date. Supports nested dotted names |
| data_types | False | | Key value pairs to specify the data types for each field in the data. Used for non-JSON types like Date. Supports nested dotted names |
## Component Input Schema
14 changes: 14 additions & 0 deletions examples/llm/litellm_chat.yaml
@@ -78,6 +78,20 @@ flows:
component_module: litellm_chat_model
component_config:
llm_mode: none # options: none or stream
retry_policy: # retry the request per error type
ContentPolicyViolationErrorRetries: 1
AuthenticationErrorRetries: 1
BadRequestErrorRetries: 1
TimeoutErrorRetries: 1
RateLimitErrorRetries: 1
InternalServerErrorRetries: 1
allowed_fails_policy: # allow X failures per minute before cooling down
ContentPolicyViolationErrorAllowedFails: 1000
RateLimitErrorAllowedFails: 1000
AuthenticationErrorAllowedFails: 1000
TimeoutErrorAllowedFails: 1000
InternalServerErrorAllowedFails: 1000
timeout: 10 # in seconds
load_balancer:
- model_name: "gpt-4o" # model alias
litellm_params:
3 changes: 2 additions & 1 deletion src/solace_ai_connector/common/__init__.py
@@ -1 +1,2 @@
# Directory for all common code
# Directory for all common code
from solace.messaging.config.message_acknowledgement_configuration import Outcome as Message_NACK_Outcome
16 changes: 15 additions & 1 deletion src/solace_ai_connector/common/message.py
@@ -5,17 +5,20 @@
import yaml
import pprint


from .log import log
from .trace_message import TraceMessage
from .utils import set_data_value, get_data_value
from ..common import Message_NACK_Outcome


class Message:

def __init__(self, payload=None, topic=None, user_properties=None):
self.payload = payload
self.topic = topic
self.user_properties = user_properties or {}
self.ack_callbacks = []
self.nack_callbacks = []
self.topic_delimiter = "/"
self.private_data = {}
self.iteration_data = {}
@@ -274,6 +277,9 @@ def get_previous(self):
def add_acknowledgement(self, callback):
self.ack_callbacks.append(callback)

def add_negative_acknowledgements(self, callback):
self.nack_callbacks.append(callback)

def call_acknowledgements(self):
"""Call all the ack callbacks. This is used to notify the previous components that the
message has been acknowledged."""
@@ -282,6 +288,14 @@ def call_acknowledgements(self):
for callback in ack_callbacks:
callback()

def call_negative_acknowledgements(self, nack=Message_NACK_Outcome.REJECTED):
"""Call all the ack callbacks. This is used to notify the previous components that the
message has been acknowledged."""
nack_callbacks = self.nack_callbacks
self.nack_callbacks = []
for callback in nack_callbacks:
callback(nack)

def set_topic_delimiter(self, topic_delimiter):
self.topic_delimiter = topic_delimiter

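
Taken together, the changes above give `Message` a second observer list that mirrors the existing ack callbacks. A minimal sketch of the mechanism (the `settle_with_broker` function is hypothetical; `Message` and `Message_NACK_Outcome` are the names from this commit, and running it assumes the solace-pubsubplus dependency is installed):

```python
from solace_ai_connector.common import Message_NACK_Outcome
from solace_ai_connector.common.message import Message

# Hypothetical settlement hook that a broker input component might register.
def settle_with_broker(outcome):
    print(f"settling with outcome: {outcome}")

msg = Message(payload={"text": "hello"}, topic="demo/topic")
msg.add_negative_acknowledgements(settle_with_broker)

# On a downstream failure the framework invokes every registered callback
# and then clears the list, so a message is only settled once.
msg.call_negative_acknowledgements(Message_NACK_Outcome.FAILED)
```
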
17 changes: 17 additions & 0 deletions src/solace_ai_connector/common/messaging/dev_broker_messaging.py
@@ -5,10 +5,13 @@
import queue
import re
from copy import deepcopy

from .messaging import Messaging
from ...common import Message_NACK_Outcome


class DevBroker(Messaging):

def __init__(self, broker_properties: dict, flow_lock_manager, flow_kv_store):
super().__init__(broker_properties)
self.flow_lock_manager = flow_lock_manager
@@ -87,6 +90,20 @@ def subscribe(self, subscription: str, queue_name: str):
def ack_message(self, message):
pass

def nack_message(self, broker_message, outcome: Message_NACK_Outcome):
"""
This method handles the negative acknowledgment (nack) of a broker message.
If the broker message contains an "_original_message" key, it settles the message
with the given outcome using the persistent receiver. If the "_original_message"
key is not found, it logs a warning indicating that the original Solace message
could not be found and therefore cannot be dropped.
Args:
broker_message (dict): The broker message to be nacked.
outcome (Message_NACK_Outcome): The outcome to be used for settling the message.
"""
pass

def _get_matching_queue_names(self, topic: str) -> List[str]:
matching_queue_names = []
with self.subscriptions_lock:
25 changes: 25 additions & 0 deletions src/solace_ai_connector/common/messaging/solace_messaging.py
@@ -34,6 +34,7 @@

from .messaging import Messaging
from ..log import log
from ...common import Message_NACK_Outcome


class ConnectionStatus(Enum):
@@ -362,6 +363,9 @@ def bind_to_queue(
.with_missing_resources_creation_strategy(
MissingResourcesCreationStrategy.CREATE_ON_START
)
.with_required_message_outcome_support(
Message_NACK_Outcome.FAILED, Message_NACK_Outcome.REJECTED
)
.build(queue)
)
self.persistent_receiver.start()
@@ -457,3 +461,24 @@ def ack_message(self, broker_message):
log.warning(
f"{self.error_prefix} Cannot acknowledge message: original Solace message not found"
)

def nack_message(self, broker_message, outcome: Message_NACK_Outcome):
"""
This method handles the negative acknowledgment (nack) of a broker message.
If the broker message contains an "_original_message" key, it settles the message
with the given outcome using the persistent receiver. If the "_original_message"
key is not found, it logs a warning indicating that the original Solace message
could not be found and therefore cannot be dropped.
Args:
broker_message (dict): The broker message to be nacked.
outcome (Message_NACK_Outcome): The outcome to be used for settling the message.
"""
if "_original_message" in broker_message:
self.persistent_receiver.settle(
broker_message["_original_message"], outcome
)
else:
log.warning(
f"{self.error_prefix} Cannot drop message: original Solace message not found"
)
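
`nack_message` mirrors `ack_message` but settles with an explicit outcome: `FAILED` typically asks the broker to redeliver the message, while `REJECTED` removes it from the queue (routing it to the dead-message queue when one is configured). A hedged sketch of a caller — `messaging_service`, `broker_message`, and the exception check are illustrative placeholders, not names from this commit:

```python
from solace_ai_connector.common import Message_NACK_Outcome

def settle_failed_message(messaging_service, broker_message, error):
    """Hypothetical error path in a broker input component."""
    if isinstance(error, TimeoutError):
        # Transient failure: request redelivery.
        messaging_service.nack_message(broker_message, Message_NACK_Outcome.FAILED)
    else:
        # Permanent failure: drop the message.
        messaging_service.nack_message(broker_message, Message_NACK_Outcome.REJECTED)
```
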
39 changes: 38 additions & 1 deletion src/solace_ai_connector/components/component_base.py
@@ -5,6 +5,7 @@
import time
from abc import abstractmethod
from typing import Any

from ..common.log import log
from ..common.utils import resolve_config_values
from ..common.utils import get_source_expression
@@ -16,6 +17,7 @@
from ..flow.request_response_flow_controller import RequestResponseFlowController
from ..common.monitoring import Monitoring
from ..common.monitoring import Metrics
from ..common import Message_NACK_Outcome

DEFAULT_QUEUE_TIMEOUT_MS = 1000
DEFAULT_QUEUE_MAX_DEPTH = 5
@@ -167,7 +169,14 @@ def process_event(self, event):
self.trace_data(data)

self.current_message_has_been_discarded = False
result = self.invoke(message, data)
try:
result = self.invoke(message, data)
except Exception as e:
self.current_message = None
self.handle_negative_acknowledgements(message, e)
raise e
finally:
self.current_message = None

if self.current_message_has_been_discarded:
message.call_acknowledgements()
@@ -184,6 +193,11 @@ def process_pre_invoke(self, message):
)

def process_pre_invoke(self, message):
# add nack callback to the message
callback = self.get_negative_acknowledgement_callback() # pylint: disable=assignment-from-none
if callback is not None:
message.add_negative_acknowledgements(callback)

self.apply_input_transforms(message)
return self.get_input_data(message)

@@ -490,6 +504,29 @@ def do_broker_request_response(
f"Broker request response controller not found for component {self.name}"
)

def handle_negative_acknowledgements(self, message, exception):
"""Handle NACK for the message."""
log.error(
"%sComponent failed to process message: %s\n%s",
self.log_identifier,
exception,
traceback.format_exc(),
)
nack = self.nack_reaction_to_exception(type(exception))
message.call_negative_acknowledgements(nack)
self.handle_error(exception, Event(EventType.MESSAGE, message))

@abstractmethod
def get_negative_acknowledgement_callback(self):
"""This should be overridden by the component if it needs to NACK messages."""
return None

@abstractmethod
def nack_reaction_to_exception(self, exception_type):
"""This should be overridden by the component if it needs to determine
NACK reaction regarding the exception type."""
return Message_NACK_Outcome.REJECTED

def get_metrics_with_header(self) -> dict[dict[Metrics, Any], Any]:
metrics = {}
required_metrics = self.monitoring.get_required_metrics()
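
The two new hooks let a component opt into NACK handling: `get_negative_acknowledgement_callback` registers a settlement callback on each message in `process_pre_invoke`, and `nack_reaction_to_exception` maps the exception type caught in `process_event` to an outcome. A sketch of a subclass overriding the second hook (the component and its exception mapping are made up, and the base class name is assumed from the module name; only the hook names and outcomes come from this commit):

```python
from solace_ai_connector.common import Message_NACK_Outcome
from solace_ai_connector.components.component_base import ComponentBase


class ExampleProcessor(ComponentBase):  # hypothetical component
    def invoke(self, message, data):
        # Any exception raised here is caught by process_event(), which
        # NACKs the message with the outcome chosen below and re-raises.
        if not data:
            raise ValueError("empty payload")
        return data

    def nack_reaction_to_exception(self, exception_type):
        # Retry transient failures; keep the default REJECTED for the rest.
        if issubclass(exception_type, TimeoutError):
            return Message_NACK_Outcome.FAILED
        return Message_NACK_Outcome.REJECTED
```
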