Skip to content

Commit

Permalink
Merge branch 'main' into feature/synthtic-source-recovery-default
Browse files Browse the repository at this point in the history
  • Loading branch information
salvatore-campagna authored Jan 10, 2025
2 parents 4934292 + 6ca7e75 commit b5ddc28
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 19 deletions.
77 changes: 60 additions & 17 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@
import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.elasticsearch.index.shard.IndexShard.PrimaryPermitCheck.CHECK_PRIMARY_MODE;

public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard {

Expand Down Expand Up @@ -3568,58 +3569,100 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
);
}

/**
* Check to run before running the primary permit operation
*/
public enum PrimaryPermitCheck {
CHECK_PRIMARY_MODE,
/**
* IMPORTANT: Currently intented to be used only for acquiring primary permits during the recovery of hollow shards.
* Don't disable primary mode checks unless you're really sure.
*/
NONE
}

/**
* Acquire a primary operation permit whenever the shard is ready for indexing. If a permit is directly available, the provided
* ActionListener will be called on the calling thread. During relocation hand-off, permit acquisition can be delayed. The provided
* ActionListener will then be called using the provided executor.
*
*/
public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcquired, Executor executorOnDelay) {
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, false);
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, false, CHECK_PRIMARY_MODE);
}

public void acquirePrimaryOperationPermit(
ActionListener<Releasable> onPermitAcquired,
Executor executorOnDelay,
boolean forceExecution
) {
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, forceExecution, CHECK_PRIMARY_MODE);
}

public void acquirePrimaryOperationPermit(
ActionListener<Releasable> onPermitAcquired,
Executor executorOnDelay,
boolean forceExecution,
PrimaryPermitCheck primaryPermitCheck
) {
verifyNotClosed();
assert shardRouting.primary() : "acquirePrimaryOperationPermit should only be called on primary shard: " + shardRouting;
indexShardOperationPermits.acquire(wrapPrimaryOperationPermitListener(onPermitAcquired), executorOnDelay, forceExecution);
indexShardOperationPermits.acquire(
wrapPrimaryOperationPermitListener(primaryPermitCheck, onPermitAcquired),
executorOnDelay,
forceExecution
);
}

public boolean isPrimaryMode() {
assert indexShardOperationPermits.getActiveOperationsCount() != 0 : "must hold permit to check primary mode";
return replicationTracker.isPrimaryMode();
}

public void acquireAllPrimaryOperationsPermits(final ActionListener<Releasable> onPermitAcquired, final TimeValue timeout) {
acquireAllPrimaryOperationsPermits(onPermitAcquired, timeout, CHECK_PRIMARY_MODE);
}

/**
* Acquire all primary operation permits. Once all permits are acquired, the provided ActionListener is called.
* It is the responsibility of the caller to close the {@link Releasable}.
*/
public void acquireAllPrimaryOperationsPermits(final ActionListener<Releasable> onPermitAcquired, final TimeValue timeout) {
public void acquireAllPrimaryOperationsPermits(
final ActionListener<Releasable> onPermitAcquired,
final TimeValue timeout,
final PrimaryPermitCheck primaryPermitCheck
) {
verifyNotClosed();
assert shardRouting.primary() : "acquireAllPrimaryOperationsPermits should only be called on primary shard: " + shardRouting;

asyncBlockOperations(wrapPrimaryOperationPermitListener(onPermitAcquired), timeout.duration(), timeout.timeUnit());
asyncBlockOperations(
wrapPrimaryOperationPermitListener(primaryPermitCheck, onPermitAcquired),
timeout.duration(),
timeout.timeUnit()
);
}

/**
* Wraps the action to run on a primary after acquiring permit. This wrapping is used to check if the shard is in primary mode before
* executing the action.
* Wraps the action to run on a primary after acquiring permit.
*
* @param primaryPermitCheck check to run before the primary mode operation
* @param listener the listener to wrap
* @return the wrapped listener
*/
private ActionListener<Releasable> wrapPrimaryOperationPermitListener(final ActionListener<Releasable> listener) {
return listener.delegateFailure((l, r) -> {
if (isPrimaryMode()) {
l.onResponse(r);
} else {
r.close();
l.onFailure(new ShardNotInPrimaryModeException(shardId, state));
}
});
private ActionListener<Releasable> wrapPrimaryOperationPermitListener(
final PrimaryPermitCheck primaryPermitCheck,
final ActionListener<Releasable> listener
) {
return switch (primaryPermitCheck) {
case CHECK_PRIMARY_MODE -> listener.delegateFailure((l, r) -> {
if (isPrimaryMode()) {
l.onResponse(r);
} else {
r.close();
l.onFailure(new ShardNotInPrimaryModeException(shardId, state));
}
});
case NONE -> listener;
};
}

private void asyncBlockOperations(ActionListener<Releasable> onPermitAcquired, long timeout, TimeUnit timeUnit) {
Expand Down Expand Up @@ -3657,7 +3700,7 @@ public void runUnderPrimaryPermit(final Runnable runnable, final Consumer<Except
runnable.run();
}
}, onFailure);
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay);
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, false, CHECK_PRIMARY_MODE);
}

private <E extends Exception> void bumpPrimaryTerm(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,21 @@ public void onFailure(final Exception e) {
}
}, TimeValue.timeValueSeconds(30));
latch.await();

// It's possible to acquire permits if we skip the primary mode check
var permitAcquiredLatch = new CountDownLatch(1);
indexShard.acquirePrimaryOperationPermit(ActionListener.wrap(r -> {
r.close();
permitAcquiredLatch.countDown();
}, Assert::assertNotNull), EsExecutors.DIRECT_EXECUTOR_SERVICE, false, IndexShard.PrimaryPermitCheck.NONE);
safeAwait(permitAcquiredLatch);

