improvement: capture and retry database concurrent update errors #5227

Merged
2 changes: 1 addition & 1 deletion argilla-server/src/argilla_server/constants.py
@@ -25,7 +25,7 @@
DEFAULT_PASSWORD = "1234"
DEFAULT_API_KEY = "argilla.apikey"

-DEFAULT_DATABASE_SQLITE_TIMEOUT = 15
+DEFAULT_DATABASE_SQLITE_TIMEOUT = 5

DEFAULT_DATABASE_POSTGRESQL_POOL_SIZE = 15
DEFAULT_DATABASE_POSTGRESQL_MAX_OVERFLOW = 10
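
For context, this is roughly how an SQLite busy timeout like the one above is typically wired into an async SQLAlchemy engine; the URL and wiring below are illustrative assumptions, not code from this PR. Lowering the default from 15 to 5 seconds surfaces "database is locked" errors sooner, which pairs with the retry logic added in datasets.py.

```python
from sqlalchemy.ext.asyncio import create_async_engine

# Illustrative only: the `timeout` connect arg is the number of seconds a
# connection waits on a locked SQLite database before raising
# "database is locked".
engine = create_async_engine(
    "sqlite+aiosqlite:///argilla.db",  # hypothetical URL
    connect_args={"timeout": 5},       # mirrors DEFAULT_DATABASE_SQLITE_TIMEOUT
)
```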
27 changes: 19 additions & 8 deletions argilla-server/src/argilla_server/contexts/datasets.py
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
+import backoff
import copy
from datetime import datetime
from typing import (
@@ -94,6 +95,8 @@

CREATE_DATASET_VECTOR_SETTINGS_MAX_COUNT = 5

+MAX_TIME_RETRY_SQLALCHEMY_ERROR = 15


async def _touch_dataset_last_activity_at(db: AsyncSession, dataset: Dataset) -> None:
await db.execute(
@@ -805,6 +808,7 @@ async def delete_record(db: AsyncSession, search_engine: "SearchEngine", record:
return record


+@backoff.on_exception(backoff.expo, sqlalchemy.exc.SQLAlchemyError, max_time=MAX_TIME_RETRY_SQLALCHEMY_ERROR)
async def create_response(
db: AsyncSession, search_engine: SearchEngine, record: Record, user: User, response_create: ResponseCreate
) -> Response:
@@ -828,16 +832,18 @@ async def create_response(
await db.flush([response])
await _load_users_from_responses([response])
await _touch_dataset_last_activity_at(db, record.dataset)
-await search_engine.update_record_response(response)
await db.refresh(record, attribute_names=[Record.responses_submitted.key])
await distribution.update_record_status(db, record)
-await search_engine.partial_record_update(record, status=record.status)

await db.commit()

+await search_engine.update_record_response(response)
+await search_engine.partial_record_update(record, status=record.status)

return response


+@backoff.on_exception(backoff.expo, sqlalchemy.exc.SQLAlchemyError, max_time=MAX_TIME_RETRY_SQLALCHEMY_ERROR)
async def update_response(
db: AsyncSession, search_engine: SearchEngine, response: Response, response_update: ResponseUpdate
):
@@ -854,16 +860,18 @@

await _load_users_from_responses(response)
await _touch_dataset_last_activity_at(db, response.record.dataset)
-await search_engine.update_record_response(response)
await db.refresh(response.record, attribute_names=[Record.responses_submitted.key])
await distribution.update_record_status(db, response.record)
-await search_engine.partial_record_update(response.record, status=response.record.status)

await db.commit()

+await search_engine.update_record_response(response)
+await search_engine.partial_record_update(response.record, status=response.record.status)

return response


+@backoff.on_exception(backoff.expo, sqlalchemy.exc.SQLAlchemyError, max_time=MAX_TIME_RETRY_SQLALCHEMY_ERROR)
async def upsert_response(
db: AsyncSession, search_engine: SearchEngine, record: Record, user: User, response_upsert: ResponseUpsert
) -> Response:
@@ -886,29 +894,32 @@

await _load_users_from_responses(response)
await _touch_dataset_last_activity_at(db, response.record.dataset)
-await search_engine.update_record_response(response)
await db.refresh(record, attribute_names=[Record.responses_submitted.key])
await distribution.update_record_status(db, record)
-await search_engine.partial_record_update(record, status=record.status)

await db.commit()

+await search_engine.update_record_response(response)
+await search_engine.partial_record_update(record, status=record.status)

return response


+@backoff.on_exception(backoff.expo, sqlalchemy.exc.SQLAlchemyError, max_time=MAX_TIME_RETRY_SQLALCHEMY_ERROR)
async def delete_response(db: AsyncSession, search_engine: SearchEngine, response: Response) -> Response:
async with db.begin_nested():
response = await response.delete(db, autocommit=False)

await _load_users_from_responses(response)
await _touch_dataset_last_activity_at(db, response.record.dataset)
-await search_engine.delete_record_response(response)
await db.refresh(response.record, attribute_names=[Record.responses_submitted.key])
await distribution.update_record_status(db, response.record)
-await search_engine.partial_record_update(record=response.record, status=response.record.status)

await db.commit()

+await search_engine.delete_record_response(response)
+await search_engine.partial_record_update(record=response.record, status=response.record.status)

return response


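Taken together, the datasets.py changes do two things: each response write path is wrapped in backoff.on_exception, so transient SQLAlchemy errors (such as concurrent-update or "database is locked" failures) are retried with exponential backoff for up to MAX_TIME_RETRY_SQLALCHEMY_ERROR seconds, and the search engine calls are moved after db.commit(), so a retried transaction never pushes uncommitted state to the index. Below is a minimal, self-contained sketch of the retry behaviour; the simulated failure and function name are hypothetical, not part of the PR.

```python
import asyncio

import backoff
import sqlalchemy.exc

MAX_TIME_RETRY_SQLALCHEMY_ERROR = 15  # seconds, same value as in the PR

attempts = 0


@backoff.on_exception(backoff.expo, sqlalchemy.exc.SQLAlchemyError, max_time=MAX_TIME_RETRY_SQLALCHEMY_ERROR)
async def flaky_update() -> str:
    # Simulated concurrent-update failure: the first two calls raise, the third
    # succeeds. backoff retries with exponentially growing (jittered) delays and
    # re-raises the last error once max_time seconds have elapsed.
    global attempts
    attempts += 1
    if attempts < 3:
        raise sqlalchemy.exc.OperationalError("UPDATE records ...", None, Exception("database is locked"))
    return "ok"


print(asyncio.run(flaky_update()))  # prints "ok" after two retries
```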
@@ -44,6 +44,7 @@ async def execute(self, responses: List[ResponseUpsert], user: User) -> List[Res
raise errors.NotFoundError(f"Record with id `{item.record_id}` not found")

await authorize(user, RecordPolicy.create_response(record))

response = await datasets.upsert_response(self.db, self.search_engine, record, user, item)
except Exception as err:
responses_bulk_items.append(ResponseBulk(item=None, error=ResponseBulkError(detail=str(err))))
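
Note that because upsert_response is now retried internally, an error that still escapes after the backoff window is caught here per item and reported as a ResponseBulkError, rather than failing the whole bulk request.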
2 changes: 1 addition & 1 deletion argilla-server/tests/unit/commons/test_settings.py
@@ -75,7 +75,7 @@ def test_settings_database_url(url: str, expected_url: str, monkeypatch):


def test_settings_default_database_sqlite_timeout():
-assert Settings().database_sqlite_timeout == 15
+assert Settings().database_sqlite_timeout == 5


def test_settings_database_sqlite_timeout(monkeypatch):