diff --git a/src/server/src/main/java/io/cassandrareaper/jmx/JmxProxyImpl.java b/src/server/src/main/java/io/cassandrareaper/jmx/JmxProxyImpl.java index 2bb94bba1..af1673ad6 100644 --- a/src/server/src/main/java/io/cassandrareaper/jmx/JmxProxyImpl.java +++ b/src/server/src/main/java/io/cassandrareaper/jmx/JmxProxyImpl.java @@ -113,7 +113,7 @@ final class JmxProxyImpl implements JmxProxy { private final ConcurrentMap repairStatusExecutors = Maps.newConcurrentMap(); private final ConcurrentMap repairStatusHandlers = Maps.newConcurrentMap(); private final MetricRegistry metricRegistry; - private final StreamManagerMBean smProxy; + private final Optional smProxy; private JmxProxyImpl( String host, @@ -125,7 +125,7 @@ private JmxProxyImpl( EndpointSnitchInfoMBean endpointSnitchMbean, FailureDetectorMBean fdProxy, MetricRegistry metricRegistry, - StreamManagerMBean smProxy) { + Optional smProxy) { this.host = host; this.hostBeforeTranslation = hostBeforeTranslation; @@ -174,7 +174,7 @@ static JmxProxy connect( * * @param handler Implementation of {@link RepairStatusHandler} to process incoming notifications * of repair events. - * @param host hostname or ip address of Cassandra node + * @param originalHost hostname or ip address of Cassandra node * @param port port number to use for JMX connection * @param username username to use for JMX authentication * @param password password to use for JMX authentication @@ -206,7 +206,7 @@ private static JmxProxy connect( throw new ReaperException("Failure during preparations for JMX connection", e); } try { - Map env = new HashMap<>(); + final Map env = new HashMap<>(); if (username != null && password != null) { String[] creds = {username, password}; env.put(JMXConnector.CREDENTIALS, creds); @@ -218,11 +218,19 @@ private static JmxProxy connect( StorageServiceMBean ssProxy = JMX.newMBeanProxy(mbeanServerConn, ObjectNames.STORAGE_SERVICE, StorageServiceMBean.class); - String cassandraVersion = ssProxy.getReleaseVersion(); + final String cassandraVersion = ssProxy.getReleaseVersion(); if (cassandraVersion.startsWith("2.0") || cassandraVersion.startsWith("1.")) { ssProxy = JMX.newMBeanProxy(mbeanServerConn, ObjectNames.STORAGE_SERVICE, StorageServiceMBean20.class); } + final Optional smProxy; + // StreamManagerMbean is only available since Cassandra 2.0 + if (cassandraVersion.startsWith("1.")) { + smProxy = Optional.empty(); + } else { + smProxy = Optional.of(JMX.newMBeanProxy(mbeanServerConn, ObjectNames.STREAM_MANAGER, StreamManagerMBean.class)); + } + JmxProxy proxy = new JmxProxyImpl( host, @@ -234,11 +242,13 @@ private static JmxProxy connect( JMX.newMBeanProxy(mbeanServerConn, ObjectNames.ENDPOINT_SNITCH_INFO, EndpointSnitchInfoMBean.class), JMX.newMBeanProxy(mbeanServerConn, ObjectNames.FAILURE_DETECTOR, FailureDetectorMBean.class), metricRegistry, - JMX.newMBeanProxy(mbeanServerConn, ObjectNames.STREAM_MANAGER, StreamManagerMBean.class)); + smProxy); // registering listeners throws bunch of exceptions, so do it here rather than in the constructor mbeanServerConn.addNotificationListener(ObjectNames.STORAGE_SERVICE, proxy, null, null); - mbeanServerConn.addNotificationListener(ObjectNames.STREAM_MANAGER, proxy, null, null); + if (smProxy.isPresent()) { + mbeanServerConn.addNotificationListener(ObjectNames.STREAM_MANAGER, proxy, null, null); + } LOG.debug("JMX connection to {} properly connected: {}", host, jmxUrl.toString()); return proxy; @@ -951,7 +961,7 @@ CompactionManagerMBean getCompactionManagerMBean() { return cmProxy; } - StreamManagerMBean getStreamManagerMBean() { + Optional getStreamManagerMBean() { return smProxy; } diff --git a/src/server/src/main/java/io/cassandrareaper/jmx/StreamsProxy.java b/src/server/src/main/java/io/cassandrareaper/jmx/StreamsProxy.java index d45f394fc..4745eb980 100644 --- a/src/server/src/main/java/io/cassandrareaper/jmx/StreamsProxy.java +++ b/src/server/src/main/java/io/cassandrareaper/jmx/StreamsProxy.java @@ -23,7 +23,7 @@ import javax.management.openmbean.CompositeData; import com.google.common.base.Preconditions; - +import com.google.common.collect.ImmutableSet; public final class StreamsProxy { @@ -40,6 +40,10 @@ public static StreamsProxy create(JmxProxy proxy) { } public Set listStreams() { - return proxy.getStreamManagerMBean().getCurrentStreams(); + if (proxy.getStreamManagerMBean().isPresent()) { + return proxy.getStreamManagerMBean().get().getCurrentStreams(); + } else { + return ImmutableSet.of(); + } } } diff --git a/src/server/src/test/java/io/cassandrareaper/jmx/JmxProxyTest.java b/src/server/src/test/java/io/cassandrareaper/jmx/JmxProxyTest.java index d14de0021..6ef44c973 100644 --- a/src/server/src/test/java/io/cassandrareaper/jmx/JmxProxyTest.java +++ b/src/server/src/test/java/io/cassandrareaper/jmx/JmxProxyTest.java @@ -19,6 +19,8 @@ import io.cassandrareaper.ReaperException; +import java.util.Optional; + import javax.management.MBeanServerConnection; import com.google.common.base.Preconditions; @@ -48,7 +50,7 @@ public static void mockGetStorageServiceMBean(JmxProxy proxy, StorageServiceMBea public static void mockGetStreamManagerMBean(JmxProxy proxy, StreamManagerMBean streamingManagerMBean) { Preconditions.checkArgument(proxy instanceof JmxProxyImpl, "only JmxProxyImpl is supported"); - Mockito.when(((JmxProxyImpl)proxy).getStreamManagerMBean()).thenReturn(streamingManagerMBean); + Mockito.when(((JmxProxyImpl)proxy).getStreamManagerMBean()).thenReturn(Optional.of(streamingManagerMBean)); } public static void mockGetEndpointSnitchInfoMBean(JmxProxy proxy, EndpointSnitchInfoMBean endpointSnitchInfoMBean) {