diff --git a/CHANGELOG.md b/CHANGELOG.md index 80e944260ca41..a5511b69ca519 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add getter for path field in NestedQueryBuilder ([#4636](https://github.com/opensearch-project/OpenSearch/pull/4636)) - Allow mmap to use new JDK-19 preview APIs in Apache Lucene 9.4+ ([#5151](https://github.com/opensearch-project/OpenSearch/pull/5151)) - Add events correlation engine plugin ([#6854](https://github.com/opensearch-project/OpenSearch/issues/6854)) +- Add support for ignoring missing Javadoc on generated code using annotation ([#7604](https://github.com/opensearch-project/OpenSearch/pull/7604)) ### Dependencies - Bump `log4j-core` from 2.18.0 to 2.19.0 @@ -97,6 +98,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Support to clear filecache using clear indices cache API ([#7498](https://github.com/opensearch-project/OpenSearch/pull/7498)) - Create NamedRoute to map extension routes to a shortened name ([#6870](https://github.com/opensearch-project/OpenSearch/pull/6870)) - Added @dbwiddis as an OpenSearch maintainer ([#7665](https://github.com/opensearch-project/OpenSearch/pull/7665)) +- [Extensions] Add ExtensionAwarePlugin extension point to add custom settings for extensions ([#7526](https://github.com/opensearch-project/OpenSearch/pull/7526)) +- Add new cluster setting to set default index replication type ([#7420](https://github.com/opensearch-project/OpenSearch/pull/7420)) ### Dependencies - Bump `com.netflix.nebula:gradle-info-plugin` from 12.0.0 to 12.1.3 (#7564) @@ -124,6 +127,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump `io.projectreactor.netty:reactor-netty-core` from 1.1.5 to 1.1.7 (#7657) - Bump `org.apache.maven:maven-model` from 3.9.1 to 3.9.2 (#7655) - Bump `com.google.api:gax` from 2.17.0 to 2.27.0 (#7697) +- Bump `io.projectreactor.netty:reactor-netty` from 1.1.4 to 1.1.7 ([#7725](https://github.com/opensearch-project/OpenSearch/pull/7725)) +- Bump `io.projectreactor.netty:reactor-netty-http` from 1.1.4 to 1.1.7 ([#7725](https://github.com/opensearch-project/OpenSearch/pull/7725)) ### Changed - Enable `./gradlew build` on MacOS by disabling bwc tests ([#7303](https://github.com/opensearch-project/OpenSearch/pull/7303)) @@ -131,6 +136,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Allow access to indices cache clear APIs for read only indexes ([#7303](https://github.com/opensearch-project/OpenSearch/pull/7303)) - Changed concurrent-search threadpool type to be resizable and support task resource tracking ([#7502](https://github.com/opensearch-project/OpenSearch/pull/7502)) - Default search preference to _primary for searchable snapshot indices ([#7628](https://github.com/opensearch-project/OpenSearch/pull/7628)) +- [Segment Replication] Remove codec name string match check for checkpoints ([#7741](https://github.com/opensearch-project/OpenSearch/pull/7741)) ### Deprecated @@ -140,6 +146,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add more index blocks check for resize APIs ([#6774](https://github.com/opensearch-project/OpenSearch/pull/6774)) - Replaces ZipInputStream with ZipFile to fix Zip Slip vulnerability ([#7230](https://github.com/opensearch-project/OpenSearch/pull/7230)) - Add missing validation/parsing of
SearchBackpressureMode of SearchBackpressureSettings ([#7541](https://github.com/opensearch-project/OpenSearch/pull/7541)) +- [Search Pipelines] Better exception handling in search pipelines ([#7735](https://github.com/opensearch-project/OpenSearch/pull/7735)) +- Fix input validation in segments and delete pit request ([#6645](https://github.com/opensearch-project/OpenSearch/pull/6645)) ### Security diff --git a/buildSrc/version.properties b/buildSrc/version.properties index cfa672ff4e67b..549f2574581d3 100644 --- a/buildSrc/version.properties +++ b/buildSrc/version.properties @@ -23,6 +23,7 @@ kotlin = 1.7.10 antlr4 = 4.11.1 guava = 31.1-jre protobuf = 3.22.3 +jboss_annotation = 1.0.2.Final # when updating the JNA version, also update the version in buildSrc/build.gradle jna = 5.5.0 diff --git a/distribution/src/config/opensearch.yml b/distribution/src/config/opensearch.yml index c2d84c6c71582..107fe345c942b 100644 --- a/distribution/src/config/opensearch.yml +++ b/distribution/src/config/opensearch.yml @@ -102,10 +102,9 @@ ${path.logs} # # ---------------------------------- Experimental Features ----------------------------------- # -# Gates the visibility of the index setting that allows changing of replication type. -# Once the feature is ready for production release, this feature flag can be removed. +# Gates the visibility of the experimental segment replication features until they are production ready. # -#opensearch.experimental.feature.replication_type.enabled: false +#opensearch.experimental.feature.segment_replication_experimental.enabled: false # # # Gates the visibility of the index setting that allows persisting data to remote store along with local disk. diff --git a/doc-tools/missing-doclet/build.gradle b/doc-tools/missing-doclet/build.gradle index e16900afce876..114ccc948951a 100644 --- a/doc-tools/missing-doclet/build.gradle +++ b/doc-tools/missing-doclet/build.gradle @@ -5,10 +5,6 @@ plugins { group 'org.opensearch' version '1.0.0-SNAPSHOT' -repositories { - mavenCentral() -} - tasks.withType(JavaCompile) { options.compilerArgs += ["--release", targetCompatibility.toString()] options.encoding = "UTF-8" diff --git a/doc-tools/missing-doclet/src/main/java/org/opensearch/missingdoclet/MissingDoclet.java b/doc-tools/missing-doclet/src/main/java/org/opensearch/missingdoclet/MissingDoclet.java index 021fc04a42e86..e6122e7baf91a 100644 --- a/doc-tools/missing-doclet/src/main/java/org/opensearch/missingdoclet/MissingDoclet.java +++ b/doc-tools/missing-doclet/src/main/java/org/opensearch/missingdoclet/MissingDoclet.java @@ -13,6 +13,7 @@ import java.util.HashSet; import java.util.List; import java.util.Locale; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -291,6 +292,22 @@ private void checkComment(Element element) { if (ignored.contains(element.toString())) { return; } + // Ignore classes annotated with @Generated and all enclosed elements in them. + if (isGenerated(element)) { + return; + } + Element enclosing = element.getEnclosingElement(); + if (enclosing != null && isGenerated(enclosing)) { + return; + } + // If a package contains only generated classes, ignore the package as well. 
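To make the new doclet check concrete, the snippet below sketches the kind of source it targets. This is an illustration rather than code from the PR: the generator value is hypothetical, and on JDK 9+ the javax.annotation.Generated type itself comes from a spec artifact (hence the jboss-annotations-api dependency added to server/build.gradle later in this diff). The package-level rule implemented in the lines that follow extends the same idea to packages whose classes are all generated.

```java
import javax.annotation.Generated;

// MissingDoclet matches only on the annotation's fully qualified name,
// "javax.annotation.Generated"; the value element is ignored.
@Generated("com.example.HypotheticalCodeGenerator")
public final class SampleGeneratedMessage {

    // No Javadoc anywhere in this class, but checkComment() returns early for
    // the annotated class and for every element enclosed in it, so the
    // missingJavadoc task no longer fails the build on generated sources.
    public String getUniqueId() {
        return "";
    }
}
```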
+ if (element.getKind() == ElementKind.PACKAGE) { + List<? extends Element> enclosedElements = element.getEnclosedElements(); + Optional<? extends Element> elm = enclosedElements.stream().filter(e -> ((e.getKind() != ElementKind.CLASS) || !isGenerated(e))).findFirst(); + if (elm.isEmpty()) { + return; + } + } var tree = docTrees.getDocCommentTree(element); if (tree == null || tree.getFirstSentence().isEmpty()) { // Check for methods that override other stuff and perhaps inherit their Javadocs. @@ -313,6 +330,17 @@ private void checkComment(Element element) { } } + // Ignore classes annotated with @Generated and all enclosed elements in them. + private boolean isGenerated(Element element) { + return element + .getAnnotationMirrors() + .stream() + .anyMatch(m -> m + .getAnnotationType() + .toString() /* ClassSymbol.toString() returns class name */ + .equalsIgnoreCase("javax.annotation.Generated")); + } + private boolean hasInheritedJavadocs(Element element) { boolean hasOverrides = element.getAnnotationMirrors().stream() .anyMatch(ann -> ann.getAnnotationType().toString().equals(Override.class.getName())); diff --git a/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/30_filter_query.yml b/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/30_filter_query.yml index 730b10d87e6a8..d5fe91aab5378 100644 --- a/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/30_filter_query.yml +++ b/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/30_filter_query.yml @@ -122,3 +122,27 @@ teardown: index: test body: { } - match: { hits.total.value: 2 } +--- +"Test invalid inline query": + - do: + catch: bad_request + search: + index: test + body: { + search_pipeline: { + "request_processors": [ + { + "filter_query": { + "query": { + "woozlewuzzle": { + "field": "foo" + } + } + } + } + ] + } + } + - match: { status: 400 } + - match: { error.type: "parsing_exception"} + - match: { error.reason: "unknown query [woozlewuzzle]"} diff --git a/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/50_script_processor.yml b/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/50_script_processor.yml index bba52285fd58d..9b2dc0c41ff31 100644 --- a/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/50_script_processor.yml +++ b/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/50_script_processor.yml @@ -83,10 +83,6 @@ teardown: "profile": true } - length: { hits.hits: 2 } - - match: { _shards.total: 1 } - - match: { hits.total.value: 1 } - - match: { hits.hits.0._score: 1.0 } - - match: { hits.hits.1._score: 1.0 } - is_false: hits.hits.0._explanation - is_false: hits.hits.1._explanation - is_false: hits.hits.0._seq_no diff --git a/plugins/repository-azure/build.gradle b/plugins/repository-azure/build.gradle index 5478a36fd5885..c32660a3de259 100644 --- a/plugins/repository-azure/build.gradle +++ b/plugins/repository-azure/build.gradle @@ -58,9 +58,9 @@ dependencies { api 'com.azure:azure-storage-blob:12.21.1' api 'org.reactivestreams:reactive-streams:1.0.4' api 'io.projectreactor:reactor-core:3.5.1' - api 'io.projectreactor.netty:reactor-netty:1.1.4' + api 'io.projectreactor.netty:reactor-netty:1.1.7' api 'io.projectreactor.netty:reactor-netty-core:1.1.7' - api 'io.projectreactor.netty:reactor-netty-http:1.1.4' + api
'io.projectreactor.netty:reactor-netty-http:1.1.7' api "org.slf4j:slf4j-api:${versions.slf4j}" api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}" api "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}" diff --git a/plugins/repository-azure/licenses/reactor-netty-1.1.4.jar.sha1 b/plugins/repository-azure/licenses/reactor-netty-1.1.4.jar.sha1 deleted file mode 100644 index ab76deb3bc1f1..0000000000000 --- a/plugins/repository-azure/licenses/reactor-netty-1.1.4.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -1b66183ba316fbbd2d212eb9e9a3ba060ba557b0 \ No newline at end of file diff --git a/plugins/repository-azure/licenses/reactor-netty-1.1.7.jar.sha1 b/plugins/repository-azure/licenses/reactor-netty-1.1.7.jar.sha1 new file mode 100644 index 0000000000000..01a9b1d34d52f --- /dev/null +++ b/plugins/repository-azure/licenses/reactor-netty-1.1.7.jar.sha1 @@ -0,0 +1 @@ +c16497c29f96ea7b1db538cb0ddde55d9be173fe \ No newline at end of file diff --git a/plugins/repository-azure/licenses/reactor-netty-http-1.1.4.jar.sha1 b/plugins/repository-azure/licenses/reactor-netty-http-1.1.4.jar.sha1 deleted file mode 100644 index 7848fcdd8f5cb..0000000000000 --- a/plugins/repository-azure/licenses/reactor-netty-http-1.1.4.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -ca8b2f1b23e4593577e0f570e04bb80cd29ce1e3 \ No newline at end of file diff --git a/plugins/repository-azure/licenses/reactor-netty-http-1.1.7.jar.sha1 b/plugins/repository-azure/licenses/reactor-netty-http-1.1.7.jar.sha1 new file mode 100644 index 0000000000000..33bf2aabcfc9b --- /dev/null +++ b/plugins/repository-azure/licenses/reactor-netty-http-1.1.7.jar.sha1 @@ -0,0 +1 @@ +39d7c0a13afa471b426a30bcf82664496ad34723 \ No newline at end of file diff --git a/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java b/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java index cec43159ff116..36a90d0c3ce6d 100644 --- a/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java +++ b/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java @@ -245,6 +245,7 @@ public void testIndexing() throws IOException, ParseException { * * @throws Exception */ + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/7679") public void testIndexingWithSegRep() throws Exception { final String indexName = "test-index-segrep"; final int shardCount = 3; diff --git a/server/build.gradle b/server/build.gradle index 4a71c456ccae4..eefc174b6a403 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -165,6 +165,7 @@ dependencies { // protobuf api "com.google.protobuf:protobuf-java:${versions.protobuf}" + implementation "org.jboss.spec.javax.annotation:jboss-annotations-api_1.2_spec:${versions.jboss_annotation}" testImplementation(project(":test:framework")) { // tests use the locally compiled version of server @@ -193,6 +194,7 @@ tasks.named("forbiddenPatterns").configure { exclude '**/*.dic' exclude '**/*.binary' exclude '**/*.st' + exclude '**/*.meta' } tasks.named("testingConventions").configure { @@ -238,6 +240,16 @@ protobuf { protoc { artifact = "com.google.protobuf:protoc:${versions.protobuf}" } + + generateProtoTasks { + all().each { task -> + task.builtins { + java { + option "annotate_code" + } + } + } + } } tasks.named("processResources").configure { @@ -424,29 +436,6 @@ tasks.named("dependencyLicenses").configure { } } -tasks.named("missingJavadoc").configure { - /* - * Generated code doesn't follow javadocs formats. 
- * Unfortunately the missingJavadoc task doesnt take *. - * TODO: Add support to missingJavadoc task to ignore all generated source code - * https://github.com/opensearch-project/OpenSearch/issues/7264 - */ - dependsOn("generateProto") - javadocMissingIgnore = [ - "org.opensearch.extensions.proto.ExtensionIdentityProto", - "org.opensearch.extensions.proto.ExtensionIdentityProto.ExtensionIdentityOrBuilder", - "org.opensearch.extensions.proto.RegisterRestActionsProto", - "org.opensearch.extensions.proto.RegisterRestActionsProto.RegisterRestActionsOrBuilder", - "org.opensearch.extensions.proto.ExtensionRequestProto", - "org.opensearch.extensions.proto.ExtensionRequestProto.ExtensionRequestOrBuilder", - "org.opensearch.extensions.proto.RegisterTransportActionsProto", - "org.opensearch.extensions.proto.RegisterTransportActionsProto.RegisterTransportActionsOrBuilder", - "org.opensearch.extensions.proto.ExtensionTransportMessageProto", - "org.opensearch.extensions.proto.ExtensionTransportMessageProto.ExtensionTransportMessageOrBuilder", - "org.opensearch.extensions.proto" - ] -} - tasks.named("filepermissions").configure { mustRunAfter("generateProto") } diff --git a/server/licenses/jboss-annotations-api_1.2_spec-1.0.2.Final.jar.sha1 b/server/licenses/jboss-annotations-api_1.2_spec-1.0.2.Final.jar.sha1 new file mode 100644 index 0000000000000..bbf7f430cbcae --- /dev/null +++ b/server/licenses/jboss-annotations-api_1.2_spec-1.0.2.Final.jar.sha1 @@ -0,0 +1 @@ +0d6b20c0e95c5b38f313cc2ab1a55560cedabe1f \ No newline at end of file diff --git a/server/licenses/jboss-annotations-api_1.2_spec-LICENSE.txt b/server/licenses/jboss-annotations-api_1.2_spec-LICENSE.txt new file mode 100644 index 0000000000000..261eeb9e9f8b2 --- /dev/null +++ b/server/licenses/jboss-annotations-api_1.2_spec-LICENSE.txt @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). 
+ + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/server/licenses/jboss-annotations-api_1.2_spec-NOTICE.txt b/server/licenses/jboss-annotations-api_1.2_spec-NOTICE.txt new file mode 100644 index 0000000000000..6c7dc983f8c7a --- /dev/null +++ b/server/licenses/jboss-annotations-api_1.2_spec-NOTICE.txt @@ -0,0 +1,12 @@ +OpenSearch (https://opensearch.org/) +Copyright OpenSearch Contributors + +This product includes software developed by +Elasticsearch (http://www.elastic.co). +Copyright 2009-2018 Elasticsearch + +This product includes software developed by The Apache Software +Foundation (http://www.apache.org/). + +This product includes software developed by +Joda.org (http://www.joda.org/). 
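The next file rewrites SegmentReplicationClusterSettingIT around the new cluster-level default from #7420: instead of counting replicated documents, the tests now assert on effective index settings. The precedence rules (also visible in MetadataCreateIndexService.updateReplicationStrategy further down in this diff) condense into a short sketch, written as it would read inside the test class; the index names are illustrative:

```java
// Cluster-level default, supplied as node settings exactly as the tests do.
Settings settings = Settings.builder().put(CLUSTER_SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build();
internalCluster().startNode(settings);

// 1. A plain index inherits the cluster default, so it uses SEGMENT replication.
createIndex("plain-index");

// 2. An explicit index-level setting wins over the cluster default: DOCUMENT.
createIndex(
    "doc-rep-index",
    Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT).build()
);

// 3. System and hidden indices are always pinned to DOCUMENT replication,
//    regardless of the cluster default.
createIndex("hidden-index", Settings.builder().put("index.hidden", true).build());
```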
diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationClusterSettingIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationClusterSettingIT.java index 9ae577912283a..57578cdbfa8e8 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationClusterSettingIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationClusterSettingIT.java @@ -8,11 +8,13 @@ package org.opensearch.indices.replication; -import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse; +import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest; +import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.FeatureFlags; +import org.opensearch.index.Index; import org.opensearch.index.IndexModule; +import org.opensearch.indices.IndicesService; import org.opensearch.indices.SystemIndexDescriptor; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.plugins.Plugin; @@ -24,8 +26,8 @@ import java.util.Collections; import java.util.Arrays; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; import static org.opensearch.indices.IndicesService.CLUSTER_SETTING_REPLICATION_TYPE; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class SegmentReplicationClusterSettingIT extends OpenSearchIntegTestCase { @@ -45,11 +47,6 @@ public Settings indexSettings() { .build(); } - @Override - protected Settings featureFlagSettings() { - return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true").build(); - } - @Override protected boolean addMockInternalEngine() { return false; } @@ -77,71 +74,119 @@ protected Collection<Class<? extends Plugin>> nodePlugins() { return Arrays.asList(SegmentReplicationClusterSettingIT.TestPlugin.class, MockTransportService.TestPlugin.class); } - public void testReplicationWithSegmentReplicationClusterSetting() throws Exception { - - boolean isSystemIndex = randomBoolean(); - String indexName = isSystemIndex ? SYSTEM_INDEX_NAME : INDEX_NAME; + public void testSystemIndexWithSegmentReplicationClusterSetting() throws Exception { // Starting two nodes with primary and replica shards respectively.
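For context before the test body continues: SYSTEM_INDEX_NAME only behaves as a system index because the class's TestPlugin registers it, and that registration sits outside the hunks shown here. A minimal sketch of what it looks like, assuming the standard SystemIndexPlugin hook:

```java
public static class TestPlugin extends Plugin implements SystemIndexPlugin {
    @Override
    public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
        // Indices matching this pattern are treated as system indices and,
        // with this change, are always created with document replication.
        return Collections.singletonList(new SystemIndexDescriptor(SYSTEM_INDEX_NAME, "System index for SegRep tests"));
    }
}
```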
final String primaryNode = internalCluster().startNode(); - createIndex(indexName); - ensureYellowAndNoInitializingShards(indexName); + createIndex(SYSTEM_INDEX_NAME); + ensureYellowAndNoInitializingShards(SYSTEM_INDEX_NAME); final String replicaNode = internalCluster().startNode(); - ensureGreen(indexName); - - final int initialDocCount = scaledRandomIntBetween(20, 30); - for (int i = 0; i < initialDocCount; i++) { - client().prepareIndex(indexName).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); - } - - refresh(indexName); - assertBusy(() -> { - assertHitCount(client(replicaNode).prepareSearch(indexName).setSize(0).setPreference("_only_local").get(), initialDocCount); - }); - - SegmentReplicationStatsResponse segmentReplicationStatsResponse = client().admin() + ensureGreen(SYSTEM_INDEX_NAME); + final GetSettingsResponse response = client().admin() .indices() - .prepareSegmentReplicationStats(indexName) - .execute() + .getSettings(new GetSettingsRequest().indices(SYSTEM_INDEX_NAME).includeDefaults(true)) .actionGet(); - if (isSystemIndex) { - // Verify that Segment Replication did not happen on the replica shard. - assertNull(segmentReplicationStatsResponse.getReplicationStats().get(indexName)); - } else { - // Verify that Segment Replication happened on the replica shard. - assertFalse(segmentReplicationStatsResponse.getReplicationStats().get(indexName).get(0).getReplicaStats().isEmpty()); - } + assertEquals(response.getSetting(SYSTEM_INDEX_NAME, SETTING_REPLICATION_TYPE), ReplicationType.DOCUMENT.toString()); + + // Verify index setting isSegRepEnabled is false. + Index index = resolveIndex(SYSTEM_INDEX_NAME); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, primaryNode); + assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabled(), false); } - public void testIndexReplicationSettingOverridesClusterSetting() throws Exception { + public void testIndexReplicationSettingOverridesSegRepClusterSetting() throws Exception { + Settings settings = Settings.builder().put(CLUSTER_SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build(); + final String ANOTHER_INDEX = "test-index"; + // Starting two nodes with primary and replica shards respectively. - final String primaryNode = internalCluster().startNode(); + final String primaryNode = internalCluster().startNode(settings); prepareCreate( INDEX_NAME, Settings.builder() // we want to override cluster replication setting by passing a index replication setting .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT) ).get(); - ensureYellowAndNoInitializingShards(INDEX_NAME); - final String replicaNode = internalCluster().startNode(); - ensureGreen(INDEX_NAME); + createIndex(ANOTHER_INDEX); + ensureYellowAndNoInitializingShards(INDEX_NAME, ANOTHER_INDEX); + final String replicaNode = internalCluster().startNode(settings); + + // Randomly close and open index. 
+ if (randomBoolean()) { + logger.info("--> Closing the index "); + client().admin().indices().prepareClose(INDEX_NAME).get(); - final int initialDocCount = scaledRandomIntBetween(20, 30); - for (int i = 0; i < initialDocCount; i++) { - client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + logger.info("--> Opening the index"); + client().admin().indices().prepareOpen(INDEX_NAME).get(); } + ensureGreen(INDEX_NAME, ANOTHER_INDEX); + + final GetSettingsResponse response = client().admin() + .indices() + .getSettings(new GetSettingsRequest().indices(INDEX_NAME, ANOTHER_INDEX).includeDefaults(true)) + .actionGet(); + assertEquals(response.getSetting(INDEX_NAME, SETTING_REPLICATION_TYPE), ReplicationType.DOCUMENT.toString()); + assertEquals(response.getSetting(ANOTHER_INDEX, SETTING_REPLICATION_TYPE), ReplicationType.SEGMENT.toString()); + + // Verify index setting isSegRepEnabled. + Index index = resolveIndex(INDEX_NAME); + Index anotherIndex = resolveIndex(ANOTHER_INDEX); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, primaryNode); + assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabled(), false); + assertEquals(indicesService.indexService(anotherIndex).getIndexSettings().isSegRepEnabled(), true); + } - refresh(INDEX_NAME); - assertBusy(() -> { - assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - }); + public void testIndexReplicationSettingOverridesDocRepClusterSetting() throws Exception { + Settings settings = Settings.builder().put(CLUSTER_SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT).build(); + final String ANOTHER_INDEX = "test-index"; + final String primaryNode = internalCluster().startNode(settings); + prepareCreate( + INDEX_NAME, + Settings.builder() + // we want to override cluster replication setting by passing a index replication setting + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + ).get(); + createIndex(ANOTHER_INDEX); + ensureYellowAndNoInitializingShards(INDEX_NAME, ANOTHER_INDEX); + final String replicaNode = internalCluster().startNode(settings); + ensureGreen(INDEX_NAME, ANOTHER_INDEX); - SegmentReplicationStatsResponse segmentReplicationStatsResponse = client().admin() + final GetSettingsResponse response = client().admin() .indices() - .prepareSegmentReplicationStats(INDEX_NAME) - .execute() + .getSettings(new GetSettingsRequest().indices(INDEX_NAME, ANOTHER_INDEX).includeDefaults(true)) .actionGet(); - // Verify that Segment Replication did not happen on the replica shard. - assertNull(segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME)); + assertEquals(response.getSetting(INDEX_NAME, SETTING_REPLICATION_TYPE), ReplicationType.SEGMENT.toString()); + assertEquals(response.getSetting(ANOTHER_INDEX, SETTING_REPLICATION_TYPE), ReplicationType.DOCUMENT.toString()); + + // Verify index setting isSegRepEnabled. 
+ Index index = resolveIndex(INDEX_NAME); + Index anotherIndex = resolveIndex(ANOTHER_INDEX); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, primaryNode); + assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabled(), true); + assertEquals(indicesService.indexService(anotherIndex).getIndexSettings().isSegRepEnabled(), false); } + + public void testHiddenIndicesWithReplicationStrategyClusterSetting() throws Exception { + final String primaryNode = internalCluster().startNode(); + final String replicaNode = internalCluster().startNode(); + prepareCreate( + INDEX_NAME, + Settings.builder() + // we want to set index as hidden + .put("index.hidden", true) + ).get(); + ensureGreen(INDEX_NAME); + + // Verify that document replication strategy is used for hidden indices. + final GetSettingsResponse response = client().admin() + .indices() + .getSettings(new GetSettingsRequest().indices(INDEX_NAME).includeDefaults(true)) + .actionGet(); + assertEquals(response.getSetting(INDEX_NAME, SETTING_REPLICATION_TYPE), ReplicationType.DOCUMENT.toString()); + + // Verify index setting isSegRepEnabled. + Index index = resolveIndex(INDEX_NAME); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, primaryNode); + assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabled(), false); + } + } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/CreateRemoteIndexIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/CreateRemoteIndexIT.java index f8b56857168b8..a3235fd2a91d4 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/CreateRemoteIndexIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/CreateRemoteIndexIT.java @@ -31,6 +31,7 @@ import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_STORE_ENABLED_SETTING; import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_TRANSLOG_STORE_ENABLED_SETTING; import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING; +import static org.opensearch.indices.IndicesService.CLUSTER_SETTING_REPLICATION_TYPE; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST) @@ -58,11 +59,7 @@ protected Settings nodeSettings(int nodeOriginal) { @Override protected Settings featureFlagSettings() { - return Settings.builder() - .put(super.featureFlagSettings()) - .put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true") - .put(FeatureFlags.REMOTE_STORE, "true") - .build(); + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE, "true").build(); } @Before @@ -120,7 +117,15 @@ public void testRemoteStoreDisabledByUser() throws Exception { .getIndex(new GetIndexRequest().indices("test-idx-1").includeDefaults(true)) .get(); Settings indexSettings = getIndexResponse.settings().get("test-idx-1"); - verifyRemoteStoreIndexSettings(indexSettings, "false", null, null, null, null, null); + verifyRemoteStoreIndexSettings( + indexSettings, + "false", + null, + null, + null, + client().settings().get(CLUSTER_SETTING_REPLICATION_TYPE), + null + ); } public void testRemoteStoreEnabledByUserWithoutRemoteRepoAndSegmentReplicationIllegalArgumentException() throws Exception { diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java 
b/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java index e22165ae3d7fa..3f79166f6a189 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java @@ -17,7 +17,9 @@ import org.opensearch.action.search.SearchResponse; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; +import org.opensearch.index.Index; import org.opensearch.index.query.QueryBuilders; +import org.opensearch.indices.IndicesService; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.rest.RestStatus; import org.opensearch.test.InternalTestCluster; @@ -28,6 +30,8 @@ import java.util.List; import java.util.concurrent.TimeUnit; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; +import static org.opensearch.indices.IndicesService.CLUSTER_SETTING_REPLICATION_TYPE; import static org.opensearch.indices.replication.SegmentReplicationBaseIT.waitForSearchableDocs; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; @@ -261,4 +265,44 @@ public void testRestoreOnReplicaNode() throws Exception { SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get(); assertHitCount(resp, DOC_COUNT); } + + public void testSnapshotRestoreOnIndexWithSegRepClusterSetting() throws Exception { + Settings settings = Settings.builder() + .put(super.featureFlagSettings()) + .put(CLUSTER_SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(); + + // Starting two nodes with primary and replica shards respectively. + final String primaryNode = internalCluster().startNode(settings); + prepareCreate( + INDEX_NAME, + Settings.builder() + // we want to override cluster replication setting by passing a index replication setting + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT) + ).get(); + ensureYellowAndNoInitializingShards(INDEX_NAME); + final String replicaNode = internalCluster().startNode(settings); + ensureGreen(INDEX_NAME); + + createSnapshot(); + // Delete index + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); + assertFalse("index [" + INDEX_NAME + "] should have been deleted", indexExists(INDEX_NAME)); + + RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(null); + + // Assertions + assertEquals(restoreSnapshotResponse.status(), RestStatus.ACCEPTED); + ensureGreen(RESTORED_INDEX_NAME); + GetSettingsResponse settingsResponse = client().admin() + .indices() + .getSettings(new GetSettingsRequest().indices(RESTORED_INDEX_NAME).includeDefaults(true)) + .get(); + assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, SETTING_REPLICATION_TYPE), ReplicationType.DOCUMENT.toString()); + + // Verify index setting isSegRepEnabled. 
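The new snapshot test verifies that restore preserves the replication type recorded in the snapshot's index metadata instead of re-deriving it from the SEGMENT cluster default; the assertions just below confirm this through the settings API and IndicesService. If the opposite behavior were wanted, the type could be overridden at restore time. A sketch, assuming this test class's repository, snapshot, and index-name constants:

```java
RestoreSnapshotRequest restoreRequest = new RestoreSnapshotRequest(REPOSITORY_NAME, SNAPSHOT_NAME)
    .indices(INDEX_NAME)
    .renamePattern(INDEX_NAME)
    .renameReplacement(RESTORED_INDEX_NAME)
    .waitForCompletion(true)
    // Override the replication type captured in the snapshot's index metadata.
    .indexSettings(Settings.builder().put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build());
RestoreSnapshotResponse response = client().admin().cluster().restoreSnapshot(restoreRequest).actionGet();
```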
+ Index index = resolveIndex(RESTORED_INDEX_NAME); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class); + assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabled(), false); + } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStats.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStats.java index 4602e23eaf8e5..a6414e3e0c37e 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStats.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStats.java @@ -43,8 +43,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject() .field(Fields.SHARD_ID, remoteSegmentUploadShardStats.shardId) - .field(Fields.LOCAL_REFRESH_TIMESTAMP, remoteSegmentUploadShardStats.localRefreshTimeMs) - .field(Fields.REMOTE_REFRESH_TIMESTAMP, remoteSegmentUploadShardStats.remoteRefreshTimeMs) + .field(Fields.REFRESH_TIME_LAG_IN_MILLIS, remoteSegmentUploadShardStats.refreshTimeLagMs) .field(Fields.REFRESH_LAG, remoteSegmentUploadShardStats.localRefreshNumber - remoteSegmentUploadShardStats.remoteRefreshNumber) .field(Fields.BYTES_LAG, remoteSegmentUploadShardStats.bytesLag) @@ -90,16 +89,6 @@ public void writeTo(StreamOutput out) throws IOException { static final class Fields { static final String SHARD_ID = "shard_id"; - /** - * Last successful local refresh timestamp in milliseconds - */ - static final String LOCAL_REFRESH_TIMESTAMP = "local_refresh_timestamp_in_millis"; - - /** - * Last successful remote refresh timestamp in milliseconds - */ - static final String REMOTE_REFRESH_TIMESTAMP = "remote_refresh_timestamp_in_millis"; - /** * Lag in terms of bytes b/w local and remote store */ @@ -110,6 +99,11 @@ static final class Fields { */ static final String REFRESH_LAG = "refresh_lag"; + /** + * Time in millis remote refresh is behind local refresh + */ + static final String REFRESH_TIME_LAG_IN_MILLIS = "refresh_time_lag_in_millis"; + /** * Total write rejections due to remote store backpressure kick in */ diff --git a/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java b/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java index 5c22a39cf3a40..d843b3a9452a5 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java @@ -44,8 +44,10 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import static org.opensearch.action.search.SearchContextId.decode; @@ -94,8 +96,7 @@ public TransportPitSegmentsAction( */ @Override protected void doExecute(Task task, PitSegmentsRequest request, ActionListener<IndicesSegmentResponse> listener) { - List<String> pitIds = request.getPitIds(); - if (pitIds.size() == 1 && "_all".equals(pitIds.get(0))) { + if (request.getPitIds().size() == 1 && "_all".equals(request.getPitIds().get(0))) { pitService.getAllPits(ActionListener.wrap(response -> { request.clearAndSetPitIds(response.getPitInfos().stream().map(ListPitInfo::getPitId).collect(Collectors.toList())); super.doExecute(task, request, listener); @@ -114,7 +115,9 @@
protected void doExecute(Task task, PitSegmentsRequest request, ActionListener<IndicesSegmentResponse> listener) { */ @Override protected ShardsIterator shards(ClusterState clusterState, PitSegmentsRequest request, String[] concreteIndices) { final ArrayList<ShardRouting> iterators = new ArrayList<>(); - for (String pitId : request.getPitIds()) { + // remove duplicates from the request + Set<String> uniquePitIds = new LinkedHashSet<>(request.getPitIds()); + for (String pitId : uniquePitIds) { SearchContextId searchContext = decode(namedWriteableRegistry, pitId); for (Map.Entry<ShardId, SearchContextIdForNode> entry : searchContext.shards().entrySet()) { final SearchContextIdForNode perNode = entry.getValue(); diff --git a/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java b/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java index b85fe302a748f..217fcc1489df7 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java @@ -18,8 +18,10 @@ import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; /** @@ -46,8 +48,7 @@ public TransportDeletePitAction( */ @Override protected void doExecute(Task task, DeletePitRequest request, ActionListener<DeletePitResponse> listener) { - List<String> pitIds = request.getPitIds(); - if (pitIds.size() == 1 && "_all".equals(pitIds.get(0))) { + if (request.getPitIds().size() == 1 && "_all".equals(request.getPitIds().get(0))) { deleteAllPits(listener); } else { deletePits(listener, request); @@ -59,7 +60,9 @@ protected void doExecute(Task task, DeletePitRequest request, ActionListener<DeletePitResponse> listener) { private void deletePits(ActionListener<DeletePitResponse> listener, DeletePitRequest request) { Map<String, List<PitSearchContextIdForNode>> nodeToContextsMap = new HashMap<>(); - for (String pitId : request.getPitIds()) { + // remove duplicates from the request + Set<String> uniquePitIds = new LinkedHashSet<>(request.getPitIds()); + for (String pitId : uniquePitIds) { SearchContextId contextId = SearchContextId.decode(namedWriteableRegistry, pitId); for (SearchContextIdForNode contextIdForNode : contextId.shards().values()) { PitSearchContextIdForNode pitSearchContext = new PitSearchContextIdForNode(pitId, contextIdForNode); diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 5ae4e13caa353..fd42089a4a9d5 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -401,7 +401,7 @@ private void executeRequest( ); } catch (Exception e) { originalListener.onFailure(e); - throw new RuntimeException(e); + return; } ActionListener<SearchSourceBuilder> rewriteListener = ActionListener.wrap(source -> { diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java index 89abdbdb5419f..6e22de7c72d07 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java @@ -583,7 +583,8 @@ private ClusterState applyCreateIndexRequestWithV1Templates( settings, indexScopedSettings, shardLimitValidator, - indexSettingProviders + indexSettingProviders, + systemIndices.validateSystemIndex(request.index()) ); int routingNumShards = getIndexNumberOfRoutingShards(aggregatedIndexSettings, null); IndexMetadata tmpImd = buildAndValidateTemporaryIndexMetadata(currentState, aggregatedIndexSettings, request,
routingNumShards); @@ -647,7 +648,8 @@ private ClusterState applyCreateIndexRequestWithV2Template( settings, indexScopedSettings, shardLimitValidator, - indexSettingProviders + indexSettingProviders, + systemIndices.validateSystemIndex(request.index()) ); int routingNumShards = getIndexNumberOfRoutingShards(aggregatedIndexSettings, null); IndexMetadata tmpImd = buildAndValidateTemporaryIndexMetadata(currentState, aggregatedIndexSettings, request, routingNumShards); @@ -727,7 +729,8 @@ private ClusterState applyCreateIndexRequestWithExistingMetadata( settings, indexScopedSettings, shardLimitValidator, - indexSettingProviders + indexSettingProviders, + sourceMetadata.isSystem() ); final int routingNumShards = getIndexNumberOfRoutingShards(aggregatedIndexSettings, sourceMetadata); IndexMetadata tmpImd = buildAndValidateTemporaryIndexMetadata(currentState, aggregatedIndexSettings, request, routingNumShards); @@ -810,7 +813,8 @@ static Settings aggregateIndexSettings( Settings settings, IndexScopedSettings indexScopedSettings, ShardLimitValidator shardLimitValidator, - Set<IndexSettingProvider> indexSettingProviders + Set<IndexSettingProvider> indexSettingProviders, + boolean isSystemIndex ) { // Create builders for the template and request settings. We transform these into builders // because we may want settings to be "removed" from these prior to being set on the new @@ -894,6 +898,7 @@ static Settings aggregateIndexSettings( indexSettingsBuilder.put(IndexMetadata.SETTING_INDEX_PROVIDED_NAME, request.getProvidedName()); indexSettingsBuilder.put(SETTING_INDEX_UUID, UUIDs.randomBase64UUID()); + updateReplicationStrategy(indexSettingsBuilder, request.settings(), settings, isSystemIndex); updateRemoteStoreSettings(indexSettingsBuilder, request.settings(), settings); if (sourceMetadata != null) { @@ -928,6 +933,27 @@ static Settings aggregateIndexSettings( return indexSettings; } + /** + * Updates index settings to set replication strategy by default based on cluster level settings + * @param settingsBuilder index settings builder to be updated with relevant settings + * @param requestSettings settings passed in during index create request + * @param clusterSettings cluster level settings + * @param isSystemIndex true if the index being created is a system index + */ + private static void updateReplicationStrategy( + Settings.Builder settingsBuilder, + Settings requestSettings, + Settings clusterSettings, + boolean isSystemIndex + ) { + if (isSystemIndex || IndexMetadata.INDEX_HIDDEN_SETTING.get(requestSettings)) { + settingsBuilder.put(SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT); + return; + } + if (CLUSTER_REPLICATION_TYPE_SETTING.exists(clusterSettings) && INDEX_REPLICATION_TYPE_SETTING.exists(requestSettings) == false) { + settingsBuilder.put(SETTING_REPLICATION_TYPE, CLUSTER_REPLICATION_TYPE_SETTING.get(clusterSettings)); + } + } + /** * Updates index settings to enable remote store by default based on cluster level settings * @param settingsBuilder index settings builder to be updated with relevant settings diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 3da1784907c61..6db763ff148ce 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -260,6 +260,7 @@ public void apply(Settings value, Settings current, Settings previous) { IndicesQueryCache.INDICES_QUERIES_CACHE_ALL_SEGMENTS_SETTING, IndicesService.INDICES_ID_FIELD_DATA_ENABLED_SETTING,
IndicesService.WRITE_DANGLING_INDICES_INFO_SETTING, + IndicesService.CLUSTER_REPLICATION_TYPE_SETTING, MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING, MappingUpdatedAction.INDICES_MAX_IN_FLIGHT_UPDATES_SETTING, Metadata.SETTING_READ_ONLY_SETTING, @@ -669,8 +670,6 @@ public void apply(Settings value, Settings current, Settings previous) { * setting should be moved to {@link #BUILT_IN_CLUSTER_SETTINGS}. */ public static final Map<List<String>, List<Setting>> FEATURE_FLAGGED_CLUSTER_SETTINGS = Map.of( - List.of(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL), - List.of(IndicesService.CLUSTER_REPLICATION_TYPE_SETTING), List.of(FeatureFlags.REMOTE_STORE), List.of( IndicesService.CLUSTER_REMOTE_STORE_ENABLED_SETTING, diff --git a/server/src/main/java/org/opensearch/common/settings/Setting.java b/server/src/main/java/org/opensearch/common/settings/Setting.java index a0cdf35ee0ad2..32a6686398a2a 100644 --- a/server/src/main/java/org/opensearch/common/settings/Setting.java +++ b/server/src/main/java/org/opensearch/common/settings/Setting.java @@ -132,6 +132,11 @@ public enum Property { */ Deprecated, + /** + * Extension scope + */ + ExtensionScope, + /** * Node scope */ diff --git a/server/src/main/java/org/opensearch/extensions/ExtensionScopedSettings.java b/server/src/main/java/org/opensearch/extensions/ExtensionScopedSettings.java new file mode 100644 index 0000000000000..0c87ce31df737 --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/ExtensionScopedSettings.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions; + +import org.opensearch.common.settings.AbstractScopedSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Setting.Property; +import org.opensearch.common.settings.SettingUpgrader; +import org.opensearch.common.settings.Settings; + +import java.util.Collections; +import java.util.Set; + +/** + * Encapsulates all valid extension level settings.
+ * + * @opensearch.internal + */ +public final class ExtensionScopedSettings extends AbstractScopedSettings { + + public ExtensionScopedSettings(final Set<Setting<?>> settingsSet) { + this(settingsSet, Collections.emptySet()); + } + + public ExtensionScopedSettings(final Set<Setting<?>> settingsSet, final Set<SettingUpgrader<?>> settingUpgraders) { + super(Settings.EMPTY, settingsSet, settingUpgraders, Property.ExtensionScope); + } +} diff --git a/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java b/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java index 2878aa047c667..9d74e8f22d2b1 100644 --- a/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java +++ b/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java @@ -16,9 +16,11 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; @@ -37,6 +39,7 @@ import org.opensearch.client.node.NodeClient; import org.opensearch.cluster.ClusterSettingsResponse; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Setting; import org.opensearch.core.util.FileSystemUtils; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.settings.Settings; @@ -106,6 +109,7 @@ public static enum OpenSearchRequestType { private CustomSettingsRequestHandler customSettingsRequestHandler; private TransportService transportService; private ClusterService clusterService; + private final Set<Setting<?>> additionalSettings; private Settings environmentSettings; private AddSettingsUpdateConsumerRequestHandler addSettingsUpdateConsumerRequestHandler; private NodeClient client; @@ -114,9 +118,10 @@ public static enum OpenSearchRequestType { * Instantiate a new ExtensionsManager object to handle requests and responses from extensions. This is called during Node bootstrap. * * @param extensionsPath Path to a directory containing extensions. + * @param additionalSettings Additional settings to read in from extensions.yml * @throws IOException If the extensions discovery file is not properly retrieved.
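Before the constructor change below, it helps to see where those additional settings come from. A sketch of the producing side, the ExtensionAwarePlugin extension point from #7526; the plugin class and setting key are illustrative, and the getter name assumes the hook introduced by that PR:

```java
public class MyExtensionAwarePlugin extends Plugin implements ExtensionAwarePlugin {
    @Override
    public List<Setting<?>> getExtensionSettings() {
        // Keys returned here may then appear per-extension in extensions.yml;
        // ExtensionsManager validates them against an ExtensionScopedSettings
        // scope built from this list.
        return List.of(Setting.simpleString("custom_folder_name", Setting.Property.ExtensionScope));
    }
}
```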
*/ - public ExtensionsManager(Path extensionsPath) throws IOException { + public ExtensionsManager(Path extensionsPath, Set> additionalSettings) throws IOException { logger.info("ExtensionsManager initialized"); this.extensionsPath = extensionsPath; this.initializedExtensions = new HashMap(); @@ -125,6 +130,11 @@ public ExtensionsManager(Path extensionsPath) throws IOException { // will be initialized in initializeServicesAndRestHandler which is called after the Node is initialized this.transportService = null; this.clusterService = null; + // Settings added to extensions.yml by ExtensionAwarePlugins, such as security settings + this.additionalSettings = new HashSet<>(); + if (additionalSettings != null) { + this.additionalSettings.addAll(additionalSettings); + } this.client = null; this.extensionTransportActionsHandler = null; @@ -466,6 +476,7 @@ private ExtensionsSettings readFromExtensionsYml(Path filePath) throws IOExcepti } List> unreadExtensions = new ArrayList<>((Collection>) obj.get("extensions")); List readExtensions = new ArrayList(); + Set additionalSettingsKeys = additionalSettings.stream().map(s -> s.getKey()).collect(Collectors.toSet()); for (HashMap extensionMap : unreadExtensions) { try { // checking to see whether any required fields are missing from extension.yml file or not @@ -500,6 +511,16 @@ private ExtensionsSettings readFromExtensionsYml(Path filePath) throws IOExcepti } } + ExtensionScopedSettings extAdditionalSettings = new ExtensionScopedSettings(additionalSettings); + Map additionalSettingsMap = extensionMap.entrySet() + .stream() + .filter(kv -> additionalSettingsKeys.contains(kv.getKey())) + .collect(Collectors.toMap(map -> map.getKey(), map -> map.getValue())); + + Settings.Builder output = Settings.builder(); + output.loadFromMap(additionalSettingsMap); + extAdditionalSettings.applySettings(output.build()); + // Create extension read from yml config readExtensions.add( new Extension( @@ -510,7 +531,8 @@ private ExtensionsSettings readFromExtensionsYml(Path filePath) throws IOExcepti extensionMap.get("version").toString(), extensionMap.get("opensearchVersion").toString(), extensionMap.get("minimumCompatibleVersion").toString(), - extensionDependencyList + extensionDependencyList, + extAdditionalSettings ) ); } catch (IOException e) { diff --git a/server/src/main/java/org/opensearch/extensions/ExtensionsSettings.java b/server/src/main/java/org/opensearch/extensions/ExtensionsSettings.java index fd11aec973d42..9d21469c8fa28 100644 --- a/server/src/main/java/org/opensearch/extensions/ExtensionsSettings.java +++ b/server/src/main/java/org/opensearch/extensions/ExtensionsSettings.java @@ -44,6 +44,7 @@ public static class Extension { private String opensearchVersion; private String minimumCompatibleVersion; private List dependencies = Collections.emptyList(); + private ExtensionScopedSettings additionalSettings; public Extension( String name, @@ -53,7 +54,8 @@ public Extension( String version, String opensearchVersion, String minimumCompatibleVersion, - List dependencies + List dependencies, + ExtensionScopedSettings additionalSettings ) { this.name = name; this.uniqueId = uniqueId; @@ -63,6 +65,7 @@ public Extension( this.opensearchVersion = opensearchVersion; this.minimumCompatibleVersion = minimumCompatibleVersion; this.dependencies = dependencies; + this.additionalSettings = additionalSettings; } public Extension() { @@ -127,6 +130,10 @@ public List getDependencies() { return dependencies; } + public ExtensionScopedSettings getAdditionalSettings() { + return 
additionalSettings; + } + public String getMinimumCompatibleVersion() { return minimumCompatibleVersion; } diff --git a/server/src/main/java/org/opensearch/extensions/NoopExtensionsManager.java b/server/src/main/java/org/opensearch/extensions/NoopExtensionsManager.java index eb9b389b7a4b1..fb7160bc1bc67 100644 --- a/server/src/main/java/org/opensearch/extensions/NoopExtensionsManager.java +++ b/server/src/main/java/org/opensearch/extensions/NoopExtensionsManager.java @@ -11,6 +11,7 @@ import java.io.IOException; import java.nio.file.Path; import java.util.Optional; +import java.util.Set; import org.opensearch.action.ActionModule; import org.opensearch.client.node.NodeClient; @@ -31,7 +32,7 @@ public class NoopExtensionsManager extends ExtensionsManager { public NoopExtensionsManager() throws IOException { - super(Path.of("")); + super(Path.of(""), Set.of()); } @Override diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 4b63a4ca5e42a..de7dc102939ce 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -48,7 +48,6 @@ import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.common.Strings; import org.opensearch.index.translog.Translog; -import org.opensearch.indices.IndicesService; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.ingest.IngestService; import org.opensearch.node.Node; @@ -765,13 +764,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti nodeName = Node.NODE_NAME_SETTING.get(settings); this.indexMetadata = indexMetadata; numberOfShards = settings.getAsInt(IndexMetadata.SETTING_NUMBER_OF_SHARDS, null); - if (FeatureFlags.isEnabled(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL) - && indexMetadata.isSystem() == false - && settings.get(IndexMetadata.SETTING_REPLICATION_TYPE) == null) { - replicationType = IndicesService.CLUSTER_REPLICATION_TYPE_SETTING.get(settings); - } else { - replicationType = IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.get(settings); - } + replicationType = IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.get(settings); isRemoteStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false); isRemoteTranslogStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, false); remoteStoreTranslogRepository = settings.get(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY); diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java index 800cf176548a0..dfd52949d3a6b 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java @@ -445,10 +445,9 @@ void updateUploadTimeMsMovingAverageWindowSize(int updatedSize) { public RemoteRefreshSegmentTracker.Stats stats() { return new RemoteRefreshSegmentTracker.Stats( shardId, + timeMsLag, localRefreshSeqNo, - localRefreshTimeMs, remoteRefreshSeqNo, - remoteRefreshTimeMs, uploadBytesStarted, uploadBytesSucceeded, uploadBytesFailed, @@ -473,10 +472,9 @@ public RemoteRefreshSegmentTracker.Stats stats() { public static class Stats implements Writeable { public final ShardId shardId; + public final long refreshTimeLagMs; public final long localRefreshNumber; - public 
final long localRefreshTimeMs; public final long remoteRefreshNumber; - public final long remoteRefreshTimeMs; public final long uploadBytesStarted; public final long uploadBytesFailed; public final long uploadBytesSucceeded; @@ -493,10 +491,9 @@ public static class Stats implements Writeable { public Stats( ShardId shardId, + long refreshTimeLagMs, long localRefreshNumber, - long localRefreshTimeMs, long remoteRefreshNumber, - long remoteRefreshTimeMs, long uploadBytesStarted, long uploadBytesSucceeded, long uploadBytesFailed, @@ -512,10 +509,9 @@ public Stats( long bytesLag ) { this.shardId = shardId; + this.refreshTimeLagMs = refreshTimeLagMs; this.localRefreshNumber = localRefreshNumber; - this.localRefreshTimeMs = localRefreshTimeMs; this.remoteRefreshNumber = remoteRefreshNumber; - this.remoteRefreshTimeMs = remoteRefreshTimeMs; this.uploadBytesStarted = uploadBytesStarted; this.uploadBytesFailed = uploadBytesFailed; this.uploadBytesSucceeded = uploadBytesSucceeded; @@ -534,10 +530,9 @@ public Stats( public Stats(StreamInput in) throws IOException { try { this.shardId = new ShardId(in); + this.refreshTimeLagMs = in.readLong(); this.localRefreshNumber = in.readLong(); - this.localRefreshTimeMs = in.readLong(); this.remoteRefreshNumber = in.readLong(); - this.remoteRefreshTimeMs = in.readLong(); this.uploadBytesStarted = in.readLong(); this.uploadBytesFailed = in.readLong(); this.uploadBytesSucceeded = in.readLong(); @@ -559,10 +554,9 @@ public Stats(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { shardId.writeTo(out); + out.writeLong(refreshTimeLagMs); out.writeLong(localRefreshNumber); - out.writeLong(localRefreshTimeMs); out.writeLong(remoteRefreshNumber); - out.writeLong(remoteRefreshTimeMs); out.writeLong(uploadBytesStarted); out.writeLong(uploadBytesFailed); out.writeLong(uploadBytesSucceeded); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index b417133e4a89d..ce5d05065860f 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1629,12 +1629,6 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp ); return false; } - if (localCheckpoint.getCodec().equals(requestCheckpoint.getCodec()) == false) { - logger.trace( - () -> new ParameterizedMessage("Shard does not support the received lucene codec version {}", requestCheckpoint.getCodec()) - ); - return false; - } return true; } diff --git a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java index 3ab0a7539fb06..6f04c6cf6f665 100644 --- a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java +++ b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java @@ -14,7 +14,6 @@ import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.common.util.CancellableThreads; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.index.IndexService; import org.opensearch.index.shard.IndexShard; @@ -148,12 +147,6 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener { if (segrepHandler != null) { logger.warn("Override handler for 
allocation id {}", request.getTargetAllocationId()); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index d9f318e78597c..ae722b8742e94 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -57,14 +57,9 @@ public ReplicationCheckpoint getCheckpoint() { return this.checkpoint; } - public SegmentReplicationTarget( - ReplicationCheckpoint checkpoint, - IndexShard indexShard, - SegmentReplicationSource source, - ReplicationListener listener - ) { + public SegmentReplicationTarget(IndexShard indexShard, SegmentReplicationSource source, ReplicationListener listener) { super("replication_target", indexShard, new ReplicationLuceneIndex(), listener); - this.checkpoint = checkpoint; + this.checkpoint = indexShard.getLatestReplicationCheckpoint(); this.source = source; this.state = new SegmentReplicationState( indexShard.routingEntry(), @@ -101,7 +96,7 @@ public SegmentReplicationState state() { } public SegmentReplicationTarget retryCopy() { - return new SegmentReplicationTarget(checkpoint, indexShard, source, listener); + return new SegmentReplicationTarget(indexShard, source, listener); } @Override diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 6c1547fbee82b..1ce208a9a8234 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -228,7 +228,7 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe } final Thread thread = Thread.currentThread(); if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint)) { - startReplication(receivedCheckpoint, replicaShard, new SegmentReplicationListener() { + startReplication(replicaShard, new SegmentReplicationListener() { @Override public void onReplicationDone(SegmentReplicationState state) { logger.trace( @@ -301,17 +301,8 @@ protected void updateLatestReceivedCheckpoint(ReplicationCheckpoint receivedChec } } - public SegmentReplicationTarget startReplication( - final ReplicationCheckpoint checkpoint, - final IndexShard indexShard, - final SegmentReplicationListener listener - ) { - final SegmentReplicationTarget target = new SegmentReplicationTarget( - checkpoint, - indexShard, - sourceFactory.get(indexShard), - listener - ); + public SegmentReplicationTarget startReplication(final IndexShard indexShard, final SegmentReplicationListener listener) { + final SegmentReplicationTarget target = new SegmentReplicationTarget(indexShard, sourceFactory.get(indexShard), listener); startReplication(target); return target; } @@ -429,57 +420,49 @@ public void messageReceived(final ForceSyncRequest request, TransportChannel cha channel.sendResponse(TransportResponse.Empty.INSTANCE); return; } - startReplication( - ReplicationCheckpoint.empty(request.getShardId(), indexShard.getDefaultCodecName()), - indexShard, - new SegmentReplicationTargetService.SegmentReplicationListener() { - @Override - public void onReplicationDone(SegmentReplicationState state) { - logger.trace( - () -> new ParameterizedMessage( - "[shardId {}] [replication id {}] Replication complete to 
{}, timing data: {}", - indexShard.shardId().getId(), - state.getReplicationId(), - indexShard.getLatestReplicationCheckpoint(), - state.getTimingData() - ) - ); - try { - // Promote engine type for primary target - if (indexShard.recoveryState().getPrimary() == true) { - indexShard.resetToWriteableEngine(); - } - channel.sendResponse(TransportResponse.Empty.INSTANCE); - } catch (InterruptedException | TimeoutException | IOException e) { - throw new RuntimeException(e); + startReplication(indexShard, new SegmentReplicationTargetService.SegmentReplicationListener() { + @Override + public void onReplicationDone(SegmentReplicationState state) { + logger.trace( + () -> new ParameterizedMessage( + "[shardId {}] [replication id {}] Replication complete to {}, timing data: {}", + indexShard.shardId().getId(), + state.getReplicationId(), + indexShard.getLatestReplicationCheckpoint(), + state.getTimingData() + ) + ); + try { + // Promote engine type for primary target + if (indexShard.recoveryState().getPrimary() == true) { + indexShard.resetToWriteableEngine(); } + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } catch (InterruptedException | TimeoutException | IOException e) { + throw new RuntimeException(e); } + } - @Override - public void onReplicationFailure( - SegmentReplicationState state, - ReplicationFailedException e, - boolean sendShardFailure - ) { - logger.trace( - () -> new ParameterizedMessage( - "[shardId {}] [replication id {}] Replication failed, timing data: {}", - indexShard.shardId().getId(), - state.getReplicationId(), - state.getTimingData() - ) - ); - if (sendShardFailure == true) { - indexShard.failShard("replication failure", e); - } - try { - channel.sendResponse(e); - } catch (IOException ex) { - throw new RuntimeException(ex); - } + @Override + public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { + logger.trace( + () -> new ParameterizedMessage( + "[shardId {}] [replication id {}] Replication failed, timing data: {}", + indexShard.shardId().getId(), + state.getReplicationId(), + state.getTimingData() + ) + ); + if (sendShardFailure == true) { + indexShard.failShard("replication failure", e); + } + try { + channel.sendResponse(e); + } catch (IOException ex) { + throw new RuntimeException(ex); } } - ); + }); } } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 6373036443045..fd522b7a14c58 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -54,6 +54,7 @@ import org.opensearch.extensions.NoopExtensionsManager; import org.opensearch.monitor.fs.FsInfo; import org.opensearch.monitor.fs.FsProbe; +import org.opensearch.plugins.ExtensionAwarePlugin; import org.opensearch.plugins.SearchPipelinePlugin; import org.opensearch.tracing.TracerFactory; import org.opensearch.search.backpressure.SearchBackpressureService; @@ -236,6 +237,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -468,7 +470,12 @@ protected Node( final IdentityService identityService = new IdentityService(settings, identityPlugins); if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) { - this.extensionsManager = new ExtensionsManager(initialEnvironment.extensionDir()); + final List extensionAwarePlugins = 
pluginsService.filterPlugins(ExtensionAwarePlugin.class); + Set> additionalSettings = new HashSet<>(); + for (ExtensionAwarePlugin extAwarePlugin : extensionAwarePlugins) { + additionalSettings.addAll(extAwarePlugin.getExtensionSettings()); + } + this.extensionsManager = new ExtensionsManager(initialEnvironment.extensionDir(), additionalSettings); } else { this.extensionsManager = new NoopExtensionsManager(); } diff --git a/server/src/main/java/org/opensearch/plugins/ExtensionAwarePlugin.java b/server/src/main/java/org/opensearch/plugins/ExtensionAwarePlugin.java new file mode 100644 index 0000000000000..c8426bc964287 --- /dev/null +++ b/server/src/main/java/org/opensearch/plugins/ExtensionAwarePlugin.java @@ -0,0 +1,29 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugins; + +import org.opensearch.common.settings.Setting; + +import java.util.Collections; +import java.util.List; + +/** + * Plugin that provides extra settings for extensions + * + * @opensearch.experimental + */ +public interface ExtensionAwarePlugin { + + /** + * Returns a list of additional {@link Setting} definitions that this plugin adds for extensions + */ + default List> getExtensionSettings() { + return Collections.emptyList(); + } +} diff --git a/server/src/main/java/org/opensearch/plugins/IdentityPlugin.java b/server/src/main/java/org/opensearch/plugins/IdentityPlugin.java index 4cb15f4ab3cbe..511b5595c5328 100644 --- a/server/src/main/java/org/opensearch/plugins/IdentityPlugin.java +++ b/server/src/main/java/org/opensearch/plugins/IdentityPlugin.java @@ -22,5 +22,5 @@ public interface IdentityPlugin { * * Should never return null * */ - public Subject getSubject(); + Subject getSubject(); } diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java index a486e636cbb7d..87c09bd971284 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java +++ b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java @@ -400,8 +400,12 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest) throws Exce pipeline = pipelineHolder.pipeline; } } - SearchRequest transformedRequest = pipeline.transformRequest(searchRequest); - return new PipelinedRequest(pipeline, transformedRequest); + try { + SearchRequest transformedRequest = pipeline.transformRequest(searchRequest); + return new PipelinedRequest(pipeline, transformedRequest); + } catch (Exception e) { + throw new SearchPipelineProcessingException(e); + } } Map> getRequestProcessorFactories() { diff --git a/server/src/main/java/org/opensearch/tasks/TaskInfo.java b/server/src/main/java/org/opensearch/tasks/TaskInfo.java index 2e5415279d804..d66647d134d43 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskInfo.java +++ b/server/src/main/java/org/opensearch/tasks/TaskInfo.java @@ -185,7 +185,7 @@ public TaskInfo(StreamInput in) throws IOException { } else { resourceStats = null; } - if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + if (in.getVersion().onOrAfter(Version.V_2_8_0)) { cancellationStartTime = in.readOptionalLong(); } else { cancellationStartTime = null; @@ -210,7 +210,7 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_1_0)) { 
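        // Note on the V_3_0_0 -> V_2_8_0 change in the surrounding TaskInfo hunks:
        // cancellationStartTime was evidently backported to 2.8, so the read gate
        // (shown above in TaskInfo(StreamInput)) and the write gate just below must
        // both test the first version that actually carries the field; mismatched
        // gates would desynchronize the stream between mixed-version nodes.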
out.writeOptionalWriteable(resourceStats); } - if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + if (out.getVersion().onOrAfter(Version.V_2_8_0)) { out.writeOptionalLong(cancellationStartTime); } } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTestHelper.java b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTestHelper.java index 950048fe67f9d..041a979a687d5 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTestHelper.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTestHelper.java @@ -14,43 +14,22 @@ import java.util.Map; import static org.opensearch.test.OpenSearchTestCase.assertEquals; -import static org.opensearch.test.OpenSearchTestCase.randomIntBetween; /** * Helper utilities for Remote Store stats tests */ public class RemoteStoreStatsTestHelper { static RemoteRefreshSegmentTracker.Stats createPressureTrackerStats(ShardId shardId) { - return new RemoteRefreshSegmentTracker.Stats( - shardId, - 3, - System.nanoTime() / 1_000_000L + randomIntBetween(10, 100), - 2, - System.nanoTime() / 1_000_000L + randomIntBetween(10, 100), - 10, - 5, - 5, - 10, - 5, - 5, - 3, - 2, - 5, - 2, - 3, - 4, - 9 - ); + return new RemoteRefreshSegmentTracker.Stats(shardId, 100, 3, 2, 10, 5, 5, 10, 5, 5, 3, 2, 5, 2, 3, 4, 9); } static void compareStatsResponse(Map statsObject, RemoteRefreshSegmentTracker.Stats pressureTrackerStats) { assertEquals(statsObject.get(RemoteStoreStats.Fields.SHARD_ID), pressureTrackerStats.shardId.toString()); - assertEquals(statsObject.get(RemoteStoreStats.Fields.LOCAL_REFRESH_TIMESTAMP), (int) pressureTrackerStats.localRefreshTimeMs); + assertEquals(statsObject.get(RemoteStoreStats.Fields.REFRESH_TIME_LAG_IN_MILLIS), (int) pressureTrackerStats.refreshTimeLagMs); assertEquals( statsObject.get(RemoteStoreStats.Fields.REFRESH_LAG), (int) (pressureTrackerStats.localRefreshNumber - pressureTrackerStats.remoteRefreshNumber) ); - assertEquals(statsObject.get(RemoteStoreStats.Fields.REMOTE_REFRESH_TIMESTAMP), (int) pressureTrackerStats.remoteRefreshTimeMs); assertEquals(statsObject.get(RemoteStoreStats.Fields.BYTES_LAG), (int) pressureTrackerStats.bytesLag); assertEquals(statsObject.get(RemoteStoreStats.Fields.BACKPRESSURE_REJECTION_COUNT), (int) pressureTrackerStats.rejectionCount); diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTests.java index b3cb4864c3b7f..94d2eae31c040 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTests.java @@ -62,10 +62,9 @@ public void testSerialization() throws Exception { try (StreamInput in = out.bytes().streamInput()) { RemoteStoreStats deserializedStats = new RemoteStoreStats(in); assertEquals(deserializedStats.getStats().shardId.toString(), stats.getStats().shardId.toString()); + assertEquals(deserializedStats.getStats().refreshTimeLagMs, stats.getStats().refreshTimeLagMs); assertEquals(deserializedStats.getStats().localRefreshNumber, stats.getStats().localRefreshNumber); - assertEquals(deserializedStats.getStats().localRefreshTimeMs, stats.getStats().localRefreshTimeMs); 
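        // The recurring stats change in these test hunks mirrors the Stats rework
        // above: the wire format replaces the two absolute timestamps
        // (localRefreshTimeMs and remoteRefreshTimeMs) with a single precomputed
        // refreshTimeLagMs written right after shardId. In the updated helper call
        //     new RemoteRefreshSegmentTracker.Stats(shardId, 100, 3, 2, 10, 5, 5, 10, 5, 5, 3, 2, 5, 2, 3, 4, 9)
        // the 100 is refreshTimeLagMs, 3 and 2 are the local and remote refresh
        // sequence numbers, and the trailing 9 is bytesLag.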
assertEquals(deserializedStats.getStats().remoteRefreshNumber, stats.getStats().remoteRefreshNumber); - assertEquals(deserializedStats.getStats().remoteRefreshTimeMs, stats.getStats().remoteRefreshTimeMs); assertEquals(deserializedStats.getStats().uploadBytesStarted, stats.getStats().uploadBytesStarted); assertEquals(deserializedStats.getStats().uploadBytesSucceeded, stats.getStats().uploadBytesSucceeded); assertEquals(deserializedStats.getStats().uploadBytesFailed, stats.getStats().uploadBytesFailed); diff --git a/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java index 00d496fde0434..2abb1e99facef 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java @@ -817,7 +817,8 @@ public void testAggregateSettingsAppliesSettingsFromTemplatesAndRequest() { Settings.EMPTY, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, randomShardLimitService(), - Collections.emptySet() + Collections.emptySet(), + false ); assertThat(aggregatedIndexSettings.get("template_setting"), equalTo("value1")); @@ -879,7 +880,8 @@ public void testRequestDataHavePriorityOverTemplateData() throws Exception { Settings.EMPTY, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, randomShardLimitService(), - Collections.emptySet() + Collections.emptySet(), + false ); assertThat(resolvedAliases.get(0).getSearchRouting(), equalTo("fromRequest")); @@ -901,7 +903,8 @@ public void testDefaultSettings() { Settings.EMPTY, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, randomShardLimitService(), - Collections.emptySet() + Collections.emptySet(), + false ); assertThat(aggregatedIndexSettings.get(SETTING_NUMBER_OF_SHARDS), equalTo("1")); @@ -916,7 +919,8 @@ public void testSettingsFromClusterState() { Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 15).build(), IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, randomShardLimitService(), - Collections.emptySet() + Collections.emptySet(), + false ); assertThat(aggregatedIndexSettings.get(SETTING_NUMBER_OF_SHARDS), equalTo("15")); @@ -953,7 +957,8 @@ public void testTemplateOrder() throws Exception { Settings.EMPTY, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, randomShardLimitService(), - Collections.emptySet() + Collections.emptySet(), + false ); List resolvedAliases = resolveAndValidateAliases( request.index(), @@ -992,7 +997,8 @@ public void testAggregateIndexSettingsIgnoresTemplatesOnCreateFromSourceIndex() Settings.EMPTY, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, randomShardLimitService(), - Collections.emptySet() + Collections.emptySet(), + false ); assertThat(aggregatedIndexSettings.get("templateSetting"), is(nullValue())); @@ -1214,7 +1220,8 @@ public void testRemoteStoreNoUserOverrideConflictingReplicationTypeIndexSettings settings, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, randomShardLimitService(), - Collections.emptySet() + Collections.emptySet(), + false ) ); assertThat( @@ -1245,7 +1252,8 @@ public void testRemoteStoreNoUserOverrideExceptReplicationTypeSegmentIndexSettin settings, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, randomShardLimitService(), - Collections.emptySet() + Collections.emptySet(), + false ); verifyRemoteStoreIndexSettings( indexSettings, @@ -1277,7 +1285,8 @@ public void testRemoteStoreNoUserOverrideIndexSettings() { settings, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, randomShardLimitService(), - 
Collections.emptySet() + Collections.emptySet(), + false ); verifyRemoteStoreIndexSettings( indexSettings, @@ -1311,9 +1320,10 @@ public void testRemoteStoreDisabledByUserIndexSettings() { settings, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, randomShardLimitService(), - Collections.emptySet() + Collections.emptySet(), + false ); - verifyRemoteStoreIndexSettings(indexSettings, "false", null, null, null, null, null); + verifyRemoteStoreIndexSettings(indexSettings, "false", null, null, null, ReplicationType.SEGMENT.toString(), null); } public void testRemoteStoreTranslogDisabledByUserIndexSettings() { @@ -1337,7 +1347,8 @@ public void testRemoteStoreTranslogDisabledByUserIndexSettings() { settings, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, randomShardLimitService(), - Collections.emptySet() + Collections.emptySet(), + false ); verifyRemoteStoreIndexSettings(indexSettings, "true", "my-segment-repo-1", "false", null, ReplicationType.SEGMENT.toString(), null); } @@ -1366,7 +1377,8 @@ public void testRemoteStoreOverrideSegmentRepoIndexSettings() { settings, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, randomShardLimitService(), - Collections.emptySet() + Collections.emptySet(), + false ); verifyRemoteStoreIndexSettings( indexSettings, @@ -1400,7 +1412,8 @@ public void testRemoteStoreOverrideTranslogRepoIndexSettings() { settings, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, randomShardLimitService(), - Collections.emptySet() + Collections.emptySet(), + false ); verifyRemoteStoreIndexSettings( indexSettings, @@ -1434,7 +1447,8 @@ public void testRemoteStoreOverrideReplicationTypeIndexSettings() { settings, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, randomShardLimitService(), - Collections.emptySet() + Collections.emptySet(), + false ); verifyRemoteStoreIndexSettings(indexSettings, null, null, null, null, ReplicationType.DOCUMENT.toString(), null); } @@ -1508,7 +1522,8 @@ public void testSoftDeletesDisabledIsRejected() { Settings.EMPTY, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, randomShardLimitService(), - Collections.emptySet() + Collections.emptySet(), + false ); }); assertThat( @@ -1537,7 +1552,8 @@ public void testValidateTranslogRetentionSettings() { Settings.EMPTY, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, randomShardLimitService(), - Collections.emptySet() + Collections.emptySet(), + false ); assertWarnings( "Translog retention settings [index.translog.retention.age] " @@ -1584,7 +1600,8 @@ public void testDeprecatedSimpleFSStoreSettings() { Settings.EMPTY, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, randomShardLimitService(), - Collections.emptySet() + Collections.emptySet(), + false ); assertWarnings( "[simplefs] is deprecated and will be removed in 2.0. 
Use [niofs], which offers equal " @@ -1592,6 +1609,88 @@ public void testDeprecatedSimpleFSStoreSettings() { ); } + public void testClusterReplicationSetting() { + Settings settings = Settings.builder().put(CLUSTER_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT).build(); + request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test"); + Settings indexSettings = aggregateIndexSettings( + ClusterState.EMPTY_STATE, + request, + Settings.EMPTY, + null, + settings, + IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, + randomShardLimitService(), + Collections.emptySet(), + false + ); + assertEquals(ReplicationType.SEGMENT.toString(), indexSettings.get(SETTING_REPLICATION_TYPE)); + } + + public void testIndexSettingOverridesClusterReplicationSetting() { + Settings settings = Settings.builder().put(CLUSTER_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT).build(); + request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test"); + final Settings.Builder requestSettings = Settings.builder(); + // Set index setting replication type as DOCUMENT + requestSettings.put(SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT); + request.settings(requestSettings.build()); + Settings indexSettings = aggregateIndexSettings( + ClusterState.EMPTY_STATE, + request, + Settings.EMPTY, + null, + settings, + IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, + randomShardLimitService(), + Collections.emptySet(), + false + ); + // Verify if index setting overrides cluster replication setting + assertEquals(ReplicationType.DOCUMENT.toString(), indexSettings.get(SETTING_REPLICATION_TYPE)); + } + + public void testHiddenIndexUsesDocumentReplication() { + Settings settings = Settings.builder().put(CLUSTER_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT).build(); + request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test"); + final Settings.Builder requestSettings = Settings.builder(); + // Set index setting replication type as DOCUMENT + requestSettings.put("index.hidden", true); + request.settings(requestSettings.build()); + Settings indexSettings = aggregateIndexSettings( + ClusterState.EMPTY_STATE, + request, + Settings.EMPTY, + null, + settings, + IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, + randomShardLimitService(), + Collections.emptySet(), + false + ); + // Verify replication type is Document Replication + assertEquals(ReplicationType.DOCUMENT.toString(), indexSettings.get(SETTING_REPLICATION_TYPE)); + } + + public void testSystemIndexUsesDocumentReplication() { + Settings settings = Settings.builder().put(CLUSTER_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT).build(); + request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test"); + final Settings.Builder requestSettings = Settings.builder(); + request.settings(requestSettings.build()); + // set isSystemIndex parameter as true + Settings indexSettings = aggregateIndexSettings( + ClusterState.EMPTY_STATE, + request, + Settings.EMPTY, + null, + settings, + IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, + randomShardLimitService(), + Collections.emptySet(), + true + ); + // Verify replication type is Document Replication + assertEquals(ReplicationType.DOCUMENT.toString(), indexSettings.get(SETTING_REPLICATION_TYPE)); + } + private IndexTemplateMetadata addMatchingTemplate(Consumer configurator) { IndexTemplateMetadata.Builder builder = templateMetadataBuilder("template1", "te*"); configurator.accept(builder); @@ -1653,4 +1752,5 @@ private void 
verifyRemoteStoreIndexSettings( assertEquals(remoteTranslogRepo, indexSettings.get(SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY)); assertEquals(translogBufferInterval, indexSettings.get(SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL)); } + } diff --git a/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java b/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java index 42a050270466d..75a1d9ec62c82 100644 --- a/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java +++ b/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java @@ -32,6 +32,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -70,6 +71,7 @@ import org.opensearch.extensions.settings.RegisterCustomSettingsRequest; import org.opensearch.identity.IdentityService; import org.opensearch.indices.breaker.NoneCircuitBreakerService; +import org.opensearch.plugins.ExtensionAwarePlugin; import org.opensearch.rest.RestController; import org.opensearch.test.FeatureFlagSetter; import org.opensearch.test.MockLogAppender; @@ -91,6 +93,8 @@ public class ExtensionsManagerTests extends OpenSearchTestCase { private RestController restController; private SettingsModule settingsModule; private ClusterService clusterService; + private ExtensionAwarePlugin extAwarePlugin; + private Setting customSetting = Setting.simpleString("custom_extension_setting", "none", Property.ExtensionScope); private NodeClient client; private MockNioTransport transport; private Path extensionDir; @@ -108,6 +112,7 @@ public class ExtensionsManagerTests extends OpenSearchTestCase { " version: '0.0.7'", " opensearchVersion: '3.0.0'", " minimumCompatibleVersion: '3.0.0'", + " custom_extension_setting: 'custom_setting'", " - name: secondExtension", " uniqueId: 'uniqueid2'", " hostAddress: '127.0.0.1'", @@ -152,6 +157,15 @@ public void setup() throws Exception { Collections.emptySet() ); actionModule = mock(ActionModule.class); + extAwarePlugin = new ExtensionAwarePlugin() { + + @Override + public List> getExtensionSettings() { + List> settings = new ArrayList>(); + settings.add(customSetting); + return settings; + } + }; dynamicActionRegistry = mock(DynamicActionRegistry.class); restController = new RestController( emptySet(), @@ -192,7 +206,7 @@ public void tearDown() throws Exception { public void testDiscover() throws Exception { Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir); + ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); List expectedExtensions = new ArrayList(); @@ -245,7 +259,7 @@ public void testNonUniqueExtensionsDiscovery() throws Exception { .collect(Collectors.toList()); Files.write(emptyExtensionDir.resolve("extensions.yml"), nonUniqueYmlLines, StandardCharsets.UTF_8); - ExtensionsManager extensionsManager = new ExtensionsManager(emptyExtensionDir); + ExtensionsManager extensionsManager = new ExtensionsManager(emptyExtensionDir, Set.of()); List expectedExtensions = new ArrayList(); @@ -294,7 +308,7 @@ public void testMissingRequiredFieldsInExtensionDiscovery() throws Exception { ) ); - extensionsManager = new ExtensionsManager(emptyExtensionDir); + extensionsManager = new ExtensionsManager(emptyExtensionDir, Set.of()); mockLogAppender.assertAllExpectationsMatched(); } @@ -379,7 +393,7 @@ public void 
testNonAccessibleDirectory() throws Exception { AccessControlException e = expectThrows( AccessControlException.class, - () -> new ExtensionsManager(PathUtils.get("")) + () -> new ExtensionsManager(PathUtils.get(""), Set.of()) ); assertEquals("access denied (\"java.io.FilePermission\" \"\" \"read\")", e.getMessage()); } @@ -398,7 +412,7 @@ public void testNoExtensionsFile() throws Exception { ) ); - new ExtensionsManager(extensionDir); + new ExtensionsManager(extensionDir, Set.of()); mockLogAppender.assertAllExpectationsMatched(); } @@ -412,12 +426,12 @@ public void testEmptyExtensionsFile() throws Exception { Settings settings = Settings.builder().build(); - expectThrows(IOException.class, () -> new ExtensionsManager(emptyExtensionDir)); + expectThrows(IOException.class, () -> new ExtensionsManager(emptyExtensionDir, Set.of())); } public void testInitialize() throws Exception { Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir); + ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); initialize(extensionsManager); @@ -460,7 +474,7 @@ public void testInitialize() throws Exception { public void testHandleRegisterRestActionsRequest() throws Exception { Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir); + ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); initialize(extensionsManager); String uniqueIdStr = "uniqueid1"; @@ -475,7 +489,7 @@ public void testHandleRegisterRestActionsRequest() throws Exception { public void testHandleRegisterSettingsRequest() throws Exception { Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir); + ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); initialize(extensionsManager); String uniqueIdStr = "uniqueid1"; @@ -491,7 +505,7 @@ public void testHandleRegisterSettingsRequest() throws Exception { } public void testHandleRegisterRestActionsRequestWithInvalidMethod() throws Exception { - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir); + ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); initialize(extensionsManager); String uniqueIdStr = "uniqueid1"; @@ -506,7 +520,7 @@ public void testHandleRegisterRestActionsRequestWithInvalidMethod() throws Excep } public void testHandleRegisterRestActionsRequestWithInvalidDeprecatedMethod() throws Exception { - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir); + ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); initialize(extensionsManager); String uniqueIdStr = "uniqueid1"; @@ -521,7 +535,7 @@ public void testHandleRegisterRestActionsRequestWithInvalidDeprecatedMethod() th } public void testHandleRegisterRestActionsRequestWithInvalidUri() throws Exception { - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir); + ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); initialize(extensionsManager); String uniqueIdStr = "uniqueid1"; List actionsList = List.of("GET", "PUT /bar", "POST /baz"); @@ -535,7 +549,7 @@ public void testHandleRegisterRestActionsRequestWithInvalidUri() throws Exceptio } public 
void testHandleRegisterRestActionsRequestWithInvalidDeprecatedUri() throws Exception { - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir); + ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); initialize(extensionsManager); String uniqueIdStr = "uniqueid1"; List actionsList = List.of("GET /foo", "PUT /bar", "POST /baz"); @@ -549,7 +563,7 @@ public void testHandleRegisterRestActionsRequestWithInvalidDeprecatedUri() throw } public void testHandleExtensionRequest() throws Exception { - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir); + ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); initialize(extensionsManager); ExtensionRequest clusterStateRequest = new ExtensionRequest(ExtensionRequestProto.RequestType.REQUEST_EXTENSION_CLUSTER_STATE); @@ -705,7 +719,7 @@ public void testEnvironmentSettingsDefaultValue() throws Exception { public void testAddSettingsUpdateConsumerRequest() throws Exception { Path extensionDir = createTempDir(); Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir); + ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); initialize(extensionsManager); List> componentSettings = List.of( @@ -752,7 +766,7 @@ public void testHandleAddSettingsUpdateConsumerRequest() throws Exception { Path extensionDir = createTempDir(); Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir); + ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); initialize(extensionsManager); List> componentSettings = List.of( @@ -774,7 +788,7 @@ public void testHandleAddSettingsUpdateConsumerRequest() throws Exception { public void testUpdateSettingsRequest() throws Exception { Path extensionDir = createTempDir(); Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir); + ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); initialize(extensionsManager); Setting componentSetting = Setting.boolSetting("falseSetting", false, Property.Dynamic); @@ -803,7 +817,7 @@ public void testUpdateSettingsRequest() throws Exception { public void testRegisterHandler() throws Exception { - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir); + ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); TransportService mockTransportService = spy( new TransportService( @@ -853,12 +867,64 @@ public void testIncompatibleExtensionRegistration() throws IOException, IllegalA ); Files.write(extensionDir.resolve("extensions.yml"), incompatibleExtension, StandardCharsets.UTF_8); - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir); + ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); assertEquals(0, extensionsManager.getExtensionIdMap().values().size()); mockLogAppender.assertAllExpectationsMatched(); } } + public void testAdditionalExtensionSettingsForExtensionWithCustomSettingSet() throws Exception { + Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); + + Set> additionalSettings = 
extAwarePlugin.getExtensionSettings().stream().collect(Collectors.toSet()); + + ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, additionalSettings); + + DiscoveryExtensionNode extension = new DiscoveryExtensionNode( + "firstExtension", + "uniqueid1", + new TransportAddress(InetAddress.getByName("127.0.0.1"), 9300), + new HashMap(), + Version.fromString("3.0.0"), + Version.fromString("3.0.0"), + List.of() + ); + DiscoveryExtensionNode initializedExtension = extensionsManager.getExtensionIdMap().get(extension.getId()); + assertEquals(extension.getName(), initializedExtension.getName()); + assertEquals(extension.getId(), initializedExtension.getId()); + assertTrue(extensionsManager.lookupExtensionSettingsById(extension.getId()).isPresent()); + assertEquals( + "custom_setting", + extensionsManager.lookupExtensionSettingsById(extension.getId()).get().getAdditionalSettings().get(customSetting) + ); + } + + public void testAdditionalExtensionSettingsForExtensionWithoutCustomSettingSet() throws Exception { + Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); + + Set> additionalSettings = extAwarePlugin.getExtensionSettings().stream().collect(Collectors.toSet()); + + ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, additionalSettings); + + DiscoveryExtensionNode extension = new DiscoveryExtensionNode( + "secondExtension", + "uniqueid2", + new TransportAddress(InetAddress.getByName("127.0.0.1"), 9301), + new HashMap(), + Version.fromString("2.0.0"), + Version.fromString("2.0.0"), + List.of() + ); + DiscoveryExtensionNode initializedExtension = extensionsManager.getExtensionIdMap().get(extension.getId()); + assertEquals(extension.getName(), initializedExtension.getName()); + assertEquals(extension.getId(), initializedExtension.getId()); + assertTrue(extensionsManager.lookupExtensionSettingsById(extension.getId()).isPresent()); + assertEquals( + "none", + extensionsManager.lookupExtensionSettingsById(extension.getId()).get().getAdditionalSettings().get(customSetting) + ); + } + private void initialize(ExtensionsManager extensionsManager) { transportService.start(); transportService.acceptIncomingRequests(); diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java index 4360fc0fe4011..2ee254b10facc 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java @@ -411,9 +411,8 @@ public void testStatsObjectCreation() { pressureTracker = constructTracker(); RemoteRefreshSegmentTracker.Stats pressureTrackerStats = pressureTracker.stats(); assertEquals(pressureTracker.getShardId(), pressureTrackerStats.shardId); - assertEquals(pressureTracker.getLocalRefreshTimeMs(), (int) pressureTrackerStats.localRefreshTimeMs); + assertEquals(pressureTracker.getTimeMsLag(), (int) pressureTrackerStats.refreshTimeLagMs); assertEquals(pressureTracker.getLocalRefreshSeqNo(), (int) pressureTrackerStats.localRefreshNumber); - assertEquals(pressureTracker.getRemoteRefreshTimeMs(), (int) pressureTrackerStats.remoteRefreshTimeMs); assertEquals(pressureTracker.getRemoteRefreshSeqNo(), (int) pressureTrackerStats.remoteRefreshNumber); assertEquals(pressureTracker.getBytesLag(), (int) pressureTrackerStats.bytesLag); assertEquals(pressureTracker.getRejectionCount(), (int) 
pressureTrackerStats.rejectionCount); @@ -441,9 +440,8 @@ public void testStatsObjectCreationViaStream() throws IOException { try (StreamInput in = out.bytes().streamInput()) { RemoteRefreshSegmentTracker.Stats deserializedStats = new RemoteRefreshSegmentTracker.Stats(in); assertEquals(deserializedStats.shardId, pressureTrackerStats.shardId); - assertEquals((int) deserializedStats.localRefreshTimeMs, (int) pressureTrackerStats.localRefreshTimeMs); + assertEquals((int) deserializedStats.refreshTimeLagMs, (int) pressureTrackerStats.refreshTimeLagMs); assertEquals((int) deserializedStats.localRefreshNumber, (int) pressureTrackerStats.localRefreshNumber); - assertEquals((int) deserializedStats.remoteRefreshTimeMs, (int) pressureTrackerStats.remoteRefreshTimeMs); assertEquals((int) deserializedStats.remoteRefreshNumber, (int) pressureTrackerStats.remoteRefreshNumber); assertEquals((int) deserializedStats.bytesLag, (int) pressureTrackerStats.bytesLag); assertEquals((int) deserializedStats.rejectionCount, (int) pressureTrackerStats.rejectionCount); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 0d95f40652523..764d60dbd5b82 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -40,6 +40,7 @@ import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.SnapshotMatchers; import org.opensearch.index.translog.Translog; +import org.opensearch.indices.IndicesService; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.indices.replication.CheckpointInfoResponse; @@ -294,7 +295,7 @@ public void testPublishCheckpointAfterRelocationHandOff() throws IOException { public void testRejectCheckpointOnShardRoutingPrimary() throws IOException { IndexShard primaryShard = newStartedShard(true); SegmentReplicationTargetService sut; - sut = prepareForReplication(primaryShard, null); + sut = prepareForReplication(primaryShard, null, mock(TransportService.class), mock(IndicesService.class)); SegmentReplicationTargetService spy = spy(sut); // Starting a new shard in PrimaryMode and shard routing primary. @@ -314,7 +315,7 @@ public void testRejectCheckpointOnShardRoutingPrimary() throws IOException { spy.onNewCheckpoint(new ReplicationCheckpoint(primaryShard.shardId(), 0L, 0L, 0L, Codec.getDefault().getName()), spyShard); // Verify that checkpoint is not processed as shard routing is primary. 
-        verify(spy, times(0)).startReplication(any(), any(), any());
+        verify(spy, times(0)).startReplication(any(), any());
         closeShards(primaryShard);
     }

@@ -1027,7 +1028,10 @@ private void assertDocCounts(IndexShard indexShard, int expectedPersistedDocCoun

     private void resolveCheckpointInfoResponseListener(ActionListener<CheckpointInfoResponse> listener, IndexShard primary) {
         try {
-            final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primary.shardId, primary.getDefaultCodecName()), primary);
+            final CopyState copyState = new CopyState(
+                ReplicationCheckpoint.empty(primary.shardId, primary.getLatestReplicationCheckpoint().getCodec()),
+                primary
+            );
             listener.onResponse(
                 new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes())
             );
@@ -1041,7 +1045,6 @@ private void startReplicationAndAssertCancellation(IndexShard replica, SegmentRe
         throws InterruptedException {
         CountDownLatch latch = new CountDownLatch(1);
         final SegmentReplicationTarget target = targetService.startReplication(
-            ReplicationCheckpoint.empty(replica.shardId, replica.getDefaultCodecName()),
             replica,
             new SegmentReplicationTargetService.SegmentReplicationListener() {
                 @Override
diff --git a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java
index 6e27a4db6afec..677352cdd5120 100644
--- a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java
+++ b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java
@@ -48,7 +48,7 @@ public class OngoingSegmentReplicationsTests extends IndexShardTestCase {

     private final IndicesService mockIndicesService = mock(IndicesService.class);
-    private ReplicationCheckpoint testCheckpoint, olderCodecTestCheckpoint;
+    private ReplicationCheckpoint testCheckpoint;
     private DiscoveryNode primaryDiscoveryNode;
     private DiscoveryNode replicaDiscoveryNode;
     private IndexShard primary;
@@ -79,7 +79,6 @@ public void setUp() throws Exception {
         // This mirrors the creation of the ReplicationCheckpoint inside CopyState
         testCheckpoint = new ReplicationCheckpoint(testShardId, primary.getOperationPrimaryTerm(), 0L, 0L, defaultCodecName);
-        olderCodecTestCheckpoint = new ReplicationCheckpoint(testShardId, primary.getOperationPrimaryTerm(), 0L, 0L, "Lucene94");
         IndexService mockIndexService = mock(IndexService.class);
         when(mockIndicesService.indexServiceSafe(testShardId.getIndex())).thenReturn(mockIndexService);
         when(mockIndexService.getShard(testShardId.id())).thenReturn(primary);
@@ -94,44 +93,6 @@ public void tearDown() throws Exception {
         super.tearDown();
     }

-    public void testSuccessfulCodecCompatibilityCheck() throws Exception {
-        indexDoc(primary, "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar");
-        primary.refresh("Test");
-        OngoingSegmentReplications replications = spy(new OngoingSegmentReplications(mockIndicesService, recoverySettings));
-        // replica checkpoint is on same/higher lucene codec than primary
-        final CheckpointInfoRequest request = new CheckpointInfoRequest(
-            1L,
-            replica.routingEntry().allocationId().getId(),
-            replicaDiscoveryNode,
-            testCheckpoint
-        );
-        final FileChunkWriter segmentSegmentFileChunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> {
-            listener.onResponse(null);
-        };
-        final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter);
-    }
-
-    public void testFailCodecCompatibilityCheck() throws Exception {
-        indexDoc(primary, "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar");
-        primary.refresh("Test");
-        OngoingSegmentReplications replications = spy(new OngoingSegmentReplications(mockIndicesService, recoverySettings));
-        // replica checkpoint is on lower/older lucene codec than primary
-        final CheckpointInfoRequest request = new CheckpointInfoRequest(
-            1L,
-            replica.routingEntry().allocationId().getId(),
-            replicaDiscoveryNode,
-            olderCodecTestCheckpoint
-        );
-        final FileChunkWriter segmentSegmentFileChunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> {
-            listener.onResponse(null);
-        };
-        try {
-            final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter);
-        } catch (CancellableThreads.ExecutionCancelledException ex) {
-            Assert.assertTrue(ex.getMessage().contains("Requested unsupported codec version"));
-        }
-    }
-
     public void testPrepareAndSendSegments() throws IOException {
         indexDoc(primary, "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar");
         primary.refresh("Test");
diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java
index a3c016d5ba0df..1d1777758972c 100644
--- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java
+++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java
@@ -11,12 +11,13 @@
 import org.junit.Assert;
 import org.mockito.Mockito;
 import org.opensearch.OpenSearchException;
+import org.opensearch.Version;
 import org.opensearch.action.ActionListener;
 import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.cluster.node.DiscoveryNode;
 import org.opensearch.common.settings.ClusterSettings;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.util.CancellableThreads;
-import org.opensearch.index.codec.CodecService;
 import org.opensearch.index.engine.NRTReplicationEngineFactory;
 import org.opensearch.index.replication.TestReplicationSource;
 import org.opensearch.index.shard.IndexShard;
@@ -24,14 +25,21 @@
 import org.opensearch.index.store.Store;
 import org.opensearch.index.store.StoreFileMetadata;
 import org.opensearch.indices.IndicesService;
+import org.opensearch.indices.recovery.ForceSyncRequest;
 import org.opensearch.indices.recovery.RecoverySettings;
 import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
 import org.opensearch.indices.replication.common.ReplicationCollection;
 import org.opensearch.indices.replication.common.ReplicationFailedException;
 import org.opensearch.indices.replication.common.ReplicationType;
+import org.opensearch.threadpool.TestThreadPool;
+import org.opensearch.threadpool.ThreadPool;
+import org.opensearch.transport.EmptyTransportResponseHandler;
+import org.opensearch.transport.TransportRequestOptions;
+import org.opensearch.transport.TransportResponse;
 import org.opensearch.transport.TransportService;
-
+import org.opensearch.test.transport.CapturingTransport;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -40,6 +48,7 @@
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -62,6 +71,14 @@ public class SegmentReplicationTargetServiceTests extends IndexShardTestCase {

     private ReplicationCheckpoint newPrimaryCheckpoint;

+    private TransportService transportService;
+    private TestThreadPool testThreadPool;
+    private DiscoveryNode localNode;
+
+    private IndicesService indicesService;
+
+    private static long TRANSPORT_TIMEOUT = 30000;// 30sec
+
     @Override
     public void setUp() throws Exception {
         super.setUp();
@@ -69,43 +86,66 @@
             .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
             .put("node.name", SegmentReplicationTargetServiceTests.class.getSimpleName())
             .build();
-        CodecService codecService = new CodecService(null, null);
-        String defaultCodecName = codecService.codec(CodecService.DEFAULT_CODEC).getName();
         primaryShard = newStartedShard(true, settings);
+        String primaryCodec = primaryShard.getLatestReplicationCheckpoint().getCodec();
         replicaShard = newShard(false, settings, new NRTReplicationEngineFactory());
         recoverReplica(replicaShard, primaryShard, true, getReplicationFunc(replicaShard));
-        checkpoint = new ReplicationCheckpoint(replicaShard.shardId(), 0L, 0L, 0L, defaultCodecName);
+        checkpoint = new ReplicationCheckpoint(
+            replicaShard.shardId(),
+            0L,
+            0L,
+            0L,
+            replicaShard.getLatestReplicationCheckpoint().getCodec()
+        );
         SegmentReplicationSourceFactory replicationSourceFactory = mock(SegmentReplicationSourceFactory.class);
         replicationSource = mock(SegmentReplicationSource.class);
         when(replicationSourceFactory.get(replicaShard)).thenReturn(replicationSource);
-        sut = prepareForReplication(primaryShard, null);
+        testThreadPool = new TestThreadPool("test", Settings.EMPTY);
+        localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT);
+        CapturingTransport transport = new CapturingTransport();
+        transportService = transport.createTransportService(
+            Settings.EMPTY,
+            testThreadPool,
+            TransportService.NOOP_TRANSPORT_INTERCEPTOR,
+            boundAddress -> localNode,
+            null,
+            Collections.emptySet()
+        );
+        transportService.start();
+        transportService.acceptIncomingRequests();
+
+        indicesService = mock(IndicesService.class);
+
+        sut = prepareForReplication(primaryShard, null, transportService, indicesService);
         initialCheckpoint = replicaShard.getLatestReplicationCheckpoint();
         aheadCheckpoint = new ReplicationCheckpoint(
             initialCheckpoint.getShardId(),
             initialCheckpoint.getPrimaryTerm(),
             initialCheckpoint.getSegmentsGen(),
             initialCheckpoint.getSegmentInfosVersion() + 1,
-            defaultCodecName
+            primaryCodec
         );
         newPrimaryCheckpoint = new ReplicationCheckpoint(
             initialCheckpoint.getShardId(),
             initialCheckpoint.getPrimaryTerm() + 1,
             initialCheckpoint.getSegmentsGen(),
             initialCheckpoint.getSegmentInfosVersion() + 1,
-            defaultCodecName
+            primaryCodec
         );
     }

     @Override
     public void tearDown() throws Exception {
         closeShards(primaryShard, replicaShard);
+        ThreadPool.terminate(testThreadPool, 30, TimeUnit.SECONDS);
+        testThreadPool = null;
         super.tearDown();
     }

     public void testsSuccessfulReplication_listenerCompletes() throws InterruptedException {
         CountDownLatch latch = new CountDownLatch(1);
-        sut.startReplication(checkpoint, replicaShard, new SegmentReplicationTargetService.SegmentReplicationListener() {
+        sut.startReplication(replicaShard, new SegmentReplicationTargetService.SegmentReplicationListener() {
             @Override
             public void onReplicationDone(SegmentReplicationState state) {
                 assertEquals(SegmentReplicationState.Stage.DONE, state.getStage());
@@ -148,7 +188,6 @@ public void getSegmentFiles(
             }
         };
         final SegmentReplicationTarget target = new SegmentReplicationTarget(
-            checkpoint,
             replicaShard,
             source,
             new SegmentReplicationTargetService.SegmentReplicationListener() {
@@ -174,14 +213,13 @@ public void onReplicationFailure(SegmentReplicationState state, ReplicationFaile

     public void testAlreadyOnNewCheckpoint() {
         SegmentReplicationTargetService spy = spy(sut);
         spy.onNewCheckpoint(replicaShard.getLatestReplicationCheckpoint(), replicaShard);
-        verify(spy, times(0)).startReplication(any(), any(), any());
+        verify(spy, times(0)).startReplication(any(), any());
     }

     public void testShardAlreadyReplicating() throws InterruptedException {
         // Create a spy of Target Service so that we can verify invocation of startReplication call with specific checkpoint on it.
         SegmentReplicationTargetService serviceSpy = spy(sut);
         final SegmentReplicationTarget target = new SegmentReplicationTarget(
-            initialCheckpoint,
             replicaShard,
             replicationSource,
             mock(SegmentReplicationTargetService.SegmentReplicationListener.class)
@@ -207,7 +245,7 @@ public void testShardAlreadyReplicating() throws InterruptedException {
         // wait for the new checkpoint to arrive, before the listener completes.
         latch.await(30, TimeUnit.SECONDS);
         verify(targetSpy, times(0)).cancel(any());
-        verify(serviceSpy, times(0)).startReplication(eq(aheadCheckpoint), eq(replicaShard), any());
+        verify(serviceSpy, times(0)).startReplication(eq(replicaShard), any());
     }

     public void testOnNewCheckpointFromNewPrimaryCancelOngoingReplication() throws IOException, InterruptedException {
@@ -216,7 +254,6 @@ public void testOnNewCheckpointFromNewPrimaryCancelOngoingReplication() throws I
         // Create a Mockito spy of target to stub response of few method calls.
         final SegmentReplicationTarget targetSpy = spy(
             new SegmentReplicationTarget(
-                initialCheckpoint,
                 replicaShard,
                 replicationSource,
                 mock(SegmentReplicationTargetService.SegmentReplicationListener.class)
@@ -228,7 +265,7 @@ public void testOnNewCheckpointFromNewPrimaryCancelOngoingReplication() throws I
         // of latch.
         doAnswer(invocation -> {
             // short circuit loop on new checkpoint request
-            doReturn(null).when(serviceSpy).startReplication(eq(newPrimaryCheckpoint), eq(replicaShard), any());
+            doReturn(null).when(serviceSpy).startReplication(eq(replicaShard), any());
             // a new checkpoint arrives before we've completed.
             serviceSpy.onNewCheckpoint(newPrimaryCheckpoint, replicaShard);
             try {
@@ -245,20 +282,20 @@
         // wait for the new checkpoint to arrive, before the listener completes.
         assertEquals(CANCELLED, targetSpy.state().getStage());
         verify(targetSpy, times(1)).cancel("Cancelling stuck target after new primary");
-        verify(serviceSpy, times(1)).startReplication(eq(newPrimaryCheckpoint), eq(replicaShard), any());
+        verify(serviceSpy, times(1)).startReplication(eq(replicaShard), any());
     }

     public void testNewCheckpointBehindCurrentCheckpoint() {
         SegmentReplicationTargetService spy = spy(sut);
         spy.onNewCheckpoint(checkpoint, replicaShard);
-        verify(spy, times(0)).startReplication(any(), any(), any());
+        verify(spy, times(0)).startReplication(any(), any());
     }

     public void testShardNotStarted() throws IOException {
         SegmentReplicationTargetService spy = spy(sut);
         IndexShard shard = newShard(false);
         spy.onNewCheckpoint(checkpoint, shard);
-        verify(spy, times(0)).startReplication(any(), any(), any());
+        verify(spy, times(0)).startReplication(any(), any());
         closeShards(shard);
     }

@@ -274,7 +311,7 @@ public void testRejectCheckpointOnShardPrimaryMode() throws IOException {
         spy.onNewCheckpoint(aheadCheckpoint, spyShard);

         // Verify that checkpoint is not processed as shard is in PrimaryMode.
-        verify(spy, times(0)).startReplication(any(), any(), any());
+        verify(spy, times(0)).startReplication(any(), any());
         closeShards(primaryShard);
     }

@@ -351,4 +388,40 @@ public void testUpdateLatestReceivedCheckpoint() {
         sut.updateLatestReceivedCheckpoint(aheadCheckpoint, replicaShard);
         assertEquals(sut.latestReceivedCheckpoint.get(replicaShard.shardId()), aheadCheckpoint);
     }
+
+    public void testForceSegmentSyncHandler() throws Exception {
+        ForceSyncRequest forceSyncRequest = new ForceSyncRequest(1L, 1L, replicaShard.shardId());
+        when(indicesService.getShardOrNull(forceSyncRequest.getShardId())).thenReturn(replicaShard);
+        TransportResponse response = transportService.submitRequest(
+            localNode,
+            SegmentReplicationTargetService.Actions.FORCE_SYNC,
+            forceSyncRequest,
+            TransportRequestOptions.builder().withTimeout(TRANSPORT_TIMEOUT).build(),
+            EmptyTransportResponseHandler.INSTANCE_SAME
+        ).txGet();
+        assertEquals(TransportResponse.Empty.INSTANCE, response);
+    }
+
+    public void testForceSegmentSyncHandlerWithFailure() throws Exception {
+        IndexShard spyReplicaShard = spy(replicaShard);
+        ForceSyncRequest forceSyncRequest = new ForceSyncRequest(1L, 1L, replicaShard.shardId());
+        when(indicesService.getShardOrNull(forceSyncRequest.getShardId())).thenReturn(spyReplicaShard);
+        IOException exception = new IOException("dummy failure");
+        doThrow(exception).when(spyReplicaShard).finalizeReplication(any());
+
+        // prevent shard failure to avoid test setup assertion
+        doNothing().when(spyReplicaShard).failShard(eq("replication failure"), any());
+        Exception finalizeException = expectThrows(Exception.class, () -> {
+            transportService.submitRequest(
+                localNode,
+                SegmentReplicationTargetService.Actions.FORCE_SYNC,
+                forceSyncRequest,
+                TransportRequestOptions.builder().withTimeout(TRANSPORT_TIMEOUT).build(),
+                EmptyTransportResponseHandler.INSTANCE_SAME
+            ).txGet();
+        });
+        Throwable nestedException = finalizeException.getCause().getCause();
+        assertTrue(nestedException instanceof IOException);
+        assertTrue(nestedException.getMessage().contains("dummy failure"));
+    }
 }
diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
index a029d87f4a575..0e711af1afa62 100644
--- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
+++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
@@ -8,7 +8,6 @@

 package org.opensearch.indices.replication;

-import org.apache.lucene.codecs.Codec;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.StringField;
@@ -108,7 +107,7 @@ public void setUp() throws Exception {
             spyIndexShard.getPendingPrimaryTerm(),
             testSegmentInfos.getGeneration(),
             testSegmentInfos.version,
-            Codec.getDefault().getName()
+            indexShard.getLatestReplicationCheckpoint().getCodec()
         );
     }

@@ -141,7 +140,7 @@ public void getSegmentFiles(
         SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock(
             SegmentReplicationTargetService.SegmentReplicationListener.class
         );
-        segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener);
+        segrepTarget = new SegmentReplicationTarget(spyIndexShard, segrepSource, segRepListener);

         segrepTarget.startReplication(new ActionListener<Void>() {
             @Override
@@ -189,7 +188,7 @@ public void getSegmentFiles(
         SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock(
             SegmentReplicationTargetService.SegmentReplicationListener.class
         );
-        segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener);
+        segrepTarget = new SegmentReplicationTarget(spyIndexShard, segrepSource, segRepListener);

         segrepTarget.startReplication(new ActionListener<Void>() {
             @Override
@@ -232,7 +231,7 @@ public void getSegmentFiles(
         SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock(
             SegmentReplicationTargetService.SegmentReplicationListener.class
         );
-        segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener);
+        segrepTarget = new SegmentReplicationTarget(spyIndexShard, segrepSource, segRepListener);

         segrepTarget.startReplication(new ActionListener<Void>() {
             @Override
@@ -275,7 +274,7 @@ public void getSegmentFiles(
         SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock(
             SegmentReplicationTargetService.SegmentReplicationListener.class
         );
-        segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener);
+        segrepTarget = new SegmentReplicationTarget(spyIndexShard, segrepSource, segRepListener);

         doThrow(exception).when(spyIndexShard).finalizeReplication(any());

@@ -320,7 +319,7 @@ public void getSegmentFiles(
         SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock(
             SegmentReplicationTargetService.SegmentReplicationListener.class
         );
-        segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener);
+        segrepTarget = new SegmentReplicationTarget(spyIndexShard, segrepSource, segRepListener);

         doThrow(exception).when(spyIndexShard).finalizeReplication(any());

@@ -364,7 +363,7 @@ public void getSegmentFiles(
         SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock(
             SegmentReplicationTargetService.SegmentReplicationListener.class
         );
-        segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener);
+        segrepTarget = new SegmentReplicationTarget(spyIndexShard, segrepSource, segRepListener);
         when(spyIndexShard.getSegmentMetadataMap()).thenReturn(SI_SNAPSHOT_DIFFERENT);
         segrepTarget.startReplication(new ActionListener<Void>() {
             @Override
@@ -416,7 +415,7 @@ public void getSegmentFiles(
             SegmentReplicationTargetService.SegmentReplicationListener.class
         );

-        segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener);
+        segrepTarget = new SegmentReplicationTarget(spyIndexShard, segrepSource, segRepListener);
         when(spyIndexShard.getSegmentMetadataMap()).thenReturn(storeMetadataSnapshots.get(0).asMap());
         segrepTarget.startReplication(new ActionListener<Void>() {
             @Override
diff --git a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java
index 516227e9a13d8..d49d9fd41031c 100644
--- a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java
+++ b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java
@@ -680,12 +680,12 @@ public void testInlinePipeline() throws Exception {
         requestProcessorConfig.put("scale", 2);
         Map<String, Object> requestProcessorObject = new HashMap<>();
         requestProcessorObject.put("scale_request_size", requestProcessorConfig);
-        pipelineSourceMap.put("request_processors", List.of(requestProcessorObject));
+        pipelineSourceMap.put(Pipeline.REQUEST_PROCESSORS_KEY, List.of(requestProcessorObject));
         Map<String, Object> responseProcessorConfig = new HashMap<>();
         responseProcessorConfig.put("score", 2);
         Map<String, Object> responseProcessorObject = new HashMap<>();
         responseProcessorObject.put("fixed_score", responseProcessorConfig);
-        pipelineSourceMap.put("response_processors", List.of(responseProcessorObject));
+        pipelineSourceMap.put(Pipeline.RESPONSE_PROCESSORS_KEY, List.of(responseProcessorObject));

         SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource().size(100).searchPipelineSource(pipelineSourceMap);
         SearchRequest searchRequest = new SearchRequest().source(sourceBuilder);
@@ -723,4 +723,67 @@ public void testInfo() {
         assertTrue(info.containsProcessor(Pipeline.REQUEST_PROCESSORS_KEY, "scale_request_size"));
         assertTrue(info.containsProcessor(Pipeline.RESPONSE_PROCESSORS_KEY, "fixed_score"));
     }
+
+    public void testExceptionOnPipelineCreation() {
+        Map<String, Processor.Factory<SearchRequestProcessor>> badFactory = Map.of(
+            "bad_factory",
+            (pf, t, f, c) -> { throw new RuntimeException(); }
+        );
+        SearchPipelineService searchPipelineService = createWithProcessors(badFactory, Collections.emptyMap());
+
+        Map<String, Object> pipelineSourceMap = new HashMap<>();
+        pipelineSourceMap.put(Pipeline.REQUEST_PROCESSORS_KEY, List.of(Map.of("bad_factory", Collections.emptyMap())));
+
+        SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource().searchPipelineSource(pipelineSourceMap);
+        SearchRequest searchRequest = new SearchRequest().source(sourceBuilder);
+
+        // Exception thrown when creating the pipeline
+        expectThrows(SearchPipelineProcessingException.class, () -> searchPipelineService.resolvePipeline(searchRequest));
+
+    }
+
+    public void testExceptionOnRequestProcessing() {
+        SearchRequestProcessor throwingRequestProcessor = new FakeRequestProcessor("throwing_request", null, null, r -> {
+            throw new RuntimeException();
+        });
+        Map<String, Processor.Factory<SearchRequestProcessor>> throwingRequestProcessorFactory = Map.of(
+            "throwing_request",
+            (pf, t, f, c) -> throwingRequestProcessor
+        );
+
+        SearchPipelineService searchPipelineService = createWithProcessors(throwingRequestProcessorFactory, Collections.emptyMap());
+
+        Map<String, Object> pipelineSourceMap = new HashMap<>();
+        pipelineSourceMap.put(Pipeline.REQUEST_PROCESSORS_KEY, List.of(Map.of("throwing_request", Collections.emptyMap())));
+
+        SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource().searchPipelineSource(pipelineSourceMap);
+        SearchRequest searchRequest = new SearchRequest().source(sourceBuilder);
+
+        // Exception thrown when processing the request
+        expectThrows(SearchPipelineProcessingException.class, () -> searchPipelineService.resolvePipeline(searchRequest));
+    }
+
+    public void testExceptionOnResponseProcessing() throws Exception {
+        SearchResponseProcessor throwingResponseProcessor = new FakeResponseProcessor("throwing_response", null, null, r -> {
+            throw new RuntimeException();
+        });
+        Map<String, Processor.Factory<SearchResponseProcessor>> throwingResponseProcessorFactory = Map.of(
+            "throwing_response",
+            (pf, t, f, c) -> throwingResponseProcessor
+        );
+
+        SearchPipelineService searchPipelineService = createWithProcessors(Collections.emptyMap(), throwingResponseProcessorFactory);
+
+        Map<String, Object> pipelineSourceMap = new HashMap<>();
+        pipelineSourceMap.put(Pipeline.RESPONSE_PROCESSORS_KEY, List.of(Map.of("throwing_response", Collections.emptyMap())));
+
+        SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource().size(100).searchPipelineSource(pipelineSourceMap);
+        SearchRequest searchRequest = new SearchRequest().source(sourceBuilder);
+
+        PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest);
+
+        SearchResponse response = new SearchResponse(null, null, 0, 0, 0, 0, null, null);
+        // Exception thrown when processing response
+        expectThrows(SearchPipelineProcessingException.class, () -> pipelinedRequest.transformResponse(response));
+    }
 }
diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
index b785574ca52b2..b1dd4fb1dcc1e 100644
--- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
@@ -1298,13 +1298,17 @@ public static Engine.Warmer createTestWarmer(IndexSettings indexSettings) {
      * @param primaryShard {@link IndexShard} - The primary shard to replicate from.
      * @param target {@link IndexShard} - The target replica shard in segment replication.
      */
-    public final SegmentReplicationTargetService prepareForReplication(IndexShard primaryShard, IndexShard target) {
+    public final SegmentReplicationTargetService prepareForReplication(
+        IndexShard primaryShard,
+        IndexShard target,
+        TransportService transportService,
+        IndicesService indicesService
+    ) {
         final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class);
-        final IndicesService indicesService = mock(IndicesService.class);
         final SegmentReplicationTargetService targetService = new SegmentReplicationTargetService(
             threadPool,
             new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)),
-            mock(TransportService.class),
+            transportService,
             sourceFactory,
             indicesService
         );
@@ -1317,7 +1321,7 @@ public void getCheckpointMetadata(
         ) {
             try {
                 final CopyState copyState = new CopyState(
-                    ReplicationCheckpoint.empty(primaryShard.shardId, primaryShard.getDefaultCodecName()),
+                    ReplicationCheckpoint.empty(primaryShard.shardId, primaryShard.getLatestReplicationCheckpoint().getCodec()),
                     primaryShard
                 );
                 listener.onResponse(
@@ -1371,9 +1375,13 @@ public final List<SegmentReplicationTarget> replicateSegments(IndexShard primary
         }
         List<SegmentReplicationTarget> ids = new ArrayList<>();
         for (IndexShard replica : replicaShards) {
-            final SegmentReplicationTargetService targetService = prepareForReplication(primaryShard, replica);
+            final SegmentReplicationTargetService targetService = prepareForReplication(
+                primaryShard,
+                replica,
+                mock(TransportService.class),
+                mock(IndicesService.class)
+            );
             final SegmentReplicationTarget target = targetService.startReplication(
-                ReplicationCheckpoint.empty(replica.shardId, replica.getDefaultCodecName()),
                 replica,
                 new SegmentReplicationTargetService.SegmentReplicationListener() {
                     @Override