From 82efc704afcc90bbf894d863888dc7b575e72eaf Mon Sep 17 00:00:00 2001 From: ck-c8y Date: Fri, 24 Jan 2025 11:34:33 +0100 Subject: [PATCH] format code and improve error handling when restarting --- README.md | 2 +- analytics-service/c8y_agent.py | 97 +++++++------- analytics-service/flask_wrapper.py | 118 +++++++++++------- .../monitoring/engine-monitoring.component.ts | 2 +- .../editor/editor-modal.component.html | 2 +- analytics-ui/src/shared/analytics.service.ts | 28 +++-- 6 files changed, 150 insertions(+), 99 deletions(-) diff --git a/README.md b/README.md index 7a3e08d..daa9576 100644 --- a/README.md +++ b/README.md @@ -93,7 +93,7 @@ The modal dialog provides the option: On the monitoring tab you can view the latest alarms and events for the Streaming Analytics Engine. This is especially helpful if after an upload of a new extension no extension is loaded. -The property `is_safe_mode` is an indication if the engine was started in [Safe Mode](https://cumulocity.com/guides/streaming-analytics/troubleshooting/#safe-mode-on-startup), i.e. without loading any extension. In this case you have to delete the latest uploaded extension and restart the engine again. +The property `is_safe_mode` is an indication if the engine was started in [Safe Mode](https://cumulocity.com/docs/streaming-analytics/troubleshooting/#apama_safe_mode), i.e. without loading any extension. In this case you have to delete the latest uploaded extension and restart the engine again. ![Monitoring](resources/images/monitoring.png) diff --git a/analytics-service/c8y_agent.py b/analytics-service/c8y_agent.py index 40499fe..745ad67 100644 --- a/analytics-service/c8y_agent.py +++ b/analytics-service/c8y_agent.py @@ -26,6 +26,7 @@ class C8YAgent: PATH_TENANT_OPTIONS = "/tenant/options" PATH_CEP_RESTART = "/service/cep/restart" ANALYTICS_MANAGEMENT_REPOSITORIES = "analytics-management.repository" + def __init__(self): self._logger = logging.getLogger("C8YAgent") self._logger.setLevel(logging.DEBUG) @@ -60,20 +61,15 @@ def restart_cep(self, request): self._logger.info(f"Restarted CEP!") - def prepare_header(self, request): - headers = dict(request.headers) - if 'authorization' in request.cookies: - headers['Authorization'] = f"Bearer {request.cookies['authorization']}" - return headers - def get_cep_operationobject_id(self, request): try: self._logger.info(f"Retrieving id of operation object for CEP ...") - + headers = self.prepare_header(request) self._logger.info(headers) response = self.c8yapp.get_tenant_instance(headers=headers).get( - resource=self.PATH_CEP_DIAGNOSTICS) + resource=self.PATH_CEP_DIAGNOSTICS + ) try: app_id = response["microservice_application_id"] microservice_name = response["microservice_name"] @@ -81,9 +77,9 @@ def get_cep_operationobject_id(self, request): query = f"applicationId eq '{app_id}' and name eq '{microservice_name}'" self._logger.info(f"Build filter: {query}") - + managed_objects_app = self.c8yapp.get_tenant_instance( - headers=headers + headers=headers ).inventory.select(query=query) managed_object_id = None for managed_object in managed_objects_app: @@ -110,10 +106,11 @@ def get_cep_operationobject_id(self, request): def get_cep_ctrl_status(self, request): try: self._logger.info(f"Retrieving CEP control status ...") - + headers = self.prepare_header(request) response = self.c8yapp.get_tenant_instance(headers=headers).get( - resource=self.PATH_CEP_DIAGNOSTICS) + resource=self.PATH_CEP_DIAGNOSTICS + ) return response except Exception as e: self._logger.error(f"Exception:", exc_info=True) @@ -123,53 +120,59 @@ def get_cep_ctrl_status(self, request): def load_repositories(self, request): try: self._logger.info(f"Retrieving repositories ...") - + # tenant_options = self.c8yapp.get_tenant_instance(headers=request_headers).tenant_options.get_all(category=self.ANALYTICS_MANAGEMENT_REPOSITORIES) - + headers = self.prepare_header(request) response = self.c8yapp.get_tenant_instance(headers=headers).get( - resource=f"{self.PATH_TENANT_OPTIONS}/{self.ANALYTICS_MANAGEMENT_REPOSITORIES}") + resource=f"{self.PATH_TENANT_OPTIONS}/{self.ANALYTICS_MANAGEMENT_REPOSITORIES}" + ) tenant_options = response # List comprehension to convert TenantOptions to array repositories = [] for repository_id in tenant_options: # Assuming option.value is a JSON string containing repository details value_dict = json.loads(tenant_options[repository_id]) - + repository = { - 'id': repository_id, - 'name': value_dict.get('name'), - 'url': value_dict.get('url'), - 'accessToken': value_dict.get('accessToken'), - 'enabled': value_dict.get('enabled', False) # Default to False if not present + "id": repository_id, + "name": value_dict.get("name"), + "url": value_dict.get("url"), + "accessToken": value_dict.get("accessToken"), + "enabled": value_dict.get( + "enabled", False + ), # Default to False if not present } repositories.append(repository) self._logger.info(f"Found repositories: {repositories}") return repositories except Exception as e: self._logger.error(f"Exception:", exc_info=True) - + def load_repository(self, request, repository_id): try: self._logger.info(f"Retrieving repository {repository_id} ...") - + # tenant_options = self.c8yapp.get_tenant_instance(headers=request_headers).tenant_options.get_all(category=self.ANALYTICS_MANAGEMENT_REPOSITORIES) - + headers = self.prepare_header(request) response = self.c8yapp.get_tenant_instance(headers=headers).get( - resource=f"{self.PATH_TENANT_OPTIONS}/{self.ANALYTICS_MANAGEMENT_REPOSITORIES}/{repository_id}") + resource=f"{self.PATH_TENANT_OPTIONS}/{self.ANALYTICS_MANAGEMENT_REPOSITORIES}/{repository_id}" + ) tenant_option = response # List comprehension to convert TenantOptions to array repository = {} # Assuming option.value is a JSON string containing repository details - value_dict = json.loads(tenant_option['value']) - + value_dict = json.loads(tenant_option["value"]) + repository = { - 'id': repository_id, - 'name': value_dict.get('name'), - 'url': value_dict.get('url'), - 'accessToken': value_dict.get('accessToken'), - 'enabled': value_dict.get('enabled', False) # Default to False if not present + "id": repository_id, + "name": value_dict.get("name"), + "url": value_dict.get("url"), + "accessToken": value_dict.get("accessToken"), + "enabled": value_dict.get( + "enabled", False + ), # Default to False if not present } self._logger.info(f"Found repository: {repository}") return repository @@ -179,30 +182,32 @@ def load_repository(self, request, repository_id): def save_repositories(self, request, repositories): try: self._logger.info(f"Saving repositories...") - + headers = self.prepare_header(request) tenant = self.c8yapp.get_tenant_instance(headers=headers) for repository in repositories: - repository_id = repository.get('id') + repository_id = repository.get("id") # Create value dictionary excluding the id field value_dict = { - 'name': repository.get('name'), - 'url': repository.get('url'), - 'enabled': bool(repository.get('enabled', False)) + "name": repository.get("name"), + "url": repository.get("url"), + "enabled": bool(repository.get("enabled", False)), } # Only add accessToken if it exists and is not empty - if repository.get('accessToken'): - value_dict['accessToken'] = repository['accessToken'] - + if repository.get("accessToken"): + value_dict["accessToken"] = repository["accessToken"] + # Convert to JSON string value_json = json.dumps(value_dict) # self._logger.info(f"Updating repository: {repository_id} {value_dict} {value_json}") - option = TenantOption(category=self.ANALYTICS_MANAGEMENT_REPOSITORIES, - key=repository_id, - value=value_json) + option = TenantOption( + category=self.ANALYTICS_MANAGEMENT_REPOSITORIES, + key=repository_id, + value=value_json, + ) # Try to update existing repository tenant.tenant_options.create(option) self._logger.info(f"Updated/created repository: {repository_id}") @@ -212,3 +217,9 @@ def save_repositories(self, request, repositories): except Exception as e: self._logger.error(f"Exception while saving repositories:", exc_info=True) return {"error": str(e)}, 500 + + def prepare_header(self, request): + headers = dict(request.headers) + if "authorization" in request.cookies: + headers["Authorization"] = f"Bearer {request.cookies['authorization']}" + return headers diff --git a/analytics-service/flask_wrapper.py b/analytics-service/flask_wrapper.py index 7ae4985..8f395f2 100644 --- a/analytics-service/flask_wrapper.py +++ b/analytics-service/flask_wrapper.py @@ -44,23 +44,27 @@ def get_content_list(): try: encoded_url = request.args.get("url") repository_id = request.args.get("id") - headers = { - 'Accept': 'application/json' - } + headers = {"Accept": "application/json"} if repository_id: - repository_configuration = agent.load_repository(request=request, repository_id=repository_id) + repository_configuration = agent.load_repository( + request=request, repository_id=repository_id + ) if "accessToken" in repository_configuration: - headers["Authorization"] = f"token {repository_configuration['accessToken']}" + headers["Authorization"] = ( + f"token {repository_configuration['accessToken']}" + ) logger.info(f"Found accessToken: {headers['Authorization']}") - + logger.info( f"Get content list encoded_url: {repository_configuration} {encoded_url}" ) decoded_url = urllib.parse.unquote(encoded_url) logger.info(f"Get content list from decoded_url: {decoded_url}") - response_repository = requests.get(decoded_url, headers=headers, allow_redirects=True) - response_repository.raise_for_status() + response_repository = requests.get( + decoded_url, headers=headers, allow_redirects=True + ) + response_repository.raise_for_status() logger.info(f"Response: {response_repository}") response = make_response(response_repository.content, 200) @@ -68,20 +72,28 @@ def get_content_list(): return response except HTTPError as e: status_code = e.response.status_code # This will give you the HTTP status code - logger.error(f"Exception when retrieving content list! Status code: {status_code}", exc_info=True) + logger.error( + f"Exception when retrieving content list! Status code: {status_code}", + exc_info=True, + ) resp = Response( - json.dumps({"message": f"Bad request: {str(e)}"}), mimetype="application/json" + json.dumps({"message": f"Bad request: {str(e)}"}), + mimetype="application/json", + ) + resp.status_code = ( + status_code # Set the response status code to match the error ) - resp.status_code = status_code # Set the response status code to match the error return resp except Exception as e: logger.error(f"Exception when retrieving content list!", exc_info=True) resp = Response( - json.dumps({"message": f"Bad request: {str(e)}"}), mimetype="application/json" + json.dumps({"message": f"Bad request: {str(e)}"}), + mimetype="application/json", ) resp.status_code = 500 # Generic server error return resp + # download the content from github # params: # url url of monitor to download @@ -93,17 +105,19 @@ def get_content(): encoded_url = request.args.get("url") cep_block_name = request.args.get("cep_block_name") repository_id = request.args.get("repository_id") - extract_fqn_cep_block = parse_boolean(request.args.get( - "extract_fqn_cep_block", default=False - )) - headers = { - 'Accept': 'application/vnd.github.v3.raw' - } - + extract_fqn_cep_block = parse_boolean( + request.args.get("extract_fqn_cep_block", default=False) + ) + headers = {"Accept": "application/vnd.github.v3.raw"} + if repository_id: - repository_configuration = agent.load_repository(request=request, repository_id=repository_id) + repository_configuration = agent.load_repository( + request=request, repository_id=repository_id + ) if "accessToken" in repository_configuration: - headers["Authorization"] = f"token {repository_configuration['accessToken']}" + headers["Authorization"] = ( + f"token {repository_configuration['accessToken']}" + ) logger.info(f"Found accessToken: {headers['Authorization']}") logger.info( f"Get content encoded_url: {repository_configuration} {extract_fqn_cep_block} {cep_block_name} {encoded_url}" @@ -115,10 +129,12 @@ def get_content(): decoded_url = urllib.parse.unquote(encoded_url) logger.info(f"Get content decoded_url: {decoded_url}") - response_repository = requests.get(decoded_url, headers=headers, allow_redirects=True) + response_repository = requests.get( + decoded_url, headers=headers, allow_redirects=True + ) # logger.info(f"Response headers: {dict(response_repository.headers)}") # logger.info(f"Request headers: {dict(response_repository.request.headers)}") - response_repository.raise_for_status() + response_repository.raise_for_status() if extract_fqn_cep_block: regex = "(package\s)(.*?);" package = re.findall(regex, response_repository.text) @@ -131,7 +147,9 @@ def get_content(): response.mimetype = "text/plain" return response except Exception as e: - logger.error(f"Exception when retrieving fqn of monitor file!", exc_info=True) + logger.error( + f"Exception when retrieving fqn of monitor file!", exc_info=True + ) return f"Bad request: {str(e)}", 400 else: logger.info(f"Response: {response_repository}") @@ -141,14 +159,13 @@ def get_content(): except Exception as e: logger.error(f"Exception when retrieving content from github!", exc_info=True) resp = Response( - json.dumps({"message": f"Bad request: {str(e)}"}), mimetype="application/json" + json.dumps({"message": f"Bad request: {str(e)}"}), + mimetype="application/json", ) resp.status_code = 400 return resp - - # load the configured repositores # params: @app.route("/repository/configuration", methods=["GET"]) @@ -156,16 +173,18 @@ def load_repositories(): result = agent.load_repositories(request) if result == None: resp = Response( - json.dumps({"message": "No repositories found"}), mimetype="application/json" + json.dumps({"message": "No repositories found"}), + mimetype="application/json", ) resp.status_code = 400 return resp - + return jsonify(result) + # save the configured repositores # params: -@app.route('/repository/configuration', methods=['POST']) +@app.route("/repository/configuration", methods=["POST"]) def save_repositories(): try: # Get repositories from request body @@ -175,7 +194,7 @@ def save_repositories(): # Validate repository format for repo in repositories: - if not all(key in repo for key in ['id', 'name', 'url']): + if not all(key in repo for key in ["id", "name", "url"]): return {"error": "Each repository must have id, name, and url"}, 400 # Call save method @@ -209,15 +228,21 @@ def create_extension_zip(): # get the contents of the file try: file_name = extract_raw_path(monitor["downloadUrl"]) - repository_configuration = agent.load_repository(request=request, repository_id=monitor["repositoryId"]) - headers = { - 'Accept': 'application/vnd.github.v3.raw' - } + repository_configuration = agent.load_repository( + request=request, repository_id=monitor["repositoryId"] + ) + headers = {"Accept": "application/vnd.github.v3.raw"} if "accessToken" in repository_configuration: - headers["Authorization"] = f"token {repository_configuration['accessToken']}" - logger.info(f"Found accessToken: {headers['Authorization']}") - - response_monitor_code = requests.get(monitor["url"], headers=headers, allow_redirects=True) + headers["Authorization"] = ( + f"token {repository_configuration['accessToken']}" + ) + logger.info( + f"Found accessToken: {headers['Authorization']}" + ) + + response_monitor_code = requests.get( + monitor["url"], headers=headers, allow_redirects=True + ) # Combine output directory and filename logger.info(f"File downloaded and saved to: {file_name}") @@ -227,8 +252,12 @@ def create_extension_zip(): named_file.close() except Exception as e: - logger.error(f"Error downloading file: {monitor}", exc_info=True) - resp = Response(json.dumps({"message": f"Bad request: {str(e)}"}), mimetype="application/json" + logger.error( + f"Error downloading file: {monitor}", exc_info=True + ) + resp = Response( + json.dumps({"message": f"Bad request: {str(e)}"}), + mimetype="application/json", ) resp.status_code = 400 return resp @@ -283,7 +312,8 @@ def create_extension_zip(): except Exception as e: logger.error(f"Exception when creating extension!", exc_info=True) resp = Response( - json.dumps({"message": f"Bad request: {str(e)}"}), mimetype="application/json" + json.dumps({"message": f"Bad request: {str(e)}"}), + mimetype="application/json", ) resp.status_code = 400 return resp @@ -347,7 +377,7 @@ def get_cep_operationobject_id(): ) resp.status_code = 400 return resp - + return jsonify(result) @@ -392,7 +422,7 @@ def parse_boolean(value): if isinstance(value, bool): return value if isinstance(value, str): - return value.lower() == 'true' + return value.lower() == "true" return False diff --git a/analytics-ui/src/monitoring/engine-monitoring.component.ts b/analytics-ui/src/monitoring/engine-monitoring.component.ts index 69da727..e156889 100644 --- a/analytics-ui/src/monitoring/engine-monitoring.component.ts +++ b/analytics-ui/src/monitoring/engine-monitoring.component.ts @@ -63,7 +63,7 @@ export class EngineMonitoringComponent implements OnInit { // eslint-disable-next-line @typescript-eslint/no-unused-vars action: (e, link: string) => window.open( - 'https://cumulocity.com/guides/streaming-analytics/troubleshooting/#safe-mode-on-startup', + 'https://cumulocity.com/docs/streaming-analytics/troubleshooting/#apama_safe_mode', '_blank', 'noopener,noreferrer' ) diff --git a/analytics-ui/src/repository/editor/editor-modal.component.html b/analytics-ui/src/repository/editor/editor-modal.component.html index ee1d384..7625a11 100644 --- a/analytics-ui/src/repository/editor/editor-modal.component.html +++ b/analytics-ui/src/repository/editor/editor-modal.component.html @@ -4,6 +4,6 @@ - \ No newline at end of file diff --git a/analytics-ui/src/shared/analytics.service.ts b/analytics-ui/src/shared/analytics.service.ts index db26a28..68083c8 100644 --- a/analytics-ui/src/shared/analytics.service.ts +++ b/analytics-ui/src/shared/analytics.service.ts @@ -55,7 +55,7 @@ export class AnalyticsService { private applicationService: ApplicationService, ) { this.realtime = new Realtime(this.fetchClient); - this.subscribeMonitoringChannel(true); + this.subscribeMonitoringChannel(); } initiateReload(resetCache: boolean) { @@ -141,7 +141,7 @@ export class AnalyticsService { this._blocksDeployed = undefined; this._extensionsDeployed = undefined; this._cepOperationObjectId = undefined; - this.subscribeMonitoringChannel(true); + this.subscribeMonitoringChannel(); } async getLoadedBlocksFromCEP(): Promise { @@ -260,7 +260,11 @@ export class AnalyticsService { cepOperationObjectId = data[0].id; } } - this._cepOperationObjectId = Promise.resolve(cepOperationObjectId); + if (cepOperationObjectId) { + this._cepOperationObjectId = Promise.resolve(cepOperationObjectId); + } else { + this._cepOperationObjectId = undefined; + } } return this._cepOperationObjectId; } @@ -294,17 +298,23 @@ export class AnalyticsService { method: 'GET' }); } - this._cepCtrlStatus = response.json(); + this._cepCtrlStatus = await response.json(); } return this._cepCtrlStatus; } - async subscribeMonitoringChannel(showWarning: boolean): Promise { + async subscribeMonitoringChannel(): Promise { const cepOperationObjectId = await this.getCEP_OperationObjectId(); - if (!cepOperationObjectId && showWarning) { - this.alertService.warning( - 'The supporting microservice for the Analytics Managment is currently not deployed. Not all feature are available ...' - ); + if (!cepOperationObjectId) { + if (!this._isBackendDeployed) { + this.alertService.warning( + 'The supporting microservice for the Analytics Management is currently not deployed. Not all feature are available ...' + ); + } else { + this.alertService.warning( + 'Streaming Analytics is restarting. Please retry later ...' + ); + } } const { data } = await this.inventoryService.detail(cepOperationObjectId); this.cepOperationObject$.next(data);