Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JDE: Added MongoDB insert action + example #81

Merged
merged 14 commits into from
Jan 21, 2025
5 changes: 4 additions & 1 deletion examples/request_reply.yaml
Original file line number Diff line number Diff line change
@@ -33,6 +33,9 @@ flows:
component_config:
<<: *broker_connection
request_expiry_ms: 30000 # 30 seconds
user_properties_reply_topic_key: :response.user.topic # nested with :
user_properties_reply_metadata_key: response.user.metadata # string literal

input_transforms:
- type: copy
source_expression: input.payload
@@ -77,7 +80,7 @@ flows:
source_expression: input.user_properties
dest_expression: user_data.output:user_properties
- type: copy
source_expression: input.user_properties:__solace_ai_connector_broker_request_response_topic__
source_expression: input.user_properties:response.user.topic
dest_expression: user_data.output:topic
input_selection:
source_expression: user_data.output
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@ dependencies = [
"PyYAML~=6.0.1",
"Requests~=2.32.3",
"solace_pubsubplus>=1.8.0",
"litellm~=1.51.3",
"litellm>=1.51.3",
"Flask~=3.0.3",
"Flask-SocketIO~=5.4.1",
"build~=1.2.2.post1",
120 changes: 3 additions & 117 deletions src/solace_ai_connector/common/message.py
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@

from .log import log
from .trace_message import TraceMessage

from .utils import set_data_value, get_data_value

class Message:
def __init__(self, payload=None, topic=None, user_properties=None):
@@ -59,7 +59,7 @@ def get_data(self, expression, calling_object=None, data_type=None):
if expression.startswith("static:"):
return expression.split(":", 1)[1]
data_object = self.get_data_object(expression, calling_object=calling_object)
data = self.get_data_value(data_object, expression)
data = get_data_value(data_object, expression)

if data_type:
data = self.convert_data_type(data, data_type)
@@ -89,7 +89,7 @@ def set_data(self, expression, value):
create_if_not_exists=True,
create_value={} if not first_part.isdigit() else [],
)
self.set_data_value(data_object, expression, value)
set_data_value(data_object, expression, value)

def get_data_object(
self,
@@ -158,120 +158,6 @@ def set_data_object(self, expression, value):
f"Unknown data type '{data_type}' in expression '{expression}'"
)

def get_data_value(self, data_object, expression):
if ":" not in expression:
return data_object

# If the data_object is a value, return it
if (
not isinstance(data_object, dict)
and not isinstance(data_object, list)
and not isinstance(data_object, object)
):
return data_object

data_name = expression.split(":")[1]

if data_name == "":
return data_object

# Split the data_name by dots to get the path
path_parts = data_name.split(".")

# Start with the entire data_object
current_data = data_object

# Traverse the path
for part in path_parts:
# If the current data is a dictionary, get the value with the key 'part'
if isinstance(current_data, dict):
current_data = current_data.get(part)
# If the current data is a list and 'part' is a number, get the value at
# the index 'part'
elif isinstance(current_data, list) and part.isdigit():
current_data = current_data[int(part)]
# If the current data is neither a dictionary nor a list, or if 'part' is
# not a number, return None
elif isinstance(current_data, object):
current_data = getattr(current_data, part, None)
else:
raise ValueError(
f"Could not get data value for expression '{expression}' - data "
"is not a dictionary or list"
)

# If at any point we get None, stop and return None
if current_data is None:
return None

# Return the final data
return current_data

# Similar to get_data_value, we need to use the expression to find the place to set the value
# except that we will create objects along the way if they don't exist
def set_data_value(self, data_object, expression, value):
data_name = expression.split(":")[1]

# It is an error if the data_object is None or not a dictionary or list
if data_object is None:
raise ValueError(
f"Could not set data value for expression '{expression}' - data_object is None"
)
if not isinstance(data_object, dict) and not isinstance(data_object, list):
raise ValueError(
f"Could not set data value for expression '{expression}' - data_object "
"is not a dictionary or list"
)

# It is an error if the data_name is empty
if data_name == "":
raise ValueError(
f"Could not set data value for expression '{expression}' - data_name is empty"
)

# Split the data_name by dots to get the path
path_parts = data_name.split(".")

# Start with the entire data_object
current_data = data_object

# Traverse the path
for i, part in enumerate(path_parts):
# If we're at the last part of the path, set the value
if i == len(path_parts) - 1:
if isinstance(current_data, dict):
current_data[part] = value
elif isinstance(current_data, list) and part.isdigit():
while len(current_data) <= int(part):
current_data.append(None)
current_data[int(part)] = value
else:
log.error(
"Could not set data value for expression '%s' - "
"data is not a dictionary or list",
expression,
)
# If we're not at the last part of the path, move to the next part
else:
next_part_is_digit = path_parts[i + 1].isdigit()
if isinstance(current_data, dict):
current_data = current_data.setdefault(
part, [] if next_part_is_digit else {}
)
elif isinstance(current_data, list) and part.isdigit():
while len(current_data) <= int(part):
current_data.append(None)
if current_data[int(part)] is None:
current_data[int(part)] = [] if next_part_is_digit else {}
current_data = current_data[int(part)]
else:
log.error(
"Could not set data value for expression '%s' - data "
"is not a dictionary or list",
expression,
)
return

def set_iteration_data(self, item, index):
self.iteration_data["item"] = item
self.iteration_data["index"] = index
184 changes: 184 additions & 0 deletions src/solace_ai_connector/common/utils.py
Original file line number Diff line number Diff line change
@@ -390,3 +390,187 @@ def decode_payload(payload, encoding, payload_format):
payload = yaml.safe_load(payload)

