Skip to content

Commit

Permalink
Merge pull request apache#5 from apache/master
Browse files Browse the repository at this point in the history
pull latest code
  • Loading branch information
mingmxu authored Mar 8, 2017
2 parents 8c0bb66 + 0fa1d90 commit f1e6f2e
Show file tree
Hide file tree
Showing 130 changed files with 3,039 additions and 1,347 deletions.
4 changes: 4 additions & 0 deletions .jenkins/common_job_properties.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ class common_job_properties {
context.mavenInstallation('Maven 3.3.3')
context.mavenOpts('-Dorg.slf4j.simpleLogger.showDateTime=true')
context.mavenOpts('-Dorg.slf4j.simpleLogger.dateTimeFormat=yyyy-MM-dd\\\'T\\\'HH:mm:ss.SSS')
// The -XX:+TieredCompilation -XX:TieredStopAtLevel=1 JVM options enable
// tiered compilation to make the JVM startup times faster during the tests.
context.mavenOpts('-XX:+TieredCompilation')
context.mavenOpts('-XX:TieredStopAtLevel=1')
context.rootPOM('pom.xml')
// Use a repository local to the workspace for better isolation of jobs.
context.localRepository(LocalRepositoryLocation.LOCAL_TO_WORKSPACE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,5 @@ mavenJob('beam_PostCommit_Java_RunnableOnService_Spark') {
'Run Spark RunnableOnService')

// Maven goals for this job.
goals('-B -e clean verify -am -pl runners/spark -Prunnable-on-service-tests -Plocal-runnable-on-service-tests -Dspark.port.maxRetries=64 -Dspark.ui.enabled=false')
goals('-B -e clean verify -am -pl runners/spark -Prunnable-on-service-tests -Plocal-runnable-on-service-tests -Dspark.ui.enabled=false')
}
7 changes: 4 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,10 @@ matrix:
- os: linux
env: TEST_PYTHON="1"


before_install:
- echo 'MAVEN_OPTS="$MAVEN_OPTS -Xmx1024m -XX:MaxPermSize=512m -XX:+BytecodeVerificationLocal"' >> ~/.mavenrc
# The -XX:+TieredCompilation -XX:TieredStopAtLevel=1 JVM options enable
# tiered compilation to make the JVM startup times faster during the tests.
- echo 'MAVEN_OPTS="$MAVEN_OPTS -Xmx1024m -XX:MaxPermSize=512m -XX:+TieredCompilation -XX:TieredStopAtLevel=1 -XX:+BytecodeVerificationLocal"' >> ~/.mavenrc
- echo $'MAVEN_OPTS="$MAVEN_OPTS -Dorg.slf4j.simpleLogger.showDateTime=true -Dorg.slf4j.simpleLogger.dateTimeFormat=yyyy-MM-dd\'T\'HH:mm:ss.SSS"' >> ~/.mavenrc
- cat ~/.mavenrc
- if [ "$TRAVIS_OS_NAME" == "osx" ]; then export JAVA_HOME=$(/usr/libexec/java_home); fi
Expand All @@ -80,7 +81,7 @@ install:

script:
- if [ "$TEST_PYTHON" ]; then travis_retry $TOX_HOME/tox -e $TOX_ENV -c sdks/python/tox.ini; fi
- if [ ! "$TEST_PYTHON" ]; then travis_retry mvn --batch-mode --update-snapshots --no-snapshot-updates $MAVEN_OVERRIDE install && travis_retry bash -ex .travis/test_wordcount.sh; fi
- if [ ! "$TEST_PYTHON" ]; then travis_retry mvn --batch-mode --update-snapshots --no-snapshot-updates --threads 1C $MAVEN_OVERRIDE install && travis_retry bash -ex .travis/test_wordcount.sh; fi

cache:
directories:
Expand Down
2 changes: 1 addition & 1 deletion examples/java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-examples-parent</artifactId>
<version>0.6.0-SNAPSHOT</version>
<version>0.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion examples/java8/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-examples-parent</artifactId>
<version>0.6.0-SNAPSHOT</version>
<version>0.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-parent</artifactId>
<version>0.6.0-SNAPSHOT</version>
<version>0.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
7 changes: 3 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
<url>http://beam.apache.org/</url>
<inceptionYear>2016</inceptionYear>

<version>0.6.0-SNAPSHOT</version>
<version>0.7.0-SNAPSHOT</version>

<licenses>
<license>
Expand Down Expand Up @@ -1310,7 +1310,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<version>3.0.0</version>
</plugin>

<plugin>
Expand Down Expand Up @@ -1448,7 +1448,7 @@
<version>[1.7,)</version>
</requireJavaVersion>
<requireMavenVersion>
<!-- Keep aligned with preqrequisite section below. -->
<!-- Keep aligned with prerequisite section below. -->
<version>[3.2,)</version>
</requireMavenVersion>
</rules>
Expand Down Expand Up @@ -1483,7 +1483,6 @@
</plugin>
</plugins>
</reporting>

<prerequisites>
<!-- Keep aligned with requireMavenVersion section above. -->
<maven>3.2</maven>
Expand Down
2 changes: 1 addition & 1 deletion runners/apex/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-parent</artifactId>
<version>0.6.0-SNAPSHOT</version>
<version>0.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,10 @@
package org.apache.beam.runners.apex;

import java.io.IOException;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.joda.time.Duration;

/**
Expand All @@ -48,12 +44,6 @@ public static TestApexRunner fromOptions(PipelineOptions options) {
return new TestApexRunner(apexOptions);
}

@Override
public <OutputT extends POutput, InputT extends PInput>
OutputT apply(PTransform<InputT, OutputT> transform, InputT input) {
return delegate.apply(transform, input);
}

@Override
public ApexRunnerResult run(Pipeline pipeline) {
ApexRunnerResult result = delegate.run(pipeline);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
new CreateApexPCollectionViewTranslator());
registerTransformTranslator(CreatePCollectionView.class,
new CreatePCollectionViewTranslator());
registerTransformTranslator(Window.Bound.class, new WindowBoundTranslator());
registerTransformTranslator(Window.Assign.class, new WindowAssignTranslator());
}

public ApexPipelineTranslator(ApexPipelineOptions options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@
* {@link Window.Bound} is translated to {link ApexParDoOperator} that wraps an {@link
* AssignWindowsDoFn}.
*/
class WindowBoundTranslator<T> implements TransformTranslator<Window.Bound<T>> {
class WindowAssignTranslator<T> implements TransformTranslator<Window.Assign<T>> {
private static final long serialVersionUID = 1L;

@Override
public void translate(Window.Bound<T> transform, TranslationContext context) {
public void translate(Window.Assign<T> transform, TranslationContext context) {
PCollection<T> output = (PCollection<T>) context.getOutput();
PCollection<T> input = (PCollection<T>) context.getInput();
@SuppressWarnings("unchecked")
Expand Down
6 changes: 2 additions & 4 deletions runners/core-construction-java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,14 @@
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<parent>
<artifactId>beam-runners-parent</artifactId>
<groupId>org.apache.beam</groupId>
<version>0.6.0-SNAPSHOT</version>
<version>0.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion runners/core-java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-parent</artifactId>
<version>0.6.0-SNAPSHOT</version>
<version>0.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,15 @@

import java.util.List;
import org.apache.beam.runners.core.ExecutionContext.StepContext;
import org.apache.beam.runners.core.StatefulDoFnRunner.CleanupTimer;
import org.apache.beam.runners.core.StatefulDoFnRunner.StateCleaner;
import org.apache.beam.runners.core.StatefulDoFnRunner.StateInternalsStateCleaner;
import org.apache.beam.runners.core.StatefulDoFnRunner.TimeInternalsCleanupTimer;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
Expand Down Expand Up @@ -116,4 +122,37 @@ DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> lateDataDroppingRunner(
stepContext.timerInternals(),
droppedDueToLatenessAggregator);
}

/**
* Returns an implementation of {@link DoFnRunner} that handles
* late data dropping and garbage collection for stateful {@link DoFn DoFns}.
*
* <p>It registers a timer by TimeInternals, and clean all states by StateInternals.
*/
public static <InputT, OutputT, W extends BoundedWindow>
DoFnRunner<InputT, OutputT> defaultStatefulDoFnRunner(
DoFn<InputT, OutputT> fn,
DoFnRunner<InputT, OutputT> doFnRunner,
StepContext stepContext,
AggregatorFactory aggregatorFactory,
WindowingStrategy<?, ?> windowingStrategy) {
Aggregator<Long, Long> droppedDueToLateness = aggregatorFactory.createAggregatorForDoFn(
fn.getClass(), stepContext, StatefulDoFnRunner.DROPPED_DUE_TO_LATENESS_COUNTER,
Sum.ofLongs());

CleanupTimer cleanupTimer =
new TimeInternalsCleanupTimer(stepContext.timerInternals(), windowingStrategy);

Coder<W> windowCoder = (Coder<W>) windowingStrategy.getWindowFn().windowCoder();
StateCleaner<W> stateCleaner =
new StateInternalsStateCleaner<>(fn, stepContext.stateInternals(), windowCoder);

return new StatefulDoFnRunner<>(
doFnRunner,
windowingStrategy,
cleanupTimer,
stateCleaner,
droppedDueToLateness);
}

}
Loading

0 comments on commit f1e6f2e

Please sign in to comment.