From fd6152540e92418a433311b2e7790f7d0e21f170 Mon Sep 17 00:00:00 2001
From: Sam Gross
Date: Thu, 7 Dec 2023 15:11:50 -0500
Subject: [PATCH] gh-111964: Add `_PyRWMutex`, a "readers-writer" lock

This adds `_PyRWMutex`, a "readers-writer" lock, which will be used to
serialize global stop-the-world pauses with per-interpreter pauses.
---
 Include/internal/pycore_lock.h        | 24 +++++++
 Modules/_testinternalcapi/test_lock.c | 93 +++++++++++++++++++++++++++
 Python/lock.c                         | 86 +++++++++++++++++++++++++
 3 files changed, 203 insertions(+)

diff --git a/Include/internal/pycore_lock.h b/Include/internal/pycore_lock.h
index 03ad1c9fd584b5..7cc7a8cc95ddb2 100644
--- a/Include/internal/pycore_lock.h
+++ b/Include/internal/pycore_lock.h
@@ -213,6 +213,30 @@ _PyOnceFlag_CallOnce(_PyOnceFlag *flag, _Py_once_fn_t *fn, void *arg)
     return _PyOnceFlag_CallOnceSlow(flag, fn, arg);
 }
 
+// A readers-writer (RW) lock. The lock supports multiple concurrent readers or
+// a single writer. The lock is write-preferring: if a writer is waiting, then
+// new readers will be blocked. This avoids starvation of writers.
+//
+// The low two bits store whether the lock is write-locked (_Py_LOCKED) and
+// whether there are parked threads (_Py_HAS_PARKED). The remaining bits are
+// used to store the number of readers.
+//
+// The design is optimized for simplicity of the implementation. The lock is
+// not fair: if fairness is desired, use an additional PyMutex to serialize
+// writers. The lock is also not reentrant.
+typedef struct {
+    uintptr_t bits;
+} _PyRWMutex;
+
+// Read lock
+PyAPI_FUNC(void) _PyRWMutex_RLock(_PyRWMutex *rwmutex);
+PyAPI_FUNC(void) _PyRWMutex_RUnlock(_PyRWMutex *rwmutex);
+
+// Write lock
+PyAPI_FUNC(void) _PyRWMutex_Lock(_PyRWMutex *rwmutex);
+PyAPI_FUNC(void) _PyRWMutex_Unlock(_PyRWMutex *rwmutex);
+
+
 #ifdef __cplusplus
 }
 #endif
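The header comment above pins down the state encoding completely, which makes
the magic numbers in the test below easy to check by hand. The following is an
illustrative sketch, not part of the patch; it assumes _Py_LOCKED == 1 and
_Py_HAS_PARKED == 2 (the values pycore_lock.h already uses for PyMutex), and
the helper names are hypothetical:

    // Illustrative sketch only, not part of the patch.
    // Assumes _Py_LOCKED == 1 and _Py_HAS_PARKED == 2, as for PyMutex.
    #include <stdint.h>

    #define READER_SHIFT 2  // mirrors _PyRWMutex_READER_SHIFT in Python/lock.c

    static inline int is_write_locked(uintptr_t bits) { return (bits & 1) != 0; }
    static inline int has_parked_threads(uintptr_t bits) { return (bits & 2) != 0; }
    static inline uintptr_t reader_count(uintptr_t bits) { return bits >> READER_SHIFT; }

    // e.g. bits == 10 (binary 1010): reader_count == 2, has_parked_threads == 1,
    // is_write_locked == 0, i.e. two readers hold the lock while a writer waits.
    // This is exactly the value the test below waits for after starting the writer.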
diff --git a/Modules/_testinternalcapi/test_lock.c b/Modules/_testinternalcapi/test_lock.c
index 418f71c1441995..1dd3c736ad3f28 100644
--- a/Modules/_testinternalcapi/test_lock.c
+++ b/Modules/_testinternalcapi/test_lock.c
@@ -372,6 +372,98 @@ test_lock_once(PyObject *self, PyObject *obj)
     Py_RETURN_NONE;
 }
 
+struct test_rwlock_data {
+    Py_ssize_t nthreads;
+    _PyRWMutex rw;
+    PyEvent step1;
+    PyEvent step2;
+    PyEvent step3;
+    PyEvent done;
+};
+
+static void
+rdlock_thread(void *arg)
+{
+    struct test_rwlock_data *test_data = arg;
+
+    // Acquire the lock in read mode
+    _PyRWMutex_RLock(&test_data->rw);
+    PyEvent_Wait(&test_data->step1);
+    _PyRWMutex_RUnlock(&test_data->rw);
+
+    _PyRWMutex_RLock(&test_data->rw);
+    PyEvent_Wait(&test_data->step3);
+    _PyRWMutex_RUnlock(&test_data->rw);
+
+    if (_Py_atomic_add_ssize(&test_data->nthreads, -1) == 1) {
+        _PyEvent_Notify(&test_data->done);
+    }
+}
+static void
+wrlock_thread(void *arg)
+{
+    struct test_rwlock_data *test_data = arg;
+
+    // First acquire the lock in write mode
+    _PyRWMutex_Lock(&test_data->rw);
+    PyEvent_Wait(&test_data->step2);
+    _PyRWMutex_Unlock(&test_data->rw);
+
+    if (_Py_atomic_add_ssize(&test_data->nthreads, -1) == 1) {
+        _PyEvent_Notify(&test_data->done);
+    }
+}
+
+static void
+wait_until(uintptr_t *ptr, uintptr_t value)
+{
+    // wait up to two seconds for *ptr == value
+    int iters = 0;
+    uintptr_t bits;
+    do {
+        pysleep(10);
+        bits = _Py_atomic_load_uintptr(ptr);
+        iters++;
+    } while (bits != value && iters < 200);
+}
+
+static PyObject *
+test_lock_rwlock(PyObject *self, PyObject *obj)
+{
+    struct test_rwlock_data test_data = {.nthreads = 3};
+
+    // Start two readers
+    PyThread_start_new_thread(rdlock_thread, &test_data);
+    PyThread_start_new_thread(rdlock_thread, &test_data);
+
+    // wait up to two seconds for the threads to attempt to read-lock "rw"
+    wait_until(&test_data.rw.bits, 8);
+    assert(test_data.rw.bits == 8);  // two readers: 2 << _PyRWMutex_READER_SHIFT
+
+    // start writer (while readers hold lock)
+    PyThread_start_new_thread(wrlock_thread, &test_data);
+    wait_until(&test_data.rw.bits, 10);
+    assert(test_data.rw.bits == 10);  // two readers, writer parked (_Py_HAS_PARKED)
+
+    // readers release lock, writer should acquire it
+    _PyEvent_Notify(&test_data.step1);
+    wait_until(&test_data.rw.bits, 3);
+    assert(test_data.rw.bits == 3);  // _Py_LOCKED | _Py_HAS_PARKED (readers parked)
+
+    // writer releases lock, readers acquire it
+    _PyEvent_Notify(&test_data.step2);
+    wait_until(&test_data.rw.bits, 8);
+    assert(test_data.rw.bits == 8);  // two readers again
+
+    // readers release lock again
+    _PyEvent_Notify(&test_data.step3);
+    wait_until(&test_data.rw.bits, 0);
+    assert(test_data.rw.bits == 0);  // unlocked, no readers, no waiters
+
+    PyEvent_Wait(&test_data.done);
+    Py_RETURN_NONE;
+}
+
 static PyMethodDef test_methods[] = {
     {"test_lock_basic", test_lock_basic, METH_NOARGS},
     {"test_lock_two_threads", test_lock_two_threads, METH_NOARGS},
@@ -380,6 +472,7 @@ static PyMethodDef test_methods[] = {
     _TESTINTERNALCAPI_BENCHMARK_LOCKS_METHODDEF
     {"test_lock_benchmark", test_lock_benchmark, METH_NOARGS},
     {"test_lock_once", test_lock_once, METH_NOARGS},
+    {"test_lock_rwlock", test_lock_rwlock, METH_NOARGS},
     {NULL, NULL}  /* sentinel */
 };
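Before the implementation, it may help to see the intended call pattern from a
consumer's point of view. This is a hedged sketch, not code from the patch:
shared_counter, read_counter, and increment_counter are hypothetical names, and
only the four _PyRWMutex_* calls come from the new API. Zero-initialization is
the valid unlocked state (bits == 0), as the test above relies on.

    // Sketch of intended usage; everything except the _PyRWMutex_* calls is
    // hypothetical.
    static _PyRWMutex rwmutex;          // zero-initialized == unlocked
    static Py_ssize_t shared_counter;

    static Py_ssize_t
    read_counter(void)
    {
        _PyRWMutex_RLock(&rwmutex);     // shared: many readers may hold this
        Py_ssize_t value = shared_counter;
        _PyRWMutex_RUnlock(&rwmutex);
        return value;
    }

    static void
    increment_counter(void)
    {
        _PyRWMutex_Lock(&rwmutex);      // exclusive: blocks new readers while waiting
        shared_counter++;
        _PyRWMutex_Unlock(&rwmutex);
    }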
diff --git a/Python/lock.c b/Python/lock.c
index e9279f0b92a5e7..1883aff7b5e03c 100644
--- a/Python/lock.c
+++ b/Python/lock.c
@@ -353,3 +353,89 @@ _PyOnceFlag_CallOnceSlow(_PyOnceFlag *flag, _Py_once_fn_t *fn, void *arg)
         v = _Py_atomic_load_uint8(&flag->v);
     }
 }
+
+#define _PyRWMutex_READER_SHIFT 2
+
+void
+_PyRWMutex_RLock(_PyRWMutex *rwmutex)
+{
+    uintptr_t bits = _Py_atomic_load_uintptr_relaxed(&rwmutex->bits);
+    for (;;) {
+        // If the lock is not write-locked and there is no writer waiting, then
+        // we can increment the reader count.
+        if ((bits & (_Py_LOCKED|_Py_HAS_PARKED)) == 0) {
+            uintptr_t newval = bits + (1 << _PyRWMutex_READER_SHIFT);
+            if (!_Py_atomic_compare_exchange_uintptr(&rwmutex->bits,
+                                                     &bits, newval)) {
+                continue;
+            }
+            return;
+        }
+
+        // Set _Py_HAS_PARKED if it's not already set.
+        if ((bits & _Py_HAS_PARKED) == 0) {
+            uintptr_t newval = bits | _Py_HAS_PARKED;
+            if (!_Py_atomic_compare_exchange_uintptr(&rwmutex->bits,
+                                                     &bits, newval)) {
+                continue;
+            }
+            bits = newval;
+        }
+
+        _PyParkingLot_Park(&rwmutex->bits, &bits, sizeof(bits), -1, NULL, 1);
+        bits = _Py_atomic_load_uintptr_relaxed(&rwmutex->bits);
+    }
+}
+
+void
+_PyRWMutex_RUnlock(_PyRWMutex *rwmutex)
+{
+    uintptr_t bits = _Py_atomic_add_uintptr(&rwmutex->bits, -(1 << _PyRWMutex_READER_SHIFT));
+    bits -= (1 << _PyRWMutex_READER_SHIFT);  // the atomic add returns the old value
+
+    if ((bits >> _PyRWMutex_READER_SHIFT) == 0 && (bits & _Py_HAS_PARKED)) {
+        _PyParkingLot_UnparkAll(&rwmutex->bits);
+        return;
+    }
+}
+
+void
+_PyRWMutex_Lock(_PyRWMutex *rwmutex)
+{
+    uintptr_t bits = _Py_atomic_load_uintptr_relaxed(&rwmutex->bits);
+    for (;;) {
+        // If there are no active readers and it's not already write-locked,
+        // then we can grab the lock.
+        if ((bits & ~_Py_HAS_PARKED) == 0) {
+            if (_Py_atomic_compare_exchange_uintptr(&rwmutex->bits,
+                                                    &bits,
+                                                    bits | _Py_LOCKED)) {
+                return;
+            }
+            continue;
+        }
+
+        if (!(bits & _Py_HAS_PARKED)) {
+            if (!_Py_atomic_compare_exchange_uintptr(&rwmutex->bits,
+                                                     &bits,
+                                                     bits | _Py_HAS_PARKED)) {
+                continue;
+            }
+            bits |= _Py_HAS_PARKED;
+        }
+
+        _PyParkingLot_Park(&rwmutex->bits, &bits, sizeof(bits), -1, NULL, 1);
+        bits = _Py_atomic_load_uintptr_relaxed(&rwmutex->bits);
+    }
+}
+
+void
+_PyRWMutex_Unlock(_PyRWMutex *rwmutex)
+{
+    uintptr_t old_bits = _Py_atomic_exchange_uintptr(&rwmutex->bits, 0);
+    assert((old_bits >> _PyRWMutex_READER_SHIFT) == 0);  // must not be read-locked
+
+    if ((old_bits & _Py_HAS_PARKED) != 0) {
+        _PyParkingLot_UnparkAll(&rwmutex->bits);
+    }
+}
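A closing note on the "not reentrant" caveat in the header comment:
_PyRWMutex_Lock waits until the reader count reaches zero, and that count
includes any read lock held by the calling thread itself. The hypothetical
sketch below (not from the patch) shows the resulting self-deadlock, which is
why upgrading a read lock to a write lock is not supported:

    // Hypothetical sketch, not from the patch: a thread must not try to
    // upgrade its own read lock to a write lock.
    static _PyRWMutex mu;

    static void
    self_deadlock(void)
    {
        _PyRWMutex_RLock(&mu);   // reader count becomes 1
        _PyRWMutex_Lock(&mu);    // parks forever: waits for reader count == 0,
                                 // but this thread's own read hold keeps it at 1
    }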