From 10b1eddafbeefe4e71c5f0d16241b00c8c00dc2b Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 16 Feb 2018 07:30:15 -0500 Subject: [PATCH] Fix test concurrent remote connection updates This test has a race condition. The action listener used to listen for connections has a guard against being executed twice. However, this listener can be executed twice. After on success is invoked the test starts to tear down. At this point, the threads the test forked will terminate and the remote cluster connection will be closed. However, a thread forked to the management thread pool by the remote cluster connection can still be executing and try to continue connecting. This thread will be cancelled when the remote cluster connection is closed and this leads to the action listener being invoked again. To address this, we explicitly check that the reason that on failure was invoked was cancellation, and we assert that the listener was already previously invoked. Interestingly, this issue has always been present yet a recent change (#28667) exposed errors that occur on tasks submitted to the thread pool and were silently being lost. Relates #28695 --- .../RemoteClusterConnectionTests.java | 34 ++++++++++++------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index cbf558a7720cc..05281721e3125 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -557,7 +557,6 @@ public void onNodeDisconnected(DiscoveryNode node) { } } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/28695") public void testTriggerUpdatesConcurrently() throws IOException, InterruptedException { List knownNodes = new CopyOnWriteArrayList<>(); try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); @@ -591,17 +590,28 @@ public void run() { CountDownLatch latch = new CountDownLatch(numConnectionAttempts); for (int i = 0; i < numConnectionAttempts; i++) { AtomicBoolean executed = new AtomicBoolean(false); - ActionListener listener = ActionListener.wrap(x -> { - assertTrue(executed.compareAndSet(false, true)); - latch.countDown();}, x -> { - assertTrue(executed.compareAndSet(false, true)); - latch.countDown(); - if (x instanceof RejectedExecutionException) { - // that's fine - } else { - throw new AssertionError(x); - } - }); + ActionListener listener = ActionListener.wrap( + x -> { + assertTrue(executed.compareAndSet(false, true)); + latch.countDown();}, + x -> { + /* + * This can occur on a thread submitted to the thread pool while we are closing the + * remote cluster connection at the end of the test. + */ + if (x instanceof CancellableThreads.ExecutionCancelledException) { + // we should already be shutting down + assertTrue(executed.get()); + return; + } + + assertTrue(executed.compareAndSet(false, true)); + latch.countDown(); + + if (!(x instanceof RejectedExecutionException)) { + throw new AssertionError(x); + } + }); connection.updateSeedNodes(seedNodes, listener); } latch.await();