Skip to content

Commit

Permalink
initial openrpc server
Browse files Browse the repository at this point in the history
  • Loading branch information
despiegk committed Jan 31, 2025
1 parent 99ecf1b commit 8322280
Show file tree
Hide file tree
Showing 15 changed files with 1,848 additions and 0 deletions.
1 change: 1 addition & 0 deletions lib/core/jobs/openrpc/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
server
183 changes: 183 additions & 0 deletions lib/core/jobs/openrpc/examples/job_client.vsh
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
#!/usr/bin/env -S v -n -w -gc none -cc tcc -d use_openssl -enable-globals run

import freeflowuniverse.herolib.core.jobs.model
import net.websocket
import json
import rand
import time
import term

const ws_url = 'ws://localhost:8080'


// Helper function to send request and receive response
fn send_request(mut ws websocket.Client, request OpenRPCRequest) !OpenRPCResponse {
// Send request
request_json := json.encode(request)
println(request_json)
ws.write_string(request_json) or {
eprintln(term.red('Failed to send request: ${err}'))
return err
}

// Wait for response
mut msg := ws.read_next_message() or {
eprintln(term.red('Failed to read response: ${err}'))
return err
}

if msg.opcode != websocket.OPCode.text_frame {
return error('Invalid response type: expected text frame')
}

response_text := msg.payload.bytestr()

// Parse response
response := json.decode(OpenRPCResponse, response_text) or {
eprintln(term.red('Failed to decode response: ${err}'))
return err
}
return response
}

// OpenRPC request/response structures (copied from handler.v)
struct OpenRPCRequest {
jsonrpc string [required]
method string [required]
params []string
id int [required]
}

struct OpenRPCResponse {
jsonrpc string [required]
result string
error string
id int [required]
}


// Initialize and configure WebSocket client
fn init_client() !&websocket.Client {
mut ws := websocket.new_client(ws_url)!

ws.on_open(fn (mut ws websocket.Client) ! {
println(term.green('Connected to WebSocket server and ready...'))
})

ws.on_error(fn (mut ws websocket.Client, err string) ! {
eprintln(term.red('WebSocket error: ${err}'))
})

ws.on_close(fn (mut ws websocket.Client, code int, reason string) ! {
println(term.yellow('WebSocket connection closed: ${reason}'))
})

ws.on_message(fn (mut ws websocket.Client, msg &websocket.Message) ! {
if msg.payload.len > 0 {
println(term.blue('Received message: ${msg.payload.bytestr()}'))
}
})

ws.connect() or {
eprintln(term.red('Failed to connect: ${err}'))
return err
}

spawn ws.listen()
return ws
}

// Main client logic
mut ws := init_client()!
defer {
ws.close(1000, 'normal') or {
eprintln(term.red('Error closing connection: ${err}'))
}
}
println(term.green('Connected to ${ws_url}'))

// Create a new job
println(term.blue('\nCreating new job...'))
new_job := send_request(mut ws, OpenRPCRequest{
jsonrpc: '2.0'
method: 'job.new'
params: []string{}
id: rand.i32_in_range(1,10000000)!
}) or {
eprintln(term.red('Failed to create new job: ${err}'))
exit(1)
}
println(term.green('Created new job:'))
println(json.encode_pretty(new_job))

// Parse job from response
job := json.decode(model.Job, new_job.result) or {
eprintln(term.red('Failed to parse job: ${err}'))
exit(1)
}

// Set job properties
println(term.blue('\nSetting job properties...'))
mut updated_job := job
updated_job.guid = 'test-job-1'
updated_job.actor = 'vm_manager'
updated_job.action = 'start'
updated_job.params = {
'name': 'test-vm'
'memory': '2048'
}

// Save job
set_response := send_request(mut ws, OpenRPCRequest{
jsonrpc: '2.0'
method: 'job.set'
params: [json.encode(updated_job)]
id: rand.int()
}) or {
eprintln(term.red('Failed to save job: ${err}'))
exit(1)
}
println(term.green('Saved job:'))
println(json.encode_pretty(set_response))

// Update job status to running
println(term.blue('\nUpdating job status...'))
update_response := send_request(mut ws, OpenRPCRequest{
jsonrpc: '2.0'
method: 'job.update_status'
params: ['test-job-1', 'running']
id: rand.int()
}) or {
eprintln(term.red('Failed to update job status: ${err}'))
exit(1)
}
println(term.green('Updated job status:'))
println(json.encode_pretty(update_response))

// Get job to verify changes
println(term.blue('\nRetrieving job...'))
get_response := send_request(mut ws, OpenRPCRequest{
jsonrpc: '2.0'
method: 'job.get'
params: ['test-job-1']
id: rand.int()
}) or {
eprintln(term.red('Failed to retrieve job: ${err}'))
exit(1)
}
println(term.green('Retrieved job:'))
println(json.encode_pretty(get_response))

// List all jobs
println(term.blue('\nListing all jobs...'))
list_response := send_request(mut ws, OpenRPCRequest{
jsonrpc: '2.0'
method: 'job.list'
params: []string{}
id: rand.int()
}) or {
eprintln(term.red('Failed to list jobs: ${err}'))
exit(1)
}
println(term.green('All jobs:'))
println(json.encode_pretty(list_response))
41 changes: 41 additions & 0 deletions lib/core/jobs/openrpc/examples/server.vsh
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#!/usr/bin/env -S v -n -w -gc none -cc tcc -d use_openssl -enable-globals run

