[controller] log compaction #1282
base: main
…er methods VeniceParentHelixAdmin::isCompactionReady VeniceParentHelixAdmin::isLastCompactionTimeOlderThanThresholdHours
…or readability & maintainability
…ForCompaction & VeniceParentHelixAdmin::isLastCompactionTimeOlderThanThresholdHours JavaDocs description - complete description of VeniceParentHelixAdmin::getStoresForCompaction - future tense to present tense for VeniceParentHelixAdmin::isLastCompactionTimeOlderThanThresholdHours
…ionReady & helper methods
…ersion is not found
…:filterStoresForCompaction - extract filterStoresForCompaction() logic for testing - write VeniceParentHelixAdminTest::testFilterStoresForCompaction
… TestVeniceParentHelixAdmin - migrate from integration test files to unit test files
Good job on putting up the first PR quickly. Looks very good. I have put in some comments.
Let me know if you have questions.
internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java
...mmon/src/integrationTest/java/com/linkedin/venice/controller/VeniceParentHelixAdminTest.java
services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java
...s/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java
...s/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java
...s/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java
.../venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminSparkServer.java
...s/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java
...nice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java
…tionReady package private
…data type ArrayList -> List
- remodularise VeniceParentHelixAdmin::isHybridStore to function var in VeniceParentHelixAdmin::isCompactionReady - make VeniceParentHelixAdmin::isLastCompactionTimeOlderThanThresholdHours hardcoded int to class constant
...s/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java
- change exception message in child controller
interface for repush: Azkaban/Airflow implementation for LinkedIn internal use; (potentially) a Spark implementation for OSS.
…logic to VeniceHelixAdmin
- empty implementation for now
reason: since repush will be used for diff use cases (other than compaction), the term repush should be reserved for the abstracted repush action.
- instantiate in VeniceController
- this config determines the number of threads for LogCompactionService::executor
…UNT annotations & comments
…into CompactionManager - create CompactionManager & TestCompactionManager - move TestCompactionManager::testFilterStoresForCompaction from TestVeniceParentHelixAdmin
…utor setup in LogCompactionService::startInner
…g time unit from hour to millisecond
stupid autospacing
I think overall the PR looks fine. General comments
- Please remove any TODOs if you are not planning to address them in the PR or short-term
- Mostly minor comments on code style.
- Update the status of the PR from draft.
internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java
/**
 * Number of threads to use for scheduled log compaction
 */
public static final String SCHEDULED_LOG_COMPACTION_THREAD_COUNT = "scheduled.log.compaction.thread.count";

/**
 * Time between each scheduled log compaction
 */
public static final String SCHEDULED_LOG_COMPACTION_INTERVAL_MS = "scheduled.log.compaction.interval.ms";
Ping on this comment.
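For context on how the two config keys above might be consumed, here is a minimal sketch, assuming they are read from a plain `java.util.Properties` object and fed into a `ScheduledExecutorService`. The class name, the helper methods, and the default values (1 thread, one hour) are assumptions made for illustration; they are not taken from the PR.

```java
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; only the two key strings come from the diff above.
final class LogCompactionSchedulingSketch {
  static final String THREAD_COUNT_KEY = "scheduled.log.compaction.thread.count";
  static final String INTERVAL_MS_KEY = "scheduled.log.compaction.interval.ms";

  // Assumed default: a single scheduler thread.
  static int threadCount(Properties props) {
    return Integer.parseInt(props.getProperty(THREAD_COUNT_KEY, "1"));
  }

  // Assumed default: one hour between runs, expressed in milliseconds.
  static long intervalMs(Properties props) {
    return Long.parseLong(props.getProperty(INTERVAL_MS_KEY, "3600000"));
  }

  // Builds the pool from the thread-count config and schedules the task at the configured interval.
  static ScheduledExecutorService start(Properties props, Runnable compactionTask) {
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(threadCount(props));
    executor.scheduleAtFixedRate(compactionTask, 0, intervalMs(props), TimeUnit.MILLISECONDS);
    return executor;
  }
}
```

Keeping the interval in milliseconds matches the `.ms` suffix of the key, so no unit conversion is needed at the call site.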
...nal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestHybrid.java
...egrationTest/java/com/linkedin/venice/endToEnd/logcompaction/TestRepushOrchestratorImpl.java
services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceController.java
try {
  if (!executor.awaitTermination(SCHEDULED_EXECUTOR_TIMEOUT_S, TimeUnit.SECONDS)) {
    executor.shutdownNow();
    LOGGER.info("log compaction service shut down gracefully");
Update log statement to describe timeout to make it explicit.
Do you mean describe how long shutdown took?
The log says the service shut down gracefully, but this branch runs only when awaitTermination timed out, so it is actually attempting a forceful shutdown.
Something along the lines of: "Timed out waiting for executor to shut down. Attempting to forcefully shut it down."
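To make the point above concrete, here is a minimal sketch of a shutdown helper whose message matches what actually happened. It is an illustration only: the class name, the method, and the exact message strings are assumptions, and it returns the message instead of logging it so the three outcomes are easy to check.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not the PR's LogCompactionService code.
final class ExecutorShutdownSketch {
  /** Shuts the executor down and returns a message describing how the shutdown actually ended. */
  static String shutdown(ExecutorService executor, long timeoutSeconds) {
    executor.shutdown(); // stop accepting new tasks; already submitted tasks keep running
    try {
      if (executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
        return "Log compaction service shut down gracefully";
      }
      executor.shutdownNow(); // timed out: interrupt tasks that are still running
      return "Timed out after " + timeoutSeconds + "s waiting for executor to shut down; forcing shutdown";
    } catch (InterruptedException e) {
      executor.shutdownNow();
      Thread.currentThread().interrupt(); // preserve the caller's interrupt status
      return "Interrupted while waiting for executor to shut down; forcing shutdown";
    }
  }
}
```

The "gracefully" message is produced only when `awaitTermination` returns true; the timeout and interrupt paths each get their own wording.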
...troller/src/main/java/com/linkedin/venice/controller/logcompaction/LogCompactionService.java
...venice-controller/src/main/java/com/linkedin/venice/controller/repush/RepushJobResponse.java
public class RepushJobResponse {
  private final String storeName;
  private final String execId;
  private final String execUrl;
s/execId/executionId, and the same for the rest.
Also, how is execUrl used? Consider adding another API in RepushOrchestrator that provides status for a given executionId. The status API can provide additional information like:
- when it was launched
- what store it is for
- which concrete orchestrator platform it uses
- weblink
etc.
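One way the suggested status API could look is sketched below. Everything here is hypothetical: the type names carry a `Sketch` suffix to make clear they are not the PR's `RepushOrchestrator` or `RepushJobResponse`, the field names are illustrative, and the in-memory implementation exists only to exercise the contract.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical status payload covering the items listed in the review comment. */
final class RepushStatusSketch {
  final String executionId;
  final String storeName;
  final long launchedAtMs;
  final String platform; // which concrete orchestrator produced the run
  final String webLink;

  RepushStatusSketch(String executionId, String storeName, long launchedAtMs, String platform, String webLink) {
    this.executionId = executionId;
    this.storeName = storeName;
    this.launchedAtMs = launchedAtMs;
    this.platform = platform;
    this.webLink = webLink;
  }
}

/** Hypothetical contract: a trigger call plus a status lookup keyed by execution id. */
interface RepushOrchestratorSketch {
  RepushStatusSketch repush(String storeName);

  Optional<RepushStatusSketch> getRepushStatus(String executionId);
}

/** Toy in-memory implementation; a real one would call Azkaban, Airflow, or similar. */
final class InMemoryRepushOrchestratorSketch implements RepushOrchestratorSketch {
  private final Map<String, RepushStatusSketch> statuses = new ConcurrentHashMap<>();
  private int nextId = 0;

  @Override
  public synchronized RepushStatusSketch repush(String storeName) {
    String executionId = "exec-" + (++nextId);
    RepushStatusSketch status = new RepushStatusSketch(
        executionId, storeName, System.currentTimeMillis(), "in-memory", "http://localhost/runs/" + executionId);
    statuses.put(executionId, status);
    return status;
  }

  @Override
  public Optional<RepushStatusSketch> getRepushStatus(String executionId) {
    return Optional.ofNullable(statuses.get(executionId));
  }
}
```

Returning `Optional` for the lookup keeps "unknown execution id" distinct from a failed run without resorting to exceptions.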
execUrl is the link to the Airflow UI; see its usage in SmartAssistantHandler:
// construct the URL to the Airflow UI
return String.format("%s/admin/airflow/graph?dag_id=%s&execution_date=%s&run_id=%s",
airflowClient.getBasePath(), SMART_ASSISTANT_DAG_ID, result.getLogicalDate(),
result.getDagRunId());
Re status API: do you mean a codepath getRepushStatus() separate from the existing compactStore() endpoint in StoresRoute?
  verify(admin, times(expectedCompactStoreInvocationCount)).compactStore(storeForCompaction);
}

// TODO: test LogCompactionTask::run() focus on edge cases
Are you adding one?
If any edge cases come to mind, I welcome your input.
- encapsulate repush configs & dependencies (e.g. Client)
@@ -136,6 +144,12 @@ public class TestHybrid {
  private static final Logger LOGGER = LogManager.getLogger(TestHybrid.class);
  public static final int STREAMING_RECORD_SIZE = 1024;

  // Log compaction test constants
  private static final int TEST_TIMEOUT = 999999; // ms
How about using Integer.MAX_VALUE?
...nal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestHybrid.java
...ntegrationTest/java/com/linkedin/venice/endToEnd/logcompaction/TestLogCompactionService.java
- already nested inside TestHybrid
internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java
...ntegrationTest/java/com/linkedin/venice/endToEnd/logcompaction/TestLogCompactionService.java
- repushOrchestrator creation failure exception - compactStore() compaction failure exception
...nal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestHybrid.java
- dummy config values for new repush configs - decrease latch countdown wait time
- change RepushOrchestrator initialisation in VeniceHelixAdmin & TestHybrid
- RepushJobResponse::storeName -> ControllerResponse::name - set response error
Summary
How was this PR tested?
unit test: filter all stores on a cluster for compaction-ready stores
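As a rough illustration of what such a filter might check, the sketch below keeps only stores that are hybrid and whose last compaction is older than a threshold. These two criteria are inferred from the commit titles (isHybridStore, isLastCompactionTimeOlderThanThresholdHours); the PR's actual rules are not shown here, and the `StoreInfoSketch` type, its fields, and the method signatures are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for a store's metadata. */
final class StoreInfoSketch {
  final String name;
  final boolean hybrid;
  final long lastCompactionTimeMs;

  StoreInfoSketch(String name, boolean hybrid, long lastCompactionTimeMs) {
    this.name = name;
    this.hybrid = hybrid;
    this.lastCompactionTimeMs = lastCompactionTimeMs;
  }
}

final class CompactionFilterSketch {
  /** Assumed rule: a store is ready when it is hybrid and its last compaction is older than the threshold. */
  static boolean isCompactionReady(StoreInfoSketch store, long nowMs, long thresholdMs) {
    return store.hybrid && (nowMs - store.lastCompactionTimeMs) > thresholdMs;
  }

  /** Returns the names of all stores that pass isCompactionReady, preserving input order. */
  static List<String> filterStoresForCompaction(List<StoreInfoSketch> stores, long nowMs, long thresholdMs) {
    List<String> ready = new ArrayList<>();
    for (StoreInfoSketch store: stores) {
      if (isCompactionReady(store, nowMs, thresholdMs)) {
        ready.add(store.name);
      }
    }
    return ready;
  }
}
```

Passing the current time in as a parameter keeps the filter deterministic, which makes it straightforward to unit test without mocking a clock.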
Does this PR introduce any user-facing changes?