
Fix query10 log messages
issue apache#5 and issue apache#51
echauchot committed Aug 23, 2017
1 parent 50fddde commit c7ada3f
Showing 2 changed files with 22 additions and 23 deletions.
integration/java/nexmark/pom.xml (6 additions, 0 deletions)
@@ -254,6 +254,12 @@
       <artifactId>slf4j-api</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <scope>runtime</scope>
+    </dependency>
+
     <dependency>
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>jsr305</artifactId>
integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java (16 additions, 23 deletions)
@@ -17,9 +17,6 @@
  */
 package org.apache.beam.integration.nexmark.queries;
 
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.cloud.hadoop.gcsio.GoogleCloudStorageWriteChannel;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.Serializable;
@@ -33,9 +30,9 @@
 import org.apache.beam.integration.nexmark.model.Event;
 import org.apache.beam.integration.nexmark.model.KnownSize;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -57,7 +54,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * Query "10", 'Log to sharded files' (Not in original suite.)
  *
@@ -132,12 +128,9 @@ public void setMaxNumWorkers(int maxNumWorkers) {
    */
   private WritableByteChannel openWritableGcsFile(GcsOptions options, String filename)
       throws IOException {
-    //TODO Decide what to do about this one
-    // WritableByteChannel channel =
-    //     GcsIOChannelFactory.fromOptions(options).create(filename, "text/plain");
-    // checkState(channel instanceof GoogleCloudStorageWriteChannel);
-    // ((GoogleCloudStorageWriteChannel) channel).setUploadBufferSize(CHANNEL_BUFFER);
-    // return channel;
+    // TODO
+    // Fix after the PR: right now this is a Google-specific use case.
+    // Discuss on the ML: should we keep GCS, use HDFS, or use a generic Beam filesystem?
     throw new UnsupportedOperationException("Disabled after removal of GcsIOChannelFactory");
   }
 
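The TODO in the hunk above asks whether to keep GCS, switch to HDFS, or go through a generic Beam filesystem. A minimal sketch of the generic-filesystem option, using the Beam FileSystems API instead of the removed GcsIOChannelFactory; the class and method names here are illustrative and not part of this commit:

import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;

/** Sketch only: a filesystem-agnostic take on the question raised in the TODO. */
class WritableFileSketch {
  static WritableByteChannel openWritableFile(String filename) throws IOException {
    // FileSystems dispatches gs://, hdfs:// or local paths to whichever Beam
    // filesystem is registered for the scheme of the given spec.
    ResourceId resource = FileSystems.matchNewResource(filename, false /* isDirectory */);
    return FileSystems.create(resource, "text/plain");
  }
}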
Expand Down Expand Up @@ -192,7 +185,7 @@ private PCollection<Done> applyTyped(PCollection<Event> events) {
public void processElement(ProcessContext c) {
if (c.element().hasAnnotation("LATE")) {
lateCounter.inc();
LOG.error("Observed late: %s", c.element());
LOG.info("Observed late: %s", c.element());
} else {
onTimeCounter.inc();
}
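A note on the message formatting used in this and the later hunks: the runtime slf4j-jdk14 dependency added in pom.xml binds SLF4J to java.util.logging, and SLF4J substitutes {} placeholders rather than printf-style %s, so %s specifiers only expand when the message is pre-built with String.format (as the following hunks do). A minimal sketch of both forms; the class name and values are illustrative only:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Sketch only: SLF4J parameterized logging versus a pre-formatted string. */
class LogFormatSketch {
  private static final Logger LOG = LoggerFactory.getLogger(LogFormatSketch.class);

  public static void main(String[] args) {
    String element = "event-42";  // stand-in value, not from the commit
    // Parameterized form: SLF4J fills in {} lazily, only if INFO is enabled.
    LOG.info("Observed late: {}", element);
    // Pre-formatted form: the string is built eagerly, then logged as-is.
    LOG.info(String.format("Observed late: %s", element));
  }
}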
Expand Down Expand Up @@ -240,11 +233,11 @@ public void processElement(ProcessContext c, BoundedWindow window) {
}
}
String shard = c.element().getKey();
LOG.error(
LOG.info(String.format(
"%s with timestamp %s has %d actually late and %d on-time "
+ "elements in pane %s for window %s",
shard, c.timestamp(), numLate, numOnTime, c.pane(),
window.maxTimestamp());
window.maxTimestamp()));
if (c.pane().getTiming() == PaneInfo.Timing.LATE) {
if (numLate == 0) {
LOG.error(
Expand Down Expand Up @@ -283,11 +276,11 @@ public void processElement(ProcessContext c, BoundedWindow window)
String shard = c.element().getKey();
GcsOptions options = c.getPipelineOptions().as(GcsOptions.class);
OutputFile outputFile = outputFileFor(window, shard, c.pane());
LOG.error(
LOG.info(String.format(
"Writing %s with record timestamp %s, window timestamp %s, pane %s",
shard, c.timestamp(), window.maxTimestamp(), c.pane());
shard, c.timestamp(), window.maxTimestamp(), c.pane()));
if (outputFile.filename != null) {
LOG.error("Beginning write to '%s'", outputFile.filename);
LOG.info("Beginning write to '%s'", outputFile.filename);
int n = 0;
try (OutputStream output =
Channels.newOutputStream(openWritableGcsFile(options, outputFile
@@ -296,12 +289,12 @@
               Event.CODER.encode(event, output, Coder.Context.OUTER);
               writtenRecordsCounter.inc();
               if (++n % 10000 == 0) {
-                LOG.error("So far written %d records to '%s'", n,
+                LOG.info("So far written %d records to '%s'", n,
                     outputFile.filename);
               }
             }
           }
-          LOG.error("Written all %d records to '%s'", n, outputFile.filename);
+          LOG.info("Written all %d records to '%s'", n, outputFile.filename);
         }
         savedFileCounter.inc();
         c.output(KV.<Void, OutputFile>of(null, outputFile));
Expand Down Expand Up @@ -341,23 +334,23 @@ public void processElement(ProcessContext c, BoundedWindow window)
LOG.error("ERROR! Unexpected ON_TIME pane index: %s", c.pane());
} else {
GcsOptions options = c.getPipelineOptions().as(GcsOptions.class);
LOG.error(
LOG.info(
"Index with record timestamp %s, window timestamp %s, pane %s",
c.timestamp(), window.maxTimestamp(), c.pane());

@Nullable String filename = indexPathFor(window);
if (filename != null) {
LOG.error("Beginning write to '%s'", filename);
LOG.info("Beginning write to '%s'", filename);
int n = 0;
try (OutputStream output =
Channels.newOutputStream(
openWritableGcsFile(options, filename))) {
for (OutputFile outputFile : c.element().getValue()) {
output.write(outputFile.toString().getBytes());
output.write(outputFile.toString().getBytes("UTF-8"));
n++;
}
}
LOG.error("Written all %d lines to '%s'", n, filename);
LOG.info("Written all %d lines to '%s'", n, filename);
}
c.output(
new Done("written for timestamp " + window.maxTimestamp()));
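On the getBytes("UTF-8") change in the last hunk: naming the charset avoids the platform default encoding. A small sketch of the StandardCharsets variant, which also avoids the checked UnsupportedEncodingException that the String overload declares; the class name and value are illustrative only:

import java.nio.charset.StandardCharsets;

/** Sketch only: explicit UTF-8 encoding without a checked exception. */
class Utf8Sketch {
  public static void main(String[] args) {
    String line = "shard-00000";  // stand-in value, not from the commit
    // getBytes(Charset) never throws and never falls back to the platform default.
    byte[] bytes = line.getBytes(StandardCharsets.UTF_8);
    System.out.println(bytes.length + " UTF-8 bytes");
  }
}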