Skip to content

Commit

Permalink
Alireza/ai 278/add forever broker reconnection config (#77)
Browse files Browse the repository at this point in the history
* feat: add the forever retry

* feat: keep connecting

* feat: replace the reconnection

* ref: moved settings to a new yaml file

* feat: update documents

* ref: move common settings to base broker

* feat: generate documents

* fix: retrieve litellm config
  • Loading branch information
alimosaed authored Jan 20, 2025
1 parent e996822 commit b657ae9
Show file tree
Hide file tree
Showing 8 changed files with 298 additions and 188 deletions.
10 changes: 0 additions & 10 deletions docs/components/broker_input.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,6 @@ Connect to a messaging broker and receive messages from it. The component will o
component_name: <user-supplied-name>
component_module: broker_input
component_config:
broker_type: <string>
broker_url: <string>
broker_username: <string>
broker_password: <string>
broker_vpn: <string>
broker_queue_name: <string>
temporary_queue: <string>
broker_subscriptions: <string>
Expand All @@ -22,11 +17,6 @@ component_config:
| Parameter | Required | Default | Description |
| --- | --- | --- | --- |
| broker_type | True | | Type of broker (Solace, MQTT, etc.) |
| broker_url | True | | Broker URL (e.g. tcp://localhost:55555) |
| broker_username | True | | Client username for broker |
| broker_password | True | | Client password for broker |
| broker_vpn | True | | Client VPN for broker |
| broker_queue_name | False | | Queue name for broker, if not provided it will use a temporary queue |
| temporary_queue | False | False | Whether to create a temporary queue that will be deleted after disconnection, defaulted to True if broker_queue_name is not provided |
| broker_subscriptions | True | | Subscriptions for broker |
Expand Down
10 changes: 0 additions & 10 deletions docs/components/broker_output.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,6 @@ Connect to a messaging broker and send messages to it. Note that this component
component_name: <user-supplied-name>
component_module: broker_output
component_config:
broker_type: <string>
broker_url: <string>
broker_username: <string>
broker_password: <string>
broker_vpn: <string>
payload_encoding: <string>
payload_format: <string>
propagate_acknowledgements: <string>
Expand All @@ -23,11 +18,6 @@ component_config:
| Parameter | Required | Default | Description |
| --- | --- | --- | --- |
| broker_type | True | | Type of broker (Solace, MQTT, etc.) |
| broker_url | True | | Broker URL (e.g. tcp://localhost:55555) |
| broker_username | True | | Client username for broker |
| broker_password | True | | Client password for broker |
| broker_vpn | True | | Client VPN for broker |
| payload_encoding | False | utf-8 | Encoding for the payload (utf-8, base64, gzip, none) |
| payload_format | False | json | Format for the payload (json, yaml, text) |
| propagate_acknowledgements | False | True | Propagate acknowledgements from the broker to the previous components |
Expand Down
2 changes: 2 additions & 0 deletions docs/components/broker_request_response.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ component_config:
payload_format: <string>
response_topic_prefix: <string>
response_topic_suffix: <string>
response_topic_insertion_expression: <string>
response_queue_prefix: <string>
request_expiry_ms: <integer>
streaming: <string>
Expand All @@ -38,6 +39,7 @@ component_config:
| payload_format | False | json | Format for the payload (json, yaml, text) |
| response_topic_prefix | False | reply | Prefix for reply topics |
| response_topic_suffix | False | | Suffix for reply topics |
| response_topic_insertion_expression | False | | Expression to insert the reply topic into the request message. If not set, the reply topic will only be added to the request_response_metadata. The expression uses the same format as other data expressions: (e.g input.payload:myObj.replyTopic). If there is no object type in the expression, it will default to 'input.payload'. |
| response_queue_prefix | False | reply-queue | Prefix for reply queues |
| request_expiry_ms | False | 60000 | Expiry time for cached requests in milliseconds |
| streaming | False | | The response will arrive in multiple pieces. If True, the streaming_complete_expression must be set and will be used to determine when the last piece has arrived. |
Expand Down
65 changes: 65 additions & 0 deletions examples/broker_input_output.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Example for the broker inputs and outputs
#
# It will subscribe to `demo/messages` and expect an event with the payload:
#
# The input message has the following schema:
# {
# "content": "<some text>",
# }
#
# It will then send an event back to Solace with the topic: `demo/messages/output`
#
#
# required ENV variables:
# - SOLACE_BROKER_URL
# - SOLACE_BROKER_USERNAME
# - SOLACE_BROKER_PASSWORD
# - SOLACE_BROKER_VPN

---
log:
stdout_log_level: INFO
log_file_level: INFO
log_file: solace_ai_connector.log

shared_config:
- broker_config: &broker_connection
broker_type: solace
broker_url: ${SOLACE_BROKER_URL}
broker_username: ${SOLACE_BROKER_USERNAME}
broker_password: ${SOLACE_BROKER_PASSWORD}
broker_vpn: ${SOLACE_BROKER_VPN}
reconnection_strategy: forever_retry # options: forever_retry, parametrized_retry
retry_interval: 1000 # in milliseconds


flows:
- name: Simple input flow
components:
# Input from a Solace broker
- component_name: solace_input
component_module: broker_input
component_config:
<<: *broker_connection
broker_subscriptions:
- topic: demo/messages
payload_encoding: utf-8
payload_format: json

# Send messages back to broker
- component_name: solace_output
component_module: broker_output
component_config:
<<: *broker_connection
payload_encoding: utf-8
payload_format: json
copy_user_properties: true
input_transforms:
- type: copy
source_expression: previous:payload
dest_expression: user_data.output:payload
- type: copy
source_expression: template:{{text://input.topic}}/output
dest_expression: user_data.output:topic
input_selection:
source_expression: user_data.output
60 changes: 52 additions & 8 deletions src/solace_ai_connector/common/messaging/solace_messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,59 @@ def connect(self):
or os.path.dirname(certifi.where())
or "/usr/share/ca-certificates/mozilla/",
}
# print (f"Broker Properties: {self.broker_properties}")
self.messaging_service = (
MessagingService.builder()
.from_properties(broker_props)
.with_reconnection_retry_strategy(
RetryStrategy.parametrized_retry(20, 3000)
strategy = self.broker_properties.get("reconnection_strategy")
if strategy and strategy == "forever_retry":
retry_interval = self.broker_properties.get("retry_interval")
if not retry_interval:
log.warning("retry_interval not provided, using default value of 3000")
retry_interval = 3000
self.messaging_service = (
MessagingService.builder()
.from_properties(broker_props)
.with_reconnection_retry_strategy(
RetryStrategy.forever_retry(retry_interval)
)
.with_connection_retry_strategy(
RetryStrategy.forever_retry(retry_interval)
)
.build()
)
elif strategy and strategy == "parametrized_retry":
retry_count = self.broker_properties.get("retry_count")
retry_wait = self.broker_properties.get("retry_wait")
if not retry_count:
log.warning("retry_count not provided, using default value of 20")
retry_count = 20
if not retry_wait:
log.warning("retry_wait not provided, using default value of 3000")
retry_wait = 3000
self.messaging_service = (
MessagingService.builder()
.from_properties(broker_props)
.with_reconnection_retry_strategy(
RetryStrategy.parametrized_retry(retry_count, retry_wait)
)
.with_connection_retry_strategy(
RetryStrategy.parametrized_retry(retry_count, retry_wait)
)
.build()
)
else:
# default
log.info(
"Using default reconnection strategy. 20 retries with 3000ms interval"
)
self.messaging_service = (
MessagingService.builder()
.from_properties(broker_props)
.with_reconnection_retry_strategy(
RetryStrategy.parametrized_retry(20, 3000)
)
.with_connection_retry_strategy(
RetryStrategy.parametrized_retry(20, 3000)
)
.build()
)
.build()
)

# Blocking connect thread
self.messaging_service.connect()
Expand Down
60 changes: 58 additions & 2 deletions src/solace_ai_connector/components/inputs_outputs/broker_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,59 @@
# queue binding and that object is used to retrieve the next message rather than
# the message_service object.

base_info = {
"class_name": "BrokerBase",
"description": "Base class for broker input/output components",
"config_parameters": [
{
"name": "broker_type",
"required": True,
"description": "Type of broker (Solace, MQTT, etc.)",
},
{
"name": "broker_url",
"required": True,
"description": "Broker URL (e.g. tcp://localhost:55555)",
},
{
"name": "broker_username",
"required": True,
"description": "Client username for broker",
},
{
"name": "broker_password",
"required": True,
"description": "Client password for broker",
},
{
"name": "broker_vpn",
"required": True,
"description": "Client VPN for broker",
},
{
"name": "reconnection_strategy",
"required": False,
"description": "Reconnection strategy for the broker (forever_retry, parametrized_retry)",
"default": "forever_retry",
},
{
"name": "retry_interval",
"required": False,
"description": "Reconnection retry interval in seconds for the broker",
"default": 10000, # in milliseconds
},
{
"name": "retry_count",
"required": False,
"description": "Number of reconnection retries. Only used if reconnection_strategy is parametrized_retry",
"default": 10,
},
],
}


class BrokerBase(ComponentBase):

def __init__(self, module_info, **kwargs):
super().__init__(module_info, **kwargs)
self.broker_properties = self.get_broker_properties()
Expand All @@ -43,6 +94,7 @@ def __init__(self, module_info, **kwargs):
self.messages_to_ack = []
self.connected = False
self.needs_acknowledgement = True
self.connection_repeat_sleep_time = 5

@abstractmethod
def invoke(self, message, data):
Expand All @@ -51,12 +103,12 @@ def invoke(self, message, data):
def connect(self):
if not self.connected:
self.messaging_service.connect()
self.connected = True
self.connected = self.messaging_service.is_connected

def disconnect(self):
if self.connected:
self.messaging_service.disconnect()
self.connected = False
self.connected = self.messaging_service.is_connected

def stop_component(self):
self.disconnect()
Expand Down Expand Up @@ -94,6 +146,10 @@ def get_broker_properties(self):
"subscriptions": self.get_config("broker_subscriptions"),
"trust_store_path": self.get_config("trust_store_path"),
"temporary_queue": self.get_config("temporary_queue"),
"reconnection_strategy": self.get_config("reconnection_strategy"),
"retry_interval": self.get_config("retry_interval"),
"retry_count": self.get_config("retry_count"),
"retry_interval": self.get_config("retry_interval"),
}
return broker_properties

Expand Down
Loading

0 comments on commit b657ae9

Please sign in to comment.