Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Broker config druid.broker.segment.ignoredTiers #11766

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1794,6 +1794,7 @@ See [cache configuration](#cache-configuration) for how to configure cache setti
|--------|---------------|-----------|-------|
|`druid.serverview.type`|batch or http|Segment discovery method to use. "http" enables discovering segments using HTTP instead of zookeeper.|batch|
|`druid.broker.segment.watchedTiers`|List of strings|Broker watches the segment announcements from processes serving segments to build cache of which process is serving which segments, this configuration allows to only consider segments being served from a whitelist of tiers. By default, Broker would consider all tiers. This can be used to partition your dataSources in specific Historical tiers and configure brokers in partitions so that they are only queryable for specific dataSources.|none|
|`druid.broker.segment.ignoredTiers`|List of strings|The Broker watches the segment announcements from processes that serve segments to build a cache to relate each process to the segments it serves. This configuration allows the Broker to ignore the segments being served from a list of tiers. By default, Broker considers all tiers. This config is mutually exclusive from `druid.broker.segment.watchedTiers` and at most only one of these can be configured on a Broker.|none|
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should druid.broker.segment.watchedTiers docs also be updated to indicate that it is mutually exclusive with ignoredTiers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think that would help remove any ambiguity.

|`druid.broker.segment.watchedDataSources`|List of strings|Broker watches the segment announcements from processes serving segments to build cache of which process is serving which segments, this configuration allows to only consider segments being served from a whitelist of dataSources. By default, Broker would consider all datasources. This can be used to configure brokers in partitions so that they are only queryable for specific dataSources.|none|
|`druid.broker.segment.awaitInitializationOnStart`|Boolean|Whether the Broker will wait for its view of segments to fully initialize before starting up. If set to 'true', the Broker's HTTP server will not start up, and the Broker will not announce itself as available, until the server view is initialized. See also `druid.sql.planner.awaitInitializationOnStart`, a related setting.|true|

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ public class BrokerSegmentWatcherConfig
@JsonProperty
private Set<String> watchedTiers = null;

@JsonProperty
private Set<String> ignoredTiers = null;

@JsonProperty
private Set<String> watchedDataSources = null;

Expand All @@ -41,6 +44,11 @@ public Set<String> getWatchedTiers()
return watchedTiers;
}

public Set<String> getIgnoredTiers()
{
return ignoredTiers;
}

