Skip to content

Commit

Permalink
Merge pull request #178 from diogomatoschaves/fix-pipeline-restart
Browse files Browse the repository at this point in the history
Refactor restarting pipeline logic
  • Loading branch information
diogomatoschaves authored Mar 5, 2024
2 parents ba7b45b + c6e4698 commit 0db3247
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 11 deletions.
5 changes: 2 additions & 3 deletions data/service/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,9 @@ def startup_task(app):
cache.set("bearer_token", bearer_token)

for pipeline in active_pipelines:
response = start_symbol_trading(pipeline, restart=True)

response = start_symbol_trading(pipeline)

if not response["success"] and response["code"] != "SYMBOL_ALREADY_TRADED":
if not response["success"]:
logging.info(f"Pipeline {pipeline.id} could not be started. {response['message']}")

while any(is_pipeline_loading(cache, pipeline.id) for pipeline in active_pipelines):
Expand Down
11 changes: 6 additions & 5 deletions data/service/blueprints/bots_api/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def stop_pipeline(pipeline_id, header='', raise_exception=False, nr_retries=3, f
time.sleep(60 * retries)


def start_symbol_trading(pipeline):
def start_symbol_trading(pipeline, restart=False):

add_pipeline_loading(cache, pipeline.id)

Expand All @@ -102,15 +102,16 @@ def start_symbol_trading(pipeline):

response = start_stop_symbol_trading(payload, 'start')

if response["success"]:
pipeline.last_entry = None
pipeline.save()
else:
if not response["success"] and not (response["code"] == "SYMBOL_ALREADY_TRADED" and restart):
pipeline.active = False
pipeline.open_time = None
pipeline.save()

return response
else:
response["success"] = True
pipeline.last_entry = None
pipeline.save()

executor.submit(
initialize_data_collection,
Expand Down
6 changes: 3 additions & 3 deletions data/tests/setup/fixtures/internal_modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def fake_start_stop_trading(pipeline_id, start_or_stop):
if start_or_stop == 'stop':
Pipeline.objects.get(id=pipeline_id).update(active=False)

return {"success": True, "message": ''}
return {"success": True, "message": '', "code": ""}


@pytest.fixture
Expand All @@ -166,7 +166,7 @@ def mock_start_stop_symbol_trading_success_true_binance_handler(mocker):
mocker.patch.object(
data.sources.binance._binance,
'start_stop_symbol_trading',
lambda pipeline_id, start_or_stop: {"success": True, "message": ''},
lambda pipeline_id, start_or_stop: {"success": True, "message": '', "code": ""},
)


Expand All @@ -180,7 +180,7 @@ def mock_start_stop_symbol_trading_success_false(mocker):
return mocker.patch.object(
data.service.blueprints.bots_api._helpers,
'start_stop_symbol_trading',
lambda payload, start_or_stop: {"success": False, "message": "Pipeline could not be started."},
lambda payload, start_or_stop: {"success": False, "message": "Pipeline could not be started.", "code": ""},
)


Expand Down

0 comments on commit 0db3247

Please sign in to comment.