Skip to content

Commit

Permalink
apache#2258 Lessen gc pressure of coordinator with a lot of segments
Browse files Browse the repository at this point in the history
  • Loading branch information
navis committed Jun 20, 2019
1 parent 7199c5b commit 5cc542f
Show file tree
Hide file tree
Showing 14 changed files with 154 additions and 144 deletions.
38 changes: 38 additions & 0 deletions common/src/main/java/io/druid/timeline/DataSegment.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.common.collect.Interner;
import com.google.common.collect.Interners;
import com.google.common.collect.Iterables;
import io.druid.common.Intervals;
import io.druid.jackson.CommaListJoinDeserializer;
import io.druid.jackson.CommaListJoinSerializer;
import io.druid.query.SegmentDescriptor;
Expand All @@ -43,6 +44,8 @@

import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
*/
Expand All @@ -66,6 +69,41 @@ public String apply(DataSegment input)
}
};

// Matches the ISO-8601 timestamp portion of a segment identifier
// (e.g. 2019-01-01T00:00:00.000Z): date, 'T', time, then anything up to '_'.
private static final String ISO_TIME_REGEX = "\\d{1,4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}[^_]*";

// A segment identifier has the shape <dataSource>_<start>_<end>_<version>[_<partitionNum>];
// this pattern locates the "_<start>_<end>_" interval section.
private static final Pattern INTERVAL_FRACTION =
    Pattern.compile("_(" + ISO_TIME_REGEX + ")_(" + ISO_TIME_REGEX + ")_");

/**
 * Extracts the dataSource prefix from a segment identifier.
 *
 * @param identifier segment identifier string
 * @return everything before the interval section, or {@code null} when the
 *         identifier contains no recognizable interval section
 */
public static String parseDateSource(String identifier)
{
  final Matcher matcher = INTERVAL_FRACTION.matcher(identifier);
  if (matcher.find()) {
    return identifier.substring(0, matcher.start());
  }
  return null;
}

/**
 * Parses a segment identifier of the shape
 * {@code <dataSource>_<start>_<end>_<version>[_<partitionNum>]} into a
 * {@link SegmentDescriptor}. The partition number defaults to 0 when the
 * trailing {@code _<partitionNum>} is absent.
 *
 * @param identifier segment identifier string
 * @return descriptor for the identifier, or {@code null} when no interval
 *         section can be found
 */
public static SegmentDescriptor parse(String identifier)
{
  final Matcher matcher = INTERVAL_FRACTION.matcher(identifier);
  if (!matcher.find()) {
    return null;
  }
  // matcher.start() marks the end of the dataSource prefix, which this
  // method does not need (the original computed an unused local for it).
  DateTime start = DateTime.parse(matcher.group(1));
  DateTime end = DateTime.parse(matcher.group(2));

  int versionStart = matcher.end();
  int versionEnd = identifier.indexOf('_', versionStart);
  String version;
  int partitionNum = 0;  // identifiers omit the suffix for partition zero
  if (versionEnd < 0) {
    version = identifier.substring(versionStart);
  } else {
    version = identifier.substring(versionStart, versionEnd);
    // parseInt avoids the Integer boxing + unboxing that valueOf incurs --
    // in keeping with this commit's goal of lowering GC pressure.
    partitionNum = Integer.parseInt(identifier.substring(versionEnd + 1));
  }
  return new SegmentDescriptor(Intervals.of(start, end), version, partitionNum);
}

public static String delimiter = "_";
private final Integer binaryVersion;
private static final Interner<String> interner = Interners.newWeakInterner();
Expand Down
24 changes: 20 additions & 4 deletions server/src/main/java/io/druid/client/DruidDataSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,27 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
// NOTE(review): Jython-shaded guava import below looks accidental -- verify
import org.python.google.common.collect.ImmutableList;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
*/
public class DruidDataSource
{
/**
 * Creates an empty DruidDataSource for the given name, stamping a "created"
 * property with the current time. Suitable for computeIfAbsent-style use when
 * lazily registering datasources.
 *
 * <p>Rewritten as a lambda (the file already imports java.util.function.Function,
 * so Java 8 is available); note {@code ImmutableMap} needs an explicit
 * {@code com.google.common.collect.ImmutableMap} import -- the visible imports
 * only bring in the (shaded) ImmutableList.
 */
public static final Function<String, DruidDataSource> FACTORY =
    datasourceName -> new DruidDataSource(
        datasourceName,
        ImmutableMap.of("created", new DateTime().toString())
    );

private final String name;
private final Map<String, String> properties;
private final Map<String, DataSegment> segmentsMap;
Expand Down Expand Up @@ -70,12 +81,17 @@ public List<DataSegment> getSegmentsSorted()
return segments;
}

// NOTE(review): the two signatures below are the before/after of a unified
// diff rendered without +/- markers; only one exists in the real file.
// The commit drops the caller-supplied partitionName key in favor of keying
// the map by the segment's own identifier.
public synchronized DruidDataSource addSegment(String partitionName, DataSegment dataSegment)
public synchronized DruidDataSource addSegment(DataSegment dataSegment)
{
// Old body keyed by partitionName; new body keys by getIdentifier().
segmentsMap.put(partitionName, dataSegment);
segmentsMap.put(dataSegment.getIdentifier(), dataSegment);
return this;
}

