Stabilize the connector and add monitoring #70

Merged: 65 commits, Jan 22, 2025

Commits:
eddc391
feat: add monitoring component
alimosaed Nov 29, 2024
a955cfb
fix: resolve a bug
alimosaed Nov 29, 2024
2b271e3
fix: add sleep time
alimosaed Nov 29, 2024
5075043
fix: add sleep time
alimosaed Nov 29, 2024
03d8b75
feat: add readiness and handle excessive logs
alimosaed Nov 29, 2024
9965033
fix: handle sleep error
alimosaed Dec 2, 2024
6c5362b
fix: handle sleep error
alimosaed Dec 2, 2024
3dbba60
feat: gracefully exit
alimosaed Dec 3, 2024
a62d4cf
feat: set the log back
alimosaed Dec 3, 2024
116068a
fix: rename log fields
alimosaed Dec 3, 2024
4c80e41
fix: disabled monitoring
alimosaed Dec 3, 2024
74f3d34
Merge branch 'main' of github.com:SolaceDev/solace-ai-connector
alimosaed Dec 3, 2024
98b8699
fix: resolve log naming
alimosaed Dec 3, 2024
796fd67
fix: resolved logging issues
alimosaed Dec 4, 2024
cbbbd7c
fix: resolve log
alimosaed Dec 5, 2024
37e3144
fix: resolve log
alimosaed Dec 5, 2024
52b5233
feat: remove dependency on Langchain
alimosaed Dec 5, 2024
ad742f7
feat: update monitoring
alimosaed Dec 6, 2024
7d186bd
Merge branch 'main' of github.com:SolaceDev/solace-ai-connector
alimosaed Dec 9, 2024
d93e5de
feat: drop error messages when the queue is full
alimosaed Dec 9, 2024
dfe4219
feat: add a text splitter component
alimosaed Dec 9, 2024
9648a94
feat: updated docs
alimosaed Dec 9, 2024
f3afadf
Merge branch 'alireza/AI-278/add_vdb_text_splitter' into alireza/AI-2…
alimosaed Dec 10, 2024
4eb4bef
fix: resolve graceful termination issues
alimosaed Dec 10, 2024
488de44
fix: remove payloads from logs
alimosaed Dec 11, 2024
a5de339
Merge branch 'main' of github.com:SolaceDev/solace-ai-connector
alimosaed Dec 16, 2024
e712b9b
feat: add the forever retry
alimosaed Dec 16, 2024
7f26452
Merge branch 'alireza/AI-278/add_forever_broker_reconnection_config' …
alimosaed Dec 16, 2024
5ff264a
feat: keep connecting
alimosaed Dec 19, 2024
8098723
Merge branch 'alireza/AI-278/add_forever_broker_reconnection_config' …
alimosaed Dec 19, 2024
f9fa6b9
Feat: add monitoring
alimosaed Dec 19, 2024
a589776
feat: replace the reconnection
alimosaed Dec 19, 2024
e67add1
Merge branch 'alireza/AI-278/add_forever_broker_reconnection_config' …
alimosaed Dec 19, 2024
aa535d2
feat: refactor monitoring
alimosaed Dec 20, 2024
bc8251c
feat: add connection metric
alimosaed Dec 20, 2024
012d544
convert connection to async
alimosaed Jan 2, 2025
776cc71
get metrics enum
alimosaed Jan 2, 2025
e1b5c46
add types of metrics
alimosaed Jan 3, 2025
d768771
use metrics rather than metric values
alimosaed Jan 3, 2025
3ba1042
fix bug
alimosaed Jan 3, 2025
d31b032
update type
alimosaed Jan 3, 2025
0c5bf81
convert monitoring output to dictionary
alimosaed Jan 6, 2025
0d6421b
fix bug
alimosaed Jan 6, 2025
7a726ba
feat: add connection status
alimosaed Jan 9, 2025
5f56c79
feat: add reconnecting status
alimosaed Jan 9, 2025
b30c99b
feat: add reconnecting log and handled signals
alimosaed Jan 10, 2025
a64a48b
fix: update status
alimosaed Jan 10, 2025
7467b8f
fix: update log
alimosaed Jan 10, 2025
d49b4ef
fix: fix bug
alimosaed Jan 10, 2025
ae69248
fix: fix bug
alimosaed Jan 10, 2025
8a9ba5c
fix: resolve connection logs
alimosaed Jan 10, 2025
1bff59e
fix: handle threads
alimosaed Jan 10, 2025
89ecf1d
fix: update connection state machine
alimosaed Jan 13, 2025
2e0cb5f
feat: add prefix to the broker logs
alimosaed Jan 14, 2025
33e042f
fix: synchronize logs with connection attempts
alimosaed Jan 17, 2025
d8cfd16
fix: remove datadog dependency
alimosaed Jan 17, 2025
caab235
Merge branch 'main' of github.com:SolaceDev/solace-ai-connector
alimosaed Jan 17, 2025
e4ad224
Merge branch 'main' into alireza/AI-278/add_monitoring
alimosaed Jan 17, 2025
8d664d0
fix: cover an exception
alimosaed Jan 17, 2025
7cac805
ref: upgrade to latest pubsub and replace a metric
alimosaed Jan 20, 2025
7dfd113
ref: encapsulate some variables
alimosaed Jan 21, 2025
b982189
ref: enable daemon for threads to close them safely
alimosaed Jan 21, 2025
93ebd70
Merge branch 'main' of github.com:SolaceDev/solace-ai-connector
alimosaed Jan 21, 2025
be7df51
Merge branch 'main' into alireza/AI-278/add_monitoring
alimosaed Jan 21, 2025
9bfc6e6
ref: remove useless variable
alimosaed Jan 21, 2025
2 changes: 1 addition & 1 deletion examples/llm/anthropic_chat.yaml
@@ -12,7 +12,7 @@
# It will then send an event back to Solace with the topic: `demo/question/response`
#
# Dependencies:
-# pip install -U langchain-anthropic
+# pip install -U langchain-anthropic langchain-core~=0.3.0 langchain~=0.3.0
#
# required ENV variables:
# - ANTHROPIC_API_KEY
2 changes: 1 addition & 1 deletion examples/llm/bedrock_anthropic_chat.yaml
@@ -11,7 +11,7 @@
# }
#
# Dependencies:
-# pip install langchain_aws
+# pip install langchain_aws langchain-core~=0.3.0 langchain~=0.3.0
#
# required ENV variables:
# - AWS_BEDROCK_ANTHROPIC_CLAUDE_MODEL_ID
2 changes: 1 addition & 1 deletion examples/llm/langchain_openai_with_history_chat.yaml
Author comment: Goal: Remove the dependency on Langchain.

@@ -12,7 +12,7 @@
# It will then send an event back to Solace with the topic: `demo/joke/subject/response`
#
# Dependencies:
-# pip install -U langchain_openai openai
+# pip install -U langchain_openai openai langchain-core~=0.3.0 langchain~=0.3.0
#
# required ENV variables:
# - OPENAI_API_KEY
13 changes: 11 additions & 2 deletions examples/llm/litellm_chat.yaml
Author comment: Goal: Add log rotation.

@@ -33,9 +33,16 @@

---
 log:
-  stdout_log_level: INFO
+  stdout_log_level: DEBUG
   log_file_level: DEBUG
-  log_file: solace_ai_connector.log
+  log_file: ${LOG_FILE}
+  log_format: jsonl
+  logback:
+    rollingpolicy:
+      file-name-pattern: "${LOG_FILE}.%d{yyyy-MM-dd}.%i.gz"
+      max-file-size: 100MB
+      max-history: 5
+      total-size-cap: 1GB

shared_config:
- broker_config: &broker_connection
@@ -44,6 +51,8 @@ shared_config:
   broker_username: ${SOLACE_BROKER_USERNAME}
   broker_password: ${SOLACE_BROKER_PASSWORD}
   broker_vpn: ${SOLACE_BROKER_VPN}
+  reconnection_strategy: forever_retry # options: forever_retry, parametrized_retry
+  retry_interval: 10000 # in milliseconds

# Take from input broker and publish back to Solace
flows:
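The rolling-policy sizes in this config (`100MB`, `1GB`) are Logback-style strings, so they must be converted to byte counts before being handed to Python's logging handlers. A minimal parser sketch (an illustration, not necessarily the connector's exact code):

```python
def size_to_bytes(size_str: str) -> int:
    """Convert a Logback-style size string (e.g. '100MB', '1GB') to bytes."""
    # Multi-letter suffixes are checked before the bare "B" fallback.
    units = {"KB": 1024, "MB": 1024**2, "GB": 1024**3, "TB": 1024**4, "B": 1}
    s = size_str.strip().upper()
    for suffix, factor in units.items():
        if s.endswith(suffix):
            return int(s[: -len(suffix)]) * factor
    return int(s)  # a bare number is treated as bytes
```

With the values above, `100MB` becomes 104,857,600 bytes and `1GB` becomes 1,073,741,824 bytes.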
2 changes: 1 addition & 1 deletion examples/llm/mixture_of_agents.yaml
@@ -11,7 +11,7 @@
# NOTE: For horizontal scaling, partitioned queues must be used. This is not implemented in this example.
#
# Dependencies:
-# pip install -U langchain-google-vertexai langchain_anthropic langchain_openai openai
+# pip install -U langchain-google-vertexai langchain_anthropic langchain_openai openai langchain-core~=0.3.0 langchain~=0.3.0
#
# required ENV variables:
# - GOOGLE_APPLICATION_CREDENTIALS: the path to a service account JSON file
2 changes: 1 addition & 1 deletion examples/llm/openai_chat.yaml
@@ -11,7 +11,7 @@
# It will then send an event back to Solace with the topic: `demo/question/response`
#
# Dependencies:
-# pip install -U langchain_openai openai
+# pip install -U langchain_openai openai langchain-core~=0.3.0 langchain~=0.3.0
#
# required ENV variables:
# - OPENAI_API_KEY
40 changes: 37 additions & 3 deletions examples/llm/openai_chroma_rag.yaml
Author comment: Goal: Split a long payload into small chunks rather than accepting an array of chunks.

@@ -7,7 +7,7 @@
# Load Data:
# Send data to Solace topic `demo/rag/data` with the following payload format:
# {
-#   "texts": [<text data 1>, <text data 2>, ...]
+#   "text": text
# }
#
# RAG Query:
@@ -18,7 +18,7 @@
# The response will be sent to Solace topic `demo/rag/query/response`
#
# Dependencies:
-# pip install -U langchain_openai openai chromadb langchain-chroma
+# pip install -U langchain_openai openai chromadb langchain-chroma langchain-core~=0.3.0 langchain~=0.3.0
#
# Required ENV variables:
# - OPENAI_API_KEY
@@ -61,6 +61,22 @@ flows:
payload_encoding: utf-8
payload_format: json

# Split text
- component_name: text_splitter
component_module: langchain_split_text
component_config:
langchain_module: langchain_text_splitters
langchain_class: TokenTextSplitter
langchain_component_config:
chunk_size: 10
chunk_overlap: 1
input_transforms:
- type: copy
source_expression: input.payload:text
dest_expression: user_data.input:text
input_selection:
source_expression: user_data.input

# Embedding data & ChromaDB ingest
- component_name: chroma_embed
component_module: langchain_vector_store_embedding_index
@@ -81,11 +97,29 @@ flows:
source_value: topic:demo/rag/data
dest_expression: user_data.vector_input:metadatas.source
- type: copy
-source_expression: input.payload:texts
+source_expression: previous
dest_expression: user_data.vector_input:texts
input_selection:
source_expression: user_data.vector_input

# Send response back to broker
- component_name: send_response
component_module: broker_output
component_config:
<<: *broker_connection
payload_encoding: utf-8
payload_format: json
copy_user_properties: true
input_transforms:
- type: copy
source_expression: previous
dest_expression: user_data.output:payload
- type: copy
source_expression: template:demo/rag/response
dest_expression: user_data.output:topic
input_selection:
source_expression: user_data.output

# RAG Inference flow
- name: OpenAI_RAG
components:
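The `chunk_size: 10` / `chunk_overlap: 1` settings above configure langchain's token-based `TokenTextSplitter`. The sliding-window behavior those two parameters control can be illustrated with a character-based sketch (a hypothetical helper, not the langchain implementation — with token splitting the units are tokens, not characters):

```python
def split_text(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    """Slide a fixed-size window over text; consecutive chunks share an overlap."""
    chunks = []
    i = 0
    while i < len(text):
        chunks.append(text[i : i + chunk_size])
        if i + chunk_size >= len(text):
            break  # last chunk already reached the end of the text
        i += chunk_size - chunk_overlap  # step forward, keeping the overlap
    return chunks
```

Each chunk repeats the last `chunk_overlap` units of its predecessor, which helps preserve context across chunk boundaries during embedding.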
2 changes: 1 addition & 1 deletion examples/llm/openai_component_request_response.yaml
@@ -28,7 +28,7 @@
# It will then send an event back to Solace with the topic: `demo/question/response`
#
# Dependencies:
-# pip install -U langchain_openai openai
+# pip install -U langchain_openai openai langchain-core~=0.3.0 langchain~=0.3.0
#
# required ENV variables:
# - OPENAI_API_KEY
2 changes: 1 addition & 1 deletion examples/llm/vertexai_chat.yaml
@@ -11,7 +11,7 @@
# It will then send an event back to Solace with the topic: `demo/question/response`
#
# Dependencies:
-# pip install -U langchain-google-vertexai
+# pip install -U langchain-google-vertexai langchain-core~=0.3.0 langchain~=0.3.0
#
# required ENV variables:
# - GOOGLE_APPLICATION_CREDENTIALS: the path to a service account JSON file
7 changes: 2 additions & 5 deletions pyproject.toml
Author comment: Goal: Remove the dependencies on langchain and litellm; upgrade the pubsub library.

@@ -19,12 +19,9 @@ classifiers = [
 ]
 dependencies = [
     "boto3~=1.34.122",
-    "langchain-core~=0.3.0",
-    "langchain~=0.3.0",
     "PyYAML~=6.0.1",
     "Requests~=2.32.3",
-    "solace_pubsubplus>=1.8.0",
-    "litellm>=1.51.3",
+    "solace_pubsubplus>=1.9.0",
     "Flask~=3.0.3",
     "Flask-SocketIO~=5.4.1",
     "build~=1.2.2.post1",
@@ -44,7 +41,7 @@ solace-ai-connector-gen-docs = "solace_ai_connector.tools.gen_component_docs:mai
[tool.hatch.envs.hatch-test]
installer = "pip"

-# # Specify minimum and maximum Python versions to test
+# Specify minimum and maximum Python versions to test
[[tool.hatch.envs.hatch-test.matrix]]
python = ["3.10", "3.12"]

9 changes: 4 additions & 5 deletions requirements.txt
@@ -1,10 +1,9 @@
 boto3~=1.34.122
-langchain-core~=0.3.0
-langchain~=0.3.0
 PyYAML~=6.0.1
 Requests~=2.32.3
-solace_pubsubplus~=1.8.0
-litellm~=1.51.3
+solace_pubsubplus~=1.9.0
 Flask~=3.0.3
 Flask-SocketIO~=5.4.1
-build~=1.2.2.post1
+build~=1.2.2.post1
+datadog~=0.50.2
+SQLAlchemy~=2.0.36
80 changes: 71 additions & 9 deletions src/solace_ai_connector/common/log.py
Author comment: Goal: Support log rotation and backup.

@@ -2,6 +2,8 @@
import logging
import logging.handlers
import json
import os
from datetime import datetime


log = logging.getLogger("solace_ai_connector")
@@ -35,7 +37,22 @@ def format(self, record):
return json.dumps(log_record)


-def setup_log(logFilePath, stdOutLogLevel, fileLogLevel, logFormat):
def convert_to_bytes(size_str):
size_str = size_str.upper()
size_units = {"KB": 1024, "MB": 1024**2, "GB": 1024**3, "TB": 1024**4, "B": 1}
for unit in size_units:
if size_str.endswith(unit):
return int(size_str[: -len(unit)]) * size_units[unit]
return int(size_str)


def setup_log(
logFilePath,
stdOutLogLevel,
fileLogLevel,
logFormat,
logBack,
):
"""
Set up the configuration for the logger.

@@ -44,8 +61,9 @@ def setup_log(logFilePath, stdOutLogLevel, fileLogLevel, logFormat):
stdOutLogLevel (int): Logging level for standard output.
fileLogLevel (int): Logging level for the log file.
logFormat (str): Format of the log output ('jsonl' or 'pipe-delimited').

logBack (dict): Rolling log file configuration.
"""

# Set the global logger level to the lowest of the two levels
log.setLevel(min(stdOutLogLevel, fileLogLevel))

@@ -54,17 +72,61 @@
stream_formatter = logging.Formatter("%(message)s")
stream_handler.setFormatter(stream_formatter)

-    # Create an empty file at logFilePath (this will overwrite any existing content)
-    with open(logFilePath, "w") as file:
-        file.write("")
-
-    # file_handler = logging.handlers.TimedRotatingFileHandler(
-    #     filename=logFilePath, when='midnight', backupCount=30, mode='w')
-    file_handler = logging.FileHandler(filename=logFilePath, mode="a")
if logFormat == "jsonl":
file_formatter = JsonlFormatter()
else:
file_formatter = logging.Formatter("%(asctime)s | %(levelname)s: %(message)s")

if logBack:
rollingpolicy = logBack.get("rollingpolicy", {})
if rollingpolicy:
if "file-name-pattern" not in rollingpolicy:
log.warning(
"file-name-pattern is required in rollingpolicy. Continuing with default value '${LOG_FILE}.%d{yyyy-MM-dd}.%i.gz'."
)
file_name_pattern = rollingpolicy.get(
"file-name-pattern", "{LOG_FILE}.%d{yyyy-MM-dd}.%i.gz"
)

if "max-file-size" not in rollingpolicy:
log.warning(
"max-file-size is required in rollingpolicy. Continuing with default value '1GB'."
)
max_file_size = rollingpolicy.get("max-file-size", "1GB")

if "max-history" not in rollingpolicy:
log.warning(
"max-history is required in rollingpolicy. Continuing with default value '7'."
)
max_history = rollingpolicy.get("max-history", 7)

if "total-size-cap" not in rollingpolicy:
log.warning(
"total-size-cap is required in rollingpolicy. Continuing with default value '1TB'."
)
total_size_cap = rollingpolicy.get("total-size-cap", "1TB")

# Convert size strings to bytes
max_file_size = convert_to_bytes(max_file_size)
total_size_cap = convert_to_bytes(total_size_cap)

# Generate the log file name using the pattern
log_file_name = logFilePath

# Overwrite the file handler with a rotating file handler
file_handler = logging.handlers.RotatingFileHandler(
filename=log_file_name,
backupCount=max_history,
maxBytes=max_file_size,
)
file_handler.namer = (
lambda name: file_name_pattern.replace("${LOG_FILE}", logFilePath)
.replace("%d{yyyy-MM-dd}", datetime.now().strftime("%Y-%m-%d"))
.replace("%i", str(name.split(".")[-1]))
)
else:
file_handler = logging.FileHandler(filename=logFilePath, mode="a")

file_handler.setFormatter(file_formatter)
file_handler.setLevel(fileLogLevel)

9 changes: 7 additions & 2 deletions src/solace_ai_connector/common/messaging/messaging_builder.py
Author comment: Goal: Pass stop_signal and broker_name through for graceful termination and per-broker log labeling.

@@ -6,18 +6,23 @@

# Make a Messaging Service builder - this is a factory for Messaging Service objects
class MessagingServiceBuilder:
-    def __init__(self, flow_lock_manager, flow_kv_store):

def __init__(self, flow_lock_manager, flow_kv_store, broker_name, stop_signal):
self.broker_properties = {}
self.flow_lock_manager = flow_lock_manager
self.flow_kv_store = flow_kv_store
self.stop_signal = stop_signal
self.broker_name = broker_name

def from_properties(self, broker_properties: dict):
self.broker_properties = broker_properties
return self

def build(self):
if self.broker_properties["broker_type"] == "solace":
-            return SolaceMessaging(self.broker_properties)
return SolaceMessaging(
self.broker_properties, self.broker_name, self.stop_signal
)
elif self.broker_properties["broker_type"] == "dev_broker":
return DevBroker(
self.broker_properties, self.flow_lock_manager, self.flow_kv_store
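A caller might wire the new `broker_name` / `stop_signal` parameters as sketched below. The stub messaging class and the `None` lock manager/kv-store are placeholders for illustration, not the connector's real classes:

```python
import threading


class SolaceMessagingStub:
    """Stand-in for SolaceMessaging, showing the new constructor shape."""

    def __init__(self, broker_properties, broker_name, stop_signal):
        self.broker_properties = broker_properties
        self.broker_name = broker_name  # used to prefix this broker's log lines
        self.stop_signal = stop_signal  # checked by worker loops to exit gracefully


class MessagingServiceBuilder:
    def __init__(self, flow_lock_manager, flow_kv_store, broker_name, stop_signal):
        self.broker_properties = {}
        self.flow_lock_manager = flow_lock_manager
        self.flow_kv_store = flow_kv_store
        self.broker_name = broker_name
        self.stop_signal = stop_signal

    def from_properties(self, broker_properties):
        self.broker_properties = broker_properties
        return self

    def build(self):
        if self.broker_properties["broker_type"] == "solace":
            return SolaceMessagingStub(
                self.broker_properties, self.broker_name, self.stop_signal
            )
        raise ValueError("unsupported broker_type")


stop_signal = threading.Event()  # set by the signal handler on shutdown
service = (
    MessagingServiceBuilder(None, None, "broker-1", stop_signal)
    .from_properties({"broker_type": "solace"})
    .build()
)
```

Because every messaging object holds the same `Event`, setting it once signals all broker threads to wind down together.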