Skip to content

Commit

Permalink
Merge remote-tracking branch 'opensearch-main/2.x' into remote-checks…
Browse files Browse the repository at this point in the history
…um-2.x
  • Loading branch information
Himshikha Gupta committed Sep 3, 2024
2 parents 0978f7b + 67eceaa commit 9421646
Show file tree
Hide file tree
Showing 50 changed files with 1,387 additions and 163 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for index level max slice count setting for concurrent segment search ([#15336](https://github.com/opensearch-project/OpenSearch/pull/15336))
- Support cancellation for cat shards and node stats API.([#13966](https://github.com/opensearch-project/OpenSearch/pull/13966))
- [Streaming Indexing] Introduce bulk HTTP API streaming flavor ([#15381](https://github.com/opensearch-project/OpenSearch/pull/15381))
- Add support for centralize snapshot creation with pinned timestamp ([#15124](https://github.com/opensearch-project/OpenSearch/pull/15124))
- Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326))
- [Workload Management] Add query group stats constructs ([#15343](https://github.com/opensearch-project/OpenSearch/pull/15343)))
- Add limit on number of processors for Ingest pipeline([#15460](https://github.com/opensearch-project/OpenSearch/pull/15465)).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -969,9 +969,13 @@ public <T extends Writeable> void writeOptionalArray(@Nullable T[] array) throws
}

public void writeOptionalWriteable(@Nullable Writeable writeable) throws IOException {
writeOptionalWriteable((out, writable) -> writable.writeTo(out), writeable);
}

public <T extends Writeable> void writeOptionalWriteable(final Writer<T> writer, @Nullable T writeable) throws IOException {
if (writeable != null) {
writeBoolean(true);
writeable.writeTo(this);
writer.write(this, writeable);
} else {
writeBoolean(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.blobstore.BlobStoreException;
Expand Down Expand Up @@ -424,6 +425,7 @@ public void finalizeSnapshot(
SnapshotInfo snapshotInfo,
Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
Priority repositoryUpdatePriority,
ActionListener<RepositoryData> listener
) {
if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) {
Expand All @@ -436,6 +438,7 @@ public void finalizeSnapshot(
snapshotInfo,
repositoryMetaVersion,
stateTransformer,
repositoryUpdatePriority,
listener
);
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.xcontent.NamedXContentRegistry;
Expand Down Expand Up @@ -127,6 +128,7 @@ public void finalizeSnapshot(
SnapshotInfo snapshotInfo,
Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
Priority repositoryUpdatePriority,
ActionListener<RepositoryData> listener
) {
super.finalizeSnapshot(
Expand All @@ -136,6 +138,7 @@ public void finalizeSnapshot(
snapshotInfo,
repositoryMetaVersion,
stateTransformer,
repositoryUpdatePriority,
listener
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public ClusterAllocationExplanation(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
shardRouting.writeTo(out);
out.writeOptionalWriteable(currentNode);
out.writeOptionalWriteable((stream, node) -> node.writeToWithAttribute(stream), currentNode);
out.writeOptionalWriteable(relocationTargetNode);
out.writeOptionalWriteable(clusterInfo);
shardAllocationDecision.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,16 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.snapshots.SnapshotsService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;

import static org.opensearch.repositories.blobstore.BlobStoreRepository.SHALLOW_SNAPSHOT_V2;

/**
* Transport action for create snapshot operation
*
Expand All @@ -56,12 +60,15 @@
public class TransportCreateSnapshotAction extends TransportClusterManagerNodeAction<CreateSnapshotRequest, CreateSnapshotResponse> {
private final SnapshotsService snapshotsService;

private final RepositoriesService repositoriesService;

@Inject
public TransportCreateSnapshotAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
SnapshotsService snapshotsService,
RepositoriesService repositoriesService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
) {
Expand All @@ -75,6 +82,7 @@ public TransportCreateSnapshotAction(
indexNameExpressionResolver
);
this.snapshotsService = snapshotsService;
this.repositoriesService = repositoriesService;
}

@Override
Expand Down Expand Up @@ -103,15 +111,22 @@ protected void clusterManagerOperation(
ClusterState state,
final ActionListener<CreateSnapshotResponse> listener
) {

if (state.nodes().getMinNodeVersion().before(SnapshotsService.NO_REPO_INITIALIZE_VERSION)) {
if (request.waitForCompletion()) {
snapshotsService.executeSnapshotLegacy(request, ActionListener.map(listener, CreateSnapshotResponse::new));
} else {
snapshotsService.createSnapshotLegacy(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse()));
}

} else {
Repository repository = repositoriesService.repository(request.repository());
boolean isSnapshotV2 = SHALLOW_SNAPSHOT_V2.get(repository.getMetadata().settings());

if (request.waitForCompletion()) {
snapshotsService.executeSnapshot(request, ActionListener.map(listener, CreateSnapshotResponse::new));
} else if (isSnapshotV2) {
snapshotsService.executeSnapshot(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse()));
} else {
snapshotsService.createSnapshot(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,6 @@ public DiscoveryNode getNode() {

@Override
public void writeTo(StreamOutput out) throws IOException {
node.writeTo(out);
node.writeToWithAttribute(out);
}
}
14 changes: 12 additions & 2 deletions server/src/main/java/org/opensearch/cluster/ClusterState.java
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(stateUUID);
metadata.writeTo(out);
routingTable.writeTo(out);
nodes.writeTo(out);
nodes.writeToWithAttribute(out);
blocks.writeTo(out);
// filter out custom states not supported by the other node
int numberOfCustoms = 0;
Expand Down Expand Up @@ -887,13 +887,23 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(toUuid);
out.writeLong(toVersion);
routingTable.writeTo(out);
nodes.writeTo(out);
nodesWriteToWithAttributes(nodes, out);
metadata.writeTo(out);
blocks.writeTo(out);
customs.writeTo(out);
out.writeVInt(minimumClusterManagerNodesOnPublishingClusterManager);
}

private void nodesWriteToWithAttributes(Diff<DiscoveryNodes> nodes, StreamOutput out) throws IOException {
DiscoveryNodes part = nodes.apply(null);
if (part != null) {
out.writeBoolean(true);
part.writeToWithAttribute(out);
} else {
out.writeBoolean(false);
}
}

@Override
public ClusterState apply(ClusterState state) {
Builder builder = new Builder(clusterName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ public Join(StreamInput in) throws IOException {

@Override
public void writeTo(StreamOutput out) throws IOException {
sourceNode.writeTo(out);
targetNode.writeTo(out);
sourceNode.writeToWithAttribute(out);
targetNode.writeToWithAttribute(out);
out.writeLong(term);
out.writeLong(lastAcceptedTerm);
out.writeLong(lastAcceptedVersion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public JoinRequest(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
sourceNode.writeTo(out);
sourceNode.writeToWithAttribute(out);
if (out.getVersion().onOrAfter(LegacyESVersion.V_7_7_0)) {
out.writeLong(minimumTerm);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public StartJoinRequest(StreamInput input) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
sourceNode.writeTo(out);
sourceNode.writeToWithAttribute(out);
out.writeLong(term);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,16 +143,15 @@ public static TemplatesMetadata fromXContent(XContentParser parser) throws IOExc
String currentFieldName = parser.currentName();
if (currentFieldName == null) {
token = parser.nextToken();
if (token == XContentParser.Token.START_OBJECT) {
// move to the field name
token = parser.nextToken();
}
currentFieldName = parser.currentName();
}
if (currentFieldName != null) {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.START_OBJECT) {
// move to the field name
token = parser.nextToken();
}
if (parser.currentName() != null && token != XContentParser.Token.END_OBJECT) {
do {
builder.put(IndexTemplateMetadata.Builder.fromXContent(parser, parser.currentName()));
}
} while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT);
}
return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,12 +386,27 @@ public DiscoveryNode(StreamInput in) throws IOException {

@Override
public void writeTo(StreamOutput out) throws IOException {
writeNodeDetails(out);
if (out.getVersion().onOrAfter(Version.V_2_17_0)) {
writeToUtil(out, false);
} else {
writeToUtil(out, true);
}
}

out.writeVInt(attributes.size());
for (Map.Entry<String, String> entry : attributes.entrySet()) {
out.writeString(entry.getKey());
out.writeString(entry.getValue());
public void writeToWithAttribute(StreamOutput out) throws IOException {
writeToUtil(out, true);
}

public void writeToUtil(StreamOutput out, boolean includeAllAttributes) throws IOException {
writeNodeDetails(out);
if (includeAllAttributes) {
out.writeVInt(attributes.size());
for (Map.Entry<String, String> entry : attributes.entrySet()) {
out.writeString(entry.getKey());
out.writeString(entry.getValue());
}
} else {
out.writeVInt(0);
}
writeRolesAndVersion(out);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -702,10 +702,18 @@ public String shortSummary() {

@Override
public void writeTo(StreamOutput out) throws IOException {
writeToUtil((output, value) -> value.writeTo(output), out);
}

public void writeToWithAttribute(StreamOutput out) throws IOException {
writeToUtil((output, value) -> value.writeToWithAttribute(output), out);
}

private void writeToUtil(final Writer<DiscoveryNode> writer, StreamOutput out) throws IOException {
writeClusterManager(out);
out.writeVInt(nodes.size());
for (DiscoveryNode node : this) {
node.writeTo(out);
writer.write(out, node);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public List<NodeAllocationResult> getNodeDecisions() {

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(targetNode);
out.writeOptionalWriteable((stream, node) -> node.writeToWithAttribute(stream), targetNode);
if (nodeDecisions != null) {
out.writeBoolean(true);
out.writeList(nodeDecisions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public NodeAllocationResult(StreamInput in) throws IOException {

@Override
public void writeTo(StreamOutput out) throws IOException {
node.writeTo(out);
node.writeToWithAttribute(out);
out.writeOptionalWriteable(shardStoreInfo);
out.writeOptionalWriteable(canAllocateDecision);
nodeDecision.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,12 @@ public UploadedMetadata getUploadedMetadata() {

@Override
public InputStream serialize() throws IOException {
return DISCOVERY_NODES_FORMAT.serialize(discoveryNodes, generateBlobFileName(), getCompressor()).streamInput();
return DISCOVERY_NODES_FORMAT.serialize(
(out, discoveryNode) -> discoveryNode.writeToWithAttribute(out),
discoveryNodes,
generateBlobFileName(),
getCompressor()
).streamInput();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.store.IndexInput;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.compositeindex.CompositeIndexMetadata;
import org.opensearch.index.compositeindex.datacube.Metric;
import org.opensearch.index.compositeindex.datacube.MetricStat;
Expand All @@ -30,6 +31,7 @@
*
* @opensearch.experimental
*/
@ExperimentalApi
public class StarTreeMetadata extends CompositeIndexMetadata {
private static final Logger logger = LogManager.getLogger(StarTreeMetadata.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ public StarTreeNode getChildForDimensionValue(Long dimensionValue) throws IOExce
StarTreeNode resultStarTreeNode = null;
if (null != dimensionValue) {
resultStarTreeNode = binarySearchChild(dimensionValue);
assert null != resultStarTreeNode;
}
return resultStarTreeNode;
}
Expand Down
Loading

0 comments on commit 9421646

Please sign in to comment.