Skip to content

Commit

Permalink
replace accessToken when repositories are returned
Browse files Browse the repository at this point in the history
  • Loading branch information
ck-c8y committed Jan 28, 2025
1 parent dbd7155 commit 204e6ba
Show file tree
Hide file tree
Showing 3 changed files with 325 additions and 149 deletions.
191 changes: 113 additions & 78 deletions analytics-service/c8y_agent.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from xmlrpc.client import boolean
from dotenv import load_dotenv
from c8y_api.app import MultiTenantCumulocityApp
from c8y_api.model import Binary, TenantOption
Expand All @@ -25,12 +26,14 @@
import json
from typing import Dict, List, Optional, Set, Tuple


class C8YAgent:
# Constants
DUMMY_ACCESS_TOKEN = "_DUMMY_ACCESS_CODE_"
PATHS = {
'CEP_DIAGNOSTICS': "/service/cep/diagnostics/apamaCtrlStatus",
'TENANT_OPTIONS': "/tenant/options",
'CEP_RESTART': "/service/cep/restart"
"CEP_DIAGNOSTICS": "/service/cep/diagnostics/apamaCtrlStatus",
"TENANT_OPTIONS": "/tenant/options",
"CEP_RESTART": "/service/cep/restart",
}
ANALYTICS_MANAGEMENT_REPOSITORIES = "analytics-management.repository"

Expand All @@ -40,9 +43,9 @@ def __init__(self):
load_dotenv()
self.c8y_app = MultiTenantCumulocityApp()

def _get_tenant_instance(self, headers: Dict) -> any:
def _get_tenant_instance(self, headers: Dict, cookies: Dict) -> any:
"""Get tenant instance with error handling"""
return self.c8y_app.get_tenant_instance(headers=headers)
return self.c8y_app.get_tenant_instance(headers=headers, cookies=cookies)

def _handle_request(self, func, *args, **kwargs) -> Tuple[Dict, int]:
"""Generic request handler with error handling"""
Expand All @@ -54,9 +57,9 @@ def _handle_request(self, func, *args, **kwargs) -> Tuple[Dict, int]:
return {"error": str(e)}, 500

