Skip to content

Commit

Permalink
KNOX-2994 - PollingConfigurationAnalyzer starts after the Knox GW is up and running
Browse files (browse the repository at this point in the history)
  • Loading branch information
smolnar82 committed Jan 3, 2024
1 parent 14954a0 commit 438c12f
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ void failedToInstantiateJAASConfigurationFileImplementation(String implementatio
text = "Started ClouderaManager cluster configuration monitor (checking every {0} seconds)")
void startedClouderaManagerConfigMonitor(long pollingInterval);

/**
 * INFO-level message stating that the Knox Gateway is not yet ready to monitor
 * ClouderaManager cluster configuration changes.
 */
@Message(level = MessageLevel.INFO,
text = "The Knox Gateway is not yet ready to monitor ClouderaManager cluster configuration changes.")
void gatewayIsNotYetReadyToMonitorClouderaManagerConfigs();

/**
 * INFO-level message announcing that the ClouderaManager cluster configuration
 * monitor is stopping.
 */
@Message(level = MessageLevel.INFO, text = "Stopping ClouderaManager cluster configuration monitor")
void stoppingClouderaManagerConfigMonitor();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.knox.gateway.services.security.KeystoreService;
import org.apache.knox.gateway.services.security.KeystoreServiceException;
import org.apache.knox.gateway.services.topology.TopologyService;
import org.apache.knox.gateway.services.topology.impl.GatewayStatusService;
import org.apache.knox.gateway.topology.ClusterConfigurationMonitorService;
import org.apache.knox.gateway.topology.discovery.ServiceDiscoveryConfig;
import org.apache.knox.gateway.topology.discovery.cm.ClouderaManagerServiceDiscoveryMessages;
Expand Down Expand Up @@ -155,6 +156,8 @@ public class PollingConfigurationAnalyzer implements Runnable {

private final GatewayConfig gatewayConfig;

private GatewayStatusService gatewayStatusService;

PollingConfigurationAnalyzer(final GatewayConfig gatewayConfig,
final ClusterConfigurationCache configCache,
final AliasService aliasService,
Expand Down Expand Up @@ -207,56 +210,68 @@ public void run() {
log.startedClouderaManagerConfigMonitor(interval);
isActive = true;

boolean gatewayStatusOk = false;
while (isActive) {
try {
final List<String> clustersToStopMonitoring = new ArrayList<>();

for (Map.Entry<String, List<String>> entry : configCache.getClusterNames().entrySet()) {
String address = entry.getKey();
for (String clusterName : entry.getValue()) {
if (configCache.getDiscoveryConfig(address, clusterName) == null) {
log.noClusterConfiguration(clusterName, address);
continue;
}
log.checkingClusterConfiguration(clusterName, address);
if (!gatewayStatusOk) {
gatewayStatusOk = getGatewayStatusService() != null && getGatewayStatusService().status();
}
if (gatewayStatusOk) {
monitorClusterConfigurationChanges();
} else {
log.gatewayIsNotYetReadyToMonitorClouderaManagerConfigs();
}
waitFor(interval);
}

// Check here for existing descriptor references, and add to the removal list if there are not any
if (!clusterReferencesExist(address, clusterName)) {
clustersToStopMonitoring.add(address + FQCN_DELIM + clusterName);
continue;
}
log.stoppedClouderaManagerConfigMonitor();
}

// Configuration changes don't mean anything without corresponding service start/restarts. Therefore, monitor
// start events, and check the configuration only of the restarted service(s) to identify changes
// that should trigger re-discovery.
final List<RelevantEvent> relevantEvents = getRelevantEvents(address, clusterName);
private void monitorClusterConfigurationChanges() {
try {
final List<String> clustersToStopMonitoring = new ArrayList<>();

for (Map.Entry<String, List<String>> entry : configCache.getClusterNames().entrySet()) {
String address = entry.getKey();
for (String clusterName : entry.getValue()) {
if (configCache.getDiscoveryConfig(address, clusterName) == null) {
log.noClusterConfiguration(clusterName, address);
continue;
}
log.checkingClusterConfiguration(clusterName, address);

// If there are no recent start events, then nothing to do now
if (!relevantEvents.isEmpty()) {
// If a change has occurred, notify the listeners
if (hasConfigChanged(address, clusterName, relevantEvents) || hasScaleEvent(relevantEvents)) {
notifyChangeListener(address, clusterName);
}
// these events should not be processed again even if the next CM query result contains them
relevantEvents.forEach(re -> processedEvents.put(re.auditEvent.getId(), 1L));
}
// Check here for existing descriptor references, and add to the removal list if there are not any
if (!clusterReferencesExist(address, clusterName)) {
clustersToStopMonitoring.add(address + FQCN_DELIM + clusterName);
continue;
}
}

// Remove outdated entries from the cache
for (String fqcn : clustersToStopMonitoring) {
String[] parts = fqcn.split(FQCN_DELIM);
stopMonitoring(parts[0], parts[1]);
// Configuration changes don't mean anything without corresponding service start/restarts. Therefore, monitor
// start events, and check the configuration only of the restarted service(s) to identify changes
// that should trigger re-discovery.
final List<RelevantEvent> relevantEvents = getRelevantEvents(address, clusterName);

// If there are no recent start events, then nothing to do now
if (!relevantEvents.isEmpty()) {
// If a change has occurred, notify the listeners
if (hasConfigChanged(address, clusterName, relevantEvents) || hasScaleEvent(relevantEvents)) {
notifyChangeListener(address, clusterName);
}
// these events should not be processed again even if the next CM query result contains them
relevantEvents.forEach(re -> processedEvents.put(re.auditEvent.getId(), 1L));
}
}
clustersToStopMonitoring.clear(); // reset the removal list
}

} catch (Exception e) {
log.clouderaManagerConfigurationChangesMonitoringError(e);
// Remove outdated entries from the cache
for (String fqcn : clustersToStopMonitoring) {
String[] parts = fqcn.split(FQCN_DELIM);
stopMonitoring(parts[0], parts[1]);
}
waitFor(interval);
}
clustersToStopMonitoring.clear(); // reset the removal list

log.stoppedClouderaManagerConfigMonitor();
} catch (Exception e) {
log.clouderaManagerConfigurationChangesMonitoringError(e);
}
}

private boolean hasScaleEvent(List<RelevantEvent> relevantEvents) {
Expand Down Expand Up @@ -372,6 +387,16 @@ private ClusterConfigurationMonitorService getConfigMonitorService() {
return ccms;
}

/**
 * Returns the {@link GatewayStatusService}, resolving it from the gateway
 * services on first use and keeping it in the {@code gatewayStatusService}
 * field afterwards.
 *
 * @return the resolved service, or {@code null} when
 *         {@code GatewayServer.getGatewayServices()} returns {@code null} or the
 *         lookup itself yields {@code null}
 */
private GatewayStatusService getGatewayStatusService() {
  // Already resolved by an earlier call: nothing to look up.
  if (gatewayStatusService != null) {
    return gatewayStatusService;
  }

  final GatewayServices services = GatewayServer.getGatewayServices();
  if (services == null) {
    // No gateway services available; the field is left untouched.
    return gatewayStatusService;
  }

  gatewayStatusService = services.getService(ServiceType.GATEWAY_STATUS_SERVICE);
  return gatewayStatusService;
}

/**
* Determine if any descriptors reference the specified discovery source and cluster.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@
import org.apache.knox.gateway.services.GatewayServices;
import org.apache.knox.gateway.services.ServiceType;
import org.apache.knox.gateway.services.topology.TopologyService;
import org.apache.knox.gateway.services.topology.impl.GatewayStatusService;
import org.apache.knox.gateway.topology.ClusterConfigurationMonitorService;
import org.apache.knox.gateway.topology.discovery.ServiceDiscoveryConfig;
import org.apache.knox.gateway.topology.discovery.cm.model.hdfs.NameNodeServiceModelGenerator;
import org.apache.knox.gateway.topology.discovery.cm.model.hive.HiveOnTezServiceModelGenerator;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Test;

import java.io.File;
Expand All @@ -58,6 +60,11 @@

public class PollingConfigurationAnalyzerTest {

/**
 * Runs after every test and calls {@code setGatewayServices(null)}.
 * Presumably this clears any GatewayServices mock a test installed (for example
 * through buildPollingConfigAnalyzer) so it does not leak into the next test;
 * confirm against the setGatewayServices helper.
 */
@After
public void tearDown() {
setGatewayServices(null);
}

@Test(expected = IllegalArgumentException.class)
public void testRestartEventWithWrongApiEventCategory() {
doTestStartEvent(ApiEventCategory.LOG_EVENT);
Expand Down Expand Up @@ -341,11 +348,16 @@ public void testClusterConfigMonitorTerminationForNoLongerReferencedClusters() {
return null;
}).once();

//GatewayStatusService mock
final GatewayStatusService gatewayStatusService = EasyMock.createNiceMock(GatewayStatusService.class);
EasyMock.expect(gatewayStatusService.status()).andReturn(Boolean.TRUE).anyTimes();

// GatewayServices mock
GatewayServices gws = EasyMock.createNiceMock(GatewayServices.class);
EasyMock.expect(gws.getService(ServiceType.TOPOLOGY_SERVICE)).andReturn(ts).anyTimes();
EasyMock.expect(gws.getService(ServiceType.CLUSTER_CONFIGURATION_MONITOR_SERVICE)).andReturn(ccms).anyTimes();
EasyMock.replay(ts, ccms, gws);
EasyMock.expect(gws.getService(ServiceType.GATEWAY_STATUS_SERVICE)).andReturn(gatewayStatusService).anyTimes();
EasyMock.replay(ts, ccms, gatewayStatusService, gws);

try {
setGatewayServices(gws);
Expand Down Expand Up @@ -413,6 +425,26 @@ public void testNotificationSentAfterUpScaleEvent() {
doTestEventWithConfigChange(revisionEvent, clusterName);
}

/**
 * Verifies that a restart event does not produce a change notification when
 * the analyzer is built with {@code isKnoxGatewayReady = false}, i.e. without
 * the GatewayServices/GatewayStatusService mocks being installed.
 */
@Test
public void shouldNotPerformClusterConfigurationChangeMonitoringIfKnoxGatewayIsNotYetReady() {
  final String cmAddress = "http://host1:1234";
  final String cluster = "Cluster 10";

  // A successful "restart waiting for staleness" event (id = 123) that would
  // otherwise be a candidate for triggering a notification.
  final ApiEvent restartEvent = createApiEvent(
      cluster,
      HiveOnTezServiceModelGenerator.SERVICE_TYPE,
      HiveOnTezServiceModelGenerator.SERVICE,
      PollingConfigurationAnalyzer.RESTART_WAITING_FOR_STALENESS_SUCCESS_COMMAND,
      PollingConfigurationAnalyzer.SUCCEEDED_STATUS,
      "EV_CLUSTER_RESTARTED",
      "123");

  // Build the analyzer with the "gateway ready" flag switched off.
  final ChangeListener changeListener = new ChangeListener();
  final TestablePollingConfigAnalyzer analyzer =
      buildPollingConfigAnalyzer(cmAddress, cluster, Collections.emptyMap(), changeListener, false);

  // The Knox Gateway is not reported as ready (GatewayStatusService.status()),
  // so processing the event must NOT notify the listener.
  changeListener.clearNotification();
  doTestEvent(restartEvent, cmAddress, cluster, Collections.emptyMap(), Collections.emptyMap(), analyzer);
  assertFalse("Unexpected change notification", changeListener.wasNotified(cmAddress, cluster));
}

private void doTestStartEvent(final ApiEventCategory category) {
final String clusterName = "My Cluster";
final String serviceType = NameNodeServiceModelGenerator.SERVICE_TYPE;
Expand Down Expand Up @@ -472,6 +504,11 @@ private ChangeListener doTestEvent(final ApiEvent event, final String address, f

/**
 * Convenience overload that builds a {@link TestablePollingConfigAnalyzer}
 * with the Knox Gateway treated as ready; it delegates to the five-argument
 * variant with {@code isKnoxGatewayReady = true}.
 *
 * @param address                    discovery address handed to the five-argument overload
 * @param clusterName                cluster name handed to the five-argument overload
 * @param serviceConfigurationModels service configuration models handed to the five-argument overload
 * @param listener                   change listener handed to the five-argument overload
 * @return the analyzer created by the five-argument overload
 */
private TestablePollingConfigAnalyzer buildPollingConfigAnalyzer(final String address, final String clusterName,
    final Map<String, ServiceConfigurationModel> serviceConfigurationModels, ChangeListener listener) {
  // Default for callers that do not care about gateway readiness.
  final boolean knoxGatewayReady = true;
  return buildPollingConfigAnalyzer(address, clusterName, serviceConfigurationModels, listener, knoxGatewayReady);
}

private TestablePollingConfigAnalyzer buildPollingConfigAnalyzer(final String address, final String clusterName,
final Map<String, ServiceConfigurationModel> serviceConfigurationModels, ChangeListener listener, boolean isKnoxGatewayReady) {
final GatewayConfig gatewayConfig = EasyMock.createNiceMock(GatewayConfig.class);
EasyMock.expect(gatewayConfig.getIncludedSSLCiphers()).andReturn(Collections.emptyList()).anyTimes();
EasyMock.expect(gatewayConfig.getIncludedSSLProtocols()).andReturn(Collections.emptySet()).anyTimes();
Expand All @@ -495,6 +532,19 @@ private TestablePollingConfigAnalyzer buildPollingConfigAnalyzer(final String ad
EasyMock.expect(configCache.getClusterServiceConfigurations(address, clusterName)).andReturn(serviceConfigurationModels).anyTimes();
EasyMock.replay(configCache);

if (isKnoxGatewayReady) {
// GatewayStatusService mock
final GatewayStatusService gatewayStatusService = EasyMock.createNiceMock(GatewayStatusService.class);
EasyMock.expect(gatewayStatusService.status()).andReturn(Boolean.TRUE).anyTimes();

// GatewayServices mock
GatewayServices gws = EasyMock.createNiceMock(GatewayServices.class);
EasyMock.expect(gws.getService(ServiceType.GATEWAY_STATUS_SERVICE)).andReturn(gatewayStatusService).anyTimes();
EasyMock.replay(gatewayStatusService, gws);

setGatewayServices(gws);
}

return new TestablePollingConfigAnalyzer(gatewayConfig, configCache, listener);
}

Expand Down

0 comments on commit 438c12f

Please sign in to comment.