-
Notifications
You must be signed in to change notification settings - Fork 24
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
CMS: probe checking the number and volume of rules per rse, activity …
…and state
- Loading branch information
1 parent
a7c0556
commit 94b72e2
Showing
1 changed file
with
190 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,190 @@ | ||
#!/usr/bin/env python3 | ||
# Copyright 2012-2024 CERN | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# | ||
# Authors: | ||
# - Christos Emmanouil, <[email protected]>, 2024 | ||
|
||
""" | ||
Probe to check number and volume of rules per rse, activity and state. | ||
Handling rules in the states OK, REPLICATING, STUCK. | ||
""" | ||
|
||
import sys | ||
import traceback | ||
|
||
from sqlalchemy import func, case | ||
from sqlalchemy.orm import aliased | ||
|
||
from rucio.db.sqla import models | ||
from rucio.db.sqla.session import get_session | ||
from rucio.common.types import InternalScope | ||
from rucio.db.sqla.constants import RuleState, DIDType | ||
|
||
from utils import common | ||
PrometheusPusher = common.PrometheusPusher | ||
|
||
# Exit statuses | ||
OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 | ||
|
||
if __name__ == "__main__": | ||
|
||
try: | ||
|
||
session = get_session() | ||
|
||
''' | ||
# Handling WAITING_APPROVAL, INJECT, SUSPENDED with unique RSE destination | ||
# - The information is retried using the following tables in the specified order: | ||
# rules -> contents -> dids. | ||
# - For these states, volume information can be retrieved from the contents and dids tables. | ||
# However, only rules with container dids are counted. If a rule is created using a did | ||
# of a different type, it will be ignored. | ||
# - For rules in this state, information about the destination RSE is (only maybe?) available | ||
# at the rules table (ReplicationRule) under rse_expression. However, the rse_expression field | ||
# can be generic, leading to multiple RSEs as possible destinations. | ||
# To group by RSE, a regex pattern is required to match only single RSE expressions. | ||
# Sometimes, single RSE expressions can be something like 'rse=...', so this part must be removed. | ||
# - The query is heavily time-consuming due to the contents table (DataIdentifierAssociation), | ||
# which is huge and not properly indexed. | ||
# In the DB, creating an index for the contents table, focusing on NAME, CHILD_NAME in combination | ||
# with DID_TYPE, CHILD_TYPE, can improve the speed of directly querying the DB. | ||
# However, index creation can impact DB performance during creation and later on, | ||
# as the DB will need to update this index as well. | ||
# - For this reason, handling rules of these states is better implemented using HDFS dumps. | ||
rse = case((models.ReplicationRule.rse_expression.startswith("rse="), | ||
func.substr(models.ReplicationRule.rse_expression, 5, func.length(models.ReplicationRule.rse_expression) - 4)), | ||
else_=models.ReplicationRule.rse_expression).label("rse") | ||
container_to_dataset = aliased(models.DataIdentifierAssociation) | ||
dataset_to_file = aliased(models.DataIdentifierAssociation) | ||
query = ( | ||
session.query( | ||
rse, | ||
models.ReplicationRule.activity, | ||
models.ReplicationRule.state, | ||
func.count(models.ReplicationRule.id), | ||
func.sum(models.DataIdentifier.bytes) | ||
) | ||
.with_hint(models.ReplicationRule, 'INDEX_FFS(rules RULES_STATE_IDX)', 'oracle') | ||
.with_hint(models.ReplicationRule, 'INDEX_FFS(rules RULES_SCOPE_NAME_IDX)', 'oracle') | ||
.with_hint(models.ReplicationRule, 'INDEX_FFS(rules RULES_SC_NA_AC_RS_CO_UQ_IDX)', 'oracle') | ||
.with_hint(models.DataIdentifierAssociation, 'INDEX_FFS(contents CONTENTS_PK)', 'oracle') | ||
.with_hint(models.DataIdentifier, 'INDEX_FFS(dids DIDS_PK)', 'oracle') | ||
.select_from(models.ReplicationRule) | ||
.filter(models.ReplicationRule.scope == InternalScope("cms")) | ||
.filter(models.ReplicationRule.state.in_([RuleState.INJECT, RuleState.WAITING_APPROVAL, RuleState.SUSPENDED])) | ||
.filter(func.regexp_like(models.ReplicationRule.rse_expression, r'^rse=[^&|]+$|^[^&|]+$')) | ||
.join(container_to_dataset, models.ReplicationRule.name == container_to_dataset.name) | ||
.filter(container_to_dataset.did_type == DIDType.CONTAINER) | ||
.filter(container_to_dataset.child_type == DIDType.DATASET) | ||
.join(dataset_to_file, container_to_dataset.child_name == dataset_to_file.name) | ||
.filter(dataset_to_file.did_type == DIDType.DATASET) | ||
.filter(dataset_to_file.child_type == DIDType.FILE) | ||
.join(models.DataIdentifier, models.DataIdentifier.name == dataset_to_file.child_name) | ||
.group_by(rse) | ||
.group_by(models.ReplicationRule.activity) | ||
.group_by(models.ReplicationRule.state) | ||
) | ||
with PrometheusPusher() as manager: | ||
for rse, activity, state, count, volume in query.all(): | ||
manager.gauge( | ||
name='rule_count_per_rse_activity_state.{rse}.{activity}.{state}', | ||
documentation='Number of rules in a given rse, activity and state' | ||
).labels( | ||
rse=rse, | ||
activity=activity, | ||
state=state | ||
).set(count) | ||
manager.gauge( | ||
name='rule_volume_per_rse_activity_state.{rse}.{activity}.{state}', | ||
documentation='Volume of rules in a given rse, activity and state' | ||
).labels( | ||
rse=rse, | ||
activity=activity, | ||
state=state | ||
).set(volume) | ||
''' | ||
|
||
# Handling OK, REPLICATING, STUCK | ||
# - The information is retried using the following tables in the specified order: | ||
# rules -> requests -> rses. | ||
# - For rules in these states, volume information can be retrieved from the requests (Request) | ||
# and requests_history (RequestHistory) tables. However, requests_history is huge and not indexed. | ||
# So, as before, directly querying the DB will be heavily time-consuming. | ||
# - Additionally, the request table is not available in HDFS, so querying this table is only possible | ||
# directly from the DB. Note that these tables contain different information, | ||
# so a combination of both is needed for the full picture. | ||
# - The requests_history table will be handled using HDFS dumps, while the requests table will be | ||
# handled by a probe directly querying the DB. | ||
|
||
query = ( | ||
session.query( | ||
models.RSE.rse, | ||
models.Request.activity, | ||
models.ReplicationRule.state, | ||
func.count(models.ReplicationRule.id), | ||
func.sum(models.Request.bytes) | ||
) | ||
.with_hint(models.RSE, 'INDEX_FFS(rses RSES_PK)', 'oracle') | ||
.with_hint(models.ReplicationRule, 'INDEX_FFS(rules RULES_PK)', 'oracle') | ||
.with_hint(models.ReplicationRule, 'INDEX_FFS(rules RULES_STATE_IDX)', 'oracle') | ||
.with_hint(models.ReplicationRule, 'INDEX_FFS(rules RULES_SCOPE_NAME_IDX)', 'oracle') | ||
.with_hint(models.Request, 'INDEX_FFS(requests REQUESTS_RULEID_IDX)', 'oracle') | ||
.with_hint(models.Request, 'INDEX_FFS(requests REQUESTS_DEST_RSE_ID_IDX)', 'oracle') | ||
.with_hint(models.Request, 'INDEX_FFS(requests REQUESTS_TYP_STA_UPD_IDX)', 'oracle') | ||
.select_from(models.ReplicationRule) | ||
.filter(models.ReplicationRule.scope == InternalScope("cms")) | ||
.filter(models.ReplicationRule.state.in_([RuleState.OK, RuleState.REPLICATING, RuleState.STUCK])) | ||
.join(models.Request, models.ReplicationRule.id == models.Request.rule_id) | ||
.join(models.RSE, models.Request.dest_rse_id == models.RSE.id) | ||
.group_by(models.RSE.rse) | ||
.group_by(models.ReplicationRule.state) | ||
.group_by(models.Request.activity) | ||
) | ||
|
||
with PrometheusPusher() as manager: | ||
for rse, activity, state, count, volume in query.all(): | ||
|
||
# print(rse, activity, state.name, count, volume) | ||
|
||
manager.gauge( | ||
name='rule_count_per_rse_activity_state.{rse}.{activity}.{state}', | ||
documentation='Number of rules in a given rse, activity and state' | ||
).labels( | ||
rse=rse, | ||
activity=activity, | ||
state=state.name | ||
).set(count) | ||
|
||
manager.gauge( | ||
name='rule_volume_per_rse_activity_state.{rse}.{activity}.{state}', | ||
documentation='Volume of rules in a given rse, activity and state' | ||
).labels( | ||
rse=rse, | ||
activity=activity, | ||
state=state.name | ||
).set(volume) | ||
|
||
except: | ||
print(traceback.format_exc()) | ||
sys.exit(UNKNOWN) | ||
finally: | ||
session.remove() | ||
sys.exit(OK) | ||
|