diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index a4a036232d49b..e71cc10760f8b 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.support.ListenerTimeouts; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -176,32 +177,30 @@ public void getSnapshotInfo(GetSnapshotInfoContext context) { final List snapshotIds = context.snapshotIds(); assert snapshotIds.size() == 1 && SNAPSHOT_ID.equals(snapshotIds.iterator().next()) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId but saw " + snapshotIds; - Client remoteClient = getRemoteClusterClient(); - ClusterStateResponse response = remoteClient.admin() - .cluster() - .prepareState() - .clear() - .setMetadata(true) - .setNodes(true) - .get(ccrSettings.getRecoveryActionTimeout()); - Metadata responseMetadata = response.getState().metadata(); - Map indicesMap = responseMetadata.indices(); - List indices = new ArrayList<>(indicesMap.keySet()); - - // fork to the snapshot meta pool because the context expects to run on it and asserts that it does - threadPool.executor(ThreadPool.Names.SNAPSHOT_META) - .execute( - () -> context.onResponse( - new SnapshotInfo( + try { + getRemoteClusterClient().admin() + .cluster() + .prepareState() + .clear() + .setMetadata(true) + .setNodes(true) + // fork to the snapshot meta pool because the context expects to run on it and asserts that it does + .execute(new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.SNAPSHOT_META, context.map(response -> { + Metadata responseMetadata = response.getState().metadata(); + Map indicesMap = responseMetadata.indices(); + List indices = new ArrayList<>(indicesMap.keySet()); + return new SnapshotInfo( new Snapshot(this.metadata.name(), SNAPSHOT_ID), indices, new ArrayList<>(responseMetadata.dataStreams().keySet()), Collections.emptyList(), response.getState().getNodes().getMaxNodeVersion(), SnapshotState.SUCCESS - ) - ) - ); + ); + }), false)); + } catch (Exception e) { + context.onFailure(e); + } } @Override @@ -259,44 +258,42 @@ public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, Sna @Override public void getRepositoryData(ActionListener listener) { - ActionListener.completeWith(listener, () -> { - Client remoteClient = getRemoteClusterClient(); - ClusterStateResponse response = remoteClient.admin() - .cluster() - .prepareState() - .clear() - .setMetadata(true) - .get(ccrSettings.getRecoveryActionTimeout()); - Metadata remoteMetadata = response.getState().getMetadata(); - - Map copiedSnapshotIds = new HashMap<>(); - Map snapshotsDetails = Maps.newMapWithExpectedSize(copiedSnapshotIds.size()); - Map> indexSnapshots = Maps.newMapWithExpectedSize(copiedSnapshotIds.size()); - - Map remoteIndices = remoteMetadata.getIndices(); - for (String indexName : remoteMetadata.getConcreteAllIndices()) { - // Both the Snapshot name and UUID are set to _latest_ - SnapshotId snapshotId = new SnapshotId(LATEST, LATEST); - copiedSnapshotIds.put(indexName, snapshotId); - final long nowMillis = threadPool.absoluteTimeInMillis(); - snapshotsDetails.put( - indexName, - new RepositoryData.SnapshotDetails(SnapshotState.SUCCESS, Version.CURRENT, nowMillis, nowMillis, "") + try { + getRemoteClusterClient().admin().cluster().prepareState().clear().setMetadata(true).execute(listener.map(response -> { + final Metadata remoteMetadata = response.getState().getMetadata(); + final String[] concreteAllIndices = remoteMetadata.getConcreteAllIndices(); + final Map copiedSnapshotIds = Maps.newMapWithExpectedSize(concreteAllIndices.length); + final Map snapshotsDetails = Maps.newMapWithExpectedSize(concreteAllIndices.length); + final Map> indexSnapshots = Maps.newMapWithExpectedSize(concreteAllIndices.length); + final Map remoteIndices = remoteMetadata.getIndices(); + for (String indexName : concreteAllIndices) { + // Both the Snapshot name and UUID are set to _latest_ + final SnapshotId snapshotId = new SnapshotId(LATEST, LATEST); + copiedSnapshotIds.put(indexName, snapshotId); + final long nowMillis = threadPool.absoluteTimeInMillis(); + snapshotsDetails.put( + indexName, + new RepositoryData.SnapshotDetails(SnapshotState.SUCCESS, Version.CURRENT, nowMillis, nowMillis, "") + ); + indexSnapshots.put( + new IndexId(indexName, remoteIndices.get(indexName).getIndex().getUUID()), + Collections.singletonList(snapshotId) + ); + } + return new RepositoryData( + MISSING_UUID, + 1, + copiedSnapshotIds, + snapshotsDetails, + indexSnapshots, + ShardGenerations.EMPTY, + IndexMetaDataGenerations.EMPTY, + MISSING_UUID ); - Index index = remoteIndices.get(indexName).getIndex(); - indexSnapshots.put(new IndexId(indexName, index.getUUID()), Collections.singletonList(snapshotId)); - } - return new RepositoryData( - MISSING_UUID, - 1, - copiedSnapshotIds, - snapshotsDetails, - indexSnapshots, - ShardGenerations.EMPTY, - IndexMetaDataGenerations.EMPTY, - MISSING_UUID - ); - }); + })); + } catch (Exception e) { + listener.onFailure(e); + } } @Override