Skip to content

Commit

Permalink
Merge pull request #1634 from metamx/IrcFirehoseFixes
Browse files Browse the repository at this point in the history
Allow IrcFirehoseFactory to shutdown cleanly
  • Loading branch information
fjy committed Aug 19, 2015
2 parents 3b2e41e + cd2c377 commit 6a03892
Showing 1 changed file with 40 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@
import java.util.List;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
* <p><b>Example code:</b></p>
* <pre>{@code
*
* <p/>
* IrcFirehoseFactory factory = new IrcFirehoseFactory(
* "wiki123",
* "irc.wikimedia.org",
Expand All @@ -63,6 +64,7 @@ public class IrcFirehoseFactory implements FirehoseFactory<IrcInputRowParser>
private final String nick;
private final String host;
private final List<String> channels;
private volatile boolean closed = false;

@JsonCreator
public IrcFirehoseFactory(
Expand Down Expand Up @@ -100,17 +102,21 @@ public Firehose connect(final IrcInputRowParser firehoseParser) throws IOExcepti
final IRCApi irc = new IRCApiImpl(false);
final LinkedBlockingQueue<Pair<DateTime, ChannelPrivMsg>> queue = new LinkedBlockingQueue<Pair<DateTime, ChannelPrivMsg>>();

irc.addListener(new VariousMessageListenerAdapter() {
@Override
public void onChannelMessage(ChannelPrivMsg aMsg)
{
try {
queue.put(Pair.of(DateTime.now(), aMsg));
} catch(InterruptedException e) {
throw new RuntimeException("interrupted adding message to queue", e);
irc.addListener(
new VariousMessageListenerAdapter()
{
@Override
public void onChannelMessage(ChannelPrivMsg aMsg)
{
try {
queue.put(Pair.of(DateTime.now(), aMsg));
}
catch (InterruptedException e) {
throw new RuntimeException("interrupted adding message to queue", e);
}
}
}
}
});
);

log.info("connecting to irc server [%s]", host);
irc.connect(
Expand Down Expand Up @@ -152,7 +158,7 @@ public IRCServer getServer()
public void onSuccess(IIRCState aObject)
{
log.info("irc connection to server [%s] established", host);
for(String chan : channels) {
for (String chan : channels) {
log.info("Joining channel %s", chan);
irc.joinChannel(chan);
}
Expand All @@ -164,8 +170,10 @@ public void onFailure(Exception e)
log.error(e, "Unable to connect to irc server [%s]", host);
throw new RuntimeException("Unable to connect to server", e);
}
});
}
);

closed = false;

return new Firehose()
{
Expand All @@ -175,18 +183,26 @@ public void onFailure(Exception e)
public boolean hasMore()
{
try {
while(true) {
Pair<DateTime, ChannelPrivMsg> nextMsg = queue.take();
while (true) {
Pair<DateTime, ChannelPrivMsg> nextMsg = queue.poll(100, TimeUnit.MILLISECONDS);
if (closed) {
return false;
}
if (nextMsg == null) {
continue;
}
try {
nextRow = firehoseParser.parse(nextMsg);
if(nextRow != null) return true;
if (nextRow != null) {
return true;
}
}
catch (IllegalArgumentException iae) {
log.debug("ignoring invalid message in channel [%s]", nextMsg.rhs.getChannelName());
}
}
}
catch(InterruptedException e) {
catch (InterruptedException e) {
Thread.interrupted();
throw new RuntimeException("interrupted retrieving elements from queue", e);
}
Expand Down Expand Up @@ -214,8 +230,13 @@ public void run()
@Override
public void close() throws IOException
{
log.info("disconnecting from irc server [%s]", host);
irc.disconnect("");
try {
log.info("disconnecting from irc server [%s]", host);
irc.disconnect("");
}
finally {
closed = true;
}
}
};
}
Expand Down

0 comments on commit 6a03892

Please sign in to comment.