From daa55500d882c7b33b61aeae1ab42b979398c7c2 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Mon, 4 Oct 2021 12:54:09 +0530 Subject: [PATCH 1/7] Add Broker config `druid.broker.segment.ignoredTiers` --- docs/configuration/index.md | 1 + .../client/BrokerSegmentWatcherConfig.java | 8 ++ .../apache/druid/client/BrokerServerView.java | 19 ++++ .../BrokerSegmentWatcherConfigTest.java | 4 +- .../druid/client/BrokerServerViewTest.java | 89 ++++++++++++++++++- 5 files changed, 116 insertions(+), 5 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 7e170f4573a8..7509d5e9cc02 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1794,6 +1794,7 @@ See [cache configuration](#cache-configuration) for how to configure cache setti |--------|---------------|-----------|-------| |`druid.serverview.type`|batch or http|Segment discovery method to use. "http" enables discovering segments using HTTP instead of zookeeper.|batch| |`druid.broker.segment.watchedTiers`|List of strings|Broker watches the segment announcements from processes serving segments to build cache of which process is serving which segments, this configuration allows to only consider segments being served from a whitelist of tiers. By default, Broker would consider all tiers. This can be used to partition your dataSources in specific Historical tiers and configure brokers in partitions so that they are only queryable for specific dataSources.|none| +|`druid.broker.segment.ignoredTiers`|List of strings|The Broker watches the segment announcements from processes that serve segments to build a cache to relate each process to the segments. This configuration allows the Broker to ignore the segments being served from a list of tiers. By default, Broker would consider all tiers. This can be used to partition your dataSources in specific Historical tiers and configure brokers in partitions so that they are only queryable for specific dataSources.|none| |`druid.broker.segment.watchedDataSources`|List of strings|Broker watches the segment announcements from processes serving segments to build cache of which process is serving which segments, this configuration allows to only consider segments being served from a whitelist of dataSources. By default, Broker would consider all datasources. This can be used to configure brokers in partitions so that they are only queryable for specific dataSources.|none| |`druid.broker.segment.awaitInitializationOnStart`|Boolean|Whether the Broker will wait for its view of segments to fully initialize before starting up. If set to 'true', the Broker's HTTP server will not start up, and the Broker will not announce itself as available, until the server view is initialized. See also `druid.sql.planner.awaitInitializationOnStart`, a related setting.|true| diff --git a/server/src/main/java/org/apache/druid/client/BrokerSegmentWatcherConfig.java b/server/src/main/java/org/apache/druid/client/BrokerSegmentWatcherConfig.java index 0abb45673631..c487c713f232 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerSegmentWatcherConfig.java +++ b/server/src/main/java/org/apache/druid/client/BrokerSegmentWatcherConfig.java @@ -30,6 +30,9 @@ public class BrokerSegmentWatcherConfig @JsonProperty private Set watchedTiers = null; + @JsonProperty + private Set ignoredTiers = null; + @JsonProperty private Set watchedDataSources = null; @@ -41,6 +44,11 @@ public Set getWatchedTiers() return watchedTiers; } + public Set getIgnoredTiers() + { + return ignoredTiers; + } + public Set getWatchedDataSources() { return watchedDataSources; diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java index 6d668839d7ae..1f2ce763ccd2 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java +++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java @@ -112,16 +112,33 @@ public BrokerServerView( this.timelines = new HashMap<>(); this.segmentFilter = (Pair metadataAndSegment) -> { + log.info( + "Kashif: Trying filter for Segment [%s], Server Type [%s], Server [%s: %s]", + metadataAndSegment.rhs.getId(), + metadataAndSegment.lhs.getType(), + metadataAndSegment.lhs.getTier(), + metadataAndSegment.lhs.getHostAndPort() + ); + + // Include only watched tiers if specified if (segmentWatcherConfig.getWatchedTiers() != null && !segmentWatcherConfig.getWatchedTiers().contains(metadataAndSegment.lhs.getTier())) { return false; } + // Exclude ignored tiers if specified + if (segmentWatcherConfig.getIgnoredTiers() != null + && segmentWatcherConfig.getIgnoredTiers().contains(metadataAndSegment.lhs.getTier())) { + return false; + } + + // Include only watched datasources if specified if (segmentWatcherConfig.getWatchedDataSources() != null && !segmentWatcherConfig.getWatchedDataSources().contains(metadataAndSegment.rhs.getDataSource())) { return false; } + // Include realtime nodes only if they are watched return true; }; ExecutorService exec = Execs.singleThreaded("BrokerServerView-%s"); @@ -224,6 +241,7 @@ private void serverAddedSegment(final DruidServerMetadata server, final DataSegm // query topologies, but for now just skip all brokers, so we don't create some sort of wild infinite query // loop... if (!server.getType().equals(ServerType.BROKER)) { + log.info("Kashif: Adding segment[%s] for server[%s]", segment, server); log.debug("Adding segment[%s] for server[%s]", segment, server); ServerSelector selector = selectors.get(segmentId); if (selector == null) { @@ -257,6 +275,7 @@ private void serverRemovedSegment(DruidServerMetadata server, DataSegment segmen final ServerSelector selector; synchronized (lock) { + log.info("Kashif: Removing segment[%s] from server[%s].", segmentId, server); log.debug("Removing segment[%s] from server[%s].", segmentId, server); // we don't store broker segments here, but still run the callbacks for the segment being removed from the server diff --git a/server/src/test/java/org/apache/druid/client/BrokerSegmentWatcherConfigTest.java b/server/src/test/java/org/apache/druid/client/BrokerSegmentWatcherConfigTest.java index aa69b9c0d6f5..8858bc1c46a0 100644 --- a/server/src/test/java/org/apache/druid/client/BrokerSegmentWatcherConfigTest.java +++ b/server/src/test/java/org/apache/druid/client/BrokerSegmentWatcherConfigTest.java @@ -45,9 +45,10 @@ public void testSerde() throws Exception ); Assert.assertNull(config.getWatchedTiers()); + Assert.assertNull(config.getIgnoredTiers()); //non-defaults - json = "{ \"watchedTiers\": [\"t1\", \"t2\"], \"watchedDataSources\": [\"ds1\", \"ds2\"] }"; + json = "{ \"watchedTiers\": [\"t1\", \"t2\"], \"watchedDataSources\": [\"ds1\", \"ds2\"], \"ignoredTiers\": [\"t3\", \"t4\"] }"; config = MAPPER.readValue( MAPPER.writeValueAsString( @@ -57,6 +58,7 @@ public void testSerde() throws Exception ); Assert.assertEquals(ImmutableSet.of("t1", "t2"), config.getWatchedTiers()); + Assert.assertEquals(ImmutableSet.of("t3", "t4"), config.getIgnoredTiers()); Assert.assertEquals(ImmutableSet.of("ds1", "ds2"), config.getWatchedDataSources()); } diff --git a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java index d864d3206bf1..af656ba435cb 100644 --- a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java +++ b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java @@ -383,22 +383,92 @@ public void testMultipleTiers() throws Exception Assert.assertEquals(Collections.singletonList(server21.getMetadata()), selector.getCandidates(2)); } + @Test + public void testIgnoredTiers() throws Exception + { + segmentViewInitLatch = new CountDownLatch(1); + segmentAddedLatch = new CountDownLatch(4); + segmentRemovedLatch = new CountDownLatch(0); + + // Setup a Broker that does not watch Tier 1 + final String tier1 = "tier1"; + final String tier2 = "tier2"; + setupViews(null, Sets.newHashSet(tier1)); + + // Historical Tier 1 has segments 1 and 2, Tier 2 has segments 2 and 3 + final DruidServer server11 = setupHistoricalServer(tier1, "localhost:1", 1); + final DruidServer server21 = setupHistoricalServer(tier2, "localhost:2", 1); + + final DataSegment segment1 = dataSegmentWithIntervalAndVersion("2020-01-01/P1D", "v1"); + announceSegmentForServer(server11, segment1, zkPathsConfig, jsonMapper); + + final DataSegment segment2 = dataSegmentWithIntervalAndVersion("2020-01-02/P1D", "v1"); + announceSegmentForServer(server11, segment2, zkPathsConfig, jsonMapper); + announceSegmentForServer(server21, segment2, zkPathsConfig, jsonMapper); + + final DataSegment segment3 = dataSegmentWithIntervalAndVersion("2020-01-03/P1D", "v1"); + announceSegmentForServer(server21, segment3, zkPathsConfig, jsonMapper); + + // Wait for the segments to be added + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); + + // Get the timeline for the datasource + TimelineLookup timeline = brokerServerView.getTimeline( + DataSourceAnalysis.forDataSource(new TableDataSource(segment1.getDataSource())) + ).get(); + + // Verify that the timeline has no entry for the interval of segment 1 + Assert.assertTrue(timeline.lookup(segment1.getInterval()).isEmpty()); + + // Verify that there is one entry for the interval of segment 2 + List> timelineHolders = + timeline.lookup(segment2.getInterval()); + Assert.assertEquals(1, timelineHolders.size()); + + TimelineObjectHolder timelineHolder = timelineHolders.get(0); + Assert.assertEquals(segment2.getInterval(), timelineHolder.getInterval()); + Assert.assertEquals(segment2.getVersion(), timelineHolder.getVersion()); + + PartitionHolder partitionHolder = timelineHolder.getObject(); + Assert.assertTrue(partitionHolder.isComplete()); + Assert.assertEquals(1, Iterables.size(partitionHolder)); + + ServerSelector selector = (partitionHolder.iterator().next()).getObject(); + Assert.assertFalse(selector.isEmpty()); + Assert.assertEquals(segment2, selector.getSegment()); + + // Verify that the ServerSelector always picks Tier 1 + for (int i = 0; i < 5; ++i) { + Assert.assertEquals(server21, selector.pick(null).getServer()); + } + Assert.assertEquals(Collections.singletonList(server21.getMetadata()), selector.getCandidates(2)); + } + /** * Creates a DruidServer of type HISTORICAL and sets up a ZNode for it. */ private DruidServer setupHistoricalServer(String tier, String name, int priority) { - final DruidServer historical = new DruidServer( + return setupDruidServer(ServerType.HISTORICAL, tier, name, priority); + } + + /** + * Creates a DruidServer of the specified type and sets up a ZNode for it. + */ + private DruidServer setupDruidServer(ServerType serverType, String tier, String name, int priority) + { + final DruidServer druidServer = new DruidServer( name, name, null, 1000000, - ServerType.HISTORICAL, + serverType, tier, priority ); - setupZNodeForServer(historical, zkPathsConfig, jsonMapper); - return historical; + setupZNodeForServer(druidServer, zkPathsConfig, jsonMapper); + return druidServer; } private Pair>> createExpected( @@ -442,6 +512,11 @@ private void setupViews() throws Exception } private void setupViews(Set watchedTiers) throws Exception + { + setupViews(watchedTiers, null); + } + + private void setupViews(Set watchedTiers, Set ignoredTiers) throws Exception { baseView = new BatchServerInventoryView( zkPathsConfig, @@ -500,6 +575,12 @@ public Set getWatchedTiers() { return watchedTiers; } + + @Override + public Set getIgnoredTiers() + { + return ignoredTiers; + } } ); From 2c99f7c17f058f074074fa1399636f8272eea7fe Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Mon, 4 Oct 2021 12:59:02 +0530 Subject: [PATCH 2/7] Remove test log lines --- docs/configuration/index.md | 2 +- .../java/org/apache/druid/client/BrokerServerView.java | 10 ---------- 2 files changed, 1 insertion(+), 11 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 7509d5e9cc02..d2d38274a0d9 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1794,7 +1794,7 @@ See [cache configuration](#cache-configuration) for how to configure cache setti |--------|---------------|-----------|-------| |`druid.serverview.type`|batch or http|Segment discovery method to use. "http" enables discovering segments using HTTP instead of zookeeper.|batch| |`druid.broker.segment.watchedTiers`|List of strings|Broker watches the segment announcements from processes serving segments to build cache of which process is serving which segments, this configuration allows to only consider segments being served from a whitelist of tiers. By default, Broker would consider all tiers. This can be used to partition your dataSources in specific Historical tiers and configure brokers in partitions so that they are only queryable for specific dataSources.|none| -|`druid.broker.segment.ignoredTiers`|List of strings|The Broker watches the segment announcements from processes that serve segments to build a cache to relate each process to the segments. This configuration allows the Broker to ignore the segments being served from a list of tiers. By default, Broker would consider all tiers. This can be used to partition your dataSources in specific Historical tiers and configure brokers in partitions so that they are only queryable for specific dataSources.|none| +|`druid.broker.segment.ignoredTiers`|List of strings|The Broker watches the segment announcements from processes that serve segments to build a cache to relate each process to the segments it serves. This configuration allows the Broker to ignore the segments being served from a list of tiers. By default, Broker would consider all tiers. This can be used to partition your dataSources in specific Historical tiers and configure brokers in partitions so that they are only queryable for specific dataSources.|none| |`druid.broker.segment.watchedDataSources`|List of strings|Broker watches the segment announcements from processes serving segments to build cache of which process is serving which segments, this configuration allows to only consider segments being served from a whitelist of dataSources. By default, Broker would consider all datasources. This can be used to configure brokers in partitions so that they are only queryable for specific dataSources.|none| |`druid.broker.segment.awaitInitializationOnStart`|Boolean|Whether the Broker will wait for its view of segments to fully initialize before starting up. If set to 'true', the Broker's HTTP server will not start up, and the Broker will not announce itself as available, until the server view is initialized. See also `druid.sql.planner.awaitInitializationOnStart`, a related setting.|true| diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java index 1f2ce763ccd2..bd5f33bff007 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java +++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java @@ -112,14 +112,6 @@ public BrokerServerView( this.timelines = new HashMap<>(); this.segmentFilter = (Pair metadataAndSegment) -> { - log.info( - "Kashif: Trying filter for Segment [%s], Server Type [%s], Server [%s: %s]", - metadataAndSegment.rhs.getId(), - metadataAndSegment.lhs.getType(), - metadataAndSegment.lhs.getTier(), - metadataAndSegment.lhs.getHostAndPort() - ); - // Include only watched tiers if specified if (segmentWatcherConfig.getWatchedTiers() != null && !segmentWatcherConfig.getWatchedTiers().contains(metadataAndSegment.lhs.getTier())) { @@ -241,7 +233,6 @@ private void serverAddedSegment(final DruidServerMetadata server, final DataSegm // query topologies, but for now just skip all brokers, so we don't create some sort of wild infinite query // loop... if (!server.getType().equals(ServerType.BROKER)) { - log.info("Kashif: Adding segment[%s] for server[%s]", segment, server); log.debug("Adding segment[%s] for server[%s]", segment, server); ServerSelector selector = selectors.get(segmentId); if (selector == null) { @@ -275,7 +266,6 @@ private void serverRemovedSegment(DruidServerMetadata server, DataSegment segmen final ServerSelector selector; synchronized (lock) { - log.info("Kashif: Removing segment[%s] from server[%s].", segmentId, server); log.debug("Removing segment[%s] from server[%s].", segmentId, server); // we don't store broker segments here, but still run the callbacks for the segment being removed from the server From 0c9c03bc85083c7d1a53049f29a8672e6228aab7 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Mon, 4 Oct 2021 13:00:37 +0530 Subject: [PATCH 3/7] Remove outdated comment --- .../src/main/java/org/apache/druid/client/BrokerServerView.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java index bd5f33bff007..0f9856787f29 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java +++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java @@ -130,7 +130,6 @@ public BrokerServerView( return false; } - // Include realtime nodes only if they are watched return true; }; ExecutorService exec = Execs.singleThreaded("BrokerServerView-%s"); From 0cf0dbdb0d5ac1751c71453567f5952796df01e4 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 5 Oct 2021 12:03:26 +0530 Subject: [PATCH 4/7] Fail Broker on startup if both ignoredTiers and watchedTiers are specified --- .../org/apache/druid/client/BrokerServerView.java | 11 ++++++++++- .../org/apache/druid/client/BrokerServerViewTest.java | 10 ++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java index 0f9856787f29..4740552551bf 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java +++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java @@ -106,11 +106,20 @@ public BrokerServerView( this.baseView = baseView; this.tierSelectorStrategy = tierSelectorStrategy; this.emitter = emitter; - this.segmentWatcherConfig = segmentWatcherConfig; this.clients = new ConcurrentHashMap<>(); this.selectors = new HashMap<>(); this.timelines = new HashMap<>(); + // Validate and set the segment watcher config + this.segmentWatcherConfig = segmentWatcherConfig; + if (segmentWatcherConfig.getWatchedTiers() != null + && segmentWatcherConfig.getIgnoredTiers() != null) { + throw new ISE( + "At most one of 'druid.broker.segment.watchedTiers' " + + "and 'druid.broker.segment.ignoredTiers' can be specified." + ); + } + this.segmentFilter = (Pair metadataAndSegment) -> { // Include only watched tiers if specified if (segmentWatcherConfig.getWatchedTiers() != null diff --git a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java index af656ba435cb..e48dcc34115d 100644 --- a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java +++ b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java @@ -33,6 +33,7 @@ import org.apache.druid.client.selector.ServerSelector; import org.apache.druid.curator.CuratorTestBase; import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.http.client.HttpClient; @@ -445,6 +446,15 @@ public void testIgnoredTiers() throws Exception Assert.assertEquals(Collections.singletonList(server21.getMetadata()), selector.getCandidates(2)); } + @Test(expected = ISE.class) + public void testInvalidWatchedTiersConfig() throws Exception + { + // Verify that specifying both ignoredTiers and watchedTiers fails startup + final String tier1 = "tier1"; + final String tier2 = "tier2"; + setupViews(Sets.newHashSet(tier2), Sets.newHashSet(tier1)); + } + /** * Creates a DruidServer of type HISTORICAL and sets up a ZNode for it. */ From a6f40d31a582dbef67f29c502d7bfbd501a9af9c Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 5 Oct 2021 12:12:08 +0530 Subject: [PATCH 5/7] Fix docs for new config --- docs/configuration/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index d2d38274a0d9..dad9cb2ffb04 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1794,7 +1794,7 @@ See [cache configuration](#cache-configuration) for how to configure cache setti |--------|---------------|-----------|-------| |`druid.serverview.type`|batch or http|Segment discovery method to use. "http" enables discovering segments using HTTP instead of zookeeper.|batch| |`druid.broker.segment.watchedTiers`|List of strings|Broker watches the segment announcements from processes serving segments to build cache of which process is serving which segments, this configuration allows to only consider segments being served from a whitelist of tiers. By default, Broker would consider all tiers. This can be used to partition your dataSources in specific Historical tiers and configure brokers in partitions so that they are only queryable for specific dataSources.|none| -|`druid.broker.segment.ignoredTiers`|List of strings|The Broker watches the segment announcements from processes that serve segments to build a cache to relate each process to the segments it serves. This configuration allows the Broker to ignore the segments being served from a list of tiers. By default, Broker would consider all tiers. This can be used to partition your dataSources in specific Historical tiers and configure brokers in partitions so that they are only queryable for specific dataSources.|none| +|`druid.broker.segment.ignoredTiers`|List of strings|The Broker watches the segment announcements from processes that serve segments to build a cache to relate each process to the segments it serves. This configuration allows the Broker to ignore the segments being served from a list of tiers. By default, Broker considers all tiers. This config is mutually exclusive from `druid.broker.segment.watchedTiers` and at most only one of these can be configured on a Broker.|none| |`druid.broker.segment.watchedDataSources`|List of strings|Broker watches the segment announcements from processes serving segments to build cache of which process is serving which segments, this configuration allows to only consider segments being served from a whitelist of dataSources. By default, Broker would consider all datasources. This can be used to configure brokers in partitions so that they are only queryable for specific dataSources.|none| |`druid.broker.segment.awaitInitializationOnStart`|Boolean|Whether the Broker will wait for its view of segments to fully initialize before starting up. If set to 'true', the Broker's HTTP server will not start up, and the Broker will not announce itself as available, until the server view is initialized. See also `druid.sql.planner.awaitInitializationOnStart`, a related setting.|true| From a94e562408079e62789434788b0fdda94e3a88d8 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 5 Oct 2021 14:31:46 +0530 Subject: [PATCH 6/7] Validate watchedTiers and ignoredTiers configs --- docs/configuration/index.md | 4 +- .../apache/druid/client/BrokerServerView.java | 37 +++++++++++++++---- .../BrokerSegmentWatcherConfigTest.java | 15 +++++++- .../druid/client/BrokerServerViewTest.java | 12 ++++++ 4 files changed, 57 insertions(+), 11 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index dad9cb2ffb04..3ade4c175b9a 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1793,8 +1793,8 @@ See [cache configuration](#cache-configuration) for how to configure cache setti |Property|Possible Values|Description|Default| |--------|---------------|-----------|-------| |`druid.serverview.type`|batch or http|Segment discovery method to use. "http" enables discovering segments using HTTP instead of zookeeper.|batch| -|`druid.broker.segment.watchedTiers`|List of strings|Broker watches the segment announcements from processes serving segments to build cache of which process is serving which segments, this configuration allows to only consider segments being served from a whitelist of tiers. By default, Broker would consider all tiers. This can be used to partition your dataSources in specific Historical tiers and configure brokers in partitions so that they are only queryable for specific dataSources.|none| -|`druid.broker.segment.ignoredTiers`|List of strings|The Broker watches the segment announcements from processes that serve segments to build a cache to relate each process to the segments it serves. This configuration allows the Broker to ignore the segments being served from a list of tiers. By default, Broker considers all tiers. This config is mutually exclusive from `druid.broker.segment.watchedTiers` and at most only one of these can be configured on a Broker.|none| +|`druid.broker.segment.watchedTiers`|List of strings|The Broker watches segment announcements from processes that serve segments to build a cache to relate each process to the segments it serves. This configuration allows the Broker to only consider segments being served from a list of tiers. By default, Broker considers all tiers. This can be used to partition your dataSources in specific Historical tiers and configure brokers in partitions so that they are only queryable for specific dataSources. This config is mutually exclusive from `druid.broker.segment.ignoredTiers` and at most one of these can be configured on a Broker.|none| +|`druid.broker.segment.ignoredTiers`|List of strings|The Broker watches segment announcements from processes that serve segments to build a cache to relate each process to the segments it serves. This configuration allows the Broker to ignore the segments being served from a list of tiers. By default, Broker considers all tiers. This config is mutually exclusive from `druid.broker.segment.watchedTiers` and at most one of these can be configured on a Broker.|none| |`druid.broker.segment.watchedDataSources`|List of strings|Broker watches the segment announcements from processes serving segments to build cache of which process is serving which segments, this configuration allows to only consider segments being served from a whitelist of dataSources. By default, Broker would consider all datasources. This can be used to configure brokers in partitions so that they are only queryable for specific dataSources.|none| |`druid.broker.segment.awaitInitializationOnStart`|Boolean|Whether the Broker will wait for its view of segments to fully initialize before starting up. If set to 'true', the Broker's HTTP server will not start up, and the Broker will not announce itself as available, until the server view is initialized. See also `druid.sql.planner.awaitInitializationOnStart`, a related setting.|true| diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java index 4740552551bf..f4269157b6ad 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java +++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java @@ -111,14 +111,8 @@ public BrokerServerView( this.timelines = new HashMap<>(); // Validate and set the segment watcher config + validateSegmentWatcherConfig(segmentWatcherConfig); this.segmentWatcherConfig = segmentWatcherConfig; - if (segmentWatcherConfig.getWatchedTiers() != null - && segmentWatcherConfig.getIgnoredTiers() != null) { - throw new ISE( - "At most one of 'druid.broker.segment.watchedTiers' " - + "and 'druid.broker.segment.ignoredTiers' can be specified." - ); - } this.segmentFilter = (Pair metadataAndSegment) -> { // Include only watched tiers if specified @@ -201,6 +195,35 @@ public void awaitInitialization() throws InterruptedException initialized.await(); } + /** + * Validates the given BrokerSegmentWatcherConfig. + *
    + *
  • At most one of watchedTiers or ignoredTiers can be set
  • + *
  • If set, watchedTiers must be non-empty
  • + *
  • If set, ignoredTiers must be non-empty
  • + *
+ */ + private void validateSegmentWatcherConfig(BrokerSegmentWatcherConfig watcherConfig) + { + if (watcherConfig.getWatchedTiers() != null + && watcherConfig.getIgnoredTiers() != null) { + throw new ISE( + "At most one of 'druid.broker.segment.watchedTiers' " + + "and 'druid.broker.segment.ignoredTiers' can be configured." + ); + } + + if (watcherConfig.getWatchedTiers() != null + && watcherConfig.getWatchedTiers().isEmpty()) { + throw new ISE("If configured, 'druid.broker.segment.watchedTiers' must be non-empty"); + } + + if (watcherConfig.getIgnoredTiers() != null + && watcherConfig.getIgnoredTiers().isEmpty()) { + throw new ISE("If configured, 'druid.broker.segment.ignoredTiers' must be non-empty"); + } + } + private QueryableDruidServer addServer(DruidServer server) { QueryableDruidServer retVal = new QueryableDruidServer<>(server, makeDirectClient(server)); diff --git a/server/src/test/java/org/apache/druid/client/BrokerSegmentWatcherConfigTest.java b/server/src/test/java/org/apache/druid/client/BrokerSegmentWatcherConfigTest.java index 8858bc1c46a0..acbbd4b9ebae 100644 --- a/server/src/test/java/org/apache/druid/client/BrokerSegmentWatcherConfigTest.java +++ b/server/src/test/java/org/apache/druid/client/BrokerSegmentWatcherConfigTest.java @@ -48,7 +48,7 @@ public void testSerde() throws Exception Assert.assertNull(config.getIgnoredTiers()); //non-defaults - json = "{ \"watchedTiers\": [\"t1\", \"t2\"], \"watchedDataSources\": [\"ds1\", \"ds2\"], \"ignoredTiers\": [\"t3\", \"t4\"] }"; + json = "{ \"watchedTiers\": [\"t1\", \"t2\"], \"watchedDataSources\": [\"ds1\", \"ds2\"] }"; config = MAPPER.readValue( MAPPER.writeValueAsString( @@ -58,8 +58,19 @@ public void testSerde() throws Exception ); Assert.assertEquals(ImmutableSet.of("t1", "t2"), config.getWatchedTiers()); - Assert.assertEquals(ImmutableSet.of("t3", "t4"), config.getIgnoredTiers()); Assert.assertEquals(ImmutableSet.of("ds1", "ds2"), config.getWatchedDataSources()); + // json with ignoredTiers + json = "{ \"ignoredTiers\": [\"t3\", \"t4\"], \"watchedDataSources\": [\"ds1\", \"ds2\"] }"; + + config = MAPPER.readValue( + MAPPER.writeValueAsString( + MAPPER.readValue(json, BrokerSegmentWatcherConfig.class) + ), + BrokerSegmentWatcherConfig.class + ); + + Assert.assertEquals(ImmutableSet.of("t3", "t4"), config.getIgnoredTiers()); + Assert.assertEquals(ImmutableSet.of("ds1", "ds2"), config.getWatchedDataSources()); } } diff --git a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java index e48dcc34115d..6e93c372b346 100644 --- a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java +++ b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java @@ -455,6 +455,18 @@ public void testInvalidWatchedTiersConfig() throws Exception setupViews(Sets.newHashSet(tier2), Sets.newHashSet(tier1)); } + @Test(expected = ISE.class) + public void testEmptyWatchedTiersConfig() throws Exception + { + setupViews(Collections.emptySet(), null); + } + + @Test(expected = ISE.class) + public void testEmptyIgnoredTiersConfig() throws Exception + { + setupViews(null, Collections.emptySet()); + } + /** * Creates a DruidServer of type HISTORICAL and sets up a ZNode for it. */ From 523ea703300add80d098cd9c529d9c9893d46dc3 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 5 Oct 2021 14:36:01 +0530 Subject: [PATCH 7/7] Fix tests --- .../org/apache/druid/client/BrokerSegmentWatcherConfigTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/test/java/org/apache/druid/client/BrokerSegmentWatcherConfigTest.java b/server/src/test/java/org/apache/druid/client/BrokerSegmentWatcherConfigTest.java index acbbd4b9ebae..8eab4629489a 100644 --- a/server/src/test/java/org/apache/druid/client/BrokerSegmentWatcherConfigTest.java +++ b/server/src/test/java/org/apache/druid/client/BrokerSegmentWatcherConfigTest.java @@ -58,6 +58,7 @@ public void testSerde() throws Exception ); Assert.assertEquals(ImmutableSet.of("t1", "t2"), config.getWatchedTiers()); + Assert.assertNull(config.getIgnoredTiers()); Assert.assertEquals(ImmutableSet.of("ds1", "ds2"), config.getWatchedDataSources()); // json with ignoredTiers @@ -70,6 +71,7 @@ public void testSerde() throws Exception BrokerSegmentWatcherConfig.class ); + Assert.assertNull(config.getWatchedTiers()); Assert.assertEquals(ImmutableSet.of("t3", "t4"), config.getIgnoredTiers()); Assert.assertEquals(ImmutableSet.of("ds1", "ds2"), config.getWatchedDataSources()); }