Skip to content

Commit

Permalink
Using OriginSettingClient for reindex data streams
Browse files Browse the repository at this point in the history
  • Loading branch information
masseyke committed Jan 22, 2025
1 parent a620e7c commit 34b656b
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ private static String maybeRewriteSingleAuthenticationHeaderForVersion(
public static final String INFERENCE_ORIGIN = "inference";
public static final String APM_ORIGIN = "apm";
public static final String OTEL_ORIGIN = "otel";
public static final String REINDEX_DATA_STREAM_ORIGIN = "reindex_data_stream";

private ClientHelper() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,24 @@

package org.elasticsearch.xpack.core.security.user;

import org.elasticsearch.action.admin.cluster.shards.TransportClusterSearchShardsAction;
import org.elasticsearch.action.admin.indices.analyze.TransportReloadAnalyzersAction;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeAction;
import org.elasticsearch.action.admin.indices.readonly.TransportAddIndexBlockAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.rollover.LazyRolloverAction;
import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
import org.elasticsearch.action.admin.indices.settings.put.TransportUpdateSettingsAction;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
import org.elasticsearch.action.downsample.DownsampleAction;
import org.elasticsearch.action.index.TransportIndexAction;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.search.TransportSearchScrollAction;
import org.elasticsearch.index.reindex.ReindexAction;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
import org.elasticsearch.xpack.core.security.support.MetadataUtils;
Expand Down Expand Up @@ -180,6 +189,40 @@ public class InternalUsers {
)
);

public static final InternalUser REINDEX_DATA_STREAM_USER = new InternalUser(
UsernamesField.REINDEX_DATA_STREAM_NAME,
new RoleDescriptor(
UsernamesField.REINDEX_DATA_STREAM_ROLE,
new String[] {},
new RoleDescriptor.IndicesPrivileges[] {
RoleDescriptor.IndicesPrivileges.builder()
.indices("*")
.privileges(
TransportDeleteIndexAction.TYPE.name(),
"indices:admin/data_stream/index/reindex",
"indices:admin/index/create_from_source",
TransportAddIndexBlockAction.TYPE.name(),
TransportCreateIndexAction.TYPE.name(),
TransportClusterSearchShardsAction.TYPE.name(),
TransportUpdateSettingsAction.TYPE.name(),
RefreshAction.NAME,
ReindexAction.NAME,
TransportSearchAction.NAME,
TransportBulkAction.NAME,
TransportIndexAction.NAME,
TransportSearchScrollAction.TYPE.name(),
ModifyDataStreamsAction.NAME
)
.allowRestrictedIndices(false)
.build() },
null,
null,
new String[] {},
MetadataUtils.DEFAULT_RESERVED_METADATA,
Map.of()
)
);

