Feature/parallel multi event simulation #687

Open · wants to merge 9 commits into base: master
7 changes: 7 additions & 0 deletions docs/quickstart.md
@@ -216,6 +216,13 @@ strategy = ExampleStrategy(
)
```

To process multiple events in parallel, provide an event group mapping in the format `{event_id: event_group}`. In the example below, events with IDs `"123"` and `"456"` are assigned to event group `"A"` and will be simulated together.
```python
strategy = ExampleStrategy(
market_filter={"markets": [..], "event_processing": True, "event_groups": {"123": "A", "456": "A"}}
)
```
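
If `event_processing` is enabled without an `event_groups` mapping, each event falls back to its own event ID as its group (see the `streams.py` changes below), so markets are only simulated together when they belong to the same event. A minimal sketch mirroring the example above:

```python
strategy = ExampleStrategy(
    # No "event_groups" mapping: each event forms its own group, so events
    # "123" and "456" would be simulated independently of one another.
    market_filter={"markets": [..], "event_processing": True}
)
```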

The `Market` object contains a helper method for accessing other event-linked markets:

```python
24 changes: 12 additions & 12 deletions flumine/simulation/simulation.py
@@ -41,20 +41,18 @@ def run(self) -> None:
Event data to be muxed/processed chronologically as per
live rather than single which is per market in isolation.
"""
event_streams = defaultdict(list) # eventId: [<Stream>, ..]
event_group_streams = defaultdict(list) # event_group: [<Stream>, ..]
for stream in self.streams:
event_id = stream.event_id if stream.event_processing else None
event_streams[event_id].append(stream)
# stream.event_group is None if stream is added without
# event_processing=True
event_group_streams[stream.event_group].append(stream)

for event_id, streams in event_streams.items():
if event_id and len(streams) > 1:
for event_group, streams in event_group_streams.items():
if event_group and len(streams) > 1:
logger.info(
"Starting historical event '%s'",
event_id,
extra={
"event_id": event_id,
"markets": [s.market_filter for s in streams],
},
"Starting historical event group '%s'",
event_group,
extra={"markets": [s.market_filter for s in streams]},
)
self.simulated_datetime.reset_real_datetime()
# create cycles
@@ -83,7 +81,9 @@ def run(self) -> None:
# add back
cycles.append([publish_time_epoch, market_book, stream_gen])
self.handler_queue.clear()
logger.info("Completed historical event '%s'", event_id)
logger.info(
"Completed historical event group '%s'", event_group
)
else:
for stream in streams:
logger.info(
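For context, the streams collected into a group above are merged by publish time so that all markets in the group advance together, as described in the docstring. A minimal, standalone sketch of that chronological muxing idea (illustrative only; names and types are not flumine's actual implementation):

```python
import heapq
from typing import Iterator, List, Tuple

def mux_chronologically(
    generators: List[Iterator[Tuple[int, str]]]
) -> Iterator[Tuple[int, str]]:
    """Yield (publish_time_epoch, update) pairs from several generators in time order."""
    heap = []
    # Prime the heap with the first update from each generator.
    for i, gen in enumerate(generators):
        try:
            publish_time, update = next(gen)
            heapq.heappush(heap, (publish_time, i, update, gen))
        except StopIteration:
            continue
    # Repeatedly emit the earliest update, then refill from the same generator.
    while heap:
        publish_time, i, update, gen = heapq.heappop(heap)
        yield publish_time, update
        try:
            next_time, next_update = next(gen)
            heapq.heappush(heap, (next_time, i, next_update, gen))
        except StopIteration:
            pass

# Two "streams" with interleaved publish times come out in chronological order.
stream_a = iter([(100, "1.234 update"), (300, "1.234 update")])
stream_b = iter([(200, "1.567 update"), (400, "1.567 update")])
print(list(mux_chronologically([stream_a, stream_b])))
```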
2 changes: 2 additions & 0 deletions flumine/streams/basestream.py
@@ -26,6 +26,7 @@ def __init__(
client=None,
output_queue: bool = True,
event_processing: bool = False,
event_group: str = None,
event_id: str = None,
operation: str = "marketSubscription",
**listener_kwargs,
@@ -43,6 +44,7 @@ def __init__(
self._stream = None
self._output_queue = queue.Queue() if output_queue else None
self.event_processing = event_processing
self.event_group = event_group
self.event_id = event_id
self.operation = operation
self.listener_kwargs = listener_kwargs
15 changes: 15 additions & 0 deletions flumine/streams/streams.py
@@ -28,6 +28,7 @@ def __call__(self, strategy: BaseStrategy) -> None:
market_types = strategy.market_filter.get("market_types")
country_codes = strategy.market_filter.get("country_codes")
event_processing = strategy.market_filter.get("event_processing", False)
event_groups = strategy.market_filter.get("event_groups", {})
events = strategy.market_filter.get("events")
listener_kwargs = strategy.market_filter.get("listener_kwargs", {})
if markets and events:
@@ -66,6 +67,7 @@ def __call__(self, strategy: BaseStrategy) -> None:
market,
market_definition,
event_processing,
event_groups,
**listener_kwargs,
)
strategy.streams.append(stream)
@@ -164,12 +166,20 @@ def add_historical_stream(
market: str,
market_definition: Optional[MarketDefinition],
event_processing: bool,
event_groups: dict,
**listener_kwargs
) -> HistoricalStream:
for stream in self:
# Get the expected event group of a stream considering its event id and group mapping
event_group = (
event_groups.get(stream.event_id, stream.event_id)
if event_processing
else None
)
if (
stream.market_filter == market
and stream.event_processing == event_processing
and stream.event_group == event_group
and stream.listener_kwargs == listener_kwargs
):
return stream
@@ -178,6 +188,9 @@
event_id = getattr(market_definition, "event_id", None)
if event_processing and event_id is None:
logger.warning("EventId not found for market %s" % market)
event_group = (
event_groups.get(event_id, event_id) if event_processing else None
) # Event ID by default, None if event_processing is False
logger.info(
"Creating new %s (%s) for strategy %s",
HistoricalStream.__name__,
@@ -187,6 +200,7 @@
"strategy": strategy,
"stream_id": stream_id,
"market_filter": market,
"event_group": event_group,
"event_id": event_id,
"event_processing": event_processing,
},
@@ -200,6 +214,7 @@
conflate_ms=strategy.conflate_ms,
output_queue=False,
event_processing=event_processing,
event_group=event_group,
event_id=event_id,
**listener_kwargs,
)
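The group-resolution rule added above appears twice in `add_historical_stream`: the group is the mapped value if the event ID is in `event_groups`, the event ID itself otherwise, and `None` when `event_processing` is off. A small standalone illustration of that rule (hypothetical helper name):

```python
from typing import Optional

def resolve_event_group(
    event_id: Optional[str], event_processing: bool, event_groups: dict
) -> Optional[str]:
    # Mapped group if present, otherwise the event ID itself;
    # None when event processing is disabled.
    return event_groups.get(event_id, event_id) if event_processing else None

assert resolve_event_group("123", False, {"123": "A"}) is None   # grouping ignored
assert resolve_event_group("123", True, {}) == "123"             # defaults to event ID
assert resolve_event_group("123", True, {"123": "A"}) == "A"     # explicit group
```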
4 changes: 2 additions & 2 deletions tests/test_fluminesimulation.py
@@ -55,12 +55,12 @@ def test_run_event(
mock_events,
mock__process_end_flumine,
):
mock_stream_one = mock.Mock(event_processing=True, event_id=123)
mock_stream_one = mock.Mock(event_processing=True, event_group="123")
mock_market_book_one = mock.Mock(publish_time_epoch=321)
mock_gen_one = mock.Mock(return_value=iter([[mock_market_book_one]]))
mock_stream_one.create_generator.return_value = mock_gen_one

mock_stream_two = mock.Mock(event_processing=True, event_id=123)
mock_stream_two = mock.Mock(event_processing=True, event_group="123")
mock_market_book_two = mock.Mock(publish_time_epoch=123)
mock_gen_two = mock.Mock(return_value=iter([[mock_market_book_two]]))
mock_stream_two.create_generator.return_value = mock_gen_two
167 changes: 120 additions & 47 deletions tests/test_streams.py
@@ -55,6 +55,7 @@ def test_call_simulated_markets(self, mock_add_historical_stream, mock_get_file_
"dubs of the mad skint and british",
mock_md,
False,
{},
canary_yellow=True,
)
self.assertEqual(len(mock_strategy.streams), 1)
@@ -295,48 +296,89 @@ def test_add_stream_sports_data_old(self, mock_increment):
@mock.patch("flumine.streams.streams.HistoricalStream")
@mock.patch("flumine.streams.streams.Streams._increment_stream_id")
def test_add_historical_stream(self, mock_increment, mock_historical_stream_class):
mock_historical_stream_class.__name__ = "test"
self.mock_flumine.SIMULATED = True
mock_strategy = mock.Mock()
mock_strategy.market_filter = 1
mock_strategy.market_data_filter = 2
mock_strategy.streaming_timeout = 3
mock_strategy.conflate_ms = 4
mock_strategy.stream_class = streams.MarketStream
mock_event_id = "12345"
mock_md = mock.Mock(event_id=mock_event_id)

self.streams.add_historical_stream(
mock_strategy, "GANG", mock_md, event_processing=False, inplay=True
)
self.assertEqual(len(self.streams), 1)
mock_increment.assert_called_with()
mock_historical_stream_class.assert_called_with(
flumine=self.mock_flumine,
stream_id=mock_increment(),
market_filter="GANG",
market_data_filter=mock_strategy.market_data_filter,
streaming_timeout=mock_strategy.streaming_timeout,
conflate_ms=mock_strategy.conflate_ms,
output_queue=False,
event_processing=False,
event_id=mock_event_id,
inplay=True,
)

def test_add_historical_stream_old(self):
self.mock_flumine.SIMULATED = True
mock_strategy = mock.Mock()
mock_stream = mock.Mock(event_processing=False, listener_kwargs={})
mock_stream.market_filter = "GANG"
self.streams._streams = [mock_stream]
mock_md = mock.Mock()
test_args = [
# event_processing, event_id, event_group, event_groups
# Basic case, no event processing
(False, "123", None, {}),
# Event groups are ignored since event_processing is False
(False, "123", None, {"123": "A"}),
# Event groups default to the event id if not provided
(True, "123", "123", {}),
# Event groups are provided
(True, "123", "A", {"123": "A"}),
]
for stream_count, subtest_args in enumerate(test_args, start=1):
with self.subTest():
event_processing, event_id, event_group, event_groups = subtest_args
mock_historical_stream_class.__name__ = "test"
self.mock_flumine.SIMULATED = True
mock_strategy = mock.Mock()
mock_strategy.market_filter = 1
mock_strategy.market_data_filter = 2
mock_strategy.streaming_timeout = 3
mock_strategy.conflate_ms = 4
mock_strategy.stream_class = streams.MarketStream

self.streams.add_historical_stream(
mock_strategy,
"GANG",
mock.Mock(event_id=event_id),
event_processing=event_processing,
event_groups=event_groups,
inplay=True,
)
self.assertEqual(len(self.streams), stream_count)
mock_increment.assert_called_with()
mock_historical_stream_class.assert_called_with(
flumine=self.mock_flumine,
stream_id=mock_increment(),
market_filter="GANG",
market_data_filter=mock_strategy.market_data_filter,
streaming_timeout=mock_strategy.streaming_timeout,
conflate_ms=mock_strategy.conflate_ms,
output_queue=False,
event_processing=event_processing,
event_group=event_group,
event_id=event_id,
inplay=True,
)

stream = self.streams.add_historical_stream(
mock_strategy, "GANG", mock_md, event_processing=False, **{}
)
self.assertEqual(stream, mock_stream)
self.assertEqual(len(self.streams), 1)
def test_add_historical_stream_which_already_exists(self):
test_args = [
# event_processing, event_id, event_group, event_groups
# Basic case, no event processing
(False, "123", None, {}),
# Event groups are ignored since event_processing is False
(False, "123", None, {"123": "A"}),
# Event groups default to the event id if not provided
(True, "123", "123", {}),
# Event groups are provided
(True, "123", "A", {"123": "A"}),
]
for subtest_args in test_args:
with self.subTest():
event_processing, event_id, event_group, event_groups = subtest_args
self.mock_flumine.SIMULATED = True
mock_strategy = mock.Mock()
mock_stream = mock.Mock(
event_processing=event_processing,
event_id=event_id,
event_group=event_group,
listener_kwargs={},
)
mock_stream.market_filter = "GANG"
self.streams._streams = [mock_stream]

stream = self.streams.add_historical_stream(
mock_strategy,
"GANG",
mock.Mock(event_id=event_id),
event_processing=event_processing,
event_groups=event_groups,
**{},
)
self.assertEqual(stream, mock_stream)
self.assertEqual(len(self.streams), 1)

@mock.patch("flumine.streams.streams.HistoricalStream")
@mock.patch("flumine.streams.streams.Streams._increment_stream_id")
@@ -346,14 +388,20 @@ def test_add_historical_stream_kwargs(
mock_historical_stream_class.__name__ = "test"
self.mock_flumine.SIMULATED = True
mock_strategy = mock.Mock()
mock_stream = mock.Mock(event_processing=False, listener_kwargs={})
mock_stream = mock.Mock(
event_processing=False, event_group=None, listener_kwargs={}
)
mock_stream.market_filter = "GANG"
self.streams._streams = [mock_stream]
mock_event_id = "12345"
mock_md = mock.Mock(event_id=mock_event_id)

self.streams.add_historical_stream(
mock_strategy, "GANG", mock_md, event_processing=False, **{"inplay": True}
mock_strategy,
"GANG",
mock.Mock(event_id=mock_event_id),
event_processing=False,
event_groups={},
**{"inplay": True},
)
self.assertEqual(len(self.streams), 2)
mock_increment.assert_called_with()
@@ -366,6 +414,7 @@ def test_add_historical_stream_kwargs(
conflate_ms=mock_strategy.conflate_ms,
output_queue=False,
event_processing=False,
event_group=None,
event_id=mock_event_id,
inplay=True,
)
@@ -375,13 +424,37 @@ def test_add_historical_stream_event_processing(self, mock_historical_stream_cla
mock_historical_stream_class.__name__ = "test"
self.mock_flumine.SIMULATED = True
mock_strategy = mock.Mock()
mock_stream = mock.Mock(event_processing=False)
mock_stream = mock.Mock(event_processing=False, event_group=None)
mock_stream.market_filter = "GANG"
self.streams._streams = [mock_stream]
mock_md = mock.Mock()
stream = self.streams.add_historical_stream(
mock_strategy,
"GANG",
mock.Mock(),
event_processing=True,
event_groups={},
)
self.assertEqual(stream, mock_historical_stream_class())
self.assertEqual(len(self.streams), 2)

@mock.patch("flumine.streams.streams.get_file_md")
@mock.patch("flumine.streams.streams.HistoricalStream")
def test_add_historical_stream_event_processing_with_grouping(
self, mock_historical_stream_class, mock_get_file_md
):
event_id = "123"
mock_historical_stream_class.__name__ = "test"
self.mock_flumine.SIMULATED = True
mock_strategy = mock.Mock()
mock_stream = mock.Mock(event_processing=True, event_group=event_id)
mock_stream.market_filter = "GANG"
self.streams._streams = [mock_stream]
stream = self.streams.add_historical_stream(
mock_strategy, "GANG", mock_md, event_processing=True
mock_strategy,
"GANG",
mock.Mock(event_id=event_id),
event_processing=True,
event_groups={event_id: "A"},
)
self.assertEqual(stream, mock_historical_stream_class())
self.assertEqual(len(self.streams), 2)