From 4e0e505e01a6c08b2059389d1a343f31ff5e253d Mon Sep 17 00:00:00 2001 From: Cyrus Mobini Date: Tue, 10 Dec 2024 14:28:33 -0500 Subject: [PATCH 01/11] Added mongodb insert component --- docs/components/broker_request_response.md | 8 + docs/components/error_input.md | 2 + docs/components/index.md | 1 + docs/components/langchain_chat_model.md | 6 + .../langchain_chat_model_with_history.md | 6 + docs/components/litellm_chat_model.md | 8 +- .../litellm_chat_model_with_history.md | 8 +- docs/components/litellm_embeddings.md | 6 +- docs/components/mongo_insert.md | 27 +++ docs/components/openai_chat_model.md | 6 +- .../openai_chat_model_with_history.md | 4 +- src/solace_ai_connector/common/utils.py | 1 + .../components/__init__.py | 6 + .../components/general/db/__init__.py | 0 .../components/general/db/mongo/__init__.py | 0 .../general/db/mongo/mongo_handler.py | 173 ++++++++++++++++++ .../general/db/mongo/mongo_insert.py | 79 ++++++++ 17 files changed, 329 insertions(+), 12 deletions(-) create mode 100644 docs/components/mongo_insert.md create mode 100644 src/solace_ai_connector/components/general/db/__init__.py create mode 100644 src/solace_ai_connector/components/general/db/mongo/__init__.py create mode 100644 src/solace_ai_connector/components/general/db/mongo/mongo_handler.py create mode 100644 src/solace_ai_connector/components/general/db/mongo/mongo_insert.py diff --git a/docs/components/broker_request_response.md b/docs/components/broker_request_response.md index 30ee6167..a408e2e7 100644 --- a/docs/components/broker_request_response.md +++ b/docs/components/broker_request_response.md @@ -21,6 +21,10 @@ component_config: request_expiry_ms: streaming: streaming_complete_expression: + streaming: + streaming_complete_expression: + streaming: + streaming_complete_expression: ``` | Parameter | Required | Default | Description | @@ -38,6 +42,10 @@ component_config: | request_expiry_ms | False | 60000 | Expiry time for cached requests in milliseconds | | streaming | False | | The response will arrive in multiple pieces. If True, the streaming_complete_expression must be set and will be used to determine when the last piece has arrived. | | streaming_complete_expression | False | | The source expression to determine when the last piece of a streaming response has arrived. | +| streaming | False | | The response will arrive in multiple pieces. If True, the streaming_complete_expression must be set and will be used to determine when the last piece has arrived. | +| streaming_complete_expression | False | | The source expression to determine when the last piece of a streaming response has arrived. | +| streaming | False | | The response will arrive in multiple pieces. If True, the streaming_complete_expression must be set and will be used to determine when the last piece has arrived. | +| streaming_complete_expression | False | | The source expression to determine when the last piece of a streaming response has arrived. | ## Component Input Schema diff --git a/docs/components/error_input.md b/docs/components/error_input.md index de06a883..a4bb24cc 100644 --- a/docs/components/error_input.md +++ b/docs/components/error_input.md @@ -9,11 +9,13 @@ component_name: component_module: error_input component_config: max_rate: + max_queue_depth: ``` | Parameter | Required | Default | Description | | --- | --- | --- | --- | | max_rate | False | None | Maximum rate of errors to process per second. Any errors above this rate will be dropped. If not set, all errors will be processed. 
| +| max_queue_depth | False | 1000 | Maximum number of messages that can be queued in the input queue.If the queue is full, the new message is dropped. | diff --git a/docs/components/index.md b/docs/components/index.md index 1b3b8516..6a9d1126 100644 --- a/docs/components/index.md +++ b/docs/components/index.md @@ -21,6 +21,7 @@ | [litellm_chat_model_with_history](litellm_chat_model_with_history.md) | LiteLLM model handler component with conversation history | | [litellm_embeddings](litellm_embeddings.md) | Embed text using a LiteLLM model | | [message_filter](message_filter.md) | A filtering component. This will apply a user configurable expression. If the expression evaluates to True, the message will be passed on. If the expression evaluates to False, the message will be discarded. If the message is discarded, any previous components that require an acknowledgement will be acknowledged. | +| [mongo_insert](mongo_insert.md) | Inserts given JSON data into a MongoDB database. | | [openai_chat_model](openai_chat_model.md) | OpenAI chat model component | | [openai_chat_model_with_history](openai_chat_model_with_history.md) | OpenAI chat model component with conversation history | | [parser](parser.md) | Parse input from the given type to output type. | diff --git a/docs/components/langchain_chat_model.md b/docs/components/langchain_chat_model.md index b65dea59..fda44bf6 100644 --- a/docs/components/langchain_chat_model.md +++ b/docs/components/langchain_chat_model.md @@ -11,6 +11,9 @@ component_config: langchain_module: langchain_class: langchain_component_config: + llm_mode: + stream_to_flow: + stream_batch_size: llm_response_format: ``` @@ -19,6 +22,9 @@ component_config: | langchain_module | True | | The chat model module - e.g. 'langchain_openai.chat_models' | | langchain_class | True | | The chat model class to use - e.g. ChatOpenAI | | langchain_component_config | True | | Model specific configuration for the chat model. See documentation for valid parameter names. | +| llm_mode | False | | The mode for streaming results: 'none' or 'stream'. 'stream' will just stream the results to the named flow. 'none' will wait for the full response. | +| stream_to_flow | False | | Name the flow to stream the output to - this must be configured for llm_mode='stream'. | +| stream_batch_size | False | 15 | The minimum number of words in a single streaming result. Default: 15. | | llm_response_format | False | | The response format for this LLM request. This can be 'json', 'yaml', or 'text'. If set to 'json' or 'yaml', the response will be parsed by the appropriate parser and the fields will be available in the response object. If set to 'text', the response will be returned as a string. | diff --git a/docs/components/langchain_chat_model_with_history.md b/docs/components/langchain_chat_model_with_history.md index 8686061e..e7483225 100644 --- a/docs/components/langchain_chat_model_with_history.md +++ b/docs/components/langchain_chat_model_with_history.md @@ -11,6 +11,9 @@ component_config: langchain_module: langchain_class: langchain_component_config: + llm_mode: + stream_to_flow: + stream_batch_size: llm_response_format: history_max_turns: history_max_message_size: @@ -27,6 +30,9 @@ component_config: | langchain_module | True | | The chat model module - e.g. 'langchain_openai.chat_models' | | langchain_class | True | | The chat model class to use - e.g. ChatOpenAI | | langchain_component_config | True | | Model specific configuration for the chat model. See documentation for valid parameter names. 
| +| llm_mode | False | | The mode for streaming results: 'none' or 'stream'. 'stream' will just stream the results to the named flow. 'none' will wait for the full response. | +| stream_to_flow | False | | Name the flow to stream the output to - this must be configured for llm_mode='stream'. | +| stream_batch_size | False | 15 | The minimum number of words in a single streaming result. Default: 15. | | llm_response_format | False | | The response format for this LLM request. This can be 'json', 'yaml', or 'text'. If set to 'json' or 'yaml', the response will be parsed by the appropriate parser and the fields will be available in the response object. If set to 'text', the response will be returned as a string. | | history_max_turns | False | 20 | The maximum number of turns to keep in the history. If not set, the history will be limited to 20 turns. | | history_max_message_size | False | 1000 | The maximum amount of characters to keep in a single message in the history. | diff --git a/docs/components/litellm_chat_model.md b/docs/components/litellm_chat_model.md index acd19851..e617a772 100644 --- a/docs/components/litellm_chat_model.md +++ b/docs/components/litellm_chat_model.md @@ -11,11 +11,11 @@ component_config: load_balancer: embedding_params: temperature: + set_response_uuid_in_user_properties: stream_to_flow: stream_to_next_component: llm_mode: stream_batch_size: - set_response_uuid_in_user_properties: history_max_turns: history_max_time: history_max_turns: @@ -31,11 +31,11 @@ component_config: | load_balancer | False | | Add a list of models to load balancer. | | embedding_params | False | | LiteLLM model parameters. The model, api_key and base_url are mandatory.find more models at https://docs.litellm.ai/docs/providersfind more parameters at https://docs.litellm.ai/docs/completion/input | | temperature | False | 0.7 | Sampling temperature to use | +| set_response_uuid_in_user_properties | False | False | Whether to set the response_uuid in the user_properties of the input_message. This will allow other components to correlate streaming chunks with the full response. | | stream_to_flow | False | | Name the flow to stream the output to - this must be configured for llm_mode='stream'. This is mutually exclusive with stream_to_next_component. | | stream_to_next_component | False | False | Whether to stream the output to the next component in the flow. This is mutually exclusive with stream_to_flow. | -| llm_mode | False | none | The mode for streaming results: 'sync' or 'stream'. 'stream' will just stream the results to the named flow. 'none' will wait for the full response. | +| llm_mode | False | none | The mode for streaming results: 'none' or 'stream'. 'stream' will just stream the results to the named flow. 'none' will wait for the full response. | | stream_batch_size | False | 15 | The minimum number of words in a single streaming result. Default: 15. | -| set_response_uuid_in_user_properties | False | False | Whether to set the response_uuid in the user_properties of the input_message. This will allow other components to correlate streaming chunks with the full response. | | history_max_turns | False | 10 | Maximum number of conversation turns to keep in history | | history_max_time | False | 3600 | Maximum time to keep conversation history (in seconds) | | history_max_turns | False | 10 | Maximum number of conversation turns to keep in history | @@ -57,6 +57,7 @@ component_config: }, ... 
], + stream: , clear_history_but_keep_depth: } ``` @@ -65,6 +66,7 @@ component_config: | messages | True | | | messages[].role | True | | | messages[].content | True | | +| stream | False | Whether to stream the response - overwrites llm_mode | | clear_history_but_keep_depth | False | Clear history but keep the last N messages. If 0, clear all history. If not set, do not clear history. | diff --git a/docs/components/litellm_chat_model_with_history.md b/docs/components/litellm_chat_model_with_history.md index 29aa640c..67ca587b 100644 --- a/docs/components/litellm_chat_model_with_history.md +++ b/docs/components/litellm_chat_model_with_history.md @@ -11,11 +11,11 @@ component_config: load_balancer: embedding_params: temperature: + set_response_uuid_in_user_properties: stream_to_flow: stream_to_next_component: llm_mode: stream_batch_size: - set_response_uuid_in_user_properties: history_max_turns: history_max_time: history_max_turns: @@ -27,11 +27,11 @@ component_config: | load_balancer | False | | Add a list of models to load balancer. | | embedding_params | False | | LiteLLM model parameters. The model, api_key and base_url are mandatory.find more models at https://docs.litellm.ai/docs/providersfind more parameters at https://docs.litellm.ai/docs/completion/input | | temperature | False | 0.7 | Sampling temperature to use | +| set_response_uuid_in_user_properties | False | False | Whether to set the response_uuid in the user_properties of the input_message. This will allow other components to correlate streaming chunks with the full response. | | stream_to_flow | False | | Name the flow to stream the output to - this must be configured for llm_mode='stream'. This is mutually exclusive with stream_to_next_component. | | stream_to_next_component | False | False | Whether to stream the output to the next component in the flow. This is mutually exclusive with stream_to_flow. | -| llm_mode | False | none | The mode for streaming results: 'sync' or 'stream'. 'stream' will just stream the results to the named flow. 'none' will wait for the full response. | +| llm_mode | False | none | The mode for streaming results: 'none' or 'stream'. 'stream' will just stream the results to the named flow. 'none' will wait for the full response. | | stream_batch_size | False | 15 | The minimum number of words in a single streaming result. Default: 15. | -| set_response_uuid_in_user_properties | False | False | Whether to set the response_uuid in the user_properties of the input_message. This will allow other components to correlate streaming chunks with the full response. | | history_max_turns | False | 10 | Maximum number of conversation turns to keep in history | | history_max_time | False | 3600 | Maximum time to keep conversation history (in seconds) | | history_max_turns | False | 10 | Maximum number of conversation turns to keep in history | @@ -49,6 +49,7 @@ component_config: }, ... ], + stream: , clear_history_but_keep_depth: } ``` @@ -57,6 +58,7 @@ component_config: | messages | True | | | messages[].role | True | | | messages[].content | True | | +| stream | False | Whether to stream the response - overwrites llm_mode | | clear_history_but_keep_depth | False | Clear history but keep the last N messages. If 0, clear all history. If not set, do not clear history. 
| diff --git a/docs/components/litellm_embeddings.md b/docs/components/litellm_embeddings.md index 83930083..4e3e739e 100644 --- a/docs/components/litellm_embeddings.md +++ b/docs/components/litellm_embeddings.md @@ -11,11 +11,11 @@ component_config: load_balancer: embedding_params: temperature: + set_response_uuid_in_user_properties: stream_to_flow: stream_to_next_component: llm_mode: stream_batch_size: - set_response_uuid_in_user_properties: history_max_turns: history_max_time: history_max_turns: @@ -31,11 +31,11 @@ component_config: | load_balancer | False | | Add a list of models to load balancer. | | embedding_params | False | | LiteLLM model parameters. The model, api_key and base_url are mandatory.find more models at https://docs.litellm.ai/docs/providersfind more parameters at https://docs.litellm.ai/docs/completion/input | | temperature | False | 0.7 | Sampling temperature to use | +| set_response_uuid_in_user_properties | False | False | Whether to set the response_uuid in the user_properties of the input_message. This will allow other components to correlate streaming chunks with the full response. | | stream_to_flow | False | | Name the flow to stream the output to - this must be configured for llm_mode='stream'. This is mutually exclusive with stream_to_next_component. | | stream_to_next_component | False | False | Whether to stream the output to the next component in the flow. This is mutually exclusive with stream_to_flow. | -| llm_mode | False | none | The mode for streaming results: 'sync' or 'stream'. 'stream' will just stream the results to the named flow. 'none' will wait for the full response. | +| llm_mode | False | none | The mode for streaming results: 'none' or 'stream'. 'stream' will just stream the results to the named flow. 'none' will wait for the full response. | | stream_batch_size | False | 15 | The minimum number of words in a single streaming result. Default: 15. | -| set_response_uuid_in_user_properties | False | False | Whether to set the response_uuid in the user_properties of the input_message. This will allow other components to correlate streaming chunks with the full response. | | history_max_turns | False | 10 | Maximum number of conversation turns to keep in history | | history_max_time | False | 3600 | Maximum time to keep conversation history (in seconds) | | history_max_turns | False | 10 | Maximum number of conversation turns to keep in history | diff --git a/docs/components/mongo_insert.md b/docs/components/mongo_insert.md new file mode 100644 index 00000000..7c5a0896 --- /dev/null +++ b/docs/components/mongo_insert.md @@ -0,0 +1,27 @@ +# MongoDBInsertComponent + +Inserts given JSON data into a MongoDB database. 
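For illustration, a minimal flow entry using this component might look like the sketch below. The connection details, database, and collection names are placeholders, and `input_selection` assumes the upstream component emits the document (or list of documents) to insert.

```yaml
# Sketch only - connection details, database, and collection names are placeholders
- component_name: insert_events
  component_module: mongo_insert
  component_config:
    database_host: localhost
    database_port: 27017
    database_user: my_user
    database_password: my_password
    database_name: my_database
    database_collection: events
  input_selection:
    source_expression: input.payload
```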
+ +## Configuration Parameters + +```yaml +component_name: +component_module: mongo_insert +component_config: + database_host: + database_port: + database_user: + database_password: + database_name: + database_collection: +``` + +| Parameter | Required | Default | Description | +| --- | --- | --- | --- | +| database_host | True | | MongoDB host | +| database_port | True | | MongoDB port | +| database_user | False | | MongoDB user | +| database_password | False | | MongoDB password | +| database_name | True | | Database name | +| database_collection | False | | Collection name - if not provided, all collections will be used | + diff --git a/docs/components/openai_chat_model.md b/docs/components/openai_chat_model.md index e41c6692..62120978 100644 --- a/docs/components/openai_chat_model.md +++ b/docs/components/openai_chat_model.md @@ -27,7 +27,7 @@ component_config: | base_url | False | None | Base URL for OpenAI API | | stream_to_flow | False | | Name the flow to stream the output to - this must be configured for llm_mode='stream'. This is mutually exclusive with stream_to_next_component. | | stream_to_next_component | False | False | Whether to stream the output to the next component in the flow. This is mutually exclusive with stream_to_flow. | -| llm_mode | False | none | The mode for streaming results: 'sync' or 'stream'. 'stream' will just stream the results to the named flow. 'none' will wait for the full response. | +| llm_mode | False | none | The mode for streaming results: 'none' or 'stream'. 'stream' will just stream the results to the named flow. 'none' will wait for the full response. | | stream_batch_size | False | 15 | The minimum number of words in a single streaming result. Default: 15. | | set_response_uuid_in_user_properties | False | False | Whether to set the response_uuid in the user_properties of the input_message. This will allow other components to correlate streaming chunks with the full response. | @@ -42,7 +42,8 @@ component_config: content: }, ... - ] + ], + stream: } ``` | Field | Required | Description | @@ -50,6 +51,7 @@ component_config: | messages | True | | | messages[].role | True | | | messages[].content | True | | +| stream | False | Whether to stream the response. It is is not provided, it will default to the value of llm_mode. | ## Component Output Schema diff --git a/docs/components/openai_chat_model_with_history.md b/docs/components/openai_chat_model_with_history.md index 9c7c4dc3..ce306a5b 100644 --- a/docs/components/openai_chat_model_with_history.md +++ b/docs/components/openai_chat_model_with_history.md @@ -29,7 +29,7 @@ component_config: | base_url | False | None | Base URL for OpenAI API | | stream_to_flow | False | | Name the flow to stream the output to - this must be configured for llm_mode='stream'. This is mutually exclusive with stream_to_next_component. | | stream_to_next_component | False | False | Whether to stream the output to the next component in the flow. This is mutually exclusive with stream_to_flow. | -| llm_mode | False | none | The mode for streaming results: 'sync' or 'stream'. 'stream' will just stream the results to the named flow. 'none' will wait for the full response. | +| llm_mode | False | none | The mode for streaming results: 'none' or 'stream'. 'stream' will just stream the results to the named flow. 'none' will wait for the full response. | | stream_batch_size | False | 15 | The minimum number of words in a single streaming result. Default: 15. 
| | set_response_uuid_in_user_properties | False | False | Whether to set the response_uuid in the user_properties of the input_message. This will allow other components to correlate streaming chunks with the full response. | | history_max_turns | False | 10 | Maximum number of conversation turns to keep in history | @@ -47,6 +47,7 @@ component_config: }, ... ], + stream: , clear_history_but_keep_depth: } ``` @@ -55,6 +56,7 @@ component_config: | messages | True | | | messages[].role | True | | | messages[].content | True | | +| stream | False | Whether to stream the response. It is is not provided, it will default to the value of llm_mode. | | clear_history_but_keep_depth | False | Clear history but keep the last N messages. If 0, clear all history. If not set, do not clear history. | diff --git a/src/solace_ai_connector/common/utils.py b/src/solace_ai_connector/common/utils.py index aaf9ee4f..19bbf3fd 100755 --- a/src/solace_ai_connector/common/utils.py +++ b/src/solace_ai_connector/common/utils.py @@ -128,6 +128,7 @@ def import_module(module, base_path=None, component_package=None): ".components.general.llm.langchain", ".components.general.llm.openai", ".components.general.llm.litellm", + ".components.general.db.mongo", ".components.general.websearch", ".components.inputs_outputs", ".transforms", diff --git a/src/solace_ai_connector/components/__init__.py b/src/solace_ai_connector/components/__init__.py index d20da981..0510c3f4 100755 --- a/src/solace_ai_connector/components/__init__.py +++ b/src/solace_ai_connector/components/__init__.py @@ -40,6 +40,11 @@ litellm_chat_model_with_history, ) +from .general.db.mongo import ( + mongo_handler, + mongo_insert, +) + from .general.websearch import ( websearch_duckduckgo, websearch_google, @@ -76,6 +81,7 @@ from .general.llm.langchain.langchain_vector_store_embedding_search import ( LangChainVectorStoreEmbeddingsSearch, ) +from .general.db.mongo.mongo_insert import MongoDBInsertComponent from .general.websearch.websearch_duckduckgo import WebSearchDuckDuckGo from .general.websearch.websearch_google import WebSearchGoogle from .general.websearch.websearch_bing import WebSearchBing \ No newline at end of file diff --git a/src/solace_ai_connector/components/general/db/__init__.py b/src/solace_ai_connector/components/general/db/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/solace_ai_connector/components/general/db/mongo/__init__.py b/src/solace_ai_connector/components/general/db/mongo/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/solace_ai_connector/components/general/db/mongo/mongo_handler.py b/src/solace_ai_connector/components/general/db/mongo/mongo_handler.py new file mode 100644 index 00000000..26862f09 --- /dev/null +++ b/src/solace_ai_connector/components/general/db/mongo/mongo_handler.py @@ -0,0 +1,173 @@ +"""MongoDB database handler for MongoDB agent.""" + +from pymongo import MongoClient +from typing import List, Dict, Any, Tuple +import threading + +from .....common.log import log + + +class MongoHandler: + """Handler for MongoDB database operations.""" + + def __init__(self, host, port, user, password, collection, database_name): + """Initialize the MongoDB handler. 
+ + Args: + host: MongoDB host + port: MongoDB port + user: MongoDB user + password: MongoDB password + collection: Collection name + database_name: Database name + """ + self.host = host + self.port = port + self.user = user + self.password = password + self.collection = collection + self.database_name = database_name + self.local = threading.local() + + def get_connection(self): + """Get or create a thread-local database connection.""" + if not hasattr(self.local, 'client'): + try: + if self.user and self.password: + connection_string = f"mongodb://{self.user}:{self.password}@{self.host}:{self.port}" + else: + connection_string = f"mongodb://{self.host}:{self.port}" + + self.local.client = MongoClient(connection_string) + self.local.db = self.local.client[self.database_name] + log.info("Successfully connected to MongoDB database") + except Exception as e: + log.error("Error connecting to MongoDB database: %s", str(e)) + raise + return self.local.db + + def insert_documents(self, documents: List[Dict[str, Any]], collection: str = None) -> List[str]: + if not documents: + return [] + if not collection: + collection = self.collection + if not isinstance(documents, dict) or not isinstance(documents, list): + raise ValueError("Documents must be a dictionary or list of dictionaries") + if isinstance(documents, dict): + documents = [documents] + if not isinstance(documents[0], dict): + raise ValueError("Documents must be a dictionary or list of dictionaries") + db = self.get_connection() + result = db[collection].insert_many(documents) + log.debug("Successfully inserted %d documents into %s", len(result.inserted_ids), collection) + return result.inserted_ids + + def execute_query(self, collection: str, pipeline: List[Dict]) -> List[Dict[str, Any]]: + """Execute an aggregation pipeline on MongoDB. + + Args: + collection: Name of the collection to query + pipeline: List of aggregation pipeline stages + + Returns: + List of dictionaries containing the query results. + + Raises: + Exception: If there's an error executing the pipeline. + ValueError: If pipeline is not a valid aggregation pipeline. + """ + if not isinstance(pipeline, list): + raise ValueError("Pipeline must be a list of aggregation stages") + + # Validate each pipeline stage + for stage in pipeline: + if not isinstance(stage, dict) or not stage: + raise ValueError("Each pipeline stage must be a non-empty dictionary") + if not any(key.startswith('$') for key in stage.keys()): + raise ValueError(f"Invalid pipeline stage: {stage}. Each stage must start with '$'") + + try: + db = self.get_connection() + if self.collection: + collection = self.collection + cursor = db[collection].aggregate(pipeline) + result = list(cursor) + result = self._remove_object_ids(result) + return result + except Exception as e: + log.error("Error executing MongoDB query: %s", str(e)) + raise Exception(f"Failed to execute MongoDB query: {str(e)}") + + def get_collections(self) -> List[str]: + """Get all collection names in the database. + + Returns: + List of collection names. + """ + db = self.get_connection() + return db.list_collection_names() + + def get_fields(self, collection: str) -> List[str]: + """Get all field names for a given collection. + + Args: + collection: Name of the collection. + + Returns: + List of field names. 
+ """ + db = self.get_connection() + # Sample a few documents to get field names + pipeline = [ + {"$sample": {"size": 100}}, + {"$project": {"arrayofkeyvalue": {"$objectToArray": "$$ROOT"}}}, + {"$unwind": "$arrayofkeyvalue"}, + {"$group": {"_id": None, "allkeys": {"$addToSet": "$arrayofkeyvalue.k"}}} + ] + result = list(db[collection].aggregate(pipeline)) + if result: + # Remove _id from fields list as it's always present + fields = [f for f in result[0]["allkeys"] if f != "_id"] + return sorted(fields) + return [] + + def get_sample_values(self, collection: str, field: str, min: int = 3, max: int = 10) -> Tuple[List[str], bool]: + """Get unique sample values for a given field in a collection. If the number of unique values is less than + the maximum, return all unique values. Otherwise, return a random sample of unique values up to the manimum. + + Args: + collection: Name of the collection. + field: Name of the field. + limit: Maximum number of unique values to return. + + Returns: + List of unique sample values as strings, + and a boolean indicating whether all unique values were returned. + """ + db = self.get_connection() + pipeline = [ + {"$match": {field: {"$exists": True}}}, + {"$group": {"_id": f"${field}"}}, + {"$sample": {"size": max+1}}, + {"$project": {"value": "$_id", "_id": 0}} + ] + + results = list(db[collection].aggregate(pipeline)) + if len(results) > max: + return [str(result["value"]) for result in results[:min]], False + + return [str(result["value"]) for result in results], True + + def _remove_object_ids(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Remove the _id field from a list of MongoDB documents. + + Args: + results: List of MongoDB documents. + + Returns: + List of MongoDB documents with the _id field removed. + """ + for result in results: + if "_id" in result: + del result["_id"] + return results diff --git a/src/solace_ai_connector/components/general/db/mongo/mongo_insert.py b/src/solace_ai_connector/components/general/db/mongo/mongo_insert.py new file mode 100644 index 00000000..2e13dff4 --- /dev/null +++ b/src/solace_ai_connector/components/general/db/mongo/mongo_insert.py @@ -0,0 +1,79 @@ +"""MongoDB Agent Component for handling database insert.""" + +from .mongo_handler import MongoDBHandler +from ....component_base import ComponentBase + + +info = { + "class_name": "MongoDBInsertComponent", + "description": "Inserts given JSON data into a MongoDB database.", + "config_parameters": [ + { + "name": "database_host", + "required": True, + "description": "MongoDB host", + "type": "string", + }, + { + "name": "database_port", + "required": True, + "description": "MongoDB port", + "type": "integer", + }, + { + "name": "database_user", + "required": False, + "description": "MongoDB user", + "type": "string", + }, + { + "name": "database_password", + "required": False, + "description": "MongoDB password", + "type": "string", + }, + { + "name": "database_name", + "required": True, + "description": "Database name", + "type": "string", + }, + { + "name": "database_collection", + "required": False, + "description": "Collection name - if not provided, all collections will be used", + }, + ], +} + + +class MongoDBInsertComponent(ComponentBase): + """Component for handling MongoDB database operations.""" + + def __init__(self, **kwargs): + """Initialize the MongoDB component. + + Args: + **kwargs: Additional keyword arguments. + + Raises: + ValueError: If required database configuration is missing. 
+ """ + super().__init__(info, **kwargs) + + # Initialize MongoDB handler + self.db_handler = MongoDBHandler( + self.get_config("database_host"), + self.get_config("database_port"), + self.get_config("database_user"), + self.get_config("database_password"), + self.get_config("database_collection"), + self.get_config("database_name"), + ) + + def invoke(self, message, data): + if not data: + raise ValueError( + "Invalid data provided for MongoDB insert. Expected a dictionary or a list of dictionary." + ) + return self.db_handler.insert_documents(data) From 9fc1f03c1680d5efc799b0ffa50e8d249be434aa Mon Sep 17 00:00:00 2001 From: Cyrus Mobini Date: Wed, 11 Dec 2024 12:52:56 -0500 Subject: [PATCH 02/11] type --- .../components/general/db/mongo/mongo_insert.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/solace_ai_connector/components/general/db/mongo/mongo_insert.py b/src/solace_ai_connector/components/general/db/mongo/mongo_insert.py index 2e13dff4..df523d3c 100644 --- a/src/solace_ai_connector/components/general/db/mongo/mongo_insert.py +++ b/src/solace_ai_connector/components/general/db/mongo/mongo_insert.py @@ -1,6 +1,6 @@ """MongoDB Agent Component for handling database insert.""" -from .mongo_handler import MongoDBHandler +from .mongo_handler import MongoHandler from ....component_base import ComponentBase @@ -62,7 +62,7 @@ def __init__(self, **kwargs): super().__init__(info, **kwargs) # Initialize MongoDB handler - self.db_handler = MongoDBHandler( + self.db_handler = MongoHandler( self.get_config("database_host"), self.get_config("database_port"), self.get_config("database_user"), From ba06c46985afc6c1b23bfa6082b928e91ab8153d Mon Sep 17 00:00:00 2001 From: Cyrus Mobini Date: Tue, 17 Dec 2024 10:48:32 -0500 Subject: [PATCH 03/11] added search component --- docs/components/index.md | 1 + docs/components/mongo_search.md | 42 ++++++++ pyproject.toml | 1 + .../general/db/mongo/mongo_search.py | 97 +++++++++++++++++++ 4 files changed, 141 insertions(+) create mode 100644 docs/components/mongo_search.md create mode 100644 src/solace_ai_connector/components/general/db/mongo/mongo_search.py diff --git a/docs/components/index.md b/docs/components/index.md index 6a9d1126..56e7bb9e 100644 --- a/docs/components/index.md +++ b/docs/components/index.md @@ -22,6 +22,7 @@ | [litellm_embeddings](litellm_embeddings.md) | Embed text using a LiteLLM model | | [message_filter](message_filter.md) | A filtering component. This will apply a user configurable expression. If the expression evaluates to True, the message will be passed on. If the expression evaluates to False, the message will be discarded. If the message is discarded, any previous components that require an acknowledgement will be acknowledged. | | [mongo_insert](mongo_insert.md) | Inserts given JSON data into a MongoDB database. | +| [mongo_search](mongo_search.md) | Searches a MongoDB database. | | [openai_chat_model](openai_chat_model.md) | OpenAI chat model component | | [openai_chat_model_with_history](openai_chat_model_with_history.md) | OpenAI chat model component with conversation history | | [parser](parser.md) | Parse input from the given type to output type. | diff --git a/docs/components/mongo_search.md b/docs/components/mongo_search.md new file mode 100644 index 00000000..f27b6cab --- /dev/null +++ b/docs/components/mongo_search.md @@ -0,0 +1,42 @@ +# MongoDBSearchComponent + +Searches a MongoDB database. 
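As a rough example, assuming a hypothetical `orders` collection, an input payload could look like the following. The `query` field is a standard MongoDB aggregation pipeline (each stage keyed by a `$` operator); it may also be supplied as a JSON string, in which case it is parsed before execution.

```yaml
# Hypothetical input payload - collection and field names are examples only
collection: orders
query:
  - "$match": { status: "shipped" }
  - "$group": { _id: "$customer_id", total: { "$sum": "$amount" } }
  - "$limit": 10
```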
+ +## Configuration Parameters + +```yaml +component_name: +component_module: mongo_search +component_config: + database_host: + database_port: + database_user: + database_password: + database_name: + database_collection: +``` + +| Parameter | Required | Default | Description | +| --- | --- | --- | --- | +| database_host | True | | MongoDB host | +| database_port | True | | MongoDB port | +| database_user | False | | MongoDB user | +| database_password | False | | MongoDB password | +| database_name | True | | Database name | +| database_collection | False | | Collection name - if not provided, all collections will be used | + + +## Component Input Schema + +``` +{ + collection: , + query: { + + } +} +``` +| Field | Required | Description | +| --- | --- | --- | +| collection | False | The collection to search in. | +| query | False | The query pipeline to execute. if string is provided, it will be converted to JSON. | diff --git a/pyproject.toml b/pyproject.toml index e24e4284..9f8820e4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,6 +28,7 @@ dependencies = [ "Flask~=3.0.3", "Flask-SocketIO~=5.4.1", "build~=1.2.2.post1", + "pymongo~=4.10.1", ] [project.urls] diff --git a/src/solace_ai_connector/components/general/db/mongo/mongo_search.py b/src/solace_ai_connector/components/general/db/mongo/mongo_search.py new file mode 100644 index 00000000..e887936c --- /dev/null +++ b/src/solace_ai_connector/components/general/db/mongo/mongo_search.py @@ -0,0 +1,97 @@ +"""MongoDB Agent Component for handling database search.""" +import json + +from .mongo_handler import MongoHandler +from ....component_base import ComponentBase + + +info = { + "class_name": "MongoDBSearchComponent", + "description": "Searches a MongoDB database.", + "config_parameters": [ + { + "name": "database_host", + "required": True, + "description": "MongoDB host", + "type": "string", + }, + { + "name": "database_port", + "required": True, + "description": "MongoDB port", + "type": "integer", + }, + { + "name": "database_user", + "required": False, + "description": "MongoDB user", + "type": "string", + }, + { + "name": "database_password", + "required": False, + "description": "MongoDB password", + "type": "string", + }, + { + "name": "database_name", + "required": True, + "description": "Database name", + "type": "string", + }, + { + "name": "database_collection", + "required": False, + "description": "Collection name - if not provided, all collections will be used", + }, + ], + "input_schema": { + "type": "object", + "properties": { + "collection": { + "type": "string", + "description": "The collection to search in.", + }, + "query": { + "type": "object", + "description": "The query pipeline to execute. if string is provided, it will be converted to JSON.", + } + }, + }, +} + + +class MongoDBSearchComponent(ComponentBase): + """Component for handling MongoDB database operations.""" + + def __init__(self, **kwargs): + """Initialize the MongoDB component. + + Args: + **kwargs: Additional keyword arguments. + + Raises: + ValueError: If required database configuration is missing. 
+ """ + super().__init__(info, **kwargs) + + # Initialize MongoDB handler + self.db_handler = MongoHandler( + self.get_config("database_host"), + self.get_config("database_port"), + self.get_config("database_user"), + self.get_config("database_password"), + self.get_config("database_collection"), + self.get_config("database_name"), + ) + + def invoke(self, message, data): + collection = data.get("collection") + query = data.get("query") + if not query: + raise ValueError("No query provided") + if not collection: + raise ValueError("No collection provided") + if isinstance(query, str): + query = json.loads(query) + return self.db_handler.execute_query(collection, query) From 8562c311d3796859cc47357366ea1e50a5cf3755 Mon Sep 17 00:00:00 2001 From: Cyrus Mobini Date: Wed, 18 Dec 2024 11:23:20 -0500 Subject: [PATCH 04/11] applied comments --- .../components/__init__.py | 6 -- .../components/general/db/mongo/mongo_base.py | 89 +++++++++++++++++++ .../general/db/mongo/mongo_handler.py | 8 +- .../general/db/mongo/mongo_insert.py | 68 ++------------ .../general/db/mongo/mongo_search.py | 87 +++--------------- 5 files changed, 115 insertions(+), 143 deletions(-) create mode 100644 src/solace_ai_connector/components/general/db/mongo/mongo_base.py diff --git a/src/solace_ai_connector/components/__init__.py b/src/solace_ai_connector/components/__init__.py index 0510c3f4..d20da981 100755 --- a/src/solace_ai_connector/components/__init__.py +++ b/src/solace_ai_connector/components/__init__.py @@ -40,11 +40,6 @@ litellm_chat_model_with_history, ) -from .general.db.mongo import ( - mongo_handler, - mongo_insert, -) - from .general.websearch import ( websearch_duckduckgo, websearch_google, @@ -81,7 +76,6 @@ from .general.llm.langchain.langchain_vector_store_embedding_search import ( LangChainVectorStoreEmbeddingsSearch, ) -from .general.db.mongo.mongo_insert import MongoDBInsertComponent from .general.websearch.websearch_duckduckgo import WebSearchDuckDuckGo from .general.websearch.websearch_google import WebSearchGoogle from .general.websearch.websearch_bing import WebSearchBing \ No newline at end of file diff --git a/src/solace_ai_connector/components/general/db/mongo/mongo_base.py b/src/solace_ai_connector/components/general/db/mongo/mongo_base.py new file mode 100644 index 00000000..2cedebaa --- /dev/null +++ b/src/solace_ai_connector/components/general/db/mongo/mongo_base.py @@ -0,0 +1,89 @@ +"""MongoDB Agent Component for handling database search.""" +import json + +from .mongo_handler import MongoHandler +from ....component_base import ComponentBase + + +info = { + "class_name": "MongoDBBaseComponent", + "description": "Base MongoDB database component", + "config_parameters": [ + { + "name": "database_host", + "required": True, + "description": "MongoDB host", + "type": "string", + }, + { + "name": "database_port", + "required": True, + "description": "MongoDB port", + "type": "integer", + }, + { + "name": "database_user", + "required": False, + "description": "MongoDB user", + "type": "string", + }, + { + "name": "database_password", + "required": False, + "description": "MongoDB password", + "type": "string", + }, + { + "name": "database_name", + "required": True, + "description": "Database name", + "type": "string", + }, + { + "name": "database_collection", + "required": False, + "description": "Collection name - if not provided, all collections will be used", + }, + ], + "input_schema": { + "type": "object", + "properties": { + "collection": { + "type": "string", + "description": "The collection 
to search in.", + }, + "query": { + "type": "object", + "description": "The query pipeline to execute. if string is provided, it will be converted to JSON.", + } + }, + }, +} + + +class MongoDBBaseComponent(ComponentBase): + """Component for handling MongoDB database operations.""" + + def __init__(self, module_info, **kwargs): + """Initialize the MongoDB component. + + Args: + **kwargs: Additional keyword arguments. + + Raises: + ValueError: If required database configuration is missing. + """ + super().__init__(module_info, **kwargs) + + # Initialize MongoDB handler + self.db_handler = MongoHandler( + self.get_config("database_host"), + self.get_config("database_port"), + self.get_config("database_user"), + self.get_config("database_password"), + self.get_config("database_collection"), + self.get_config("database_name"), + ) + + def invoke(self, message, data): + pass diff --git a/src/solace_ai_connector/components/general/db/mongo/mongo_handler.py b/src/solace_ai_connector/components/general/db/mongo/mongo_handler.py index 26862f09..7b89b79b 100644 --- a/src/solace_ai_connector/components/general/db/mongo/mongo_handler.py +++ b/src/solace_ai_connector/components/general/db/mongo/mongo_handler.py @@ -50,12 +50,15 @@ def insert_documents(self, documents: List[Dict[str, Any]], collection: str = No if not documents: return [] if not collection: + log.debug("No collection specified, using default collection: %s", self.collection) collection = self.collection if not isinstance(documents, dict) or not isinstance(documents, list): + log.error("Documents must be a dictionary or list of dictionaries") raise ValueError("Documents must be a dictionary or list of dictionaries") if isinstance(documents, dict): documents = [documents] if not isinstance(documents[0], dict): + log.error("Documents must be a dictionary or list of dictionaries") raise ValueError("Documents must be a dictionary or list of dictionaries") db = self.get_connection() result = db[collection].insert_many(documents) @@ -82,13 +85,16 @@ def execute_query(self, collection: str, pipeline: List[Dict]) -> List[Dict[str, # Validate each pipeline stage for stage in pipeline: if not isinstance(stage, dict) or not stage: + log.error("Each pipeline stage must be a non-empty dictionary") raise ValueError("Each pipeline stage must be a non-empty dictionary") if not any(key.startswith('$') for key in stage.keys()): + log.error("Invalid pipeline stage: %s. Each stage must start with '$'", stage) raise ValueError(f"Invalid pipeline stage: {stage}. Each stage must start with '$'") try: db = self.get_connection() - if self.collection: + if not collection: + log.debug("No collection specified, using default collection: %s", self.collection) collection = self.collection cursor = db[collection].aggregate(pipeline) result = list(cursor) diff --git a/src/solace_ai_connector/components/general/db/mongo/mongo_insert.py b/src/solace_ai_connector/components/general/db/mongo/mongo_insert.py index df523d3c..351b9657 100644 --- a/src/solace_ai_connector/components/general/db/mongo/mongo_insert.py +++ b/src/solace_ai_connector/components/general/db/mongo/mongo_insert.py @@ -1,76 +1,18 @@ """MongoDB Agent Component for handling database insert.""" -from .mongo_handler import MongoHandler -from ....component_base import ComponentBase +from .mongo_base import MongoDBBaseComponent, info as base_info +info = base_info.copy() +info["class_name"] = "MongoDBInsertComponent" +info["description"] = "Inserts data into a MongoDB database." 
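# Illustrative input shapes only (field names are arbitrary examples): invoke()
# below passes `data` straight through to MongoHandler.insert_documents(), which
# accepts either a single document or a list of documents, e.g.
#   {"sensor": "t-100", "reading": 21.5}
#   [{"sensor": "t-100", "reading": 21.5}, {"sensor": "t-101", "reading": 19.8}]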
-info = { - "class_name": "MongoDBInsertComponent", - "description": "Inserts given JSON data into a MongoDB database.", - "config_parameters": [ - { - "name": "database_host", - "required": True, - "description": "MongoDB host", - "type": "string", - }, - { - "name": "database_port", - "required": True, - "description": "MongoDB port", - "type": "integer", - }, - { - "name": "database_user", - "required": False, - "description": "MongoDB user", - "type": "string", - }, - { - "name": "database_password", - "required": False, - "description": "MongoDB password", - "type": "string", - }, - { - "name": "database_name", - "required": True, - "description": "Database name", - "type": "string", - }, - { - "name": "database_collection", - "required": False, - "description": "Collection name - if not provided, all collections will be used", - }, - ], -} - -class MongoDBInsertComponent(ComponentBase): +class MongoDBInsertComponent(MongoDBBaseComponent): """Component for handling MongoDB database operations.""" def __init__(self, **kwargs): - """Initialize the MongoDB component. - - Args: - **kwargs: Additional keyword arguments. - - Raises: - ValueError: If required database configuration is missing. - """ super().__init__(info, **kwargs) - # Initialize MongoDB handler - self.db_handler = MongoHandler( - self.get_config("database_host"), - self.get_config("database_port"), - self.get_config("database_user"), - self.get_config("database_password"), - self.get_config("database_collection"), - self.get_config("database_name"), - ) - def invoke(self, message, data): if not data: raise ValueError( diff --git a/src/solace_ai_connector/components/general/db/mongo/mongo_search.py b/src/solace_ai_connector/components/general/db/mongo/mongo_search.py index e887936c..bdbc0e94 100644 --- a/src/solace_ai_connector/components/general/db/mongo/mongo_search.py +++ b/src/solace_ai_connector/components/general/db/mongo/mongo_search.py @@ -1,97 +1,38 @@ """MongoDB Agent Component for handling database search.""" -import json -from .mongo_handler import MongoHandler -from ....component_base import ComponentBase +import json +from .mongo_base import MongoDBBaseComponent, info as base_info -info = { - "class_name": "MongoDBSearchComponent", - "description": "Searches a MongoDB database.", - "config_parameters": [ - { - "name": "database_host", - "required": True, - "description": "MongoDB host", +info = base_info.copy() +info["class_name"] = "MongoDBSearchComponent" +info["description"] = "Searches a MongoDB database." +info["input_schema"] = { + "type": "object", + "properties": { + "collection": { "type": "string", + "description": "The collection to search in.", }, - { - "name": "database_port", - "required": True, - "description": "MongoDB port", - "type": "integer", - }, - { - "name": "database_user", - "required": False, - "description": "MongoDB user", - "type": "string", - }, - { - "name": "database_password", - "required": False, - "description": "MongoDB password", - "type": "string", - }, - { - "name": "database_name", - "required": True, - "description": "Database name", - "type": "string", - }, - { - "name": "database_collection", - "required": False, - "description": "Collection name - if not provided, all collections will be used", - }, - ], - "input_schema": { - "type": "object", - "properties": { - "collection": { - "type": "string", - "description": "The collection to search in.", - }, - "query": { - "type": "object", - "description": "The query pipeline to execute. 
if string is provided, it will be converted to JSON.", - } + "query": { + "type": "object", + "description": "The query pipeline to execute. if string is provided, it will be converted to JSON.", }, }, } -class MongoDBSearchComponent(ComponentBase): +class MongoDBSearchComponent(MongoDBBaseComponent): """Component for handling MongoDB database operations.""" def __init__(self, **kwargs): - """Initialize the MongoDB component. - - Args: - **kwargs: Additional keyword arguments. - - Raises: - ValueError: If required database configuration is missing. - """ super().__init__(info, **kwargs) - # Initialize MongoDB handler - self.db_handler = MongoHandler( - self.get_config("database_host"), - self.get_config("database_port"), - self.get_config("database_user"), - self.get_config("database_password"), - self.get_config("database_collection"), - self.get_config("database_name"), - ) - def invoke(self, message, data): collection = data.get("collection") query = data.get("query") if not query: raise ValueError("No query provided") - if not collection: - raise ValueError("No collection provided") if isinstance(query, str): query = json.loads(query) return self.db_handler.execute_query(collection, query) From 8fcccfd9009bedd3291172eac89f4afbf3cc82a8 Mon Sep 17 00:00:00 2001 From: Cyrus Mobini Date: Wed, 18 Dec 2024 13:52:25 -0500 Subject: [PATCH 05/11] updated docs --- docs/components/index.md | 3 +- docs/components/mongo_base.md | 42 +++++++++++++++++++ docs/components/mongo_insert.md | 17 +++++++- .../components/general/db/mongo/mongo_base.py | 1 - .../general/db/mongo/mongo_handler.py | 4 +- 5 files changed, 62 insertions(+), 5 deletions(-) create mode 100644 docs/components/mongo_base.md diff --git a/docs/components/index.md b/docs/components/index.md index 56e7bb9e..cb747122 100644 --- a/docs/components/index.md +++ b/docs/components/index.md @@ -21,7 +21,8 @@ | [litellm_chat_model_with_history](litellm_chat_model_with_history.md) | LiteLLM model handler component with conversation history | | [litellm_embeddings](litellm_embeddings.md) | Embed text using a LiteLLM model | | [message_filter](message_filter.md) | A filtering component. This will apply a user configurable expression. If the expression evaluates to True, the message will be passed on. If the expression evaluates to False, the message will be discarded. If the message is discarded, any previous components that require an acknowledgement will be acknowledged. | -| [mongo_insert](mongo_insert.md) | Inserts given JSON data into a MongoDB database. | +| [mongo_base](mongo_base.md) | Base MongoDB database component | +| [mongo_insert](mongo_insert.md) | Inserts data into a MongoDB database. | | [mongo_search](mongo_search.md) | Searches a MongoDB database. 
| | [openai_chat_model](openai_chat_model.md) | OpenAI chat model component | | [openai_chat_model_with_history](openai_chat_model_with_history.md) | OpenAI chat model component with conversation history | diff --git a/docs/components/mongo_base.md b/docs/components/mongo_base.md new file mode 100644 index 00000000..d0e977b5 --- /dev/null +++ b/docs/components/mongo_base.md @@ -0,0 +1,42 @@ +# MongoDBBaseComponent + +Base MongoDB database component + +## Configuration Parameters + +```yaml +component_name: +component_module: mongo_base +component_config: + database_host: + database_port: + database_user: + database_password: + database_name: + database_collection: +``` + +| Parameter | Required | Default | Description | +| --- | --- | --- | --- | +| database_host | True | | MongoDB host | +| database_port | True | | MongoDB port | +| database_user | False | | MongoDB user | +| database_password | False | | MongoDB password | +| database_name | True | | Database name | +| database_collection | False | | Collection name - if not provided, all collections will be used | + + +## Component Input Schema + +``` +{ + collection: , + query: { + + } +} +``` +| Field | Required | Description | +| --- | --- | --- | +| collection | False | The collection to search in. | +| query | False | The query pipeline to execute. if string is provided, it will be converted to JSON. | diff --git a/docs/components/mongo_insert.md b/docs/components/mongo_insert.md index 7c5a0896..23b18be8 100644 --- a/docs/components/mongo_insert.md +++ b/docs/components/mongo_insert.md @@ -1,6 +1,6 @@ # MongoDBInsertComponent -Inserts given JSON data into a MongoDB database. +Inserts data into a MongoDB database. ## Configuration Parameters @@ -25,3 +25,18 @@ component_config: | database_name | True | | Database name | | database_collection | False | | Collection name - if not provided, all collections will be used | + +## Component Input Schema + +``` +{ + collection: , + query: { + + } +} +``` +| Field | Required | Description | +| --- | --- | --- | +| collection | False | The collection to search in. | +| query | False | The query pipeline to execute. if string is provided, it will be converted to JSON. 
| diff --git a/src/solace_ai_connector/components/general/db/mongo/mongo_base.py b/src/solace_ai_connector/components/general/db/mongo/mongo_base.py index 2cedebaa..fec56c22 100644 --- a/src/solace_ai_connector/components/general/db/mongo/mongo_base.py +++ b/src/solace_ai_connector/components/general/db/mongo/mongo_base.py @@ -1,5 +1,4 @@ """MongoDB Agent Component for handling database search.""" -import json from .mongo_handler import MongoHandler from ....component_base import ComponentBase diff --git a/src/solace_ai_connector/components/general/db/mongo/mongo_handler.py b/src/solace_ai_connector/components/general/db/mongo/mongo_handler.py index 7b89b79b..29716c76 100644 --- a/src/solace_ai_connector/components/general/db/mongo/mongo_handler.py +++ b/src/solace_ai_connector/components/general/db/mongo/mongo_handler.py @@ -52,12 +52,12 @@ def insert_documents(self, documents: List[Dict[str, Any]], collection: str = No if not collection: log.debug("No collection specified, using default collection: %s", self.collection) collection = self.collection - if not isinstance(documents, dict) or not isinstance(documents, list): + if not isinstance(documents, dict) and not isinstance(documents, list): log.error("Documents must be a dictionary or list of dictionaries") raise ValueError("Documents must be a dictionary or list of dictionaries") if isinstance(documents, dict): documents = [documents] - if not isinstance(documents[0], dict): + if not documents or not isinstance(documents[0], dict): log.error("Documents must be a dictionary or list of dictionaries") raise ValueError("Documents must be a dictionary or list of dictionaries") db = self.get_connection() From 2f0802971e048d64d31515ac827b7e2bdaae0db1 Mon Sep 17 00:00:00 2001 From: Cyrus Mobini Date: Thu, 2 Jan 2025 15:24:30 -0500 Subject: [PATCH 06/11] Added the option to support custom keys for reply and metadata for request reponse user properties --- examples/request_reply.yaml | 5 +- pyproject.toml | 2 +- src/solace_ai_connector/common/message.py | 120 +----------- src/solace_ai_connector/common/utils.py | 184 ++++++++++++++++++ .../components/component_base.py | 10 +- .../inputs_outputs/broker_request_response.py | 81 ++++---- 6 files changed, 249 insertions(+), 153 deletions(-) diff --git a/examples/request_reply.yaml b/examples/request_reply.yaml index a6fb75cc..0480d700 100644 --- a/examples/request_reply.yaml +++ b/examples/request_reply.yaml @@ -33,6 +33,9 @@ flows: component_config: <<: *broker_connection request_expiry_ms: 30000 # 30 seconds + user_properties_reply_topic_key: :response.user.topic # nested with : + user_properties_reply_metadata_key: response.user.metadata # string literal + input_transforms: - type: copy source_expression: input.payload @@ -77,7 +80,7 @@ flows: source_expression: input.user_properties dest_expression: user_data.output:user_properties - type: copy - source_expression: input.user_properties:__solace_ai_connector_broker_request_response_topic__ + source_expression: input.user_properties:response.user.topic dest_expression: user_data.output:topic input_selection: source_expression: user_data.output diff --git a/pyproject.toml b/pyproject.toml index 9f8820e4..5d6a4a79 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,7 @@ dependencies = [ "PyYAML~=6.0.1", "Requests~=2.32.3", "solace_pubsubplus>=1.8.0", - "litellm~=1.51.3", + "litellm>=1.51.3", "Flask~=3.0.3", "Flask-SocketIO~=5.4.1", "build~=1.2.2.post1", diff --git a/src/solace_ai_connector/common/message.py 
b/src/solace_ai_connector/common/message.py index 73c0fbf6..45c1ce30 100644 --- a/src/solace_ai_connector/common/message.py +++ b/src/solace_ai_connector/common/message.py @@ -8,7 +8,7 @@ from .log import log from .trace_message import TraceMessage - +from .utils import set_data_value, get_data_value class Message: def __init__(self, payload=None, topic=None, user_properties=None): @@ -59,7 +59,7 @@ def get_data(self, expression, calling_object=None, data_type=None): if expression.startswith("static:"): return expression.split(":", 1)[1] data_object = self.get_data_object(expression, calling_object=calling_object) - data = self.get_data_value(data_object, expression) + data = get_data_value(data_object, expression) if data_type: data = self.convert_data_type(data, data_type) @@ -89,7 +89,7 @@ def set_data(self, expression, value): create_if_not_exists=True, create_value={} if not first_part.isdigit() else [], ) - self.set_data_value(data_object, expression, value) + set_data_value(data_object, expression, value) def get_data_object( self, @@ -158,120 +158,6 @@ def set_data_object(self, expression, value): f"Unknown data type '{data_type}' in expression '{expression}'" ) - def get_data_value(self, data_object, expression): - if ":" not in expression: - return data_object - - # If the data_object is a value, return it - if ( - not isinstance(data_object, dict) - and not isinstance(data_object, list) - and not isinstance(data_object, object) - ): - return data_object - - data_name = expression.split(":")[1] - - if data_name == "": - return data_object - - # Split the data_name by dots to get the path - path_parts = data_name.split(".") - - # Start with the entire data_object - current_data = data_object - - # Traverse the path - for part in path_parts: - # If the current data is a dictionary, get the value with the key 'part' - if isinstance(current_data, dict): - current_data = current_data.get(part) - # If the current data is a list and 'part' is a number, get the value at - # the index 'part' - elif isinstance(current_data, list) and part.isdigit(): - current_data = current_data[int(part)] - # If the current data is neither a dictionary nor a list, or if 'part' is - # not a number, return None - elif isinstance(current_data, object): - current_data = getattr(current_data, part, None) - else: - raise ValueError( - f"Could not get data value for expression '{expression}' - data " - "is not a dictionary or list" - ) - - # If at any point we get None, stop and return None - if current_data is None: - return None - - # Return the final data - return current_data - - # Similar to get_data_value, we need to use the expression to find the place to set the value - # except that we will create objects along the way if they don't exist - def set_data_value(self, data_object, expression, value): - data_name = expression.split(":")[1] - - # It is an error if the data_object is None or not a dictionary or list - if data_object is None: - raise ValueError( - f"Could not set data value for expression '{expression}' - data_object is None" - ) - if not isinstance(data_object, dict) and not isinstance(data_object, list): - raise ValueError( - f"Could not set data value for expression '{expression}' - data_object " - "is not a dictionary or list" - ) - - # It is an error if the data_name is empty - if data_name == "": - raise ValueError( - f"Could not set data value for expression '{expression}' - data_name is empty" - ) - - # Split the data_name by dots to get the path - path_parts = data_name.split(".") - - 
# Start with the entire data_object - current_data = data_object - - # Traverse the path - for i, part in enumerate(path_parts): - # If we're at the last part of the path, set the value - if i == len(path_parts) - 1: - if isinstance(current_data, dict): - current_data[part] = value - elif isinstance(current_data, list) and part.isdigit(): - while len(current_data) <= int(part): - current_data.append(None) - current_data[int(part)] = value - else: - log.error( - "Could not set data value for expression '%s' - " - "data is not a dictionary or list", - expression, - ) - # If we're not at the last part of the path, move to the next part - else: - next_part_is_digit = path_parts[i + 1].isdigit() - if isinstance(current_data, dict): - current_data = current_data.setdefault( - part, [] if next_part_is_digit else {} - ) - elif isinstance(current_data, list) and part.isdigit(): - while len(current_data) <= int(part): - current_data.append(None) - if current_data[int(part)] is None: - current_data[int(part)] = [] if next_part_is_digit else {} - current_data = current_data[int(part)] - else: - log.error( - "Could not set data value for expression '%s' - data " - "is not a dictionary or list", - expression, - ) - return - def set_iteration_data(self, item, index): self.iteration_data["item"] = item self.iteration_data["index"] = index diff --git a/src/solace_ai_connector/common/utils.py b/src/solace_ai_connector/common/utils.py index 19bbf3fd..e32b5323 100755 --- a/src/solace_ai_connector/common/utils.py +++ b/src/solace_ai_connector/common/utils.py @@ -390,3 +390,187 @@ def decode_payload(payload, encoding, payload_format): payload = yaml.safe_load(payload) return payload + + +def get_data_value(data_object, expression): + # If the data_object is a value, return it + if ( + not isinstance(data_object, dict) + and not isinstance(data_object, list) + and not isinstance(data_object, object) + ): + return data_object + + if not data_object or not expression: + return data_object + + if ":" not in expression: + if expression in data_object: + return data_object[expression] + return data_object + + data_name = expression.split(":")[1] + + if data_name == "": + return data_object + + # Split the data_name by dots to get the path + path_parts = data_name.split(".") + + # Start with the entire data_object + current_data = data_object + + # Traverse the path + for part in path_parts: + # If the current data is a dictionary, get the value with the key 'part' + if isinstance(current_data, dict): + current_data = current_data.get(part) + # If the current data is a list and 'part' is a number, get the value at + # the index 'part' + elif isinstance(current_data, list) and part.isdigit(): + current_data = current_data[int(part)] + # If the current data is neither a dictionary nor a list, or if 'part' is + # not a number, return None + elif isinstance(current_data, object): + current_data = getattr(current_data, part, None) + else: + raise ValueError( + f"Could not get data value for expression '{expression}' - data " + "is not a dictionary or list" + ) + + # If at any point we get None, stop and return None + if current_data is None: + return None + + # Return the final data + return current_data + +# Similar to get_data_value, we need to use the expression to find the place to set the value +# except that we will create objects along the way if they don't exist +def set_data_value(data_object, expression, value): + if ":" not in expression: + data_object[expression] = value + return + + data_name = 
expression.split(":")[1] + + # It is an error if the data_object is None or not a dictionary or list + if data_object is None: + raise ValueError( + f"Could not set data value for expression '{expression}' - data_object is None" + ) + if not isinstance(data_object, dict) and not isinstance(data_object, list): + raise ValueError( + f"Could not set data value for expression '{expression}' - data_object " + "is not a dictionary or list" + ) + + # It is an error if the data_name is empty + if data_name == "": + raise ValueError( + f"Could not set data value for expression '{expression}' - data_name is empty" + ) + + # Split the data_name by dots to get the path + path_parts = data_name.split(".") + + # Start with the entire data_object + current_data = data_object + + # Traverse the path + for i, part in enumerate(path_parts): + # If we're at the last part of the path, set the value + if i == len(path_parts) - 1: + if isinstance(current_data, dict): + current_data[part] = value + elif isinstance(current_data, list) and part.isdigit(): + while len(current_data) <= int(part): + current_data.append(None) + current_data[int(part)] = value + else: + log.error( + "Could not set data value for expression '%s' - " + "data is not a dictionary or list", + expression, + ) + # If we're not at the last part of the path, move to the next part + else: + next_part_is_digit = path_parts[i + 1].isdigit() + if isinstance(current_data, dict): + current_data = current_data.setdefault( + part, [] if next_part_is_digit else {} + ) + elif isinstance(current_data, list) and part.isdigit(): + while len(current_data) <= int(part): + current_data.append(None) + if current_data[int(part)] is None: + current_data[int(part)] = [] if next_part_is_digit else {} + current_data = current_data[int(part)] + else: + log.error( + "Could not set data value for expression '%s' - data " + "is not a dictionary or list", + expression, + ) + return + +def remove_data_value(data_object, expression): + if ":" not in expression: + data_object.pop(expression, None) + return + + data_name = expression.split(":")[1] + + # It is an error if the data_object is None or not a dictionary or list + if data_object is None: + raise ValueError( + f"Could not remove data value for expression '{expression}' - data_object is None" + ) + if not isinstance(data_object, dict) and not isinstance(data_object, list): + raise ValueError( + f"Could not remove data value for expression '{expression}' - data_object " + "is not a dictionary or list" + ) + + # It is an error if the data_name is empty + if data_name == "": + raise ValueError( + f"Could not remove data value for expression '{expression}' - data_name is empty" + ) + + # Split the data_name by dots to get the path + path_parts = data_name.split(".") + + # Start with the entire data_object + current_data = data_object + + # Traverse the path + for i, part in enumerate(path_parts): + # If we're at the last part of the path, remove the value + if i == len(path_parts) - 1: + if isinstance(current_data, dict): + current_data.pop(part, None) + elif isinstance(current_data, list) and part.isdigit(): + if len(current_data) > int(part): + current_data.pop(int(part)) + else: + log.error( + "Could not remove data value for expression '%s' - " + "data is not a dictionary or list", + expression, + ) + # If we're not at the last part of the path, move to the next part + else: + if isinstance(current_data, dict): + current_data = current_data.get(part, {}) + elif isinstance(current_data, list) and part.isdigit(): + if 
len(current_data) > int(part): + current_data = current_data[int(part)] + else: + log.error( + "Could not remove data value for expression '%s' - data " + "is not a dictionary or list", + expression, + ) + return \ No newline at end of file diff --git a/src/solace_ai_connector/components/component_base.py b/src/solace_ai_connector/components/component_base.py index 1930df09..802252f3 100644 --- a/src/solace_ai_connector/components/component_base.py +++ b/src/solace_ai_connector/components/component_base.py @@ -304,7 +304,15 @@ def setup_broker_request_response(self): "request_expiry_ms": request_expiry_ms, } - for key in ["response_topic_prefix", "response_queue_prefix", "response_topic_insertion_expression"]: + optional_keys = [ + "response_topic_prefix", + "response_queue_prefix", + "user_properties_reply_topic_key", + "user_properties_reply_metadata_key", + "response_topic_insertion_expression", + ] + + for key in optional_keys: if key in self.broker_request_response_config: rrc_config[key] = self.broker_request_response_config[key] diff --git a/src/solace_ai_connector/components/inputs_outputs/broker_request_response.py b/src/solace_ai_connector/components/inputs_outputs/broker_request_response.py index 2fdb05fc..2dca63e5 100644 --- a/src/solace_ai_connector/components/inputs_outputs/broker_request_response.py +++ b/src/solace_ai_connector/components/inputs_outputs/broker_request_response.py @@ -7,6 +7,7 @@ from copy import deepcopy from ...common.log import log +from ...common.utils import set_data_value, get_data_value, remove_data_value from .broker_base import BrokerBase from ...common.message import Message from ...common.utils import ensure_slash_on_end, ensure_slash_on_start @@ -90,6 +91,18 @@ "description": "Prefix for reply queues", "default": "reply-queue", }, + { + "name": "user_properties_reply_topic_key", + "required": False, + "description": "Key to store the reply topic in the user properties. Start with : for nested object", + "default": "__solace_ai_connector_broker_request_response_topic__", + }, + { + "name": "user_properties_reply_metadata_key", + "required": False, + "description": "Key to store the reply metadata in the user properties. 
Start with : for nested object", + "default": "__solace_ai_connector_broker_request_reply_metadata__", + }, { "name": "request_expiry_ms", "required": False, @@ -193,6 +206,9 @@ }, } +DEFAULT_REPLY_TOPIC_KEY = "__solace_ai_connector_broker_request_response_topic__" +DEFAULT_REPLY_METADATA_KEY = "__solace_ai_connector_broker_request_reply_metadata__" + class BrokerRequestResponse(BrokerBase): """Request-Response broker component for the Solace AI Event Connector""" @@ -210,6 +226,12 @@ def __init__(self, **kwargs): self.response_queue_prefix = ensure_slash_on_end( self.get_config("response_queue_prefix") ) + self.user_properties_reply_topic_key = self.get_config( + "user_properties_reply_topic_key", DEFAULT_REPLY_TOPIC_KEY + ) + self.user_properties_reply_metadata_key = self.get_config( + "user_properties_reply_metadata_key", DEFAULT_REPLY_METADATA_KEY + ) self.requestor_id = str(uuid.uuid4()) self.reply_queue_name = f"{self.response_queue_prefix}{self.requestor_id}" self.response_topic = f"{self.response_topic_prefix}{self.requestor_id}{self.response_topic_suffix}" @@ -310,9 +332,7 @@ def process_response(self, broker_message): return streaming_complete_expression = None - metadata_json = user_properties.get( - "__solace_ai_connector_broker_request_reply_metadata__" - ) + metadata_json = get_data_value(user_properties, self.user_properties_reply_metadata_key) if not metadata_json: log.error("Received response without metadata: %s", payload) return @@ -357,21 +377,21 @@ def process_response(self, broker_message): # Update the metadata in the response if metadata_stack: - response["user_properties"][ - "__solace_ai_connector_broker_request_reply_metadata__" - ] = json.dumps(metadata_stack) + set_data_value( + response["user_properties"], + self.user_properties_reply_metadata_key, + json.dumps(metadata_stack), + ) # Put the last reply topic back in the user properties - response["user_properties"][ - "__solace_ai_connector_broker_request_response_topic__" - ] = metadata_stack[-1]["response_topic"] + set_data_value( + response["user_properties"], + self.user_properties_reply_topic_key, + metadata_stack[-1]["response_topic"], + ) else: # Remove the metadata and reply topic from the user properties - response["user_properties"].pop( - "__solace_ai_connector_broker_request_reply_metadata__", None - ) - response["user_properties"].pop( - "__solace_ai_connector_broker_request_response_topic__", None - ) + remove_data_value(response["user_properties"], self.user_properties_reply_metadata_key) + remove_data_value(response["user_properties"], self.user_properties_reply_topic_key) message = Message( payload=payload, @@ -410,16 +430,13 @@ def invoke(self, message, data): metadata = {"request_id": request_id, "response_topic": topic} - if ( - "__solace_ai_connector_broker_request_reply_metadata__" - in data["user_properties"] - ): + existing_metadata_json = get_data_value( + data["user_properties"], + self.user_properties_reply_metadata_key + ) + if existing_metadata_json: try: - existing_metadata = json.loads( - data["user_properties"][ - "__solace_ai_connector_broker_request_reply_metadata__" - ] - ) + existing_metadata = json.loads(existing_metadata_json) if isinstance(existing_metadata, list): existing_metadata.append(metadata) metadata = existing_metadata @@ -430,19 +447,17 @@ def invoke(self, message, data): except json.JSONDecodeError: log.warning( "Failed to decode existing metadata JSON: %s", - data["user_properties"][ - "__solace_ai_connector_broker_request_reply_metadata__" - ], + 
existing_metadata_json, ) else: metadata = [metadata] - data["user_properties"][ - "__solace_ai_connector_broker_request_reply_metadata__" - ] = json.dumps(metadata) - data["user_properties"][ - "__solace_ai_connector_broker_request_response_topic__" - ] = topic + set_data_value( + data["user_properties"], self.user_properties_reply_metadata_key, json.dumps(metadata) + ) + set_data_value( + data["user_properties"], self.user_properties_reply_topic_key, topic + ) # If we are configured to also insert the response topic into the request message # then create a temporary message to do so From 341a49ce806f81c2e5ed0237bf2f2213a0bc0e4a Mon Sep 17 00:00:00 2001 From: Cyrus Mobini Date: Fri, 3 Jan 2025 08:18:27 -0500 Subject: [PATCH 07/11] fixed issue --- src/solace_ai_connector/common/utils.py | 9 +++++---- .../components/inputs_outputs/broker_request_response.py | 5 +++-- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/solace_ai_connector/common/utils.py b/src/solace_ai_connector/common/utils.py index e32b5323..d94103d6 100755 --- a/src/solace_ai_connector/common/utils.py +++ b/src/solace_ai_connector/common/utils.py @@ -392,7 +392,7 @@ def decode_payload(payload, encoding, payload_format): return payload -def get_data_value(data_object, expression): +def get_data_value(data_object, expression, resolve_none_colon=False): # If the data_object is a value, return it if ( not isinstance(data_object, dict) @@ -405,9 +405,10 @@ def get_data_value(data_object, expression): return data_object if ":" not in expression: - if expression in data_object: - return data_object[expression] - return data_object + if resolve_none_colon: + return data_object.get(expression) + else: + return data_object data_name = expression.split(":")[1] diff --git a/src/solace_ai_connector/components/inputs_outputs/broker_request_response.py b/src/solace_ai_connector/components/inputs_outputs/broker_request_response.py index 2dca63e5..f8888ad3 100644 --- a/src/solace_ai_connector/components/inputs_outputs/broker_request_response.py +++ b/src/solace_ai_connector/components/inputs_outputs/broker_request_response.py @@ -332,7 +332,7 @@ def process_response(self, broker_message): return streaming_complete_expression = None - metadata_json = get_data_value(user_properties, self.user_properties_reply_metadata_key) + metadata_json = get_data_value(user_properties, self.user_properties_reply_metadata_key, True) if not metadata_json: log.error("Received response without metadata: %s", payload) return @@ -432,7 +432,8 @@ def invoke(self, message, data): existing_metadata_json = get_data_value( data["user_properties"], - self.user_properties_reply_metadata_key + self.user_properties_reply_metadata_key, + True ) if existing_metadata_json: try: From bc889d2ee2882b1e97cd16940ad943f5522e063b Mon Sep 17 00:00:00 2001 From: Cyrus Mobini Date: Thu, 9 Jan 2025 12:45:10 -0500 Subject: [PATCH 08/11] Updated insert with type --- examples/db/mongodb_insert.yaml | 56 ++++++++++++++++ .../general/db/mongo/mongo_insert.py | 67 ++++++++++++++++++- 2 files changed, 122 insertions(+), 1 deletion(-) create mode 100644 examples/db/mongodb_insert.yaml diff --git a/examples/db/mongodb_insert.yaml b/examples/db/mongodb_insert.yaml new file mode 100644 index 00000000..adf64438 --- /dev/null +++ b/examples/db/mongodb_insert.yaml @@ -0,0 +1,56 @@ +--- +log: + stdout_log_level: INFO + log_file_level: INFO + log_file: solace_ai_connector.log + +trace: + trace_file: solace_ai_connector_trace.log + +shared_config: + - broker_config: &broker_connection 
+ broker_type: solace + broker_url: ${SOLACE_BROKER_URL} + broker_username: ${SOLACE_BROKER_USERNAME} + broker_password: ${SOLACE_BROKER_PASSWORD} + broker_vpn: ${SOLACE_BROKER_VPN} + +flows: + # Data ingestion to MongoDB for context mesh + - name: real_time_data_ingest + components: + # Data Input from Solace broker + - component_name: solace_data_input + component_module: broker_input + component_config: + <<: *broker_connection + broker_queue_name: demo_data_ingest + broker_subscriptions: + - topic: data/ingest + qos: 1 + payload_encoding: utf-8 + payload_format: json + + # Batch messages to avoid frequent calls to DB + - component_name: batch_aggregate + component_module: aggregate + component_config: + max_items: 100 + max_time_ms: 3000 + input_selection: + source_expression: input.payload:event + + # Insert into MongoDB + - component_name: mongo_insert + component_module: mongo_insert + component_config: + database_host: ${MONGO_HOST} + database_port: ${MONGO_PORT} + database_user: ${MONGO_USER} + database_password: ${MONGO_PASSWORD} + database_name: ${MONGO_DB} + database_collection: ${MONGO_COLLECTION} + data_types: + timestamp: Date + input_selection: + source_expression: previous diff --git a/src/solace_ai_connector/components/general/db/mongo/mongo_insert.py b/src/solace_ai_connector/components/general/db/mongo/mongo_insert.py index 351b9657..c9038836 100644 --- a/src/solace_ai_connector/components/general/db/mongo/mongo_insert.py +++ b/src/solace_ai_connector/components/general/db/mongo/mongo_insert.py @@ -1,10 +1,19 @@ """MongoDB Agent Component for handling database insert.""" +import datetime +import dateutil.parser from .mongo_base import MongoDBBaseComponent, info as base_info info = base_info.copy() info["class_name"] = "MongoDBInsertComponent" info["description"] = "Inserts data into a MongoDB database." +info["config_parameters"].extend([ + { + "name": "data_types", + "required": False, + "description": "An array of key value pairs to specify the data types for each field in the data. Used for non-JSON types like Date. Supports nested dotted names", + }, +]) class MongoDBInsertComponent(MongoDBBaseComponent): @@ -12,10 +21,66 @@ class MongoDBInsertComponent(MongoDBBaseComponent): def __init__(self, **kwargs): super().__init__(info, **kwargs) + self.data_types_map = self.get_config("data_types") def invoke(self, message, data): - if not data: + if not data or not isinstance(data, dict) and not isinstance(data, list): raise ValueError( "Invalid data provided for MongoDB insert. Expected a dictionary or a list of dictionary." ) + + if self.data_types_map: + for key, field_type in self.data_types_map.items(): + if isinstance(data, list): + new_data = [] + for item in data: + new_data.append(self._convert_data_type(item, key, field_type)) + data = new_data + else: + data = self._convert_data_type(data, key, field_type) return self.db_handler.insert_documents(data) + + def _convert_data_type(self, data, key, field_type): + if not key or not field_type: + return data + if not isinstance(data, list) and not isinstance(data, dict): + return data + if "." 
in key: + segments = key.split(".") + segment = segments[0] + if segment not in data: + if key in data: + data[key] = self._convert_field_type(data[key], field_type) + return data + if len(segments) > 1: + data[segment] = self._convert_data_type(data[segment], ".".join(segments[1:]), field_type) + else: + data[segment] = self._convert_field_type(data[segment], field_type) + else: + if key in data: + data[key] = self._convert_field_type(data[key], field_type) + return data + + def _convert_field_type(self, value, field_type): + field_type = field_type.lower() + if field_type == "date" or field_type == "timestamp": + if isinstance(value, str): + return dateutil.parser.parse(value) + elif isinstance(value, int) or isinstance(value, float): + return datetime.datetime.fromtimestamp(value) + else: + return value + if field_type == "int" or field_type == "int32" or field_type == "int64": + return int(value) + if field_type == "float" or field_type == "double": + return float(value) + if field_type == "bool": + if isinstance(value, str) and value.lower() == "false": + return False + return bool(value) + if field_type == "string": + return str(value) + if field_type == "null": + return None + return value + From 2316d36ffc98679dc72b272c234ae1e6b58697ea Mon Sep 17 00:00:00 2001 From: Cyrus Mobini Date: Mon, 20 Jan 2025 12:38:24 -0500 Subject: [PATCH 09/11] added docs --- docs/components/broker_request_response.md | 6 ++++++ docs/components/mongo_insert.md | 2 ++ 2 files changed, 8 insertions(+) diff --git a/docs/components/broker_request_response.md b/docs/components/broker_request_response.md index a408e2e7..74fd32fd 100644 --- a/docs/components/broker_request_response.md +++ b/docs/components/broker_request_response.md @@ -17,7 +17,10 @@ component_config: payload_format: response_topic_prefix: response_topic_suffix: + response_topic_insertion_expression: response_queue_prefix: + user_properties_reply_topic_key: + user_properties_reply_metadata_key: request_expiry_ms: streaming: streaming_complete_expression: @@ -38,7 +41,10 @@ component_config: | payload_format | False | json | Format for the payload (json, yaml, text) | | response_topic_prefix | False | reply | Prefix for reply topics | | response_topic_suffix | False | | Suffix for reply topics | +| response_topic_insertion_expression | False | | Expression to insert the reply topic into the request message. If not set, the reply topic will only be added to the request_response_metadata. The expression uses the same format as other data expressions: (e.g input.payload:myObj.replyTopic). If there is no object type in the expression, it will default to 'input.payload'. | | response_queue_prefix | False | reply-queue | Prefix for reply queues | +| user_properties_reply_topic_key | False | __solace_ai_connector_broker_request_response_topic__ | Key to store the reply topic in the user properties. Start with : for nested object | +| user_properties_reply_metadata_key | False | __solace_ai_connector_broker_request_reply_metadata__ | Key to store the reply metadata in the user properties. Start with : for nested object | | request_expiry_ms | False | 60000 | Expiry time for cached requests in milliseconds | | streaming | False | | The response will arrive in multiple pieces. If True, the streaming_complete_expression must be set and will be used to determine when the last piece has arrived. | | streaming_complete_expression | False | | The source expression to determine when the last piece of a streaming response has arrived. 
| diff --git a/docs/components/mongo_insert.md b/docs/components/mongo_insert.md index 23b18be8..31deec67 100644 --- a/docs/components/mongo_insert.md +++ b/docs/components/mongo_insert.md @@ -14,6 +14,7 @@ component_config: database_password: database_name: database_collection: + data_types: ``` | Parameter | Required | Default | Description | @@ -24,6 +25,7 @@ component_config: | database_password | False | | MongoDB password | | database_name | True | | Database name | | database_collection | False | | Collection name - if not provided, all collections will be used | +| data_types | False | | An array of key value pairs to specify the data types for each field in the data. Used for non-JSON types like Date. Supports nested dotted names | ## Component Input Schema From 2c85c3bef125a4fbc167e889ea9d66f7662bab8d Mon Sep 17 00:00:00 2001 From: Cyrus Mobini Date: Mon, 20 Jan 2025 14:08:01 -0500 Subject: [PATCH 10/11] added config value validation --- .../components/general/db/mongo/mongo_insert.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/solace_ai_connector/components/general/db/mongo/mongo_insert.py b/src/solace_ai_connector/components/general/db/mongo/mongo_insert.py index c9038836..be2a0d24 100644 --- a/src/solace_ai_connector/components/general/db/mongo/mongo_insert.py +++ b/src/solace_ai_connector/components/general/db/mongo/mongo_insert.py @@ -11,7 +11,7 @@ { "name": "data_types", "required": False, - "description": "An array of key value pairs to specify the data types for each field in the data. Used for non-JSON types like Date. Supports nested dotted names", + "description": "Key value pairs to specify the data types for each field in the data. Used for non-JSON types like Date. Supports nested dotted names", }, ]) @@ -22,6 +22,17 @@ class MongoDBInsertComponent(MongoDBBaseComponent): def __init__(self, **kwargs): super().__init__(info, **kwargs) self.data_types_map = self.get_config("data_types") + if self.data_types_map: + if not isinstance(self.data_types_map, dict): + raise ValueError( + "Invalid data types provided for MongoDB insert. Expected a dictionary." + ) + for key, field_type in self.data_types_map.items(): + if not isinstance(key, str) or not isinstance(field_type, str): + raise ValueError( + "Invalid data types provided for MongoDB insert. Expected a dictionary with string key and value." + ) + def invoke(self, message, data): if not data or not isinstance(data, dict) and not isinstance(data, list): From 77668a8638c950258347ec41851d1b99903de48f Mon Sep 17 00:00:00 2001 From: Cyrus Mobini Date: Mon, 20 Jan 2025 16:06:42 -0500 Subject: [PATCH 11/11] added value check for mongo insert --- .../components/general/db/mongo/mongo_insert.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/solace_ai_connector/components/general/db/mongo/mongo_insert.py b/src/solace_ai_connector/components/general/db/mongo/mongo_insert.py index be2a0d24..8c0425f5 100644 --- a/src/solace_ai_connector/components/general/db/mongo/mongo_insert.py +++ b/src/solace_ai_connector/components/general/db/mongo/mongo_insert.py @@ -15,6 +15,7 @@ }, ]) +POSSIBLE_TYPES = ["date", "timestamp", "int", "int32", "int64", "float", "double", "bool", "string", "null"] class MongoDBInsertComponent(MongoDBBaseComponent): """Component for handling MongoDB database operations.""" @@ -28,9 +29,10 @@ def __init__(self, **kwargs): "Invalid data types provided for MongoDB insert. Expected a dictionary." 
             )
             for key, field_type in self.data_types_map.items():
-                if not isinstance(key, str) or not isinstance(field_type, str):
+                if not isinstance(key, str) or not isinstance(field_type, str) or field_type.lower() not in POSSIBLE_TYPES:
                     raise ValueError(
-                        "Invalid data types provided for MongoDB insert. Expected a dictionary with string key and value."
+                        "Invalid data types provided for MongoDB insert. Expected a dictionary where each key is a string field name and each value is one of: "
+                        + ", ".join(POSSIBLE_TYPES)
                     )
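
The helpers this series moves from `Message` into `common/utils.py` operate on `scope:dotted.path` expressions. The sketch below is illustrative only (it assumes the package from this branch is importable as `solace_ai_connector`) and shows how `set_data_value`, `get_data_value`, and `remove_data_value` create, read, and delete nested entries in a plain dictionary:

```python
# Illustrative usage of the path-expression helpers added to common/utils.py.
from solace_ai_connector.common.utils import (
    get_data_value,
    set_data_value,
    remove_data_value,
)

user_properties = {}

# set_data_value creates intermediate dicts/lists along the dotted path after the colon.
set_data_value(user_properties, "input.user_properties:reply.topic", "demo/response/123")
set_data_value(user_properties, "input.user_properties:reply.hops.0", "flow-a")

# get_data_value walks the same path; without a colon it returns the whole object,
# unless resolve_none_colon=True, which turns it into a plain dictionary lookup.
print(get_data_value(user_properties, "input.user_properties:reply.topic"))  # demo/response/123
print(get_data_value(user_properties, "reply", True))  # {'topic': ..., 'hops': ['flow-a']}

# remove_data_value deletes the leaf and is a no-op if the path is absent.
remove_data_value(user_properties, "input.user_properties:reply.topic")
```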
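The new `user_properties_reply_topic_key` and `user_properties_reply_metadata_key` options accept either a flat key or a value starting with `:` for nested placement. The following sketch uses a hypothetical `:forwarding.replyTo` key (not part of the patch) to illustrate the difference with the same helpers the component now calls:

```python
# Illustrative only: why the user_properties_*_key options accept a leading ':'.
from solace_ai_connector.common.utils import get_data_value, set_data_value

user_properties = {}

# Default behaviour: no colon in the key, so the reply topic is stored flat.
set_data_value(
    user_properties,
    "__solace_ai_connector_broker_request_response_topic__",
    "reply/abc123",
)

# With a leading ':' the key is treated as a dotted path and nested objects are
# created on the way down, e.g. user_properties_reply_topic_key: ":forwarding.replyTo"
set_data_value(user_properties, ":forwarding.replyTo", "reply/abc123")

# Values are read back the same way; resolve_none_colon=True makes the
# colon-less form a plain dictionary lookup, as process_response does for metadata.
print(get_data_value(user_properties, "__solace_ai_connector_broker_request_response_topic__", True))
print(get_data_value(user_properties, ":forwarding.replyTo"))
```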
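The reply metadata itself is kept as a JSON-encoded list in user properties so that chained request/response hops can be unwound in order. This is a simplified sketch of that stacking behaviour; the key name is the default, and `push_metadata`/`pop_metadata` are illustrative stand-ins, not the component's actual methods:

```python
# Simplified view of how the metadata stack grows in invoke() and shrinks in process_response().
import json

METADATA_KEY = "__solace_ai_connector_broker_request_reply_metadata__"

def push_metadata(props, request_id, response_topic):
    # invoke() appends the new entry to any existing JSON-encoded list.
    stack = json.loads(props[METADATA_KEY]) if METADATA_KEY in props else []
    stack.append({"request_id": request_id, "response_topic": response_topic})
    props[METADATA_KEY] = json.dumps(stack)

def pop_metadata(props):
    # process_response() pops its own entry and either rewrites or removes the key.
    stack = json.loads(props[METADATA_KEY])
    entry = stack.pop()
    if stack:
        props[METADATA_KEY] = json.dumps(stack)
    else:
        props.pop(METADATA_KEY, None)
    return entry

user_properties = {}
push_metadata(user_properties, "req-1", "reply/requestor-a")
push_metadata(user_properties, "req-2", "reply/requestor-b")  # nested request
print(pop_metadata(user_properties))  # req-2 entry comes off first
print(pop_metadata(user_properties))  # key is removed once the stack is empty
```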
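The `data_types` option on `mongo_insert` coerces selected fields before the documents are handed to MongoDB. The standalone sketch below mirrors the conversion rules in `_convert_field_type`; the `coerce` function and the sample event are illustrative, not part of the component:

```python
# Illustrative only: mirrors the field coercion applied when data_types is configured
# (e.g. "timestamp: Date" in examples/db/mongodb_insert.yaml).
import datetime
import dateutil.parser

def coerce(value, field_type):
    field_type = field_type.lower()
    if field_type in ("date", "timestamp"):
        # Strings are parsed with dateutil, epoch numbers via fromtimestamp.
        if isinstance(value, str):
            return dateutil.parser.parse(value)
        if isinstance(value, (int, float)):
            return datetime.datetime.fromtimestamp(value)
        return value
    if field_type in ("int", "int32", "int64"):
        return int(value)
    if field_type in ("float", "double"):
        return float(value)
    if field_type == "bool":
        if isinstance(value, str) and value.lower() == "false":
            return False
        return bool(value)
    if field_type == "string":
        return str(value)
    if field_type == "null":
        return None
    return value

event = {"device": "sensor-1", "timestamp": "2025-01-09T12:45:10Z", "reading": "21.5"}
data_types = {"timestamp": "Date", "reading": "float"}

converted = {k: coerce(v, data_types[k]) if k in data_types else v for k, v in event.items()}
print(converted)  # timestamp becomes a datetime, reading becomes 21.5
```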