Introduced Global checkpoints for Sequence Numbers #15485

Merged (15 commits, Jun 6, 2016)
@@ -33,10 +33,10 @@
*/
public class ActionListenerResponseHandler<Response extends TransportResponse> extends BaseTransportResponseHandler<Response> {

private final ActionListener<Response> listener;
private final ActionListener<? super Response> listener;
private final Supplier<Response> responseSupplier;

public ActionListenerResponseHandler(ActionListener<Response> listener, Supplier<Response> responseSupplier) {
public ActionListenerResponseHandler(ActionListener<? super Response> listener, Supplier<Response> responseSupplier) {
this.listener = Objects.requireNonNull(listener);
this.responseSupplier = Objects.requireNonNull(responseSupplier);
}
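The hunk above widens the listener parameter from ActionListener<Response> to ActionListener<? super Response>, so a handler that deserializes a concrete response class can notify a listener typed against one of its supertypes; later in this diff, ReplicasProxy passes an ActionListener<ReplicationOperation.ReplicaResponse> (the interface) to a handler built for the concrete ReplicaResponse class. A minimal, self-contained sketch of the variance rule, using illustrative types rather than the Elasticsearch classes:

import java.util.function.Supplier;

class ListenerVarianceSketch {

    interface Listener<T> { void onResponse(T response); }

    static class BaseResponse {}
    static class ConcreteResponse extends BaseResponse {}

    // Invariant signature: only a Listener<R> for exactly R is accepted.
    static <R> void handleInvariant(Listener<R> listener, Supplier<R> reader) {
        listener.onResponse(reader.get());
    }

    // Contravariant signature, mirroring ActionListener<? super Response> in the diff above.
    static <R> void handleContravariant(Listener<? super R> listener, Supplier<R> reader) {
        listener.onResponse(reader.get());
    }

    public static void main(String[] args) {
        Listener<BaseResponse> listenerForSupertype = r -> System.out.println("got " + r.getClass().getSimpleName());
        Supplier<ConcreteResponse> reader = ConcreteResponse::new;

        // handleInvariant(listenerForSupertype, reader);  // does not compile: R cannot be both types at once
        handleContravariant(listenerForSupertype, reader); // compiles: the supertype listener can consume the subtype
    }
}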
@@ -35,7 +35,6 @@
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.transport.TransportResponse;

import java.io.IOException;
import java.util.ArrayList;
@@ -85,7 +84,8 @@ public class ReplicationOperation<Request extends ReplicationRequest<Request>, R

void execute() throws Exception {
final String writeConsistencyFailure = checkWriteConsistency ? checkWriteConsistency() : null;
final ShardId shardId = primary.routingEntry().shardId();
final ShardRouting primaryRouting = primary.routingEntry();
final ShardId shardId = primaryRouting.shardId();
if (writeConsistencyFailure != null) {
finishAsFailed(new UnavailableShardsException(shardId,
"{} Timeout: [{}], request: [{}]", writeConsistencyFailure, request.timeout(), request));
@@ -96,6 +96,7 @@ void execute() throws Exception {
pendingShards.incrementAndGet(); // increase by 1 until we finish all primary coordination
Tuple<Response, ReplicaRequest> primaryResponse = primary.perform(request);
successfulShards.incrementAndGet(); // mark primary as successful
primary.updateLocalCheckpointForShard(primaryRouting.allocationId().getId(), primary.localCheckpoint());
finalResponse = primaryResponse.v1();
ReplicaRequest replicaRequest = primaryResponse.v2();
assert replicaRequest.primaryTerm() > 0 : "replicaRequest doesn't have a primary term";
@@ -107,7 +108,7 @@ void execute() throws Exception {
// to the recovery target. If we use an old cluster state, we may miss a relocation that has started since then.
// If the index gets deleted after primary operation, we skip replication
List<ShardRouting> shards = getShards(shardId, clusterStateSupplier.get());
final String localNodeId = primary.routingEntry().currentNodeId();
final String localNodeId = primaryRouting.currentNodeId();
for (final ShardRouting shard : shards) {
if (executeOnReplicas == false || shard.unassigned()) {
if (shard.primary() == false) {
@@ -136,10 +137,11 @@ private void performOnReplica(final ShardRouting shard, final ReplicaRequest rep

totalShards.incrementAndGet();
pendingShards.incrementAndGet();
replicasProxy.performOn(shard, replicaRequest, new ActionListener<TransportResponse.Empty>() {
replicasProxy.performOn(shard, replicaRequest, new ActionListener<ReplicaResponse>() {
@Override
public void onResponse(TransportResponse.Empty empty) {
public void onResponse(ReplicaResponse response) {
successfulShards.incrementAndGet();
primary.updateLocalCheckpointForShard(response.allocationId(), response.localCheckpoint());
decPendingAndFinishIfNeeded();
}

@@ -301,18 +303,30 @@ interface Primary<Request extends ReplicationRequest<Request>, ReplicaRequest ex
*/
Tuple<Response, ReplicaRequest> perform(Request request) throws Exception;


/**
* Notifies the primary of a local checkpoint for the given allocation.
*
* Note: The primary will use this information to advance the global checkpoint if possible.
*
* @param allocationId allocation ID of the shard corresponding to the supplied local checkpoint
* @param checkpoint the *local* checkpoint for the shard
*/
void updateLocalCheckpointForShard(String allocationId, long checkpoint);

/** returns the local checkpoint of the primary shard */
long localCheckpoint();
}

interface Replicas<ReplicaRequest extends ReplicationRequest<ReplicaRequest>> {

/**
* performs the given request on the specified replica
*
* @param replica {@link ShardRouting} of the shard this request should be executed on
* @param replicaRequest operation to perform
* @param listener a callback to call once the operation has been completed, either successfully or with an error.
*/
void performOn(ShardRouting replica, ReplicaRequest replicaRequest, ActionListener<TransportResponse.Empty> listener);
void performOn(ShardRouting replica, ReplicaRequest replicaRequest, ActionListener<ReplicaResponse> listener);

/**
* Fail the specified shard, removing it from the current set of active shards
@@ -331,6 +345,18 @@ void failShard(ShardRouting replica, ShardRouting primary, String message, Throw
Consumer<Throwable> onPrimaryDemoted, Consumer<Throwable> onIgnoredFailure);
}

/**
* An interface to encapsulate the metadata needed from replica shards when they respond to operations performed on them
*/
interface ReplicaResponse {

/** the local checkpoint for the shard. See {@link org.elasticsearch.index.seqno.SequenceNumbersService#getLocalCheckpoint()} */
long localCheckpoint();

/** the allocation id of the replica shard */
String allocationId();
}

public static class RetryOnPrimaryException extends ElasticsearchException {
public RetryOnPrimaryException(ShardId shardId, String msg) {
this(shardId, msg, null);
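The two methods added to the Primary interface, together with the new ReplicaResponse, let ReplicationOperation report a local checkpoint per shard copy back to the primary: once for the primary itself right after the primary operation, and once per replica when its response arrives. The PR's real bookkeeping lives in the sequence-number services, but conceptually the primary can only advance the global checkpoint to the minimum of the local checkpoints it has been told about. A rough, standalone sketch of that idea (not the code introduced by this PR):

import java.util.HashMap;
import java.util.Map;

class GlobalCheckpointSketch {

    private final Map<String, Long> localCheckpoints = new HashMap<>();
    private long globalCheckpoint = -1; // -1: no sequence number is known to be replicated to all tracked copies yet

    /** Record the local checkpoint reported by the copy with the given allocation ID. */
    synchronized void updateLocalCheckpointForShard(String allocationId, long checkpoint) {
        localCheckpoints.merge(allocationId, checkpoint, Math::max);
        updateGlobalCheckpointOnPrimary();
    }

    /** The global checkpoint cannot move past the slowest tracked copy. */
    private void updateGlobalCheckpointOnPrimary() {
        long min = localCheckpoints.values().stream().mapToLong(Long::longValue).min().orElse(-1);
        if (min > globalCheckpoint) {
            globalCheckpoint = min;
        }
    }

    synchronized long getGlobalCheckpoint() {
        return globalCheckpoint;
    }
}

In the diff this corresponds to execute() calling primary.updateLocalCheckpointForShard(primaryRouting.allocationId().getId(), primary.localCheckpoint()) after the primary operation, and performOnReplica() calling it again from onResponse(ReplicaResponse) for each replica.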
@@ -22,6 +22,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ReplicationResponse;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.WriteConsistencyLevel;
@@ -43,6 +44,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@@ -425,16 +427,18 @@ protected void responseWithFailure(Throwable t) {
@Override
protected void doRun() throws Exception {
setPhase(task, "replica");
final ReplicaResponse response;
assert request.shardId() != null : "request shardId must be set";
try (Releasable ignored = acquireReplicaOperationLock(request.shardId(), request.primaryTerm())) {
try (ShardReference replica = getReplicaShardReference(request.shardId(), request.primaryTerm())) {
shardOperationOnReplica(request);
response = new ReplicaResponse(replica.routingEntry().allocationId().getId(), replica.getLocalCheckpoint());
if (logger.isTraceEnabled()) {
logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction, request.shardId(),
request);
}
}
setPhase(task, "finished");
channel.sendResponse(TransportResponse.Empty.INSTANCE);
channel.sendResponse(response);
}
}

@@ -705,13 +709,13 @@ protected PrimaryShardReference getPrimaryShardReference(ShardId shardId) {
}

/**
* Acquire an operation on replicas. The lock is closed as soon as
* Get a reference to a replica shard. The reference is released as soon as
* replication is completed on the node.
*/
protected Releasable acquireReplicaOperationLock(ShardId shardId, long primaryTerm) {
protected ShardReference getReplicaShardReference(ShardId shardId, long primaryTerm) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
return indexShard.acquireReplicaOperationLock(primaryTerm);
return new ShardReference(indexShard, indexShard.acquireReplicaOperationLock(primaryTerm));
}

/**
@@ -722,12 +726,12 @@ protected boolean shouldExecuteReplication(Settings settings) {
return IndexMetaData.isIndexUsingShadowReplicas(settings) == false;
}

class PrimaryShardReference implements ReplicationOperation.Primary<Request, ReplicaRequest, Response>, Releasable {
class ShardReference implements Releasable {

private final IndexShard indexShard;
protected final IndexShard indexShard;
private final Releasable operationLock;

PrimaryShardReference(IndexShard indexShard, Releasable operationLock) {
ShardReference(IndexShard indexShard, Releasable operationLock) {
this.indexShard = indexShard;
this.operationLock = operationLock;
}
@@ -737,6 +741,22 @@ public void close() {
operationLock.close();
}

public long getLocalCheckpoint() {
return indexShard.getLocalCheckpoint();
}

public ShardRouting routingEntry() {
return indexShard.routingEntry();
}

}

class PrimaryShardReference extends ShardReference implements ReplicationOperation.Primary<Request, ReplicaRequest, Response> {

PrimaryShardReference(IndexShard indexShard, Releasable operationLock) {
super(indexShard, operationLock);
}

public boolean isRelocated() {
return indexShard.state() == IndexShardState.RELOCATED;
}
@@ -758,23 +778,67 @@ public Tuple<Response, ReplicaRequest> perform(Request request) throws Exception
}

@Override
public ShardRouting routingEntry() {
return indexShard.routingEntry();
public void updateLocalCheckpointForShard(String allocationId, long checkpoint) {
indexShard.updateLocalCheckpointForShard(allocationId, checkpoint);
}

@Override
public long localCheckpoint() {
return indexShard.getLocalCheckpoint();
}
}


static class ReplicaResponse extends ActionResponse implements ReplicationOperation.ReplicaResponse {
private long localCheckpoint;
private String allocationId;

ReplicaResponse() {

}

ReplicaResponse(String allocationId, long localCheckpoint) {
this.allocationId = allocationId;
this.localCheckpoint = localCheckpoint;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
localCheckpoint = in.readZLong();
allocationId = in.readString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeZLong(localCheckpoint);
out.writeString(allocationId);
}

@Override
public long localCheckpoint() {
return localCheckpoint;
}

@Override
public String allocationId() {
return allocationId;
}
}

final class ReplicasProxy implements ReplicationOperation.Replicas<ReplicaRequest> {

@Override
public void performOn(ShardRouting replica, ReplicaRequest request, ActionListener<TransportResponse.Empty> listener) {
public void performOn(ShardRouting replica, ReplicaRequest request, ActionListener<ReplicationOperation.ReplicaResponse> listener) {
String nodeId = replica.currentNodeId();
final DiscoveryNode node = clusterService.state().nodes().get(nodeId);
if (node == null) {
listener.onFailure(new NoNodeAvailableException("unknown node [" + nodeId + "]"));
return;
}
transportService.sendRequest(node, transportReplicaAction, request, transportOptions,
new ActionListenerResponseHandler<>(listener, () -> TransportResponse.Empty.INSTANCE));
new ActionListenerResponseHandler<>(listener, ReplicaResponse::new));
}

@Override
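ReplicaResponse serializes the checkpoint with writeZLong rather than writeLong or writeVLong, presumably because a local checkpoint can be a small negative sentinel (for example, before any operation has been processed), and zig-zag encoding keeps such values compact in a variable-length format. A small sketch of the zig-zag transform itself, for illustration only (this is not Elasticsearch's StreamOutput code):

class ZigZagSketch {

    // -1 -> 1, 0 -> 0, 1 -> 2, -2 -> 3, ...: small magnitudes stay small, sign moves into the low bit
    static long encode(long v) {
        return (v << 1) ^ (v >> 63);
    }

    static long decode(long v) {
        return (v >>> 1) ^ -(v & 1);
    }

    public static void main(String[] args) {
        for (long checkpoint : new long[] {-2, -1, 0, 1, 42}) {
            long onTheWire = encode(checkpoint);
            assert decode(onTheWire) == checkpoint;
            System.out.println(checkpoint + " -> " + onTheWire);
        }
    }
}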
@@ -65,7 +65,6 @@
import org.elasticsearch.indices.IndexCreationException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.script.ScriptService;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

@@ -299,7 +298,8 @@ public ClusterState execute(ClusterState currentState) throws Exception {
// Set up everything, now locally create the index to see that things are ok, and apply
final IndexMetaData tmpImd = IndexMetaData.builder(request.index()).settings(actualIndexSettings).build();
// create the index here (on the master) to validate it can be created, as well as adding the mapping
final IndexService indexService = indicesService.createIndex(nodeServicesProvider, tmpImd, Collections.emptyList());
final IndexService indexService = indicesService.createIndex(nodeServicesProvider, tmpImd,
Collections.emptyList(), shardId -> {});
createdIndex = indexService.index();
// now add the mappings
MapperService mapperService = indexService.mapperService();
@@ -104,7 +104,8 @@ public ClusterState execute(final ClusterState currentState) {
if (indexService == null) {
// temporarily create the index and add mappings so we can parse the filter
try {
indexService = indicesService.createIndex(nodeServicesProvider, indexMetaData, Collections.emptyList());
indexService = indicesService.createIndex(nodeServicesProvider, indexMetaData,
Collections.emptyList(), shardId -> {});
for (ObjectCursor<MappingMetaData> cursor : indexMetaData.getMappings().values()) {
MappingMetaData mappingMetaData = cursor.value;
indexService.mapperService().merge(mappingMetaData.type(), mappingMetaData.source(), MapperService.MergeReason.MAPPING_RECOVERY, false);
@@ -42,7 +42,6 @@
import org.elasticsearch.index.NodeServicesProvider;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.percolator.PercolatorFieldMapper;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InvalidTypeNameException;

@@ -140,7 +139,7 @@ ClusterState executeRefresh(final ClusterState currentState, final List<RefreshT
IndexService indexService = indicesService.indexService(indexMetaData.getIndex());
if (indexService == null) {
// we need to create the index here, and add the current mapping to it, so we can merge
indexService = indicesService.createIndex(nodeServicesProvider, indexMetaData, Collections.emptyList());
indexService = indicesService.createIndex(nodeServicesProvider, indexMetaData, Collections.emptyList(), shardId -> {});
removeIndex = true;
for (ObjectCursor<MappingMetaData> metaData : indexMetaData.getMappings().values()) {
// don't apply the default mapping, it has been applied when the mapping was created
@@ -224,7 +223,7 @@ public BatchResult<PutMappingClusterStateUpdateRequest> execute(ClusterState cur
// close it later once we are done with mapping update
indicesToClose.add(indexMetaData.getIndex());
IndexService indexService = indicesService.createIndex(nodeServicesProvider, indexMetaData,
Collections.emptyList());
Collections.emptyList(), shardId -> {});
// add mappings for all types, we need them for cross-type validation
for (ObjectCursor<MappingMetaData> mapping : indexMetaData.getMappings().values()) {
indexService.mapperService().merge(mapping.value.type(), mapping.value.source(),
@@ -183,6 +183,15 @@ public List<ShardRouting> activeShards() {
return this.activeShards;
}

/**
* Returns a {@link List} of all initializing shards, including target shards of relocations
*
* @return a {@link List} of shards
*/
public List<ShardRouting> getAllInitializingShards() {
return this.allInitializingShards;
}

/**
* Returns a {@link List} of active shards
*
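The new getAllInitializingShards() getter exposes initializing copies, including relocation targets, alongside the existing activeShards(). A plausible use, sketched below with a hypothetical helper, is collecting the allocation IDs of every copy whose local checkpoint the primary should track, not just the active ones; the PR's actual caller may differ.

import java.util.HashSet;
import java.util.Set;

import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;

class TrackedAllocationIdsSketch {

    /** Hypothetical helper: allocation IDs of all copies worth tracking checkpoints for. */
    static Set<String> trackedAllocationIds(IndexShardRoutingTable shardTable) {
        Set<String> ids = new HashSet<>();
        for (ShardRouting shard : shardTable.activeShards()) {
            ids.add(shard.allocationId().getId());
        }
        // recovery and relocation targets are still initializing but will need to catch up
        for (ShardRouting shard : shardTable.getAllInitializingShards()) {
            ids.add(shard.allocationId().getId());
        }
        return ids;
    }
}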
@@ -37,6 +37,7 @@
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.percolator.PercolatorFieldMapper;
import org.elasticsearch.index.seqno.LocalCheckpointService;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.FsDirectoryService;
import org.elasticsearch.index.store.IndexStore;
@@ -116,6 +117,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.QUERY_STRING_LENIENT_SETTING,
IndexSettings.ALLOW_UNMAPPED,
IndexSettings.INDEX_CHECK_ON_STARTUP,
IndexSettings.INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL,
LocalCheckpointService.SETTINGS_BIT_ARRAYS_SIZE,
ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING,
IndexSettings.INDEX_GC_DELETES_SETTING,
IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING,
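The last hunk registers two new index-scoped settings: a sequence-number checkpoint sync interval and LocalCheckpointService.SETTINGS_BIT_ARRAYS_SIZE, which suggests the local checkpoint is tracked with fixed-size bit arrays acting as a window over recent sequence numbers. The sketch below captures only the core invariant of such a tracker, namely that the local checkpoint is the highest sequence number below which every operation has been processed; it uses a single growable BitSet instead of the sized bit-array window the setting refers to, and is not the PR's implementation.

import java.util.BitSet;

class LocalCheckpointSketch {

    private final BitSet processed = new BitSet();
    private long checkpoint = -1; // highest seqNo such that all operations <= checkpoint are processed

    synchronized void markSeqNoAsProcessed(long seqNo) {
        if (seqNo <= checkpoint) {
            return; // already accounted for
        }
        processed.set(Math.toIntExact(seqNo)); // sketch only: a real tracker would use a sliding window of bit arrays
        // advance the checkpoint over any contiguous run of processed sequence numbers
        while (processed.get(Math.toIntExact(checkpoint + 1))) {
            checkpoint++;
            processed.clear(Math.toIntExact(checkpoint));
        }
    }

    synchronized long getLocalCheckpoint() {
        return checkpoint;
    }
}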