return payload


def get_data_value(data_object, expression):
# If the data_object is a value, return it
if (
not isinstance(data_object, dict)
and not isinstance(data_object, list)
and not isinstance(data_object, object)
):
return data_object

if not data_object or not expression:
return data_object

if ":" not in expression:
if expression in data_object:
return data_object[expression]
return data_object

data_name = expression.split(":")[1]

if data_name == "":
return data_object

# Split the data_name by dots to get the path
path_parts = data_name.split(".")

# Start with the entire data_object
current_data = data_object

# Traverse the path
for part in path_parts:
# If the current data is a dictionary, get the value with the key 'part'
if isinstance(current_data, dict):
current_data = current_data.get(part)
# If the current data is a list and 'part' is a number, get the value at
# the index 'part'
elif isinstance(current_data, list) and part.isdigit():
current_data = current_data[int(part)]
# If the current data is neither a dictionary nor a list, or if 'part' is
# not a number, return None
elif isinstance(current_data, object):
current_data = getattr(current_data, part, None)
else:
raise ValueError(
f"Could not get data value for expression '{expression}' - data "
"is not a dictionary or list"
)

# If at any point we get None, stop and return None
if current_data is None:
return None

# Return the final data
return current_data

# Similar to get_data_value, we need to use the expression to find the place to set the value
# except that we will create objects along the way if they don't exist
def set_data_value(data_object, expression, value):
if ":" not in expression:
data_object[expression] = value
return

data_name = expression.split(":")[1]

# It is an error if the data_object is None or not a dictionary or list
if data_object is None:
raise ValueError(
f"Could not set data value for expression '{expression}' - data_object is None"
)
if not isinstance(data_object, dict) and not isinstance(data_object, list):
raise ValueError(
f"Could not set data value for expression '{expression}' - data_object "
"is not a dictionary or list"
)

# It is an error if the data_name is empty
if data_name == "":
raise ValueError(
f"Could not set data value for expression '{expression}' - data_name is empty"
)

# Split the data_name by dots to get the path
path_parts = data_name.split(".")

# Start with the entire data_object
current_data = data_object

# Traverse the path
for i, part in enumerate(path_parts):
# If we're at the last part of the path, set the value
if i == len(path_parts) - 1:
if isinstance(current_data, dict):
current_data[part] = value
elif isinstance(current_data, list) and part.isdigit():
while len(current_data) <= int(part):
current_data.append(None)
current_data[int(part)] = value
else:
log.error(
"Could not set data value for expression '%s' - "
"data is not a dictionary or list",
expression,
)
# If we're not at the last part of the path, move to the next part
else:
next_part_is_digit = path_parts[i + 1].isdigit()
if isinstance(current_data, dict):
current_data = current_data.setdefault(
part, [] if next_part_is_digit else {}
)
elif isinstance(current_data, list) and part.isdigit():
while len(current_data) <= int(part):
current_data.append(None)
if current_data[int(part)] is None:
current_data[int(part)] = [] if next_part_is_digit else {}
current_data = current_data[int(part)]
else:
log.error(
"Could not set data value for expression '%s' - data "
"is not a dictionary or list",
expression,
)
return

def remove_data_value(data_object, expression):
if ":" not in expression:
data_object.pop(expression, None)
return

data_name = expression.split(":")[1]

# It is an error if the data_object is None or not a dictionary or list
if data_object is None:
raise ValueError(
f"Could not remove data value for expression '{expression}' - data_object is None"
)
if not isinstance(data_object, dict) and not isinstance(data_object, list):
raise ValueError(
f"Could not remove data value for expression '{expression}' - data_object "
"is not a dictionary or list"
)

# It is an error if the data_name is empty
if data_name == "":
raise ValueError(
f"Could not remove data value for expression '{expression}' - data_name is empty"
)

# Split the data_name by dots to get the path
path_parts = data_name.split(".")

# Start with the entire data_object
current_data = data_object

# Traverse the path
for i, part in enumerate(path_parts):
# If we're at the last part of the path, remove the value
if i == len(path_parts) - 1:
if isinstance(current_data, dict):
current_data.pop(part, None)
elif isinstance(current_data, list) and part.isdigit():
if len(current_data) > int(part):
current_data.pop(int(part))
else:
log.error(
"Could not remove data value for expression '%s' - "
"data is not a dictionary or list",
expression,
)
# If we're not at the last part of the path, move to the next part
else:
if isinstance(current_data, dict):
current_data = current_data.get(part, {})
elif isinstance(current_data, list) and part.isdigit():
if len(current_data) > int(part):
current_data = current_data[int(part)]
else:
log.error(
"Could not remove data value for expression '%s' - data "
"is not a dictionary or list",
expression,
)
return
10 changes: 9 additions & 1 deletion src/solace_ai_connector/components/component_base.py
Original file line number Diff line number Diff line change
@@ -304,7 +304,15 @@ def setup_broker_request_response(self):
"request_expiry_ms": request_expiry_ms,
}

for key in ["response_topic_prefix", "response_queue_prefix", "response_topic_insertion_expression"]:
optional_keys = [
"response_topic_prefix",
"response_queue_prefix",
"user_properties_reply_topic_key",
"user_properties_reply_metadata_key",
"response_topic_insertion_expression",
]

for key in optional_keys:
if key in self.broker_request_response_config:
rrc_config[key] = self.broker_request_response_config[key]

Loading