/**
* Internal user that can rollover an index/data stream.
*/
Expand Down Expand Up @@ -234,6 +277,7 @@ public class InternalUsers {
ASYNC_SEARCH_USER,
STORAGE_USER,
DATA_STREAM_LIFECYCLE_USER,
REINDEX_DATA_STREAM_USER,
SYNONYMS_USER,
LAZY_ROLLOVER_USER
).collect(Collectors.toUnmodifiableMap(InternalUser::principal, Function.identity()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public final class UsernamesField {
public static final String REMOTE_MONITORING_INDEXING_ROLE = "remote_monitoring_agent";
public static final String LAZY_ROLLOVER_NAME = "_lazy_rollover";
public static final String LAZY_ROLLOVER_ROLE = "_lazy_rollover";
public static final String REINDEX_DATA_STREAM_NAME = "_reindex_data_stream";
public static final String REINDEX_DATA_STREAM_ROLE = "_reindex_data_stream";

private UsernamesField() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.OriginSettingClient;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamAction;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -42,6 +43,7 @@
import java.util.Map;
import java.util.NoSuchElementException;

import static org.elasticsearch.xpack.core.ClientHelper.REINDEX_DATA_STREAM_ORIGIN;
import static org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate.getReindexRequiredPredicate;

public class ReindexDataStreamPersistentTaskExecutor extends PersistentTasksExecutor<ReindexDataStreamTaskParams> {
Expand All @@ -60,12 +62,14 @@ public class ReindexDataStreamPersistentTaskExecutor extends PersistentTasksExec
private static final Logger logger = LogManager.getLogger(ReindexDataStreamPersistentTaskExecutor.class);
private static final TimeValue TASK_KEEP_ALIVE_TIME = TimeValue.timeValueDays(1);
private final Client client;
private final OriginSettingClient originSettingClient;
private final ClusterService clusterService;
private final ThreadPool threadPool;

public ReindexDataStreamPersistentTaskExecutor(Client client, ClusterService clusterService, String taskName, ThreadPool threadPool) {
super(taskName, threadPool.generic());
this.client = client;
this.originSettingClient = new OriginSettingClient(client, REINDEX_DATA_STREAM_ORIGIN);
this.clusterService = clusterService;
this.threadPool = threadPool;
}
Expand Down Expand Up @@ -107,15 +111,15 @@ protected void nodeOperation(
request.setParentTask(taskId);
assert task instanceof ReindexDataStreamTask;
final ReindexDataStreamTask reindexDataStreamTask = (ReindexDataStreamTask) task;
ExecuteWithHeadersClient reindexClient = new ExecuteWithHeadersClient(client, params.headers());
reindexClient.execute(GetDataStreamAction.INSTANCE, request, ActionListener.wrap(response -> {
ExecuteWithHeadersClient userClient = new ExecuteWithHeadersClient(client, params.headers());
userClient.execute(GetDataStreamAction.INSTANCE, request, ActionListener.wrap(response -> {
List<GetDataStreamAction.Response.DataStreamInfo> dataStreamInfos = response.getDataStreams();
if (dataStreamInfos.size() == 1) {
DataStream dataStream = dataStreamInfos.getFirst().getDataStream();
if (getReindexRequiredPredicate(clusterService.state().metadata()).test(dataStream.getWriteIndex())) {
RolloverRequest rolloverRequest = new RolloverRequest(sourceDataStream, null);
rolloverRequest.setParentTask(taskId);
reindexClient.execute(
userClient.execute(
RolloverAction.INSTANCE,
rolloverRequest,
ActionListener.wrap(
Expand All @@ -125,7 +129,6 @@ protected void nodeOperation(
reindexDataStreamTask,
params,
state,
reindexClient,
sourceDataStream,
taskId
),
Expand All @@ -139,7 +142,6 @@ protected void nodeOperation(
reindexDataStreamTask,
params,
state,
reindexClient,
sourceDataStream,
taskId
);
Expand All @@ -156,7 +158,6 @@ private void reindexIndices(
ReindexDataStreamTask reindexDataStreamTask,
ReindexDataStreamTaskParams params,
ReindexDataStreamPersistentTaskState state,
ExecuteWithHeadersClient reindexClient,
String sourceDataStream,
TaskId parentTaskId
) {
Expand Down Expand Up @@ -188,7 +189,7 @@ private void reindexIndices(
List<Index> indicesRemaining = Collections.synchronizedList(new ArrayList<>(indicesToBeReindexed));
logger.debug("Reindexing {} indices, with up to {} handled concurrently", indicesRemaining.size(), maxConcurrentIndices);
for (int i = 0; i < maxConcurrentIndices; i++) {
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener, parentTaskId);
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, sourceDataStream, listener, parentTaskId);
}
// This takes care of the additional latch count referenced above:
listener.onResponse(null);
Expand All @@ -197,7 +198,6 @@ private void reindexIndices(
private void maybeProcessNextIndex(
List<Index> indicesRemaining,
ReindexDataStreamTask reindexDataStreamTask,
ExecuteWithHeadersClient reindexClient,
String sourceDataStream,
CountDownActionListener listener,
TaskId parentTaskId
Expand All @@ -216,16 +216,16 @@ private void maybeProcessNextIndex(
reindexDataStreamIndexRequest.setParentTask(parentTaskId);

SubscribableListener.<ReindexDataStreamIndexAction.Response>newForked(
l -> reindexClient.execute(ReindexDataStreamIndexAction.INSTANCE, reindexDataStreamIndexRequest, l)
l -> originSettingClient.execute(ReindexDataStreamIndexAction.INSTANCE, reindexDataStreamIndexRequest, l)
)
.<AcknowledgedResponse>andThen(
(l, result) -> updateDataStream(sourceDataStream, index.getName(), result.getDestIndex(), l, reindexClient, parentTaskId)
(l, result) -> updateDataStream(sourceDataStream, index.getName(), result.getDestIndex(), l, parentTaskId)
)
.<AcknowledgedResponse>andThen(l -> deleteIndex(index.getName(), reindexClient, parentTaskId, l))
.<AcknowledgedResponse>andThen(l -> deleteIndex(index.getName(), parentTaskId, l))
.addListener(ActionListener.wrap(unused -> {
reindexDataStreamTask.reindexSucceeded(index.getName());
listener.onResponse(null);
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener, parentTaskId);
maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, sourceDataStream, listener, parentTaskId);
}, e -> {
reindexDataStreamTask.reindexFailed(index.getName(), e);
listener.onResponse(null);
Expand All @@ -237,7 +237,6 @@ private void updateDataStream(
String oldIndex,
String newIndex,
ActionListener<AcknowledgedResponse> listener,
ExecuteWithHeadersClient reindexClient,
TaskId parentTaskId
) {
ModifyDataStreamsAction.Request modifyDataStreamRequest = new ModifyDataStreamsAction.Request(
Expand All @@ -246,18 +245,13 @@ private void updateDataStream(
List.of(DataStreamAction.removeBackingIndex(dataStream, oldIndex), DataStreamAction.addBackingIndex(dataStream, newIndex))
);
modifyDataStreamRequest.setParentTask(parentTaskId);
reindexClient.execute(ModifyDataStreamsAction.INSTANCE, modifyDataStreamRequest, listener);
originSettingClient.execute(ModifyDataStreamsAction.INSTANCE, modifyDataStreamRequest, listener);
}

private void deleteIndex(
String indexName,
ExecuteWithHeadersClient reindexClient,
TaskId parentTaskId,
ActionListener<AcknowledgedResponse> listener
) {
private void deleteIndex(String indexName, TaskId parentTaskId, ActionListener<AcknowledgedResponse> listener) {
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
deleteIndexRequest.setParentTask(parentTaskId);
reindexClient.execute(TransportDeleteIndexAction.TYPE, deleteIndexRequest, listener);
originSettingClient.execute(TransportDeleteIndexAction.TYPE, deleteIndexRequest, listener);
}

private void completeSuccessfulPersistentTask(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import static org.elasticsearch.xpack.core.ClientHelper.MONITORING_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.OTEL_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.PROFILING_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.REINDEX_DATA_STREAM_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.ROLLUP_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.SEARCHABLE_SNAPSHOTS_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.SECURITY_ORIGIN;
Expand Down Expand Up @@ -136,6 +137,9 @@ public static void switchUserBasedOnActionOriginAndExecute(
case DATA_STREAM_LIFECYCLE_ORIGIN:
securityContext.executeAsInternalUser(InternalUsers.DATA_STREAM_LIFECYCLE_USER, version, consumer);
break;
case REINDEX_DATA_STREAM_ORIGIN:
securityContext.executeAsInternalUser(InternalUsers.REINDEX_DATA_STREAM_USER, version, consumer);
break;
case LAZY_ROLLOVER_ORIGIN:
securityContext.executeAsInternalUser(InternalUsers.LAZY_ROLLOVER_USER, version, consumer);
break;
Expand Down

0 comments on commit 34b656b

Please sign in to comment.