Skip to content

Commit

Permalink
format code and improve error handling when restarting
Browse files Browse the repository at this point in the history
  • Loading branch information
ck-c8y committed Jan 24, 2025
1 parent a6ddf9b commit 82efc70
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 99 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ The modal dialog provides the option:

On the monitoring tab you can view the latest alarms and events for the Streaming Analytics Engine. This is especially helpful if after an upload of a new extension no extension is loaded.

The property `is_safe_mode` is an indication if the engine was started in [Safe Mode](https://cumulocity.com/guides/streaming-analytics/troubleshooting/#safe-mode-on-startup), i.e. without loading any extension. In this case you have to delete the latest uploaded extension and restart the engine again.
The property `is_safe_mode` is an indication if the engine was started in [Safe Mode](https://cumulocity.com/docs/streaming-analytics/troubleshooting/#apama_safe_mode), i.e. without loading any extension. In this case you have to delete the latest uploaded extension and restart the engine again.

![Monitoring](resources/images/monitoring.png)

Expand Down
97 changes: 54 additions & 43 deletions analytics-service/c8y_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class C8YAgent:
PATH_TENANT_OPTIONS = "/tenant/options"
PATH_CEP_RESTART = "/service/cep/restart"
ANALYTICS_MANAGEMENT_REPOSITORIES = "analytics-management.repository"

def __init__(self):
self._logger = logging.getLogger("C8YAgent")
self._logger.setLevel(logging.DEBUG)
Expand Down Expand Up @@ -60,30 +61,25 @@ def restart_cep(self, request):

self._logger.info(f"Restarted CEP!")

def prepare_header(self, request):
headers = dict(request.headers)
if 'authorization' in request.cookies:
headers['Authorization'] = f"Bearer {request.cookies['authorization']}"
return headers

def get_cep_operationobject_id(self, request):
try:
self._logger.info(f"Retrieving id of operation object for CEP ...")

headers = self.prepare_header(request)
self._logger.info(headers)
response = self.c8yapp.get_tenant_instance(headers=headers).get(
resource=self.PATH_CEP_DIAGNOSTICS)
resource=self.PATH_CEP_DIAGNOSTICS
)
try:
app_id = response["microservice_application_id"]
microservice_name = response["microservice_name"]
cep_operationobject_id = None
query = f"applicationId eq '{app_id}' and name eq '{microservice_name}'"

self._logger.info(f"Build filter: {query}")

managed_objects_app = self.c8yapp.get_tenant_instance(
headers=headers
headers=headers
).inventory.select(query=query)
managed_object_id = None
for managed_object in managed_objects_app:
Expand All @@ -110,10 +106,11 @@ def get_cep_operationobject_id(self, request):
def get_cep_ctrl_status(self, request):
try:
self._logger.info(f"Retrieving CEP control status ...")

headers = self.prepare_header(request)
response = self.c8yapp.get_tenant_instance(headers=headers).get(
resource=self.PATH_CEP_DIAGNOSTICS)
resource=self.PATH_CEP_DIAGNOSTICS
)
return response
except Exception as e:
self._logger.error(f"Exception:", exc_info=True)
Expand All @@ -123,53 +120,59 @@ def get_cep_ctrl_status(self, request):
def load_repositories(self, request):
try:
self._logger.info(f"Retrieving repositories ...")

# tenant_options = self.c8yapp.get_tenant_instance(headers=request_headers).tenant_options.get_all(category=self.ANALYTICS_MANAGEMENT_REPOSITORIES)

headers = self.prepare_header(request)
response = self.c8yapp.get_tenant_instance(headers=headers).get(
resource=f"{self.PATH_TENANT_OPTIONS}/{self.ANALYTICS_MANAGEMENT_REPOSITORIES}")
resource=f"{self.PATH_TENANT_OPTIONS}/{self.ANALYTICS_MANAGEMENT_REPOSITORIES}"
)
tenant_options = response
# List comprehension to convert TenantOptions to array
repositories = []
for repository_id in tenant_options:
# Assuming option.value is a JSON string containing repository details
value_dict = json.loads(tenant_options[repository_id])