import freeflowuniverse.herolib.core.jobs.openrpc
import freeflowuniverse.herolib.core.jobs.model
import time
import sync
import os

fn start_rpc_server( mut wg sync.WaitGroup) ! {
defer { wg.done() }

// Create OpenRPC server
openrpc.server_start()!

}

fn start_ws_server( mut wg sync.WaitGroup) ! {
defer { wg.done() }

// Get port from environment variable or use default
port := if ws_port := os.getenv_opt('WS_PORT') {
ws_port.int()
} else {
8080
}

// Create and start WebSocket server
mut ws_server := openrpc.new_ws_server(port)!
ws_server.start()!
}

// Create wait group for servers
mut wg := sync.new_waitgroup()
wg.add(2)

// Start servers in separate threads
spawn start_rpc_server(mut wg)
spawn start_ws_server(mut wg)

// Wait for servers to finish (they run forever)
wg.wait()
27 changes: 27 additions & 0 deletions lib/core/jobs/openrpc/factory.v
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
module openrpc
import freeflowuniverse.herolib.core.redisclient
import freeflowuniverse.herolib.core.jobs.model

// Generic OpenRPC server that handles all managers
pub struct OpenRPCServer {
mut:
redis &redisclient.Redis
queue &redisclient.RedisQueue
runner &model.HeroRunner
}

// Create new OpenRPC server with Redis connection
pub fn server_start() ! {
redis := redisclient.core_get()!
mut runner := model.new()!
mut s:= &OpenRPCServer{
redis: redis
queue: &redisclient.RedisQueue{
key: rpc_queue
redis: redis

}
runner:runner
}
s.start()!
}
71 changes: 71 additions & 0 deletions lib/core/jobs/openrpc/handler.v
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
module openrpc

import freeflowuniverse.herolib.core.redisclient
import json



// Start the server and listen for requests
pub fn (mut s OpenRPCServer) start() ! {
println('Starting OpenRPC server.')

for {
// Get message from queue
msg := s.queue.get(5000)!

if msg.len==0{
println("queue '${rpc_queue}' empty")
continue
}

println("process '${msg}'")

// Parse OpenRPC request
request := json.decode(OpenRPCRequest, msg) or {
println('Error decoding request: ${err}')
continue
}

// Process request with appropriate handler
response := s.handle_request(request)!

// Send response back to Redis using response queue
response_json := json.encode(response)
key:='${rpc_queue}:${request.id}'
println("response: ${} put on return queue ${key} ")
mut response_queue := &redisclient.RedisQueue{
key: key
redis: s.redis
}
response_queue.add(response_json)!
}
}

// Get the handler for a specific method based on its prefix
fn (mut s OpenRPCServer) handle_request(request OpenRPCRequest) !OpenRPCResponse {
method := request.method.to_lower()
println("process: method: '${method}'")
if method.starts_with("job.") {
return s.handle_request_job(request) or {
return rpc_response_error(request.id,"error in request job:\n${err}")
}
}
if method.starts_with("agent.") {
return s.handle_request_agent(request) or {
return rpc_response_error(request.id,"error in request agent:\n${err}")
}
}
if method.starts_with("group.") {
return s.handle_request_group(request) or {
return rpc_response_error(request.id,"error in request group:\n${err}")
}
}
if method.starts_with("service.") {
return s.handle_request_service(request) or {
return rpc_response_error(request.id,"error in request service:\n${err}")
}
}

return rpc_response_error(request.id,"Could not find handler for ${method}")

}
69 changes: 69 additions & 0 deletions lib/core/jobs/openrpc/handler_agent_manager.v
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
module openrpc

import freeflowuniverse.herolib.core.jobs.model
import json


pub fn (mut h OpenRPCServer) handle_request_agent(request OpenRPCRequest) !OpenRPCResponse {

mut response:=rpc_response_new(request.id)

match request.method {
'new' {
agent := h.runner.agents.new()
response.result = json.encode(agent)
}
'set' {
if request.params.len < 1 {
return error('Missing agent parameter')
}
agent := json.decode(model.Agent, request.params[0])!
h.runner.agents.set(agent)!
response.result = 'true'
}
'get' {
if request.params.len < 1 {
return error('Missing pubkey parameter')
}
agent := h.runner.agents.get(request.params[0])!
response.result = json.encode(agent)
}
'list' {
agents := h.runner.agents.list()!
response.result = json.encode(agents)
}
'delete' {
if request.params.len < 1 {
return error('Missing pubkey parameter')
}
h.runner.agents.delete(request.params[0])!
response.result = 'true'
}
'update_status' {
if request.params.len < 2 {
return error('Missing pubkey or status parameters')
}
status := match request.params[1] {
'ok' { model.AgentState.ok }
'down' { model.AgentState.down }
'error' { model.AgentState.error }
'halted' { model.AgentState.halted }
else { return error('Invalid status: ${request.params[1]}') }
}
h.runner.agents.update_status(request.params[0], status)!
response.result = 'true'
}
'get_by_service' {
if request.params.len < 2 {
return error('Missing actor or action parameters')
}
agents := h.runner.agents.get_by_service(request.params[0], request.params[1])!
response.result = json.encode(agents)
}
else {
return error('Unknown method: ${request.method}')
}
}

return response
}
Loading

0 comments on commit 8322280

Please sign in to comment.