Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix could not serialize access due to concurrent DELETE from presence_stream #15826

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/15826.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Use lower isolation level when cleaning old presence stream data to avoid serialization errors.
7 changes: 6 additions & 1 deletion synapse/storage/databases/main/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple, cast

from synapse.api.presence import PresenceState, UserPresenceState
Expand All @@ -24,6 +23,7 @@
)
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.engines import PostgresEngine
from synapse.storage.engines._base import IsolationLevel
from synapse.storage.types import Connection
from synapse.storage.util.id_generators import (
AbstractStreamIdGenerator,
Expand Down Expand Up @@ -115,11 +115,16 @@ async def update_presence(
)

async with stream_ordering_manager as stream_orderings:
# Run the interaction with an isolation level of READ_COMMITTED to avoid
# serialization errors(and rollbacks) in the database. This way it will
# ignore new rows during the DELETE, but will pick them up the next time
# this is run. Currently, that is between 5-60 seconds.
await self.db_pool.runInteraction(
"update_presence",
self._update_presence_txn,
stream_orderings,
presence_states,
isolation_level=IsolationLevel.READ_COMMITTED,
)

return stream_orderings[-1], self._presence_id_gen.get_current_token()
Expand Down