Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove client connections from TcpTransport #31886

Merged
merged 44 commits into from
Aug 13, 2018
Merged
Show file tree
Hide file tree
Changes from 42 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
6b6981f
Work on connection manager
Tim-Brooks Jul 6, 2018
744375e
WIP
Tim-Brooks Jul 6, 2018
002b371
WIP
Tim-Brooks Jul 7, 2018
06a0093
At least fix checkstyle
Tim-Brooks Jul 7, 2018
f9e7080
Fix test
Tim-Brooks Jul 9, 2018
67bc0b2
Add comment
Tim-Brooks Jul 9, 2018
31e3acf
Merge remote-tracking branch 'upstream/master' into new_cm
Tim-Brooks Jul 16, 2018
17a334a
Merge remote-tracking branch 'upstream/master' into new_cm
Tim-Brooks Jul 23, 2018
c3544ad
Remove additional listeners
Tim-Brooks Jul 23, 2018
fc886bf
Work on fixing tests
Tim-Brooks Jul 23, 2018
f699321
Remove unused
Tim-Brooks Jul 24, 2018
03acc19
Fix checkstyle
Tim-Brooks Jul 24, 2018
796a728
Work on test infra
Tim-Brooks Jul 24, 2018
8005de4
Merge remote-tracking branch 'upstream/master' into new_cm
Tim-Brooks Jul 24, 2018
43dd3ad
Work on tests
Tim-Brooks Jul 25, 2018
acb743a
Remove methods
Tim-Brooks Jul 25, 2018
2eacbfa
Merge remote-tracking branch 'upstream/master' into new_cm
Tim-Brooks Jul 25, 2018
ca9fe7c
Merge remote-tracking branch 'upstream/master' into new_cm
Tim-Brooks Jul 25, 2018
aaf2d87
Use new behaviors
Tim-Brooks Jul 25, 2018
f5778c0
Fix some tests
Tim-Brooks Jul 25, 2018
2302fa1
Merge remote-tracking branch 'upstream/master' into new_cm
Tim-Brooks Jul 26, 2018
95a7b85
Working on fixing tests
Tim-Brooks Jul 26, 2018
2c5badd
Merge remote-tracking branch 'upstream/master' into new_cm
Tim-Brooks Jul 26, 2018
5106852
Work on fixing tests
Tim-Brooks Jul 27, 2018
af37b63
Fix tests
Tim-Brooks Jul 27, 2018
3e4b898
Merge remote-tracking branch 'upstream/master' into new_cm
Tim-Brooks Jul 27, 2018
0436eb0
Work on transport client nodes service tests
Tim-Brooks Jul 27, 2018
f429f9b
fix tests and temporarily mute test
Tim-Brooks Jul 28, 2018
28eafbb
Merge remote-tracking branch 'upstream/master' into new_cm
Tim-Brooks Jul 30, 2018
c45c5ef
Fix tests
Tim-Brooks Aug 1, 2018
2e8a026
Fix checkstyle
Tim-Brooks Aug 1, 2018
630cfaa
Merge remote-tracking branch 'upstream/master' into new_cm
Tim-Brooks Aug 1, 2018
3ee4743
Work on cleaning up
Tim-Brooks Aug 1, 2018
293a572
Merge remote-tracking branch 'upstream/master' into new_cm
Tim-Brooks Aug 1, 2018
2789d0d
Make connection methods required
Tim-Brooks Aug 1, 2018
617521f
Work on tests
Tim-Brooks Aug 1, 2018
76d429c
Fix tests
Tim-Brooks Aug 2, 2018
8c26686
java doc
Tim-Brooks Aug 2, 2018
071490a
Merge remote-tracking branch 'upstream/master' into new_cm
Tim-Brooks Aug 2, 2018
44f6e1d
Merge remote-tracking branch 'upstream/master' into new_cm
Tim-Brooks Aug 2, 2018
4c26165
Merge remote-tracking branch 'upstream/master' into new_cm
Tim-Brooks Aug 6, 2018
453b853
Merge remote-tracking branch 'upstream/master' into new_cm
Tim-Brooks Aug 7, 2018
d18be8c
Changes from review
Tim-Brooks Aug 13, 2018
f259bae
Changes docs
Tim-Brooks Aug 13, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,12 @@ protected Netty4TcpServerChannel bind(String name, InetSocketAddress address) {
return esChannel;
}

ScheduledPing getPing() {
return scheduledPing;
long successfulPingCount() {
return successfulPings.count();
}

long failedPingCount() {
return failedPings.count();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,13 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
Expand Down Expand Up @@ -83,22 +80,19 @@ public void testScheduledPing() throws Exception {
serviceB.connectToNode(nodeA);

assertBusy(() -> {
assertThat(nettyA.getPing().getSuccessfulPings(), greaterThan(100L));
assertThat(nettyB.getPing().getSuccessfulPings(), greaterThan(100L));
assertThat(nettyA.successfulPingCount(), greaterThan(100L));
assertThat(nettyB.successfulPingCount(), greaterThan(100L));
});
assertThat(nettyA.getPing().getFailedPings(), equalTo(0L));
assertThat(nettyB.getPing().getFailedPings(), equalTo(0L));
assertThat(nettyA.failedPingCount(), equalTo(0L));
assertThat(nettyB.failedPingCount(), equalTo(0L));

serviceA.registerRequestHandler("internal:sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC,
new TransportRequestHandler<TransportRequest.Empty>() {
@Override
public void messageReceived(TransportRequest.Empty request, TransportChannel channel, Task task) {
try {
channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.EMPTY);
} catch (IOException e) {
logger.error("Unexpected failure", e);
fail(e.getMessage());
}
(request, channel, task) -> {
try {
channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.EMPTY);
} catch (IOException e) {
logger.error("Unexpected failure", e);
fail(e.getMessage());
}
});

Expand Down Expand Up @@ -130,11 +124,11 @@ public void handleException(TransportException exp) {
}

assertBusy(() -> {
assertThat(nettyA.getPing().getSuccessfulPings(), greaterThan(200L));
assertThat(nettyB.getPing().getSuccessfulPings(), greaterThan(200L));
assertThat(nettyA.successfulPingCount(), greaterThan(200L));
assertThat(nettyB.successfulPingCount(), greaterThan(200L));
});
assertThat(nettyA.getPing().getFailedPings(), equalTo(0L));
assertThat(nettyB.getPing().getFailedPings(), equalTo(0L));
assertThat(nettyA.failedPingCount(), equalTo(0L));
assertThat(nettyB.failedPingCount(), equalTo(0L));

Releasables.close(serviceA, serviceB);
terminate(threadPool);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void onFailure(Exception e) {
}

@Override
protected void doRun() throws Exception {
protected void doRun() {
try (Releasable ignored = nodeLocks.acquire(node)) {
validateAndConnectIfNeeded(node);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.transport;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.concurrent.CompletableContext;


/**
* Abstract Transport.Connection that provides common close logic.
*/
public abstract class CloseableConnection implements Transport.Connection {

private final CompletableContext<Void> closeContext = new CompletableContext<>();

@Override
public void addCloseListener(ActionListener<Void> listener) {
closeContext.addListener(ActionListener.toBiConsumer(listener));
}

@Override
public boolean isClosed() {
return closeContext.isDone();
}

@Override
public void close() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you should protect from double closing here.

closeContext.complete(null);
}
}
Loading