diff --git a/src/main/java/org/opensearch/sdk/ExtensionsRunner.java b/src/main/java/org/opensearch/sdk/ExtensionsRunner.java
index 062deef2..0627ccfd 100644
--- a/src/main/java/org/opensearch/sdk/ExtensionsRunner.java
+++ b/src/main/java/org/opensearch/sdk/ExtensionsRunner.java
@@ -12,20 +12,14 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import org.opensearch.Version;
-import org.opensearch.cluster.ClusterModule;
import org.opensearch.cluster.node.DiscoveryNode;
-import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.io.stream.NamedWriteableRegistryParseRequest;
import org.opensearch.extensions.OpenSearchRequest;
import org.opensearch.extensions.rest.ExtensionRestRequest;
import org.opensearch.extensions.rest.RegisterRestActionsRequest;
import org.opensearch.extensions.settings.RegisterCustomSettingsRequest;
-import org.opensearch.common.network.NetworkModule;
-import org.opensearch.common.network.NetworkService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
-import org.opensearch.common.util.PageCacheRecycler;
import org.opensearch.discovery.InitializeExtensionsRequest;
import org.opensearch.extensions.ExtensionActionListenerOnFailureRequest;
import org.opensearch.extensions.DiscoveryExtension;
@@ -35,12 +29,7 @@
import org.opensearch.extensions.ExtensionRequest;
import org.opensearch.extensions.ExtensionsOrchestrator;
import org.opensearch.index.IndicesModuleRequest;
-import org.opensearch.indices.IndicesModule;
-import org.opensearch.indices.breaker.CircuitBreakerService;
-import org.opensearch.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.rest.RestHandler.Route;
-import org.opensearch.transport.netty4.Netty4Transport;
-import org.opensearch.transport.SharedGroupFactory;
import org.opensearch.sdk.handlers.ActionListenerOnFailureResponseHandler;
import org.opensearch.sdk.handlers.ClusterSettingsResponseHandler;
import org.opensearch.sdk.handlers.ClusterStateResponseHandler;
@@ -54,11 +43,7 @@
import org.opensearch.sdk.handlers.UpdateSettingsRequestHandler;
import org.opensearch.sdk.handlers.ExtensionStringResponseHandler;
import org.opensearch.sdk.handlers.OpensearchRequestHandler;
-import org.opensearch.search.SearchModule;
import org.opensearch.threadpool.ThreadPool;
-import org.opensearch.transport.ClusterConnectionManager;
-import org.opensearch.transport.ConnectionManager;
-import org.opensearch.transport.TransportInterceptor;
import org.opensearch.transport.TransportService;
import org.opensearch.transport.TransportSettings;
@@ -69,14 +54,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
import java.util.function.Consumer;
-import static java.util.Collections.emptySet;
-import static org.opensearch.common.UUIDs.randomBase64UUID;
-
/**
* The primary class to run an extension.
*
@@ -109,8 +88,6 @@ public class ExtensionsRunner {
* This field is initialized by a call from {@link ExtensionsInitRequestHandler}.
*/
public final Settings settings;
- private final TransportInterceptor NOOP_TRANSPORT_INTERCEPTOR = new TransportInterceptor() {
- };
private ExtensionNamedWriteableRegistry namedWriteableRegistryApi = new ExtensionNamedWriteableRegistry();
private ExtensionsInitRequestHandler extensionsInitRequestHandler = new ExtensionsInitRequestHandler();
private OpensearchRequestHandler opensearchRequestHandler = new OpensearchRequestHandler();
@@ -118,6 +95,7 @@ public class ExtensionsRunner {
private ExtensionsIndicesModuleNameRequestHandler extensionsIndicesModuleNameRequestHandler =
new ExtensionsIndicesModuleNameRequestHandler();
private ExtensionsRestRequestHandler extensionsRestRequestHandler = new ExtensionsRestRequestHandler();
+ private NettyTransport nettyTransport = new NettyTransport();
/*
* TODO: expose an interface for extension to register actions
@@ -170,7 +148,7 @@ private ExtensionsRunner(Extension extension) throws IOException {
// save custom settings
this.customSettings = extension.getSettings();
// initialize the transport service
- this.initializeExtensionTransportService(this.getSettings());
+ nettyTransport.initializeExtensionTransportService(this.getSettings(), this);
// start listening on configured port and wait for connection from OpenSearch
this.startActionListener(0);
}
@@ -213,83 +191,6 @@ DiscoveryNode getOpensearchNode() {
return opensearchNode;
}
- /**
- * Initializes a Netty4Transport object. This object will be wrapped in a {@link TransportService} object.
- *
- * @param settings The transport settings to configure.
- * @param threadPool A thread pool to use.
- * @return The configured Netty4Transport object.
- */
- public Netty4Transport getNetty4Transport(Settings settings, ThreadPool threadPool) {
- NetworkService networkService = new NetworkService(Collections.emptyList());
- PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings);
- IndicesModule indicesModule = new IndicesModule(Collections.emptyList());
- SearchModule searchModule = new SearchModule(settings, Collections.emptyList());
-
- List namedWriteables = Stream.of(
- NetworkModule.getNamedWriteables().stream(),
- indicesModule.getNamedWriteables().stream(),
- searchModule.getNamedWriteables().stream(),
- null,
- ClusterModule.getNamedWriteables().stream()
- ).flatMap(Function.identity()).collect(Collectors.toList());
-
- final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(namedWriteables);
-
- final CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService();
-
- Netty4Transport transport = new Netty4Transport(
- settings,
- Version.CURRENT,
- threadPool,
- networkService,
- pageCacheRecycler,
- namedWriteableRegistry,
- circuitBreakerService,
- new SharedGroupFactory(settings)
- );
-
- return transport;
- }
-
- /**
- * Initializes the TransportService object for this extension. This object will control communication between the extension and OpenSearch.
- *
- * @param settings The transport settings to configure.
- * @return The initialized TransportService object.
- */
- public TransportService initializeExtensionTransportService(Settings settings) {
-
- ThreadPool threadPool = new ThreadPool(settings);
-
- Netty4Transport transport = getNetty4Transport(settings, threadPool);
-
- final ConnectionManager connectionManager = new ClusterConnectionManager(settings, transport);
-
- // Stop any existing transport service
- if (extensionTransportService != null) {
- extensionTransportService.stop();
- }
-
- // create transport service
- extensionTransportService = new TransportService(
- settings,
- transport,
- threadPool,
- NOOP_TRANSPORT_INTERCEPTOR,
- boundAddress -> DiscoveryNode.createLocal(
- Settings.builder().put(NODE_NAME_SETTING, settings.get(NODE_NAME_SETTING)).build(),
- boundAddress.publishAddress(),
- randomBase64UUID()
- ),
- null,
- emptySet(),
- connectionManager
- );
- startTransportService(extensionTransportService);
- return extensionTransportService;
- }
-
/**
* Starts a TransportService.
*
@@ -591,7 +492,7 @@ public static void main(String[] args) throws IOException {
ExtensionsRunner extensionsRunner = new ExtensionsRunner();
// initialize the transport service
- extensionsRunner.initializeExtensionTransportService(extensionsRunner.getSettings());
+ extensionsRunner.nettyTransport.initializeExtensionTransportService(extensionsRunner.getSettings(), extensionsRunner);
// start listening on configured port and wait for connection from OpenSearch
extensionsRunner.startActionListener(0);
}
diff --git a/src/main/java/org/opensearch/sdk/NettyTransport.java b/src/main/java/org/opensearch/sdk/NettyTransport.java
new file mode 100644
index 00000000..634dc5e7
--- /dev/null
+++ b/src/main/java/org/opensearch/sdk/NettyTransport.java
@@ -0,0 +1,120 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+package org.opensearch.sdk;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.opensearch.Version;
+import org.opensearch.cluster.ClusterModule;
+import org.opensearch.cluster.node.DiscoveryNode;
+import org.opensearch.common.io.stream.NamedWriteableRegistry;
+import org.opensearch.common.network.NetworkModule;
+import org.opensearch.common.network.NetworkService;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.common.util.PageCacheRecycler;
+import org.opensearch.indices.IndicesModule;
+import org.opensearch.indices.breaker.CircuitBreakerService;
+import org.opensearch.indices.breaker.NoneCircuitBreakerService;
+import org.opensearch.search.SearchModule;
+import org.opensearch.threadpool.ThreadPool;
+import org.opensearch.transport.SharedGroupFactory;
+import org.opensearch.transport.TransportInterceptor;
+import org.opensearch.transport.TransportService;
+import org.opensearch.transport.netty4.Netty4Transport;
+
+import static java.util.Collections.emptySet;
+import static org.opensearch.common.UUIDs.randomBase64UUID;
+
+/**
+ * This class initializes a Netty4Transport object and control communication between the extension and OpenSearch.
+ */
+
+public class NettyTransport {
+ private static final String NODE_NAME_SETTING = "node.name";
+ private final TransportInterceptor NOOP_TRANSPORT_INTERCEPTOR = new TransportInterceptor() {
+ };
+
+ /**
+ * Initializes a Netty4Transport object. This object will be wrapped in a {@link TransportService} object.
+ *
+ * @param settings The transport settings to configure.
+ * @param threadPool A thread pool to use.
+ * @return The configured Netty4Transport object.
+ */
+ public Netty4Transport getNetty4Transport(Settings settings, ThreadPool threadPool) {
+ NetworkService networkService = new NetworkService(Collections.emptyList());
+ PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings);
+ IndicesModule indicesModule = new IndicesModule(Collections.emptyList());
+ SearchModule searchModule = new SearchModule(settings, Collections.emptyList());
+
+ List namedWriteables = Stream.of(
+ NetworkModule.getNamedWriteables().stream(),
+ indicesModule.getNamedWriteables().stream(),
+ searchModule.getNamedWriteables().stream(),
+ ClusterModule.getNamedWriteables().stream()
+ ).flatMap(Function.identity()).collect(Collectors.toList());
+
+ final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(namedWriteables);
+
+ final CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService();
+
+ Netty4Transport transport = new Netty4Transport(
+ settings,
+ Version.CURRENT,
+ threadPool,
+ networkService,
+ pageCacheRecycler,
+ namedWriteableRegistry,
+ circuitBreakerService,
+ new SharedGroupFactory(settings)
+ );
+
+ return transport;
+ }
+
+ /**
+ * Initializes the TransportService object for this extension. This object will control communication between the extension and OpenSearch.
+ *
+ * @param settings The transport settings to configure.
+ * @param extensionsRunner method to call
+ * @return The initialized TransportService object.
+ */
+ public TransportService initializeExtensionTransportService(Settings settings, ExtensionsRunner extensionsRunner) {
+
+ ThreadPool threadPool = new ThreadPool(settings);
+
+ Netty4Transport transport = getNetty4Transport(settings, threadPool);
+
+ // Stop any existing transport service
+ if (extensionsRunner.extensionTransportService != null) {
+ extensionsRunner.extensionTransportService.stop();
+ }
+
+ // create transport service
+ extensionsRunner.extensionTransportService = new TransportService(
+ settings,
+ transport,
+ threadPool,
+ NOOP_TRANSPORT_INTERCEPTOR,
+ boundAddress -> DiscoveryNode.createLocal(
+ Settings.builder().put(NODE_NAME_SETTING, settings.get(NODE_NAME_SETTING)).build(),
+ boundAddress.publishAddress(),
+ randomBase64UUID()
+ ),
+ null,
+ emptySet()
+ );
+ extensionsRunner.startTransportService(extensionsRunner.extensionTransportService);
+ return extensionsRunner.extensionTransportService;
+ }
+
+}
diff --git a/src/test/java/org/opensearch/sdk/TestNetty4Transport.java b/src/test/java/org/opensearch/sdk/TestNetty4Transport.java
index c3ef71ae..7782348b 100644
--- a/src/test/java/org/opensearch/sdk/TestNetty4Transport.java
+++ b/src/test/java/org/opensearch/sdk/TestNetty4Transport.java
@@ -24,12 +24,11 @@
public class TestNetty4Transport extends OpenSearchTestCase {
- private ExtensionsRunner extensionsRunner;
private ThreadPool threadPool;
+ private NettyTransport nettyTransport = new NettyTransport();
@BeforeEach
public void setUp() throws IOException {
- this.extensionsRunner = new ExtensionsRunner();
this.threadPool = new TestThreadPool("test");
}
@@ -44,7 +43,7 @@ public void testNettyCanBindToMultiplePorts() throws IOException {
.put("transport.profiles.client1.port", 0)
.build();
- Netty4Transport transport = extensionsRunner.getNetty4Transport(settings, threadPool);
+ Netty4Transport transport = nettyTransport.getNetty4Transport(settings, threadPool);
try {
startNetty4Transport(transport);
@@ -67,7 +66,7 @@ public void testDefaultProfileInheritsFomStandardSettings() throws IOException {
.put("transport.profiles.client1.port", 0)
.build();
- Netty4Transport transport = extensionsRunner.getNetty4Transport(settings, threadPool);
+ Netty4Transport transport = nettyTransport.getNetty4Transport(settings, threadPool);
try {
startNetty4Transport(transport);
@@ -94,7 +93,7 @@ public void testThatProfileWithoutPortFails() throws IOException {
// attempt creating netty object with invalid settings
IllegalStateException ex = expectThrows(
IllegalStateException.class,
- () -> extensionsRunner.getNetty4Transport(settings, threadPool)
+ () -> nettyTransport.getNetty4Transport(settings, threadPool)
);
assertEquals("profile [no_port] has no port configured", ex.getMessage());
} finally {
@@ -112,7 +111,7 @@ public void testDefaultProfilePortOverridesGeneralConfiguration() throws IOExcep
.put("transport.profiles.default.port", 0) // default port configuration will overwrite attempt
.build();
- Netty4Transport transport = extensionsRunner.getNetty4Transport(settings, threadPool);
+ Netty4Transport transport = nettyTransport.getNetty4Transport(settings, threadPool);
try {
startNetty4Transport(transport);
diff --git a/src/test/java/org/opensearch/sdk/TransportCommunicationIT.java b/src/test/java/org/opensearch/sdk/TransportCommunicationIT.java
index 787eaae9..8cbfe284 100644
--- a/src/test/java/org/opensearch/sdk/TransportCommunicationIT.java
+++ b/src/test/java/org/opensearch/sdk/TransportCommunicationIT.java
@@ -35,6 +35,7 @@ public class TransportCommunicationIT extends OpenSearchIntegTestCase {
private final int port = 7777;
private final String host = "127.0.0.1";
private volatile String clientResult;
+ private NettyTransport nettyTransport = new NettyTransport();
@Override
@BeforeEach
@@ -51,9 +52,8 @@ public void setUp() {
@Test
public void testSocketSetup() throws IOException {
- ExtensionsRunner extensionsRunner = new ExtensionsRunner();
ThreadPool threadPool = new TestThreadPool("test");
- Netty4Transport transport = extensionsRunner.getNetty4Transport(settings, threadPool);
+ Netty4Transport transport = nettyTransport.getNetty4Transport(settings, threadPool);
// start netty transport and ensure that address info is exposed
try {
@@ -147,7 +147,7 @@ private void startTransportandClient(Settings settings, Thread client) throws IO
// retrieve transport service
ExtensionsRunner extensionsRunner = new ExtensionsRunner();
// start transport service
- TransportService transportService = extensionsRunner.initializeExtensionTransportService(settings);
+ TransportService transportService = nettyTransport.initializeExtensionTransportService(settings, extensionsRunner);
assertEquals(Lifecycle.State.STARTED, transportService.lifecycleState());