public Set<String> getWatchedDataSources()
{
return watchedDataSources;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,34 @@ public BrokerServerView(
this.baseView = baseView;
this.tierSelectorStrategy = tierSelectorStrategy;
this.emitter = emitter;
this.segmentWatcherConfig = segmentWatcherConfig;
this.clients = new ConcurrentHashMap<>();
this.selectors = new HashMap<>();
this.timelines = new HashMap<>();

// Validate and set the segment watcher config
this.segmentWatcherConfig = segmentWatcherConfig;
if (segmentWatcherConfig.getWatchedTiers() != null
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i know this isn't new, but I wonder since we now doing some validation here if we should also explode if watchedTiers is empty, which also seems like an invalid configuration

Copy link
Contributor Author

@kfaraz kfaraz Oct 5, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think it would make sense to add a validate method in BrokerSegmentWatcherConfig that contains all these validations and just invoke segmentWatcherConfig.validate() from here or should we keep them here itself?

&& segmentWatcherConfig.getIgnoredTiers() != null) {
throw new ISE(
"At most one of 'druid.broker.segment.watchedTiers' "
+ "and 'druid.broker.segment.ignoredTiers' can be specified."
);
}

this.segmentFilter = (Pair<DruidServerMetadata, DataSegment> metadataAndSegment) -> {
// Include only watched tiers if specified
if (segmentWatcherConfig.getWatchedTiers() != null
&& !segmentWatcherConfig.getWatchedTiers().contains(metadataAndSegment.lhs.getTier())) {
return false;
}

// Exclude ignored tiers if specified
if (segmentWatcherConfig.getIgnoredTiers() != null
&& segmentWatcherConfig.getIgnoredTiers().contains(metadataAndSegment.lhs.getTier())) {
return false;
}

// Include only watched datasources if specified
if (segmentWatcherConfig.getWatchedDataSources() != null
&& !segmentWatcherConfig.getWatchedDataSources().contains(metadataAndSegment.rhs.getDataSource())) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ public void testSerde() throws Exception
);

Assert.assertNull(config.getWatchedTiers());
Assert.assertNull(config.getIgnoredTiers());

//non-defaults
json = "{ \"watchedTiers\": [\"t1\", \"t2\"], \"watchedDataSources\": [\"ds1\", \"ds2\"] }";
json = "{ \"watchedTiers\": [\"t1\", \"t2\"], \"watchedDataSources\": [\"ds1\", \"ds2\"], \"ignoredTiers\": [\"t3\", \"t4\"] }";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this test is sort of strange now that it is an invalid configuration, though technically it isn't enforced by the config itself so it isn't problematic here. maybe we should at least leave a comment that this config is illegal in practice?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, you are right, we should not use an invalid configuration here. Fixing.


config = MAPPER.readValue(
MAPPER.writeValueAsString(
Expand All @@ -57,6 +58,7 @@ public void testSerde() throws Exception
);

Assert.assertEquals(ImmutableSet.of("t1", "t2"), config.getWatchedTiers());
Assert.assertEquals(ImmutableSet.of("t3", "t4"), config.getIgnoredTiers());
Assert.assertEquals(ImmutableSet.of("ds1", "ds2"), config.getWatchedDataSources());

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.druid.client.selector.ServerSelector;
import org.apache.druid.curator.CuratorTestBase;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.http.client.HttpClient;
Expand Down Expand Up @@ -383,22 +384,101 @@ public void testMultipleTiers() throws Exception
Assert.assertEquals(Collections.singletonList(server21.getMetadata()), selector.getCandidates(2));
}

@Test
public void testIgnoredTiers() throws Exception
{
segmentViewInitLatch = new CountDownLatch(1);
segmentAddedLatch = new CountDownLatch(4);
segmentRemovedLatch = new CountDownLatch(0);

// Setup a Broker that does not watch Tier 1
final String tier1 = "tier1";
final String tier2 = "tier2";
setupViews(null, Sets.newHashSet(tier1));

// Historical Tier 1 has segments 1 and 2, Tier 2 has segments 2 and 3
final DruidServer server11 = setupHistoricalServer(tier1, "localhost:1", 1);
final DruidServer server21 = setupHistoricalServer(tier2, "localhost:2", 1);

final DataSegment segment1 = dataSegmentWithIntervalAndVersion("2020-01-01/P1D", "v1");
announceSegmentForServer(server11, segment1, zkPathsConfig, jsonMapper);

final DataSegment segment2 = dataSegmentWithIntervalAndVersion("2020-01-02/P1D", "v1");
announceSegmentForServer(server11, segment2, zkPathsConfig, jsonMapper);
announceSegmentForServer(server21, segment2, zkPathsConfig, jsonMapper);

final DataSegment segment3 = dataSegmentWithIntervalAndVersion("2020-01-03/P1D", "v1");
announceSegmentForServer(server21, segment3, zkPathsConfig, jsonMapper);

// Wait for the segments to be added
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));

// Get the timeline for the datasource
TimelineLookup<String, ServerSelector> timeline = brokerServerView.getTimeline(
DataSourceAnalysis.forDataSource(new TableDataSource(segment1.getDataSource()))
).get();

// Verify that the timeline has no entry for the interval of segment 1
Assert.assertTrue(timeline.lookup(segment1.getInterval()).isEmpty());

// Verify that there is one entry for the interval of segment 2
List<TimelineObjectHolder<String, ServerSelector>> timelineHolders =
timeline.lookup(segment2.getInterval());
Assert.assertEquals(1, timelineHolders.size());

TimelineObjectHolder<String, ServerSelector> timelineHolder = timelineHolders.get(0);
Assert.assertEquals(segment2.getInterval(), timelineHolder.getInterval());
Assert.assertEquals(segment2.getVersion(), timelineHolder.getVersion());

PartitionHolder<ServerSelector> partitionHolder = timelineHolder.getObject();
Assert.assertTrue(partitionHolder.isComplete());
Assert.assertEquals(1, Iterables.size(partitionHolder));

ServerSelector selector = (partitionHolder.iterator().next()).getObject();
Assert.assertFalse(selector.isEmpty());
Assert.assertEquals(segment2, selector.getSegment());

// Verify that the ServerSelector always picks Tier 1
for (int i = 0; i < 5; ++i) {
Assert.assertEquals(server21, selector.pick(null).getServer());
}
Assert.assertEquals(Collections.singletonList(server21.getMetadata()), selector.getCandidates(2));
}

@Test(expected = ISE.class)
public void testInvalidWatchedTiersConfig() throws Exception
{
// Verify that specifying both ignoredTiers and watchedTiers fails startup
final String tier1 = "tier1";
final String tier2 = "tier2";
setupViews(Sets.newHashSet(tier2), Sets.newHashSet(tier1));
}

/**
* Creates a DruidServer of type HISTORICAL and sets up a ZNode for it.
*/
private DruidServer setupHistoricalServer(String tier, String name, int priority)
{
final DruidServer historical = new DruidServer(
return setupDruidServer(ServerType.HISTORICAL, tier, name, priority);
}

/**
* Creates a DruidServer of the specified type and sets up a ZNode for it.
*/
private DruidServer setupDruidServer(ServerType serverType, String tier, String name, int priority)
{
final DruidServer druidServer = new DruidServer(
name,
name,
null,
1000000,
ServerType.HISTORICAL,
serverType,
tier,
priority
);
setupZNodeForServer(historical, zkPathsConfig, jsonMapper);
return historical;
setupZNodeForServer(druidServer, zkPathsConfig, jsonMapper);
return druidServer;
}

private Pair<Interval, Pair<String, Pair<DruidServer, DataSegment>>> createExpected(
Expand Down Expand Up @@ -442,6 +522,11 @@ private void setupViews() throws Exception
}

private void setupViews(Set<String> watchedTiers) throws Exception
{
setupViews(watchedTiers, null);
}

private void setupViews(Set<String> watchedTiers, Set<String> ignoredTiers) throws Exception
{
baseView = new BatchServerInventoryView(
zkPathsConfig,
Expand Down Expand Up @@ -500,6 +585,12 @@ public Set<String> getWatchedTiers()
{
return watchedTiers;
}

@Override
public Set<String> getIgnoredTiers()
{
return ignoredTiers;
}
}
);

Expand Down