diff --git a/cms/check_rules_count_volume_per_rse_activity_state b/cms/check_rules_count_volume_per_rse_activity_state new file mode 100755 index 0000000..8596079 --- /dev/null +++ b/cms/check_rules_count_volume_per_rse_activity_state @@ -0,0 +1,190 @@ +#!/usr/bin/env python3 +# Copyright 2012-2024 CERN +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Authors: +# - Christos Emmanouil, , 2024 + +""" +Probe to check number and volume of rules per rse, activity and state. +Handling rules in the states OK, REPLICATING, STUCK. +""" + +import sys +import traceback + +from sqlalchemy import func, case +from sqlalchemy.orm import aliased + +from rucio.db.sqla import models +from rucio.db.sqla.session import get_session +from rucio.common.types import InternalScope +from rucio.db.sqla.constants import RuleState, DIDType + +from utils import common +PrometheusPusher = common.PrometheusPusher + +# Exit statuses +OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 + +if __name__ == "__main__": + + try: + + session = get_session() + + ''' + # Handling WAITING_APPROVAL, INJECT, SUSPENDED with unique RSE destination + # - The information is retried using the following tables in the specified order: + # rules -> contents -> dids. + # - For these states, volume information can be retrieved from the contents and dids tables. + # However, only rules with container dids are counted. If a rule is created using a did + # of a different type, it will be ignored. + # - For rules in this state, information about the destination RSE is (only maybe?) available + # at the rules table (ReplicationRule) under rse_expression. However, the rse_expression field + # can be generic, leading to multiple RSEs as possible destinations. + # To group by RSE, a regex pattern is required to match only single RSE expressions. + # Sometimes, single RSE expressions can be something like 'rse=...', so this part must be removed. + # - The query is heavily time-consuming due to the contents table (DataIdentifierAssociation), + # which is huge and not properly indexed. + # In the DB, creating an index for the contents table, focusing on NAME, CHILD_NAME in combination + # with DID_TYPE, CHILD_TYPE, can improve the speed of directly querying the DB. + # However, index creation can impact DB performance during creation and later on, + # as the DB will need to update this index as well. + # - For this reason, handling rules of these states is better implemented using HDFS dumps. + + rse = case((models.ReplicationRule.rse_expression.startswith("rse="), + func.substr(models.ReplicationRule.rse_expression, 5, func.length(models.ReplicationRule.rse_expression) - 4)), + else_=models.ReplicationRule.rse_expression).label("rse") + container_to_dataset = aliased(models.DataIdentifierAssociation) + dataset_to_file = aliased(models.DataIdentifierAssociation) + + query = ( + session.query( + rse, + models.ReplicationRule.activity, + models.ReplicationRule.state, + func.count(models.ReplicationRule.id), + func.sum(models.DataIdentifier.bytes) + ) + .with_hint(models.ReplicationRule, 'INDEX_FFS(rules RULES_STATE_IDX)', 'oracle') + .with_hint(models.ReplicationRule, 'INDEX_FFS(rules RULES_SCOPE_NAME_IDX)', 'oracle') + .with_hint(models.ReplicationRule, 'INDEX_FFS(rules RULES_SC_NA_AC_RS_CO_UQ_IDX)', 'oracle') + .with_hint(models.DataIdentifierAssociation, 'INDEX_FFS(contents CONTENTS_PK)', 'oracle') + .with_hint(models.DataIdentifier, 'INDEX_FFS(dids DIDS_PK)', 'oracle') + .select_from(models.ReplicationRule) + .filter(models.ReplicationRule.scope == InternalScope("cms")) + .filter(models.ReplicationRule.state.in_([RuleState.INJECT, RuleState.WAITING_APPROVAL, RuleState.SUSPENDED])) + .filter(func.regexp_like(models.ReplicationRule.rse_expression, r'^rse=[^&|]+$|^[^&|]+$')) + .join(container_to_dataset, models.ReplicationRule.name == container_to_dataset.name) + .filter(container_to_dataset.did_type == DIDType.CONTAINER) + .filter(container_to_dataset.child_type == DIDType.DATASET) + .join(dataset_to_file, container_to_dataset.child_name == dataset_to_file.name) + .filter(dataset_to_file.did_type == DIDType.DATASET) + .filter(dataset_to_file.child_type == DIDType.FILE) + .join(models.DataIdentifier, models.DataIdentifier.name == dataset_to_file.child_name) + .group_by(rse) + .group_by(models.ReplicationRule.activity) + .group_by(models.ReplicationRule.state) + ) + + with PrometheusPusher() as manager: + for rse, activity, state, count, volume in query.all(): + + manager.gauge( + name='rule_count_per_rse_activity_state.{rse}.{activity}.{state}', + documentation='Number of rules in a given rse, activity and state' + ).labels( + rse=rse, + activity=activity, + state=state + ).set(count) + + manager.gauge( + name='rule_volume_per_rse_activity_state.{rse}.{activity}.{state}', + documentation='Volume of rules in a given rse, activity and state' + ).labels( + rse=rse, + activity=activity, + state=state + ).set(volume) + ''' + + # Handling OK, REPLICATING, STUCK + # - The information is retried using the following tables in the specified order: + # rules -> requests -> rses. + # - For rules in these states, volume information can be retrieved from the requests (Request) + # and requests_history (RequestHistory) tables. However, requests_history is huge and not indexed. + # So, as before, directly querying the DB will be heavily time-consuming. + # - Additionally, the request table is not available in HDFS, so querying this table is only possible + # directly from the DB. Note that these tables contain different information, + # so a combination of both is needed for the full picture. + # - The requests_history table will be handled using HDFS dumps, while the requests table will be + # handled by a probe directly querying the DB. + + query = ( + session.query( + models.RSE.rse, + models.Request.activity, + models.ReplicationRule.state, + func.count(models.ReplicationRule.id), + func.sum(models.Request.bytes) + ) + .with_hint(models.RSE, 'INDEX_FFS(rses RSES_PK)', 'oracle') + .with_hint(models.ReplicationRule, 'INDEX_FFS(rules RULES_PK)', 'oracle') + .with_hint(models.ReplicationRule, 'INDEX_FFS(rules RULES_STATE_IDX)', 'oracle') + .with_hint(models.ReplicationRule, 'INDEX_FFS(rules RULES_SCOPE_NAME_IDX)', 'oracle') + .with_hint(models.Request, 'INDEX_FFS(requests REQUESTS_RULEID_IDX)', 'oracle') + .with_hint(models.Request, 'INDEX_FFS(requests REQUESTS_DEST_RSE_ID_IDX)', 'oracle') + .with_hint(models.Request, 'INDEX_FFS(requests REQUESTS_TYP_STA_UPD_IDX)', 'oracle') + .select_from(models.ReplicationRule) + .filter(models.ReplicationRule.scope == InternalScope("cms")) + .filter(models.ReplicationRule.state.in_([RuleState.OK, RuleState.REPLICATING, RuleState.STUCK])) + .join(models.Request, models.ReplicationRule.id == models.Request.rule_id) + .join(models.RSE, models.Request.dest_rse_id == models.RSE.id) + .group_by(models.RSE.rse) + .group_by(models.ReplicationRule.state) + .group_by(models.Request.activity) + ) + + with PrometheusPusher() as manager: + for rse, activity, state, count, volume in query.all(): + + # print(rse, activity, state.name, count, volume) + + manager.gauge( + name='rule_count_per_rse_activity_state.{rse}.{activity}.{state}', + documentation='Number of rules in a given rse, activity and state' + ).labels( + rse=rse, + activity=activity, + state=state.name + ).set(count) + + manager.gauge( + name='rule_volume_per_rse_activity_state.{rse}.{activity}.{state}', + documentation='Volume of rules in a given rse, activity and state' + ).labels( + rse=rse, + activity=activity, + state=state.name + ).set(volume) + + except: + print(traceback.format_exc()) + sys.exit(UNKNOWN) + finally: + session.remove() + sys.exit(OK) + \ No newline at end of file