var allPermitsAcquiredLatch = new CountDownLatch(1);
indexShard.acquireAllPrimaryOperationsPermits(ActionListener.wrap(r -> {
r.close();
allPermitsAcquiredLatch.countDown();
}, Assert::assertNotNull), TimeValue.timeValueSeconds(30), IndexShard.PrimaryPermitCheck.NONE);
safeAwait(allPermitsAcquiredLatch);
}

if (Assertions.ENABLED && indexShard.routingEntry().isRelocationTarget() == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.common.Strings.EMPTY_ARRAY;
import static org.elasticsearch.transport.RemoteClusterPortSettings.TRANSPORT_VERSION_ADVANCED_REMOTE_CLUSTER_SECURITY;
import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;
Expand Down Expand Up @@ -170,21 +171,49 @@ public Authentication(StreamInput in) throws IOException {
type = AuthenticationType.REALM;
metadata = Map.of();
}

if (innerUser != null) {
authenticatingSubject = new Subject(innerUser, authenticatedBy, version, metadata);
authenticatingSubject = new Subject(
copyUserWithRolesRemovedForLegacyApiKeys(version, innerUser),
authenticatedBy,
version,
metadata
);
// The lookup user for run-as currently doesn't have authentication metadata associated with them because
// lookupUser only returns the User object. The lookup user for authorization delegation does have
// authentication metadata, but the realm does not expose this difference between authenticatingUser and
// delegateUser so effectively this is handled together with the authenticatingSubject not effectiveSubject.
// Note: we do not call copyUserWithRolesRemovedForLegacyApiKeys here because an API key is never the target of run-as
effectiveSubject = new Subject(outerUser, lookedUpBy, version, Map.of());
} else {
authenticatingSubject = effectiveSubject = new Subject(outerUser, authenticatedBy, version, metadata);
authenticatingSubject = effectiveSubject = new Subject(
copyUserWithRolesRemovedForLegacyApiKeys(version, outerUser),
authenticatedBy,
version,
metadata
);
}

if (Assertions.ENABLED) {
checkConsistency();
}
}

private User copyUserWithRolesRemovedForLegacyApiKeys(TransportVersion version, User user) {
// API keys prior to 7.8 had synthetic role names. Strip these out to maintain the invariant that API keys don't have role names
if (type == AuthenticationType.API_KEY && version.onOrBefore(TransportVersions.V_7_8_0) && user.roles().length > 0) {
logger.debug(
"Stripping [{}] roles from API key user [{}] for legacy version [{}]",
user.roles().length,
user.principal(),
version
);
return new User(user.principal(), EMPTY_ARRAY, user.fullName(), user.email(), user.metadata(), user.enabled());
} else {
return user;
}
}

/**
* Get the {@link Subject} that performs the actual authentication. This normally means it provides a credentials.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,21 @@
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.TransportVersionUtils;
import org.elasticsearch.transport.RemoteClusterPortSettings;
import org.elasticsearch.xpack.core.security.authc.support.AuthenticationContextSerializer;
import org.elasticsearch.xpack.core.security.user.ElasticUser;
import org.elasticsearch.xpack.core.security.user.InternalUsers;
import org.elasticsearch.xpack.core.security.user.KibanaSystemUser;
import org.elasticsearch.xpack.core.security.user.KibanaUser;
import org.elasticsearch.xpack.core.security.user.User;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;

import static org.elasticsearch.xpack.core.security.authc.Authentication.AuthenticationSerializationHelper;
import static org.hamcrest.Matchers.arrayContaining;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.emptyArray;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
Expand Down Expand Up @@ -171,4 +176,47 @@ public void testReservedUserSerialization() throws Exception {

assertEquals(kibanaSystemUser, readFrom);
}

public void testRolesRemovedFromUserForLegacyApiKeys() throws IOException {
TransportVersion transportVersion = TransportVersionUtils.randomVersionBetween(
random(),
TransportVersions.V_7_0_0,
TransportVersions.V_7_8_0
);
Subject authenticatingSubject = new Subject(
new User("foo", "role"),
new Authentication.RealmRef(AuthenticationField.API_KEY_REALM_NAME, AuthenticationField.API_KEY_REALM_TYPE, "node"),
transportVersion,
Map.of(AuthenticationField.API_KEY_ID_KEY, "abc")
);
Subject effectiveSubject = new Subject(
new User("bar", "role"),
new Authentication.RealmRef("native", "native", "node"),
transportVersion,
Map.of()
);

{
Authentication actual = AuthenticationContextSerializer.decode(
Authentication.doEncode(authenticatingSubject, authenticatingSubject, Authentication.AuthenticationType.API_KEY)
);
assertThat(actual.getAuthenticatingSubject().getUser().roles(), is(emptyArray()));
}

{
Authentication actual = AuthenticationContextSerializer.decode(
Authentication.doEncode(effectiveSubject, authenticatingSubject, Authentication.AuthenticationType.API_KEY)
);
assertThat(actual.getAuthenticatingSubject().getUser().roles(), is(emptyArray()));
assertThat(actual.getEffectiveSubject().getUser().roles(), is(arrayContaining("role")));
}

{
// do not strip roles for authentication methods other than API key
Authentication actual = AuthenticationContextSerializer.decode(
Authentication.doEncode(effectiveSubject, effectiveSubject, Authentication.AuthenticationType.REALM)
);
assertThat(actual.getAuthenticatingSubject().getUser().roles(), is(arrayContaining("role")));
}
}
}

0 comments on commit b5ddc28

Please sign in to comment.