Skip to content

Commit

Permalink
Fixing after merging to main
Browse files Browse the repository at this point in the history
Signed-off-by: Vacha Shah <[email protected]>
  • Loading branch information
VachaShah committed Apr 11, 2024
1 parent ed00f89 commit 324c58f
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,6 @@ public class FeatureFlags {
*/
public static final String WRITEABLE_REMOTE_INDEX = "opensearch.experimental.feature.writeable_remote_index.enabled";

/**
* Gates the optimization to enable bloom filters for doc id lookup.
*/
public static final String DOC_ID_FUZZY_SET = "opensearch.experimental.optimize_doc_id_lookup.fuzzy_set.enabled";

/**
* Gates the functionality of integrating protobuf within search API and node-to-node communication.
*/
Expand Down Expand Up @@ -168,5 +163,4 @@ public static boolean isEnabled(Setting<Boolean> featureFlag) {
return featureFlag.getDefault(Settings.EMPTY);
}
}

}
1 change: 0 additions & 1 deletion server/src/main/java/org/opensearch/search/SearchHits.java
Original file line number Diff line number Diff line change
Expand Up @@ -342,5 +342,4 @@ private static Relation parseRelation(String relation) {
throw new IllegalArgumentException("invalid total hits relation: " + relation);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.StreamInput;
Expand Down
42 changes: 27 additions & 15 deletions server/src/main/java/org/opensearch/transport/InboundHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,23 +70,35 @@ public class InboundHandler {
Tracer tracer
) {
this.threadPool = threadPool;
this.protocolMessageHandlers = Map.of(
NativeInboundMessage.NATIVE_PROTOCOL,
new NativeMessageHandler(
threadPool,
outboundHandler,
namedWriteableRegistry,
handshaker,
requestHandlers,
responseHandlers,
tracer,
keepAlive
)
);
if (FeatureFlags.isEnabled(FeatureFlags.PROTOBUF_SETTING)) {
this.protocolMessageHandlers.put(
this.protocolMessageHandlers = Map.of(
ProtobufInboundMessage.PROTOBUF_PROTOCOL,
new ProtobufMessageHandler(threadPool, responseHandlers)
new ProtobufMessageHandler(threadPool, responseHandlers),
NativeInboundMessage.NATIVE_PROTOCOL,
new NativeMessageHandler(
threadPool,
outboundHandler,
namedWriteableRegistry,
handshaker,
requestHandlers,
responseHandlers,
tracer,
keepAlive
)
);
} else {
this.protocolMessageHandlers = Map.of(
NativeInboundMessage.NATIVE_PROTOCOL,
new NativeMessageHandler(
threadPool,
outboundHandler,
namedWriteableRegistry,
handshaker,
requestHandlers,
responseHandlers,
tracer,
keepAlive
)
);
}
}
Expand Down
14 changes: 11 additions & 3 deletions server/src/main/java/org/opensearch/transport/InboundPipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,15 @@ public InboundPipeline(
this.statsTracker = statsTracker;
this.decoder = decoder;
this.aggregator = aggregator;
this.protocolBytesHandlers = List.of(new NativeInboundBytesHandler(pending, decoder, aggregator, statsTracker));
this.messageHandler = messageHandler;
if (FeatureFlags.isEnabled(FeatureFlags.PROTOBUF_SETTING)) {
protocolBytesHandlers.add(new ProtobufInboundBytesHandler());
this.protocolBytesHandlers = List.of(
new ProtobufInboundBytesHandler(),
new NativeInboundBytesHandler(pending, decoder, aggregator, statsTracker)
);
} else {
this.protocolBytesHandlers = List.of(new NativeInboundBytesHandler(pending, decoder, aggregator, statsTracker));
}
this.messageHandler = messageHandler;
}

@Override
Expand Down Expand Up @@ -129,6 +133,10 @@ public void handleBytes(TcpChannel channel, ReleasableBytesReference reference)
}

public void doHandleBytes(TcpChannel channel, ReleasableBytesReference reference) throws IOException {
channel.getChannelStats().markAccessed(relativeTimeInMillis.getAsLong());
statsTracker.markBytesRead(reference.length());
pending.add(reference.retain());

// If we don't have a current handler, we should try to find one based on the protocol of the incoming bytes.
if (currentHandler == null) {
for (InboundBytesHandler handler : protocolBytesHandlers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
*/
public class ProtobufInboundBytesHandler implements InboundBytesHandler {

public void ProtobufInboundBytesHandler() {}
public ProtobufInboundBytesHandler() {}

@Override
public void doHandleBytes(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/** Protobuf transport protocol package. */
package org.opensearch.transport.protobufprotocol;
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ public class InboundHandlerTests extends OpenSearchTestCase {
private final Version version = Version.CURRENT;

private TaskManager taskManager;
private NamedWriteableRegistry namedWriteableRegistry;
private TransportHandshaker handshaker;
private TransportKeepAlive keepAlive;
private Transport.ResponseHandlers responseHandlers;
private Transport.RequestHandlers requestHandlers;
private InboundHandler handler;
Expand All @@ -105,8 +108,8 @@ public void sendMessage(BytesReference reference, ActionListener<Void> listener)
}
}
};
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
TransportHandshaker handshaker = new TransportHandshaker(version, threadPool, (n, c, r, v) -> {});
namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
handshaker = new TransportHandshaker(version, threadPool, (n, c, r, v) -> {});
outboundHandler = new OutboundHandler(
"node",
version,
Expand All @@ -115,7 +118,7 @@ public void sendMessage(BytesReference reference, ActionListener<Void> listener)
threadPool,
BigArrays.NON_RECYCLING_INSTANCE
);
TransportKeepAlive keepAlive = new TransportKeepAlive(threadPool, outboundHandler::sendBytes);
keepAlive = new TransportKeepAlive(threadPool, outboundHandler::sendBytes);
requestHandlers = new Transport.RequestHandlers();
responseHandlers = new Transport.ResponseHandlers();
handler = new InboundHandler(
Expand Down Expand Up @@ -251,6 +254,17 @@ public TestResponse read(StreamInput in) throws IOException {

@SuppressForbidden(reason = "manipulates system properties for testing")
public void testProtobufResponse() throws Exception {
System.setProperty(FeatureFlags.PROTOBUF_SETTING.getKey(), "true");
InboundHandler inboundHandler = new InboundHandler(
threadPool,
outboundHandler,
namedWriteableRegistry,
handshaker,
keepAlive,
requestHandlers,
responseHandlers,
NoopTracer.INSTANCE
);
String action = "test-request";
int headerSize = TcpHeader.headerSize(version);
AtomicReference<TestRequest> requestCaptor = new AtomicReference<>();
Expand Down Expand Up @@ -314,7 +328,7 @@ public QueryFetchSearchResult read(InputStream in) throws IOException {
Header requestHeader = new Header(fullRequestBytes.length() - 6, requestId, TransportStatus.setRequest((byte) 0), version);
InboundMessage requestMessage = new InboundMessage(requestHeader, ReleasableBytesReference.wrap(requestContent), () -> {});
requestHeader.finishParsingHeader(requestMessage.openOrGetStreamInput());
handler.inboundMessage(channel, requestMessage);
inboundHandler.inboundMessage(channel, requestMessage);

TransportChannel transportChannel = channelCaptor.get();
assertEquals(Version.CURRENT, transportChannel.getVersion());
Expand All @@ -324,13 +338,12 @@ public QueryFetchSearchResult read(InputStream in) throws IOException {
QuerySearchResult queryResult = OutboundHandlerTests.createQuerySearchResult();
FetchSearchResult fetchResult = OutboundHandlerTests.createFetchSearchResult();
QueryFetchSearchResult response = new QueryFetchSearchResult(queryResult, fetchResult);
System.setProperty(FeatureFlags.PROTOBUF, "true");
transportChannel.sendResponse(response);

BytesReference fullResponseBytes = channel.getMessageCaptor().get();
byte[] incomingBytes = BytesReference.toBytes(fullResponseBytes.slice(3, fullResponseBytes.length() - 3));
ProtobufInboundMessage nodeToNodeMessage = new ProtobufInboundMessage(new ByteArrayInputStream(incomingBytes));
handler.inboundMessage(channel, nodeToNodeMessage);
inboundHandler.inboundMessage(channel, nodeToNodeMessage);
QueryFetchSearchResult result = responseCaptor.get();
assertNotNull(result);
assertEquals(queryResult.getMaxScore(), result.queryResult().getMaxScore(), 0.0);
Expand Down

0 comments on commit 324c58f

Please sign in to comment.