def upload_extension(self, request, extension_name: str, ext_file) -> str:
headers = self.prepare_header(request)
[headers, cookies] = self.prepare_header(request)
binary = Binary(
c8y=self._get_tenant_instance(headers),
c8y=self._get_tenant_instance(headers, cookies),
type="application/zip",
name=extension_name,
file=ext_file,
Expand All @@ -67,19 +70,18 @@ def upload_extension(self, request, extension_name: str, ext_file) -> str:
def restart_cep(self, request) -> None:
"""
Attempt to restart CEP, ignoring any errors that occur.
Args:
request: The incoming request object
"""
try:
headers = self.prepare_header(request)
[headers, cookies] = self.prepare_header(request)
self._logger.info("Attempting to restart CEP...")

self._get_tenant_instance(headers).put(
resource=self.PATHS['CEP_RESTART'],
json={}

self._get_tenant_instance(headers, cookies).put(
resource=self.PATHS["CEP_RESTART"], json={}
)

except Exception as e:
# Log the error but don't raise it
self._logger.warning(f"Non-critical error during CEP restart: {str(e)}")
Expand All @@ -88,52 +90,54 @@ def restart_cep(self, request) -> None:
self._logger.info("CEP restart procedure completed")

def get_cep_operationobject_id(self, request) -> Optional[Dict]:
headers = self.prepare_header(request)
# response = self._get_tenant_instance(headers).get(
[headers, cookies] = self.prepare_header(request)
# response = self._get_tenant_instance(headers,cookies).get(
# resource=self.PATHS['CEP_DIAGNOSTICS']
# )
ti = self._get_tenant_instance(headers)

ti = self._get_tenant_instance(headers, cookies)
self._logger.info(f"Updated get_cep_operationobject_id: {ti}")
self._logger.info(f"Updated path: {self.PATHS['CEP_DIAGNOSTICS']}")
response = ti.get(
resource=self.PATHS['CEP_DIAGNOSTICS']
)

response = ti.get(resource=self.PATHS["CEP_DIAGNOSTICS"])

app_id = response.get("microservice_application_id")
microservice_name = response.get("microservice_name")

if not all([app_id, microservice_name]):
return None

query = f"applicationId eq '{app_id}' and name eq '{microservice_name}'"
managed_objects = self._get_tenant_instance(headers).inventory.select(query=query)

managed_objects = self._get_tenant_instance(headers, cookies).inventory.select(
query=query
)

for managed_object in managed_objects:
return {"id": managed_object.id}

return None

def get_cep_ctrl_status(self, request) -> Dict:
headers = self.prepare_header(request)
return self._get_tenant_instance(headers).get(
resource=self.PATHS['CEP_DIAGNOSTICS']
[headers, cookies] = self.prepare_header(request)
return self._get_tenant_instance(headers, cookies).get(
resource=self.PATHS["CEP_DIAGNOSTICS"]
)

def _process_repository_data(self, repo_data: Union[Dict, str, 'TenantOption'], repository_id: str = None) -> Dict:

def _process_repository_data(
self, repo_data: Union[Dict, str, "TenantOption"], repository_id: str = None, replace_access_token: boolean = True
) -> Dict:
"""
Process repository data into standard format.
Args:
repo_data: Repository data as either a dictionary, JSON string, or TenantOption
repository_id: Optional repository ID
Returns:
Dictionary containing processed repository data
"""
try:
# Handle TenantOption input
if hasattr(repo_data, 'value'):
if hasattr(repo_data, "value"):
try:
value_dict = json.loads(repo_data.value)
except json.JSONDecodeError:
Expand All @@ -142,7 +146,7 @@ def _process_repository_data(self, repo_data: Union[Dict, str, 'TenantOption'],
"name": repo_data.value, # Use the value as name
"url": "",
"accessToken": "",
"enabled": False
"enabled": False,
}
# Handle string input
elif isinstance(repo_data, str):
Expand All @@ -154,22 +158,26 @@ def _process_repository_data(self, repo_data: Union[Dict, str, 'TenantOption'],
"name": repo_data, # Use the string as name
"url": "",
"accessToken": "",
"enabled": False
"enabled": False,
}
# Handle dict input
elif isinstance(repo_data, dict):
value_dict = json.loads(repo_data.get('value', '{}'))
value_dict = json.loads(repo_data.get("value", "{}"))
else:
raise ValueError(f"Unsupported repo_data type: {type(repo_data)}")

# Process the dictionary
return {
result = {
"id": repository_id or value_dict.get("id"),
"name": value_dict.get("name", ""),
"url": value_dict.get("url", ""),
"accessToken": value_dict.get("accessToken", ""),
"enabled": value_dict.get("enabled", False)
"enabled": value_dict.get("enabled", False),
}
# If there's an access token and replace_access_token, replace it with dummy
if value_dict.get("accessToken") and replace_access_token:
result["accessToken"] = self.DUMMY_ACCESS_TOKEN
# Process the dictionary
return result
except Exception as e:
self._logger.error(f"Error processing repository data: {e}", exc_info=True)
# Return a basic dict with default values in case of error
Expand All @@ -178,42 +186,47 @@ def _process_repository_data(self, repo_data: Union[Dict, str, 'TenantOption'],
"name": str(repo_data)[:100], # Truncate long strings
"url": "",
"accessToken": "",
"enabled": False
"enabled": False,
}

def load_repositories(self, request) -> List[Dict]:
headers = self.prepare_header(request)
tenant = self._get_tenant_instance(headers)
[headers, cookies] = self.prepare_header(request)
tenant = self._get_tenant_instance(headers, cookies)
tenant_options = tenant.tenant_options.get_all(
category=self.ANALYTICS_MANAGEMENT_REPOSITORIES
)
)
return [
self._process_repository_data(option, option.key)
self._process_repository_data(option, option.key, True)
for option in tenant_options
]

def load_repository(self, request, repository_id: str) -> Dict:
headers = self.prepare_header(request)
tenant = self._get_tenant_instance(headers)
def load_repository(
self, request, repository_id: str, replace_access_token: boolean
) -> Dict:
[headers, cookies] = self.prepare_header(request)
tenant = self._get_tenant_instance(headers, cookies)
tenant_option = tenant.tenant_options.get(
category=self.ANALYTICS_MANAGEMENT_REPOSITORIES,
key=repository_id
category=self.ANALYTICS_MANAGEMENT_REPOSITORIES, key=repository_id
)
# Print various attributes of the TenantOption object
# Print various attributes of the TenantOption object
print(f"TenantOption contents:")
print(f"Category: {tenant_option.category}")
print(f"Key: {tenant_option.key}")
print(f"Value: {tenant_option.value}")

# Print the entire object
print(f"Complete TenantOption object: {vars(tenant_option)}")
return self._process_repository_data(tenant_option, repository_id)
return self._process_repository_data(
tenant_option, repository_id, replace_access_token
)

def update_repositories(self, request, repositories: List[Dict]) -> Tuple[Dict, int]:
def update_repositories(
self, request, repositories: List[Dict]
) -> Tuple[Dict, int]:
try:
headers = self.prepare_header(request)
tenant = self._get_tenant_instance(headers)
[headers, cookies] = self.prepare_header(request)
tenant = self._get_tenant_instance(headers, cookies)

existing_repos = self.load_repositories(request)
new_repo_ids = {repo.get("id") for repo in repositories}
existing_repo_ids = {repo.get("id") for repo in existing_repos}
Expand All @@ -234,37 +247,59 @@ def update_repositories(self, request, repositories: List[Dict]) -> Tuple[Dict,

def _update_single_repository(self, tenant, repository: Dict) -> None:
"""Helper method to update a single repository"""
value_dict = {
"name": repository.get("name"),
"url": repository.get("url"),
"enabled": bool(repository.get("enabled", False))
}
if repository.get("accessToken"):
value_dict["accessToken"] = repository["accessToken"]

option = TenantOption(
category=self.ANALYTICS_MANAGEMENT_REPOSITORIES,
key=repository.get("id"),
value=json.dumps(value_dict)
)
tenant.tenant_options.create(option)
self._logger.info(f"Updated repository: {repository.get('id')}")
try:
# If the access token is the dummy, get the original from existing repository
if repository.get("accessToken") == self.DUMMY_ACCESS_TOKEN:
existing_repo = tenant.tenant_options.get(
category=self.ANALYTICS_MANAGEMENT_REPOSITORIES,
key=repository.get("id"),
)
if existing_repo:
existing_data = json.loads(existing_repo.value)
access_token = existing_data.get("accessToken", "")
else:
access_token = ""
else:
access_token = repository.get("accessToken", "")

value_dict = {
"name": repository.get("name"),
"url": repository.get("url"),
"enabled": bool(repository.get("enabled", False)),
}

# Only add access token if it exists
if access_token:
value_dict["accessToken"] = access_token

option = TenantOption(
category=self.ANALYTICS_MANAGEMENT_REPOSITORIES,
key=repository.get("id"),
value=json.dumps(value_dict),
)
tenant.tenant_options.create(option)
self._logger.info(f"Updated repository: {repository.get('id')}")

except Exception as e:
self._logger.error(
f"Failed to update repository {repository.get('id')}: {str(e)}"
)
raise

def _delete_repositories(self, tenant, repo_ids: Set[str]) -> None:
"""Helper method to delete multiple repositories"""
for repo_id in repo_ids:
try:
tenant.tenant_options.delete_by(
category=self.ANALYTICS_MANAGEMENT_REPOSITORIES,
key=repo_id
category=self.ANALYTICS_MANAGEMENT_REPOSITORIES, key=repo_id
)
self._logger.info(f"Deleted repository: {repo_id}")
except Exception as e:
self._logger.warning(f"Failed to delete repository {repo_id}: {str(e)}")

@staticmethod
def prepare_header(request) -> Dict:
headers = dict(request.headers)
if "authorization" in request.cookies:
headers["Authorization"] = f"Bearer {request.cookies['authorization']}"
return headers
# headers = dict(request.headers)
# if "authorization" in request.cookies:
# headers["Authorization"] = f"Bearer {request.cookies['authorization']}"
return [request.headers, request.cookies]
Loading

0 comments on commit 204e6ba

Please sign in to comment.