diff --git a/argilla-server/src/argilla_server/constants.py b/argilla-server/src/argilla_server/constants.py index b65f18ca0a..eb419d989c 100644 --- a/argilla-server/src/argilla_server/constants.py +++ b/argilla-server/src/argilla_server/constants.py @@ -25,7 +25,7 @@ DEFAULT_PASSWORD = "1234" DEFAULT_API_KEY = "argilla.apikey" -DEFAULT_DATABASE_SQLITE_TIMEOUT = 15 +DEFAULT_DATABASE_SQLITE_TIMEOUT = 5 DEFAULT_DATABASE_POSTGRESQL_POOL_SIZE = 15 DEFAULT_DATABASE_POSTGRESQL_MAX_OVERFLOW = 10 diff --git a/argilla-server/src/argilla_server/contexts/datasets.py b/argilla-server/src/argilla_server/contexts/datasets.py index b95fcde5e1..7f5e49b6dd 100644 --- a/argilla-server/src/argilla_server/contexts/datasets.py +++ b/argilla-server/src/argilla_server/contexts/datasets.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import asyncio +import backoff import copy from datetime import datetime from typing import ( @@ -94,6 +95,8 @@ CREATE_DATASET_VECTOR_SETTINGS_MAX_COUNT = 5 +MAX_TIME_RETRY_SQLALCHEMY_ERROR = 15 + async def _touch_dataset_last_activity_at(db: AsyncSession, dataset: Dataset) -> None: await db.execute( @@ -805,6 +808,7 @@ async def delete_record(db: AsyncSession, search_engine: "SearchEngine", record: return record +@backoff.on_exception(backoff.expo, sqlalchemy.exc.SQLAlchemyError, max_time=MAX_TIME_RETRY_SQLALCHEMY_ERROR) async def create_response( db: AsyncSession, search_engine: SearchEngine, record: Record, user: User, response_create: ResponseCreate ) -> Response: @@ -828,16 +832,18 @@ async def create_response( await db.flush([response]) await _load_users_from_responses([response]) await _touch_dataset_last_activity_at(db, record.dataset) - await search_engine.update_record_response(response) await db.refresh(record, attribute_names=[Record.responses_submitted.key]) await distribution.update_record_status(db, record) - await search_engine.partial_record_update(record, status=record.status) await db.commit() + await search_engine.update_record_response(response) + await search_engine.partial_record_update(record, status=record.status) + return response +@backoff.on_exception(backoff.expo, sqlalchemy.exc.SQLAlchemyError, max_time=MAX_TIME_RETRY_SQLALCHEMY_ERROR) async def update_response( db: AsyncSession, search_engine: SearchEngine, response: Response, response_update: ResponseUpdate ): @@ -854,16 +860,18 @@ async def update_response( await _load_users_from_responses(response) await _touch_dataset_last_activity_at(db, response.record.dataset) - await search_engine.update_record_response(response) await db.refresh(response.record, attribute_names=[Record.responses_submitted.key]) await distribution.update_record_status(db, response.record) - await search_engine.partial_record_update(response.record, status=response.record.status) await db.commit() + await search_engine.update_record_response(response) + await search_engine.partial_record_update(response.record, status=response.record.status) + return response +@backoff.on_exception(backoff.expo, sqlalchemy.exc.SQLAlchemyError, max_time=MAX_TIME_RETRY_SQLALCHEMY_ERROR) async def upsert_response( db: AsyncSession, search_engine: SearchEngine, record: Record, user: User, response_upsert: ResponseUpsert ) -> Response: @@ -886,29 +894,32 @@ async def upsert_response( await _load_users_from_responses(response) await _touch_dataset_last_activity_at(db, response.record.dataset) - await search_engine.update_record_response(response) await db.refresh(record, attribute_names=[Record.responses_submitted.key]) await distribution.update_record_status(db, record) - await search_engine.partial_record_update(record, status=record.status) await db.commit() + await search_engine.update_record_response(response) + await search_engine.partial_record_update(record, status=record.status) + return response +@backoff.on_exception(backoff.expo, sqlalchemy.exc.SQLAlchemyError, max_time=MAX_TIME_RETRY_SQLALCHEMY_ERROR) async def delete_response(db: AsyncSession, search_engine: SearchEngine, response: Response) -> Response: async with db.begin_nested(): response = await response.delete(db, autocommit=False) await _load_users_from_responses(response) await _touch_dataset_last_activity_at(db, response.record.dataset) - await search_engine.delete_record_response(response) await db.refresh(response.record, attribute_names=[Record.responses_submitted.key]) await distribution.update_record_status(db, response.record) - await search_engine.partial_record_update(record=response.record, status=response.record.status) await db.commit() + await search_engine.delete_record_response(response) + await search_engine.partial_record_update(record=response.record, status=response.record.status) + return response diff --git a/argilla-server/src/argilla_server/use_cases/responses/upsert_responses_in_bulk.py b/argilla-server/src/argilla_server/use_cases/responses/upsert_responses_in_bulk.py index 520194e46a..cb801365e7 100644 --- a/argilla-server/src/argilla_server/use_cases/responses/upsert_responses_in_bulk.py +++ b/argilla-server/src/argilla_server/use_cases/responses/upsert_responses_in_bulk.py @@ -44,6 +44,7 @@ async def execute(self, responses: List[ResponseUpsert], user: User) -> List[Res raise errors.NotFoundError(f"Record with id `{item.record_id}` not found") await authorize(user, RecordPolicy.create_response(record)) + response = await datasets.upsert_response(self.db, self.search_engine, record, user, item) except Exception as err: responses_bulk_items.append(ResponseBulk(item=None, error=ResponseBulkError(detail=str(err)))) diff --git a/argilla-server/tests/unit/commons/test_settings.py b/argilla-server/tests/unit/commons/test_settings.py index 8215709b4a..1ef0b64849 100644 --- a/argilla-server/tests/unit/commons/test_settings.py +++ b/argilla-server/tests/unit/commons/test_settings.py @@ -75,7 +75,7 @@ def test_settings_database_url(url: str, expected_url: str, monkeypatch): def test_settings_default_database_sqlite_timeout(): - assert Settings().database_sqlite_timeout == 15 + assert Settings().database_sqlite_timeout == 5 def test_settings_database_sqlite_timeout(monkeypatch):