Skip to content

Commit

Permalink
feature: support minute aggregates as a streaming source
Browse files Browse the repository at this point in the history
So far the only streaming data source was quotes (bid/ask).
With this addition one can select the data source to be either:
- quotes
- minute aggregates
  • Loading branch information
shlomiku committed Dec 23, 2020
1 parent 2a7a14a commit 5dcbddd
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 14 deletions.
35 changes: 32 additions & 3 deletions alpaca_backtrader_api/alpacadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from backtrader.feed import DataBase
from backtrader import date2num, num2date
from backtrader.utils.py3 import queue, with_metaclass
import backtrader as bt

from alpaca_backtrader_api import alpacastore

Expand Down Expand Up @@ -155,7 +156,13 @@ def islive(self):
def __init__(self, **kwargs):
    """Create the data feed and its backing Alpaca store.

    Validates the requested timeframe up front: only Ticks (quote
    stream), Minutes (minute aggregates) and Days are supported.

    :raises Exception: if ``self.p.timeframe`` is not one of the
        supported ``bt.TimeFrame`` values.
    """
    self.o = self._store(**kwargs)
    # quotes carry bid/ask; pick which price representation to feed out
    self._candleFormat = 'bidask' if self.p.bidask else 'midpoint'
    self._timeframe = self.p.timeframe
    # NOTE(review): looks like this enables the qcheck with a zero
    # timeout -- confirm against the backtrader DataBase API
    self.do_qcheck(True, 0)
    if self._timeframe not in [bt.TimeFrame.Ticks,
                               bt.TimeFrame.Minutes,
                               bt.TimeFrame.Days]:
        raise Exception(f'Unsupported time frame: '
                        f'{bt.TimeFrame.TName(self._timeframe)}')

