Skip to content

Commit

Permalink
Fix compile after ParDo refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
iemejia authored and echauchot committed Jun 16, 2017
1 parent 39f51ba commit 07817cf
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void processElement(ProcessContext c) {
};

/** Transform to key each person by their id. */
protected static final ParDo.Bound<Person, KV<Long, Person>> PERSON_BY_ID =
protected static final ParDo.SingleOutput<Person, KV<Long, Person>> PERSON_BY_ID =
ParDo.of(new DoFn<Person, KV<Long, Person>>() {
@ProcessElement
public void processElement(ProcessContext c) {
Expand All @@ -106,7 +106,7 @@ public void processElement(ProcessContext c) {
});

/** Transform to key each auction by its id. */
protected static final ParDo.Bound<Auction, KV<Long, Auction>> AUCTION_BY_ID =
protected static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_ID =
ParDo.of(new DoFn<Auction, KV<Long, Auction>>() {
@ProcessElement
public void processElement(ProcessContext c) {
Expand All @@ -115,7 +115,7 @@ public void processElement(ProcessContext c) {
});

/** Transform to key each auction by its seller id. */
protected static final ParDo.Bound<Auction, KV<Long, Auction>> AUCTION_BY_SELLER =
protected static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_SELLER =
ParDo.of(new DoFn<Auction, KV<Long, Auction>>() {
@ProcessElement
public void processElement(ProcessContext c) {
Expand All @@ -124,7 +124,7 @@ public void processElement(ProcessContext c) {
});

/** Transform to key each bid by it's auction id. */
protected static final ParDo.Bound<Bid, KV<Long, Bid>> BID_BY_AUCTION =
protected static final ParDo.SingleOutput<Bid, KV<Long, Bid>> BID_BY_AUCTION =
ParDo.of(new DoFn<Bid, KV<Long, Bid>>() {
@ProcessElement
public void processElement(ProcessContext c) {
Expand All @@ -133,7 +133,7 @@ public void processElement(ProcessContext c) {
});

/** Transform to project the auction id from each bid. */
protected static final ParDo.Bound<Bid, Long> BID_TO_AUCTION =
protected static final ParDo.SingleOutput<Bid, Long> BID_TO_AUCTION =
ParDo.of(new DoFn<Bid, Long>() {
@ProcessElement
public void processElement(ProcessContext c) {
Expand All @@ -142,7 +142,7 @@ public void processElement(ProcessContext c) {
});

/** Transform to project the price from each bid. */
protected static final ParDo.Bound<Bid, Long> BID_TO_PRICE =
protected static final ParDo.SingleOutput<Bid, Long> BID_TO_PRICE =
ParDo.of(new DoFn<Bid, Long>() {
@ProcessElement
public void processElement(ProcessContext c) {
Expand All @@ -151,7 +151,7 @@ public void processElement(ProcessContext c) {
});

/** Transform to emit each event with the timestamp embedded within it. */
public static final ParDo.Bound<Event, Event> EVENT_TIMESTAMP_FROM_DATA =
public static final ParDo.SingleOutput<Event, Event> EVENT_TIMESTAMP_FROM_DATA =
ParDo.of(new DoFn<Event, Event>() {
@ProcessElement
public void processElement(ProcessContext c) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1073,8 +1073,7 @@ private void sink(PCollection<TimestampedValue<KnownSize>> results, long now) {
case BIGQUERY:
// Multiple BigQuery backends to mimic what most customers do.
PCollectionTuple res = formattedResults.apply(queryName + ".Partition",
ParDo.withOutputTags(MAIN, TupleTagList.of(SIDE))
.of(new PartitionDoFn()));
ParDo.of(new PartitionDoFn()).withOutputTags(MAIN, TupleTagList.of(SIDE)));
sinkResultsToBigQuery(res.get(MAIN), now, "main");
sinkResultsToBigQuery(res.get(SIDE), now, "side");
sinkResultsToBigQuery(formattedResults, now, "copy");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ public static PTransform<PBegin, PCollection<Event>> streamEventsSource(
/**
* Return a transform to pass-through events, but count them as they go by.
*/
public static ParDo.Bound<Event, Event> snoop(final String name) {
public static ParDo.SingleOutput<Event, Event> snoop(final String name) {
return ParDo.of(new DoFn<Event, Event>() {
final Aggregator<Long, Long> eventCounter =
createAggregator("events", Sum.ofLongs());
Expand Down Expand Up @@ -451,7 +451,7 @@ public void processElement(ProcessContext c) {
/**
* Return a transform to count and discard each element.
*/
public static <T> ParDo.Bound<T, Void> devNull(String name) {
public static <T> ParDo.SingleOutput<T, Void> devNull(String name) {
return ParDo.of(new DoFn<T, Void>() {
final Aggregator<Long, Long> discardCounter =
createAggregator("discarded", Sum.ofLongs());
Expand All @@ -466,7 +466,7 @@ public void processElement(ProcessContext c) {
/**
* Return a transform to log each element, passing it through unchanged.
*/
public static <T> ParDo.Bound<T, T> log(final String name) {
public static <T> ParDo.SingleOutput<T, T> log(final String name) {
return ParDo.of(new DoFn<T, T>() {
@ProcessElement
public void processElement(ProcessContext c) {
Expand All @@ -479,7 +479,7 @@ public void processElement(ProcessContext c) {
/**
* Return a transform to format each element as a string.
*/
public static <T> ParDo.Bound<T, String> format(String name) {
public static <T> ParDo.SingleOutput<T, String> format(String name) {
return ParDo.of(new DoFn<T, String>() {
final Aggregator<Long, Long> recordCounter =
createAggregator("records", Sum.ofLongs());
Expand All @@ -495,7 +495,7 @@ public void processElement(ProcessContext c) {
/**
* Return a transform to make explicit the timestamp of each element.
*/
public static <T> ParDo.Bound<T, TimestampedValue<T>> stamp(String name) {
public static <T> ParDo.SingleOutput<T, TimestampedValue<T>> stamp(String name) {
return ParDo.of(new DoFn<T, TimestampedValue<T>>() {
@ProcessElement
public void processElement(ProcessContext c) {
Expand Down Expand Up @@ -548,7 +548,7 @@ public Long apply(Long left, Long right) {
/**
* Return a transform to keep the CPU busy for given milliseconds on every record.
*/
public static <T> ParDo.Bound<T, T> cpuDelay(String name, final long delayMs) {
public static <T> ParDo.SingleOutput<T, T> cpuDelay(String name, final long delayMs) {
return ParDo.of(new DoFn<T, T>() {
@ProcessElement
public void processElement(ProcessContext c) {
Expand Down Expand Up @@ -580,7 +580,7 @@ public void processElement(ProcessContext c) {
/**
* Return a transform to write given number of bytes to durable store on every record.
*/
public static <T> ParDo.Bound<T, T> diskBusy(String name, final long bytes) {
public static <T> ParDo.SingleOutput<T, T> diskBusy(String name, final long bytes) {
return ParDo.of(new DoFn<T, T>() {
@ProcessElement
public void processElement(ProcessContext c) {
Expand Down Expand Up @@ -608,7 +608,7 @@ public void processElement(ProcessContext c) {
/**
* Return a transform to cast each element to {@link KnownSize}.
*/
private static <T extends KnownSize> ParDo.Bound<T, KnownSize> castToKnownSize() {
private static <T extends KnownSize> ParDo.SingleOutput<T, KnownSize> castToKnownSize() {
return ParDo.of(new DoFn<T, KnownSize>() {
@ProcessElement
public void processElement(ProcessContext c) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,18 @@ private PCollection<Bid> applyTyped(PCollection<Event> events) {

return slidingBids
// Select all bids which have that maximum price (there may be more than one).
.apply(name + ".Select",
ParDo.withSideInputs(maxPriceView)
.of(new DoFn<Bid, Bid>() {
@ProcessElement
public void processElement(ProcessContext c) {
long maxPrice = c.sideInput(maxPriceView);
Bid bid = c.element();
if (bid.price == maxPrice) {
c.output(bid);
}
.apply(name + ".Select", ParDo
.of(new DoFn<Bid, Bid>() {
@ProcessElement
public void processElement(ProcessContext c) {
long maxPrice = c.sideInput(maxPriceView);
Bid bid = c.element();
if (bid.price == maxPrice) {
c.output(bid);
}
}));
}
})
.withSideInputs(maxPriceView));
}

@Override
Expand Down

0 comments on commit 07817cf

Please sign in to comment.