Skip to content

Commit

Permalink
CCR: Route requests to primary for remote store enabled leader clusters
Browse files Browse the repository at this point in the history
  • Loading branch information
ankitkala committed Oct 18, 2023
1 parent 0bd703b commit f60582f
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 15 deletions.
15 changes: 12 additions & 3 deletions src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,14 @@ import java.util.Optional
import java.util.function.Supplier

import org.opensearch.index.engine.NRTReplicationEngine
import org.opensearch.replication.util.ValidationUtil


@OpenForTesting
internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin, RepositoryPlugin, EnginePlugin {

private lateinit var client: Client
private lateinit var clusterService: ClusterService
private lateinit var threadPool: ThreadPool
private lateinit var replicationMetadataManager: ReplicationMetadataManager
private lateinit var replicationSettings: ReplicationSettings
Expand Down Expand Up @@ -207,6 +209,7 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
repositoriesService: Supplier<RepositoriesService>): Collection<Any> {
this.client = client
this.threadPool = threadPool
this.clusterService = clusterService
this.replicationMetadataManager = ReplicationMetadataManager(clusterService, client,
ReplicationMetadataStore(client, clusterService, xContentRegistry))
this.replicationSettings = ReplicationSettings(clusterService)
Expand Down Expand Up @@ -379,9 +382,15 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
}

override fun getCustomTranslogDeletionPolicyFactory(): Optional<TranslogDeletionPolicyFactory> {
return Optional.of(TranslogDeletionPolicyFactory{
indexSettings, retentionLeasesSupplier -> ReplicationTranslogDeletionPolicy(indexSettings, retentionLeasesSupplier)
})
// We don't need a retention lease translog deletion policy for remote store enabled clusters as
// we fetch the operations directly from lucene in such cases.
return if (ValidationUtil.isRemoteStoreEnabledCluster(clusterService) == false) {
Optional.of(TranslogDeletionPolicyFactory { indexSettings, retentionLeasesSupplier ->
ReplicationTranslogDeletionPolicy(indexSettings, retentionLeasesSupplier)
})
} else {
Optional.empty()

Check warning on line 392 in src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt

View check run for this annotation

Codecov / codecov/patch

src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt#L391-L392

Added lines #L391 - L392 were not covered by tests
}
}

override fun onIndexModule(indexModule: IndexModule) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,15 @@ import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.Writeable
import org.opensearch.common.unit.TimeValue
import org.opensearch.core.index.shard.ShardId
import org.opensearch.index.shard.IndexShard
import org.opensearch.index.translog.Translog
import org.opensearch.indices.IndicesService
import org.opensearch.replication.ReplicationPlugin.Companion.REPLICATION_INDEX_TRANSLOG_PRUNING_ENABLED_SETTING
import org.opensearch.replication.ReplicationPlugin.Companion.REPLICATION_EXECUTOR_NAME_LEADER
import org.opensearch.replication.seqno.RemoteClusterStats
import org.opensearch.replication.seqno.RemoteClusterTranslogService
import org.opensearch.replication.seqno.RemoteShardMetric
import org.opensearch.replication.util.completeWith
import org.opensearch.replication.util.coroutineContext
import org.opensearch.replication.util.stackTraceToString
import org.opensearch.replication.util.waitForGlobalCheckpoint
import org.opensearch.replication.util.*
import org.opensearch.threadpool.ThreadPool
import org.opensearch.transport.TransportActionProxy
import org.opensearch.transport.TransportService
Expand Down Expand Up @@ -79,7 +77,8 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
indexMetric.lastFetchTime.set(relativeStartNanos)

val indexShard = indicesService.indexServiceSafe(shardId.index).getShard(shardId.id)
if (indexShard.lastSyncedGlobalCheckpoint < request.fromSeqNo) {
val isRemoteStoreEnabled = ValidationUtil.isRemoteStoreEnabledCluster(clusterService)
if (lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled) < request.fromSeqNo) {
// There are no new operations to sync. Do a long poll and wait for GlobalCheckpoint to advance. If
// the checkpoint doesn't advance by the timeout this throws an ESTimeoutException which the caller
// should catch and start a new poll.
Expand All @@ -88,18 +87,18 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
// At this point indexShard.lastKnownGlobalCheckpoint has advanced but it may not yet have been synced
// to the translog, which means we can't return those changes. Return to the caller to retry.
// TODO: Figure out a better way to wait for the global checkpoint to be synced to the translog
if (indexShard.lastSyncedGlobalCheckpoint < request.fromSeqNo) {
assert(gcp > indexShard.lastSyncedGlobalCheckpoint) { "Checkpoint didn't advance at all" }
if (lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled) < request.fromSeqNo) {
assert(gcp > lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled)) { "Checkpoint didn't advance at all $gcp ${lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled)}" }
throw OpenSearchTimeoutException("global checkpoint not synced. Retry after a few miliseconds...")
}
}