repository = {
'id': repository_id,
'name': value_dict.get('name'),
'url': value_dict.get('url'),
'accessToken': value_dict.get('accessToken'),
'enabled': value_dict.get('enabled', False) # Default to False if not present
"id": repository_id,
"name": value_dict.get("name"),
"url": value_dict.get("url"),
"accessToken": value_dict.get("accessToken"),
"enabled": value_dict.get(
"enabled", False
), # Default to False if not present
}
repositories.append(repository)
self._logger.info(f"Found repositories: {repositories}")
return repositories
except Exception as e:
self._logger.error(f"Exception:", exc_info=True)

def load_repository(self, request, repository_id):
try:
self._logger.info(f"Retrieving repository {repository_id} ...")

# tenant_options = self.c8yapp.get_tenant_instance(headers=request_headers).tenant_options.get_all(category=self.ANALYTICS_MANAGEMENT_REPOSITORIES)

headers = self.prepare_header(request)
response = self.c8yapp.get_tenant_instance(headers=headers).get(
resource=f"{self.PATH_TENANT_OPTIONS}/{self.ANALYTICS_MANAGEMENT_REPOSITORIES}/{repository_id}")
resource=f"{self.PATH_TENANT_OPTIONS}/{self.ANALYTICS_MANAGEMENT_REPOSITORIES}/{repository_id}"
)
tenant_option = response
# List comprehension to convert TenantOptions to array
repository = {}
# Assuming option.value is a JSON string containing repository details
value_dict = json.loads(tenant_option['value'])
value_dict = json.loads(tenant_option["value"])

repository = {
'id': repository_id,
'name': value_dict.get('name'),
'url': value_dict.get('url'),
'accessToken': value_dict.get('accessToken'),
'enabled': value_dict.get('enabled', False) # Default to False if not present
"id": repository_id,
"name": value_dict.get("name"),
"url": value_dict.get("url"),
"accessToken": value_dict.get("accessToken"),
"enabled": value_dict.get(
"enabled", False
), # Default to False if not present
}
self._logger.info(f"Found repository: {repository}")
return repository
Expand All @@ -179,30 +182,32 @@ def load_repository(self, request, repository_id):
def save_repositories(self, request, repositories):
try:
self._logger.info(f"Saving repositories...")

headers = self.prepare_header(request)
tenant = self.c8yapp.get_tenant_instance(headers=headers)

for repository in repositories:
repository_id = repository.get('id')
repository_id = repository.get("id")
# Create value dictionary excluding the id field
value_dict = {
'name': repository.get('name'),
'url': repository.get('url'),
'enabled': bool(repository.get('enabled', False))
"name": repository.get("name"),
"url": repository.get("url"),
"enabled": bool(repository.get("enabled", False)),
}

# Only add accessToken if it exists and is not empty
if repository.get('accessToken'):
value_dict['accessToken'] = repository['accessToken']
if repository.get("accessToken"):
value_dict["accessToken"] = repository["accessToken"]

# Convert to JSON string
value_json = json.dumps(value_dict)
# self._logger.info(f"Updating repository: {repository_id} {value_dict} {value_json}")

option = TenantOption(category=self.ANALYTICS_MANAGEMENT_REPOSITORIES,
key=repository_id,
value=value_json)
option = TenantOption(
category=self.ANALYTICS_MANAGEMENT_REPOSITORIES,
key=repository_id,
value=value_json,
)
# Try to update existing repository
tenant.tenant_options.create(option)
self._logger.info(f"Updated/created repository: {repository_id}")
Expand All @@ -212,3 +217,9 @@ def save_repositories(self, request, repositories):
except Exception as e:
self._logger.error(f"Exception while saving repositories:", exc_info=True)
return {"error": str(e)}, 500

def prepare_header(self, request):
headers = dict(request.headers)
if "authorization" in request.cookies:
headers["Authorization"] = f"Bearer {request.cookies['authorization']}"
return headers
Loading

0 comments on commit 82efc70

Please sign in to comment.