Skip to content

Commit

Permalink
Merge pull request #179 from diogomatoschaves/fix-pipeline-restart
Browse files Browse the repository at this point in the history
Ensure proper restart of execution app
  • Loading branch information
diogomatoschaves authored Mar 7, 2024
2 parents 0db3247 + 79f96da commit 07f66f6
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 18 deletions.
56 changes: 39 additions & 17 deletions execution/service/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
from execution.service.helpers.decorators import binance_error_handler, handle_app_errors, handle_order_execution_errors
from execution.service.blueprints.market_data import market_data
from execution.service.helpers import validate_signal, extract_and_validate, get_header
from execution.service.helpers.exceptions import PipelineNotActive
from execution.service.helpers.exceptions import PipelineNotActive, InsufficientBalance
from execution.service.helpers.responses import Responses
from execution.exchanges.binance.futures import BinanceFuturesTrader
from shared.utils.config_parser import get_config
from shared.utils.decorators import handle_db_connection_error, retry_failed_connection
from shared.utils.decorators import handle_db_connection_error
from shared.utils.exceptions import EquityRequired
from shared.utils.logger import configure_logger

Expand Down Expand Up @@ -43,6 +43,39 @@ def get_binance_trader_instance(paper_trading):
return binance_futures_trader


def startup_task():

start_background_scheduler(config_vars)

active_pipelines = [position.pipeline for position in Position.objects.filter(pipeline__active=True)]

for pipeline in active_pipelines:

header = get_header(pipeline.id)

try:
start_pipeline_trade(pipeline, header)
except InsufficientBalance:
logging.info(f"Insufficient balance to start pipeline {pipeline.id}.")
pipeline.active = False
pipeline.save()


def start_pipeline_trade(pipeline, header):
try:
initial_position = Position.objects.get(pipeline__id=pipeline.id).position
except Position.DoesNotExist:
initial_position = 0

bt = get_binance_trader_instance(pipeline.paper_trading)

bt.start_symbol_trading(
pipeline.id,
initial_position=initial_position,
header=header,
)


def create_app():

global binance_futures_mock_trader, binance_futures_trader
Expand All @@ -54,11 +87,12 @@ def create_app():
app.register_blueprint(market_data)

app.config["JWT_SECRET_KEY"] = os.getenv('SECRET_KEY')
jwt = JWTManager(app)

JWTManager(app)

CORS(app)

start_background_scheduler(config_vars)
startup_task()

@app.route('/')
@jwt_required()
Expand All @@ -80,19 +114,7 @@ def start_symbol_trading():
raise EquityRequired

if pipeline.exchange.name == 'binance':

try:
initial_position = Position.objects.get(pipeline__id=pipeline.id).position
except Position.DoesNotExist:
initial_position = 0

bt = get_binance_trader_instance(pipeline.paper_trading)

bt.start_symbol_trading(
pipeline.id,
initial_position=initial_position,
header=parameters.header,
)
start_pipeline_trade(pipeline, parameters.header)

return jsonify(Responses.TRADING_SYMBOL_START(pipeline.symbol.name))

Expand Down
31 changes: 30 additions & 1 deletion execution/tests/service/test_execution_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ def test_mock_setup(


class TestExecutionService:
def test_index_route(self, client):
@pytest.mark.slow
def test_index_route(self, test_mock_setup, client):

res = client.get("/")

Expand Down Expand Up @@ -481,3 +482,31 @@ def test_failed_leverage_setting(
res = client.post(f"start_symbol_trading", json={"pipeline_id": 1})

assert res.json == Responses.LEVERAGE_SETTING_FAILURE("Failed to set leverage. ")

@pytest.mark.slow
def test_startup_task_with_open_positions(
self,
client_with_open_positions,
spy_start_pipeline_trade
):
assert spy_start_pipeline_trade.call_count == 3

@pytest.mark.slow
def test_startup_task_with_open_positions_insufficient_balance(
self,
app_with_open_positions_insufficient_balance,
spy_start_pipeline_trade
):
assert spy_start_pipeline_trade.call_count == 3

pipeline_11 = Pipeline.objects.get(id=11)

assert pipeline_11.active is False

def test_startup_task_no_open_positions(
self,
client,
spy_start_pipeline_trade
):
spy_start_pipeline_trade.assert_not_called()

36 changes: 36 additions & 0 deletions execution/tests/setup/fixtures/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,39 @@ def app(
def client(app):
with app.test_client() as client:
yield client


@pytest.fixture
def app_with_open_positions(
mock_client_env_vars,
futures_init,
mock_jwt_required,
mock_redis_connection,
exchange_data,
create_positions,
mock_start_pipeline_trade,
spy_start_pipeline_trade
):
app = create_app(testing=True)
return app


@pytest.fixture
def app_with_open_positions_insufficient_balance(
mock_client_env_vars,
futures_init,
mock_jwt_required,
mock_redis_connection,
exchange_data,
create_positions,
mock_start_pipeline_trade_raise_exception,
spy_start_pipeline_trade
):
app = create_app(testing=True)
return app


@pytest.fixture
def client_with_open_positions(app_with_open_positions):
with app_with_open_positions.test_client() as client:
yield client
27 changes: 27 additions & 0 deletions execution/tests/setup/fixtures/internal_modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,30 @@ def mock_binance_futures_trader_raise_exception_start_stop(mocker):
@pytest.fixture
def mock_binance_futures_trader_raise_leverage_setting_fail(mocker):
return mocker.patch("execution.service.app.binance_futures_trader", MockBinanceTrader(raise_leverage_setting_failure=True))


@pytest.fixture
def mock_start_pipeline_trade(mocker):
return mocker.patch.object(
execution.service.app,
"start_pipeline_trade",
lambda pipeline, header, **kwargs: None
)


def raise_insufficient_balance(pipeline, header, **kwargs):
raise InsufficientBalance


@pytest.fixture
def mock_start_pipeline_trade_raise_exception(mocker):
return mocker.patch.object(
execution.service.app,
"start_pipeline_trade",
raise_insufficient_balance
)


@pytest.fixture
def spy_start_pipeline_trade(mocker):
return mocker.spy(execution.service.app, "start_pipeline_trade")

0 comments on commit 07f66f6

Please sign in to comment.