From 363b78f73888d465731ba1d972f60345976a20be Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Fri, 9 Aug 2024 17:58:33 -0700 Subject: [PATCH] wait on release lock operation completion before returning update tif source config response Signed-off-by: Surya Sashank Nistala --- .../threatIntel/common/TIFLockService.java | 15 ++++- .../TransportIndexTIFSourceConfigAction.java | 56 ++++++++++++++----- .../SourceConfigWithoutS3RestApiIT.java | 8 --- 3 files changed, 57 insertions(+), 22 deletions(-) diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/common/TIFLockService.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/common/TIFLockService.java index 8b3917791..f88aebd9b 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/common/TIFLockService.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/common/TIFLockService.java @@ -46,7 +46,7 @@ public TIFLockService(final ClusterService clusterService, final Client client) } /** - * Synchronous method of #acquireLock + * Event-driven method of #acquireLock * * @param tifJobName tifJobName to acquire lock on * @param lockDurationSeconds the lock duration in seconds @@ -81,6 +81,19 @@ public void releaseLock(final LockModel lockModel) { ); } + /** + * Wrapper method of LockService#release + * + * @param lockModel the lock model + */ + public void releaseLockEventDriven(final LockModel lockModel, final ActionListener listener) { + log.debug("Releasing lock with id [{}]", lockModel.getLockId()); + lockService.release( + lockModel, + ActionListener.wrap(listener::onResponse, exception -> log.error("Failed to release the lock", exception)) + ); + } + /** * Synchronous method of LockService#renewLock * diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportIndexTIFSourceConfigAction.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportIndexTIFSourceConfigAction.java index 440191146..d12e4542e 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportIndexTIFSourceConfigAction.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportIndexTIFSourceConfigAction.java @@ -91,10 +91,10 @@ private void retrieveLockAndCreateTIFConfig(SAIndexTIFSourceConfigRequest reques try { lockService.acquireLock(request.getTIFConfigDto().getId(), LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> { if (lock == null) { + log.error("another processor is a lock, BAD_REQUEST error", RestStatus.BAD_REQUEST); listener.onFailure( new ConcurrentModificationException("another processor is holding a lock on the resource. Try again later") ); - log.error("another processor is a lock, BAD_REQUEST error", RestStatus.BAD_REQUEST); return; } try { @@ -106,29 +106,59 @@ private void retrieveLockAndCreateTIFConfig(SAIndexTIFSourceConfigRequest reques user, ActionListener.wrap( saTifSourceConfigDtoResponse -> { - lockService.releaseLock(lock); - listener.onResponse(new SAIndexTIFSourceConfigResponse( - saTifSourceConfigDtoResponse.getId(), - saTifSourceConfigDtoResponse.getVersion(), - RestStatus.OK, - saTifSourceConfigDtoResponse + lockService.releaseLockEventDriven(lock, ActionListener.wrap( + r -> listener.onResponse(new SAIndexTIFSourceConfigResponse( + saTifSourceConfigDtoResponse.getId(), + saTifSourceConfigDtoResponse.getVersion(), + RestStatus.OK, + saTifSourceConfigDtoResponse + )), + e -> { + log.error(String.format("Unexpected failure while trying to release lock [%s] for tif source config [%s].", lock.getLockId(), saTifSourceConfigDto.getId()), e); + listener.onResponse(new SAIndexTIFSourceConfigResponse( + saTifSourceConfigDtoResponse.getId(), + saTifSourceConfigDtoResponse.getVersion(), + RestStatus.OK, + saTifSourceConfigDtoResponse + )); + } )); }, e -> { - lockService.releaseLock(lock); - log.error("Failed to create IOCs and threat intel source config"); - listener.onFailure(e); + lockService.releaseLockEventDriven(lock, ActionListener.wrap( + r -> { + log.error("Failed to create IOCs and threat intel source config", e); + listener.onFailure(e); + }, + ex -> { + String action = RestRequest.Method.PUT.equals(request.getMethod()) ? "update" : "create"; + log.error(String.format("Failed to %s IOCs and threat intel source config", action), e); + log.error(String.format("Unexpected failure while trying to release lock [%s] for tif source config.", lock.getLockId()), e); + listener.onFailure(e); + } + )); } ) ); } catch (Exception e) { - lockService.releaseLock(lock); - listener.onFailure(e); log.error("listener failed when executing", e); + lockService.releaseLockEventDriven(lock, ActionListener.wrap( + r -> { + log.error("Failed to create IOCs and threat intel source config", e); + listener.onFailure(e); + }, + ex -> { + String action = RestRequest.Method.PUT.equals(request.getMethod()) ? "update" : "create"; + log.error(String.format("Failed to %s IOCs and threat intel source config", action), e); + log.error(String.format("Unexpected failure while trying to release lock [%s] for tif source config.", lock.getLockId()), e); + listener.onFailure(e); + } + )); } }, exception -> { + String action = RestRequest.Method.PUT.equals(request.getMethod()) ? "update" : "create"; + log.error(String.format("Failed to acquire lock while trying to %s tif source config", action), exception); listener.onFailure(exception); - log.error("execution failed", exception); })); } catch (Exception e) { log.error("Failed to acquire lock for job", e); diff --git a/src/test/java/org/opensearch/securityanalytics/resthandler/SourceConfigWithoutS3RestApiIT.java b/src/test/java/org/opensearch/securityanalytics/resthandler/SourceConfigWithoutS3RestApiIT.java index c7bf6465b..1c76ba6b3 100644 --- a/src/test/java/org/opensearch/securityanalytics/resthandler/SourceConfigWithoutS3RestApiIT.java +++ b/src/test/java/org/opensearch/securityanalytics/resthandler/SourceConfigWithoutS3RestApiIT.java @@ -362,7 +362,6 @@ public void testUpdateIocUploadSourceConfig() throws IOException, InterruptedExc iocHits = (List>) respMap.get(ListIOCsActionResponse.HITS_FIELD); assertEquals(1, iocHits.size()); - Thread.sleep(10000); } public void testActivateDeactivateIocUploadSourceConfig() throws IOException, InterruptedException { @@ -474,7 +473,6 @@ public void testActivateDeactivateIocUploadSourceConfig() throws IOException, In iocTypes, false ); - Thread.sleep(10000); // update source config with hashes ioc type response = makeRequest(client(), "PUT", SecurityAnalyticsPlugin.THREAT_INTEL_SOURCE_URI +"/" + createdId, Collections.emptyMap(), toHttpEntity(saTifSourceConfigDto)); Assert.assertEquals(RestStatus.OK, restStatus(response)); @@ -502,7 +500,6 @@ public void testActivateDeactivateIocUploadSourceConfig() throws IOException, In iocHits = (List>) respMap.get(ListIOCsActionResponse.HITS_FIELD); assertEquals(1, iocHits.size()); - Thread.sleep(10000); saTifSourceConfigDto = new SATIFSourceConfigDto( saTifSourceConfigDto.getId(), @@ -525,7 +522,6 @@ public void testActivateDeactivateIocUploadSourceConfig() throws IOException, In iocTypes, true ); - Thread.sleep(10000); // update source config with hashes ioc type response = makeRequest(client(), "PUT", SecurityAnalyticsPlugin.THREAT_INTEL_SOURCE_URI +"/" + createdId, Collections.emptyMap(), toHttpEntity(saTifSourceConfigDto)); Assert.assertEquals(RestStatus.OK, restStatus(response)); @@ -602,8 +598,6 @@ public void testActivateDeactivateUrlDownloadSourceConfig() throws IOException, List findingIndices = getIocIndices(); Assert.assertEquals(1, findingIndices.size()); - Thread.sleep(10000); // TODO: pass in action listener when releasing lock - // try to update default source config again to ensure operation is not accepted when enabled_for_scan is unchanged try { makeRequest(client(), "PUT", SecurityAnalyticsPlugin.THREAT_INTEL_SOURCE_URI +"/" + id, Collections.emptyMap(), toHttpEntity(saTifSourceConfigDto)); @@ -944,8 +938,6 @@ public void testUpdateDefaultSourceConfigThrowsError() throws IOException, Inter Assert.assertTrue(e.getMessage().contains("unsupported_operation_exception")); } - Thread.sleep(100); // TODO: pass in action listener when releasing lock - // update default source config again to ensure lock was released try { makeRequest(client(), "PUT", SecurityAnalyticsPlugin.THREAT_INTEL_SOURCE_URI +"/" + id, Collections.emptyMap(), toHttpEntity(saTifSourceConfigDto));