From 2ced9e255ae7481e209981d2d50bfd47b03862b7 Mon Sep 17 00:00:00 2001 From: bansvaru Date: Wed, 7 Jun 2023 10:29:03 +0530 Subject: [PATCH 1/5] Make remote translog store path consistent to remote segment store Signed-off-by: bansvaru --- .../java/org/opensearch/index/translog/RemoteFsTranslog.java | 3 ++- .../index/translog/transfer/TranslogTransferManager.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 02d91974b652a..5d760fac28f92 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -65,6 +65,7 @@ public class RemoteFsTranslog extends Translog { private final SetOnce olderPrimaryCleaned = new SetOnce<>(); private static final int REMOTE_DELETION_PERMITS = 2; + private static final String TRANSLOG = "translog"; // Semaphore used to allow only single remote generation to happen at a time private final Semaphore remoteGenerationDeletionPermits = new Semaphore(REMOTE_DELETION_PERMITS); @@ -167,7 +168,7 @@ public static TranslogTransferManager buildTranslogTransferManager( return new TranslogTransferManager( shardId, new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool), - blobStoreRepository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())), + blobStoreRepository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(TRANSLOG), fileTransferTracker ); } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 243fd8801a562..2facd0e0c2214 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -110,7 +110,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans fileSnapshot -> transferService.uploadBlobAsync( ThreadPool.Names.TRANSLOG_TRANSFER, fileSnapshot, - remoteBaseTransferPath.add(String.valueOf(fileSnapshot.getPrimaryTerm())), + remoteBaseTransferPath.add("data").add(String.valueOf(fileSnapshot.getPrimaryTerm())), latchedActionListener ) ); From 0b4640bf696466db6fad74974de123b7730f7d66 Mon Sep 17 00:00:00 2001 From: bansvaru Date: Wed, 7 Jun 2023 20:49:40 +0530 Subject: [PATCH 2/5] fix translog base path and update tests Signed-off-by: bansvaru --- .../index/translog/RemoteFsTranslog.java | 2 +- .../transfer/TranslogTransferManager.java | 4 +- .../index/translog/RemoteFSTranslogTests.java | 54 +++++++------------ 3 files changed, 21 insertions(+), 39 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 5d760fac28f92..190ca6948f42a 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -65,7 +65,7 @@ public class RemoteFsTranslog extends Translog { private final SetOnce olderPrimaryCleaned = new SetOnce<>(); private static final int REMOTE_DELETION_PERMITS = 2; - private static final String TRANSLOG = "translog"; + public static final String TRANSLOG = "translog"; // Semaphore used to allow only single remote generation to happen at a time private final Semaphore remoteGenerationDeletionPermits = new Semaphore(REMOTE_DELETION_PERMITS); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 2facd0e0c2214..fc4192a4efe0e 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -68,7 +68,7 @@ public TranslogTransferManager( ) { this.shardId = shardId; this.transferService = transferService; - this.remoteBaseTransferPath = remoteBaseTransferPath; + this.remoteBaseTransferPath = remoteBaseTransferPath.add("data"); this.remoteMetadataTransferPath = remoteBaseTransferPath.add(METADATA_DIR); this.fileTransferTracker = fileTransferTracker; } @@ -110,7 +110,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans fileSnapshot -> transferService.uploadBlobAsync( ThreadPool.Names.TRANSLOG_TRANSFER, fileSnapshot, - remoteBaseTransferPath.add("data").add(String.valueOf(fileSnapshot.getPrimaryTerm())), + remoteBaseTransferPath.add(String.valueOf(fileSnapshot.getPrimaryTerm())), latchedActionListener ) ); diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java index 24dae6f5be9ab..94b2f58ecd066 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java @@ -93,6 +93,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.opensearch.common.util.BigArrays.NON_RECYCLING_INSTANCE; +import static org.opensearch.index.translog.RemoteFsTranslog.TRANSLOG; import static org.opensearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder; import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; @@ -483,21 +484,19 @@ public void testSimpleOperationsUpload() throws Exception { translog.rollGeneration(); assertEquals(6, translog.allUploaded().size()); - Set mdFiles = blobStoreTransferService.listAll( - repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add("metadata") - ); + Set mdFiles = blobStoreTransferService.listAll(getTranslogDirectory().add("metadata")); assertEquals(2, mdFiles.size()); logger.info("All md files {}", mdFiles); - Set tlogFiles = blobStoreTransferService.listAll( - repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(String.valueOf(primaryTerm.get())) - ); + Set tlogFiles = blobStoreTransferService.listAll(getTranslogDirectory().add("data").add(String.valueOf(primaryTerm.get()))); logger.info("All data files {}", tlogFiles); // assert content of ckp and tlog files BlobPath path = repository.basePath() .add(shardId.getIndex().getUUID()) .add(String.valueOf(shardId.id())) + .add(TRANSLOG) + .add("data") .add(String.valueOf(primaryTerm.get())); for (TranslogReader reader : translog.readers) { final long readerGeneration = reader.getGeneration(); @@ -537,6 +536,8 @@ public void testSimpleOperationsUpload() throws Exception { repository.basePath() .add(shardId.getIndex().getUUID()) .add(String.valueOf(shardId.id())) + .add(TRANSLOG) + .add("data") .add(String.valueOf(primaryTerm.get())) ).size() ); @@ -555,6 +556,8 @@ public void testSimpleOperationsUpload() throws Exception { repository.basePath() .add(shardId.getIndex().getUUID()) .add(String.valueOf(shardId.id())) + .add(TRANSLOG) + .add("data") .add(String.valueOf(primaryTerm.get())) ).size() ); @@ -583,14 +586,7 @@ public void testMetadataFileDeletion() throws Exception { assertEquals(1, translog.readers.size()); } assertBusy(() -> assertEquals(4, translog.allUploaded().size())); - assertBusy( - () -> assertEquals( - 2, - blobStoreTransferService.listAll( - repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(METADATA_DIR) - ).size() - ) - ); + assertBusy(() -> assertEquals(2, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); int moreDocs = randomIntBetween(3, 10); logger.info("numDocs={} moreDocs={}", numDocs, moreDocs); for (int i = numDocs; i < numDocs + moreDocs; i++) { @@ -599,14 +595,7 @@ public void testMetadataFileDeletion() throws Exception { translog.trimUnreferencedReaders(); assertEquals(1 + moreDocs, translog.readers.size()); assertBusy(() -> assertEquals(2 + 2L * moreDocs, translog.allUploaded().size())); - assertBusy( - () -> assertEquals( - 1 + moreDocs, - blobStoreTransferService.listAll( - repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(METADATA_DIR) - ).size() - ) - ); + assertBusy(() -> assertEquals(1 + moreDocs, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); int totalDocs = numDocs + moreDocs; translog.setMinSeqNoToKeep(totalDocs - 1); @@ -619,14 +608,7 @@ public void testMetadataFileDeletion() throws Exception { ); translog.setMinSeqNoToKeep(totalDocs); translog.trimUnreferencedReaders(); - assertBusy( - () -> assertEquals( - 2, - blobStoreTransferService.listAll( - repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(METADATA_DIR) - ).size() - ) - ); + assertBusy(() -> assertEquals(2, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); // Change primary term and test the deletion of older primaries String translogUUID = translog.translogUUID; @@ -642,9 +624,7 @@ public void testMetadataFileDeletion() throws Exception { long newPrimaryTerm = primaryTerm.incrementAndGet(); // Check all metadata files corresponds to old primary term - Set mdFileNames = blobStoreTransferService.listAll( - repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(METADATA_DIR) - ); + Set mdFileNames = blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)); assertTrue(mdFileNames.stream().allMatch(name -> name.startsWith(String.valueOf(oldPrimaryTerm).concat("__")))); // Creating RemoteFsTranslog with the same location @@ -658,9 +638,7 @@ public void testMetadataFileDeletion() throws Exception { } // Check that all metadata files are belonging now to the new primary - mdFileNames = blobStoreTransferService.listAll( - repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(METADATA_DIR) - ); + mdFileNames = blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)); assertTrue(mdFileNames.stream().allMatch(name -> name.startsWith(String.valueOf(newPrimaryTerm).concat("__")))); try { @@ -671,6 +649,10 @@ public void testMetadataFileDeletion() throws Exception { } } + private BlobPath getTranslogDirectory() { + return repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(TRANSLOG); + } + private Long populateTranslogOps(boolean withMissingOps) throws IOException { long minSeqNo = SequenceNumbers.NO_OPS_PERFORMED; long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED; From 42eb605ff9f6ebc44727104d1f736327fc253a13 Mon Sep 17 00:00:00 2001 From: bansvaru Date: Thu, 8 Jun 2023 10:07:56 +0530 Subject: [PATCH 3/5] Minor code refactoring Signed-off-by: bansvaru --- .../transfer/TranslogTransferManager.java | 3 +- .../index/translog/RemoteFSTranslogTests.java | 30 ++++--------------- 2 files changed, 8 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index fc4192a4efe0e..62be6fe2d406e 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -59,6 +59,7 @@ public class TranslogTransferManager { private static final Logger logger = LogManager.getLogger(TranslogTransferManager.class); private final static String METADATA_DIR = "metadata"; + private final static String DATA_DIR = "data"; public TranslogTransferManager( ShardId shardId, @@ -68,7 +69,7 @@ public TranslogTransferManager( ) { this.shardId = shardId; this.transferService = transferService; - this.remoteBaseTransferPath = remoteBaseTransferPath.add("data"); + this.remoteBaseTransferPath = remoteBaseTransferPath.add(DATA_DIR); this.remoteMetadataTransferPath = remoteBaseTransferPath.add(METADATA_DIR); this.fileTransferTracker = fileTransferTracker; } diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java index 94b2f58ecd066..07abbd54d6844 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java @@ -112,6 +112,7 @@ public class RemoteFSTranslogTests extends OpenSearchTestCase { private final AtomicReference persistedSeqNoConsumer = new AtomicReference<>(); private ThreadPool threadPool; private final static String METADATA_DIR = "metadata"; + private final static String DATA_DIR = "data"; BlobStoreRepository repository; BlobStoreTransferService blobStoreTransferService; @@ -484,20 +485,15 @@ public void testSimpleOperationsUpload() throws Exception { translog.rollGeneration(); assertEquals(6, translog.allUploaded().size()); - Set mdFiles = blobStoreTransferService.listAll(getTranslogDirectory().add("metadata")); + Set mdFiles = blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)); assertEquals(2, mdFiles.size()); logger.info("All md files {}", mdFiles); - Set tlogFiles = blobStoreTransferService.listAll(getTranslogDirectory().add("data").add(String.valueOf(primaryTerm.get()))); + Set tlogFiles = blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))); logger.info("All data files {}", tlogFiles); // assert content of ckp and tlog files - BlobPath path = repository.basePath() - .add(shardId.getIndex().getUUID()) - .add(String.valueOf(shardId.id())) - .add(TRANSLOG) - .add("data") - .add(String.valueOf(primaryTerm.get())); + BlobPath path = getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get())); for (TranslogReader reader : translog.readers) { final long readerGeneration = reader.getGeneration(); logger.error("Asserting content of {}", readerGeneration); @@ -532,14 +528,7 @@ public void testSimpleOperationsUpload() throws Exception { assertEquals(4, translog.allUploaded().size()); assertEquals( 4, - blobStoreTransferService.listAll( - repository.basePath() - .add(shardId.getIndex().getUUID()) - .add(String.valueOf(shardId.id())) - .add(TRANSLOG) - .add("data") - .add(String.valueOf(primaryTerm.get())) - ).size() + blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() ); }); @@ -552,14 +541,7 @@ public void testSimpleOperationsUpload() throws Exception { assertEquals(4, translog.allUploaded().size()); assertEquals( 4, - blobStoreTransferService.listAll( - repository.basePath() - .add(shardId.getIndex().getUUID()) - .add(String.valueOf(shardId.id())) - .add(TRANSLOG) - .add("data") - .add(String.valueOf(primaryTerm.get())) - ).size() + blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() ); }); From a40649d3eff70f36058a95bd3075654f30e78e67 Mon Sep 17 00:00:00 2001 From: bansvaru Date: Thu, 8 Jun 2023 10:54:20 +0530 Subject: [PATCH 4/5] fix spotless errors Signed-off-by: bansvaru --- .../org/opensearch/index/translog/RemoteFSTranslogTests.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java index 07abbd54d6844..d963830e9e736 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java @@ -489,7 +489,9 @@ public void testSimpleOperationsUpload() throws Exception { assertEquals(2, mdFiles.size()); logger.info("All md files {}", mdFiles); - Set tlogFiles = blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))); + Set tlogFiles = blobStoreTransferService.listAll( + getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get())) + ); logger.info("All data files {}", tlogFiles); // assert content of ckp and tlog files From c7e7d96f9c629a6561e2f84c1eba9de9c59ba57d Mon Sep 17 00:00:00 2001 From: bansvaru Date: Thu, 8 Jun 2023 14:59:53 +0530 Subject: [PATCH 5/5] rename instance method to reflect updated semantics Signed-off-by: bansvaru --- .../transfer/TranslogTransferManager.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 62be6fe2d406e..352e7dc2cc0e6 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -50,7 +50,7 @@ public class TranslogTransferManager { private final ShardId shardId; private final TransferService transferService; - private final BlobPath remoteBaseTransferPath; + private final BlobPath remoteDataTransferPath; private final BlobPath remoteMetadataTransferPath; private final FileTransferTracker fileTransferTracker; @@ -64,13 +64,13 @@ public class TranslogTransferManager { public TranslogTransferManager( ShardId shardId, TransferService transferService, - BlobPath remoteBaseTransferPath, + BlobPath remoteDataTransferPath, FileTransferTracker fileTransferTracker ) { this.shardId = shardId; this.transferService = transferService; - this.remoteBaseTransferPath = remoteBaseTransferPath.add(DATA_DIR); - this.remoteMetadataTransferPath = remoteBaseTransferPath.add(METADATA_DIR); + this.remoteDataTransferPath = remoteDataTransferPath.add(DATA_DIR); + this.remoteMetadataTransferPath = remoteDataTransferPath.add(METADATA_DIR); this.fileTransferTracker = fileTransferTracker; } @@ -111,7 +111,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans fileSnapshot -> transferService.uploadBlobAsync( ThreadPool.Names.TRANSLOG_TRANSFER, fileSnapshot, - remoteBaseTransferPath.add(String.valueOf(fileSnapshot.getPrimaryTerm())), + remoteDataTransferPath.add(String.valueOf(fileSnapshot.getPrimaryTerm())), latchedActionListener ) ); @@ -165,7 +165,7 @@ private void downloadToFS(String fileName, Path location, String primaryTerm) th if (Files.exists(filePath)) { Files.delete(filePath); } - try (InputStream inputStream = transferService.downloadBlob(remoteBaseTransferPath.add(primaryTerm), fileName)) { + try (InputStream inputStream = transferService.downloadBlob(remoteDataTransferPath.add(primaryTerm), fileName)) { Files.copy(inputStream, filePath); } // Mark in FileTransferTracker so that the same files are not uploaded at the time of translog sync @@ -239,7 +239,7 @@ public void deleteGenerationAsync(long primaryTerm, Set generations, Runna */ public void deletePrimaryTermsAsync(long minPrimaryTermToKeep) { logger.info("Deleting primary terms from remote store lesser than {} for {}", minPrimaryTermToKeep, shardId); - transferService.listFoldersAsync(ThreadPool.Names.REMOTE_PURGE, remoteBaseTransferPath, new ActionListener<>() { + transferService.listFoldersAsync(ThreadPool.Names.REMOTE_PURGE, remoteDataTransferPath, new ActionListener<>() { @Override public void onResponse(Set folders) { Set primaryTermsInRemote = folders.stream().filter(folderName -> { @@ -272,7 +272,7 @@ public void onFailure(Exception e) { private void deletePrimaryTermAsync(long primaryTerm) { transferService.deleteAsync( ThreadPool.Names.REMOTE_PURGE, - remoteBaseTransferPath.add(String.valueOf(primaryTerm)), + remoteDataTransferPath.add(String.valueOf(primaryTerm)), new ActionListener<>() { @Override public void onResponse(Void unused) { @@ -318,7 +318,7 @@ private void deleteTranslogFilesAsync(long primaryTerm, List files, Runn try { transferService.deleteBlobsAsync( ThreadPool.Names.REMOTE_PURGE, - remoteBaseTransferPath.add(String.valueOf(primaryTerm)), + remoteDataTransferPath.add(String.valueOf(primaryTerm)), files, new ActionListener<>() { @Override