Skip to content

Commit

Permalink
Merge pull request #48 from 2m/wip/akka-2.4.9
Browse files Browse the repository at this point in the history
Update to Akka 2.4.9
  • Loading branch information
2m authored Aug 19, 2016
2 parents d0c39ba + d95913e commit c8685c3
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 164 deletions.
202 changes: 100 additions & 102 deletions contrib/src/main/java/akka/stream/contrib/DirectoryChanges.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
import akka.stream.stage.GraphStage;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.TimerGraphStageLogic;

import com.sun.nio.file.SensitivityWatchEventModifier;

import scala.Tuple2;
import scala.concurrent.duration.FiniteDuration;

import java.io.IOException;
import java.nio.file.*;
import java.util.ArrayDeque;
import java.util.Queue;
Expand Down Expand Up @@ -67,131 +70,126 @@ public Attributes initialAttributes() {
}

@Override
public GraphStageLogic createLogic(Attributes inheritedAttributes) {
public GraphStageLogic createLogic(Attributes inheritedAttributes) throws IOException {
if (!Files.exists(directoryPath)) throw new IllegalArgumentException("The path: '" + directoryPath + "' does not exist");
if (!Files.isDirectory(directoryPath)) throw new IllegalArgumentException("The path '" + directoryPath + "' is not a directory");

try {
return new TimerGraphStageLogic(shape) {
private final Queue<Pair<Path, Change>> buffer = new ArrayDeque<>();
private final WatchService service = directoryPath.getFileSystem().newWatchService();
private final WatchKey watchKey = directoryPath.register(
service,
new WatchEvent.Kind<?>[] { ENTRY_CREATE, ENTRY_MODIFY, ENTRY_DELETE, OVERFLOW },
// this is com.sun internal, but the service is useless on OSX without it
SensitivityWatchEventModifier.HIGH
);

{
setHandler(out, new AbstractOutHandler(){

@Override
public void onPull() throws Exception {
return new TimerGraphStageLogic(shape) {
private final Queue<Pair<Path, Change>> buffer = new ArrayDeque<>();
private final WatchService service = directoryPath.getFileSystem().newWatchService();
private final WatchKey watchKey = directoryPath.register(
service,
new WatchEvent.Kind<?>[] { ENTRY_CREATE, ENTRY_MODIFY, ENTRY_DELETE, OVERFLOW },
// this is com.sun internal, but the service is useless on OSX without it
SensitivityWatchEventModifier.HIGH
);

{
setHandler(out, new AbstractOutHandler(){

@Override
public void onPull() throws Exception {
if (!buffer.isEmpty()) {
pushHead();
} else {
doPoll();
if (!buffer.isEmpty()) {
pushHead();
} else {
doPoll();
if (!buffer.isEmpty()) {
pushHead();
} else {
schedulePoll();
}
schedulePoll();
}
}
});
}

@Override
public void onTimer(Object timerKey) {
if (!isClosed(out)) {
doPoll();
if (!buffer.isEmpty()) {
pushHead();
} else {
schedulePoll();
}
}
}

@Override
public void postStop() {
try {
if (watchKey.isValid()) watchKey.cancel();
service.close();
} catch (Exception ex) {
// Remove when #21168 is in a release
throw new RuntimeException(ex);
});
}

@Override
public void onTimer(Object timerKey) {
if (!isClosed(out)) {
doPoll();
if (!buffer.isEmpty()) {
pushHead();
} else {
schedulePoll();
}
}

private void pushHead() {
final Pair<Path, Change> head = buffer.poll();
if (head != null) {
push(out, head);
}
}

@Override
public void postStop() {
try {
if (watchKey.isValid()) watchKey.cancel();
service.close();
} catch (Exception ex) {
// Remove when #21168 is in a release
throw new RuntimeException(ex);
}
}

private void schedulePoll() {
scheduleOnce("poll", pollInterval);
private void pushHead() {
final Pair<Path, Change> head = buffer.poll();
if (head != null) {
push(out, head);
}
}

private void doPoll() {
try {
for (WatchEvent<?> event: watchKey.pollEvents()) {
final WatchEvent.Kind<?> kind = event.kind();
private void schedulePoll() {
scheduleOnce("poll", pollInterval);
}

if (OVERFLOW.equals(kind)) {
// overflow means that some file system change events may have been missed,
// that may be ok for some scenarios but to make sure it does not pass unnoticed we fail the stage
failStage(new RuntimeException("Overflow from watch service: '" + directoryPath + "'"));
private void doPoll() {
try {
for (WatchEvent<?> event: watchKey.pollEvents()) {
final WatchEvent.Kind<?> kind = event.kind();

} else {
// if it's not an overflow it must be a Path event
@SuppressWarnings("unchecked")
final Path path = (Path) event.context();
final Path absolutePath = directoryPath.resolve(path);
final Change change = kindToChange(kind);

buffer.add(new Pair<>(absolutePath, change));
if (buffer.size() > maxBufferSize) {
failStage(new RuntimeException("Max event buffer size " +
maxBufferSize + " reached for $path"));
}
}
if (OVERFLOW.equals(kind)) {
// overflow means that some file system change events may have been missed,
// that may be ok for some scenarios but to make sure it does not pass unnoticed we fail the stage
failStage(new RuntimeException("Overflow from watch service: '" + directoryPath + "'"));

} else {
// if it's not an overflow it must be a Path event
@SuppressWarnings("unchecked")
final Path path = (Path) event.context();
final Path absolutePath = directoryPath.resolve(path);
final Change change = kindToChange(kind);

buffer.add(new Pair<>(absolutePath, change));
if (buffer.size() > maxBufferSize) {
failStage(new RuntimeException("Max event buffer size " +
maxBufferSize + " reached for $path"));
}
}
} finally {
if (!watchKey.reset()) {
// directory no longer accessible
completeStage();
}
}
}



// convert from the parametrized API to our much nicer API enum
private Change kindToChange(WatchEvent.Kind<?> kind) {
final Change change;
if (kind.equals(ENTRY_CREATE)) {
change = Change.Creation;
} else if (kind.equals(ENTRY_DELETE)) {
change = Change.Deletion;
} else if (kind.equals(ENTRY_MODIFY)) {
change = Change.Modification;
} else {
throw new RuntimeException("Unexpected kind of event gotten from watch service for path '" +
directoryPath + "': " + kind);
}
return change;
} finally {
if (!watchKey.reset()) {
// directory no longer accessible
completeStage();
}
}
}



// convert from the parametrized API to our much nicer API enum
private Change kindToChange(WatchEvent.Kind<?> kind) {
final Change change;
if (kind.equals(ENTRY_CREATE)) {
change = Change.Creation;
} else if (kind.equals(ENTRY_DELETE)) {
change = Change.Deletion;
} else if (kind.equals(ENTRY_MODIFY)) {
change = Change.Modification;
} else {
throw new RuntimeException("Unexpected kind of event gotten from watch service for path '" +
directoryPath + "': " + kind);
}
return change;
}


};
} catch (Exception ex) {
// Remove when #21168 is in a release
throw new RuntimeException(ex);
}
};
}

@Override
Expand Down Expand Up @@ -227,5 +225,5 @@ public static akka.stream.scaladsl.Source<Tuple2<Path, Change>, NotUsed> apply(P
.map((Pair<Path, Change> pair) -> Tuple2.apply(pair.first(), pair.second()))
.asScala();
}

}
117 changes: 56 additions & 61 deletions contrib/src/main/java/akka/stream/contrib/FileTailSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import scala.util.Success;
import scala.util.Try;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
Expand Down Expand Up @@ -63,75 +64,69 @@ public SourceShape<ByteString> shape() {
}

@Override
public GraphStageLogic createLogic(Attributes inheritedAttributes) {
try {
if (!Files.exists(path)) throw new IllegalArgumentException("Path '" + path + "' does not exist");
if (Files.isDirectory(path)) throw new IllegalArgumentException("Path '" + path + "' cannot be tailed, it is a directory");
if (!Files.isReadable(path)) throw new IllegalArgumentException("No read permission for '" + path + "'");

return new TimerGraphStageLogic(shape) {
private final ByteBuffer buffer = ByteBuffer.allocate(maxChunkSize);
private final AsynchronousFileChannel channel = AsynchronousFileChannel.open(path, StandardOpenOption.READ);

private long position = startingPosition;
private AsyncCallback<Try<Integer>> chunkCallback;

{
setHandler(out, new AbstractOutHandler() {
@Override
public void onPull() throws Exception {
doPull();
}
});
}

@Override
public void preStart() {
chunkCallback = createAsyncCallback((tryInteger) -> {
if (tryInteger.isSuccess()) {
int readBytes = tryInteger.get();
if (readBytes > 0) {
buffer.flip();
push(out, ByteString.fromByteBuffer(buffer));
position += readBytes;
buffer.clear();
} else {
// hit end, try again in a while
scheduleOnce("poll", pollingInterval);
}

public GraphStageLogic createLogic(Attributes inheritedAttributes) throws IOException {
if (!Files.exists(path)) throw new IllegalArgumentException("Path '" + path + "' does not exist");
if (Files.isDirectory(path)) throw new IllegalArgumentException("Path '" + path + "' cannot be tailed, it is a directory");
if (!Files.isReadable(path)) throw new IllegalArgumentException("No read permission for '" + path + "'");

return new TimerGraphStageLogic(shape) {
private final ByteBuffer buffer = ByteBuffer.allocate(maxChunkSize);
private final AsynchronousFileChannel channel = AsynchronousFileChannel.open(path, StandardOpenOption.READ);

private long position = startingPosition;
private AsyncCallback<Try<Integer>> chunkCallback;

{
setHandler(out, new AbstractOutHandler() {
@Override
public void onPull() throws Exception {
doPull();
}
});
}

@Override
public void preStart() {
chunkCallback = createAsyncCallback((tryInteger) -> {
if (tryInteger.isSuccess()) {
int readBytes = tryInteger.get();
if (readBytes > 0) {
buffer.flip();
push(out, ByteString.fromByteBuffer(buffer));
position += readBytes;
buffer.clear();
} else {
failStage(tryInteger.failed().get());
// hit end, try again in a while
scheduleOnce("poll", pollingInterval);
}

});
}
} else {
failStage(tryInteger.failed().get());
}

@Override
public void onTimer(Object timerKey) {
doPull();
}
});
}

@Override
public void onTimer(Object timerKey) {
doPull();
}

private void doPull() {
channel.read(buffer, position, chunkCallback, completionHandler);
}

@Override
public void postStop() {
try {
if (channel.isOpen()) channel.close();
} catch(Exception ex) {
// Remove when #21168 is fixed
throw new RuntimeException(ex);
}
}
};
private void doPull() {
channel.read(buffer, position, chunkCallback, completionHandler);
}

} catch (Exception ex) {
// remove when #21168 is fixed
throw new RuntimeException(ex);
}
@Override
public void postStop() {
try {
if (channel.isOpen()) channel.close();
} catch(Exception ex) {
// Remove when #21168 is fixed
throw new RuntimeException(ex);
}
}
};
}


Expand Down
2 changes: 1 addition & 1 deletion project/Common.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import com.typesafe.sbt.SbtScalariform.ScalariformKeys

object Common extends AutoPlugin {

val AkkaVersion = "2.4.8"
val AkkaVersion = "2.4.9"

override def trigger = allRequirements
override def requires = plugins.JvmPlugin && HeaderPlugin
Expand Down

0 comments on commit c8685c3

Please sign in to comment.