Skip to content

Commit

Permalink
Getting latest bars and ticks from history is now done with Observable
Browse files Browse the repository at this point in the history
as return types. This makes retries easier and more natural. The user
can decide on its own how to deal with incoming instances.
  • Loading branch information
reiss authored and reiss committed Aug 5, 2016
1 parent 98e7c4f commit 75e6bd8
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 80 deletions.
32 changes: 12 additions & 20 deletions src/main/java/com/jforex/programming/misc/HistoryUtil.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package com.jforex.programming.misc;

import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -24,32 +21,28 @@
public class HistoryUtil {

private final IHistory history;
private final int maxBarTickRetries = 10;

private static final Logger logger = LogManager.getLogger(HistoryUtil.class);

public HistoryUtil(final IHistory history) {
this.history = history;
}

public Map<Instrument, TickQuote> tickQuotes(final Set<Instrument> instruments) {
return instruments
.stream()
.map(instrument -> {
final TickQuote tickQuote = new TickQuote(instrument, tickQuote(instrument));
return new SimpleEntry<>(instrument, tickQuote);
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
public Observable<TickQuote> tickQuotesObservable(final Set<Instrument> instruments) {
return Observable
.from(instruments)
.flatMap(this::lastestTickObservable)
.zipWith(instruments,
(tick, instrument) -> new TickQuote(instrument, tick));
}

public ITick tickQuote(final Instrument instrument) {
public Observable<ITick> lastestTickObservable(final Instrument instrument) {
return Observable
.fromCallable(() -> latestHistoryTick(instrument))
.doOnError(e -> logger.error(e.getMessage()
+ "! Will retry latest tick from history now..."))
.retry(maxBarTickRetries)
.toBlocking()
.single();
.retryWhen(StreamUtil::retryOnHistoryFailObservable)
.take(1);
}

private ITick latestHistoryTick(final Instrument instrument) throws JFException {
Expand All @@ -60,7 +53,7 @@ private ITick latestHistoryTick(final Instrument instrument) throws JFException
return tick;
}

public IBar barQuote(final BarParams barParams) {
public Observable<IBar> latestBarObservable(final BarParams barParams) {
final Instrument instrument = barParams.instrument();
final Period period = barParams.period();
final OfferSide offerSide = barParams.offerSide();
Expand All @@ -69,9 +62,8 @@ public IBar barQuote(final BarParams barParams) {
.fromCallable(() -> latestHistoryBar(instrument, period, offerSide))
.doOnError(e -> logger.error(e.getMessage()
+ "! Will retry latest bar from history now..."))
.retry(maxBarTickRetries)
.toBlocking()
.single();
.retryWhen(StreamUtil::retryOnHistoryFailObservable)
.take(1);
}

private IBar latestHistoryBar(final Instrument instrument,
Expand Down
21 changes: 15 additions & 6 deletions src/main/java/com/jforex/programming/misc/StreamUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@ private StreamUtil() {
private static final int maxRetriesOnOrderFail = platformSettings.maxRetriesOnOrderFail();
private static final Logger logger = LogManager.getLogger(StreamUtil.class);

public static final Observable<Long> retryObservable(final Observable<? extends Throwable> errors) {
public static final Observable<Long> retryOnRejectObservable(final Observable<? extends Throwable> errors) {
return checkNotNull(errors)
.flatMap(StreamUtil::filterErrorType)
.flatMap(StreamUtil::filterCallErrorType)
.zipWith(retryCounterObservable(maxRetriesOnOrderFail), Pair::of)
.flatMap(retryPair -> evaluateRetryPair(retryPair,
delayOnOrderFailRetry,
TimeUnit.MILLISECONDS,
maxRetriesOnOrderFail));
}

private static final Observable<? extends Throwable> filterErrorType(final Throwable error) {
private static final Observable<? extends Throwable> filterCallErrorType(final Throwable error) {
if (error instanceof OrderCallRejectException) {
logPositionTaskRetry((OrderCallRejectException) error);
return Observable.just(error);
Expand All @@ -59,8 +59,8 @@ public static final Observable<Integer> retryCounterObservable(final int maxRetr
return Observable.range(1, maxRetries + 1);
}

public static Observable<Long> waitObservable(final long delay,
final TimeUnit timeUnit) {
public static final Observable<Long> waitObservable(final long delay,
final TimeUnit timeUnit) {
return Observable
.interval(delay, timeUnit)
.take(1);
Expand All @@ -72,7 +72,7 @@ private static final void logPositionTaskRetry(final OrderCallRejectException re
+ " Will retry task in " + delayOnOrderFailRetry + " milliseconds...");
}

public static <T> Stream<T> optionalStream(final Optional<T> optional) {
public static final <T> Stream<T> optionalStream(final Optional<T> optional) {
return checkNotNull(optional).isPresent()
? Stream.of(optional.get())
: Stream.empty();
Expand All @@ -86,4 +86,13 @@ public static final Completable CompletableFromJFRunnable(final JFRunnable jfRun
return null;
});
}

public static final Observable<Long> retryOnHistoryFailObservable(final Observable<? extends Throwable> errors) {
return checkNotNull(errors)
.zipWith(retryCounterObservable(5), Pair::of)
.flatMap(retryPair -> evaluateRetryPair(retryPair,
500L,
TimeUnit.MILLISECONDS,
5));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@ public BarQuote get(final BarParams barParams) {
}

private BarQuote quoteFromHistory(final BarParams barParams) {
final IBar historyBar = historyUtil.barQuote(barParams);
final IBar historyBar = historyUtil
.latestBarObservable(barParams)
.toBlocking()
.first();
final BarQuote barQuote = new BarQuote(historyBar, barParams);

onBarQuote(barQuote);

return barQuote;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,23 @@

import java.util.Map;
import java.util.Set;

import com.jforex.programming.misc.HistoryUtil;
import java.util.concurrent.ConcurrentHashMap;

import com.dukascopy.api.Instrument;
import com.jforex.programming.misc.HistoryUtil;

import rx.Observable;

public class TickQuoteRepository {

private final Map<Instrument, TickQuote> quotesByInstrument;
private final Map<Instrument, TickQuote> quotesByInstrument = new ConcurrentHashMap<>();

public TickQuoteRepository(final Observable<TickQuote> tickQuoteObservable,
final HistoryUtil historyUtil,
final Set<Instrument> subscribedInstruments) {
quotesByInstrument = historyUtil.tickQuotes(subscribedInstruments);
historyUtil
.tickQuotesObservable(subscribedInstruments)
.subscribe(quote -> quotesByInstrument.put(quote.instrument(), quote));

tickQuoteObservable.subscribe(this::onTickQuote);
}
Expand Down
90 changes: 57 additions & 33 deletions src/test/java/com/jforex/programming/misc/test/HistoryUtilTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.junit.Before;
import org.junit.Test;
Expand All @@ -16,95 +18,117 @@
import com.dukascopy.api.OfferSide;
import com.google.common.collect.Sets;
import com.jforex.programming.misc.HistoryUtil;
import com.jforex.programming.quote.QuoteProviderException;
import com.jforex.programming.quote.TickQuote;
import com.jforex.programming.test.common.QuoteProviderForTest;
import com.jforex.programming.test.common.RxTestUtil;

import rx.observers.TestSubscriber;

public class HistoryUtilTest extends QuoteProviderForTest {

private HistoryUtil historyUtil;

private final TestSubscriber<ITick> tickSubscriber = new TestSubscriber<>();
private final TestSubscriber<IBar> barSubscriber = new TestSubscriber<>();

@Before
public void setUp() {
historyUtil = new HistoryUtil(historyMock);
}

private void assertTickSubscriber() {
tickSubscriber.assertNoErrors();
tickSubscriber.assertCompleted();
tickSubscriber.assertValueCount(1);
assertThat(tickSubscriber.getOnNextEvents().get(0), equalTo(tickEURUSD));
}

private void assertBarSubscriber() {
barSubscriber.assertNoErrors();
barSubscriber.assertCompleted();
barSubscriber.assertValueCount(1);
assertThat(barSubscriber.getOnNextEvents().get(0), equalTo(askBarEURUSD));
}

@Test
public void latestTickIsCorrect() throws JFException {
when(historyMock.getLastTick(instrumentEURUSD)).thenReturn(tickEURUSD);
when(historyMock.getLastTick(instrumentEURUSD))
.thenReturn(tickEURUSD);

final ITick latestTick = historyUtil.tickQuote(instrumentEURUSD);
historyUtil
.lastestTickObservable(instrumentEURUSD)
.subscribe(tickSubscriber);

assertThat(latestTick, equalTo(tickEURUSD));
assertTickSubscriber();
}

@Test
public void latestTickWithRetriesIsCorrect() throws JFException {
when(historyMock.getLastTick(instrumentEURUSD))
.thenThrow(jfException)
.thenThrow(jfException)
.thenThrow(jfException)
.thenReturn(null)
.thenReturn(tickEURUSD);

final ITick latestTick = historyUtil.tickQuote(instrumentEURUSD);
historyUtil
.lastestTickObservable(instrumentEURUSD)
.subscribe(tickSubscriber);

assertThat(latestTick, equalTo(tickEURUSD));
verify(historyMock, times(4)).getLastTick(instrumentEURUSD);
}
RxTestUtil.advanceTimeBy(5000L, TimeUnit.MILLISECONDS);

@Test(expected = QuoteProviderException.class)
public void latestTickThrowsWhenHistoryReturnsNullTick() throws JFException {
when(historyMock.getLastTick(instrumentEURUSD))
.thenReturn(null);

historyUtil.tickQuote(instrumentEURUSD);
verify(historyMock, times(4)).getLastTick(instrumentEURUSD);
assertTickSubscriber();
}

@Test
public void tickQuotesMapIsCorrect() throws JFException {
final Set<Instrument> instruments = Sets.newHashSet(instrumentEURUSD, instrumentAUDUSD);
when(historyMock.getLastTick(instrumentEURUSD)).thenReturn(tickEURUSD);
when(historyMock.getLastTick(instrumentAUDUSD)).thenReturn(tickAUDUSD);
when(historyMock.getLastTick(instrumentEURUSD))
.thenReturn(tickEURUSD);
when(historyMock.getLastTick(instrumentAUDUSD))
.thenReturn(tickAUDUSD);
final Map<Instrument, TickQuote> quotesByInstrument = new HashMap<>();

final Map<Instrument, TickQuote> tickQuotes = historyUtil.tickQuotes(instruments);
historyUtil
.tickQuotesObservable(instruments)
.subscribe(quote -> quotesByInstrument.put(quote.instrument(), quote));

assertThat(tickQuotes.size(), equalTo(2));
assertEqualTickQuotes(tickQuotes.get(instrumentEURUSD), tickQuoteEURUSD);
assertEqualTickQuotes(tickQuotes.get(instrumentAUDUSD), tickQuoteAUDUSD);
assertThat(quotesByInstrument.size(), equalTo(2));
assertEqualTickQuotes(quotesByInstrument.get(instrumentEURUSD), tickQuoteEURUSD);
assertEqualTickQuotes(quotesByInstrument.get(instrumentAUDUSD), tickQuoteAUDUSD);
}

@Test
public void latestBarIsCorrect() throws JFException {
when(historyMock.getBar(instrumentEURUSD, barQuotePeriod, OfferSide.ASK, 1))
.thenReturn(askBarEURUSD);

final IBar latestBar = historyUtil.barQuote(askBarEURUSDParams);
historyUtil
.latestBarObservable(askBarEURUSDParams)
.subscribe(barSubscriber);

assertThat(latestBar, equalTo(askBarEURUSD));
assertBarSubscriber();
}

@Test
public void latestBarWithRetriesIsCorrect() throws JFException {
when(historyMock.getBar(instrumentEURUSD, barQuotePeriod, OfferSide.ASK, 1))
.thenThrow(jfException)
.thenThrow(jfException)
.thenThrow(jfException)
.thenReturn(null)
.thenReturn(askBarEURUSD);

final IBar latestBar = historyUtil.barQuote(askBarEURUSDParams);
historyUtil
.latestBarObservable(askBarEURUSDParams)
.subscribe(barSubscriber);

RxTestUtil.advanceTimeBy(5000L, TimeUnit.MILLISECONDS);

assertThat(latestBar, equalTo(askBarEURUSD));
verify(historyMock, times(4)).getBar(instrumentEURUSD,
barQuotePeriod,
OfferSide.ASK,
1);
}

@Test(expected = QuoteProviderException.class)
public void latestBarThrowsWhenHistoryReturnsNullBar() throws JFException {
when(historyMock.getBar(instrumentEURUSD, barQuotePeriod, OfferSide.ASK, 1))
.thenReturn(null);

historyUtil.barQuote(askBarEURUSDParams);
assertBarSubscriber();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class RetryObservableSetup {
public void setUp() {
retryCall = () -> Observable
.fromCallable(callableMock)
.retryWhen(StreamUtil::retryObservable)
.retryWhen(StreamUtil::retryOnRejectObservable)
.subscribe(orderSubscriber);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.jforex.programming.test.common.QuoteProviderForTest;

import de.bechte.junit.runners.context.HierarchicalContextRunner;
import rx.Observable;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;

Expand All @@ -28,24 +29,24 @@ public class BeforeBarsReceived {

@Test
public void askQuoteForEURUSDComesFromHistory() {
when(historyUtilMock.barQuote(askBarEURUSDParams))
.thenReturn(askBarEURUSD);
when(historyUtilMock.latestBarObservable(askBarEURUSDParams))
.thenReturn(Observable.just(askBarEURUSD));

final BarQuote receivedQuoteEURUSD = barQuoteRepository.get(askBarEURUSDParams);

assertEqualBarQuotes(receivedQuoteEURUSD, askBarQuoteEURUSD);
verify(historyUtilMock).barQuote(askBarEURUSDParams);
verify(historyUtilMock).latestBarObservable(askBarEURUSDParams);
}

@Test
public void bidQuoteForAUDUSDComesFromHistory() {
when(historyUtilMock.barQuote(bidBarAUDUSDParams))
.thenReturn(bidBarAUDUSD);
when(historyUtilMock.latestBarObservable(bidBarAUDUSDParams))
.thenReturn(Observable.just(bidBarAUDUSD));

final BarQuote receivedQuoteAUDUSD = barQuoteRepository.get(bidBarAUDUSDParams);

assertEqualBarQuotes(receivedQuoteAUDUSD, bidBarQuoteAUDUSD);
verify(historyUtilMock).barQuote(bidBarAUDUSDParams);
verify(historyUtilMock).latestBarObservable(bidBarAUDUSDParams);
}

public class AfterReceivedBars {
Expand Down
Loading

0 comments on commit 75e6bd8

Please sign in to comment.