From f6e8654c5a3cb0b8e2412aca62b616646a00f024 Mon Sep 17 00:00:00 2001
From: Martijn van Groningen <martijn.v.groningen@gmail.com>
Date: Tue, 12 Feb 2019 15:04:52 +0100
Subject: [PATCH] Add rolling upgrade multi cluster test module (#38277)

This test starts 2 clusters, each with 3 nodes.
First the leader cluster is started and tests are run against it and
then the follower cluster is started and tests execute against this two cluster.

Then the follower cluster is upgraded, one node at a time.
After that the leader cluster is upgraded, one node at a time.
Every time a node is upgraded tests are ran while both clusters are online.
(and either leader cluster has mixed node versions or the follower cluster)

This commit only tests CCR index following, but could be used for CCS tests as well.
In particular for CCR, unidirectional index following is tested during a rolling upgrade.
During the test several indices are created and followed in the leader cluster before or
while the follower cluster is being upgraded.

This tests also verifies that attempting to follow an index in the upgraded cluster
from the not upgraded cluster fails. After both clusters are upgraded following the
index that previously failed should succeed.

Relates to #37231 and #38037
---
 .../build.gradle                              | 262 ++++++++++++++++++
 .../AbstractMultiClusterUpgradeTestCase.java  | 168 +++++++++++
 .../upgrades/CcrRollingUpgradeIT.java         | 165 +++++++++++
 3 files changed, 595 insertions(+)
 create mode 100644 x-pack/qa/rolling-upgrade-multi-cluster/build.gradle
 create mode 100644 x-pack/qa/rolling-upgrade-multi-cluster/src/test/java/org/elasticsearch/upgrades/AbstractMultiClusterUpgradeTestCase.java
 create mode 100644 x-pack/qa/rolling-upgrade-multi-cluster/src/test/java/org/elasticsearch/upgrades/CcrRollingUpgradeIT.java

diff --git a/x-pack/qa/rolling-upgrade-multi-cluster/build.gradle b/x-pack/qa/rolling-upgrade-multi-cluster/build.gradle
new file mode 100644
index 0000000000000..3f222be0594a9
--- /dev/null
+++ b/x-pack/qa/rolling-upgrade-multi-cluster/build.gradle
@@ -0,0 +1,262 @@
+import org.elasticsearch.gradle.Version
+import org.elasticsearch.gradle.test.RestIntegTestTask
+
+apply plugin: 'elasticsearch.standalone-test'
+
+dependencies {
+    // "org.elasticsearch.plugin:x-pack-core:${version}" doesn't work with idea because the testArtifacts are also here
+    testCompile project(path: xpackModule('core'), configuration: 'default')
+    testCompile project(path: xpackModule('core'), configuration: 'testArtifacts') // to be moved in a later commit
+}
+
+// This is a top level task which we will add dependencies to below.
+// It is a single task that can be used to backcompat tests against all versions.
+task bwcTest {
+    description = 'Runs backwards compatibility tests.'
+    group = 'verification'
+}
+
+for (Version version : bwcVersions.wireCompatible) {
+    String taskPrefix = "v${version}"
+
+    // ============================================================================================
+    // Create leader cluster
+    // ============================================================================================
+
+    RestIntegTestTask leaderClusterTest = tasks.create(name: "${taskPrefix}#leader#clusterTest", type: RestIntegTestTask) {
+        mustRunAfter(precommit)
+    }
+
+    configure(extensions.findByName("${taskPrefix}#leader#clusterTestCluster")) {
+        bwcVersion = version
+        numBwcNodes = 3
+        numNodes = 3
+        clusterName = 'leader'
+        setting 'xpack.security.enabled', 'false'
+        setting 'xpack.monitoring.enabled', 'false'
+        setting 'xpack.ml.enabled', 'false'
+        setting 'xpack.watcher.enabled', 'false'
+        setting 'xpack.license.self_generated.type', 'trial'
+    }
+
+    Task leaderClusterTestRunner = tasks.getByName("${taskPrefix}#leader#clusterTestRunner")
+    leaderClusterTestRunner.configure {
+        systemProperty 'tests.rest.upgrade_state', 'none'
+        systemProperty 'tests.rest.cluster_name', 'leader'
+
+        systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}"
+        systemProperty 'tests.leader_remote_cluster_seed', "${-> leaderClusterTest.nodes.get(0).transportUri()}"
+    }
+
+    // ============================================================================================
+    // Create follower cluster
+    // ============================================================================================
+
+    RestIntegTestTask followerClusterTest = tasks.create(name: "${taskPrefix}#follower#clusterTest", type: RestIntegTestTask) {
+        mustRunAfter(precommit)
+    }
+
+    configure(extensions.findByName("${taskPrefix}#follower#clusterTestCluster")) {
+        dependsOn leaderClusterTestRunner
+        bwcVersion = version
+        numBwcNodes = 3
+        numNodes = 3
+        clusterName = 'follower'
+        setting 'xpack.security.enabled', 'false'
+        setting 'xpack.monitoring.enabled', 'false'
+        setting 'xpack.ml.enabled', 'false'
+        setting 'xpack.watcher.enabled', 'false'
+        setting 'xpack.license.self_generated.type', 'trial'
+    }
+
+    Task followerClusterTestRunner = tasks.getByName("${taskPrefix}#follower#clusterTestRunner")
+    followerClusterTestRunner.configure {
+        systemProperty 'tests.rest.upgrade_state', 'none'
+        systemProperty 'tests.rest.cluster_name', 'follower'
+
+        systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}"
+        systemProperty 'tests.leader_remote_cluster_seed', "${-> leaderClusterTest.nodes.get(0).transportUri()}"
+
+        systemProperty 'tests.follower_host', "${-> followerClusterTest.nodes.get(0).httpUri()}"
+        systemProperty 'tests.follower_remote_cluster_seed', "${-> followerClusterTest.nodes.get(0).transportUri()}"
+    }
+
+    // ============================================================================================
+    // Upgrade follower cluster
+    // ============================================================================================
+
+    Closure configureUpgradeCluster = {String prefix, String cluster, String name, Task lastRunner, int stopNode,
+                                       RestIntegTestTask clusterTest, Closure getOtherUnicastHostAddresses ->
+        configure(extensions.findByName("${prefix}#${cluster}#${name}")) {
+            dependsOn lastRunner, "${prefix}#${cluster}#clusterTestCluster#node${stopNode}.stop"
+            clusterName = cluster
+            otherUnicastHostAddresses = { getOtherUnicastHostAddresses() }
+            minimumMasterNodes = { 2 }
+            autoSetInitialMasterNodes = false
+            /* Override the data directory so the new node always gets the node we
+            * just stopped's data directory. */
+            dataDir = { nodeNumber -> clusterTest.nodes[stopNode].dataDir }
+            setting 'repositories.url.allowed_urls', 'http://snapshot.test*'
+            setting 'xpack.security.enabled', 'false'
+            setting 'xpack.monitoring.enabled', 'false'
+            setting 'xpack.ml.enabled', 'false'
+            setting 'xpack.watcher.enabled', 'false'
+            setting 'xpack.license.self_generated.type', 'trial'
+            setting 'node.name', "upgraded-node-${cluster}-${stopNode}"
+            setting 'node.attr.upgraded', 'true'
+        }
+    }
+
+    Task followerOneThirdUpgradedTest = tasks.create(name: "${taskPrefix}#follower#oneThirdUpgradedTest", type: RestIntegTestTask)
+
+    configureUpgradeCluster(taskPrefix, 'follower', 'oneThirdUpgradedTestCluster', followerClusterTestRunner, 0, followerClusterTest,
+            // Use all running nodes as seed nodes so there is no race between pinging and the tests
+            { [followerClusterTest.nodes.get(1).transportUri(), followerClusterTest.nodes.get(2).transportUri()] })
+
+    Task followerOneThirdUpgradedTestRunner = tasks.getByName("${taskPrefix}#follower#oneThirdUpgradedTestRunner")
+    followerOneThirdUpgradedTestRunner.configure {
+        systemProperty 'tests.rest.upgrade_state', 'one_third'
+        systemProperty 'tests.rest.cluster_name', 'follower'
+
+        systemProperty 'tests.follower_host', "${-> followerClusterTest.nodes.get(1).httpUri()}"
+        systemProperty 'tests.follower_remote_cluster_seed', "${-> followerClusterTest.nodes.get(1).transportUri()}"
+
+        systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}"
+        systemProperty 'tests.leader_remote_cluster_seed', "${-> leaderClusterTest.nodes.get(0).transportUri()}"
+
+        finalizedBy "${taskPrefix}#follower#clusterTestCluster#node1.stop"
+    }
+
+    Task followerTwoThirdsUpgradedTest = tasks.create(name: "${taskPrefix}#follower#twoThirdsUpgradedTest", type: RestIntegTestTask)
+
+    configureUpgradeCluster(taskPrefix, 'follower', 'twoThirdsUpgradedTestCluster', followerOneThirdUpgradedTestRunner, 1, followerClusterTest,
+            // Use all running nodes as seed nodes so there is no race between pinging and the tests
+            { [followerClusterTest.nodes.get(2).transportUri(), followerOneThirdUpgradedTest.nodes.get(0).transportUri()] })
+
+    Task followerTwoThirdsUpgradedTestRunner = tasks.getByName("${taskPrefix}#follower#twoThirdsUpgradedTestRunner")
+    followerTwoThirdsUpgradedTestRunner.configure {
+        systemProperty 'tests.rest.upgrade_state', 'two_third'
+        systemProperty 'tests.rest.cluster_name', 'follower'
+
+        systemProperty 'tests.follower_host', "${-> followerClusterTest.nodes.get(2).httpUri()}"
+        systemProperty 'tests.follower_remote_cluster_seed', "${-> followerClusterTest.nodes.get(2).transportUri()}"
+
+        systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}"
+        systemProperty 'tests.leader_remote_cluster_seed', "${-> leaderClusterTest.nodes.get(0).transportUri()}"
+
+        finalizedBy "${taskPrefix}#follower#clusterTestCluster#node2.stop"
+    }
+
+    Task followerUpgradedClusterTest = tasks.create(name: "${taskPrefix}#follower#upgradedClusterTest", type: RestIntegTestTask)
+
+    configureUpgradeCluster(taskPrefix, 'follower', 'upgradedClusterTestCluster', followerTwoThirdsUpgradedTestRunner, 2, followerClusterTest,
+            // Use all running nodes as seed nodes so there is no race between pinging and the tests
+            { [followerOneThirdUpgradedTest.nodes.get(0).transportUri(), followerTwoThirdsUpgradedTest.nodes.get(0).transportUri()] })
+
+    Task followerUpgradedClusterTestRunner = tasks.getByName("${taskPrefix}#follower#upgradedClusterTestRunner")
+    followerUpgradedClusterTestRunner.configure {
+        systemProperty 'tests.rest.upgrade_state', 'all'
+        systemProperty 'tests.rest.cluster_name', 'follower'
+
+        systemProperty 'tests.follower_host', "${-> followerOneThirdUpgradedTest.nodes.get(0).httpUri()}"
+        systemProperty 'tests.follower_remote_cluster_seed', "${-> followerOneThirdUpgradedTest.nodes.get(0).transportUri()}"
+
+        systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}"
+        systemProperty 'tests.leader_remote_cluster_seed', "${-> leaderClusterTest.nodes.get(0).transportUri()}"
+
+        // This is needed, otherwise leader node 0 will stop after the leaderClusterTestRunner task has run.
+        // Here it is ok to stop, because in the next task, the leader node 0 gets upgraded.
+        finalizedBy "v${version}#leader#clusterTestCluster#node0.stop"
+    }
+
+    // ============================================================================================
+    // Upgrade leader cluster
+    // ============================================================================================
+
+    Task leaderOneThirdUpgradedTest = tasks.create(name: "${taskPrefix}#leader#oneThirdUpgradedTest", type: RestIntegTestTask)
+
+    configureUpgradeCluster(taskPrefix, 'leader', 'oneThirdUpgradedTestCluster', followerUpgradedClusterTestRunner, 0, leaderClusterTest,
+            // Use all running nodes as seed nodes so there is no race between pinging and the tests
+            { [leaderClusterTest.nodes.get(1).transportUri(), leaderClusterTest.nodes.get(2).transportUri()] })
+
+    Task leaderOneThirdUpgradedTestRunner = tasks.getByName("${taskPrefix}#leader#oneThirdUpgradedTestRunner")
+    leaderOneThirdUpgradedTestRunner.configure {
+        systemProperty 'tests.rest.upgrade_state', 'one_third'
+        systemProperty 'tests.rest.cluster_name', 'leader'
+
+        systemProperty 'tests.follower_host', "${-> followerUpgradedClusterTest.nodes.get(0).httpUri()}"
+        systemProperty 'tests.follower_remote_cluster_seed', "${-> followerUpgradedClusterTest.nodes.get(0).transportUri()}"
+
+        systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(2).httpUri()}"
+        systemProperty 'tests.leader_remote_cluster_seed', "${-> leaderClusterTest.nodes.get(2).transportUri()}"
+
+        finalizedBy "${taskPrefix}#leader#clusterTestCluster#node1.stop"
+    }
+
+    Task leaderTwoThirdsUpgradedTest = tasks.create(name: "${taskPrefix}#leader#twoThirdsUpgradedTest", type: RestIntegTestTask)
+
+    configureUpgradeCluster(taskPrefix, 'leader', 'twoThirdsUpgradedTestCluster', leaderOneThirdUpgradedTestRunner, 1, leaderClusterTest,
+            // Use all running nodes as seed nodes so there is no race between pinging and the tests
+            { [leaderClusterTest.nodes.get(2).transportUri(), leaderOneThirdUpgradedTest.nodes.get(0).transportUri()] })
+
+    Task leaderTwoThirdsUpgradedTestRunner = tasks.getByName("${taskPrefix}#leader#twoThirdsUpgradedTestRunner")
+    leaderTwoThirdsUpgradedTestRunner.configure {
+        systemProperty 'tests.rest.upgrade_state', 'two_third'
+        systemProperty 'tests.rest.cluster_name', 'leader'
+
+        systemProperty 'tests.follower_host', "${-> followerUpgradedClusterTest.nodes.get(0).httpUri()}"
+        systemProperty 'tests.follower_remote_cluster_seed', "${-> followerUpgradedClusterTest.nodes.get(0).transportUri()}"
+
+        systemProperty 'tests.leader_host', "${-> leaderOneThirdUpgradedTest.nodes.get(0).httpUri()}"
+        systemProperty 'tests.leader_remote_cluster_seed', "${-> leaderOneThirdUpgradedTest.nodes.get(0).transportUri()}"
+
+        finalizedBy "${taskPrefix}#leader#clusterTestCluster#node2.stop"
+    }
+
+    Task leaderUpgradedClusterTest = tasks.create(name: "${taskPrefix}#leader#upgradedClusterTest", type: RestIntegTestTask)
+
+    configureUpgradeCluster(taskPrefix, 'leader', "upgradedClusterTestCluster", leaderTwoThirdsUpgradedTestRunner, 2, leaderClusterTest,
+            // Use all running nodes as seed nodes so there is no race between pinging and the tests
+            { [leaderOneThirdUpgradedTest.nodes.get(0).transportUri(), leaderTwoThirdsUpgradedTest.nodes.get(0).transportUri()] })
+
+    Task leaderUpgradedClusterTestRunner = tasks.getByName("${taskPrefix}#leader#upgradedClusterTestRunner")
+    leaderUpgradedClusterTestRunner.configure {
+        systemProperty 'tests.rest.upgrade_state', 'all'
+        systemProperty 'tests.rest.cluster_name', 'leader'
+
+        systemProperty 'tests.follower_host', "${-> followerUpgradedClusterTest.nodes.get(0).httpUri()}"
+        systemProperty 'tests.follower_remote_cluster_seed', "${-> followerUpgradedClusterTest.nodes.get(0).transportUri()}"
+
+        systemProperty 'tests.leader_host', "${-> leaderTwoThirdsUpgradedTest.nodes.get(0).httpUri()}"
+        systemProperty 'tests.leader_remote_cluster_seed', "${-> leaderTwoThirdsUpgradedTest.nodes.get(0).transportUri()}"
+
+        /*
+         * Force stopping all the upgraded nodes after the test runner
+         * so they are alive during the test.
+         */
+        finalizedBy "${taskPrefix}#follower#oneThirdUpgradedTestCluster#stop"
+        finalizedBy "${taskPrefix}#follower#twoThirdsUpgradedTestCluster#stop"
+        finalizedBy "${taskPrefix}#follower#upgradedClusterTestCluster#stop"
+        finalizedBy "${taskPrefix}#leader#oneThirdUpgradedTestCluster#stop"
+        finalizedBy "${taskPrefix}#leader#twoThirdsUpgradedTestCluster#stop"
+    }
+
+    if (project.bwc_tests_enabled) {
+        Task versionBwcTest = tasks.create(name: "${taskPrefix}#bwcTest") {
+            dependsOn = [leaderUpgradedClusterTest]
+        }
+        bwcTest.dependsOn(versionBwcTest)
+    }
+}
+
+unitTest.enabled = false // no unit tests for rolling upgrades, only the rest integration test
+
+// basic integ tests includes testing bwc against the most recent version
+task integTest {
+    if (project.bwc_tests_enabled) {
+        for (final def version : bwcVersions.unreleasedWireCompatible) {
+            dependsOn "v${version}#bwcTest"
+        }
+    }
+}
+check.dependsOn(integTest)
\ No newline at end of file
diff --git a/x-pack/qa/rolling-upgrade-multi-cluster/src/test/java/org/elasticsearch/upgrades/AbstractMultiClusterUpgradeTestCase.java b/x-pack/qa/rolling-upgrade-multi-cluster/src/test/java/org/elasticsearch/upgrades/AbstractMultiClusterUpgradeTestCase.java
new file mode 100644
index 0000000000000..d1420c54bb90d
--- /dev/null
+++ b/x-pack/qa/rolling-upgrade-multi-cluster/src/test/java/org/elasticsearch/upgrades/AbstractMultiClusterUpgradeTestCase.java
@@ -0,0 +1,168 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.upgrades;
+
+import org.apache.http.HttpHost;
+import org.apache.http.util.EntityUtils;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.common.xcontent.json.JsonXContent;
+import org.elasticsearch.core.internal.io.IOUtils;
+import org.elasticsearch.test.rest.ESRestTestCase;
+import org.junit.AfterClass;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public abstract class AbstractMultiClusterUpgradeTestCase extends ESRestTestCase {
+
+    @Override
+    protected boolean preserveClusterUponCompletion() {
+        return true;
+    }
+
+    enum UpgradeState {
+        NONE,
+        ONE_THIRD,
+        TWO_THIRD,
+        ALL;
+
+        public static UpgradeState parse(String value) {
+            switch (value) {
+                case "none":
+                    return NONE;
+                case "one_third":
+                    return ONE_THIRD;
+                case "two_third":
+                    return TWO_THIRD;
+                case "all":
+                    return ALL;
+                default:
+                    throw new AssertionError("unknown cluster type: " + value);
+            }
+        }
+    }
+
+    protected final UpgradeState upgradeState = UpgradeState.parse(System.getProperty("tests.rest.upgrade_state"));
+
+    enum ClusterName {
+        LEADER,
+        FOLLOWER;
+
+        public static ClusterName parse(String value) {
+            switch (value) {
+                case "leader":
+                    return LEADER;
+                case "follower":
+                    return FOLLOWER;
+                default:
+                    throw new AssertionError("unknown cluster type: " + value);
+            }
+        }
+    }
+
+    protected final ClusterName clusterName = ClusterName.parse(System.getProperty("tests.rest.cluster_name"));
+
+    private static RestClient leaderClient;
+    private static RestClient followerClient;
+    private static boolean initialized = false;
+
+    @Before
+    public void initClientsAndConfigureClusters() throws IOException {
+        String leaderHost = System.getProperty("tests.leader_host");
+        if (leaderHost == null) {
+            throw new AssertionError("leader host is missing");
+        }
+
+        if (initialized) {
+            return;
+        }
+
+        String followerHost = System.getProperty("tests.follower_host");
+        if (clusterName == ClusterName.LEADER) {
+            leaderClient = buildClient(leaderHost);
+            if (followerHost != null) {
+                followerClient = buildClient(followerHost);
+            }
+        } else if (clusterName == ClusterName.FOLLOWER) {
+            if (followerHost == null) {
+                throw new AssertionError("follower host is missing");
+            }
+
+            leaderClient = buildClient(leaderHost);
+            followerClient = buildClient(followerHost);
+        } else {
+            throw new AssertionError("unknown cluster name: " + clusterName);
+        }
+
+        configureLeaderRemoteClusters();
+        configureFollowerRemoteClusters();
+        initialized = true;
+    }
+
+    private void configureLeaderRemoteClusters() throws IOException {
+        String leaderRemoteClusterSeed = System.getProperty("tests.leader_remote_cluster_seed");
+        if (leaderRemoteClusterSeed != null) {
+            logger.info("Configuring leader remote cluster [{}]", leaderRemoteClusterSeed);
+            Request request = new Request("PUT", "/_cluster/settings");
+            request.setJsonEntity("{\"persistent\": {\"cluster.remote.leader.seeds\": \"" + leaderRemoteClusterSeed + "\"}}");
+            assertThat(leaderClient.performRequest(request).getStatusLine().getStatusCode(), equalTo(200));
+            if (followerClient != null) {
+                assertThat(followerClient.performRequest(request).getStatusLine().getStatusCode(), equalTo(200));
+            }
+        } else {
+            logger.info("No leader remote cluster seed found.");
+        }
+    }
+
+    private void configureFollowerRemoteClusters() throws IOException {
+        String followerRemoteClusterSeed = System.getProperty("tests.follower_remote_cluster_seed");
+        if (followerRemoteClusterSeed != null) {
+            logger.info("Configuring follower remote cluster [{}]", followerRemoteClusterSeed);
+            Request request = new Request("PUT", "/_cluster/settings");
+            request.setJsonEntity("{\"persistent\": {\"cluster.remote.follower.seeds\": \"" + followerRemoteClusterSeed + "\"}}");
+            assertThat(leaderClient.performRequest(request).getStatusLine().getStatusCode(), equalTo(200));
+            assertThat(followerClient.performRequest(request).getStatusLine().getStatusCode(), equalTo(200));
+        } else {
+            logger.info("No follower remote cluster seed found.");
+        }
+    }
+
+    @AfterClass
+    public static void destroyClients() throws IOException {
+        try {
+            IOUtils.close(leaderClient, followerClient);
+        } finally {
+            leaderClient = null;
+            followerClient = null;
+        }
+    }
+
+    protected static RestClient leaderClient() {
+        return leaderClient;
+    }
+
+    protected static RestClient followerClient() {
+        return followerClient;
+    }
+
+    private RestClient buildClient(final String url) throws IOException {
+        int portSeparator = url.lastIndexOf(':');
+        HttpHost httpHost = new HttpHost(url.substring(0, portSeparator),
+            Integer.parseInt(url.substring(portSeparator + 1)), getProtocol());
+        return buildClient(restAdminSettings(), new HttpHost[]{httpHost});
+    }
+
+    protected static Map<?, ?> toMap(Response response) throws IOException {
+        return XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false);
+    }
+
+}
diff --git a/x-pack/qa/rolling-upgrade-multi-cluster/src/test/java/org/elasticsearch/upgrades/CcrRollingUpgradeIT.java b/x-pack/qa/rolling-upgrade-multi-cluster/src/test/java/org/elasticsearch/upgrades/CcrRollingUpgradeIT.java
new file mode 100644
index 0000000000000..7f11931bd28f3
--- /dev/null
+++ b/x-pack/qa/rolling-upgrade-multi-cluster/src/test/java/org/elasticsearch/upgrades/CcrRollingUpgradeIT.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.upgrades;
+
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.ResponseException;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.support.XContentMapValues;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static org.elasticsearch.rest.action.search.RestSearchAction.TOTAL_HITS_AS_INT_PARAM;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+
+public class CcrRollingUpgradeIT extends AbstractMultiClusterUpgradeTestCase {
+
+    public void testIndexFollowing() throws Exception {
+        logger.info("clusterName={}, upgradeState={}", clusterName, upgradeState);
+
+        if (clusterName == ClusterName.LEADER) {
+            switch (upgradeState) {
+                case NONE:
+                    createLeaderIndex(leaderClient(), "leader_index1");
+                    index(leaderClient(), "leader_index1", 64);
+                    createLeaderIndex(leaderClient(), "leader_index2");
+                    index(leaderClient(), "leader_index2", 64);
+                    break;
+                case ONE_THIRD:
+                    break;
+                case TWO_THIRD:
+                    break;
+                case ALL:
+                    createLeaderIndex(leaderClient(), "leader_index4");
+                    followIndex(followerClient(), "leader", "leader_index4", "follower_index4");
+                    index(leaderClient(), "leader_index4", 64);
+                    assertTotalHitCount("follower_index4", 64, followerClient());
+                    break;
+                default:
+                    throw new AssertionError("unexpected upgrade_state [" + upgradeState + "]");
+            }
+        } else if (clusterName == ClusterName.FOLLOWER) {
+            switch (upgradeState) {
+                case NONE:
+                    followIndex(followerClient(), "leader", "leader_index1", "follower_index1");
+                    assertTotalHitCount("follower_index1", 64, followerClient());
+                    break;
+                case ONE_THIRD:
+                    index(leaderClient(), "leader_index1", 64);
+                    assertTotalHitCount("follower_index1", 128, followerClient());
+
+                    followIndex(followerClient(), "leader", "leader_index2", "follower_index2");
+                    assertTotalHitCount("follower_index2", 64, followerClient());
+                    break;
+                case TWO_THIRD:
+                    index(leaderClient(), "leader_index1", 64);
+                    assertTotalHitCount("follower_index1", 192, followerClient());
+
+                    index(leaderClient(), "leader_index2", 64);
+                    assertTotalHitCount("follower_index2", 128, followerClient());
+
+                    createLeaderIndex(leaderClient(), "leader_index3");
+                    index(leaderClient(), "leader_index3", 64);
+                    followIndex(followerClient(), "leader", "leader_index3", "follower_index3");
+                    assertTotalHitCount("follower_index3", 64, followerClient());
+                    break;
+                case ALL:
+                    index(leaderClient(), "leader_index1", 64);
+                    assertTotalHitCount("follower_index1", 256, followerClient());
+
+                    index(leaderClient(), "leader_index2", 64);
+                    assertTotalHitCount("follower_index2", 192, followerClient());
+
+                    index(leaderClient(), "leader_index3", 64);
+                    assertTotalHitCount("follower_index3", 128, followerClient());
+                    break;
+                default:
+                    throw new AssertionError("unexpected upgrade_state [" + upgradeState + "]");
+            }
+        } else {
+            throw new AssertionError("unexpected cluster_name [" + clusterName + "]");
+        }
+    }
+
+    public void testCannotFollowLeaderInUpgradedCluster() throws Exception {
+        assumeTrue("Tests only runs with upgrade_state [all]", upgradeState == UpgradeState.ALL);
+
+        if (clusterName == ClusterName.FOLLOWER) {
+            // At this point the leader cluster has not been upgraded, but follower cluster has been upgrade.
+            // Create a leader index in the follow cluster and try to follow it in the leader cluster.
+            // This should fail, because the leader cluster at this point in time can't do file based recovery from follower.
+            createLeaderIndex(followerClient(), "not_supported");
+            index(followerClient(), "not_supported", 64);
+
+            ResponseException e = expectThrows(ResponseException.class,
+                () -> followIndex(leaderClient(), "follower", "not_supported", "not_supported"));
+            assertThat(e.getMessage(), containsString("the snapshot was created with Elasticsearch version ["));
+            assertThat(e.getMessage(), containsString("] which is higher than the version of this node ["));
+        } else if (clusterName == ClusterName.LEADER) {
+            // At this point all nodes in both clusters have been updated and
+            // the leader cluster can now follow leader_index4 in the follower cluster:
+            followIndex(leaderClient(), "follower", "not_supported", "not_supported");
+            assertTotalHitCount("not_supported", 64, leaderClient());
+        } else {
+            throw new AssertionError("unexpected cluster_name [" + clusterName + "]");
+        }
+    }
+
+    private static void createLeaderIndex(RestClient client, String indexName) throws IOException {
+        Settings indexSettings = Settings.builder()
+            .put("index.soft_deletes.enabled", true)
+            .put("index.number_of_shards", 1)
+            .put("index.number_of_replicas", 0)
+            .build();
+        createIndex(client, indexName, indexSettings);
+    }
+
+    private static void createIndex(RestClient client, String name, Settings settings) throws IOException {
+        Request request = new Request("PUT", "/" + name);
+        request.setJsonEntity("{\n \"settings\": " + Strings.toString(settings) + "}");
+        client.performRequest(request);
+    }
+
+    private static void followIndex(RestClient client, String leaderCluster, String leaderIndex, String followIndex) throws IOException {
+        final Request request = new Request("PUT", "/" + followIndex + "/_ccr/follow?wait_for_active_shards=1");
+        request.setJsonEntity("{\"remote_cluster\": \"" + leaderCluster + "\", \"leader_index\": \"" + leaderIndex +
+            "\", \"read_poll_timeout\": \"10ms\"}");
+        assertOK(client.performRequest(request));
+    }
+
+    private static void index(RestClient client, String index, int numDocs) throws IOException {
+        for (int i = 0; i < numDocs; i++) {
+            final Request request = new Request("POST", "/" + index + "/_doc/");
+            request.setJsonEntity("{}");
+            assertOK(client.performRequest(request));
+            if (randomIntBetween(0, 5) == 3) {
+                assertOK(client.performRequest(new Request("POST", "/" + index + "/_refresh")));
+            }
+        }
+    }
+
+    private static void assertTotalHitCount(final String index,
+                                            final int expectedTotalHits,
+                                            final RestClient client) throws Exception {
+        assertOK(client.performRequest(new Request("POST", "/" + index + "/_refresh")));
+        assertBusy(() -> verifyTotalHitCount(index, expectedTotalHits, client));
+    }
+
+    private static void verifyTotalHitCount(final String index,
+                                            final int expectedTotalHits,
+                                            final RestClient client) throws IOException {
+        final Request request = new Request("GET", "/" + index + "/_search");
+        request.addParameter(TOTAL_HITS_AS_INT_PARAM, "true");
+        Map<?, ?> response = toMap(client.performRequest(request));
+        final int totalHits = (int) XContentMapValues.extractValue("hits.total", response);
+        assertThat(totalHits, equalTo(expectedTotalHits));
+    }
+
+}