diff --git a/src/main/java/com/jforex/programming/misc/HistoryUtil.java b/src/main/java/com/jforex/programming/misc/HistoryUtil.java index 3794b88a..05b8c19d 100644 --- a/src/main/java/com/jforex/programming/misc/HistoryUtil.java +++ b/src/main/java/com/jforex/programming/misc/HistoryUtil.java @@ -1,9 +1,6 @@ package com.jforex.programming.misc; -import java.util.AbstractMap.SimpleEntry; -import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -24,7 +21,6 @@ public class HistoryUtil { private final IHistory history; - private final int maxBarTickRetries = 10; private static final Logger logger = LogManager.getLogger(HistoryUtil.class); @@ -32,24 +28,21 @@ public HistoryUtil(final IHistory history) { this.history = history; } - public Map tickQuotes(final Set instruments) { - return instruments - .stream() - .map(instrument -> { - final TickQuote tickQuote = new TickQuote(instrument, tickQuote(instrument)); - return new SimpleEntry<>(instrument, tickQuote); - }) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + public Observable tickQuotesObservable(final Set instruments) { + return Observable + .from(instruments) + .flatMap(this::lastestTickObservable) + .zipWith(instruments, + (tick, instrument) -> new TickQuote(instrument, tick)); } - public ITick tickQuote(final Instrument instrument) { + public Observable lastestTickObservable(final Instrument instrument) { return Observable .fromCallable(() -> latestHistoryTick(instrument)) .doOnError(e -> logger.error(e.getMessage() + "! Will retry latest tick from history now...")) - .retry(maxBarTickRetries) - .toBlocking() - .single(); + .retryWhen(StreamUtil::retryOnHistoryFailObservable) + .take(1); } private ITick latestHistoryTick(final Instrument instrument) throws JFException { @@ -60,7 +53,7 @@ private ITick latestHistoryTick(final Instrument instrument) throws JFException return tick; } - public IBar barQuote(final BarParams barParams) { + public Observable latestBarObservable(final BarParams barParams) { final Instrument instrument = barParams.instrument(); final Period period = barParams.period(); final OfferSide offerSide = barParams.offerSide(); @@ -69,9 +62,8 @@ public IBar barQuote(final BarParams barParams) { .fromCallable(() -> latestHistoryBar(instrument, period, offerSide)) .doOnError(e -> logger.error(e.getMessage() + "! Will retry latest bar from history now...")) - .retry(maxBarTickRetries) - .toBlocking() - .single(); + .retryWhen(StreamUtil::retryOnHistoryFailObservable) + .take(1); } private IBar latestHistoryBar(final Instrument instrument, diff --git a/src/main/java/com/jforex/programming/misc/StreamUtil.java b/src/main/java/com/jforex/programming/misc/StreamUtil.java index f7c38ea8..c77133f4 100644 --- a/src/main/java/com/jforex/programming/misc/StreamUtil.java +++ b/src/main/java/com/jforex/programming/misc/StreamUtil.java @@ -27,9 +27,9 @@ private StreamUtil() { private static final int maxRetriesOnOrderFail = platformSettings.maxRetriesOnOrderFail(); private static final Logger logger = LogManager.getLogger(StreamUtil.class); - public static final Observable retryObservable(final Observable errors) { + public static final Observable retryOnRejectObservable(final Observable errors) { return checkNotNull(errors) - .flatMap(StreamUtil::filterErrorType) + .flatMap(StreamUtil::filterCallErrorType) .zipWith(retryCounterObservable(maxRetriesOnOrderFail), Pair::of) .flatMap(retryPair -> evaluateRetryPair(retryPair, delayOnOrderFailRetry, @@ -37,7 +37,7 @@ public static final Observable retryObservable(final Observable filterErrorType(final Throwable error) { + private static final Observable filterCallErrorType(final Throwable error) { if (error instanceof OrderCallRejectException) { logPositionTaskRetry((OrderCallRejectException) error); return Observable.just(error); @@ -59,8 +59,8 @@ public static final Observable retryCounterObservable(final int maxRetr return Observable.range(1, maxRetries + 1); } - public static Observable waitObservable(final long delay, - final TimeUnit timeUnit) { + public static final Observable waitObservable(final long delay, + final TimeUnit timeUnit) { return Observable .interval(delay, timeUnit) .take(1); @@ -72,7 +72,7 @@ private static final void logPositionTaskRetry(final OrderCallRejectException re + " Will retry task in " + delayOnOrderFailRetry + " milliseconds..."); } - public static Stream optionalStream(final Optional optional) { + public static final Stream optionalStream(final Optional optional) { return checkNotNull(optional).isPresent() ? Stream.of(optional.get()) : Stream.empty(); @@ -86,4 +86,13 @@ public static final Completable CompletableFromJFRunnable(final JFRunnable jfRun return null; }); } + + public static final Observable retryOnHistoryFailObservable(final Observable errors) { + return checkNotNull(errors) + .zipWith(retryCounterObservable(5), Pair::of) + .flatMap(retryPair -> evaluateRetryPair(retryPair, + 500L, + TimeUnit.MILLISECONDS, + 5)); + } } diff --git a/src/main/java/com/jforex/programming/quote/BarQuoteRepository.java b/src/main/java/com/jforex/programming/quote/BarQuoteRepository.java index f43d5aae..973fa9dc 100644 --- a/src/main/java/com/jforex/programming/quote/BarQuoteRepository.java +++ b/src/main/java/com/jforex/programming/quote/BarQuoteRepository.java @@ -41,9 +41,11 @@ public BarQuote get(final BarParams barParams) { } private BarQuote quoteFromHistory(final BarParams barParams) { - final IBar historyBar = historyUtil.barQuote(barParams); + final IBar historyBar = historyUtil + .latestBarObservable(barParams) + .toBlocking() + .first(); final BarQuote barQuote = new BarQuote(historyBar, barParams); - onBarQuote(barQuote); return barQuote; diff --git a/src/main/java/com/jforex/programming/quote/TickQuoteRepository.java b/src/main/java/com/jforex/programming/quote/TickQuoteRepository.java index 872a13a0..4f56f36e 100644 --- a/src/main/java/com/jforex/programming/quote/TickQuoteRepository.java +++ b/src/main/java/com/jforex/programming/quote/TickQuoteRepository.java @@ -2,21 +2,23 @@ import java.util.Map; import java.util.Set; - -import com.jforex.programming.misc.HistoryUtil; +import java.util.concurrent.ConcurrentHashMap; import com.dukascopy.api.Instrument; +import com.jforex.programming.misc.HistoryUtil; import rx.Observable; public class TickQuoteRepository { - private final Map quotesByInstrument; + private final Map quotesByInstrument = new ConcurrentHashMap<>(); public TickQuoteRepository(final Observable tickQuoteObservable, final HistoryUtil historyUtil, final Set subscribedInstruments) { - quotesByInstrument = historyUtil.tickQuotes(subscribedInstruments); + historyUtil + .tickQuotesObservable(subscribedInstruments) + .subscribe(quote -> quotesByInstrument.put(quote.instrument(), quote)); tickQuoteObservable.subscribe(this::onTickQuote); } diff --git a/src/test/java/com/jforex/programming/misc/test/HistoryUtilTest.java b/src/test/java/com/jforex/programming/misc/test/HistoryUtilTest.java index 5dac5539..3506205d 100644 --- a/src/test/java/com/jforex/programming/misc/test/HistoryUtilTest.java +++ b/src/test/java/com/jforex/programming/misc/test/HistoryUtilTest.java @@ -3,8 +3,10 @@ import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertThat; +import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.junit.Before; import org.junit.Test; @@ -16,26 +18,48 @@ import com.dukascopy.api.OfferSide; import com.google.common.collect.Sets; import com.jforex.programming.misc.HistoryUtil; -import com.jforex.programming.quote.QuoteProviderException; import com.jforex.programming.quote.TickQuote; import com.jforex.programming.test.common.QuoteProviderForTest; +import com.jforex.programming.test.common.RxTestUtil; + +import rx.observers.TestSubscriber; public class HistoryUtilTest extends QuoteProviderForTest { private HistoryUtil historyUtil; + private final TestSubscriber tickSubscriber = new TestSubscriber<>(); + private final TestSubscriber barSubscriber = new TestSubscriber<>(); + @Before public void setUp() { historyUtil = new HistoryUtil(historyMock); } + private void assertTickSubscriber() { + tickSubscriber.assertNoErrors(); + tickSubscriber.assertCompleted(); + tickSubscriber.assertValueCount(1); + assertThat(tickSubscriber.getOnNextEvents().get(0), equalTo(tickEURUSD)); + } + + private void assertBarSubscriber() { + barSubscriber.assertNoErrors(); + barSubscriber.assertCompleted(); + barSubscriber.assertValueCount(1); + assertThat(barSubscriber.getOnNextEvents().get(0), equalTo(askBarEURUSD)); + } + @Test public void latestTickIsCorrect() throws JFException { - when(historyMock.getLastTick(instrumentEURUSD)).thenReturn(tickEURUSD); + when(historyMock.getLastTick(instrumentEURUSD)) + .thenReturn(tickEURUSD); - final ITick latestTick = historyUtil.tickQuote(instrumentEURUSD); + historyUtil + .lastestTickObservable(instrumentEURUSD) + .subscribe(tickSubscriber); - assertThat(latestTick, equalTo(tickEURUSD)); + assertTickSubscriber(); } @Test @@ -43,34 +67,35 @@ public void latestTickWithRetriesIsCorrect() throws JFException { when(historyMock.getLastTick(instrumentEURUSD)) .thenThrow(jfException) .thenThrow(jfException) - .thenThrow(jfException) + .thenReturn(null) .thenReturn(tickEURUSD); - final ITick latestTick = historyUtil.tickQuote(instrumentEURUSD); + historyUtil + .lastestTickObservable(instrumentEURUSD) + .subscribe(tickSubscriber); - assertThat(latestTick, equalTo(tickEURUSD)); - verify(historyMock, times(4)).getLastTick(instrumentEURUSD); - } + RxTestUtil.advanceTimeBy(5000L, TimeUnit.MILLISECONDS); - @Test(expected = QuoteProviderException.class) - public void latestTickThrowsWhenHistoryReturnsNullTick() throws JFException { - when(historyMock.getLastTick(instrumentEURUSD)) - .thenReturn(null); - - historyUtil.tickQuote(instrumentEURUSD); + verify(historyMock, times(4)).getLastTick(instrumentEURUSD); + assertTickSubscriber(); } @Test public void tickQuotesMapIsCorrect() throws JFException { final Set instruments = Sets.newHashSet(instrumentEURUSD, instrumentAUDUSD); - when(historyMock.getLastTick(instrumentEURUSD)).thenReturn(tickEURUSD); - when(historyMock.getLastTick(instrumentAUDUSD)).thenReturn(tickAUDUSD); + when(historyMock.getLastTick(instrumentEURUSD)) + .thenReturn(tickEURUSD); + when(historyMock.getLastTick(instrumentAUDUSD)) + .thenReturn(tickAUDUSD); + final Map quotesByInstrument = new HashMap<>(); - final Map tickQuotes = historyUtil.tickQuotes(instruments); + historyUtil + .tickQuotesObservable(instruments) + .subscribe(quote -> quotesByInstrument.put(quote.instrument(), quote)); - assertThat(tickQuotes.size(), equalTo(2)); - assertEqualTickQuotes(tickQuotes.get(instrumentEURUSD), tickQuoteEURUSD); - assertEqualTickQuotes(tickQuotes.get(instrumentAUDUSD), tickQuoteAUDUSD); + assertThat(quotesByInstrument.size(), equalTo(2)); + assertEqualTickQuotes(quotesByInstrument.get(instrumentEURUSD), tickQuoteEURUSD); + assertEqualTickQuotes(quotesByInstrument.get(instrumentAUDUSD), tickQuoteAUDUSD); } @Test @@ -78,9 +103,11 @@ public void latestBarIsCorrect() throws JFException { when(historyMock.getBar(instrumentEURUSD, barQuotePeriod, OfferSide.ASK, 1)) .thenReturn(askBarEURUSD); - final IBar latestBar = historyUtil.barQuote(askBarEURUSDParams); + historyUtil + .latestBarObservable(askBarEURUSDParams) + .subscribe(barSubscriber); - assertThat(latestBar, equalTo(askBarEURUSD)); + assertBarSubscriber(); } @Test @@ -88,23 +115,20 @@ public void latestBarWithRetriesIsCorrect() throws JFException { when(historyMock.getBar(instrumentEURUSD, barQuotePeriod, OfferSide.ASK, 1)) .thenThrow(jfException) .thenThrow(jfException) - .thenThrow(jfException) + .thenReturn(null) .thenReturn(askBarEURUSD); - final IBar latestBar = historyUtil.barQuote(askBarEURUSDParams); + historyUtil + .latestBarObservable(askBarEURUSDParams) + .subscribe(barSubscriber); + + RxTestUtil.advanceTimeBy(5000L, TimeUnit.MILLISECONDS); - assertThat(latestBar, equalTo(askBarEURUSD)); verify(historyMock, times(4)).getBar(instrumentEURUSD, barQuotePeriod, OfferSide.ASK, 1); - } - - @Test(expected = QuoteProviderException.class) - public void latestBarThrowsWhenHistoryReturnsNullBar() throws JFException { - when(historyMock.getBar(instrumentEURUSD, barQuotePeriod, OfferSide.ASK, 1)) - .thenReturn(null); - historyUtil.barQuote(askBarEURUSDParams); + assertBarSubscriber(); } } diff --git a/src/test/java/com/jforex/programming/misc/test/StreamUtilTest.java b/src/test/java/com/jforex/programming/misc/test/StreamUtilTest.java index f382d375..180e8e0d 100644 --- a/src/test/java/com/jforex/programming/misc/test/StreamUtilTest.java +++ b/src/test/java/com/jforex/programming/misc/test/StreamUtilTest.java @@ -58,7 +58,7 @@ public class RetryObservableSetup { public void setUp() { retryCall = () -> Observable .fromCallable(callableMock) - .retryWhen(StreamUtil::retryObservable) + .retryWhen(StreamUtil::retryOnRejectObservable) .subscribe(orderSubscriber); } diff --git a/src/test/java/com/jforex/programming/quote/test/BarQuoteRepositoryTest.java b/src/test/java/com/jforex/programming/quote/test/BarQuoteRepositoryTest.java index 8ad66af4..cab717cc 100644 --- a/src/test/java/com/jforex/programming/quote/test/BarQuoteRepositoryTest.java +++ b/src/test/java/com/jforex/programming/quote/test/BarQuoteRepositoryTest.java @@ -9,6 +9,7 @@ import com.jforex.programming.test.common.QuoteProviderForTest; import de.bechte.junit.runners.context.HierarchicalContextRunner; +import rx.Observable; import rx.subjects.PublishSubject; import rx.subjects.Subject; @@ -28,24 +29,24 @@ public class BeforeBarsReceived { @Test public void askQuoteForEURUSDComesFromHistory() { - when(historyUtilMock.barQuote(askBarEURUSDParams)) - .thenReturn(askBarEURUSD); + when(historyUtilMock.latestBarObservable(askBarEURUSDParams)) + .thenReturn(Observable.just(askBarEURUSD)); final BarQuote receivedQuoteEURUSD = barQuoteRepository.get(askBarEURUSDParams); assertEqualBarQuotes(receivedQuoteEURUSD, askBarQuoteEURUSD); - verify(historyUtilMock).barQuote(askBarEURUSDParams); + verify(historyUtilMock).latestBarObservable(askBarEURUSDParams); } @Test public void bidQuoteForAUDUSDComesFromHistory() { - when(historyUtilMock.barQuote(bidBarAUDUSDParams)) - .thenReturn(bidBarAUDUSD); + when(historyUtilMock.latestBarObservable(bidBarAUDUSDParams)) + .thenReturn(Observable.just(bidBarAUDUSD)); final BarQuote receivedQuoteAUDUSD = barQuoteRepository.get(bidBarAUDUSDParams); assertEqualBarQuotes(receivedQuoteAUDUSD, bidBarQuoteAUDUSD); - verify(historyUtilMock).barQuote(bidBarAUDUSDParams); + verify(historyUtilMock).latestBarObservable(bidBarAUDUSDParams); } public class AfterReceivedBars { diff --git a/src/test/java/com/jforex/programming/quote/test/TickQuoteRepositoryTest.java b/src/test/java/com/jforex/programming/quote/test/TickQuoteRepositoryTest.java index 4f0c2b71..3e73d5c0 100644 --- a/src/test/java/com/jforex/programming/quote/test/TickQuoteRepositoryTest.java +++ b/src/test/java/com/jforex/programming/quote/test/TickQuoteRepositoryTest.java @@ -3,8 +3,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertThat; -import java.util.HashMap; -import java.util.Map; import java.util.Set; import org.junit.Before; @@ -18,6 +16,7 @@ import com.jforex.programming.test.common.QuoteProviderForTest; import de.bechte.junit.runners.context.HierarchicalContextRunner; +import rx.Observable; import rx.subjects.PublishSubject; import rx.subjects.Subject; @@ -26,7 +25,6 @@ public class TickQuoteRepositoryTest extends QuoteProviderForTest { private TickQuoteRepository tickQuoteRepository; - private final Map historyQuotes = new HashMap<>(); private final Subject quoteObservable = PublishSubject.create(); private final Set subscribedInstruments = Sets.newHashSet(instrumentEURUSD, instrumentAUDUSD); @@ -34,8 +32,6 @@ public class TickQuoteRepositoryTest extends QuoteProviderForTest { @Before public void setUp() { setUpMocks(); - historyQuotes.put(instrumentEURUSD, tickQuoteEURUSD); - historyQuotes.put(instrumentAUDUSD, tickQuoteAUDUSD); tickQuoteRepository = new TickQuoteRepository(quoteObservable, historyUtilMock, @@ -43,15 +39,15 @@ public void setUp() { } private void setUpMocks() { - when(historyUtilMock.tickQuotes(subscribedInstruments)) - .thenReturn(historyQuotes); + when(historyUtilMock.tickQuotesObservable(subscribedInstruments)) + .thenReturn(Observable.just(tickQuoteEURUSD, tickQuoteAUDUSD)); } public class BeforeTicksReceived { @Test public void quotesForSubscribedInstrumentsComeFromHistory() { - verify(historyUtilMock).tickQuotes(subscribedInstruments); + verify(historyUtilMock).tickQuotesObservable(subscribedInstruments); } @Test