From d95913e759289d4ab2651c6941b9ef94ea17a574 Mon Sep 17 00:00:00 2001 From: Martynas Mickevicius Date: Fri, 19 Aug 2016 15:43:32 +0300 Subject: [PATCH] Update to Akka 2.4.9. --- .../akka/stream/contrib/DirectoryChanges.java | 202 +++++++++--------- .../akka/stream/contrib/FileTailSource.java | 117 +++++----- project/Common.scala | 2 +- 3 files changed, 157 insertions(+), 164 deletions(-) diff --git a/contrib/src/main/java/akka/stream/contrib/DirectoryChanges.java b/contrib/src/main/java/akka/stream/contrib/DirectoryChanges.java index 6212059..a8057f8 100644 --- a/contrib/src/main/java/akka/stream/contrib/DirectoryChanges.java +++ b/contrib/src/main/java/akka/stream/contrib/DirectoryChanges.java @@ -13,10 +13,13 @@ import akka.stream.stage.GraphStage; import akka.stream.stage.GraphStageLogic; import akka.stream.stage.TimerGraphStageLogic; + import com.sun.nio.file.SensitivityWatchEventModifier; + import scala.Tuple2; import scala.concurrent.duration.FiniteDuration; +import java.io.IOException; import java.nio.file.*; import java.util.ArrayDeque; import java.util.Queue; @@ -67,131 +70,126 @@ public Attributes initialAttributes() { } @Override - public GraphStageLogic createLogic(Attributes inheritedAttributes) { + public GraphStageLogic createLogic(Attributes inheritedAttributes) throws IOException { if (!Files.exists(directoryPath)) throw new IllegalArgumentException("The path: '" + directoryPath + "' does not exist"); if (!Files.isDirectory(directoryPath)) throw new IllegalArgumentException("The path '" + directoryPath + "' is not a directory"); - try { - return new TimerGraphStageLogic(shape) { - private final Queue> buffer = new ArrayDeque<>(); - private final WatchService service = directoryPath.getFileSystem().newWatchService(); - private final WatchKey watchKey = directoryPath.register( - service, - new WatchEvent.Kind[] { ENTRY_CREATE, ENTRY_MODIFY, ENTRY_DELETE, OVERFLOW }, - // this is com.sun internal, but the service is useless on OSX without it - SensitivityWatchEventModifier.HIGH - ); - - { - setHandler(out, new AbstractOutHandler(){ - - @Override - public void onPull() throws Exception { + return new TimerGraphStageLogic(shape) { + private final Queue> buffer = new ArrayDeque<>(); + private final WatchService service = directoryPath.getFileSystem().newWatchService(); + private final WatchKey watchKey = directoryPath.register( + service, + new WatchEvent.Kind[] { ENTRY_CREATE, ENTRY_MODIFY, ENTRY_DELETE, OVERFLOW }, + // this is com.sun internal, but the service is useless on OSX without it + SensitivityWatchEventModifier.HIGH + ); + + { + setHandler(out, new AbstractOutHandler(){ + + @Override + public void onPull() throws Exception { + if (!buffer.isEmpty()) { + pushHead(); + } else { + doPoll(); if (!buffer.isEmpty()) { pushHead(); } else { - doPoll(); - if (!buffer.isEmpty()) { - pushHead(); - } else { - schedulePoll(); - } + schedulePoll(); } } - }); - } - - @Override - public void onTimer(Object timerKey) { - if (!isClosed(out)) { - doPoll(); - if (!buffer.isEmpty()) { - pushHead(); - } else { - schedulePoll(); - } } - } - - @Override - public void postStop() { - try { - if (watchKey.isValid()) watchKey.cancel(); - service.close(); - } catch (Exception ex) { - // Remove when #21168 is in a release - throw new RuntimeException(ex); + }); + } + + @Override + public void onTimer(Object timerKey) { + if (!isClosed(out)) { + doPoll(); + if (!buffer.isEmpty()) { + pushHead(); + } else { + schedulePoll(); } } - - private void pushHead() { - final Pair head = buffer.poll(); - if (head != null) { - push(out, head); - } + } + + @Override + public void postStop() { + try { + if (watchKey.isValid()) watchKey.cancel(); + service.close(); + } catch (Exception ex) { + // Remove when #21168 is in a release + throw new RuntimeException(ex); } + } - private void schedulePoll() { - scheduleOnce("poll", pollInterval); + private void pushHead() { + final Pair head = buffer.poll(); + if (head != null) { + push(out, head); } + } - private void doPoll() { - try { - for (WatchEvent event: watchKey.pollEvents()) { - final WatchEvent.Kind kind = event.kind(); + private void schedulePoll() { + scheduleOnce("poll", pollInterval); + } - if (OVERFLOW.equals(kind)) { - // overflow means that some file system change events may have been missed, - // that may be ok for some scenarios but to make sure it does not pass unnoticed we fail the stage - failStage(new RuntimeException("Overflow from watch service: '" + directoryPath + "'")); + private void doPoll() { + try { + for (WatchEvent event: watchKey.pollEvents()) { + final WatchEvent.Kind kind = event.kind(); - } else { - // if it's not an overflow it must be a Path event - @SuppressWarnings("unchecked") - final Path path = (Path) event.context(); - final Path absolutePath = directoryPath.resolve(path); - final Change change = kindToChange(kind); - - buffer.add(new Pair<>(absolutePath, change)); - if (buffer.size() > maxBufferSize) { - failStage(new RuntimeException("Max event buffer size " + - maxBufferSize + " reached for $path")); - } - } + if (OVERFLOW.equals(kind)) { + // overflow means that some file system change events may have been missed, + // that may be ok for some scenarios but to make sure it does not pass unnoticed we fail the stage + failStage(new RuntimeException("Overflow from watch service: '" + directoryPath + "'")); + } else { + // if it's not an overflow it must be a Path event + @SuppressWarnings("unchecked") + final Path path = (Path) event.context(); + final Path absolutePath = directoryPath.resolve(path); + final Change change = kindToChange(kind); + + buffer.add(new Pair<>(absolutePath, change)); + if (buffer.size() > maxBufferSize) { + failStage(new RuntimeException("Max event buffer size " + + maxBufferSize + " reached for $path")); + } } - } finally { - if (!watchKey.reset()) { - // directory no longer accessible - completeStage(); - } - } - } - - - // convert from the parametrized API to our much nicer API enum - private Change kindToChange(WatchEvent.Kind kind) { - final Change change; - if (kind.equals(ENTRY_CREATE)) { - change = Change.Creation; - } else if (kind.equals(ENTRY_DELETE)) { - change = Change.Deletion; - } else if (kind.equals(ENTRY_MODIFY)) { - change = Change.Modification; - } else { - throw new RuntimeException("Unexpected kind of event gotten from watch service for path '" + - directoryPath + "': " + kind); } - return change; + } finally { + if (!watchKey.reset()) { + // directory no longer accessible + completeStage(); + } + } + } + + + + // convert from the parametrized API to our much nicer API enum + private Change kindToChange(WatchEvent.Kind kind) { + final Change change; + if (kind.equals(ENTRY_CREATE)) { + change = Change.Creation; + } else if (kind.equals(ENTRY_DELETE)) { + change = Change.Deletion; + } else if (kind.equals(ENTRY_MODIFY)) { + change = Change.Modification; + } else { + throw new RuntimeException("Unexpected kind of event gotten from watch service for path '" + + directoryPath + "': " + kind); } + return change; + } - }; - } catch (Exception ex) { - // Remove when #21168 is in a release - throw new RuntimeException(ex); - } + }; } @Override @@ -227,5 +225,5 @@ public static akka.stream.scaladsl.Source, NotUsed> apply(P .map((Pair pair) -> Tuple2.apply(pair.first(), pair.second())) .asScala(); } - + } diff --git a/contrib/src/main/java/akka/stream/contrib/FileTailSource.java b/contrib/src/main/java/akka/stream/contrib/FileTailSource.java index 13c9bd1..974e89f 100644 --- a/contrib/src/main/java/akka/stream/contrib/FileTailSource.java +++ b/contrib/src/main/java/akka/stream/contrib/FileTailSource.java @@ -15,6 +15,7 @@ import scala.util.Success; import scala.util.Try; +import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousFileChannel; import java.nio.channels.CompletionHandler; @@ -63,75 +64,69 @@ public SourceShape shape() { } @Override - public GraphStageLogic createLogic(Attributes inheritedAttributes) { - try { - if (!Files.exists(path)) throw new IllegalArgumentException("Path '" + path + "' does not exist"); - if (Files.isDirectory(path)) throw new IllegalArgumentException("Path '" + path + "' cannot be tailed, it is a directory"); - if (!Files.isReadable(path)) throw new IllegalArgumentException("No read permission for '" + path + "'"); - - return new TimerGraphStageLogic(shape) { - private final ByteBuffer buffer = ByteBuffer.allocate(maxChunkSize); - private final AsynchronousFileChannel channel = AsynchronousFileChannel.open(path, StandardOpenOption.READ); - - private long position = startingPosition; - private AsyncCallback> chunkCallback; - - { - setHandler(out, new AbstractOutHandler() { - @Override - public void onPull() throws Exception { - doPull(); - } - }); - } - - @Override - public void preStart() { - chunkCallback = createAsyncCallback((tryInteger) -> { - if (tryInteger.isSuccess()) { - int readBytes = tryInteger.get(); - if (readBytes > 0) { - buffer.flip(); - push(out, ByteString.fromByteBuffer(buffer)); - position += readBytes; - buffer.clear(); - } else { - // hit end, try again in a while - scheduleOnce("poll", pollingInterval); - } - + public GraphStageLogic createLogic(Attributes inheritedAttributes) throws IOException { + if (!Files.exists(path)) throw new IllegalArgumentException("Path '" + path + "' does not exist"); + if (Files.isDirectory(path)) throw new IllegalArgumentException("Path '" + path + "' cannot be tailed, it is a directory"); + if (!Files.isReadable(path)) throw new IllegalArgumentException("No read permission for '" + path + "'"); + + return new TimerGraphStageLogic(shape) { + private final ByteBuffer buffer = ByteBuffer.allocate(maxChunkSize); + private final AsynchronousFileChannel channel = AsynchronousFileChannel.open(path, StandardOpenOption.READ); + + private long position = startingPosition; + private AsyncCallback> chunkCallback; + + { + setHandler(out, new AbstractOutHandler() { + @Override + public void onPull() throws Exception { + doPull(); + } + }); + } + + @Override + public void preStart() { + chunkCallback = createAsyncCallback((tryInteger) -> { + if (tryInteger.isSuccess()) { + int readBytes = tryInteger.get(); + if (readBytes > 0) { + buffer.flip(); + push(out, ByteString.fromByteBuffer(buffer)); + position += readBytes; + buffer.clear(); } else { - failStage(tryInteger.failed().get()); + // hit end, try again in a while + scheduleOnce("poll", pollingInterval); } - }); - } + } else { + failStage(tryInteger.failed().get()); + } - @Override - public void onTimer(Object timerKey) { - doPull(); - } + }); + } + @Override + public void onTimer(Object timerKey) { + doPull(); + } - private void doPull() { - channel.read(buffer, position, chunkCallback, completionHandler); - } - @Override - public void postStop() { - try { - if (channel.isOpen()) channel.close(); - } catch(Exception ex) { - // Remove when #21168 is fixed - throw new RuntimeException(ex); - } - } - }; + private void doPull() { + channel.read(buffer, position, chunkCallback, completionHandler); + } - } catch (Exception ex) { - // remove when #21168 is fixed - throw new RuntimeException(ex); - } + @Override + public void postStop() { + try { + if (channel.isOpen()) channel.close(); + } catch(Exception ex) { + // Remove when #21168 is fixed + throw new RuntimeException(ex); + } + } + }; } diff --git a/project/Common.scala b/project/Common.scala index f44a8ad..12b95dd 100644 --- a/project/Common.scala +++ b/project/Common.scala @@ -11,7 +11,7 @@ import com.typesafe.sbt.SbtScalariform.ScalariformKeys object Common extends AutoPlugin { - val AkkaVersion = "2.4.8" + val AkkaVersion = "2.4.9" override def trigger = allRequirements override def requires = plugins.JvmPlugin && HeaderPlugin