Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[controller] log compaction #1282

Open
wants to merge 79 commits into
base: main
Choose a base branch
from

Conversation

WhitneyDeng
Copy link

Summary

  • purpose: to get stores ready for compaction based on programmatically defined criteria
  • added API endpoints to get compaction-ready stores

How was this PR tested?

unit test: filter all stores on a cluster for compaction-ready stores

Does this PR introduce any user-facing changes?

  • No. You can skip the rest of this section.
  • Yes. Make sure to explain your proposed changes and call out the behavior change.

Whitney Deng added 10 commits October 29, 2024 11:52
…er methods VeniceParentHelixAdmin::isCompactionReady VeniceParentHelixAdmin::isLastCompactionTimeOlderThanThresholdHours
…ForCompaction & VeniceParentHelixAdmin::isLastCompactionTimeOlderThanThresholdHours JavaDocs description

- complete description of VeniceParentHelixAdmin::getStoresForCompaction
- future tense to present tense for VeniceParentHelixAdmin::isLastCompactionTimeOlderThanThresholdHours
…:filterStoresForCompaction

- extract filterStoresForCompaction() logic for testing
- write VeniceParentHelixAdminTest::testFilterStoresForCompaction
… TestVeniceParentHelixAdmin

- migrate from integration test files to unit test files
Copy link
Contributor

@mynameborat mynameborat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good job on putting up the first PR quickly. Looks very good. I have put in some comments.

Let me know if you have questions.

Whitney Deng added 5 commits November 6, 2024 11:38
- remodularise VeniceParentHelixAdmin::isHybridStore to function var in VeniceParentHelixAdmin::isCompactionReady
- make VeniceParentHelixAdmin::isLastCompactionTimeOlderThanThresholdHours hardcoded int to class constant
Whitney Deng added 13 commits November 21, 2024 09:10
- change exception message in child controller
interface for repush: Azkaban/Airflow implementation for Linkedin internal. (potentially) Spark implementation for OSS.
- empty implementation for now
reason: since repush will be used for diff use cases (other than compaction), the term repush should be reserved for the abstracted repush action.
- instantiate in VeniceController
- this config determines the number of threads for LogCompactionService::executor
…into CompactionManager

- create CompactionManager & TestCompactionManager
- move TestCompactionManager::testFilterStoresForCompaction from TestVeniceParentHelixAdmin
…utor setup in LogCompactionService::startInner
stupid autospacing
Copy link
Contributor

@mynameborat mynameborat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think overall the PR looks fine. General comments

  • Please remove any TODOs if you are not planning to address them in the PR or short-term
  • Mostly minor comments on code style.
  • Update the status of the PR from draft.

Comment on lines +240 to +248
/**
* Number of threads to use for scheduled log compaction
*/
public static final String SCHEDULED_LOG_COMPACTION_THREAD_COUNT = "scheduled.log.compaction.thread.count";

/**
* Time between each scheduled log compaction
*/
public static final String SCHEDULED_LOG_COMPACTION_INTERVAL_MS = "scheduled.log.compaction.interval.ms";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ping on this comment.

try {
if (!executor.awaitTermination(SCHEDULED_EXECUTOR_TIMEOUT_S, TimeUnit.SECONDS)) {
executor.shutdownNow();
LOGGER.info("log compaction service shut down gracefully");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update log statement to describe timeout to make it explicit.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean describe how long shutdown took?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it says shut down gracefully but in reality it is attempting to shut down forcefully because waiting for normal shutdown failed due to timeout.

Something along the lines of Timed out waiting for executor to shutdown. Attempting to forcefully close or shut it down

public class RepushJobResponse {
private final String storeName;
private final String execId;
private final String execUrl;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/execId/executionId and same for the rest.

Also, how is execUrl used? Consider adding another API in RepushOrchestrator that provides status for a given executionId. The status API can provide additional information like,

  • when it was launched
  • what store it is for
  • which concrete orchestrator platform it uses
  • weblink
    etc

Copy link
Author

@WhitneyDeng WhitneyDeng Feb 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

execUrl is the link to the AirflowUI, referring to its usage in SmartAssistantHandler

// construct the URL to the Airflow UI
return String.format("%s/admin/airflow/graph?dag_id=%s&execution_date=%s&run_id=%s",
          airflowClient.getBasePath(), SMART_ASSISTANT_DAG_ID, result.getLogicalDate(),
          result.getDagRunId());

https://github.com/linkedin-multiproduct/nuage-venice/pull/696/files#diff-0dd3d578a191ea41c4cd025ddbe113157a3785de4648368e87e139a1047ac997

Copy link
Author

@WhitneyDeng WhitneyDeng Feb 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

re status API: do you mean a codepath getRepushStatus() separate from the existing compactStore() endpoint in StoresRoute?

verify(admin, times(expectedCompactStoreInvocationCount)).compactStore(storeForCompaction);
}

// TODO: test LogCompactionTask::run() focus on edge cases
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you adding one?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If any edge cases comes to mind, I welcome your input.

- encapsulate repush configs & dependencies (e.g. Client)
@@ -136,6 +144,12 @@ public class TestHybrid {
private static final Logger LOGGER = LogManager.getLogger(TestHybrid.class);
public static final int STREAMING_RECORD_SIZE = 1024;

// Log compaction test constants
private static final int TEST_TIMEOUT = 999999; // ms
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using Integer.MAX_VALUE ?

Whitney Deng added 6 commits February 6, 2025 15:58
- dummy config values for new repush configs
- decrease latch countdown wait time
- change RepushOrchestrator initialisation in VeniceHelixAdmin & TestHybrid
- RepushJobResponse::storeName -> ControllerResponse::name
- set response error
@WhitneyDeng WhitneyDeng marked this pull request as ready for review February 11, 2025 18:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants