[CDAP-21096] Remove in-memory launching queue from RunRecordMonitorService #15800

Open
wants to merge 1 commit into
base: develop


vsethi09
Contributor

@vsethi09 vsethi09 commented Jan 13, 2025

Context

Multiple instances of RunRecordMonitoringService cannot run as a distributed service, because each instance keeps its own in-memory cache of the launching queue and the caches would become inconsistent with one another.

This change removes the in-memory launching queue from RunRecordMonitoringService and uses the AppMetadataStore APIs instead.

For more context, see: #15773 (comment).

Note: RunRecordMonitoringService is renamed as FlowControlMonitoringService.
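
A minimal sketch of the idea described above, assuming a shared store that every service instance can query. `SharedRunStore`, `StoreBackedFlowControl`, and `RunState` are hypothetical names for illustration and are not the AppMetadataStore API. Because no instance caches a queue, two instances reading the same store always report the same counts.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Run states relevant to flow control (hypothetical subset for this sketch). */
enum RunState { PENDING, STARTING, RUNNING, COMPLETED }

/** Stand-in for a shared, persistent run-record store; not the AppMetadataStore API. */
class SharedRunStore {
  private final Map<String, RunState> runs = new ConcurrentHashMap<>();

  void put(String runId, RunState state) {
    runs.put(runId, state);
  }

  long count(RunState state) {
    return runs.values().stream().filter(s -> s == state).count();
  }
}

/** Holds no queue of its own: every call derives the counts from the store. */
class StoreBackedFlowControl {
  private final SharedRunStore store;

  StoreBackedFlowControl(SharedRunStore store) {
    this.store = store;
  }

  /** Launching = PENDING + STARTING run records. */
  long launchingCount() {
    return store.count(RunState.PENDING) + store.count(RunState.STARTING);
  }

  long runningCount() {
    return store.count(RunState.RUNNING);
  }
}
```

The trade-off in this sketch is an extra store read per call in exchange for consistency across instances.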

Testing

  • Unit Tests
  • CDAP sandbox
  • Docker image

@vsethi09 vsethi09 added the build Triggers github actions build label Jan 13, 2025
@vsethi09 vsethi09 force-pushed the feature/CDAP-21096_fix_RunRecordMonitorService_queue_cache branch 5 times, most recently from dc068c3 to 0ea30c6 Compare January 15, 2025 20:37
@vsethi09 vsethi09 changed the title [WIP] Remove in-memory launching queue in RunRecordMonitorService [CDAP-21096] Remove in-memory launching queue from RunRecordMonitorService Jan 15, 2025
@vsethi09 vsethi09 marked this pull request as ready for review January 15, 2025 20:42
@vsethi09 vsethi09 force-pushed the feature/CDAP-21096_fix_RunRecordMonitorService_queue_cache branch from 0ea30c6 to abcd4c5 Compare January 15, 2025 20:49
  if (runRecordDetail.getStatus() == ProgramRunStatus.PENDING) {
-   runRecordMonitorService.addRequest(runRecordDetail.getProgramRunId());
+   flowControlMonitorService.addRequest(runRecordDetail.getProgramRunId(),
Contributor

I don't think this is needed if the run is already in the database.

Contributor Author

@vsethi09 vsethi09 Jan 16, 2025

This is required for emitting flow control metrics.

Replaced addRequest() with emitFlowControlMetrics() call.

Contributor

Well, originally we did this to recreate the in-memory store from the database store. Now we read from the database store directly, so I don't see a reason to immediately write back. Am I missing something?

Contributor Author

When active runs are restored (in restoreActiveRuns), the launching count metric needs to be updated for runs in the PENDING state; otherwise the metric will show an incorrect value.

This is only for consistency of launching count metrics.

Even if we don't update the launching count metrics, and a run request is made, it will update the DB and the launching count metric.

Contributor

We already emit the metrics once at the end of startup, so there is no need to do it for every run.

Contributor Author

Removed.

Yes, metrics are emitted here after all launches are restored.

Contributor

So, coming back, why do you need to call flowControlService.addRequest? Run is already in the database, no need to add it again

@vsethi09 vsethi09 force-pushed the feature/CDAP-21096_fix_RunRecordMonitorService_queue_cache branch 2 times, most recently from 343101b to db8a3ab Compare January 17, 2025 15:24
@vsethi09 vsethi09 requested a review from tivv January 17, 2025 15:26
@vsethi09 vsethi09 force-pushed the feature/CDAP-21096_fix_RunRecordMonitorService_queue_cache branch from db8a3ab to 66da732 Compare January 17, 2025 16:07
int runningCount = getRunningCount(store);
return new Counter(launchingCount, runningCount);
});
LOG.info("Added request with runId {}.", programRunId);

Check notice

Code scanning / SonarCloud

Logging should not be vulnerable to injection attacks Low

Change this code to not log user-controlled data. See more on SonarQube Cloud
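
One common way to address this kind of finding is to neutralize control characters before a value reaches the logger. The sketch below assumes a hypothetical `LogSanitizer` helper that is not part of CDAP or SLF4J, and whether it satisfies this particular SonarCloud rule is not confirmed here.

```java
/** Hypothetical helper for illustration; not part of CDAP or SLF4J. */
final class LogSanitizer {
  private LogSanitizer() { }

  /**
   * Replaces CR, LF, and other ASCII control characters with '_' so that a
   * logged value cannot start a forged log line.
   */
  static String sanitize(Object value) {
    return String.valueOf(value).replaceAll("\\p{Cntrl}", "_");
  }
}
```

The flagged statement could then be written as `LOG.info("Added request with runId {}.", LogSanitizer.sanitize(programRunId));`.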
@vsethi09 vsethi09 force-pushed the feature/CDAP-21096_fix_RunRecordMonitorService_queue_cache branch 3 times, most recently from ea09155 to d9e4c13 Compare January 20, 2025 16:35
*
* @return Counter with total number of launching and running program runs.
*/
public Counter getCount() {
Contributor

nit: consider using the plural (getCounts) or the object name (getCounter).

Contributor Author

Renamed to getCounter().

Similarly renamed addRequestAndGetCount() to addRequestAndGetCounter()

* total number of run records with {@link ProgramRunStatus#PENDING} status + total number of
* run records with {@link ProgramRunStatus#STARTING} status.
*/
private final Integer launchingCount;
Contributor

Since both are always populated, they can be primitives now.

Contributor Author

Changed to int.
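
For illustration, an immutable counter pair with primitive fields might look like the sketch below. The field names follow the snippets in this thread, while the constructor shape and getter names are assumptions. With `Integer` fields, a null value would fail only later, with a `NullPointerException` at unboxing; `int` fields rule that state out.

```java
/** Sketch of an immutable counter pair; getter names are assumed for illustration. */
final class Counter {
  private final int launchingCount;
  private final int runningCount;

  Counter(int launchingCount, int runningCount) {
    this.launchingCount = launchingCount;
    this.runningCount = runningCount;
  }

  /** Number of run records in PENDING or STARTING status. */
  int getLaunchingCount() {
    return launchingCount;
  }

  /** Number of run records in RUNNING status. */
  int getRunningCount() {
    return runningCount;
  }
}
```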

ProgramDescriptor programDescriptor = this.store.loadProgram(
runRecordDetail.getProgramRunId().getParent());
if (runRecordDetail.getStatus() == ProgramRunStatus.STARTING) {
flowControlService.addRequest(runRecordDetail.getProgramRunId(),
Contributor

I don't think you need this. Run is already in the database.

Contributor Author

Removed.

@@ -1167,6 +1254,7 @@ public RunRecordDetail recordProgramOrphaned(ProgramRunId programRunId, long end
         .setCluster(cluster)
         .setSourceId(sourceId)
         .build();
+    addFlowControlStatusKey(key, meta);
     writeToStructuredTableWithPrimaryKeys(
         key, meta, getRunRecordsTable(), StoreDefinition.AppMetadataStore.RUN_RECORD_DATA);
Contributor

nit: create a helper method for these two calls (or three, with LOG.trace). This would ensure that adding status keys isn't missed and would reduce duplicated code.

Contributor Author

Added a two-line helper function: writeRunRecordWithPrimaryKeys().

Removed addFlowControlStatusKey() and merged it into writeRunRecordWithPrimaryKeys().

Not adding the LOG.trace, as in some places the program run status is logged while in others the cluster status is logged.
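
A rough sketch of the helper pattern discussed here, assuming an in-memory map in place of the real table. Only the method name `writeRunRecordWithPrimaryKeys` comes from this thread; its parameters, the `flowControl` field name, and the stored values are hypothetical. The point is that the status step is private and runs on the only write path, so a caller cannot skip it.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for the run-record table; not the StructuredTable API. */
class RunRecordTableSketch {
  private final Map<String, Map<String, String>> rows = new HashMap<>();

  /** The only write path: sets the flow-control field, then stores the row. */
  void writeRunRecordWithPrimaryKeys(String runId, String status) {
    Map<String, String> row = new HashMap<>();
    row.put("status", status);
    addFlowControlStatus(row, status);
    rows.put(runId, row);
  }

  /** Private, so callers cannot write a row and forget this step. */
  private static void addFlowControlStatus(Map<String, String> row, String status) {
    if ("PENDING".equals(status) || "STARTING".equals(status)) {
      row.put("flowControl", "launching");
    } else if ("RUNNING".equals(status)) {
      row.put("flowControl", "running");
    }
    // Other statuses leave the field unset in this sketch.
  }

  Map<String, String> read(String runId) {
    return rows.get(runId);
  }
}
```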

@@ -150,6 +151,16 @@ public class AppMetadataStore {
.put(ProgramRunStatus.REJECTED, TYPE_RUN_RECORD_COMPLETED)
.build();

private static final String TYPE_FLOW_CONTROL_LAUNCHING = "flowControlLaunching";
Contributor

Consider not using a prefix in the values. This would save database space, and it's generally clear from the field name that the value relates to flow control. Also, do you want to differentiate NONE from NULL? Most run records would be in the NONE state, so using null would save quite some space. Also, assume we don't do migrations, so counters would be off for any programs running during an upgrade, and that's fine (there should not be any). Having NULL would also save you from updating all old runs; all of those would already be consistent.
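
The suggestion above can be sketched as follows, under the assumption that the flow-control field holds a short unprefixed value for launching and running runs and null for everything else. `Status`, `FlowControlField`, and the string values are hypothetical. Rows written before the field existed read back as null, so they count as "not in flow control" without any migration.

```java
import java.util.List;

/** Hypothetical subset of run statuses for this sketch. */
enum Status { PENDING, STARTING, RUNNING, COMPLETED, FAILED }

/** Maps a run status to a short, nullable flow-control value. */
final class FlowControlField {
  private FlowControlField() { }

  /** Returns null for statuses outside flow control, so nothing extra is stored. */
  static String valueFor(Status status) {
    switch (status) {
      case PENDING:
      case STARTING:
        return "launching";
      case RUNNING:
        return "running";
      default:
        return null;
    }
  }

  /** Counts rows whose field equals the given value; null entries never match. */
  static long countWithValue(List<String> column, String value) {
    return column.stream().filter(value::equals).count();
  }
}
```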

@vsethi09 vsethi09 force-pushed the feature/CDAP-21096_fix_RunRecordMonitorService_queue_cache branch from d9e4c13 to 31c5b1c Compare January 22, 2025 10:32
Quality Gate failed

Failed conditions
B Security Rating on New Code (required ≥ A)
C Reliability Rating on New Code (required ≥ A)

See analysis details on SonarQube Cloud

2 participants