def setenvironment(self, env):
"""
Expand Down Expand Up @@ -224,7 +231,9 @@ def _st_start(self, instart=True, tmout=None):

self._state = self._ST_HISTORBACK
return True
self.qlive = self.o.streaming_prices(self.p.dataname, tmout=tmout)
self.qlive = self.o.streaming_prices(self.p.dataname,
self.p.timeframe,
tmout=tmout)
if instart:
self._statelivereconn = self.p.backfill_start
else:
Expand Down Expand Up @@ -299,8 +308,13 @@ def _load(self):
if self._laststatus != self.LIVE:
if self.qlive.qsize() <= 1: # very short live queue
self.put_notification(self.LIVE)

ret = self._load_tick(msg)
if self.p.timeframe == bt.TimeFrame.Ticks:
ret = self._load_tick(msg)
elif self.p.timeframe == bt.TimeFrame.Minutes:
ret = self._load_agg(msg)
else:
# might want to act differently in the future
ret = self._load_agg(msg)
if ret:
return True

Expand Down Expand Up @@ -410,6 +424,21 @@ def _load_tick(self, msg):

return True

def _load_agg(self, msg):
    """Load one streamed minute-aggregate bar into the data lines.

    :param msg: dict with epoch ``time`` plus OHLCV fields
    :return: True when a fresh bar was loaded, False when the bar's
        timestamp is not newer than the last one seen (duplicate)
    """
    bar_dt = date2num(datetime.utcfromtimestamp(int(msg['time'])))
    if bar_dt <= self.lines.datetime[-1]:
        # duplicate or out-of-order bar -- nothing to load
        return False

    self.lines.datetime[0] = bar_dt
    for field in ('open', 'high', 'low', 'close', 'volume'):
        getattr(self.lines, field)[0] = msg[field]
    # no open interest in this stream -- keep the line at zero
    self.lines.openinterest[0] = 0.0
    return True

def _load_history(self, msg):
dtobj = msg['time'].to_pydatetime()
dt = date2num(dtobj)
Expand Down
49 changes: 38 additions & 11 deletions alpaca_backtrader_api/alpacastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
unicode_literals)
import os
import collections
import time
from enum import Enum
import traceback

Expand Down Expand Up @@ -91,10 +92,17 @@ def _request(self,


class Granularity(Enum):
    """Historical-data granularities, valued as API timeframe strings."""
    Ticks = "ticks"
    Daily = "day"
    Minute = "minute"


class StreamingMethod(Enum):
    """Which streaming channel a :class:`Streamer` subscribes to."""
    AccountUpdate = 'account_update'  # trade/account updates channel
    Quote = "quote"                   # bid/ask quotes (Q.<symbol>)
    MinuteAgg = "minute_agg"          # minute aggregate bars (AM.<symbol>)


class Streamer:
conn = None

Expand All @@ -104,7 +112,7 @@ def __init__(
api_key='',
api_secret='',
instrument='',
method='',
method: StreamingMethod = StreamingMethod.AccountUpdate,
base_url='',
data_url='',
data_stream='',
Expand All @@ -126,19 +134,23 @@ def __init__(
self.q = q
self.conn.on('authenticated')(self.on_auth)
self.conn.on(r'Q.*')(self.on_quotes)
self.conn.on(r'AM.*')(self.on_agg_min)
self.conn.on(r'A.*')(self.on_agg_min)
self.conn.on(r'account_updates')(self.on_account)
self.conn.on(r'trade_updates')(self.on_trade)

def run(self):
channels = []
if not self.method:
if self.method == StreamingMethod.AccountUpdate:
channels = ['trade_updates'] # 'account_updates'
else:
if self.data_stream == 'polygon':
maps = {"quote": "Q."}
maps = {"quote": "Q.",
"minute_agg": "AM."}
elif self.data_stream == 'alpacadatav1':
maps = {"quote": "alpacadatav1/Q."}
channels = [maps[self.method] + self.instrument]
maps = {"quote": "alpacadatav1/Q.",
"minute_agg": "alpacadatav1/AM."}
channels = [maps[self.method.value] + self.instrument]

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
Expand All @@ -159,7 +171,8 @@ async def on_agg_sec(self, conn, subject, msg):
self.q.put(msg)

async def on_agg_min(self, conn, subject, msg):
    """Handle a minute-aggregate message from the websocket stream.

    The span as captured still contained the pre-change line
    ``self.q.put(msg)`` next to the new implementation (stale diff
    residue); consolidated to enqueue the raw dict exactly once.

    Stamps the entity's raw dict with the bar's end time as an epoch
    ``time`` field, then enqueues it for the data-feed consumer
    (``_load_agg`` indexes the message by ``msg['time']``).
    """
    msg._raw['time'] = msg.end.to_pydatetime().timestamp()
    self.q.put(msg._raw)

async def on_account(self, conn, stream, msg):
self.q.put(msg)
Expand Down Expand Up @@ -308,6 +321,8 @@ def get_positions(self):
return positions

def get_granularity(self, timeframe, compression) -> Granularity:
if timeframe == bt.TimeFrame.Ticks:
return Granularity.Ticks
if timeframe == bt.TimeFrame.Minutes:
return Granularity.Minute
elif timeframe == bt.TimeFrame.Days:
Expand Down Expand Up @@ -440,7 +455,7 @@ def _make_sure_dates_are_initialized_properly(self, dtbegin, dtend,
dates may or may not be specified by the user.
when they do, they probably don't include NY timezone data
also, when granularity is minute, we want to make sure we get data when
market is opened. so if it doesn't - let's get set end date to be last
market is opened. so if it doesn't - let's set end date to be last
known minute with opened market.
this method takes care of all these issues.
:param dtbegin:
Expand Down Expand Up @@ -585,9 +600,12 @@ def _iterate_api_calls():
timeframe = "5Min"
elif granularity == 'minute' and compression == 15:
timeframe = "15Min"
elif granularity == 'ticks':
timeframe = "minute"
else:
timeframe = granularity
r = self.oapi.get_barset(dataname,
'minute' if timeframe == 'ticks' else
timeframe,
limit=1000,
end=curr.isoformat()
Expand Down Expand Up @@ -687,22 +705,31 @@ def _resample(df):
response = response[~response.index.duplicated()]
return response

def streaming_prices(self, dataname, timeframe, tmout=None):
    """Start a daemon thread that streams live prices for *dataname*.

    The span as captured contained both the pre-change signature and
    kwargs dict and the post-change ones (stale diff residue);
    consolidated to the post-change implementation.

    :param dataname: instrument symbol to subscribe to
    :param timeframe: ``bt.TimeFrame`` value selecting the source
        (Ticks -> quotes, Minutes -> minute aggregates)
    :param tmout: optional delay in seconds before connecting
    :return: ``queue.Queue`` on which price messages are delivered
    """
    q = queue.Queue()
    kwargs = {'q': q,
              'dataname': dataname,
              'timeframe': timeframe,
              'tmout': tmout}
    t = threading.Thread(target=self._t_streaming_prices, kwargs=kwargs)
    # daemon thread: must not keep the process alive on shutdown
    t.daemon = True
    t.start()
    return q

def _t_streaming_prices(self, dataname, q, tmout):
def _t_streaming_prices(self, dataname, timeframe, q, tmout):
if tmout is not None:
_time.sleep(tmout)

if timeframe == bt.TimeFrame.Ticks:
method = StreamingMethod.Quote
elif timeframe == bt.TimeFrame.Minutes:
method = StreamingMethod.MinuteAgg

streamer = Streamer(q,
api_key=self.p.key_id,
api_secret=self.p.secret_key,
instrument=dataname,
method='quote',
method=method,
base_url=self.p.base_url,
data_url=os.environ.get("DATA_PROXY_WS", ''),
data_stream='polygon' if self.p.usePolygon else
Expand Down

0 comments on commit 5dcbddd

Please sign in to comment.