
Materialized view implementation #5556

Merged
9 commits merged into apache:master on Jun 9, 2018

Conversation


@zhangxinyu1 zhangxinyu1 commented Mar 30, 2018

Target

To optimize queries.

Implementation

There are two extensions: materialized-view-maintenance and materialized-view-selection.

In materialized-view-maintenance, MaterializedViewSupervisor is used to generate or drop derived-datasource segments and to keep the timelines of the base datasource and the derived datasource consistent.

In materialized-view-selection, MaterializedViewQuery is implemented to perform materialized-view selection for topN/groupBy/timeseries queries.

The detailed design and discussion are in issue #5304.

Usage

  1. Load the materialized-view-maintenance and materialized-view-selection extensions. Note: materialized-view-selection can only be loaded when starting a broker.
  2. Submit a MaterializedViewSupervisor spec, e.g.:
{
  "type" : "derivativeDataSource",
  "baseDataSource": "wikiticker",
  "dimensionsSpec": {
    "dimensions" : [
      "isUnpatrolled",
      "metroCode",
      "namespace",
      "page",
      "regionIsoCode",
      "regionName",
      "user"
    ]
  },
  "metricsSpec" : [
    {
      "name" : "count",
      "type" : "count"
    },
    {
      "name" : "added",
      "type" : "longSum",
      "fieldName" : "added"
    }
  ],
  "tuningConfig": {
    "type" : "hadoop"
  }
}
  3. Send a MaterializedViewQuery, e.g. (a submission sketch for both steps follows the example below):
{
    "queryType": "view",
    "query": {
        "queryType": "groupBy",
        "dataSource": "wikiticker",
        "granularity": "all",
        "dimensions": [
            "user"
        ],
        "limitSpec": {
            "type": "default",
            "limit": 1,
            "columns": [
                {
                    "dimension": "added",
                    "direction": "descending",
                    "dimensionOrder": "numeric"
                }
            ]
        },
        "aggregations": [
            {
                "type": "longSum",
                "name": "added",
                "fieldName": "added"
            }
        ],
        "intervals": [
            "2015-09-12/2015-09-13"
        ]
    }
}
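As referenced in step 3 above, a rough submission sketch: host names, ports, and file names are placeholders, while the endpoints are Druid's standard supervisor and query APIs.

curl -X POST -H 'Content-Type: application/json' \
  -d @supervisor-spec.json \
  http://OVERLORD_HOST:8090/druid/indexer/v1/supervisor

curl -X POST -H 'Content-Type: application/json' \
  -d @view-query.json \
  http://BROKER_HOST:8082/druid/v2/?pretty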

@jihoonson
Contributor

@zhangxinyu1 thanks for raising this PR! Would you add a link to the proposal here?

@jihoonson
Contributor

Oh, never mind. It's already here.

@jihoonson
Contributor

I restarted Travis. @zhangxinyu1 would you check the TeamCity inspection failure?

@zhangxinyu1 zhangxinyu1 force-pushed the feature-materialized-view-1.0 branch from 8d328ac to fdf16a9 on March 31, 2018 10:49
@jihoonson
Contributor

@zhangxinyu1 thanks for the fix. I'll start my review. BTW, did you have a chance to test this feature in some real clusters?

@zhangxinyu1
Contributor Author

zhangxinyu1 commented Apr 3, 2018

@jihoonson Thanks!
Yes, we have real clusters running with this feature, but those clusters are on version 0.10.0 and the feature there is implemented based on 0.10.0. However, I have tested some functions of this implementation based on 0.13.0-SNAPSHOT in our test cluster. Do you have any suggestions about testing this feature?

@jihoonson (Contributor) left a comment:

Reviewed up to MaterializedViewMetadataCoordinator.

import java.util.Objects;
import java.util.Set;

@JsonTypeName("view")
Contributor

I believe we will have more types of views in the future. Please use a more specific name like derivativeDataSource.

Contributor

BTW, this annotation is not needed since you added a NamedType here.

Preconditions.checkNotNull(baseDataSource, "baseDataSource cannot be null. This is not a valid DerivativeDataSourceMetadata.");
Preconditions.checkNotNull(dimensions, "dimensions cannot be null. This is not a valid DerivativeDataSourceMetadata.");
Preconditions.checkNotNull(metrics, "metrics cannot be null. This is not a valid DerivativeDataSourceMetadata.");
this.baseDataSource = baseDataSource;
Contributor

nit: Can be simplified to this.baseDataSource = Preconditions.checkNotNull(baseDataSource, "baseDataSource cannot be null. This is not a valid DerivativeDataSourceMetadata.");

}

@Override
public boolean matches(DataSourceMetadata other)
Contributor

Looks like the logic is almost the same as equals(), so it would be better to call equals() here.
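A minimal sketch of the suggestion, assuming equals() already compares baseDataSource, dimensions, and metrics as in the quoted constructor:

@Override
public boolean matches(DataSourceMetadata other)
{
  // delegate to equals() instead of duplicating the field comparisons
  return equals(other);
}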

@Override
public DataSourceMetadata plus(DataSourceMetadata other)
{
// DerivedDataSourceMetadata is not allowed to change
Contributor

Then, this should throw UnsupportedOperationException. If this causes a problem, you might need to add some methods like isMergeable() and isSubtractable() to the DataSourceMetadata interface.
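For illustration, a sketch of the suggested behavior (not necessarily the merged code):

@Override
public DataSourceMetadata plus(DataSourceMetadata other)
{
  // DerivativeDataSourceMetadata is immutable, so merging must fail loudly
  // rather than silently returning unchanged metadata
  throw new UnsupportedOperationException("Derivative dataSource metadata is not allowed to be modified");
}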

@Override
public DataSourceMetadata minus(DataSourceMetadata other)
{
// DerivedDataSourceMetadata is not allowed to change
Contributor

Same here. This should throw UnsupportedOperationException.

new HandleCallback<List<Pair<DataSegment, String>>>()
{
@Override
public List<Pair<DataSegment, String>> withHandle(Handle handle) throws Exception
Contributor

This method doesn't throw Exception.

public Pair<DataSegment, String> map(int index, ResultSet r, StatementContext ctx) throws SQLException
{
try {
return new Pair<DataSegment, String>(
Contributor

nit: unnecessary type arguments.

this.connector = connector;
}

public void insertDataSourceMetadata(String dataSource, DataSourceMetadata metadata)
Contributor

Probably this method should be merged into IndexerSQLMetadataStorageCoordinator.resetDataSourceMetadata(), and that method should check whether an entry already exists in the metastore, inserting a new entry if it doesn't and updating the existing entry otherwise.

Contributor Author

Yes, this method can be merged into IndexerSQLMetadataStorageCoordinator.resetDataSourceMetadata(). However, maybe we can do it in another PR, because we should consider the logic of the code that uses this method.
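For reference, a rough sketch of the merged behavior being suggested; getDataSourceMetadata and insertDataSourceMetadata exist in the coordinator and this PR, while updateDataSourceMetadata is a hypothetical helper:

public boolean resetDataSourceMetadata(String dataSource, DataSourceMetadata metadata) throws IOException
{
  if (getDataSourceMetadata(dataSource) == null) {
    // no existing entry: insert a new row instead of failing the update
    insertDataSourceMetadata(dataSource, metadata);
    return true;
  }
  // existing entry: overwrite it with the new metadata
  return updateDataSourceMetadata(dataSource, metadata); // hypothetical helper
}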

);
}

public Map<DataSegment, String> getSegmentAndCreatedDate(String dataSource, Interval interval)
Contributor

This method should return only used segments. Please add a method like getUsedSegmentsForInterval() which returns List<Pair<DataSegment, String>> to IndexerMetadataStorageCoordinator.


public Map<DataSegment, String> getSegmentAndCreatedDate(String dataSource, Interval interval)
{
List<Pair<DataSegment, String>> maxCreatedDate = connector.retryWithHandle(
Contributor

maxCreatedDate is a less-intuitive name.

@jihoonson
Contributor

Yes, we have real clusters running with this feature, but those clusters are on version 0.10.0 and the feature there is implemented based on 0.10.0. However, I have tested some functions of this implementation based on 0.13.0-SNAPSHOT in our test cluster. Do you have any suggestions about testing this feature?

@zhangxinyu1 that is great! I think it would be enough. I'll test this PR in our cluster as well.

@zhangxinyu1 zhangxinyu1 force-pushed the feature-materialized-view-1.0 branch 2 times, most recently from 6627334 to ee6fec7 on April 9, 2018 07:53
@zhangxinyu1
Contributor Author

@jihoonson I have modified the code according to your comments. Could you please continue the review?

@jihoonson
Contributor

@zhangxinyu1 sure. I'll review tomorrow.

@jihoonson (Contributor) left a comment:

@zhangxinyu1 still reviewing. Reviewed up to DataSourceOptimizer.

@@ -243,6 +243,10 @@
<argument>io.druid.extensions.contrib:druid-time-min-max</argument>
<argument>-c</argument>
<argument>io.druid.extensions.contrib:druid-virtual-columns</argument>
<argument>-c</argument>
<argument>io.druid.extensions.contrib:materialized-view-maintenance</argument>
Contributor

Would you elaborate more on why this feature is split into two extensions? If we need to always load both extensions to use this feature, it would be better to make a single extension.

Contributor Author

I can't agree with you more. However, DataSourceOptimizer needs BrokerServerView to get the timelines of different dataSources for optimizing, and only the broker has this information. So the materialized-view-selection module has to be loaded only in brokers, which is why I split the feature into two extensions. I thought about this for a long time but cannot figure out how to solve the problem. Do you have any suggestions?

Contributor

Do you mean that materialized-view-maintenance should be loaded only in overlords while materialized-view-selection should be loaded only in brokers?

Contributor Author

materialized-view-selection should be loaded only in brokers, but materialized-view-maintenance can be loaded anywhere.

Contributor

Ah, OK. We don't have a nice way to do this currently. I think it's fine to go with it as it is. Would you please add some comments about this, especially that materialized-view-selection should be loaded only in brokers?

Contributor Author

Sure, I'm working on your comments these days. Thanks very much!
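For example, the deployment just described could look like this in runtime.properties; this is a sketch using the standard druid.extensions.loadList property, and the exact lists depend on your deployment:

# broker: materialized-view-selection must be loaded here
druid.extensions.loadList=["materialized-view-selection", "materialized-view-maintenance"]

# overlord: only materialized-view-maintenance is needed
druid.extensions.loadList=["materialized-view-maintenance"]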

public class MaterializedViewSupervisor implements Supervisor
{
private static final EmittingLogger log = new EmittingLogger(MaterializedViewSupervisor.class);
private static final Interval ALL_INTERVAL = Intervals.of("0000-01-01/3000-01-01");
Contributor

Please use Intervals.ETERNITY instead.

Contributor Author

Intervals.ETERNITY doesn't work well when compared to a varchar in the metastore.

Contributor

Would you let me know which error you saw?

Contributor Author

Intervals.ETERNITY = "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z".

When we use it to compare against the start and end of segments to get all segments from the metastore, such as:
select * from druid_segments where start > '-146136543-09-08T08:23:32.096Z' and end < '146140482-04-24T15:36:27.903Z';
an empty set is returned, because no end is less than '146140482-04-24T15:36:27.903Z' in a varchar comparison.

{
private static final EmittingLogger log = new EmittingLogger(MaterializedViewSupervisor.class);
private static final Interval ALL_INTERVAL = Intervals.of("0000-01-01/3000-01-01");
private static final int MAX_TASK_COUNT = 1;
Contributor

Looks like DEFAULT_MAX_TASK_COUNT.

this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig cannot be null. Please provide tuningConfig");

this.dataSourceName = dataSourceName == null ?
StringUtils.format("%s-%s", baseDataSource, DigestUtils.sha1Hex(dimensionsSpec.toString()).substring(0, 8)) :
Contributor

The line indentation is not correct. Please adjust it.

@JacksonInject MaterializedViewTaskConfig config
)
{
this.baseDataSource = Preconditions.checkNotNull(baseDataSource, "baseDataSource cannot be null. Please provide a baseDataSource.");
Contributor

Please break the line like

Preconditions.checkNotNull(
  baseDataSource,
  "baseDataSource cannot be null. Please provide a baseDataSource."
);

Same for the following 3 lines.

}

@VisibleForTesting
Pair<SortedMap<Interval, String>, Map<Interval, List<DataSegment>>> checkSegments()
Contributor

Please add some javadoc.

import java.util.Map;
import java.util.Set;

public class DatasourceOptimizerMonitor extends AbstractMonitor
Contributor

Thanks for adding this!

private static ConcurrentHashMap<String, AtomicLong> hitCount = new ConcurrentHashMap<>();
private static ConcurrentHashMap<String, AtomicLong> costTime = new ConcurrentHashMap<>();
private static ConcurrentHashMap<String, ConcurrentHashMap<Set<String>, AtomicLong>> missFields = new ConcurrentHashMap<>();
private static TimelineServerView serverView = null;
Contributor

This should be a final non-static variable.

Contributor Author

serverView is used in the optimize method, and that method is static.

Contributor

I mean, this should be a final non-static variable because the current arrangement is quite dangerous. As you said, serverView is used in a static method (optimize()) but is initialized in the constructor. As you know, static methods can be called without creating an instance, which means serverView might not be initialized when optimize() is called. This currently works because Guice initializes DataSourceOptimizer when DataSourceOptimizerMonitor is initialized, and this happens to be before optimize() is called. However, it might break in the future if something changes, e.g. if someone decides to make DataSourceOptimizerMonitor configurable and disables it.

Contributor Author

Thanks, you're right. I'll modify it.
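A sketch of the fix under discussion (illustrative, not the merged code): inject the server view as a final instance field and make optimize() an instance method.

public class DataSourceOptimizer
{
  private final TimelineServerView serverView;

  @Inject
  public DataSourceOptimizer(TimelineServerView serverView)
  {
    // Guice initializes this before any instance method can be called
    this.serverView = serverView;
  }

  public List<Query> optimize(Query query)
  {
    // ... consult this.serverView (instead of a static field) to pick derivatives ...
    return Collections.singletonList(query); // placeholder
  }
}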

import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;

public class DatasourceOptimizer
Contributor

Please rename to DataSourceOptimizer.

public class DatasourceOptimizer
{
private static final ReadWriteLock lock = new ReentrantReadWriteLock();
private static ConcurrentHashMap<Derivative, AtomicLong> derivativesHitCount = new ConcurrentHashMap<>();
Contributor

These variables represent the metrics of dataSourceOptimizer, which means dataSourceOptimizer needs to keep some state. Why don't we simply make a singleton instance of this?

@zhangxinyu1 (Contributor Author) commented Apr 11, 2018

DataSourceOptimizer is a singleton instance, and I use static because the optimize method is static.

Why don't we simply make a singleton instance of this?

Do you mean I should write another class (e.g. DataSourceOptimizerMetrics) to record this state?

Contributor

Oh, you're right. It's a singleton. Then I wonder why you made the optimize() method static. Usually static methods are useful when a class doesn't have to keep any state (like util classes). But DataSourceOptimizer does keep state (that is, metrics).

@jihoonson (Contributor) left a comment:

@zhangxinyu1 left more comments. It looks like a nice start for supporting this kind of cool feature!

Also please add some documentation. I would love to test this in my cluster!

}

try {
lock.readLock().lock();
Contributor

Also, probably this should be writeLock().

{
private static final ReadWriteLock lock = new ReentrantReadWriteLock();
private static ConcurrentHashMap<Derivative, AtomicLong> derivativesHitCount = new ConcurrentHashMap<>();
private static ConcurrentHashMap<String, AtomicLong> totalCount = new ConcurrentHashMap<>();
Contributor

It looks like these maps are synchronized with lock. If so, they don't have to be ConcurrentHashMaps.

Contributor

Also please leave some comments about what these maps mean.

@zhangxinyu1 (Contributor Author) commented Apr 16, 2018

lock is mainly used to synchronize all stats in the getAndResetStats() method. In getAndResetStats(), we take snapshots of the stats one by one and then clear them all. I use lock to ensure the stats don't change between these steps.
I use ConcurrentHashMap because, in optimize(), each stat is incremented concurrently.

Contributor

I'm not sure I understood correctly, but if my new comments are correct, readLock() and writeLock() should be used in getAndResetStats() and optimize(), respectively. If so, concurrentMap is not needed because only one thread can write at a time in optimize(), and all threads can read without contention in getAndResetStats().

Contributor Author

In my design, many threads are allowed to call optimize() simultaneously, because MaterializedViewQuery needs to be optimized concurrently, so I use readLock in optimize(). This means the stats can be read and updated concurrently by those threads.
However, when a thread calls getAndResetStats() to take a whole snapshot of the stats, the stats are not allowed to change. Therefore, I use writeLock() to block calls to optimize().

Contributor

Ok. Please add some comments about this.
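A sketch of the locking scheme just described (illustrative): many threads update the ConcurrentHashMap-backed stats under the read lock, while getAndResetStats() takes the write lock so its snapshot-and-clear sequence sees no concurrent updates.

// in optimize(): many threads may hold the read lock at once
lock.readLock().lock();
try {
  totalCount.computeIfAbsent(dataSourceName, k -> new AtomicLong(0)).incrementAndGet();
  // ... choose the best derivative datasource for the query ...
}
finally {
  lock.readLock().unlock();
}

// in getAndResetStats(): the write lock excludes all optimize() callers
lock.writeLock().lock();
try {
  ImmutableMap<String, AtomicLong> totalSnapshot = ImmutableMap.copyOf(totalCount);
  totalCount.clear();
  // ... snapshot and clear the other stats maps the same way ...
}
finally {
  lock.writeLock().unlock();
}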

// only TableDataSource can be optimized
if (!(query instanceof TopNQuery || query instanceof TimeseriesQuery || query instanceof GroupByQuery)
|| !(query.getDataSource() instanceof TableDataSource)) {
return Lists.newArrayList(query);
Contributor

nit: can be Collections.singletonList(query).

return Lists.newArrayList(query);
}

//
Contributor

Unnecessary.

import java.util.Objects;
import java.util.Set;

public class Derivative implements Comparable<Derivative>
Contributor

Please rename to more intuitive name. Looks like DerivativeDataSource?

@Override
public int hashCode()
{
return Objects.hash(VIEW) + query.hashCode();
Contributor

Should be Objects.hash(VIEW, query).

import java.util.concurrent.ExecutorService;

/**
* Created by zhangxinyu on 2018/2/5.
Contributor

Please remove this.

/**
* Created by zhangxinyu on 2018/2/5.
*/
public class MaterializedViewQueryRunnerFactory implements QueryRunnerFactory
Contributor

Is this class needed in the current implementation?

Contributor Author

No, it's useless. Should I remove it?

Contributor

Yes please.

String dim = spec.getDimension();
dimensions.add(dim);
}
}
Contributor

Please throw an exception if the query type is unknown.

return ret;
}

private static Set<String> getDimensionsInFilter(DimFilter dimFilter)
Contributor

I think it's better to add a method to DimFilter which returns all required column names.

Contributor Author

Yes, it would be better to add this method, because the current approach will miss cases when there is a new implementation of DimFilter. But do you think I should add this method in this PR?

Contributor

I think it's up to you. If you don't want to make this PR bigger, please raise an issue for this.

ImmutableMap<String, AtomicLong> costTimeSnapshot;
ImmutableMap<String, ConcurrentHashMap<Set<String>, AtomicLong>> missFieldsSnapshot;
try {
lock.writeLock().lock();
Contributor

Probably this should be readLock().


@zhangxinyu1
Contributor Author

zhangxinyu1 commented Apr 28, 2018

@jihoonson

Also please add some documentation. I would love to test this in my cluster!

The rough documentation about how to use this feature is at the front of this PR. Should I add some documentation to the docs?

@jihoonson
Contributor

@zhangxinyu1 yes, you can add docs to the directory under $DRUID/docs/content/development/extensions-contrib like other extensions.

@zhangxinyu1 zhangxinyu1 force-pushed the feature-materialized-view-1.0 branch from 6887cb0 to 4a6a372 on May 3, 2018 09:42
@jihoonson
Contributor

@zhangxinyu1 thanks for the update! I didn't realize that. I'll take another look and do some tests in our cluster.

BTW, a recent change (#5583) merged into master includes a change to the signature of HadoopTuningConfig, which makes this PR fail to merge. Would you update this PR?

@zhangxinyu1 zhangxinyu1 force-pushed the feature-materialized-view-1.0 branch from 4a6a372 to 0ab2bcd on May 8, 2018 14:58
@zhangxinyu1
Contributor Author

@jihoonson Thanks for the reminder. I have updated it.

@zhangxinyu1 zhangxinyu1 force-pushed the feature-materialized-view-1.0 branch 2 times, most recently from b879328 to 1b452d6 on May 10, 2018 03:00
@jihoonson (Contributor) left a comment:

@zhangxinyu1 thanks for the update. I left my last comments. I also tested this PR in my local machine. It works nicely!


# Materialized View

To use this feature, make sure to only load materialized-view-selection on broker and load materialized-view-maintenance on overlord.
Contributor

Please add that this feature currently requires a Hadoop cluster.

{
try {
DataSourceMetadata metadata = metadataStorageCoordinator.getDataSourceMetadata(dataSource);
if (metadata != null
Contributor

Would you check this comment?

if (dataSourceMetadata == null) {
// if oldMetadata is different from spec, tasks and segments will be removed when reset.
DataSourceMetadata oldMetadata = metadataStorageCoordinator.getDataSourceMetadata(dataSource);
if (oldMetadata != null && oldMetadata instanceof DerivativeDataSourceMetadata) {
Contributor

Same here. Null check is unnecessary.
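(In Java, x instanceof T already evaluates to false when x is null, so the check reduces to:)

if (oldMetadata instanceof DerivativeDataSourceMetadata) {
  // ...
}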

* @param interval The interval for which all applicable and used datasources are requested. Start is inclusive, end is exclusive
* @return The DataSegments and the related created_date of segments which include data in the requested interval
*/
List<Pair<DataSegment, String>> getUsedSegmentAndCreatedDateForInterval(String dataSource, Interval interval);
Contributor

I suggest to modify List<DataSegment> getUsedSegmentsForInterval(String dataSource, Interval interval); to return List<Pair<DataSegment, String>> rather than adding a new method.

Contributor Author

I don't know. I just think that when someone calls getUsedSegmentsForInterval, they may not want the created-date information.

Contributor Author

Maybe created_date should be a part of DataSegment. In this way, we only need the method List<DataSegment> getUsedSegmentsForInterval(String dataSource, Interval interval);. What do you think?

Contributor

The only usage of getUsedSegmentsForInterval() is SegmentAllocateAction. It checks any segments are already allocated for the given interval to allocate a new segment id. I think it can just ignore the createdDate part.

Contributor

Maybe created_date should be a part of DataSegment. In this way, we only need the method List<DataSegment> getUsedSegmentsForInterval(String dataSource, Interval interval);. What do you think?

Hmm, that's a good point. It sounds good, but I'm not sure why created_date is not a part of DataSegment itself. @gianm any idea?

Contributor Author

Alright, let me raise an issue for this and merge these two methods in another PR, because it affects about 16 classes.

Contributor

Sounds good. Please go for it.

@zhangxinyu1 zhangxinyu1 force-pushed the feature-materialized-view-1.0 branch from 1b452d6 to 1938ce5 on May 14, 2018 02:55
@dylwylie (Contributor) left a comment:

This looks like a really useful change, thanks!

*/
private boolean hasEnoughLag(Interval target, Interval maxInterval)
{
if ((target.getStartMillis() + minDataLagMs) > maxInterval.getStartMillis()) {
Contributor

Could just return the boolean expression

Contributor Author

Yes, thanks!
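The simplified form would look roughly like this (a sketch; it assumes the original method returns false when the guarded condition holds, i.e. when the target interval is too recent):

private boolean hasEnoughLag(Interval target, Interval maxInterval)
{
  // enough lag iff target starts at least minDataLagMs before maxInterval
  return target.getStartMillis() + minDataLagMs <= maxInterval.getStartMillis();
}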

parser.put("parseSpec", parseSpec);

//generate HadoopTuningConfig
HadoopTuningConfig tuningConfigForTask = new HadoopTuningConfig(
Contributor

Can we do tuningConfigForTask.withVersion instead?

Contributor Author

I'm afraid not: though the withVersion function can set a new version, it cannot set useExplicitVersion = true.

Contributor

Cool thanks!

}

@Override
public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext)
Contributor

It'd be kinda nice to make the UnionDataSource support QueryDataSources and reuse it to run a list of queries.

Contributor Author

I don't understand. Could you please describe it in more detail? Thanks!

Contributor

Sure - not an important suggestion so please ignore if it seems irrelevant or too much work :)

In order to execute a materialised view query we have to issue multiple queries on different intervals and merge their results. That might be a more generally useful component, where users can union multiple queries rather than just multiple datasources.

|| !(query.getDataSource() instanceof TableDataSource)) {
return Collections.singletonList(query);
}
String datasourceName = ((TableDataSource) query.getDataSource()).getName();
Contributor

If it's easy to do, I think it'd be worth supporting UnionDataSources as well. Would it just be a matter of iterating over a list of datasource names, running the rest of this method, and flattening the resulting list of queries?

@zhangxinyu1 (Contributor Author) commented May 25, 2018

Thanks for your suggestion. The current implementation supports UnionDataSource in this way: in UnionQueryRunner, a UnionDataSource is transformed into several TableDataSources, and these TableDataSources are then optimized in DataSourceOptimizer.java. Is this OK?

Contributor

Ah got you, thanks for the explanation!

@zhangxinyu1 zhangxinyu1 force-pushed the feature-materialized-view-1.0 branch from 1938ce5 to 3623f9b on May 28, 2018 03:11
@zhangxinyu1
Contributor Author

@Dylan1312 Could you please trigger the travis CI building?

@dylwylie
Contributor

Afraid I don't have the appropriate permission; a committer should be able to help you out.

@b-slim
Contributor

b-slim commented May 30, 2018

you can always close and reopen the PR to restart the build ...

@zhangxinyu1 zhangxinyu1 reopened this May 30, 2018
@zhangxinyu1
Contributor Author

@Dylan1312 Thanks!

@zhangxinyu1
Contributor Author

@b-slim It works, thanks!

import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;

public class DatasourceOptimizer


Please forgive me for posting this here - I'm not a committer/reviewer, so my feedback does not count, but there is one thing that looks incorrect to me:
Class DatasourceOptimizer states that
"Derived dataSource with smallest average size of segments have highest priority to replace the datasource in user query"
and accordingly the following lines produce this prioritized collection of derivatives:

// get all derivatives for datasource in query. The derivatives set is sorted by average size of per segment granularity.
ImmutableSortedSet<Derivative> derivatives = DerivativesManager.getDerivatives(datasourceName);

However, a few lines below, items from the collection named "derivatives", which is sorted by priority, are selected and put into the following collection, which is a plain HashSet; it is not sorted and, according to the javadoc, does not guarantee insertion order either:
Set<Derivative> derivativesWithRequiredFields = Sets.newHashSet();

To my understanding, the "derivativesWithRequiredFields" should be a list or a LinkedHashSet such that it is guaranteed that later on the best derivative gets consulted first.

thanks

Contributor Author

@sascha-coenen thanks for your attention and suggestion.
Please see the latest version of DataSourceOptimizer here: https://github.com/druid-io/druid/pull/5556/files#diff-250d80eb8afc10c49ee91e41d8f9d91c .
The derivativesWithRequiredFields set will be sorted when it is used, as follows:
for (DerivativeDataSource derivativeDataSource : ImmutableSortedSet.copyOf(derivativesWithRequiredFields))

@jihoonson
Contributor

I'm going to remove the Design Review tag and merge this PR unless any other committers start reviewing by tonight, because

@jihoonson
Contributor

All right. I'm going to merge this PR shortly.

@jihoonson jihoonson merged commit e43e5eb into apache:master Jun 9, 2018
@jihoonson
Contributor

Merged. @zhangxinyu1 thank you for the contribution!

@zhangxinyu1
Contributor Author

@jihoonson Thanks! I will work on the related issues #5710 and #5775 in the coming days.

@dclim dclim added this to the 0.13.0 milestone Oct 8, 2018
List<Pair<DataSegment, String>> snapshot
)
{
Interval maxAllowedToBuildInterval = snapshot.parallelStream()
Member

@zhangxinyu1 why did you use a parallel stream?

.list()
);

List<DerivativeDataSource> derivativeDataSources = derivativesInDatabase.parallelStream()
Member

And here
