Skip to content

Commit

Permalink
Self review
Browse files Browse the repository at this point in the history
Signed-off-by: Suraj Singh <[email protected]>
  • Loading branch information
dreamer-89 committed Jul 19, 2023
1 parent c7f56fc commit 7b67543
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,6 @@ public class RemoteSegmentMetadata {

private final byte[] segmentInfosBytes;

// private final long primaryTerm;
// private final long generation;
//
// private final long version;
//
// private final long length;
//
// private final String codec;

public ReplicationCheckpoint getReplicationCheckpoint() {
return replicationCheckpoint;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@
*/
public class GetSegmentFilesResponse extends TransportResponse {

public List<StoreFileMetadata> getFiles() {
return files;
}

List<StoreFileMetadata> files;

public GetSegmentFilesResponse(List<StoreFileMetadata> files) {
Expand All @@ -37,6 +33,10 @@ public GetSegmentFilesResponse(StreamInput out) throws IOException {
out.readList(StoreFileMetadata::new);
}

public List<StoreFileMetadata> getFiles() {
return files;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(files);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.opensearch.indices.replication.common.ReplicationTarget;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
Expand Down Expand Up @@ -146,7 +145,7 @@ public void startReplication(ActionListener<Void> listener) {
final StepListener<CheckpointInfoResponse> checkpointInfoListener = new StepListener<>();
final StepListener<GetSegmentFilesResponse> getFilesListener = new StepListener<>();

logger.info(new ParameterizedMessage("Starting Replication Target: {}", description()));
logger.trace(new ParameterizedMessage("Starting Replication Target: {}", description()));
// Get list of files to copy from this checkpoint.
state.setStage(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO);
cancellableThreads.checkForCancel();
Expand All @@ -156,12 +155,10 @@ public void startReplication(ActionListener<Void> listener) {
final List<StoreFileMetadata> filesToFetch = getFiles(checkpointInfo);
state.setStage(SegmentReplicationState.Stage.GET_FILES);
cancellableThreads.checkForCancel();
logger.info("--> Before getFiles {}", Arrays.toString(indexShard.store().directory().listAll()));
source.getSegmentFiles(getId(), checkpointInfo.getCheckpoint(), filesToFetch, indexShard, getFilesListener);
}, listener::onFailure);

getFilesListener.whenComplete(response -> {
logger.info("--> After getFiles {}", Arrays.toString(indexShard.store().directory().listAll()));
finalizeReplication(checkpointInfoListener.result(), getFilesListener.result());
listener.onResponse(null);
}, listener::onFailure);
Expand Down Expand Up @@ -262,7 +259,7 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse,
@Override
public void cancel(String reason) {
if (finished.get() == false) {
logger.info(new ParameterizedMessage("Cancelling replication for target {}", description()));
logger.trace(new ParameterizedMessage("Cancelling replication for target {}", description()));
cancellableThreads.cancel(reason);
source.cancel();
}
Expand Down

0 comments on commit 7b67543

Please sign in to comment.