relativeStartNanos = System.nanoTime()
// At this point lastSyncedGlobalCheckpoint is at least fromSeqNo
val toSeqNo = min(indexShard.lastSyncedGlobalCheckpoint, request.toSeqNo)
val toSeqNo = min(lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled), request.toSeqNo)

var ops: List<Translog.Operation> = listOf()
var fetchFromTranslog = isTranslogPruningByRetentionLeaseEnabled(shardId)
var fetchFromTranslog = isTranslogPruningByRetentionLeaseEnabled(shardId) && isRemoteStoreEnabled == false
if(fetchFromTranslog) {
try {
ops = translogService.getHistoryOfOperations(indexShard, request.fromSeqNo, toSeqNo)
Expand Down Expand Up @@ -137,12 +136,22 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
indexMetric.ops.addAndGet(ops.size.toLong())

ops.stream().forEach{op -> indexMetric.bytesRead.addAndGet(op.estimateSize()) }

GetChangesResponse(ops, request.fromSeqNo, indexShard.maxSeqNoOfUpdatesOrDeletes, indexShard.lastSyncedGlobalCheckpoint)
GetChangesResponse(ops, request.fromSeqNo, indexShard.maxSeqNoOfUpdatesOrDeletes, lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled))
}
}
}

private fun lastGlobalCheckpoint(indexShard: IndexShard, isRemoteStoreEnabled: Boolean): Long {
// We rely on lastSyncedGlobalCheckpoint as it has been durably written to disk. In case of remote store
// enabled clusters, the semantics are slightly different, and we can't use lastSyncedGlobalCheckpoint. Falling back to
// lastKnownGlobalCheckpoint in such cases.
return if (isRemoteStoreEnabled) {
indexShard.lastKnownGlobalCheckpoint

Check warning on line 149 in src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt

View check run for this annotation

Codecov / codecov/patch

src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt#L149

Added line #L149 was not covered by tests
} else {
indexShard.lastSyncedGlobalCheckpoint
}
}


private fun isTranslogPruningByRetentionLeaseEnabled(shardId: ShardId): Boolean {
val enabled = clusterService.state().metadata.indices.get(shardId.indexName)
Expand All @@ -162,7 +171,9 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
}

override fun shards(state: ClusterState, request: InternalRequest): ShardsIterator {
val shardIt = state.routingTable().shardRoutingTable(request.request().shardId)
// Random active shards
return state.routingTable().shardRoutingTable(request.request().shardId).activeInitializingShardsRandomIt()
return if (ValidationUtil.isRemoteStoreEnabledCluster(clusterService)) shardIt.primaryShardIt()
else shardIt.activeInitializingShardsRandomIt()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,9 @@ class RemoteClusterRepository(private val repositoryMetadata: RepositoryMetadata

// Remove translog pruning for the follower index
builder.remove(REPLICATION_INDEX_TRANSLOG_PRUNING_ENABLED_SETTING.key)
builder.remove(IndexMetadata.SETTING_REMOTE_STORE_ENABLED)
builder.remove(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY)
builder.remove(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY)

val indexMdBuilder = IndexMetadata.builder(indexMetadata).settings(builder)
indexMetadata.aliases.values.forEach {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ import org.opensearch.env.Environment
import org.opensearch.index.IndexNotFoundException
import java.io.UnsupportedEncodingException
import org.opensearch.cluster.service.ClusterService
import org.opensearch.node.Node
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute
import org.opensearch.replication.ReplicationPlugin.Companion.KNN_INDEX_SETTING
import org.opensearch.replication.ReplicationPlugin.Companion.KNN_PLUGIN_PRESENT_SETTING
import org.opensearch.replication.action.changes.TransportGetChangesAction
import java.nio.file.Files
import java.nio.file.Path
import java.util.Locale
Expand Down Expand Up @@ -154,4 +157,8 @@ object ValidationUtil {

}

fun isRemoteStoreEnabledCluster(clusterService: ClusterService): Boolean {
return clusterService.settings.getByPrefix(Node.NODE_ATTRIBUTES.key + RemoteStoreNodeAttribute.REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX).isEmpty == false
}

}

0 comments on commit f60582f

Please sign in to comment.