JDE: Added MongoDB insert action + example #81

Merged: 14 commits, Jan 21, 2025
6 changes: 6 additions & 0 deletions docs/components/broker_request_response.md
@@ -17,7 +17,10 @@ component_config:
  payload_format: <string>
  response_topic_prefix: <string>
  response_topic_suffix: <string>
  response_topic_insertion_expression: <string>
  response_queue_prefix: <string>
  user_properties_reply_topic_key: <string>
  user_properties_reply_metadata_key: <string>
  request_expiry_ms: <integer>
  streaming: <string>
  streaming_complete_expression: <string>
@@ -38,7 +41,10 @@ component_config:
| payload_format | False | json | Format for the payload (json, yaml, text) |
| response_topic_prefix | False | reply | Prefix for reply topics |
| response_topic_suffix | False | | Suffix for reply topics |
| response_topic_insertion_expression | False | | Expression that inserts the reply topic into the request message. If not set, the reply topic is only added to the request_response_metadata. The expression uses the same format as other data expressions (e.g., input.payload:myObj.replyTopic). If the expression has no object type, it defaults to 'input.payload'. |
| response_queue_prefix | False | reply-queue | Prefix for reply queues |
| user_properties_reply_topic_key | False | __solace_ai_connector_broker_request_response_topic__ | Key under which the reply topic is stored in the user properties. Start the key with : to address a nested object |
| user_properties_reply_metadata_key | False | __solace_ai_connector_broker_request_reply_metadata__ | Key under which the reply metadata is stored in the user properties. Start the key with : to address a nested object |
| request_expiry_ms | False | 60000 | Expiry time for cached requests in milliseconds |
| streaming | False | | The response will arrive in multiple pieces. If True, the streaming_complete_expression must be set and will be used to determine when the last piece has arrived. |
| streaming_complete_expression | False | | The source expression to determine when the last piece of a streaming response has arrived. |
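
As a rough illustration of what `response_topic_insertion_expression` describes, the sketch below writes a reply topic into a nested field of a request payload. The helper `insert_reply_topic`, its parsing rules, and the sample topic are assumptions made for illustration; this is not the connector's implementation.

```python
def insert_reply_topic(message: dict, expression: str, reply_topic: str) -> dict:
    """Write reply_topic into message at the location named by expression.

    Hypothetical helper. The expression looks like
    'input.payload:myObj.replyTopic'. Without an object type (no ':'),
    'input.payload' is assumed, as the parameter description states.
    """
    if ":" in expression:
        object_type, path = expression.split(":", 1)
    else:
        object_type, path = "input.payload", expression
    # Only the payload target is modelled in this sketch.
    if object_type != "input.payload":
        raise ValueError(f"Unsupported object type in sketch: {object_type}")
    target = message.setdefault("payload", {})
    keys = path.split(".")
    # Create intermediate objects as needed, then set the final key.
    for key in keys[:-1]:
        target = target.setdefault(key, {})
    target[keys[-1]] = reply_topic
    return message


request = {"payload": {"query": "status"}}
insert_reply_topic(request, "input.payload:myObj.replyTopic", "reply/abc123")
print(request)
```

With no expression configured, the table above says the reply topic is only carried in the request_response_metadata, so the payload would stay untouched.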
2 changes: 2 additions & 0 deletions docs/components/mongo_insert.md
@@ -14,6 +14,7 @@ component_config:
  database_password: <string>
  database_name: <string>
  database_collection: <string>
  data_types: <string>
```

| Parameter | Required | Default | Description |
@@ -24,6 +25,7 @@ component_config:
| database_password | False | | MongoDB password |
| database_name | True | | Database name |
| database_collection | False | | Collection name - if not provided, all collections will be used |
| data_types | False | | Key value pairs mapping each field name to a data type (date, timestamp, int, int32, int64, float, double, bool, string, null; case-insensitive). Used for non-JSON types like Date. Supports nested dotted names |


## Component Input Schema
56 changes: 56 additions & 0 deletions examples/db/mongodb_insert.yaml
@@ -0,0 +1,56 @@
---
log:
  stdout_log_level: INFO
  log_file_level: INFO
  log_file: solace_ai_connector.log

trace:
  trace_file: solace_ai_connector_trace.log

shared_config:
  - broker_config: &broker_connection
      broker_type: solace
      broker_url: ${SOLACE_BROKER_URL}
      broker_username: ${SOLACE_BROKER_USERNAME}
      broker_password: ${SOLACE_BROKER_PASSWORD}
      broker_vpn: ${SOLACE_BROKER_VPN}

flows:
  # Data ingestion to MongoDB for context mesh
  - name: real_time_data_ingest
    components:
      # Data Input from Solace broker
      - component_name: solace_data_input
        component_module: broker_input
        component_config:
          <<: *broker_connection
          broker_queue_name: demo_data_ingest
          broker_subscriptions:
            - topic: data/ingest
              qos: 1
          payload_encoding: utf-8
          payload_format: json

      # Batch messages to avoid frequent calls to DB
      - component_name: batch_aggregate
        component_module: aggregate
        component_config:
          max_items: 100
          max_time_ms: 3000
        input_selection:
          source_expression: input.payload:event

      # Insert into MongoDB
      - component_name: mongo_insert
        component_module: mongo_insert
        component_config:
          database_host: ${MONGO_HOST}
          database_port: ${MONGO_PORT}
          database_user: ${MONGO_USER}
          database_password: ${MONGO_PASSWORD}
          database_name: ${MONGO_DB}
          database_collection: ${MONGO_COLLECTION}
          data_types:
            timestamp: Date
        input_selection:
          source_expression: previous
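
The `batch_aggregate` step in this flow groups events so that MongoDB receives one insert per batch rather than one per message. A minimal sketch of size-or-time batching follows. The `Batcher` class and its injectable clock are hypothetical and are not the connector's `aggregate` component; in particular, this sketch only checks the time limit when a new item arrives, whereas a real component would presumably also flush on a timer.

```python
import time
from typing import Any, Callable, List, Optional


class Batcher:
    """Collect items until max_items is reached or max_time_ms has elapsed."""

    def __init__(self, max_items: int, max_time_ms: int,
                 clock: Callable[[], float] = time.monotonic):
        self.max_items = max_items
        self.max_time_s = max_time_ms / 1000.0
        self.clock = clock
        self.items: List[Any] = []
        self.started: Optional[float] = None  # arrival time of first item in batch

    def add(self, item: Any) -> Optional[List[Any]]:
        """Add an item; return the batch when a limit is reached, else None."""
        now = self.clock()
        if self.started is None:
            self.started = now
        self.items.append(item)
        if len(self.items) >= self.max_items or now - self.started >= self.max_time_s:
            batch, self.items, self.started = self.items, [], None
            return batch
        return None


batcher = Batcher(max_items=3, max_time_ms=3000)
results = [batcher.add({"id": i}) for i in range(4)]
print(results)
```

In the example flow the limits are 100 items or 3000 ms, so a slow trickle of events would still reach the database within about three seconds.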
@@ -1,21 +1,99 @@
"""MongoDB Agent Component for handling database insert."""

import datetime
import dateutil.parser
from .mongo_base import MongoDBBaseComponent, info as base_info

info = base_info.copy()
info["class_name"] = "MongoDBInsertComponent"
info["description"] = "Inserts data into a MongoDB database."
info["config_parameters"].extend([
    {
        "name": "data_types",
        "required": False,
        "description": "Key value pairs to specify the data types for each field in the data. Used for non-JSON types like Date. Supports nested dotted names",
    },
])

POSSIBLE_TYPES = ["date", "timestamp", "int", "int32", "int64", "float", "double", "bool", "string", "null"]

class MongoDBInsertComponent(MongoDBBaseComponent):
    """Component for handling MongoDB database operations."""

    def __init__(self, **kwargs):
        super().__init__(info, **kwargs)
        self.data_types_map = self.get_config("data_types")
        if self.data_types_map:
            if not isinstance(self.data_types_map, dict):
                raise ValueError(
                    "Invalid data types provided for MongoDB insert. Expected a dictionary."
                )
            for key, field_type in self.data_types_map.items():
                if not isinstance(key, str) or not isinstance(field_type, str) or field_type.lower() not in POSSIBLE_TYPES:
                    raise ValueError(
                        "Invalid data types provided for MongoDB insert. Expected a dictionary with key value pairs where key is a string and value is a string from the following list: "
                        + ", ".join(POSSIBLE_TYPES)
                    )


    def invoke(self, message, data):

Please give some examples of data_types and data. Since the data variable may include any complex data structure (e.g., List[List[List]]), would your code expect to handle all possible combinations?

Author

Added proper validation.
As for examples, you can find some here
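
To make the reviewer's question concrete, here is one possible pairing of a `data_types` mapping with input data, using a standalone re-implementation of the dotted-key walk. The function `apply_type`, the sample documents, and the use of `datetime.fromisoformat` in place of `dateutil.parser.parse` are assumptions for illustration, not the component's code.

```python
import datetime


def apply_type(doc, dotted_key, convert):
    """Apply convert to the field at dotted_key inside a nested dict, in place."""
    if not isinstance(doc, dict):
        return doc
    head, _, rest = dotted_key.partition(".")
    if head in doc:
        if rest:
            # Descend one level and continue with the remaining path.
            apply_type(doc[head], rest, convert)
        else:
            doc[head] = convert(doc[head])
    return doc


# Hypothetical configuration: type names are matched case-insensitively.
data_types = {"timestamp": "Date", "meta.count": "int"}
converters = {"date": datetime.datetime.fromisoformat, "int": int}

data = [
    {"timestamp": "2025-01-21T10:00:00", "meta": {"count": "7"}},
    {"timestamp": "2025-01-21T10:05:00", "meta": {}},  # missing field is skipped
]

for key, type_name in data_types.items():
    for doc in data:
        apply_type(doc, key, converters[type_name.lower()])

print(data[0])
```

In this sketch only dictionaries are traversed; values nested inside lists of lists are left unchanged, which is one way to answer the question about arbitrarily deep structures.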

        if not data or not isinstance(data, (dict, list)):
            raise ValueError(
                "Invalid data provided for MongoDB insert. Expected a dictionary or a list of dictionaries."
            )

        if self.data_types_map:
            for key, field_type in self.data_types_map.items():
                if isinstance(data, list):
                    new_data = []
                    for item in data:
                        new_data.append(self._convert_data_type(item, key, field_type))
                    data = new_data
                else:
                    data = self._convert_data_type(data, key, field_type)
        return self.db_handler.insert_documents(data)

    def _convert_data_type(self, data, key, field_type):
        if not key or not field_type:
            return data
        if isinstance(data, list):
            # Apply the conversion to each element; indexing a list with a
            # string key would otherwise raise TypeError.
            return [self._convert_data_type(item, key, field_type) for item in data]
        if not isinstance(data, dict):
            return data
        if "." in key:
            segments = key.split(".")
            segment = segments[0]
            if segment not in data:
                # Fall back to a literal key that itself contains dots.
                if key in data:
                    data[key] = self._convert_field_type(data[key], field_type)
                return data
            if len(segments) > 1:
                data[segment] = self._convert_data_type(data[segment], ".".join(segments[1:]), field_type)
            else:
                data[segment] = self._convert_field_type(data[segment], field_type)
        else:
            if key in data:
                data[key] = self._convert_field_type(data[key], field_type)
        return data

    def _convert_field_type(self, value, field_type):
        field_type = field_type.lower()
        if field_type == "date" or field_type == "timestamp":
            if isinstance(value, str):
                return dateutil.parser.parse(value)
            elif isinstance(value, (int, float)):
                # Numeric values are treated as seconds since the epoch.
                return datetime.datetime.fromtimestamp(value)
            else:
                return value
        if field_type == "int" or field_type == "int32" or field_type == "int64":
            return int(value)
        if field_type == "float" or field_type == "double":
            return float(value)
        if field_type == "bool":
            if isinstance(value, str) and value.lower() == "false":
                return False
            return bool(value)
        if field_type == "string":
            return str(value)
        if field_type == "null":
            return None
        return value
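
A few standard Python behaviours are worth keeping in mind when choosing a target type for a field. The snippet below only exercises built-ins and the `datetime` module; the sample values are made up.

```python
import datetime

# bool() on any non-empty string is True, so "0" and "no" both become True.
assert bool("0") is True
assert bool("") is False

# int() rejects strings holding a decimal number; go through float() first.
try:
    int("3.7")
    raised = False
except ValueError:
    raised = True
as_int = int(float("3.7"))  # truncates toward zero

# fromtimestamp() without tz returns a naive datetime in the machine's local
# time zone; passing tz gives an aware UTC value that is host-independent.
epoch_seconds = 1737453600
naive_local = datetime.datetime.fromtimestamp(epoch_seconds)
aware_utc = datetime.datetime.fromtimestamp(epoch_seconds, tz=datetime.timezone.utc)
print(naive_local.tzinfo, aware_utc.isoformat())
```

As far as I know, PyMongo stores naive `datetime` values as if they were UTC, so on a host whose local zone is not UTC a numeric timestamp converted without `tz` may end up shifted by the local offset.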
