-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce global checkpoint listeners #32696
Introduce global checkpoint listeners #32696
Conversation
This commit introduces the ability for global checkpoint listeners to be registered at the shard level. These listeners are notified when the global checkpoint is updated, and also when the shard closes. To encapsulate these listeners, we introduce a shard-level component that handles synchronization of notification and modifications to the collection of listeners.
Pinging @elastic/es-distributed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @jasontedor . I think we should change the API a bit (see my comment). I also wonder if you considered sharing code between this component and RefreshListeners which is very similar in nature. I say consider as it may very well just end up with generic/boxing hell (as the underlying primitives are different).
* | ||
* @param listener the listener | ||
*/ | ||
synchronized void add(final GlobalCheckpointListener listener) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we should have parameter that indicates the global checkpoint last sampled by the component trying to register the listener. We can then immediately call the listener if the last global checkpoint this component was notified about (needs to be captured) is higher. It think this would help avoiding race conditions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In my POC I was doing this on the API layer (transport layer) where we have a request with the last known global checkpoint indeed but I agree it makes sense to move that to here.
@bleskes It was the boxing/raw generics that I wanted to avoid indeed. |
/** | ||
* Callback when the global checkpoint is updated or the shard is closed. If the shard is closed, the value of the global checkpoint | ||
* will be set to {@link org.elasticsearch.index.seqno.SequenceNumbers#UNASSIGNED_SEQ_NO} and the exception will be non-null. If the | ||
* global checkpoint is updated, the exception will be null. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: I wonder if we should have an onFailure method here for all kind of failures and send the IndexShardClosedException down that route. The down side is of course that people wouldn't be able to pass a method references, but the method won't need to start with if (e != null) etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer to use a functional interface for enabling the use of lambda expressions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thx @jasontedor . I left some more comments.
*/ | ||
synchronized void add(final long currentGlobalCheckpoint, final GlobalCheckpointListener listener) { | ||
if (closed) { | ||
throw new IllegalStateException("can not listen for global checkpoint changes on a closed shard [" + shardId + "]"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we throw an AlreadyClosedException like everywhere else?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Scratch that. I think we should be consistent and pass IndexShardClosedException
to the listener in that case (like it was registered and then we were closed)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed 50a9a6c.
|
||
/** | ||
* Add a global checkpoint listener. If the global checkpoint is above the current global checkpoint known to the listener then the | ||
* listener will fire immediately on the calling thread. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the code runs the listener via the executor, which isn't inline with what the comment says. Which one do you intend to hold?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, the doc went out of sync with the intent. I pushed 22d13a8.
*/ | ||
void globalCheckpointUpdated(final long globalCheckpoint) { | ||
assert globalCheckpoint >= NO_OPS_PERFORMED; | ||
lastKnownGlobalCheckpoint = globalCheckpoint; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this has to be synchronized to avoid race conditions with add. Say an add method already sampled the lastKnownGlobalCheckpoint
and concluded that it has to add a listener. It then goes on to update the listeners
array but haven't done so yet (it's still null). The notifyListeners
is called here and it runs the listeners != null
test before the add method made it non null. It that case I think we miss the listener.
I may be missing something here, but my point is that this is too complex IMO. I don't see why we can't just make this a simple fully synchronized method. The listeners are call on executor anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed. I pushed 22d13a8.
…listeners * elastic/master: (58 commits) [ML] Partition-wise maximum scores (elastic#32748) [DOCS] XContentBuilder#bytes method removed, using BytesReference.bytes(docBuilder) (elastic#32771) HLRC: migration get assistance API (elastic#32744) Add a task to run forbiddenapis using cli (elastic#32076) [Kerberos] Add debug log statement for exceptions (elastic#32663) Make x-pack core pull transport-nio (elastic#32757) Painless: Clean Up Whitelist Names (elastic#32791) Cat apis: Fix index creation time to use strict date format (elastic#32510) Clear Job#finished_time when it is opened (elastic#32605) (elastic#32755) Test: Only sniff host metadata for node_selectors (elastic#32750) Update scripted metric docs to use `state` variable (elastic#32695) Painless: Clean up PainlessCast (elastic#32754) [TEST] Certificate NONE not allowed in FIPS JVM (elastic#32753) [ML] Refactor ProcessCtrl into Autodetect and Normalizer builders (elastic#32720) Access build tools resources (elastic#32201) Tests: Disable rolling upgrade tests with system key on fips JVM (elastic#32775) HLRC: Ban LoggingDeprecationHandler (elastic#32756) Fix test reproducability in AbstractBuilderTestCase setup (elastic#32403) Only require java<version>_home env var if needed Tests: Muted ScriptDocValuesDatesTests.testJodaTimeBwc ...
@ywelsch Would you take over reviewing this PR? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left an initial question
void globalCheckpointUpdated(final long globalCheckpoint) { | ||
assert globalCheckpoint >= NO_OPS_PERFORMED; | ||
synchronized (this) { | ||
lastKnownGlobalCheckpoint = globalCheckpoint; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the add method assumes that the global checkpoint is always strictly increasing. Add an assertion here that this is so?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed 48089fb.
final List<GlobalCheckpointListener> currentListeners; | ||
synchronized (this) { | ||
currentListeners = listeners; | ||
listeners = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks as if the listeners are only notified once and then need to reregister if they want more events? I can see why this kind of behavior makes sense for the refresh listeners, with refresh being an expensive operation that ensures that all events registered before the refresh will now see the changes they're waiting for. With global checkpoints, it's less clear to me, as they can be potentially updated many many times per second, so wouldn't you want to stay registered to receive events. If not, will this lead to a storm of reregister events? I think I need to better understand the integration point here, i.e., how this API will be used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ywelsch See #32651. The intended usage is in CCR where a remote cluster will, when the remote cluster is fully caught up, (remotely) attach a single-use listener to the local cluster for the next global checkpoint change. When the global checkpoint is updated, the listener will be invoked which will return a response to the remote cluster that will act as letting the remote cluster know that there are now additional changes to be fetched.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, given this behavior, I wonder if we can make the listener interface simpler (same as Boaz's concern). As the listeners are only notified once and then need to reregister if they want more events, I wonder if it's simpler to just signal an UNASSIGNED_SEQ_NO (or the lastKnownGlobalCheckpoint) on a closing and then have the caller fail on a repeated call to the add method (by throwing directly the exception in that method, not relaying it to the listener). This means that the listener can remain a functional interface (but just a LongConsumer). WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ywelsch Personally I do not buy the argument that
@Override
protected void asyncShardOperation(
final Request request, final ShardId shardId, final ActionListener<Response> listener) throws IOException {
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
final IndexShard indexShard = indexService.getShard(shardId.id());
indexShard.addGlobalCheckpointListener(
request.getGlobalCheckpoint(),
(g, e) -> {
if (g != UNASSIGNED_SEQ_NO) {
listener.onResponse(new Response(g));
} else {
listener.onFailure(e);
}
});
}
is less clean than
@Override
protected void asyncShardOperation(
final Request request, final ShardId shardId, final ActionListener<Response> listener) throws IOException {
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
final IndexShard indexShard = indexService.getShard(shardId.id());
indexShard.addGlobalCheckpointListener(
request.getGlobalCheckpoint(),
g -> {
if (g != UNASSIGNED_SEQ_NO) {
listener.onResponse(new Response(g));
} else {
listener.onFailure(new IndexShardClosedException(shardId));
}
});
}
We still have to have a check in one form or the other whether or not closing has been signaled to us, so there's always going to be an if
check. At least, that is what I read the comment from @bleskes as arguing:
The down side is of course that people wouldn't be able to pass a method references, but the method won't need to start with if (e != null) etc.
In fact, I would argue the approach I have taken is cleaner as it's the shard telling us that we are closed rather than it being signaled indirectly through the value of the global checkpoint passed in the callback. Sure the actual interface is simpler but I prefer the explicit approach. So I think we should either stick with what I have, or have an onClosed
callback and lose the ability for the interface to be a functional interface.
Regarding throwing on attempting to register a listener on a closed shard, that was my preferred approach too but @bleskes thought otherwise.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There also still the option of adding an onClosed method to the interface with a default NOOP implementation. It will remain a functional interface, and if most tests don't care about the shard closed case, they can treat it like a functional interface. I'll leave the decision to you.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would not be a functional interface for the purposes of the production use-case for this that I have in mind (we have to handle the shard closed event). Thanks, I will leave as-is.
@ywelsch is taking over the review
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've left two smaller questions around testing and one ask around simplifying synchronization. Looks good otherwise
* Add a global checkpoint listener. If the global checkpoint is above the current global checkpoint known to the listener then the | ||
* listener will be asynchronously notified on the executor used to construct this collection of global checkpoint listeners. If the | ||
* shard is closed then the listener will be asynchronously notified on the executor used to construct this collection of global | ||
* checkpoint listeners. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you mention here that the listener will be deregistered on notification?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed 5b63507.
notifyListeners(globalCheckpoint, null); | ||
} | ||
|
||
private void notifyListeners(final long globalCheckpoint, final IndexShardClosedException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for simplicity, let's move this under the mutex everywhere (and add assert Thread.holdsLock(this)
to the beginning of the method). It looks like whenever we call this method, we already go under a mutex for a quick operation before this. In case noone is using the listener functionality, this will therefore amount to the same overhead. In case this infrastructure will be used, listeners should not fly in by the millions in a second, so listeners will mostly be null and there should be no overhead. Let's optimize this in the future if we see any issue. I was scratching my head for a bit to check if concurrency was correct here (and I believe it is), but it's not worth the complexity imho.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. I pushed 88dee76.
assertThat(count.get(), equalTo(1)); | ||
} | ||
|
||
public void testConcurrency() throws BrokenBarrierException, InterruptedException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you also add closing to this test? So that each listener gets notified exactly once, even under concurrent closing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea, this caught an issue. I pushed 9533679.
final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, logger); | ||
final AtomicLong globalCheckpoint = new AtomicLong(NO_OPS_PERFORMED); | ||
globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint.get()); | ||
final CyclicBarrier barrier = new CyclicBarrier(3); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why 3 here? How do we ensure we end up running every thread to completion? (I have a hard time counting the barrier.await()
here)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are three threads. The thread repeatedly firing updates, the thread repeatedly adding listeners, and the main test thread. We use the barrier to synchronize their start (they all wait on the barrier, then go), and to signal when the updating and adding thread are done (they all wait on the barrier again). So within the updating thread we have:
wait on barrier
do the updating loop
wait on barrier
while within the adding thread we have
wait on barrier
do the adding loop
wait on barrier
and the main test thread does two waits
wait on barrier
wait on barrier
The first wait corresponds to the top wait of the updating and adding threads, and the second wait corresponds to the bottom wait of the updating and adding threads.
Does that help?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe it is best to look at it like this:
Updating thread:
wait on barrier
do the updating loop
wait on barrier
Adding thread:
wait on barrier
do the adding loop
wait on barrier
Main test thread:
wait on barrier
no-op
wait on barrier
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, thanks for the explanation, makes sense. Can you add a comment to that effect?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ywelsch This is ready for another round from you. |
…listeners * elastic/master: Watcher: Remove unused hipchat render method (elastic#32211) Watcher: Remove extraneous auth classes (elastic#32300) Watcher: migrate PagerDuty v1 events API to v2 API (elastic#32285) [TEST] Select free port for Minio (elastic#32837) MINOR: Remove `IndexTemplateFilter` (elastic#32841) Core: Add java time version of rounding classes (elastic#32641) Aggregations/HL Rest client fix: missing scores (elastic#32774) HLRC: Add Delete License API (elastic#32586) INGEST: Create Index Before Pipeline Execute (elastic#32786) Fix NOOP bulk updates (elastic#32819) Remove client connections from TcpTransport (elastic#31886) Increase logging testRetentionPolicyChangeDuringRecovery AwaitsFix case-functions.sql-spec Mute security-cli tests in FIPS JVM (elastic#32812) SCRIPTING: Support BucketAggScript return null (elastic#32811) Unmute WildFly tests in FIPS JVM (elastic#32814) [TEST] Force a stop to save rollup state before continuing (elastic#32787) [test] disable packaging tests for suse boxes Mute IndicesRequestIT#testBulk [ML][DOCS] Refer to rules feature as custom rules (elastic#32785)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, logger); | ||
final AtomicLong globalCheckpoint = new AtomicLong(NO_OPS_PERFORMED); | ||
globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint.get()); | ||
final CyclicBarrier barrier = new CyclicBarrier(3); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, thanks for the explanation, makes sense. Can you add a comment to that effect?
This commit introduces the ability for global checkpoint listeners to be registered at the shard level. These listeners are notified when the global checkpoint is updated, and also when the shard closes. To encapsulate these listeners, we introduce a shard-level component that handles synchronization of notification and modifications to the collection of listeners.
* elastic/master: Revert "cluster formation DSL - Gradle integration - part 2 (#32028)" (#32876) cluster formation DSL - Gradle integration - part 2 (#32028) Introduce global checkpoint listeners (#32696) Move connection profile into connection manager (#32858) [ML] Temporarily disabling rolling-upgrade tests Use generic AcknowledgedResponse instead of extended classes (#32859) [ML] Removing old per-partition normalization code (#32816) Use JDK 10 for 6.4 BWC builds (#32866) Removed flaky test. Looks like randomisation makes these assertions unreliable. [test] mute IndexShardTests.testDocStats Introduce the dissect library (#32297) Security: remove password hash bootstrap check (#32440) Move validation to server for put user requests (#32471) [ML] Add high level REST client docs for ML put job endpoint (#32843) Test: Fix forbidden uses in test framework (#32824) Painless: Change fqn_only to no_import (#32817) [test] mute testSearchWithSignificantTermsAgg Watcher: Remove unused hipchat render method (#32211) Watcher: Remove extraneous auth classes (#32300) Watcher: migrate PagerDuty v1 events API to v2 API (#32285)
* 6.x: (96 commits) Introduce global checkpoint listeners (#32696) Use JDK 10 for 6.4 BWC builds (#32866) Remove unused imports - follow up to removal of test in issue 32855 Removed flaky test. Looks like randomisation makes these assertions unreliable. This test is superfluous - it was added to address #32770 but it later turned out there was an existing test that just required a fix to provide the missing test coverage. [test] mute IndexShardTests.testDocStats Test: Fix forbidden uses in test framework (#32824) Security: remove password hash bootstrap check (#32440) Move validation to server for put user requests (#32471) [ML] Add high level REST client docs for ML put job endpoint (#32843) Painless: Change fqn_only to no_import (#32817) [test] mute testSearchWithSignificantTermsAgg Backport: CompletableContext class to avoid throwable (#32829) [TEST] Select free port for Minio (#32837) SCRIPTING: Support BucketAggScript return null (#32811) (#32833) HLRC: Add Delete License API (#32586) Aggregations/HL Rest client fix: missing scores (#32774) HLRC: migration get assistance API (#32744) Fix NOOP bulk updates (#32819) Increase logging testRetentionPolicyChangeDuringRecovery AwaitsFix case-functions.sql-spec ...
This commit introduces the ability for global checkpoint listeners to be registered at the shard level. These listeners are notified when the global checkpoint is updated, and also when the shard closes. To encapsulate these listeners, we introduce a shard-level component that handles synchronization of notification and modifications to the collection of listeners.
Relates #32651