Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add latch mechanism to pause/resume in inmemory source #1605

Merged
merged 3 commits into from
Jan 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
import io.siddhi.core.util.transport.OptionHolder;
import org.apache.log4j.Logger;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
* Implementation of {@link Source} to receive events through in-memory transport.
*/
Expand Down Expand Up @@ -62,6 +65,9 @@ public class InMemorySource extends Source {
private static final String TOPIC_KEY = "topic";
private SourceEventListener sourceEventListener;
private InMemoryBroker.Subscriber subscriber;
private ReentrantLock pauseLock = new ReentrantLock();
private Condition unpaused = pauseLock.newCondition();
private volatile boolean paused = false;

@Override
protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
Expand All @@ -77,6 +83,18 @@ public StateFactory<State> init(SourceEventListener sourceEventListener, OptionH
this.subscriber = new InMemoryBroker.Subscriber() {
@Override
public void onMessage(Object event) {
if (paused) {
pauseLock.lock();
try {
while (paused) {
unpaused.await();
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
} finally {
pauseLock.unlock();
}
}
sourceEventListener.onEvent(event, null);
}

Expand Down Expand Up @@ -110,12 +128,17 @@ public void destroy() {

@Override
public void pause() {
InMemoryBroker.unsubscribe(subscriber);
paused = true;
}

@Override
public void resume() {
InMemoryBroker.subscribe(subscriber);
paused = false;
try {
pauseLock.lock();
unpaused.signalAll();
} finally {
pauseLock.unlock();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ public void receive(Event[] events) {
count++;
if (count > 1) {
float triggerTimeDiff = timestamp / 1000 - lastTimeStamp / 1000;
AssertJUnit.assertTrue(1.0f == triggerTimeDiff);
AssertJUnit.assertTrue(1 == Math.round(triggerTimeDiff));
}
lastTimeStamp = timestamp;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.time.LocalDate;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class InMemoryTransportTestCase {
Expand Down Expand Up @@ -1564,4 +1565,60 @@ public void receive(Event[] events) {

}

@Test(dependsOnMethods = {"inMemoryTestCase21"})
public void inMemoryTestCase22() throws InterruptedException {
log.info("Test inMemoryTestCase22");
SiddhiManager siddhiManager = new SiddhiManager();

String publisherApp = "" +
"define stream CheckStockStream (symbol1 string, totalPrice double); " +
"@sink(type='inMemory', topic='OutputStream', @map(type='passThrough')) " +
"define stream OutputStream (symbol1 string, totalPrice double); " +
"" +
"from CheckStockStream " +
"select * " +
"insert into OutputStream; ";

String consumerApp = "" +
"@source(type='inMemory', topic='OutputStream', @map(type='passThrough')) " +
"define stream InputStream (symbol1 string, totalPrice double); ";

SiddhiAppRuntime publisherRuntime = siddhiManager.createSiddhiAppRuntime(publisherApp);
SiddhiAppRuntime consumerRuntime = siddhiManager.createSiddhiAppRuntime(consumerApp);

consumerRuntime.addCallback("InputStream", new StreamCallback() {
@Override
public void receive(Event[] events) {
EventPrinter.print(events);
wso2Count.incrementAndGet();
}
});
InputHandler stockStream = publisherRuntime.getInputHandler("CheckStockStream");

publisherRuntime.start();
consumerRuntime.start();

stockStream.send(new Object[]{"WSO2", 50.0f});
stockStream.send(new Object[]{"WSO2", 70.0f});
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
consumerRuntime.getSources().iterator().next().get(0).pause();
}
});
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
try {
stockStream.send(new Object[]{"WSO2", 90f});
} catch (InterruptedException ignored) {
}
}
});
Thread.sleep(2000);
consumerRuntime.getSources().iterator().next().get(0).resume();
Thread.sleep(2000);
Assert.assertEquals(wso2Count.get(), 3);
siddhiManager.shutdown();
}
}