Skip to content

Commit

Permalink
EOS-27348: Updated Iem and MessageBus server cortxconf usage removed (S…
Browse files Browse the repository at this point in the history
…eagate#718)

* Updated Iem and MessageBus server cortx conf changes

Signed-off-by: suryakumar.kumaravelan <[email protected]>

* Added mappedconf class removed conf variable

Signed-off-by: suryakumar.kumaravelan <[email protected]>

* code opt

Signed-off-by: suryakumar.kumaravelan <[email protected]>

* Update utils_server.py

Co-authored-by: Sachin Punadikar <[email protected]>
Signed-off-by: suryakumar.kumaravelan <[email protected]>
  • Loading branch information
suryakumar1024 and Sachin Punadikar committed Mar 21, 2022
1 parent 3d3a315 commit e7a0cbb
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 34 deletions.
25 changes: 7 additions & 18 deletions py-utils/src/utils/iem_framework/iem_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,15 @@
from cortx.utils.utils_server.error import RestServerError
from cortx.utils.iem_framework.error import EventMessageError
from cortx.utils.log import Log
from cortx.utils.common import CortxConf
from cortx.utils.conf_store import Conf
from cortx.utils.conf_store import Conf, MappedConf

routes = web.RouteTableDef()


class IemRequestHandler(MessageServer):
""" Rest interface of Iem """

@staticmethod
def _get_cluster_data(config_path):
message_bus_backend = Conf.get('config',\
'cortx>utils>message_bus_backend')
message_server_endpoints = Conf.get('config',\
f'cortx>external>{message_bus_backend}>endpoints')
cluster_id = Conf.get('config','cluster>id')
return message_server_endpoints, cluster_id
cluster_id = None
message_server_endpoints = None

@staticmethod
async def send(request):
Expand All @@ -47,11 +39,9 @@ async def send(request):

component = payload['component']
source = payload['source']
cluster_conf = CortxConf.get_cluster_conf_path()
endpoint, cluster_id = IemRequestHandler._get_cluster_data(cluster_conf)

EventMessage.init(component=component, source=source,\
cluster_id=cluster_id, message_server_endpoints=endpoint)
cluster_id=IemRequestHandler.cluster_id,\
message_server_endpoints=IemRequestHandler.message_server_endpoints)

del payload['component']
del payload['source']
Expand Down Expand Up @@ -89,11 +79,10 @@ async def receive(request):
Log.debug(f"Received GET request for component " \
f"{request.rel_url.query['component']}")
try:
cluster_conf = CortxConf.get_cluster_conf_path()
component = request.rel_url.query['component']
endpoint, _ = IemRequestHandler._get_cluster_data(cluster_conf)
EventMessage.subscribe(component=component,\
message_server_endpoints=endpoint)
message_server_endpoints=\
IemRequestHandler.message_server_endpoints)
alert = EventMessage.receive()
except EventMessageError as e:
status_code = e.rc
Expand Down
14 changes: 0 additions & 14 deletions py-utils/src/utils/message_bus/message_bus_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
from cortx.utils.utils_server.error import RestServerError
from cortx.utils.message_bus import MessageConsumer, MessageProducer, MessageBus
from cortx.utils.log import Log
from cortx.utils.common import CortxConf
from cortx.utils.conf_store import Conf

routes = web.RouteTableDef()

Expand All @@ -40,14 +38,8 @@ async def send(request):
message_type = request.match_info['message_type']
payload = await request.json()
messages = payload['messages']
cluster_conf = CortxConf.get_cluster_conf_path()
Conf.load('config', cluster_conf, skip_reload=True)
message_server_endpoints = Conf.get('config',\
'cortx>external>kafka>endpoints')
MessageBus.init(message_server_endpoints=message_server_endpoints)
producer = MessageProducer(producer_id='rest_producer', \
message_type=message_type, method='sync')

producer.send(messages)
except MessageBusError as e:
status_code = e.rc
Expand Down Expand Up @@ -85,15 +77,9 @@ async def receive(request):
try:
message_types = str(request.match_info['message_type']).split('&')
consumer_group = request.rel_url.query['consumer_group']
cluster_conf = CortxConf.get_cluster_conf_path()
Conf.load('config', cluster_conf, skip_reload=True)
message_server_endpoints = Conf.get('config',\
'cortx>external>kafka>endpoints')
MessageBus.init(message_server_endpoints=message_server_endpoints)
consumer = MessageConsumer(consumer_id='rest_consumer', \
consumer_group=consumer_group, message_types=message_types, \
auto_ack=True, offset='latest')

message = consumer.receive()
except MessageBusError as e:
status_code = e.rc
Expand Down
8 changes: 6 additions & 2 deletions py-utils/src/utils/utils_server/utils_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,15 @@ def run_app(self, web, port):

class MessageServer(UtilsServer):
"""Base class for Cortx Rest Server implementation."""
def __init__(self, message_server_endpoints, message_server_port=28300):
def __init__(self, message_server_endpoints, message_server_port=28300,\
cluster_id=None):
super().__init__()
MessageBus.init(message_server_endpoints=message_server_endpoints)
from cortx.utils.iem_framework import IemRequestHandler
from cortx.utils.message_bus import MessageBusRequestHandler
from cortx.utils.audit_log import AuditLogRequestHandler
IemRequestHandler.cluster_id = cluster_id
IemRequestHandler.message_server_endpoints = message_server_endpoints
self.app.add_routes([web.post('/EventMessage/event', IemRequestHandler.send), \
web.get('/EventMessage/event', IemRequestHandler.receive), \
web.post('/MessageBus/message/{message_type}', \
Expand Down Expand Up @@ -91,4 +94,5 @@ def __init__(self, message_server_endpoints, message_server_port=28300):
message_server_endpoints = cluster_conf.get(
f'cortx>external>{message_bus_backend}>endpoints')
message_server_port = cluster_conf.get('cortx>utils>message_server_port')
MessageServer(message_server_endpoints, message_server_port)
cluster_id = cluster_conf.get('cluster>id')
MessageServer(message_server_endpoints, message_server_port, cluster_id)

0 comments on commit e7a0cbb

Please sign in to comment.