Materialized view implementation #5556
Conversation
@zhangxinyu1 thanks for raising this PR! Would you add a link to the proposal here?
Oh, never mind. It's already here.
I restarted Travis. @zhangxinyu1 would you check the TeamCity inspection failure?
Force-pushed from 8d328ac to fdf16a9
@zhangxinyu1 thanks for the fix. I'll start my review. BTW, did you have a chance to test this feature in some real clusters?
@jihoonson Thanks!
Reviewed up to MaterializedViewMetadataCoordinator.
import java.util.Objects;
import java.util.Set;

@JsonTypeName("view")
I believe we will have more types of views in the future. Please use a more specific name like derivativeDataSource.
BTW, this annotation is not needed since you added a NamedType here.
Preconditions.checkNotNull(baseDataSource, "baseDataSource cannot be null. This is not a valid DerivativeDataSourceMetadata.");
Preconditions.checkNotNull(dimensions, "dimensions cannot be null. This is not a valid DerivativeDataSourceMetadata.");
Preconditions.checkNotNull(metrics, "metrics cannot be null. This is not a valid DerivativeDataSourceMetadata.");
this.baseDataSource = baseDataSource;
nit: Can be simplified to this.baseDataSource = Preconditions.checkNotNull(baseDataSource, "baseDataSource cannot be null. This is not a valid DerivativeDataSourceMetadata.");
}

@Override
public boolean matches(DataSourceMetadata other)
Looks like the logic is almost the same as equals(). Then it would be better to call equals() here.
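For illustration, a minimal sketch of that delegation (assuming matches() should hold exactly when the two metadata objects are equal):

```java
@Override
public boolean matches(DataSourceMetadata other)
{
  // equals() already compares baseDataSource, dimensions, and metrics,
  // so matching can simply delegate to it.
  return equals(other);
}
```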
@Override
public DataSourceMetadata plus(DataSourceMetadata other)
{
  // DerivedDataSourceMetadata is not allowed to change
Then, this should throw UnsupportedOperationException. If this causes a problem, you might need to add some methods like isMergeable() and isSubtractable() to the DataSourceMetadata interface.
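A minimal sketch of the suggested change (the message text is illustrative):

```java
@Override
public DataSourceMetadata plus(DataSourceMetadata other)
{
  // Derivative metadata must never be merged, so fail loudly instead of
  // silently returning an unchanged instance.
  throw new UnsupportedOperationException("DerivativeDataSourceMetadata is not allowed to change");
}
```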
@Override
public DataSourceMetadata minus(DataSourceMetadata other)
{
  // DerivedDataSourceMetadata is not allowed to change
Same here. This should throw UnsupportedOperationException.
new HandleCallback<List<Pair<DataSegment, String>>>()
{
  @Override
  public List<Pair<DataSegment, String>> withHandle(Handle handle) throws Exception
This method doesn't throw Exception.
public Pair<DataSegment, String> map(int index, ResultSet r, StatementContext ctx) throws SQLException
{
  try {
    return new Pair<DataSegment, String>(
nit: unnecessary type arguments.
  this.connector = connector;
}

public void insertDataSourceMetadata(String dataSource, DataSourceMetadata metadata)
Probably this method should be merged into IndexerSQLMetadataStorageCoordinator.resetDataSourceMetadata(), and that method should check whether an entry already exists in the metastore and insert a new one if it doesn't. Otherwise, it can update the existing entry.
Yes, this method can be merged into IndexerSQLMetadataStorageCoordinator.resetDataSourceMetadata(). However, maybe we can do it in another PR, because we should consider the logic of the code that uses this method.
);
}

public Map<DataSegment, String> getSegmentAndCreatedDate(String dataSource, Interval interval)
This method should return only used segments. Please add a method like getUsedSegmentsForInterval() which returns List<Pair<DataSegment, String>> to IndexerMetadataStorageCoordinator.
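A sketch of such an addition (the javadoc wording is illustrative; the PR later names it getUsedSegmentAndCreatedDateForInterval, as shown further below):

```java
public interface IndexerMetadataStorageCoordinator
{
  // ... existing methods elided ...

  /**
   * Returns only the *used* segments overlapping the given interval, each
   * paired with the created_date string stored for it in the metastore.
   */
  List<Pair<DataSegment, String>> getUsedSegmentAndCreatedDateForInterval(String dataSource, Interval interval);
}
```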
public Map<DataSegment, String> getSegmentAndCreatedDate(String dataSource, Interval interval)
{
  List<Pair<DataSegment, String>> maxCreatedDate = connector.retryWithHandle(
maxCreatedDate is a less-intuitive name.
@zhangxinyu1 that is great! I think it would be enough. I'll test this PR in our cluster as well.
Force-pushed from 6627334 to ee6fec7
@jihoonson I have modified the code according to your comments. Could you please continue reviewing it?
@zhangxinyu1 sure. I'll review tomorrow.
@zhangxinyu1 still reviewing. Reviewed up to DataSourceOptimizer.
@@ -243,6 +243,10 @@
<argument>io.druid.extensions.contrib:druid-time-min-max</argument>
<argument>-c</argument>
<argument>io.druid.extensions.contrib:druid-virtual-columns</argument>
<argument>-c</argument>
<argument>io.druid.extensions.contrib:materialized-view-maintenance</argument>
Would you elaborate more on why this feature is split into two extensions? If we need to always load both extensions to use this feature, it would be better to make a single extension.
I can't agree with you more. However, DataSourceOptimizer needs BrokerServerView to get the timeline of different dataSources for optimizing, and only the broker has this information. So the materialized-view-selection module has to be loaded only in brokers, which is why I split the feature into two extensions. I thought about this for a long time but cannot figure out how to solve this problem. Do you have any suggestions?
Do you mean that materialized-view-maintenance should be loaded only in overlords while materialized-view-selection should be loaded only in brokers?
materialized-view-selection should be loaded only in brokers, but materialized-view-maintenance can be loaded anywhere.
Ah, ok. We don't have a nice way to do this currently. I think it's fine going as it is. Would you please add some comments about this, especially that materialized-view-selection should be loaded only in brokers?
Sure, I'm working on your comments these days. Thanks very much!
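For example, the split could look like this in each node's runtime.properties (the values are illustrative, alongside whatever other extensions a deployment loads):

```
# Broker nodes: load only the selection extension.
druid.extensions.loadList=["materialized-view-selection"]

# Overlord nodes: load the maintenance extension (it can also be loaded elsewhere).
druid.extensions.loadList=["materialized-view-maintenance"]
```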
public class MaterializedViewSupervisor implements Supervisor
{
  private static final EmittingLogger log = new EmittingLogger(MaterializedViewSupervisor.class);
  private static final Interval ALL_INTERVAL = Intervals.of("0000-01-01/3000-01-01");
Please use Intervals.ETERNITY instead.
Intervals.ETERNITY doesn't work well when comparing to a varchar in the metastore.
Would you let me know which error you saw?
Intervals.ETERNITY = "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z". When we use it to compare against the start and end of segments to get all segments from the metastore, such as:

select * from druid_segments where start > '-146136543-09-08T08:23:32.096Z' and end < '146140482-04-24T15:36:27.903Z';

an empty set is returned. The columns are varchars, so the comparison is lexicographic, and no segment end (e.g. '2018-...') sorts below '146140482-04-24T15:36:27.903Z' because '2' > '1' as characters.
{
  private static final EmittingLogger log = new EmittingLogger(MaterializedViewSupervisor.class);
  private static final Interval ALL_INTERVAL = Intervals.of("0000-01-01/3000-01-01");
  private static final int MAX_TASK_COUNT = 1;
Looks like DEFAULT_MAX_TASK_COUNT.
this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig cannot be null. Please provide tuningConfig");

this.dataSourceName = dataSourceName == null ?
    StringUtils.format("%s-%s", baseDataSource, DigestUtils.sha1Hex(dimensionsSpec.toString()).substring(0, 8)) :
The line indentation is not correct. Please adjust it.
    @JacksonInject MaterializedViewTaskConfig config
)
{
  this.baseDataSource = Preconditions.checkNotNull(baseDataSource, "baseDataSource cannot be null. Please provide a baseDataSource.");
Please break the line like

Preconditions.checkNotNull(
    baseDataSource,
    "baseDataSource cannot be null. Please provide a baseDataSource."
);

Same for the following 3 lines.
}

@VisibleForTesting
Pair<SortedMap<Interval, String>, Map<Interval, List<DataSegment>>> checkSegments()
Please add some javadoc.
import java.util.Map;
import java.util.Set;

public class DatasourceOptimizerMonitor extends AbstractMonitor
Thanks for adding this!
private static ConcurrentHashMap<String, AtomicLong> hitCount = new ConcurrentHashMap<>();
private static ConcurrentHashMap<String, AtomicLong> costTime = new ConcurrentHashMap<>();
private static ConcurrentHashMap<String, ConcurrentHashMap<Set<String>, AtomicLong>> missFields = new ConcurrentHashMap<>();
private static TimelineServerView serverView = null;
This should be a final non-static variable.
serverView is used in the optimize method, and this method is static.
I mean, this should be a final non-static variable because it's quite dangerous as it is. As you said, serverView is used in a static method (optimize()) but is initialized in the constructor. As you know, static methods can be used without creating an instance, which means serverView might not be initialized when optimize() is called. This currently works because Guice initializes DataSourceOptimizer when DataSourceOptimizerMonitor is initialized, and this happens to be before optimize() is called. However, it might break in the future if something changes, like someone deciding to make DataSourceOptimizerMonitor configurable and disabling it.
Thanks, you're right. I'll modify it.
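A minimal sketch of the safer shape (Guice injection and Druid types assumed, imports elided; the query-rewriting body is not shown):

```java
public class DataSourceOptimizer
{
  // A final instance field is always initialized before any instance method
  // can run, unlike a static field assigned in the constructor.
  private final TimelineServerView serverView;

  @Inject
  public DataSourceOptimizer(TimelineServerView serverView)
  {
    this.serverView = serverView;
  }

  // As an instance method, optimize() can only be reached through a fully
  // constructed (Guice-managed singleton) instance.
  public <T> List<Query<T>> optimize(Query<T> query)
  {
    // ... consult this.serverView for derivative-datasource timelines ...
    return Collections.singletonList(query);
  }
}
```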
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;

public class DatasourceOptimizer
Please rename to DataSourceOptimizer.
public class DatasourceOptimizer
{
  private static final ReadWriteLock lock = new ReentrantReadWriteLock();
  private static ConcurrentHashMap<Derivative, AtomicLong> derivativesHitCount = new ConcurrentHashMap<>();
These variables represent the metrics of dataSourceOptimizer, which means dataSourceOptimizer needs to keep some state. Why don't we simply make a singleton instance of this?
DataSourceOptimizer is a singleton instance, and I use static because the optimize method is a static method.

> Why don't we simply make a singleton instance of this?

Do you mean I should write another class (e.g. DataSourceOptimizerMetrics) to record these states?
Oh, you're right. It's a singleton. Then I wonder why you made the optimize() method static. Usually static methods are useful when a class doesn't have to keep any state (like util classes). But DataSourceOptimizer does keep state (that is, metrics).
@zhangxinyu1 left more comments. It looks like a nice start for supporting this kind of cool feature! Also please add some documentation. I would love to test this in my cluster!
}

try {
  lock.readLock().lock();
This should be outside of the try clause (https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/Lock.html).
Also, probably this should be writeLock().
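The idiom from the linked javadoc, for reference: acquire before the try, release in finally, so a failed acquisition never triggers an unmatched unlock.

```java
lock.readLock().lock();   // outside the try: if lock() throws, there is nothing to unlock
try {
  // ... access the protected state ...
}
finally {
  lock.readLock().unlock();
}
```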
{
  private static final ReadWriteLock lock = new ReentrantReadWriteLock();
  private static ConcurrentHashMap<Derivative, AtomicLong> derivativesHitCount = new ConcurrentHashMap<>();
  private static ConcurrentHashMap<String, AtomicLong> totalCount = new ConcurrentHashMap<>();
It looks like these maps are synchronized with lock. If so, they don't have to be ConcurrentHashMaps.
Also please leave some comments about what these maps mean.
lock is mainly used to synchronize all stats in the getAndResetStats() method. In getAndResetStats(), we get snapshots of the stats one by one and then clear all stats. I use lock to ensure the stats don't change between these steps. I use ConcurrentHashMap because, in optimize(), each stat is incremented concurrently.
I'm not sure I understood correctly, but if my new comments are correct, readLock() and writeLock() should be used in getAndResetStats() and optimize(), respectively. If so, a concurrent map is not needed because only one thread can write at a time in optimize(), and all threads can read without contention in getAndResetStats().
In my design, many threads are allowed to call optimize() simultaneously because MaterializedViewQuery needs to be optimized concurrently, so I use readLock in optimize(). That means these stats can be read and changed concurrently by those threads. However, when a thread calls getAndResetStats() to get the whole snapshot of the stats, the stats are not allowed to change at the same time. Therefore, I use writeLock() to block calls to optimize().
Ok. Please add some comments about this.
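A sketch of the scheme being described (class and method names here are illustrative, not the PR's code): the shared read lock guards the concurrent hot path, while the exclusive write lock makes the snapshot-then-clear step atomic.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class OptimizerStats
{
  private final ReadWriteLock lock = new ReentrantReadWriteLock();
  private final ConcurrentHashMap<String, AtomicLong> totalCount = new ConcurrentHashMap<>();

  public void recordHit(String dataSource)
  {
    // Many query threads run here at once: the shared *read* lock permits that,
    // while ConcurrentHashMap/AtomicLong keep each individual update thread-safe.
    lock.readLock().lock();
    try {
      totalCount.computeIfAbsent(dataSource, k -> new AtomicLong()).incrementAndGet();
    }
    finally {
      lock.readLock().unlock();
    }
  }

  public Map<String, Long> getAndResetStats()
  {
    // The exclusive *write* lock blocks all recordHit() callers, so snapshotting
    // and clearing happen atomically with respect to the stat updates.
    lock.writeLock().lock();
    try {
      Map<String, Long> snapshot = new HashMap<>();
      totalCount.forEach((k, v) -> snapshot.put(k, v.get()));
      totalCount.clear();
      return snapshot;
    }
    finally {
      lock.writeLock().unlock();
    }
  }
}
```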
// only TableDataSource can be optimized
if (!(query instanceof TopNQuery || query instanceof TimeseriesQuery || query instanceof GroupByQuery)
    || !(query.getDataSource() instanceof TableDataSource)) {
  return Lists.newArrayList(query);
nit: can be Collections.singletonList(query).
  return Lists.newArrayList(query);
}

//
Unnecessary.
import java.util.Objects;
import java.util.Set;

public class Derivative implements Comparable<Derivative>
Please rename to something more intuitive. Looks like DerivativeDataSource?
@Override
public int hashCode()
{
  return Objects.hash(VIEW) + query.hashCode();
Should be Objects.hash(VIEW, query).
import java.util.concurrent.ExecutorService;

/**
 * Created by zhangxinyu on 2018/2/5.
Please remove this.
/**
 * Created by zhangxinyu on 2018/2/5.
 */
public class MaterializedViewQueryRunnerFactory implements QueryRunnerFactory
Is this class needed in the current implementation?
No, it's useless. Should I remove it?
Yes please.
  String dim = spec.getDimension();
  dimensions.add(dim);
  }
}
Please throw an exception if the query type is unknown.
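For instance, a hedged sketch of the dispatch with a fail-fast branch (the variable names and branch bodies are hypothetical; ISE is Druid's formatted IllegalStateException):

```java
Set<String> dimensions = new HashSet<>();
if (query instanceof TopNQuery) {
  // ... collect the TopN dimension ...
} else if (query instanceof GroupByQuery) {
  // ... collect the group-by dimensions ...
} else if (query instanceof TimeseriesQuery) {
  // timeseries queries have no dimensions
} else {
  // Fail fast rather than silently returning an incomplete dimension set.
  throw new ISE("Cannot get dimensions of unsupported query type [%s]", query.getType());
}
```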
  return ret;
}

private static Set<String> getDimensionsInFilter(DimFilter dimFilter)
I think it's better to add a method to DimFilter which returns all required column names.
Yes, it's better if we add this method, because the current approach will miss cases with any new implementation of DimFilter. But do you think I should add this method in this PR?
I think it's up to you. If you don't want to make this PR bigger, please raise an issue for this.
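A sketch of what that interface method might look like (the name getRequiredColumns is an assumption here, not the DimFilter API at the time of this PR):

```java
public interface DimFilter
{
  // ... existing methods elided ...

  /**
   * Returns the names of all columns this filter touches, so callers no
   * longer need an instanceof branch per filter implementation.
   */
  Set<String> getRequiredColumns();
}
```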
ImmutableMap<String, AtomicLong> costTimeSnapshot;
ImmutableMap<String, ConcurrentHashMap<Set<String>, AtomicLong>> missFieldsSnapshot;
try {
  lock.writeLock().lock();
Probably this should be readLock().
}

try {
  lock.readLock().lock();
Also, probably this should be writeLock().
Force-pushed from ee6fec7 to 6887cb0
The rough documentation about how to use this feature is at the front of this PR. Should I add some documentation to
@zhangxinyu1 yes, you can add docs to the directory under $DRUID/docs/content/development/extensions-contrib like other extensions.
Force-pushed from 6887cb0 to 4a6a372
@zhangxinyu1 thanks for the update! I didn't realize that. I'll take another look and do some tests in our cluster. BTW, a recent change (#5583) merged into master includes a change to the signature of HadoopTuningConfig which makes merging this PR fail. Would you update this PR?
Force-pushed from 4a6a372 to 0ab2bcd
@jihoonson Thanks for the reminder. I have updated it.
Force-pushed from b879328 to 1b452d6
@zhangxinyu1 thanks for the update. I left my last comments. I also tested this PR in my local machine. It works nicely!
# Materialized View

To use this feature, make sure to only load materialized-view-selection on broker and load materialized-view-maintenance on overlord.
Please add that this feature currently requires a Hadoop cluster.
{
  try {
    DataSourceMetadata metadata = metadataStorageCoordinator.getDataSourceMetadata(dataSource);
    if (metadata != null
Would you check this comment?
if (dataSourceMetadata == null) {
  // if oldMetadata is different from spec, tasks and segments will be removed when reset.
  DataSourceMetadata oldMetadata = metadataStorageCoordinator.getDataSourceMetadata(dataSource);
  if (oldMetadata != null && oldMetadata instanceof DerivativeDataSourceMetadata) {
Same here. The null check is unnecessary, since instanceof is false for null.
 * @param interval The interval for which all applicable and used datasources are requested. Start is inclusive, end is exclusive
 * @return The DataSegments and the related created_date of segments which include data in the requested interval
 */
List<Pair<DataSegment, String>> getUsedSegmentAndCreatedDateForInterval(String dataSource, Interval interval);
I suggest modifying List<DataSegment> getUsedSegmentsForInterval(String dataSource, Interval interval); to return List<Pair<DataSegment, String>> rather than adding a new method.
I don't know. I just think that when someone calls getUsedSegmentsForInterval, maybe they don't want the information about created date.
Maybe created_date should be a part of DataSegment. In this way, we only need the method List<DataSegment> getUsedSegmentsForInterval(String dataSource, Interval interval);. What do you think?
The only usage of getUsedSegmentsForInterval() is SegmentAllocateAction. It checks whether any segments are already allocated for the given interval in order to allocate a new segment id. I think it can just ignore the createdDate part.
> Maybe created_date should be a part of DataSegment. In this way, we only need the method List<DataSegment> getUsedSegmentsForInterval(String dataSource, Interval interval);. What do you think?

Hmm, that's a good point. It sounds good, but I'm not sure why created_date is not a part of DataSegment itself. @gianm any idea?
Alright, let me raise an issue for this and merge these two methods in another PR, because it affects about 16 classes.
Sounds good. Please go for it.
Force-pushed from 1b452d6 to 1938ce5
This looks like a greatly useful change, thanks!
 */
private boolean hasEnoughLag(Interval target, Interval maxInterval)
{
  if ((target.getStartMillis() + minDataLagMs) > maxInterval.getStartMillis()) {
Could just return the boolean expression.
Yes, thanks!
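The simplification would look roughly like this (the polarity is an assumption, taking the original if-branch to return false when the lag is insufficient):

```java
private boolean hasEnoughLag(Interval target, Interval maxInterval)
{
  // Enough lag iff the target start is at least minDataLagMs behind maxInterval's start.
  return target.getStartMillis() + minDataLagMs <= maxInterval.getStartMillis();
}
```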
parser.put("parseSpec", parseSpec);

// generate HadoopTuningConfig
HadoopTuningConfig tuningConfigForTask = new HadoopTuningConfig(
Can we do tuningConfigForTask.withVersion instead?
I'm afraid not, because though the withVersion function can set a new version, it cannot set useExplicitVersion = true.
Cool thanks!
}

@Override
public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext)
It'd be kinda nice to make the UnionDataSource support QueryDataSources and reuse it to run a list of queries.
I don't understand. Could you please describe it in more detail? Thanks!
Sure - not an important suggestion, so please ignore it if it seems irrelevant or too much work :) In order to execute a materialized view query we have to issue multiple queries on different intervals and merge their results. That might be a more generally useful component where users can union multiple queries rather than just multiple datasources.
    || !(query.getDataSource() instanceof TableDataSource)) {
  return Collections.singletonList(query);
}
String datasourceName = ((TableDataSource) query.getDataSource()).getName();
If it's easy to do, I think it'd be worth supporting UnionDataSources as well. Would it just be a matter of iterating over a list of datasource names, running the rest of this method, and flattening the resulting list of queries?
Thanks for your suggestion. The current implementation supports UnionDataSource in this way: in UnionQueryRunner, a UnionDataSource is transformed into some TableDataSources, and then these TableDataSources are optimized in DataSourceOptimizer.java. Is this ok?
Ah got you, thanks for the explanation!
Force-pushed from 1938ce5 to 3623f9b
@Dylan1312 Could you please trigger the Travis CI build?
Afraid I don't have the appropriate permission, a committer should be able to help you out
you can always close and reopen the PR to restart the build ...
@Dylan1312 Thanks!
@b-slim It works, thanks!
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;

public class DatasourceOptimizer
Please forgive me for posting this here - I'm not a committer/reviewer, so my feedback does not count, but there is one thing that looks incorrect to me:

Class DatasourceOptimizer states that "Derived dataSource with smallest average size of segments have highest priority to replace the datasource in user query", and accordingly the following lines produce this prioritized collection of derivatives:

// get all derivatives for datasource in query. The derivatives set is sorted by average size of per segment granularity.
ImmutableSortedSet<Derivative> derivatives = DerivativesManager.getDerivatives(datasourceName);

However, a few lines below, items from the above collection named "derivatives", which is sorted by priority, get selected and put into the following collection, which is simply a HashSet - not sorted, and according to the javadoc not guaranteed to keep insertion order either:

Set<Derivative> derivativesWithRequiredFields = Sets.newHashSet();

To my understanding, derivativesWithRequiredFields should be a List or a LinkedHashSet so that it is guaranteed that the best derivative gets consulted first later on.

thanks
@sascha-coenen thanks for your attention and suggestion. Please see the latest version of DataSourceOptimizer here: https://github.com/druid-io/druid/pull/5556/files#diff-250d80eb8afc10c49ee91e41d8f9d91c. The derivativesWithRequiredFields will be sorted when it is used, as follows:

for (DerivativeDataSource derivativeDataSource : ImmutableSortedSet.copyOf(derivativesWithRequiredFields))

I'm going to remove
All right. I'm going to merge this PR shortly.
Merged. @zhangxinyu1 thank you for the contribution!
@jihoonson Thanks! I will work on the related issues #5710 and #5775 these days.
    List<Pair<DataSegment, String>> snapshot
)
{
  Interval maxAllowedToBuildInterval = snapshot.parallelStream()
@zhangxinyu1 why did you use parallel Stream?
    .list()
);

List<DerivativeDataSource> derivativeDataSources = derivativesInDatabase.parallelStream()
And here
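For comparison, a sequential stream would look roughly like this (a sketch; pair.lhs holding the DataSegment follows Druid's Pair convention). For short metadata lists, the fork-join overhead of parallelStream() rarely pays off:

```java
Interval maxAllowedToBuildInterval = snapshot.stream()
    .map(pair -> pair.lhs.getInterval())
    .max(Comparator.comparingLong(Interval::getStartMillis))
    .orElseThrow(() -> new IllegalStateException("empty snapshot"));
```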
Target

To optimize queries.

Implementation

There are two extensions, namely materialized-view-maintenance and materialized-view-selection. In materialized-view-maintenance, MaterializedViewSupervisor is used to generate or drop derived-datasource segments and to keep the timelines of the base datasource and the derived datasource consistent. In materialized-view-selection, MaterializedViewQuery is implemented to do materialized-view selection for topN/groupBy/timeseries queries. The detailed design and discussion is in issue #5304.

Usage