Skip to content

Commit

Permalink
[euphoria-core] #21 add builder javadocs to operators
Browse files Browse the repository at this point in the history
  • Loading branch information
je-ik authored and David Moravek committed Oct 5, 2018
1 parent 6a18f4a commit 776d856
Show file tree
Hide file tree
Showing 16 changed files with 304 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@

import java.util.Objects;

/** A convenient alias for:
/** A convenient alias for assignment of event time.
*
* Can be rewritten as:
* <pre>{@code
* Dataset<T> input = ...;
* Dataset<T> withStamps = FlatMap.of(input)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@

/**
* Operator counting elements with same key.
*
* <h3>Builders:</h3>
* <ol>
* <li>{@code [named] ..................} give name to the operator [optional]
* <li>{@code of .......................} input dataset
* <li>{@code keyBy ....................} key extractor function
* <li>{@code [windowBy] ...............} windowing function (see {@link Windowing}), default attached windowing
* <li>{@code output ...................} build output dataset
* </ol>
*
*/
@Audience(Audience.Type.CLIENT)
@Derived(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@

/**
* Operator outputting distinct (based on {@link Object#equals}) elements.
*
* <h3>Builders:</h3>
* <ol>
* <li>{@code [named] ..................} give name to the operator [optional]
* <li>{@code of .......................} input dataset
* <li>{@code [mapped] .................} compare objects retrieved by this {@link UnaryFunction} instead of raw input elements
* <li>{@code [windowBy] ...............} windowing function (see {@link Windowing}), default attached windowing
* <li>{@code output ...................} build output dataset
* </ol>
*
*/
@Audience(Audience.Type.CLIENT)
@Recommended(
Expand All @@ -54,29 +64,20 @@ public static class OfBuilder implements Builders.Of {
}

@Override
public <IN> WindowingBuilder<IN, IN> of(Dataset<IN> input) {
return new WindowingBuilder<>(name, input, e -> e);
public <IN> MappedBuilder<IN, IN> of(Dataset<IN> input) {
return new MappedBuilder<>(name, input);
}
}

public static class WindowingBuilder<IN, ELEM>
implements Builders.WindowBy<IN, WindowingBuilder<IN, ELEM>>,
Builders.Output<ELEM>,
OptionalMethodBuilder<WindowingBuilder<IN, ELEM>> {

final String name;
final Dataset<IN> input;
@Nullable
final UnaryFunction<IN, ELEM> mapper;
public static class MappedBuilder<IN, ELEM>
extends WindowingBuilder<IN, ELEM> {

WindowingBuilder(
@SuppressWarnings("unchecked")
private MappedBuilder(
String name,
Dataset<IN> input,
@Nullable UnaryFunction<IN, ELEM> mapper) {
Dataset<IN> input) {

this.name = Objects.requireNonNull(name);
this.input = Objects.requireNonNull(input);
this.mapper = mapper;
super(name, input, (UnaryFunction) e -> e);
}

/**
Expand All @@ -98,6 +99,27 @@ public <ELEM> WindowingBuilder<IN, ELEM> mapped(UnaryFunction<IN, ELEM> mapper)
return new WindowingBuilder<>(name, input, mapper);
}

}

public static class WindowingBuilder<IN, ELEM>
implements Builders.WindowBy<IN, WindowingBuilder<IN, ELEM>>,
Builders.Output<ELEM>,
OptionalMethodBuilder<WindowingBuilder<IN, ELEM>> {

final String name;
final Dataset<IN> input;
final UnaryFunction<IN, ELEM> mapper;

private WindowingBuilder(
String name,
Dataset<IN> input,
UnaryFunction<IN, ELEM> mapper) {

this.name = Objects.requireNonNull(name);
this.input = Objects.requireNonNull(input);
this.mapper = Objects.requireNonNull(mapper);
}

@Override
public <W extends Window> OutputBuilder<IN, ELEM, W>
windowBy(Windowing<IN, W> windowing) {
Expand All @@ -119,7 +141,7 @@ public static class OutputBuilder<IN, ELEM, W extends Window>

OutputBuilder(String name,
Dataset<IN> input,
@Nullable UnaryFunction<IN, ELEM> mapper,
UnaryFunction<IN, ELEM> mapper,
@Nullable Windowing<IN, W> windowing) {

super(name, input, mapper);
Expand Down Expand Up @@ -149,8 +171,8 @@ public Dataset<ELEM> output() {
* @see #named(String)
* @see OfBuilder#of(Dataset)
*/
public static <IN> WindowingBuilder<IN, IN> of(Dataset<IN> input) {
return new WindowingBuilder<>("Distinct", input, e -> e);
public static <IN> MappedBuilder<IN, IN> of(Dataset<IN> input) {
return new MappedBuilder<>("Distinct", input);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@

/**
* Operator performing a filter operation.
*
* Output elements that pass given condition.
*
* <h3>Builders:</h3>
* <ol>
* <li>{@code [named] ..................} give name to the operator [optional]
* <li>{@code of .......................} input dataset
* <li>{@code by .......................} apply {@link UnaryPredicate} to input elements
* <li>{@code output ...................} build output dataset
* </ol>
*
*/
@Audience(Audience.Type.CLIENT)
@Derived(
Expand All @@ -48,10 +59,9 @@ public <IN> ByBuilder<IN> of(Dataset<IN> input) {
}
}

public static class ByBuilder<IN> implements Builders.Output<IN> {
public static class ByBuilder<IN> {
private final String name;
private final Dataset<IN> input;
private UnaryPredicate<IN> predicate;

ByBuilder(String name, Dataset<IN> input) {
this.name = Objects.requireNonNull(name);
Expand All @@ -66,8 +76,21 @@ public static class ByBuilder<IN> implements Builders.Output<IN> {
* @return the next builder to complete the setup of the operator
*/
public Builders.Output<IN> by(UnaryPredicate<IN> predicate) {
this.predicate = Objects.requireNonNull(predicate);
return this;
return new OutputBuilder<>(name, input, predicate);
}

}

public static class OutputBuilder<IN> implements Builders.Output<IN> {

private final String name;
private final Dataset<IN> input;
private final UnaryPredicate<IN> predicate;

private OutputBuilder(String name, Dataset<IN> input, UnaryPredicate<IN> predicate) {
this.name = name;
this.input = input;
this.predicate = predicate;
}

@Override
Expand All @@ -78,6 +101,7 @@ public Dataset<IN> output() {

return filter.output();
}

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@
* been used only once here, a {@link FlatMap} operator is free
* to invoke it multiple times or not at all to generate that many elements
* to the output dataset.
*
* <h3>Builders:</h3>
* <ol>
* <li>{@code [named] ..................} give name to the operator [optional]
* <li>{@code of .......................} input dataset
* <li>{@code using ....................} apply {@link UnaryFunctor} to input elements
* <li>{@code [eventTimeBy] ............} change event time characteristic of output elements using {@link ExtractEventTime}
* <li>{@code output ...................} build output dataset
* </ol>
*
*/
@Audience(Audience.Type.CLIENT)
@Basic(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,26 @@
import java.util.Objects;
import java.util.Optional;

/**
* Full outer join of two input datasets producing single new dataset.
*
* When joining two streams, the join has to specify {@link Windowing}
* which groups elements from streams into {@link Window}s. The join operation
* is performed within same windows produced on left and right side of
* input {@link Dataset}s.
*
* <h3>Builders:</h3>
* <ol>
* <li>{@code [named] ..................} give name to the operator [optional]
* <li>{@code of .......................} left and right input dataset
* <li>{@code by .......................} {@link UnaryFunction}s transforming left and right elements into keys
* <li>{@code using ....................} {@link BinaryFunctor} receiving left and right element from joined window
* <li>{@code [windowBy] ...............} windowing function (see {@link Windowing}), default attached windowing
* <li>{@code [withHints] ..............} specify hints about runtime data characteristics, see {@link JoinHint}
* <li>{@code (output | outputValues) ..} build output dataset
* </ol>
*
*/
@Audience(Audience.Type.CLIENT)
public class FullJoin {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,24 @@
import java.util.Set;

/**
* Join two datasets by given key producing single new dataset.
* Inner join of two datasets by given key producing single new dataset.
*
* When joining two streams, the join has to specify {@link Windowing}
* which groups elements from streams into {@link Window}s. The join operation
* is performed within same windows produced on left and right side of
* input {@link Dataset}s.
*
* <h3>Builders:</h3>
* <ol>
* <li>{@code [named] ..................} give name to the operator [optional]
* <li>{@code of .......................} left and right input dataset
* <li>{@code by .......................} {@link UnaryFunction}s transforming left and right elements into keys
* <li>{@code using ....................} {@link BinaryFunctor} receiving left and right element from joined window
* <li>{@code [windowBy] ...............} windowing function (see {@link Windowing}), default attached windowing
* <li>{@code [withHints] ..............} specify hints about runtime data characteristics, see {@link JoinHint}
* <li>{@code (output | outputValues) ..} build output dataset
* </ol>
*
*/
@Audience(Audience.Type.CLIENT)
@Recommended(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,26 @@
import java.util.Objects;
import java.util.Optional;

/**
* Left outer join of two input datasets producing single new dataset.
*
* When joining two streams, the join has to specify {@link Windowing}
* which groups elements from streams into {@link Window}s. The join operation
* is performed within same windows produced on left and right side of
* input {@link Dataset}s.
*
* <h3>Builders:</h3>
* <ol>
* <li>{@code [named] ..................} give name to the operator [optional]
* <li>{@code of .......................} left and right input dataset
* <li>{@code by .......................} {@link UnaryFunction}s transforming left and right elements into keys
* <li>{@code using ....................} {@link BinaryFunctor} receiving left and right element from joined window
* <li>{@code [windowBy] ...............} windowing function (see {@link Windowing}), default attached windowing
* <li>{@code [withHints] ..............} specify hints about runtime data characteristics, see {@link JoinHint}
* <li>{@code (output | outputValues) ..} build output dataset
* </ol>
*
*/
@Audience(Audience.Type.CLIENT)
public class LeftJoin {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,18 @@
import java.util.Objects;

/**
* Simple one-to-one transformation of input elements. It is a special usage of {@link FlatMap}
* with exactly one output element for every one input element. No context is provided inside
* the map function.
* Simple one-to-one transformation of input elements. It is a special case of
* {@link FlatMap} with exactly one output element for every one input element.
* No context is provided inside the map function.
*
* <h3>Builders:</h3>
* <ol>
* <li>{@code [named] ..................} give name to the operator [optional]
* <li>{@code of .......................} input dataset
* <li>{@code using ....................} apply {@link UnaryFunction} or {@link UnaryFunctionEnv} to input elements
* <li>{@code output ...................} build output dataset
* </ol>
*
*/
@Audience(Audience.Type.CLIENT)
@Derived(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,18 @@
* Custom {@link Windowing} can be set, otherwise values from
* input operator are used.<p>
*
* <h3>Builders:</h3>
* <ol>
* <li>{@code [named] ..................} give name to the operator [optional]
* <li>{@code of .......................} input dataset
* <li>{@code keyBy ....................} key extractor function
* <li>{@code [valueBy] ................} value extractor function (default: identity)
* <li>{@code (combineBy | reduceBy)....} {@link CombinableReduceFunction} or {@link ReduceFunction} for combinable or non-combinable function
* <li>{@code [withSortedValues] .......} use comparator for sorting values prior to being passed to {@link ReduceFunction} function (applicable only for non-combinable version)
* <li>{@code [windowBy] ...............} windowing function (see {@link Windowing}), default attached windowing
* <li>{@code (output | outputValues) ..} build output dataset
* </ol>
*
* @param <IN> Type of input records
* @param <KEY> Output type of #keyBy method
* @param <VALUE> Output type of #valueBy method
Expand Down Expand Up @@ -109,7 +121,7 @@ public <KEY> DatasetBuilder2<IN, KEY> keyBy(UnaryFunction<IN, KEY> keyExtractor)
}
}

interface ReduceBy<IN, KEY, VALUE> {
public interface ReduceBy<IN, KEY, VALUE> {

/**
* Define a function that reduces all values related to one key into one result object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@
*
* Example:
*
* <pre>{@code
* <pre>
* {@code
* Dataset<String> words = ...;
* Dataset<Pair<String, Integer>> counts =
* ReduceStateByKey.named("WORD-COUNT")
Expand All @@ -57,7 +58,8 @@
* .mergeStatesBy(WordCountState::merge)
* .windowBy(Time.of(Duration.ofHours(1))
* .output();
* }</pre>
* }
* </pre>
*
* This example constitutes a windowed word-count program. Each input element
* is treated as a key to identify an imaginary {@code WordCountState} within
Expand All @@ -73,6 +75,18 @@
* mode, i.e. it will attach itself to the windowing strategy active on they
* way from its input.
*
* <h3>Builders:</h3>
* <ol>
* <li>{@code [named] ..................} give name to the operator [optional]
* <li>{@code of .......................} input dataset
* <li>{@code keyBy ....................} key extractor function
* <li>{@code valueBy ..................} value extractor function
* <li>{@code stateFactory .............} factory method for {@link State} (see {@link StateFactory})
* <li>{@code mergeStatesBy ............} state merge function (see {@link StateMerger})
* <li>{@code [windowBy] ...............} windowing function (see {@link Windowing}), default attached windowing
* <li>{@code (output | outputValues) ..} build output dataset
* </ol>
*
* @param <IN> the type of input elements
* @param <KEY> the type of the key (result type of {@code #keyBy}
* @param <VALUE> the type of the accumulated values (result type of {@code #valueBy})
Expand Down Expand Up @@ -229,7 +243,8 @@ public static class DatasetBuilder4<
public static class DatasetBuilder5<
IN, KEY, VALUE, OUT, STATE extends State<VALUE, OUT>>
implements Builders.WindowBy<IN, DatasetBuilder5<IN, KEY, VALUE, OUT, STATE>>,
Builders.Output<Pair<KEY, OUT>> {
Builders.Output<Pair<KEY, OUT>>,
Builders.OutputValues<KEY, OUT> {

final String name;
final Dataset<IN> input;
Expand Down
Loading

0 comments on commit 776d856

Please sign in to comment.