From 3ae10940898ce7a65b8908cb2ab4aa021203f568 Mon Sep 17 00:00:00 2001 From: bossenti Date: Thu, 14 Dec 2023 13:09:42 +0100 Subject: [PATCH 1/2] fix: orchestrate adapter health check such that monitoring of adapters works fine --- .../management/health/AdapterHealthCheck.java | 136 ++++++++++++------ .../health/AdapterHealthCheckTest.java | 11 +- 2 files changed, 97 insertions(+), 50 deletions(-) diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java index f0882a1fc6..910c0c23d9 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java @@ -31,7 +31,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -66,28 +65,32 @@ public void run() { * {@link org.apache.streampipes.manager.health.PipelineHealthCheck}). */ public void checkAndRestoreAdapters() { - // Get all running adapters - Map allRunningInstancesAdapterDescriptions = - this.getAllRunningInstancesAdapterDescriptions(); + // Get all adapters that are supposed to run according to the backend storage + Map adapterInstancesSupposedToRun = + this.getAllAdaptersSupposedToRun(); - // Get all worker containers that run adapters + // group all adapter instances supposed to run by their worker service URL Map> groupByWorker = - this.getAllWorkersWithAdapters(allRunningInstancesAdapterDescriptions); + this.getAllWorkersWithAdapters(adapterInstancesSupposedToRun); // Get adapters that are not running anymore Map allAdaptersToRecover = - this.getAdaptersToRecover(groupByWorker, allRunningInstancesAdapterDescriptions); + this.getAdaptersToRecover(groupByWorker, adapterInstancesSupposedToRun); try { - if (!allRunningInstancesAdapterDescriptions.isEmpty()) { + if (!adapterInstancesSupposedToRun.isEmpty()) { // Filter adapters so that only healthy and running adapters are updated in the metrics endpoint - updateMonitoringMetrics( - allRunningInstancesAdapterDescriptions - .entrySet() - .stream() - .filter((entry -> !allAdaptersToRecover.containsKey(entry.getKey()))) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)) - ); + var adaptersToMonitor = adapterInstancesSupposedToRun + .entrySet() + .stream() + .filter((entry -> !allAdaptersToRecover.containsKey(entry.getKey()))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + if (!adaptersToMonitor.isEmpty()) { + updateMonitoringMetrics(adaptersToMonitor); + } else { + LOG.info("No running adapter instances to monitor."); + } } } catch (NoSuchElementException e) { LOG.error("Could not update adapter metrics due to an invalid state. ({})", e.getMessage()); @@ -118,6 +121,15 @@ protected void updateMonitoringMetrics(Map runningAd } private void updateTotalEventsPublished(AdapterMetrics adapterMetrics, String adapterId, String adapterName) { + + // Check if the adapter is already registered; if not, register it first. + // This step is crucial, especially when the StreamPipes Core service is restarted, + // and there are existing running adapters that need proper registration. + // Note: Proper registration is usually handled during the initial start of the adapter. + if (!adapterMetrics.contains(adapterId)) { + adapterMetrics.register(adapterId, adapterName); + } + adapterMetrics.updateTotalEventsPublished( adapterId, adapterName, @@ -128,7 +140,18 @@ private void updateTotalEventsPublished(AdapterMetrics adapterMetrics, String ad } - public Map getAllRunningInstancesAdapterDescriptions() { + /** + * Retrieves a map of all adapter instances that are supposed to be running according to the backend storage. + *

+ * This method queries the adapter storage to obtain information about all adapters + * and filters the running instances. The resulting map is keyed by the element ID + * of each running adapter, and the corresponding values are the respective + * {@link AdapterDescription} objects. + * + * @return A map containing all adapter instances supposed to be running according to the backend storage. + * The keys are element IDs, and the values are the corresponding adapter descriptions. + */ + public Map getAllAdaptersSupposedToRun() { Map result = new HashMap<>(); List allRunningInstancesAdapterDescription = this.adapterStorage.getAllAdapters(); allRunningInstancesAdapterDescription @@ -144,41 +167,68 @@ public Map getAllRunningInstancesAdapterDescriptions } public Map> getAllWorkersWithAdapters( - Map allRunningInstancesAdapterDescription) { + Map adapterInstancesSupposedToRun + ) { Map> groupByWorker = new HashMap<>(); - allRunningInstancesAdapterDescription.values().forEach(ad -> { - String selectedEndpointUrl = ad.getSelectedEndpointUrl(); - if (selectedEndpointUrl != null) { - if (groupByWorker.containsKey(selectedEndpointUrl)) { - groupByWorker.get(selectedEndpointUrl).add(ad); - } else { - List tmp = new ArrayList<>(); - tmp.add(ad); - groupByWorker.put(selectedEndpointUrl, tmp); - } - } - }); + adapterInstancesSupposedToRun.values() + .forEach(ad -> { + String selectedEndpointUrl = ad.getSelectedEndpointUrl(); + if (selectedEndpointUrl != null) { + if (groupByWorker.containsKey(selectedEndpointUrl)) { + groupByWorker.get(selectedEndpointUrl) + .add(ad); + } else { + groupByWorker.put(selectedEndpointUrl, List.of(ad)); + } + } + }); return groupByWorker; } + /** + * Retrieves a map of adapters to recover by comparing the provided groupings of adapter instances + * with the instances supposed to run according to the storage. + * For every adapter instance it is verified that it actually runs on a worker node. + * If this is not the case, it is added to the output of adapters to recover. + * + * @param adapterInstancesGroupedByWorker A map grouping adapter instances by worker. + * @param adapterInstancesSupposedToRun The map containing all adapter instances supposed to be running. + * @return A new map containing adapter instances to recover, filtered based on running instances. + */ public Map getAdaptersToRecover( - Map> groupByWorker, - Map allRunningInstancesAdapterDescription) { - groupByWorker.keySet().forEach(adapterEndpointUrl -> { - try { - List allRunningInstancesOfOneWorker = - WorkerRestClient.getAllRunningAdapterInstanceDescriptions( - adapterEndpointUrl + WorkerPaths.getRunningAdaptersPath()); - allRunningInstancesOfOneWorker.forEach(adapterDescription -> - allRunningInstancesAdapterDescription.remove(adapterDescription.getElementId())); - } catch (AdapterException e) { - LOG.info("Could not recover adapter at endpoint {} due to {}", adapterEndpointUrl, e.getMessage()); - } - }); + Map> adapterInstancesGroupedByWorker, + Map adapterInstancesSupposedToRun + ) { - return allRunningInstancesAdapterDescription; + // NOTE: This line is added to prevent modifying the existing map of instances supposed to run + // It looks like the parameter `adapterInstancesSupposedToRun` is not required at all, + // but this should be checked more carefully. + Map adaptersToRecover = new HashMap<>(adapterInstancesSupposedToRun); + + adapterInstancesGroupedByWorker.keySet() + .forEach(adapterEndpointUrl -> { + try { + List allRunningInstancesOfOneWorker = + WorkerRestClient.getAllRunningAdapterInstanceDescriptions( + adapterEndpointUrl + WorkerPaths.getRunningAdaptersPath()); + + // only keep adapters where there is no running adapter instance + // therefore, all others are removed + allRunningInstancesOfOneWorker.forEach(adapterDescription -> + adaptersToRecover.remove( + adapterDescription.getElementId())); + } catch (AdapterException e) { + LOG.info( + "Could not recover adapter at endpoint {} due to {}", + adapterEndpointUrl, + e.getMessage() + ); + } + }); + + return adaptersToRecover; } diff --git a/streampipes-connect-management/src/test/java/org/apache/streampipes/connect/management/health/AdapterHealthCheckTest.java b/streampipes-connect-management/src/test/java/org/apache/streampipes/connect/management/health/AdapterHealthCheckTest.java index 8eedbd725f..e4f3c337cf 100644 --- a/streampipes-connect-management/src/test/java/org/apache/streampipes/connect/management/health/AdapterHealthCheckTest.java +++ b/streampipes-connect-management/src/test/java/org/apache/streampipes/connect/management/health/AdapterHealthCheckTest.java @@ -23,7 +23,6 @@ import org.apache.streampipes.model.connect.adapter.AdapterDescription; import org.apache.streampipes.resource.management.SpResourceManager; import org.apache.streampipes.storage.api.IAdapterStorage; -import org.apache.streampipes.storage.management.StorageDispatcher; import org.junit.Before; import org.junit.Test; @@ -51,14 +50,13 @@ public void getAllRunningInstancesAdapterDescriptionsEmpty() { var healthCheck = new AdapterHealthCheck( adapterInstanceStorageMock, new AdapterMasterManagement( - StorageDispatcher.INSTANCE.getNoSqlStore() - .getAdapterInstanceStorage(), + adapterInstanceStorageMock, new SpResourceManager().manageAdapters(), new SpResourceManager().manageDataStreams(), AdapterMetricsManager.INSTANCE.getAdapterMetrics() ) ); - var result = healthCheck.getAllRunningInstancesAdapterDescriptions(); + var result = healthCheck.getAllAdaptersSupposedToRun(); assertTrue(result.isEmpty()); } @@ -82,14 +80,13 @@ public void getAllRunningInstancesAdapterDescriptionsMixed() { var healthCheck = new AdapterHealthCheck( adapterInstanceStorageMock, new AdapterMasterManagement( - StorageDispatcher.INSTANCE.getNoSqlStore() - .getAdapterInstanceStorage(), + adapterInstanceStorageMock, new SpResourceManager().manageAdapters(), new SpResourceManager().manageDataStreams(), AdapterMetricsManager.INSTANCE.getAdapterMetrics() ) ); - var result = healthCheck.getAllRunningInstancesAdapterDescriptions(); + var result = healthCheck.getAllAdaptersSupposedToRun(); assertEquals(1, result.size()); assertTrue(result.containsKey(nameRunningAdapter)); From 9a18c147bcd479900b509dbb034a9db4eb09d0a1 Mon Sep 17 00:00:00 2001 From: bossenti Date: Thu, 14 Dec 2023 13:36:07 +0100 Subject: [PATCH 2/2] style: fix formatting --- .../connect/management/health/AdapterHealthCheck.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java index 910c0c23d9..1d8843a62c 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java @@ -216,9 +216,10 @@ public Map getAdaptersToRecover( // only keep adapters where there is no running adapter instance // therefore, all others are removed - allRunningInstancesOfOneWorker.forEach(adapterDescription -> - adaptersToRecover.remove( - adapterDescription.getElementId())); + allRunningInstancesOfOneWorker.forEach( + adapterDescription -> + adaptersToRecover.remove( + adapterDescription.getElementId())); } catch (AdapterException e) { LOG.info( "Could not recover adapter at endpoint {} due to {}",