Skip to content

Commit

Permalink
add DataSchema.Builder to tidy stuff up a bit (apache#17065)
Browse files Browse the repository at this point in the history
* add DataSchema.Builder to tidy stuff up a bit

* fixes

* fixes

* more style fixes

* review stuff
  • Loading branch information
clintropolis authored and pranavbhole committed Sep 17, 2024
1 parent e392d5c commit 7db3580
Show file tree
Hide file tree
Showing 64 changed files with 1,528 additions and 1,583 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.indexing.DataSchema;
Expand Down Expand Up @@ -66,18 +65,19 @@ public static Task getTask()
null,
null,
new IndexTask.IndexIngestionSpec(
new DataSchema(
"foo",
new TimestampSpec(null, null, null),
DimensionsSpec.EMPTY,
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
new UniformGranularitySpec(
Granularities.DAY,
null,
ImmutableList.of(Intervals.of("2010-01-01/P2D"))
),
null
),
DataSchema.builder()
.withDataSource("foo")
.withTimestamp(new TimestampSpec(null, null, null))
.withDimensions(DimensionsSpec.EMPTY)
.withAggregators(new DoubleSumAggregatorFactory("met", "met"))
.withGranularity(
new UniformGranularitySpec(
Granularities.DAY,
null,
ImmutableList.of(Intervals.of("2010-01-01/P2D"))
)
)
.build(),
new IndexTask.IndexIOConfig(
new LocalInputSource(new File("lol"), "rofl"),
new NoopInputFormat(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.realtime.ChatHandlerProvider;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
Expand Down Expand Up @@ -211,14 +210,13 @@ public HadoopIndexTask createTask(Interval interval, String version, List<DataSe
);

// generate DataSchema
DataSchema dataSchema = new DataSchema(
dataSourceName,
parser,
aggregators,
granularitySpec,
TransformSpec.NONE,
objectMapper
);
DataSchema dataSchema = DataSchema.builder()
.withDataSource(dataSourceName)
.withParserMap(parser)
.withAggregators(aggregators)
.withGranularity(granularitySpec)
.withObjectMapper(objectMapper)
.build();

// generate DatasourceIngestionSpec
DatasourceIngestionSpec datasourceIngestionSpec = new DatasourceIngestionSpec(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.SegmentSchemaManager;
import org.apache.druid.segment.realtime.ChatHandlerProvider;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
Expand Down Expand Up @@ -237,14 +236,10 @@ public void testCheckSegmentsAndSubmitTasks()
Map<Interval, HadoopIndexTask> runningTasks = runningTasksPair.lhs;
Map<Interval, String> runningVersion = runningTasksPair.rhs;

DataSchema dataSchema = new DataSchema(
"test_datasource",
null,
null,
null,
TransformSpec.NONE,
objectMapper
);
DataSchema dataSchema = DataSchema.builder()
.withDataSource("test_datasource")
.withObjectMapper(objectMapper)
.build();
HadoopIOConfig hadoopIOConfig = new HadoopIOConfig(new HashMap<>(), null, null);
HadoopIngestionSpec spec = new HadoopIngestionSpec(dataSchema, hadoopIOConfig, null);
HadoopIndexTask task1 = new HadoopIndexTask(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
Expand All @@ -44,7 +43,6 @@
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
Expand Down Expand Up @@ -102,16 +100,19 @@ private static DataSchema getDataSchema(String dataSource)
dimensions.add(StringDimensionSchema.create("dim1"));
dimensions.add(StringDimensionSchema.create("dim2"));

return new DataSchema(
dataSource,
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(dimensions),
new AggregatorFactory[] {new CountAggregatorFactory("rows")},
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.NONE,
ImmutableList.of()),
null);
return DataSchema.builder()
.withDataSource(dataSource)
.withTimestamp(new TimestampSpec("timestamp", "iso", null))
.withDimensions(dimensions)
.withAggregators(new CountAggregatorFactory("rows"))
.withGranularity(
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.NONE,
ImmutableList.of()
)
)
.build();
}

@BeforeClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.filter.SelectorDimFilter;
Expand Down Expand Up @@ -1262,28 +1261,27 @@ public void testKafkaRecordEntityInputFormat() throws Exception

final KafkaIndexTask task = createTask(
null,
new DataSchema(
"test_ds",
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(
Arrays.asList(
new StringDimensionSchema("dim1"),
new StringDimensionSchema("dim1t"),
new StringDimensionSchema("dim2"),
new LongDimensionSchema("dimLong"),
new FloatDimensionSchema("dimFloat"),
new StringDimensionSchema("kafka.topic"),
new LongDimensionSchema("kafka.offset"),
new StringDimensionSchema("kafka.header.encoding")
)
),
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("met1sum", "met1"),
new CountAggregatorFactory("rows")
},
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
null
),
DataSchema.builder()
.withDataSource("test_ds")
.withTimestamp(new TimestampSpec("timestamp", "iso", null))
.withDimensions(
new StringDimensionSchema("dim1"),
new StringDimensionSchema("dim1t"),
new StringDimensionSchema("dim2"),
new LongDimensionSchema("dimLong"),
new FloatDimensionSchema("dimFloat"),
new StringDimensionSchema("kafka.topic"),
new LongDimensionSchema("kafka.offset"),
new StringDimensionSchema("kafka.header.encoding")
)
.withAggregators(
new DoubleSumAggregatorFactory("met1sum", "met1"),
new CountAggregatorFactory("rows")
)
.withGranularity(
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null)
)
.build(),
new KafkaIndexTaskIOConfig(
0,
"sequence0",
Expand Down Expand Up @@ -1337,26 +1335,25 @@ public void testKafkaInputFormat() throws Exception

final KafkaIndexTask task = createTask(
null,
new DataSchema(
"test_ds",
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(
Arrays.asList(
new StringDimensionSchema("dim1"),
new StringDimensionSchema("dim1t"),
new StringDimensionSchema("dim2"),
new LongDimensionSchema("dimLong"),
new FloatDimensionSchema("dimFloat"),
new StringDimensionSchema("kafka.testheader.encoding")
)
),
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("met1sum", "met1"),
new CountAggregatorFactory("rows")
},
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
null
),
DataSchema.builder()
.withDataSource("test_ds")
.withTimestamp(new TimestampSpec("timestamp", "iso", null))
.withDimensions(
new StringDimensionSchema("dim1"),
new StringDimensionSchema("dim1t"),
new StringDimensionSchema("dim2"),
new LongDimensionSchema("dimLong"),
new FloatDimensionSchema("dimFloat"),
new StringDimensionSchema("kafka.testheader.encoding")
)
.withAggregators(
new DoubleSumAggregatorFactory("met1sum", "met1"),
new CountAggregatorFactory("rows")
)
.withGranularity(
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null)
)
.build(),
new KafkaIndexTaskIOConfig(
0,
"sequence0",
Expand Down Expand Up @@ -2888,16 +2885,7 @@ private KafkaIndexTask createTask(

private static DataSchema cloneDataSchema(final DataSchema dataSchema)
{
return new DataSchema(
dataSchema.getDataSource(),
dataSchema.getTimestampSpec(),
dataSchema.getDimensionsSpec(),
dataSchema.getAggregators(),
dataSchema.getGranularitySpec(),
dataSchema.getTransformSpec(),
dataSchema.getParserMap(),
OBJECT_MAPPER
);
return DataSchema.builder(dataSchema).withObjectMapper(OBJECT_MAPPER).build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.segment.TestHelper;
Expand Down Expand Up @@ -81,45 +80,30 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest

private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper();
private static final String TOPIC = "sampling";
private static final DataSchema DATA_SCHEMA = new DataSchema(
"test_ds",
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(
Arrays.asList(
new StringDimensionSchema("dim1"),
new StringDimensionSchema("dim1t"),
new StringDimensionSchema("dim2"),
new LongDimensionSchema("dimLong"),
new FloatDimensionSchema("dimFloat")
)
),
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("met1sum", "met1"),
new CountAggregatorFactory("rows")
},
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
null
);

private static final DataSchema DATA_SCHEMA_KAFKA_TIMESTAMP = new DataSchema(
"test_ds",
new TimestampSpec("kafka.timestamp", "iso", null),
new DimensionsSpec(
Arrays.asList(
new StringDimensionSchema("dim1"),
new StringDimensionSchema("dim1t"),
new StringDimensionSchema("dim2"),
new LongDimensionSchema("dimLong"),
new FloatDimensionSchema("dimFloat")
)
),
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("met1sum", "met1"),
new CountAggregatorFactory("rows")
},
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
null
);
private static final DataSchema DATA_SCHEMA =
DataSchema.builder()
.withDataSource("test_ds")
.withTimestamp(new TimestampSpec("timestamp", "iso", null))
.withDimensions(
new StringDimensionSchema("dim1"),
new StringDimensionSchema("dim1t"),
new StringDimensionSchema("dim2"),
new LongDimensionSchema("dimLong"),
new FloatDimensionSchema("dimFloat")
)
.withAggregators(
new DoubleSumAggregatorFactory("met1sum", "met1"),
new CountAggregatorFactory("rows")
)
.withGranularity(
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null)
)
.build();

private static final DataSchema DATA_SCHEMA_KAFKA_TIMESTAMP =
DataSchema.builder(DATA_SCHEMA)
.withTimestamp(new TimestampSpec("kafka.timestamp", "iso", null))
.build();

private static TestingCluster zkServer;
private static TestBroker kafkaServer;
Expand Down Expand Up @@ -364,17 +348,18 @@ public void testWithInputRowParser() throws IOException
);
InputRowParser parser = new StringInputRowParser(new JSONParseSpec(timestampSpec, dimensionsSpec, JSONPathSpec.DEFAULT, null, null), "UTF8");

DataSchema dataSchema = new DataSchema(
"test_ds",
objectMapper.readValue(objectMapper.writeValueAsBytes(parser), Map.class),
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("met1sum", "met1"),
new CountAggregatorFactory("rows")
},
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
null,
objectMapper
);
DataSchema dataSchema = DataSchema.builder()
.withDataSource("test_ds")
.withParserMap(
objectMapper.readValue(objectMapper.writeValueAsBytes(parser), Map.class)
)
.withAggregators(
new DoubleSumAggregatorFactory("met1sum", "met1"),
new CountAggregatorFactory("rows")
)
.withGranularity(new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null))
.withObjectMapper(objectMapper)
.build();

KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpec(
null,
Expand Down
Loading

0 comments on commit 7db3580

Please sign in to comment.