/**
 * Adds the segment, keyed by its identifier, unless one is already present.
 *
 * @param dataSegment segment to register
 * @return {@code true} when the segment was newly added, {@code false} when a
 *         segment with the same identifier was already in the map
 */
public synchronized boolean addSegmentIfAbsent(DataSegment dataSegment)
{
  final String id = dataSegment.getIdentifier();
  final DataSegment existing = segmentsMap.putIfAbsent(id, dataSegment);
  return existing == null;
}

public synchronized DruidDataSource addSegments(Map<String, DataSegment> partitionMap)
{
segmentsMap.putAll(partitionMap);
Expand All @@ -88,9 +104,9 @@ public synchronized DruidDataSource removeSegment(String segmentId)
return this;
}

// NOTE(review): both sides of a diff without +/- markers are shown. The
// commit changes the parameter from a DataSegment (from which the identifier
// was derived) to the identifier String itself, so callers that already hold
// the id avoid materializing/looking up a segment object.
public synchronized boolean contains(DataSegment segment)
public synchronized boolean contains(String id)
{
return segmentsMap.containsKey(segment.getIdentifier());
return segmentsMap.containsKey(id);
}

public synchronized boolean isEmpty()
Expand Down
56 changes: 23 additions & 33 deletions server/src/main/java/io/druid/client/DruidServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment;

import javax.annotation.concurrent.GuardedBy;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -69,9 +68,6 @@ public ImmutableDruidServer apply(DruidServer input)

private volatile long currSize;

@GuardedBy("lock")
private ImmutableDruidServer immutableView;

public DruidServer(
DruidNode node,
DruidServerConfig config,
Expand Down Expand Up @@ -165,8 +161,10 @@ public DataSegment getSegment(String segmentName)
return segments.get(segmentName);
}

public DruidServer addDataSegment(String segmentId, DataSegment segment)
public DruidServer addDataSegment(DataSegment segment)
{
final String segmentId = segment.getIdentifier();

synchronized (lock) {
DataSegment shouldNotExist = segments.get(segmentId);

Expand All @@ -175,7 +173,6 @@ public DruidServer addDataSegment(String segmentId, DataSegment segment)
return this;
}

immutableView = null;
String dataSourceName = segment.getDataSource();
DruidDataSource dataSource = dataSources.get(dataSourceName);

Expand All @@ -187,7 +184,7 @@ public DruidServer addDataSegment(String segmentId, DataSegment segment)
dataSources.put(dataSourceName, dataSource);
}

dataSource.addSegment(segmentId, segment);
dataSource.addSegment(segment);

segments.put(segmentId, segment);
currSize += segment.getSize();
Expand All @@ -198,9 +195,8 @@ public DruidServer addDataSegment(String segmentId, DataSegment segment)
public DruidServer addDataSegments(DruidServer server)
{
synchronized (lock) {
immutableView = null;
for (Map.Entry<String, DataSegment> entry : server.segments.entrySet()) {
addDataSegment(entry.getKey(), entry.getValue());
addDataSegment(entry.getValue());
}
}
return this;
Expand All @@ -227,7 +223,6 @@ public DruidServer removeDataSegment(String segmentId)
);
return this;
}
immutableView = null;

dataSource.removeSegment(segmentId);

Expand Down Expand Up @@ -298,28 +293,23 @@ public int compareTo(Object o)

public ImmutableDruidServer toImmutableDruidServer()
{
// NOTE(review): this span shows BOTH sides of a diff without +/- markers.
// The block down to the first "return immutableView;" is the OLD body, which
// cached the snapshot in the immutableView field under the lock (and was
// invalidated elsewhere on every segment add/remove); the trailing
// "return new ImmutableDruidServer(...)" is the NEW body, which builds a
// fresh snapshot on every call and needs no cache field or invalidation.
synchronized (lock) {
if (immutableView == null) {
immutableView = new ImmutableDruidServer(
metadata,
currSize,
ImmutableMap.copyOf(
Maps.transformValues(
dataSources,
new Function<DruidDataSource, ImmutableDruidDataSource>()
{
@Override
public ImmutableDruidDataSource apply(DruidDataSource input)
{
// per-datasource immutable snapshot
return input.toImmutableDruidDataSource();
}
}
)
),
ImmutableMap.copyOf(segments)
);
}
return immutableView;
}
// NEW body (added by this commit): uncached, built on demand.
return new ImmutableDruidServer(
metadata,
currSize,
ImmutableMap.copyOf(
Maps.transformValues(
dataSources,
new Function<DruidDataSource, ImmutableDruidDataSource>()
{
@Override
public ImmutableDruidDataSource apply(DruidDataSource input)
{
return input.toImmutableDruidDataSource();
}
}
)
),
ImmutableMap.copyOf(segments)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ protected void addSingleInventory(
return;
}

container.addDataSegment(inventory.getIdentifier(), inventory);
container.addDataSegment(inventory);

runSegmentCallbacks(
new Function<SegmentCallback, CallbackAction>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,9 @@ public Map<String, List<Rule>> fold(
)
);

log.info("Polled and found rules for %,d datasource(s)", newRules.size());
if (!rules.get().equals(newRules)) {
log.info("Polled and found rules for %,d datasource(s)", newRules.size());
}

rules.set(newRules);
retryStartTime = 0;
Expand Down
Loading

0 comments on commit 5cc542f

Please sign in to comment.