Skip to content

Commit

Permalink
Fix and improve query3 and query12
Browse files Browse the repository at this point in the history
query3: Use GlobalWindow to comply with the State/Timer APIs (issue apache#7). Use timer for personState expiration in GlobalWindow (issue apache#29). Add trigger to GlobalWindow

query12: Replace Count.perKey by Count.perElement (issue apache#34)
  • Loading branch information
echauchot committed Jun 16, 2017
1 parent 07817cf commit 7714618
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,13 @@ public class NexmarkConfiguration implements Serializable {
@JsonProperty
public int fanout = 5;

/**
 * Maximum time, in seconds of event time, that query3 waits for the auctions related to a
 * person held in personState before that state is cleaned up.
 */
@JsonProperty
public int maxAuctionsWaitingTime = 600;

/**
* Length of occasional delay to impose on events (in seconds).
*/
Expand Down Expand Up @@ -322,6 +329,9 @@ public void overrideFromOptions(NexmarkOptions options) {
if (options.getFanout() != null) {
fanout = options.getFanout();
}
// Override maxAuctionsWaitingTime only when the option is set (non-null).
// The value must land in maxAuctionsWaitingTime, not fanout: assigning it to fanout
// both clobbers the fanout setting and silently ignores the requested waiting time.
if (options.getMaxAuctionsWaitingTime() != null) {
  maxAuctionsWaitingTime = options.getMaxAuctionsWaitingTime();
}
if (options.getOccasionalDelaySec() != null) {
occasionalDelaySec = options.getOccasionalDelaySec();
}
Expand Down Expand Up @@ -376,6 +386,7 @@ public NexmarkConfiguration clone() {
result.diskBusyBytes = diskBusyBytes;
result.auctionSkip = auctionSkip;
result.fanout = fanout;
result.maxAuctionsWaitingTime = maxAuctionsWaitingTime;
result.occasionalDelaySec = occasionalDelaySec;
result.probDelayedEvent = probDelayedEvent;
result.maxLogEvents = maxLogEvents;
Expand Down Expand Up @@ -479,6 +490,9 @@ public String toShortString() {
if (fanout != DEFAULT.fanout) {
sb.append(String.format("; fanout:%d", fanout));
}
// Report maxAuctionsWaitingTime only when it differs from the default configuration,
// and print its own value (not fanout) under its label.
if (maxAuctionsWaitingTime != DEFAULT.maxAuctionsWaitingTime) {
  sb.append(String.format("; maxAuctionsWaitingTime:%d", maxAuctionsWaitingTime));
}
if (occasionalDelaySec != DEFAULT.occasionalDelaySec) {
sb.append(String.format("; occasionalDelaySec:%d", occasionalDelaySec));
}
Expand Down Expand Up @@ -527,7 +541,7 @@ public int hashCode() {
ratePeriodSec, preloadSeconds, isRateLimited, useWallclockEventTime, avgPersonByteSize,
avgAuctionByteSize, avgBidByteSize, hotAuctionRatio, hotSellersRatio, hotBiddersRatio,
windowSizeSec, windowPeriodSec, watermarkHoldbackSec, numInFlightAuctions, numActivePeople,
coderStrategy, cpuDelayMs, diskBusyBytes, auctionSkip, fanout,
coderStrategy, cpuDelayMs, diskBusyBytes, auctionSkip, fanout, maxAuctionsWaitingTime,
occasionalDelaySec, probDelayedEvent, maxLogEvents, usePubsubPublishTime,
outOfOrderGroupSize);
}
Expand Down Expand Up @@ -571,6 +585,9 @@ public boolean equals(Object obj) {
if (fanout != other.fanout) {
return false;
}
if (maxAuctionsWaitingTime != other.maxAuctionsWaitingTime) {
return false;
}
if (firstEventRate != other.firstEventRate) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,13 @@ public interface NexmarkOptions extends PubsubOptions {

void setFanout(Integer fanout);

// Option pair for the query3 personState waiting time. The getter is @Nullable and may
// return null; the setter parameter is named after the option it sets rather than after
// the neighbouring fanout option.
@Description("Maximum waiting time to clean personState in query3 "
    + "(ie maximum waiting of the auctions related to person in state in seconds in event time).")
@Nullable
Integer getMaxAuctionsWaitingTime();

void setMaxAuctionsWaitingTime(Integer maxAuctionsWaitingTime);

@Description("Length of occasional delay to impose on events (in seconds).")
@Nullable
Long getOccasionalDelaySec();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,24 +49,21 @@ public Query12(NexmarkConfiguration configuration) {
private PCollection<BidsPerSession> applyTyped(PCollection<Event> events) {
return events
.apply(JUST_BIDS)
.apply(name + ".Rekey",
// TODO etienne: why not avoid this ParDo and do a Cont.perElement?
ParDo.of(new DoFn<Bid, KV<Long, Void>>() {
@ProcessElement
public void processElement(ProcessContext c) {
Bid bid = c.element();
c.output(KV.of(bid.bidder, (Void) null));
}
}))
.apply(Window.<KV<Long, Void>>into(new GlobalWindows())
.apply(ParDo.of(new DoFn<Bid, Long>() {
@ProcessElement
public void processElement(ProcessContext c){
c.output(c.element().bidder);
}
}))
.apply(Window.<Long>into(new GlobalWindows())
.triggering(
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(
Duration.standardSeconds(configuration.windowSizeSec))))
.discardingFiredPanes()
.withAllowedLateness(Duration.ZERO))
.apply(Count.<Long, Void>perKey())
.apply(Count.<Long>perElement())
.apply(name + ".ToResult",
ParDo.of(new DoFn<KV<Long, Long>, BidsPerSession>() {
@ProcessElement
Expand Down
Loading

0 comments on commit 7714618

Please sign in to comment.