Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Features agent websocket #23

Merged
merged 16 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions autonomous_agent/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
app.log
3 changes: 3 additions & 0 deletions autonomous_agent/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Changelog

- Setup websocket client for autonomous agent service
48 changes: 48 additions & 0 deletions autonomous_agent/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@

# Autonomous Agent

This is a python service for creating websocket connection with the central server using agent id. The agent pings the server periodically via the websocket connection.

Requirments:

- Python 3.12.2

- Poetry
## Setup Guide

Clone the project

```bash
git clone https://github.com/sireto/cardano-autonomous-agent
```

Go to the Autonomous Agent Directory

```bash
cd autonomous_agent
```

Install dependencies via poetry.

```bash
poetry install
```
## Creating a new Agent.

To create a new agent you need to send a post request to **api/create_agent** endpoint. Make sure that **autonomous_agent_api** backend service and **database** are running correctly.

Copy the id from the post response . The id looks something similar to **c2d4c358-5171-4be8-b273-0147cc57c204.**

## Running the service

Activate the poetry virtual env inside **autonomous_agent** folder by running the following command.

```
poetry shell
```
Now run the following command along with your agent id.

```
python connect-agent.py --agent_id < Your id here >
```
After a successfull connection , you should see the periodic ping request and response in your terminal.
Empty file added autonomous_agent/__init__.py
Empty file.
39 changes: 39 additions & 0 deletions autonomous_agent/connect-agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import asyncio
import websockets
import argparse
from websockets.exceptions import ConnectionClosed

default_ping_timeout = 10 # Sends a ping every 10 seconds


async def connect_to_server(agent_id):
uri = "ws://127.0.0.1:8000/api/agent/ws"
headers = {"agent_id": agent_id}

try:
async with websockets.connect(uri, extra_headers=headers) as websocket:
while True:
try:
await websocket.send("PING")
response = await websocket.recv()
print("Received:", response)
await asyncio.sleep(default_ping_timeout)
except ConnectionClosed:
print("Connection closed by server")
break
except ConnectionError:
print("Failed to connect to the server")


async def main(agent_id):
await connect_to_server(agent_id)


if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Connect to Central WebSocket server with agent ID"
)
parser.add_argument("--agent_id", help="Agent ID to connect with", required=True)
args = parser.parse_args()

asyncio.run(main(args.agent_id))
87 changes: 87 additions & 0 deletions autonomous_agent/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions autonomous_agent/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[tool.poetry]
name = "autonomous-agent"
version = "0.1.0"
description = ""
authors = ["Joseph Rana <[email protected]>"]
readme = "README.md"

[tool.poetry.dependencies]
python = "^3.12"
websockets = "^12.0"


[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
3 changes: 3 additions & 0 deletions autonomous_agent_api/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,6 @@
- Made table for Triggers
- Created router,service,repository for trigger CRUD Action
- Created the validation function for cron expression and kafka topic while creating the trigger by agent

# TITLE - AGENT Websocket , DATE- 2024-04-09
- Added websocket endpoint for agent websocket connection
81 changes: 81 additions & 0 deletions autonomous_agent_api/backend/app/controllers/agent_websocket.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
from fastapi import WebSocket, APIRouter, WebSocketDisconnect, WebSocketException
from backend.config.database import prisma_connection
from fastapi import status
from datetime import datetime
from backend.config.logger import logger

router = APIRouter()


class WebSocket_Connection_Manager:
def __init__(self) -> None:
self.active_connections: dict[str, WebSocket] = {}

async def connect_websocket(self, websocket_agent_id: str, websocket: WebSocket):
# Stores Websocket along with agent id in Key Value Pair : websocket_agent_id : Key , websocket : value
await websocket.accept()
self.active_connections[websocket_agent_id] = websocket

async def disconnect_websocket(self, websocket_agent_id):
"""Critical : Dont use .close() here as this will close the new connection when dealing with multiple web socket request for the same bot.
This is due to the nature of try/except code that gets called by the previous connection."""
self.active_connections.pop(websocket_agent_id)

async def send_message_to_websocket(self, websocket_agent_id: str, message: dict):
# Checks if agent is active , first then sends message
agent_active = await self.check_if_agent_active(websocket_agent_id)
if agent_active:
await self.active_connections[websocket_agent_id].send_json(message)
else:
logger.critical(
"Agent with the id {websocket_agent_id} does not exist in the active Connection list. Sending Message Failed!"
)

async def check_if_agent_active(self, websocket_agent_id: str):
# Checks if agent is present in active connection list.
return websocket_agent_id in self.active_connections

async def remove_previous_agent_connection_if_exists(self, websocket_agent_id: str):
"""
If client requests websocket connection for an already active bot.
Removes the old connection and establishes a new one.
"""
if await self.check_if_agent_active(websocket_agent_id):
existing_websocket = self.active_connections.pop(websocket_agent_id)
await existing_websocket.close(code=1000, reason="establishing a new connection")


manager = WebSocket_Connection_Manager()


@router.websocket("/agent/ws")
async def agent_websocket_endpoint(websocket: WebSocket):
# todo: Cookies Authentication

# Get agent id from the websocket header.
websocket_agent_id = websocket.headers.get("agent_id")
if websocket_agent_id == None:
raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)

# Check if agent with the id exists.
agent_exists = await check_if_agent_exists_in_db(websocket_agent_id)
if agent_exists:
await manager.remove_previous_agent_connection_if_exists(websocket_agent_id)
await manager.connect_websocket(websocket_agent_id, websocket)
try:
while True:
data = await websocket.receive_text()
print(f"Received Data: {data} from {websocket_agent_id}")
await websocket.send_text(f"Ping recieved from {websocket_agent_id} at {datetime.now()}")
except WebSocketDisconnect:
await manager.disconnect_websocket(websocket_agent_id)
pass
else:
raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)


async def check_if_agent_exists_in_db(agent_id: str):
# Query agent with the agent id from the database -> reurns a boolean
async with prisma_connection:
agent_exists = await prisma_connection.prisma.agent.find_first(where={"id": agent_id, "deleted_at": None})
return bool(agent_exists)
10 changes: 9 additions & 1 deletion autonomous_agent_api/backend/app/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,22 @@
"""

from fastapi import APIRouter
from backend.app.controllers import ready, demo, agent_router, trigger_router

from backend.app.controllers import ready, demo, agent_router, trigger_router, agent_websocket

root_api_router = APIRouter(prefix="/api")

# For ready status Api
root_api_router.include_router(ready.router, tags=["ready"])

# For Demo Ping APi
root_api_router.include_router(demo.router, tags=["test"])

# For Agent CRUD operations
root_api_router.include_router(agent_router.AgentRouter().router, tags=["agent"])

# For Agent Websocket connection
root_api_router.include_router(agent_websocket.router)

# For Agent Trigger
root_api_router.include_router(trigger_router.TriggerRouter().router, tags=["trigger"])
Loading