Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: orchestrate adapter health check such that monitoring of adapters works as expected #2335

Merged
merged 2 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -66,28 +65,32 @@ public void run() {
* {@link org.apache.streampipes.manager.health.PipelineHealthCheck}).
*/
public void checkAndRestoreAdapters() {
// Get all running adapters
Map<String, AdapterDescription> allRunningInstancesAdapterDescriptions =
this.getAllRunningInstancesAdapterDescriptions();
// Get all adapters that are supposed to run according to the backend storage
Map<String, AdapterDescription> adapterInstancesSupposedToRun =
this.getAllAdaptersSupposedToRun();

// Get all worker containers that run adapters
// group all adapter instances supposed to run by their worker service URL
Map<String, List<AdapterDescription>> groupByWorker =
this.getAllWorkersWithAdapters(allRunningInstancesAdapterDescriptions);
this.getAllWorkersWithAdapters(adapterInstancesSupposedToRun);

// Get adapters that are not running anymore
Map<String, AdapterDescription> allAdaptersToRecover =
this.getAdaptersToRecover(groupByWorker, allRunningInstancesAdapterDescriptions);
this.getAdaptersToRecover(groupByWorker, adapterInstancesSupposedToRun);

try {
if (!allRunningInstancesAdapterDescriptions.isEmpty()) {
if (!adapterInstancesSupposedToRun.isEmpty()) {
// Filter adapters so that only healthy and running adapters are updated in the metrics endpoint
updateMonitoringMetrics(
allRunningInstancesAdapterDescriptions
.entrySet()
.stream()
.filter((entry -> !allAdaptersToRecover.containsKey(entry.getKey())))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
);
var adaptersToMonitor = adapterInstancesSupposedToRun
.entrySet()
.stream()
.filter((entry -> !allAdaptersToRecover.containsKey(entry.getKey())))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

if (!adaptersToMonitor.isEmpty()) {
updateMonitoringMetrics(adaptersToMonitor);
} else {
LOG.info("No running adapter instances to monitor.");
}
}
} catch (NoSuchElementException e) {
LOG.error("Could not update adapter metrics due to an invalid state. ({})", e.getMessage());
Expand Down Expand Up @@ -118,6 +121,15 @@ protected void updateMonitoringMetrics(Map<String, AdapterDescription> runningAd
}

private void updateTotalEventsPublished(AdapterMetrics adapterMetrics, String adapterId, String adapterName) {

// Check if the adapter is already registered; if not, register it first.
// This step is crucial, especially when the StreamPipes Core service is restarted,
// and there are existing running adapters that need proper registration.
// Note: Proper registration is usually handled during the initial start of the adapter.
if (!adapterMetrics.contains(adapterId)) {
adapterMetrics.register(adapterId, adapterName);
}

adapterMetrics.updateTotalEventsPublished(
adapterId,
adapterName,
Expand All @@ -128,7 +140,18 @@ private void updateTotalEventsPublished(AdapterMetrics adapterMetrics, String ad
}


public Map<String, AdapterDescription> getAllRunningInstancesAdapterDescriptions() {
/**
* Retrieves a map of all adapter instances that are supposed to be running according to the backend storage.
* <p>
* This method queries the adapter storage to obtain information about all adapters
* and filters the running instances. The resulting map is keyed by the element ID
* of each running adapter, and the corresponding values are the respective
* {@link AdapterDescription} objects.
*
* @return A map containing all adapter instances supposed to be running according to the backend storage.
* The keys are element IDs, and the values are the corresponding adapter descriptions.
*/
public Map<String, AdapterDescription> getAllAdaptersSupposedToRun() {
Map<String, AdapterDescription> result = new HashMap<>();
List<AdapterDescription> allRunningInstancesAdapterDescription = this.adapterStorage.getAllAdapters();
allRunningInstancesAdapterDescription
Expand All @@ -144,41 +167,69 @@ public Map<String, AdapterDescription> getAllRunningInstancesAdapterDescriptions
}

public Map<String, List<AdapterDescription>> getAllWorkersWithAdapters(
Map<String, AdapterDescription> allRunningInstancesAdapterDescription) {
Map<String, AdapterDescription> adapterInstancesSupposedToRun
) {

Map<String, List<AdapterDescription>> groupByWorker = new HashMap<>();
allRunningInstancesAdapterDescription.values().forEach(ad -> {
String selectedEndpointUrl = ad.getSelectedEndpointUrl();
if (selectedEndpointUrl != null) {
if (groupByWorker.containsKey(selectedEndpointUrl)) {
groupByWorker.get(selectedEndpointUrl).add(ad);
} else {
List<AdapterDescription> tmp = new ArrayList<>();
tmp.add(ad);
groupByWorker.put(selectedEndpointUrl, tmp);
}
}
});
adapterInstancesSupposedToRun.values()
.forEach(ad -> {
String selectedEndpointUrl = ad.getSelectedEndpointUrl();
if (selectedEndpointUrl != null) {
if (groupByWorker.containsKey(selectedEndpointUrl)) {
groupByWorker.get(selectedEndpointUrl)
.add(ad);
} else {
groupByWorker.put(selectedEndpointUrl, List.of(ad));
}
}
});

return groupByWorker;
}

/**
* Retrieves a map of adapters to recover by comparing the provided groupings of adapter instances
* with the instances supposed to run according to the storage.
* For every adapter instance it is verified that it actually runs on a worker node.
* If this is not the case, it is added to the output of adapters to recover.
*
* @param adapterInstancesGroupedByWorker A map grouping adapter instances by worker.
* @param adapterInstancesSupposedToRun The map containing all adapter instances supposed to be running.
* @return A new map containing adapter instances to recover, filtered based on running instances.
*/
public Map<String, AdapterDescription> getAdaptersToRecover(
Map<String, List<AdapterDescription>> groupByWorker,
Map<String, AdapterDescription> allRunningInstancesAdapterDescription) {
groupByWorker.keySet().forEach(adapterEndpointUrl -> {
try {
List<AdapterDescription> allRunningInstancesOfOneWorker =
WorkerRestClient.getAllRunningAdapterInstanceDescriptions(
adapterEndpointUrl + WorkerPaths.getRunningAdaptersPath());
allRunningInstancesOfOneWorker.forEach(adapterDescription ->
allRunningInstancesAdapterDescription.remove(adapterDescription.getElementId()));
} catch (AdapterException e) {
LOG.info("Could not recover adapter at endpoint {} due to {}", adapterEndpointUrl, e.getMessage());
}
});
Map<String, List<AdapterDescription>> adapterInstancesGroupedByWorker,
Map<String, AdapterDescription> adapterInstancesSupposedToRun
) {

return allRunningInstancesAdapterDescription;
// NOTE: This line is added to prevent modifying the existing map of instances supposed to run
// It looks like the parameter `adapterInstancesSupposedToRun` is not required at all,
// but this should be checked more carefully.
Map<String, AdapterDescription> adaptersToRecover = new HashMap<>(adapterInstancesSupposedToRun);

adapterInstancesGroupedByWorker.keySet()
.forEach(adapterEndpointUrl -> {
try {
List<AdapterDescription> allRunningInstancesOfOneWorker =
WorkerRestClient.getAllRunningAdapterInstanceDescriptions(
adapterEndpointUrl + WorkerPaths.getRunningAdaptersPath());

// only keep adapters where there is no running adapter instance
// therefore, all others are removed
allRunningInstancesOfOneWorker.forEach(
adapterDescription ->
adaptersToRecover.remove(
adapterDescription.getElementId()));
} catch (AdapterException e) {
LOG.info(
"Could not recover adapter at endpoint {} due to {}",
adapterEndpointUrl,
e.getMessage()
);
}
});

return adaptersToRecover;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.resource.management.SpResourceManager;
import org.apache.streampipes.storage.api.IAdapterStorage;
import org.apache.streampipes.storage.management.StorageDispatcher;

import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -51,14 +50,13 @@ public void getAllRunningInstancesAdapterDescriptionsEmpty() {
var healthCheck = new AdapterHealthCheck(
adapterInstanceStorageMock,
new AdapterMasterManagement(
StorageDispatcher.INSTANCE.getNoSqlStore()
.getAdapterInstanceStorage(),
adapterInstanceStorageMock,
new SpResourceManager().manageAdapters(),
new SpResourceManager().manageDataStreams(),
AdapterMetricsManager.INSTANCE.getAdapterMetrics()
)
);
var result = healthCheck.getAllRunningInstancesAdapterDescriptions();
var result = healthCheck.getAllAdaptersSupposedToRun();

assertTrue(result.isEmpty());
}
Expand All @@ -82,14 +80,13 @@ public void getAllRunningInstancesAdapterDescriptionsMixed() {
var healthCheck = new AdapterHealthCheck(
adapterInstanceStorageMock,
new AdapterMasterManagement(
StorageDispatcher.INSTANCE.getNoSqlStore()
.getAdapterInstanceStorage(),
adapterInstanceStorageMock,
new SpResourceManager().manageAdapters(),
new SpResourceManager().manageDataStreams(),
AdapterMetricsManager.INSTANCE.getAdapterMetrics()
)
);
var result = healthCheck.getAllRunningInstancesAdapterDescriptions();
var result = healthCheck.getAllAdaptersSupposedToRun();

assertEquals(1, result.size());
assertTrue(result.containsKey(nameRunningAdapter));
Expand Down
Loading