diff --git a/Include/Python.h b/Include/Python.h index 52a7aac6ba6..0e981239c10 100644 --- a/Include/Python.h +++ b/Include/Python.h @@ -46,6 +46,7 @@ #include "typeslots.h" #include "pyhash.h" #include "cpython/pydebug.h" +#include "cpython/lock.h" #include "bytearrayobject.h" #include "bytesobject.h" #include "unicodeobject.h" diff --git a/Include/cpython/lock.h b/Include/cpython/lock.h new file mode 100644 index 00000000000..879dbc9f281 --- /dev/null +++ b/Include/cpython/lock.h @@ -0,0 +1,137 @@ +#ifndef Py_LIMITED_API +#ifndef Py_LOCK_H +#define Py_LOCK_H + +#include "pyatomic.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct { + uintptr_t v; +} _PyOnceFlag; + +typedef struct { + uint8_t v; +} _PyMutex; + +typedef _PyMutex PyMutex; + +typedef struct { + uintptr_t v; + size_t recursions; +} _PyRecursiveMutex; + +typedef enum { + UNLOCKED = 0, + LOCKED = 1, + HAS_PARKED = 2, + ONCE_INITIALIZED = 4, + THREAD_ID_MASK = ~(LOCKED | HAS_PARKED) +} _PyMutex_State; + +PyAPI_FUNC(void) _PyMutex_lock_slow(_PyMutex *m); +PyAPI_FUNC(void) _PyMutex_unlock_slow(_PyMutex *m); +PyAPI_FUNC(int) _PyMutex_TryLockSlow(_PyMutex *m); + +PyAPI_FUNC(void) _PyRecursiveMutex_lock_slow(_PyRecursiveMutex *m); +PyAPI_FUNC(void) _PyRecursiveMutex_unlock_slow(_PyRecursiveMutex *m); + +PyAPI_FUNC(int) _PyBeginOnce_slow(_PyOnceFlag *o); +PyAPI_FUNC(void) _PyEndOnce(_PyOnceFlag *o); +PyAPI_FUNC(void) _PyEndOnceFailed(_PyOnceFlag *o); + +static inline int +_PyMutex_is_locked(_PyMutex *m) +{ + return _Py_atomic_load_uint8(&m->v) & LOCKED; +} + +static inline int +_PyMutex_lock_fast(_PyMutex *m) +{ + return _Py_atomic_compare_exchange_uint8(&m->v, UNLOCKED, LOCKED); +} + +static inline void +_PyMutex_lock(_PyMutex *m) +{ + if (_PyMutex_lock_fast(m)) { + return; + } + _PyMutex_lock_slow(m); +} + +static inline int +_PyMutex_TryLock(_PyMutex *m) +{ + if (_PyMutex_lock_fast(m)) { + return 1; + } + return _PyMutex_TryLockSlow(m); +} + +static inline int +_PyMutex_unlock_fast(_PyMutex *m) +{ + return _Py_atomic_compare_exchange_uint8(&m->v, LOCKED, UNLOCKED); +} + +static inline void +_PyMutex_unlock(_PyMutex *m) +{ + if (_PyMutex_unlock_fast(m)) { + return; + } + _PyMutex_unlock_slow(m); +} + +static inline void +_PyRecursiveMutex_lock(_PyRecursiveMutex *m) +{ + if (_Py_atomic_compare_exchange_uintptr(&m->v, UNLOCKED, _Py_ThreadId() | LOCKED)) { + return; + } + _PyRecursiveMutex_lock_slow(m); +} + +static inline int +_PyRecursiveMutex_owns_lock(_PyRecursiveMutex *m) +{ + uintptr_t v = _Py_atomic_load_uintptr(&m->v); + return (v & THREAD_ID_MASK) == _Py_ThreadId(); +} + +static inline void +_PyRecursiveMutex_unlock(_PyRecursiveMutex *m) +{ + uintptr_t v = _Py_atomic_load_uintptr_relaxed(&m->v); + if (m->recursions == 0 && (v & 3) == LOCKED) { + if (_Py_atomic_compare_exchange_uintptr(&m->v, v, UNLOCKED)) { + return; + } + } + _PyRecursiveMutex_unlock_slow(m); +} + +static inline int +_PyOnce_Initialized(_PyOnceFlag *o) +{ + return (_Py_atomic_load_uintptr(&o->v) & ONCE_INITIALIZED) != 0; +} + +static inline int +_PyBeginOnce(_PyOnceFlag *o) +{ + if ((_Py_atomic_load_uintptr(&o->v) & ONCE_INITIALIZED) != 0) { + return 0; + } + return _PyBeginOnce_slow(o); +} + +#ifdef __cplusplus +} +#endif +#endif /* !Py_LOCK_H */ +#endif /* Py_LIMITED_API */ diff --git a/Include/internal/pycore_critical_section.h b/Include/internal/pycore_critical_section.h new file mode 100644 index 00000000000..3710cf87668 --- /dev/null +++ b/Include/internal/pycore_critical_section.h @@ -0,0 +1,194 @@ +#ifndef Py_LIMITED_API +#ifndef Py_INTERNAL_CRITICAL_SECTION_H +#define Py_INTERNAL_CRITICAL_SECTION_H + +#include "pycore_pystate.h" +#include + +#ifdef __cplusplus +extern "C" { +#endif + +#ifndef Py_BUILD_CORE +# error "this header requires Py_BUILD_CORE define" +#endif + +/* + * Implementation of Python critical sections. + * + * Python critical sections are helpers to replace the global interpreter lock + * with finer grained locking. A Python critical section is a region of code + * that can only be executed by a single thread at at time. The regions begin + * with a call to _Py_critical_section_begin and end with either an explicit + * call to _Py_critical_section_end or *implicitly* at any point that might + * have released the global interpreter lock. This is a substantial difference + * from the traditonal notion of a "critical section", where the end of the + * section is typically explicitly marked. + * + * The critical section can be resumed after a potential implicit ending by + * the _Py_critical_section_resume function. + * + * The purposed of the implicitly ending critical sections is to avoid + * potential deadlock due to holding locks on multiple objects. Any time a + * thread would have released the GIL, it releases all locks from critical + * sections. This includes block on a lock acquisition. + * + * The following are examples of calls that may implicitly end a critical + * section: + * + * Py_DECREF, PyObject_GC_New, PyObject_Call, PyObject_RichCompareBool, + * Py_BuildValue, _Py_critical_section_begin + * + * The following are examples of calls that do NOT implicitly end a critical + * section: + * + * Py_INCREF, PyMem_RawMalloc, PyMem_RawFree, memset and other C functions + * that do not call into the Python API. + */ + +enum { + // Tag to used with `prev` pointer to differentiate _Py_critical_section + // vs. _Py_critical_section2. + _Py_CRITICAL_SECTION_INACTIVE = 1, + + _Py_CRITICAL_SECTION_TWO_MUTEXES = 2, + + _Py_CRITICAL_SECTION_MASK = 3 +}; + +#define Py_BEGIN_CRITICAL_SECTION(m) { \ + struct _Py_critical_section _cs; \ + _Py_critical_section_begin(&_cs, m) + +#define Py_END_CRITICAL_SECTION \ + _Py_critical_section_end(&_cs); \ +} + +#define Py_BEGIN_CRITICAL_SECTION2(m1, m2) { \ + struct _Py_critical_section2 _cs2; \ + _Py_critical_section2_begin(&_cs2, m1, m2) + +#define Py_END_CRITICAL_SECTION2 \ + _Py_critical_section2_end(&_cs2); \ +} + +struct _Py_critical_section { + // Pointer to the an outer active critical section (or + // _Py_critical_section_sentinel). The two least-significant-bits indicate + // whether the pointed-to critical section is inactive and whether it is + // a _Py_critical_section2 object. + uintptr_t prev; + + // Mutex used to protect critical section + _PyMutex *mutex; +}; + +// A critical section protected by two mutexes. Use +// _Py_critical_section2_begin and _Py_critical_section2_end. +struct _Py_critical_section2 { + struct _Py_critical_section base; + + _PyMutex *mutex2; +}; + +static inline int +_Py_critical_section_is_active(uintptr_t tag) +{ + return tag != 0 && (tag & _Py_CRITICAL_SECTION_INACTIVE) == 0; +} + +PyAPI_FUNC(void) +_Py_critical_section_resume(PyThreadState *tstate); + +PyAPI_FUNC(void) +_Py_critical_section_begin_slow(struct _Py_critical_section *c, _PyMutex *m); + +PyAPI_FUNC(void) +_Py_critical_section2_begin_slow(struct _Py_critical_section2 *c, + _PyMutex *m1, _PyMutex *m2, int flag); + +static inline void +_Py_critical_section_begin(struct _Py_critical_section *c, _PyMutex *m) +{ + if (_PyMutex_lock_fast(m)) { + PyThreadState *tstate = PyThreadState_GET(); + c->mutex = m; + c->prev = tstate->critical_section; + tstate->critical_section = (uintptr_t)c; + } + else { + _Py_critical_section_begin_slow(c, m); + } +} + +static inline void +_Py_critical_section_pop(struct _Py_critical_section *c) +{ + PyThreadState *tstate = PyThreadState_GET(); + uintptr_t prev = c->prev; + tstate->critical_section = prev; + + if (_PY_UNLIKELY((prev & _Py_CRITICAL_SECTION_INACTIVE))) { + _Py_critical_section_resume(tstate); + } +} + +static inline void +_Py_critical_section_end(struct _Py_critical_section *c) +{ + _PyMutex_unlock(c->mutex); + _Py_critical_section_pop(c); +} + +static inline void +_Py_critical_section2_begin(struct _Py_critical_section2 *c, + _PyMutex *m1, _PyMutex *m2) +{ + if ((uintptr_t)m2 < (uintptr_t)m1) { + _PyMutex *m1_ = m1; + m1 = m2; + m2 = m1_; + } + else if (m1 == m2) { + c->mutex2 = NULL; + _Py_critical_section_begin(&c->base, m1); + return; + } + if (_PyMutex_lock_fast(m1)) { + if (_PyMutex_lock_fast(m2)) { + PyThreadState *tstate = PyThreadState_GET(); + c->base.mutex = m1; + c->mutex2 = m2; + c->base.prev = tstate->critical_section; + + uintptr_t p = (uintptr_t)c | _Py_CRITICAL_SECTION_TWO_MUTEXES; + tstate->critical_section = p; + } + else { + _Py_critical_section2_begin_slow(c, m1, m2, 1); + } + } + else { + _Py_critical_section2_begin_slow(c, m1, m2, 0); + } +} + +static inline void +_Py_critical_section2_end(struct _Py_critical_section2 *c) +{ + if (c->mutex2) { + _PyMutex_unlock(c->mutex2); + } + _PyMutex_unlock(c->base.mutex); + _Py_critical_section_pop(&c->base); +} + +PyAPI_FUNC(void) +_Py_critical_section_end_all(PyThreadState *tstate); + + +#ifdef __cplusplus +} +#endif +#endif /* !Py_INTERNAL_CRITICAL_SECTION_H */ +#endif /* !Py_LIMITED_API */ diff --git a/Include/internal/pycore_gc.h b/Include/internal/pycore_gc.h index b3abe2030a0..8ae9d8f9a55 100644 --- a/Include/internal/pycore_gc.h +++ b/Include/internal/pycore_gc.h @@ -8,6 +8,8 @@ extern "C" { # error "this header requires Py_BUILD_CORE define" #endif +#include "pycore_lock.h" + /* GC information is stored BEFORE the object structure. */ typedef struct { // Pointer to next object in the list. diff --git a/Include/internal/pycore_llist.h b/Include/internal/pycore_llist.h new file mode 100644 index 00000000000..a08edb60ca7 --- /dev/null +++ b/Include/internal/pycore_llist.h @@ -0,0 +1,76 @@ +#ifndef Py_INTERNAL_LLIST_H +#define Py_INTERNAL_LLIST_H + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +#ifndef Py_BUILD_CORE +# error "Py_BUILD_CORE must be defined to include this header" +#endif + +struct llist_node { + struct llist_node *next; + struct llist_node *prev; +}; + +#define llist_data(node, type, member) \ + (type*)((char*)node - offsetof(type, member)) + +#define llist_for_each(node, head) \ + for (node = (head)->next; node != (head); node = node->next) + +#define LLIST_INIT(head) { &head, &head } + +static inline void +llist_init(struct llist_node *head) +{ + head->next = head; + head->prev = head; +} + +static inline int +llist_empty(struct llist_node *head) +{ + return head->next == head; +} + +static inline void +llist_insert_tail(struct llist_node *head, struct llist_node *node) +{ + node->prev = head->prev; + node->next = head; + head->prev->next = node; + head->prev = node; +} + +static inline void +llist_remove(struct llist_node *node) +{ + struct llist_node *prev = node->prev; + struct llist_node *next = node->next; + prev->next = next; + next->prev = prev; + node->prev = NULL; + node->next = NULL; +} + +static inline void +llist_concat(struct llist_node *head1, struct llist_node *head2) +{ + if (!llist_empty(head2)) { + head1->prev->next = head2->next; + head2->next->prev = head1->prev; + + head1->prev = head2->prev; + head2->prev->next = head1; + llist_init(head2); + } +} + +#ifdef __cplusplus +} +#endif +#endif /* !Py_INTERNAL_LLIST_H */ diff --git a/Include/internal/pycore_lock.h b/Include/internal/pycore_lock.h new file mode 100644 index 00000000000..dc16fa0a4d4 --- /dev/null +++ b/Include/internal/pycore_lock.h @@ -0,0 +1,130 @@ +#ifndef Py_INTERNAL_LOCK_H +#define Py_INTERNAL_LOCK_H +#ifdef __cplusplus +extern "C" { +#endif + +#ifndef Py_BUILD_CORE +# error "this header requires Py_BUILD_CORE define" +#endif + +typedef enum _PyLockFlags { + _Py_LOCK_DONT_DETACH = 0, + _PY_LOCK_DETACH = 1, + _PY_LOCK_MAKE_PENDING_CALLS = 2, +} _PyLockFlags; + +typedef struct { + uintptr_t v; +} _PyRawMutex; + +typedef struct { + uintptr_t v; +} _PyRawEvent; + +// A one-time event notification +typedef struct { + uintptr_t v; +} _PyEvent; + +// A one-time event notification with reference counting +typedef struct _PyEventRc { + _PyEvent event; + Py_ssize_t refcount; +} _PyEventRc; + +extern void _PyMutex_LockSlowEx(_PyMutex *m, int detach); + +extern void _PyRawMutex_lock_slow(_PyRawMutex *m); +extern void _PyRawMutex_unlock_slow(_PyRawMutex *m); + +extern void _PyRawEvent_Notify(_PyRawEvent *o); +extern void _PyRawEvent_Wait(_PyRawEvent *o); +extern int _PyRawEvent_TimedWait(_PyRawEvent *o, int64_t ns); +extern void _PyRawEvent_Reset(_PyRawEvent *o); + +extern void _PyEvent_Notify(_PyEvent *o); +extern void _PyEvent_Wait(_PyEvent *o); +extern int _PyEvent_TimedWait(_PyEvent *o, int64_t ns); + +extern PyLockStatus +_PyMutex_TimedLockEx(_PyMutex *m, _PyTime_t timeout_ns, _PyLockFlags flags); + +extern int _PyMutex_TryUnlock(_PyMutex *m); + +static inline void +_PyMutex_LockEx(_PyMutex *m, int detach) +{ + if (_PyMutex_lock_fast(m)) { + return; + } + _PyMutex_LockSlowEx(m, detach); +} + +static inline int +_PyRawMutex_is_locked(_PyRawMutex *m) +{ + return _Py_atomic_load_uintptr(&m->v) & 1; +} + +static inline void +_PyRawMutex_lock(_PyRawMutex *m) +{ + if (_Py_atomic_compare_exchange_uintptr(&m->v, UNLOCKED, LOCKED)) { + return; + } + _PyRawMutex_lock_slow(m); +} + +static inline int +_PyRawMutex_trylock(_PyRawMutex *m) +{ + if (_Py_atomic_compare_exchange_uintptr(&m->v, UNLOCKED, LOCKED)) { + return 1; + } + return 0; +} + +static inline void +_PyRawMutex_unlock(_PyRawMutex *m) +{ + if (_Py_atomic_compare_exchange_uintptr(&m->v, LOCKED, UNLOCKED)) { + return; + } + _PyRawMutex_unlock_slow(m); +} + +static inline int +_PyEvent_IsSet(_PyEvent *e) +{ + return _Py_atomic_load_uintptr(&e->v) == LOCKED; +} + +static inline _PyEventRc * +_PyEventRc_New(void) +{ + _PyEventRc *erc = (_PyEventRc *)PyMem_RawCalloc(1, sizeof(_PyEventRc)); + if (erc != NULL) { + erc->refcount = 1; + } + return erc; +} + +static inline void +_PyEventRc_Incref(_PyEventRc *erc) +{ + _Py_atomic_add_ssize(&erc->refcount, 1); +} + +static inline void +_PyEventRc_Decref(_PyEventRc *erc) +{ + if (_Py_atomic_add_ssize(&erc->refcount, -1) == 1) { + PyMem_RawFree(erc); + } +} + +#ifdef __cplusplus +} +#endif +#endif /* !Py_INTERNAL_LOCK_H */ diff --git a/Include/parking_lot.h b/Include/parking_lot.h new file mode 100644 index 00000000000..05b68a8cdac --- /dev/null +++ b/Include/parking_lot.h @@ -0,0 +1,74 @@ +#ifndef Py_PARKING_LOT_H +#define Py_PARKING_LOT_H + +#include "pyatomic.h" + +#ifdef __cplusplus +extern "C" { +#endif + +enum { + PY_PARK_AGAIN = -1, + PY_PARK_TIMEOUT = -2, + PY_PARK_INTR = -3, + PY_PARK_OK = 0, +}; + +typedef struct _PyWakeup _PyWakeup; + +typedef struct _PyUnpark { + void *data; + void *wait_entry; + int more_waiters; +} _PyUnpark; + +struct wait_entry; + +void +_PyParkingLot_InitThread(void); + +void +_PyParkingLot_DeinitThread(void); + +PyAPI_FUNC(_PyWakeup *) +_PyWakeup_Acquire(void); + +PyAPI_FUNC(void) +_PyWakeup_Release(_PyWakeup *wakeup); + +PyAPI_FUNC(void) +_PyWakeup_Wakeup(_PyWakeup *wakeup); + +PyAPI_FUNC(int) +_PyWakeup_Wait(_PyWakeup *wakeup, int64_t ns, int detach); + +PyAPI_FUNC(int) +_PyParkingLot_ParkInt(const int *key, int expected, int detach); + +PyAPI_FUNC(int) +_PyParkingLot_Park(const void *key, uintptr_t expected, + void *data, int64_t ns); + +PyAPI_FUNC(int) +_PyParkingLot_ParkUint8(const uint8_t *key, uint8_t expected, + void *data, int64_t ns, int detach); + +PyAPI_FUNC(void) +_PyParkingLot_UnparkAll(const void *key); + +PyAPI_FUNC(void *) +_PyParkingLot_BeginUnpark(const void *key, + struct wait_entry **out_waiter, + int *more_waiters); + +PyAPI_FUNC(void) +_PyParkingLot_FinishUnpark(const void *key, struct wait_entry *waiter); + +PyAPI_FUNC(void) +_PyParkingLot_AfterFork(void); + + +#ifdef __cplusplus +} +#endif +#endif /* !Py_PARKING_LOT_H */ diff --git a/Makefile.pre.in b/Makefile.pre.in index 07ca882f670..845a0ae4d9f 100644 --- a/Makefile.pre.in +++ b/Makefile.pre.in @@ -396,10 +396,12 @@ PYTHON_OBJS= \ Python/importdl.o \ Python/initconfig.o \ Python/intrinsics.o \ + Python/lock.o \ Python/marshal.o \ Python/modsupport.o \ Python/mysnprintf.o \ Python/mystrtoul.o \ + Python/parking_lot.o \ Python/pathconfig.o \ Python/preconfig.o \ Python/pyarena.o \ @@ -1537,6 +1539,7 @@ PYTHON_HEADERS= \ $(srcdir)/Include/opcode.h \ $(srcdir)/Include/osdefs.h \ $(srcdir)/Include/osmodule.h \ + $(srcdir)/Include/parking_lot.h \ $(srcdir)/Include/patchlevel.h \ $(srcdir)/Include/pybuffer.h \ $(srcdir)/Include/pycapsule.h \ @@ -1594,6 +1597,7 @@ PYTHON_HEADERS= \ $(srcdir)/Include/cpython/import.h \ $(srcdir)/Include/cpython/initconfig.h \ $(srcdir)/Include/cpython/listobject.h \ + $(srcdir)/Include/cpython/lock.h \ $(srcdir)/Include/cpython/longintrepr.h \ $(srcdir)/Include/cpython/longobject.h \ $(srcdir)/Include/cpython/memoryobject.h \ diff --git a/Modules/posixmodule.c b/Modules/posixmodule.c index 0d4c179368c..b1fe1ec7ed9 100644 --- a/Modules/posixmodule.c +++ b/Modules/posixmodule.c @@ -34,6 +34,7 @@ #include "pycore_pystate.h" // _PyInterpreterState_GET() #include "pycore_signal.h" // Py_NSIG +#include "parking_lot.h" #include "structmember.h" // PyMemberDef #ifndef MS_WINDOWS # include "posixmodule.h" @@ -587,6 +588,11 @@ PyOS_AfterFork_Child(void) PyStatus status; _PyRuntimeState *runtime = &_PyRuntime; + // Clears the parking lot. Any waiting threads are dead. This must be + // called before releasing any locks that use the parking lot so that + // unlocking the locks doesn't try to wake up dead threads. + _PyParkingLot_AfterFork(); + status = _PyGILState_Reinit(runtime); if (_PyStatus_EXCEPTION(status)) { goto fatal_error; diff --git a/PCbuild/_freeze_module.vcxproj b/PCbuild/_freeze_module.vcxproj index 4f39756019e..00f72864b66 100644 --- a/PCbuild/_freeze_module.vcxproj +++ b/PCbuild/_freeze_module.vcxproj @@ -207,10 +207,12 @@ + + diff --git a/PCbuild/_freeze_module.vcxproj.filters b/PCbuild/_freeze_module.vcxproj.filters index 7d7c4587b9a..995a9653a52 100644 --- a/PCbuild/_freeze_module.vcxproj.filters +++ b/PCbuild/_freeze_module.vcxproj.filters @@ -208,6 +208,9 @@ Source Files + + Source Files + Source Files @@ -268,6 +271,9 @@ Source Files + + Source Files + Source Files diff --git a/PCbuild/pythoncore.vcxproj b/PCbuild/pythoncore.vcxproj index f81b4afa560..a115a7da649 100644 --- a/PCbuild/pythoncore.vcxproj +++ b/PCbuild/pythoncore.vcxproj @@ -153,6 +153,7 @@ + @@ -234,6 +235,8 @@ + + @@ -284,6 +287,7 @@ + @@ -525,10 +529,12 @@ + + diff --git a/PCbuild/pythoncore.vcxproj.filters b/PCbuild/pythoncore.vcxproj.filters index 7e92178a3cc..680f6ae79a9 100644 --- a/PCbuild/pythoncore.vcxproj.filters +++ b/PCbuild/pythoncore.vcxproj.filters @@ -378,6 +378,9 @@ Include\cpython + + Include\cpython + Include @@ -606,6 +609,12 @@ Include\internal + + Include\internal + + + Include\internal + Include\internal @@ -744,6 +753,9 @@ Include + + Include + @@ -1163,6 +1175,9 @@ Source Files + + Python + Python @@ -1175,6 +1190,9 @@ Python + + Python + Python diff --git a/Python/lock.c b/Python/lock.c new file mode 100644 index 00000000000..e6cd4aa0441 --- /dev/null +++ b/Python/lock.c @@ -0,0 +1,517 @@ +#include "Python.h" +#include "pycore_lock.h" +#include "pycore_object.h" +#include "pycore_pystate.h" +#include "condvar.h" +#include "parking_lot.h" + +#include + +#define TIME_TO_BE_FAIR_NS (1000*1000) + +struct mutex_entry { + _PyTime_t time_to_be_fair; + int handoff; +}; + +void +_PyMutex_lock_slow(_PyMutex *m) { + _PyMutex_LockSlowEx(m, _PY_LOCK_DETACH); +} + +void +_PyMutex_LockSlowEx(_PyMutex *m, int detach) +{ + _PyTime_t now = _PyTime_GetMonotonicClock(); + + struct mutex_entry entry; + entry.time_to_be_fair = now + TIME_TO_BE_FAIR_NS; + + for (;;) { + uint8_t v = _Py_atomic_load_uint8(&m->v); + + if (!(v & LOCKED)) { + if (_Py_atomic_compare_exchange_uint8(&m->v, v, v|LOCKED)) { + return; + } + continue; + } + + uint8_t newv = v; + if (!(v & HAS_PARKED)) { + newv = v | HAS_PARKED; + if (!_Py_atomic_compare_exchange_uint8(&m->v, v, newv)) { + continue; + } + } + + int ret = _PyParkingLot_ParkUint8(&m->v, newv, &entry, -1, detach); + if (ret == PY_PARK_OK) { + if (entry.handoff) { + // we own the lock now + assert(_Py_atomic_load_uint8_relaxed(&m->v) & LOCKED); + return; + } + } + } +} + +PyLockStatus +_PyMutex_TimedLockEx(_PyMutex *m, _PyTime_t timeout, _PyLockFlags flags) +{ + uint8_t v = _Py_atomic_load_uint8_relaxed(&m->v); + if ((v & LOCKED) == UNLOCKED) { + if (_Py_atomic_compare_exchange_uint8(&m->v, v, v|LOCKED)) { + return PY_LOCK_ACQUIRED; + } + } + else if (timeout == 0) { + return PY_LOCK_FAILURE; + } + + _PyTime_t now = _PyTime_GetMonotonicClock(); + _PyTime_t endtime = 0; + if (timeout > 0) { + endtime = _PyTime_Add(now, timeout); + } + + struct mutex_entry entry; + entry.time_to_be_fair = now + TIME_TO_BE_FAIR_NS; + + for (;;) { + uint8_t v = _Py_atomic_load_uint8(&m->v); + + if (!(v & LOCKED)) { + if (_Py_atomic_compare_exchange_uint8(&m->v, v, v|LOCKED)) { + return PY_LOCK_ACQUIRED; + } + continue; + } + + if (timeout == 0) { + return PY_LOCK_FAILURE; + } + + uint8_t newv = v; + if (!(v & HAS_PARKED)) { + newv = v | HAS_PARKED; + if (!_Py_atomic_compare_exchange_uint8(&m->v, v, newv)) { + continue; + } + } + + int detach = (flags & _PY_LOCK_DETACH) == _PY_LOCK_DETACH; + int ret = _PyParkingLot_ParkUint8(&m->v, newv, &entry, timeout, detach); + if (ret == PY_PARK_OK) { + if (entry.handoff) { + // we own the lock now + assert(_Py_atomic_load_uint8_relaxed(&m->v) & LOCKED); + return PY_LOCK_ACQUIRED; + } + } + else if (ret == PY_PARK_INTR && (flags & _PY_LOCK_MAKE_PENDING_CALLS)) { + if (Py_MakePendingCalls() < 0) { + return PY_LOCK_INTR; + } + } + else if (ret == PY_PARK_TIMEOUT) { + assert(timeout >= 0); + return PY_LOCK_FAILURE; + } + + if (timeout > 0) { + timeout = _PyDeadline_Get(endtime); + if (timeout <= 0) { + /* Check for negative values, since those mean block forever. */ + timeout = 0; + } + } + } +} + +int +_PyMutex_TryLockSlow(_PyMutex *m) +{ + for (;;) { + uint8_t v = _Py_atomic_load_uint8(&m->v); + + if (!(v & LOCKED)) { + if (_Py_atomic_compare_exchange_uint8(&m->v, v, v|LOCKED)) { + return 1; + } + continue; + } + + return 0; + } +} + +int +_PyMutex_TryUnlock(_PyMutex *m) +{ + for (;;) { + uint8_t v = _Py_atomic_load_uint8(&m->v); + + if ((v & LOCKED) == UNLOCKED) { + return -1; + } + else if ((v & HAS_PARKED)) { + int more_waiters; + struct wait_entry *wait; + struct mutex_entry *entry; + + entry = _PyParkingLot_BeginUnpark(&m->v, &wait, &more_waiters); + v = 0; + if (entry) { + int should_be_fair = _PyTime_GetMonotonicClock() > entry->time_to_be_fair; + entry->handoff = should_be_fair; + if (should_be_fair) { + v |= LOCKED; + } + if (more_waiters) { + v |= HAS_PARKED; + } + } + _Py_atomic_store_uint8(&m->v, v); + + _PyParkingLot_FinishUnpark(&m->v, wait); + return 0; + } + else if (_Py_atomic_compare_exchange_uint8(&m->v, v, UNLOCKED)) { + return 0; + } + } +} + +void +_PyMutex_unlock_slow(_PyMutex *m) +{ + if (_PyMutex_TryUnlock(m) < 0) { + Py_FatalError("unlocking mutex that is not locked"); + } +} + +struct raw_mutex_entry { + _PyWakeup *wakeup; + struct raw_mutex_entry *next; +}; + +void +_PyRawMutex_lock_slow(_PyRawMutex *m) +{ + struct raw_mutex_entry waiter; + waiter.wakeup = _PyWakeup_Acquire(); + + for (;;) { + uintptr_t v = _Py_atomic_load_uintptr(&m->v); + + if ((v & 1) == UNLOCKED) { + if (_Py_atomic_compare_exchange_uintptr(&m->v, v, v|LOCKED)) { + break; + } + continue; + } + + waiter.next = (struct raw_mutex_entry *)(v & ~1); + + if (!_Py_atomic_compare_exchange_uintptr(&m->v, v, ((uintptr_t)&waiter)|LOCKED)) { + continue; + } + + _PyWakeup_Wait(waiter.wakeup, -1, /*detach=*/0); + } + + _PyWakeup_Release(waiter.wakeup); +} + +void +_PyRawMutex_unlock_slow(_PyRawMutex *m) +{ + for (;;) { + uintptr_t v = _Py_atomic_load_uintptr(&m->v); + + if ((v & LOCKED) == UNLOCKED) { + Py_FatalError("unlocking mutex that is not locked"); + } + + struct raw_mutex_entry *waiter = (struct raw_mutex_entry *)(v & ~LOCKED); + if (waiter) { + uintptr_t next_waiter = (uintptr_t)waiter->next; + if (_Py_atomic_compare_exchange_uintptr(&m->v, v, next_waiter)) { + _PyWakeup_Wakeup(waiter->wakeup); + return; + } + } + else { + if (_Py_atomic_compare_exchange_uintptr(&m->v, v, UNLOCKED)) { + return; + } + } + } +} + +void +_PyRawEvent_Notify(_PyRawEvent *o) +{ + uintptr_t v = _Py_atomic_exchange_uintptr(&o->v, LOCKED); + if (v == UNLOCKED) { + return; + } + else if (v == LOCKED) { + Py_FatalError("_PyRawEvent: duplicate notifications"); + } + else { + _PyWakeup *waiter = (_PyWakeup *)v; + _PyWakeup_Wakeup(waiter); + } +} + +void +_PyRawEvent_Wait(_PyRawEvent *o) +{ + int64_t ns = -1; + _PyRawEvent_TimedWait(o, ns); +} + +static int +_PyRawEvent_TimedWaitEx(_PyRawEvent *o, int64_t ns, _PyWakeup *waiter) +{ + if (_Py_atomic_compare_exchange_uintptr(&o->v, UNLOCKED, (uintptr_t)waiter)) { + if (_PyWakeup_Wait(waiter, ns, /*detach=*/0) == PY_PARK_OK) { + assert(_Py_atomic_load_uintptr(&o->v) == LOCKED); + return 1; + } + + /* remove us as the waiter */ + if (_Py_atomic_compare_exchange_uintptr(&o->v, (uintptr_t)waiter, UNLOCKED)) { + return 0; + } + + uintptr_t v = _Py_atomic_load_uintptr(&o->v); + if (v == LOCKED) { + /* Grab the notification */ + for (;;) { + if (_PyWakeup_Wait(waiter, -1, /*detach=*/0) == PY_PARK_OK) { + return 1; + } + } + } + else { + Py_FatalError("_PyRawEvent: invalid state"); + } + } + + uintptr_t v = _Py_atomic_load_uintptr(&o->v); + if (v == LOCKED) { + return 1; + } + else { + Py_FatalError("_PyRawEvent: duplicate waiter"); + } +} + +int +_PyRawEvent_TimedWait(_PyRawEvent *o, int64_t ns) +{ + _PyWakeup *waiter = _PyWakeup_Acquire(); + int res = _PyRawEvent_TimedWaitEx(o, ns, waiter); + _PyWakeup_Release(waiter); + return res; +} + +void +_PyRawEvent_Reset(_PyRawEvent *o) +{ + _Py_atomic_store_uintptr(&o->v, UNLOCKED); +} + +void +_PyEvent_Notify(_PyEvent *o) +{ + uintptr_t v = _Py_atomic_exchange_uintptr(&o->v, LOCKED); + if (v == UNLOCKED) { + return; + } + else if (v == LOCKED) { + // Py_FatalError("_PyEvent: duplicate notifications"); + return; + } + else { + assert(v == HAS_PARKED); + _PyParkingLot_UnparkAll(&o->v); + } +} + +void +_PyEvent_Wait(_PyEvent *o) +{ + for (;;) { + if (_PyEvent_TimedWait(o, -1)) { + return; + } + } +} + +int +_PyEvent_TimedWait(_PyEvent *o, int64_t ns) +{ + uintptr_t v = _Py_atomic_load_uintptr(&o->v); + if (v == LOCKED) { + return 1; + } + if (v == UNLOCKED) { + _Py_atomic_compare_exchange_uintptr(&o->v, UNLOCKED, HAS_PARKED); + } + + _PyParkingLot_Park(&o->v, HAS_PARKED, NULL, ns); + + return _Py_atomic_load_uintptr(&o->v) == LOCKED; +} + +int +_PyBeginOnce_slow(_PyOnceFlag *o) +{ + for (;;) { + uintptr_t v = _Py_atomic_load_uintptr(&o->v); + if (v == UNLOCKED) { + if (!_Py_atomic_compare_exchange_uintptr(&o->v, UNLOCKED, LOCKED)) { + continue; + } + return 1; + } + if (v == ONCE_INITIALIZED) { + return 0; + } + + assert((v & LOCKED) != 0); + uintptr_t newv = LOCKED | HAS_PARKED; + if (!_Py_atomic_compare_exchange_uintptr(&o->v, v, newv)) { + continue; + } + + _PyParkingLot_Park(&o->v, newv, NULL, -1); + } +} + +void +_PyEndOnce(_PyOnceFlag *o) +{ + uintptr_t v = _Py_atomic_exchange_uintptr(&o->v, ONCE_INITIALIZED); + assert((v & LOCKED) != 0); + if ((v & HAS_PARKED) != 0) { + _PyParkingLot_UnparkAll(&o->v); + } +} + +void +_PyEndOnceFailed(_PyOnceFlag *o) +{ + uintptr_t v = _Py_atomic_exchange_uintptr(&o->v, UNLOCKED); + assert((v & LOCKED) != 0); + if ((v & HAS_PARKED) != 0) { + _PyParkingLot_UnparkAll(&o->v); + } +} + +struct rmutex_entry { + uintptr_t thread_id; + _PyTime_t time_to_be_fair; + int handoff; +}; + +void +_PyRecursiveMutex_lock_slow(_PyRecursiveMutex *m) +{ + uintptr_t v = _Py_atomic_load_uintptr_relaxed(&m->v); + if ((v & THREAD_ID_MASK) == _Py_ThreadId()) { + m->recursions++; + return; + } + + PyThreadState *finalizing = _Py_atomic_load_ptr_relaxed(&_PyRuntime._finalizing); + if (finalizing && finalizing == _PyThreadState_GET()) { + /* Act as-if we have ownership of the lock if the interpretr + * shutting down. At this point all other threads have exited. */ + m->recursions++; + return; + } + + + struct rmutex_entry entry; + entry.thread_id = _Py_ThreadId(); + entry.time_to_be_fair = _PyTime_GetMonotonicClock() + TIME_TO_BE_FAIR_NS; + + int loops = 0; + for (;;) { + v = _Py_atomic_load_uintptr(&m->v); + + assert((v & THREAD_ID_MASK) != _Py_ThreadId()); + + if ((v & 1) == UNLOCKED) { + uintptr_t newv = _Py_ThreadId() | (v & HAS_PARKED) | LOCKED; + if (_Py_atomic_compare_exchange_uintptr(&m->v, v, newv)) { + return; + } + loops++; + continue; + } + + uintptr_t newv = v; + if (!(v & HAS_PARKED)) { + newv = v | HAS_PARKED; + if (!_Py_atomic_compare_exchange_uintptr(&m->v, v, newv)) { + continue; + } + } + + int ret = _PyParkingLot_Park(&m->v, newv, &entry, -1); + if (ret == PY_PARK_OK && entry.handoff) { + assert((_Py_atomic_load_uintptr_relaxed(&m->v) & ~HAS_PARKED) == (_Py_ThreadId() | LOCKED)); + return; + } + } +} + +void +_PyRecursiveMutex_unlock_slow(_PyRecursiveMutex *m) +{ + if (m->recursions > 0) { + m->recursions--; + return; + } + + for (;;) { + uintptr_t v = _Py_atomic_load_uintptr(&m->v); + + if ((v & LOCKED) == UNLOCKED) { + Py_FatalError("unlocking mutex that is not locked"); + } + else if ((v & HAS_PARKED) == HAS_PARKED) { + int more_waiters; + struct wait_entry *wait; + struct rmutex_entry *entry; + + entry = _PyParkingLot_BeginUnpark(&m->v, &wait, &more_waiters); + v = 0; + if (wait) { + _PyTime_t now = _PyTime_GetMonotonicClock(); + int should_be_fair = now > entry->time_to_be_fair; + entry->handoff = should_be_fair; + if (should_be_fair) { + v |= entry->thread_id; + v |= LOCKED; + } + if (more_waiters) { + v |= HAS_PARKED; + } + } + _Py_atomic_store_uintptr(&m->v, v); + + _PyParkingLot_FinishUnpark(&m->v, wait); + return; + } + else if (_Py_atomic_compare_exchange_uintptr(&m->v, v, UNLOCKED)) { + return; + } + } +} diff --git a/Python/parking_lot.c b/Python/parking_lot.c new file mode 100644 index 00000000000..a58902684c9 --- /dev/null +++ b/Python/parking_lot.c @@ -0,0 +1,480 @@ +#include "Python.h" + +#include "parking_lot.h" + +#include "pycore_ceval.h" // _PyEval_TakeGIL, _PyEval_DropGIL +#include "pycore_llist.h" +#include "pycore_pystate.h" + +#define MAX_DEPTH 3 + +#ifdef _WIN32 +#define WIN32_LEAN_AND_MEAN +#include +#elif (defined(_POSIX_SEMAPHORES) && !defined(HAVE_BROKEN_POSIX_SEMAPHORES) && \ + defined(HAVE_SEM_TIMEDWAIT)) +#define USE_SEMAPHORES +#include +#else +#include +#endif + +typedef struct { + _PyRawMutex mutex; + + struct llist_node root; + size_t num_waiters; +} Bucket; + +struct wait_entry { + _PyWakeup *wakeup; + struct llist_node node; + uintptr_t key; + void *data; +}; + +struct _PyWakeup { +#if defined(_WIN32) + HANDLE sem; +#elif defined(USE_SEMAPHORES) + sem_t sem; +#else + PyMUTEX_T mutex; + PyCOND_T cond; + int counter; +#endif +}; + +typedef struct { + Py_ssize_t refcount; + uintptr_t thread_id; + + int depth; + _PyWakeup semas[MAX_DEPTH]; +} ThreadData; + +#define NUM_BUCKETS 251 + +static Bucket buckets[NUM_BUCKETS]; + +Py_DECL_THREAD ThreadData *thread_data; + +static void +_PyWakeup_Init(_PyWakeup *wakeup) +{ +#if defined(_WIN32) + wakeup->sem = CreateSemaphore( + NULL, // attributes + 0, // initial count + 10, // maximum count + NULL // unnamed + ); + if (!wakeup->sem) { + Py_FatalError("parking_lot: CreateSemaphore failed"); + } +#elif defined(USE_SEMAPHORES) + if (sem_init(&wakeup->sem, /*pshared=*/0, /*value=*/0) < 0) { + Py_FatalError("parking_lot: sem_init failed"); + } +#else + if (pthread_mutex_init(&wakeup->mutex, NULL) != 0) { + Py_FatalError("parking_lot: pthread_mutex_init failed"); + } + if (pthread_cond_init(&wakeup->cond, NULL)) { + Py_FatalError("parking_lot: pthread_cond_init failed"); + } +#endif +} + +static void +_PyWakeup_Destroy(_PyWakeup *wakeup) +{ +#if defined(_WIN32) + CloseHandle(wakeup->sem); +#elif defined(USE_SEMAPHORES) + sem_destroy(&wakeup->sem); +#else + pthread_mutex_destroy(&wakeup->mutex); + pthread_cond_destroy(&wakeup->cond); +#endif +} + + +static int +_PyWakeup_PlatformWait(_PyWakeup *wakeup, int64_t ns) +{ + int res = PY_PARK_INTR; +#if defined(_WIN32) + DWORD wait; + DWORD millis = 0; + if (ns < 0) { + millis = INFINITE; + } + else { + millis = (DWORD) (ns / 1000000); + } + wait = WaitForSingleObjectEx(wakeup->sem, millis, FALSE); + if (wait == WAIT_OBJECT_0) { + res = PY_PARK_OK; + } + else if (wait == WAIT_TIMEOUT) { + res = PY_PARK_TIMEOUT; + } +#elif defined(USE_SEMAPHORES) + int err; + if (ns >= 0) { + struct timespec ts; + + _PyTime_t deadline = _PyTime_GetSystemClock() + ns; + _PyTime_AsTimespec(deadline, &ts); + + err = sem_timedwait(&wakeup->sem, &ts); + } + else { + err = sem_wait(&wakeup->sem); + } + if (err == -1) { + err = errno; + if (err == EINTR) { + res = PY_PARK_INTR; + } + else if (err == ETIMEDOUT) { + res = PY_PARK_TIMEOUT; + } + else { + _Py_FatalErrorFormat(__func__, + "unexpected error from semaphore: %d", + err); + } + } + else { + res = PY_PARK_OK; + } +#else + pthread_mutex_lock(&wakeup->mutex); + if (wakeup->counter == 0) { + int err; + if (ns >= 0) { + struct timespec ts; + + _PyTime_t deadline = _PyTime_GetSystemClock() + ns; + _PyTime_AsTimespec(deadline, &ts); + + err = pthread_cond_timedwait(&wakeup->cond, &wakeup->mutex, &ts); + } + else { + err = pthread_cond_wait(&wakeup->cond, &wakeup->mutex); + } + if (err) { + res = PY_PARK_TIMEOUT; + } + } + if (wakeup->counter > 0) { + wakeup->counter--; + res = PY_PARK_OK; + } + pthread_mutex_unlock(&wakeup->mutex); +#endif + return res; +} + +int +_PyWakeup_Wait(_PyWakeup *wakeup, int64_t ns, int detach) +{ + PyThreadState *tstate = _PyThreadState_GET(); + int was_attached = 0; + if (tstate) { + was_attached = (_Py_atomic_load_int32(&tstate->status) == _Py_THREAD_ATTACHED); + if (was_attached && detach) { + PyEval_ReleaseThread(tstate); + } + else { + _PyEval_DropGIL(tstate); + } + } + + int res = _PyWakeup_PlatformWait(wakeup, ns); + + if (tstate) { + if (was_attached && detach) { + PyEval_AcquireThread(tstate); + } + else { + _PyEval_TakeGIL(tstate); + } + } + return res; +} + +_PyWakeup * +_PyWakeup_Acquire(void) +{ + // Make sure we have a valid thread_data. We need to acquire + // some locks before we have a fully initialized PyThreadState. + _PyParkingLot_InitThread(); + + ThreadData *this_thread = thread_data; + if (this_thread->depth >= MAX_DEPTH) { + Py_FatalError("_PyWakeup_Acquire(): too many calls"); + } + return &this_thread->semas[this_thread->depth++]; +} + +void +_PyWakeup_Release(_PyWakeup *wakeup) +{ + ThreadData *this_thread = thread_data; + this_thread->depth--; + if (&this_thread->semas[this_thread->depth] != wakeup) { + Py_FatalError("_PyWakeup_Release(): mismatch wakeup"); + } + _PyParkingLot_DeinitThread(); +} + +void +_PyWakeup_Wakeup(_PyWakeup *wakeup) +{ +#if defined(_WIN32) + if (!ReleaseSemaphore(wakeup->sem, 1, NULL)) { + Py_FatalError("parking_lot: ReleaseSemaphore failed"); + } +#elif defined(USE_SEMAPHORES) + int err = sem_post(&wakeup->sem); + if (err != 0) { + Py_FatalError("parking_lot: sem_post failed"); + } +#else + pthread_mutex_lock(&wakeup->mutex); + wakeup->counter++; + pthread_cond_signal(&wakeup->cond); + pthread_mutex_unlock(&wakeup->mutex); +#endif +} + + +void +_PyParkingLot_InitThread(void) +{ + if (thread_data != NULL) { + thread_data->refcount++; + return; + } + ThreadData *this_thread = PyMem_RawMalloc(sizeof(ThreadData)); + if (this_thread == NULL) { + Py_FatalError("_PyParkingLot_InitThread: unable to allocate thread data"); + } + memset(this_thread, 0, sizeof(*this_thread)); + this_thread->thread_id = _Py_ThreadId(); + this_thread->refcount = 1; + this_thread->depth = 0; + for (int i = 0; i < MAX_DEPTH; i++) { + _PyWakeup_Init(&this_thread->semas[i]); + } + thread_data = this_thread; +} + +void +_PyParkingLot_DeinitThread(void) +{ + ThreadData *td = thread_data; + if (td == NULL) { + return; + } + + if (--td->refcount != 0) { + assert(td->refcount > 0); + return; + } + + thread_data = NULL; + for (int i = 0; i < MAX_DEPTH; i++) { + _PyWakeup_Destroy(&td->semas[i]); + } + + PyMem_RawFree(td); +} + +static void +enqueue(Bucket *bucket, const void *key, struct wait_entry *wait) +{ + /* initialize bucket */ + if (!bucket->root.next) { + llist_init(&bucket->root); + } + + wait->key = (uintptr_t)key; + llist_insert_tail(&bucket->root, &wait->node); + ++bucket->num_waiters; +} + +static struct wait_entry * +dequeue(Bucket *bucket, const void *key) +{ + struct llist_node *root = &bucket->root; + struct llist_node *next = root->next; + for (;;) { + if (!next || next == root) { + return NULL; + } + struct wait_entry *wait = llist_data(next, struct wait_entry, node); + if (wait->key == (uintptr_t)key) { + llist_remove(next); + --bucket->num_waiters; + return wait; + } + next = next->next; + } +} + +typedef int (*_Py_validate_func)(const void *, const void *); + +static int +_PyParkingLot_ParkEx(const void *key, + _Py_validate_func validate, + const void *expected, + struct wait_entry *wait, + int64_t ns, + int detach) +{ + ThreadData *this_thread = thread_data; + assert(thread_data->depth >= 0 && thread_data->depth < MAX_DEPTH); + Bucket *bucket = &buckets[((uintptr_t)key) % NUM_BUCKETS]; + + _PyRawMutex_lock(&bucket->mutex); + if (!validate(key, expected)) { + _PyRawMutex_unlock(&bucket->mutex); + return PY_PARK_AGAIN; + } + wait->wakeup = _PyWakeup_Acquire(); + enqueue(bucket, key, wait); + _PyRawMutex_unlock(&bucket->mutex); + + int res = _PyWakeup_Wait(wait->wakeup, ns, detach); + if (res == PY_PARK_OK) { + this_thread->depth--; + return res; + } + + /* timeout or interrupt */ + _PyRawMutex_lock(&bucket->mutex); + if (wait->node.next == NULL) { + _PyRawMutex_unlock(&bucket->mutex); + /* We've been removed the waiter queue. Wait until we + * receive process the wakup signal. */ + do { + res = _PyWakeup_Wait(wait->wakeup, -1, detach); + } while (res != PY_PARK_OK); + _PyWakeup_Release(wait->wakeup); + return PY_PARK_OK; + } + else { + llist_remove(&wait->node); + --bucket->num_waiters; + } + _PyRawMutex_unlock(&bucket->mutex); + _PyWakeup_Release(wait->wakeup); + return res; +} + +static int +validate_int(const void *key, const void *expected_ptr) +{ + int expected = *(const int *)expected_ptr; + return _Py_atomic_load_int(key) == expected; +} + +int +_PyParkingLot_ParkInt(const int *key, int expected, int detach) +{ + struct wait_entry wait; + return _PyParkingLot_ParkEx( + key, &validate_int, &expected, &wait, -1, detach); +} + +static int +validate_ptr(const void *key, const void *expected_ptr) +{ + uintptr_t expected = *(const uintptr_t *)expected_ptr; + return _Py_atomic_load_uintptr(key) == expected; +} + +int +_PyParkingLot_Park(const void *key, uintptr_t expected, + void *data, int64_t ns) +{ + struct wait_entry wait; + wait.data = data; + return _PyParkingLot_ParkEx( + key, &validate_ptr, &expected, &wait, ns, /*detach=*/1); +} + +static int +validate_uint8(const void *key, const void *expected_ptr) +{ + uint8_t expected = *(const uint8_t *)expected_ptr; + return _Py_atomic_load_uint8(key) == expected; +} + +int +_PyParkingLot_ParkUint8(const uint8_t *key, uint8_t expected, + void *data, int64_t ns, int detach) +{ + struct wait_entry wait; + wait.data = data; + return _PyParkingLot_ParkEx( + key, &validate_uint8, &expected, &wait, ns, detach); +} + +void +_PyParkingLot_UnparkAll(const void *key) +{ + Bucket *bucket = &buckets[((uintptr_t)key) % NUM_BUCKETS]; + + for (;;) { + _PyRawMutex_lock(&bucket->mutex); + struct wait_entry *entry = dequeue(bucket, key); + _PyRawMutex_unlock(&bucket->mutex); + + if (!entry) { + return; + } + + _PyWakeup_Wakeup(entry->wakeup); + } +} + +void * +_PyParkingLot_BeginUnpark(const void *key, + struct wait_entry **out_waiter, + int *more_waiters) +{ + Bucket *bucket = &buckets[((uintptr_t)key) % NUM_BUCKETS]; + + _PyRawMutex_lock(&bucket->mutex); + + struct wait_entry *waiter = dequeue(bucket, key); + + *more_waiters = (bucket->num_waiters > 0); + *out_waiter = waiter; + return waiter ? waiter->data : NULL; +} + +void +_PyParkingLot_FinishUnpark(const void *key, struct wait_entry *entry) +{ + Bucket *bucket = &buckets[((uintptr_t)key) % NUM_BUCKETS]; + _PyRawMutex_unlock(&bucket->mutex); + + if (entry) { + _PyWakeup_Wakeup(entry->wakeup); + } +} + +void +_PyParkingLot_AfterFork(void) +{ + /* After a fork only one thread remains. That thread cannot be blocked + * so all entries in the parking lot are for dead threads. + */ + memset(buckets, 0, sizeof(buckets)); +} diff --git a/Python/pyrefcnt.c b/Python/pyrefcnt.c new file mode 100644 index 00000000000..3885fdad0eb --- /dev/null +++ b/Python/pyrefcnt.c @@ -0,0 +1,229 @@ +/* Implementation of biased reference counting */ + +#include "Python.h" +#include "pycore_llist.h" +#include "pycore_pystate.h" +#include "pycore_refcnt.h" + +// TODO: not efficient + +typedef struct { + _PyMutex mutex; + struct llist_node threads; +} Bucket; + +#define NUM_BUCKETS 251 + +static Bucket buckets[NUM_BUCKETS]; + +static inline struct brc_state * +brc_state(PyThreadState *tstate) +{ + return &((PyThreadStateImpl *)tstate)->brc; +} + +static PyThreadStateImpl * +find_thread_state(Bucket *bucket, uintptr_t thread_id) +{ + struct llist_node *node; + llist_for_each(node, &bucket->threads) { + PyThreadStateImpl *ts; + ts = llist_data(node, PyThreadStateImpl, brc.bucket_node); + if (ts->tstate.fast_thread_id == thread_id) { + return ts; + } + } + return NULL; +} + + +_PyObjectQueue * +_PyObjectQueue_New(void) +{ + _PyObjectQueue *q = PyMem_RawMalloc(sizeof(_PyObjectQueue)); + if (q == NULL) { + Py_FatalError("gc: failed to allocate object queue"); + } + q->prev = NULL; + q->n = 0; + return q; +} + +void +_PyObjectQueue_Merge(_PyObjectQueue **dst_ptr, _PyObjectQueue **src_ptr) +{ + _PyObjectQueue *dst = *dst_ptr; + _PyObjectQueue *src = *src_ptr; + if (src == NULL) { + return; + } + if (dst == NULL || (dst->n == 0 && dst->prev == NULL)) { + *dst_ptr = src; + *src_ptr = dst; + return; + } + + _PyObjectQueue *last = src; + while (last->prev != NULL) { + last = last->prev; + } + last->prev = dst; + *dst_ptr = src; + *src_ptr = NULL; +} + + +void +_Py_queue_object(PyObject *ob, uintptr_t tid) +{ + PyThreadStateImpl *tstate_impl; + Bucket *bucket = &buckets[tid % NUM_BUCKETS]; + + if (tid == 0) { + if (_PyObject_IS_IMMORTAL(ob)/* || _PyObject_IS_DEFERRED_RC(ob)*/) { + // kind of awkward, but strings can be immortalized after they have + // a bunch of references and the new interpreter still tries decrefing + // the immortalized object. + return; + } + Py_FatalError("_Py_queue_object called with unowned object"); + } + + _PyMutex_lock(&bucket->mutex); + tstate_impl = find_thread_state(bucket, tid); + if (!tstate_impl) { + // If we didn't find the owning thread then it must have already exited. + // It's safe (and necessary) to merge the refcount. Subtract one when + // merging because we've stolen a reference. + Py_ssize_t refcount = _Py_ExplicitMergeRefcount(ob, -1); + _PyMutex_unlock(&bucket->mutex); + if (refcount == 0) { + _Py_Dealloc(ob); + } + return; + } + + _PyObjectQueue_Push(&tstate_impl->brc.queue, ob); + + // Notify owning thread + _PyThreadState_Signal(&tstate_impl->tstate, EVAL_EXPLICIT_MERGE); + + _PyMutex_unlock(&bucket->mutex); +} + +static void +_Py_queue_merge_objects(struct brc_state *brc) +{ + // Process all objects to be merged in the local queue. + // Note that _Py_Dealloc call can reentrantly call into + // this function. + for (;;) { + PyObject *ob = _PyObjectQueue_Pop(&brc->local_queue); + if (ob == NULL) { + break; + } + + // Subtract one when merging refcount because the queue + // owned a reference. + Py_ssize_t refcount = _Py_ExplicitMergeRefcount(ob, -1); + if (refcount == 0) { + _Py_Dealloc(ob); + } + } +} + +void +_Py_queue_process(PyThreadState *tstate) +{ + uintptr_t tid = tstate->fast_thread_id; + struct brc_state *brc = brc_state(tstate); + Bucket *bucket = &buckets[tid % NUM_BUCKETS]; + + assert(brc->bucket_node.next != NULL); + + // Append all objects from "queue" into "local_queue" + _PyMutex_lock(&bucket->mutex); + _PyObjectQueue_Merge(&brc->local_queue, &brc->queue); + _PyMutex_unlock(&bucket->mutex); + + // Process "local_queue" until it's empty + _Py_queue_merge_objects(brc); +} + +void +_Py_queue_process_gc(PyThreadState *tstate, _PyObjectQueue **queue_ptr) +{ + struct brc_state *brc = brc_state(tstate); + + if (brc->bucket_node.next == NULL) { + // thread isn't finish initializing + return; + } + + _PyObjectQueue_Merge(&brc->local_queue, &brc->queue); + + for (;;) { + PyObject *ob = _PyObjectQueue_Pop(&brc->local_queue); + if (ob == NULL) { + break; + } + + // Subtract one when merging refcount because the queue + // owned a reference. + Py_ssize_t refcount = _Py_ExplicitMergeRefcount(ob, -1); + if (refcount == 0) { + if (!PyObject_GC_IsTracked(ob)) { + _PyObjectQueue_Push(queue_ptr, ob); + } + } + } +} + +void +_Py_queue_create(PyThreadState *tstate) +{ + uintptr_t tid = tstate->fast_thread_id; + struct brc_state *brc = brc_state(tstate); + Bucket *bucket = &buckets[tid % NUM_BUCKETS]; + + brc->queue = NULL; + brc->local_queue = NULL; + + _PyMutex_lock(&bucket->mutex); + if (bucket->threads.next == NULL) { + llist_init(&bucket->threads); + } + llist_insert_tail(&bucket->threads, &brc->bucket_node); + _PyMutex_unlock(&bucket->mutex); +} + +void +_Py_queue_destroy(PyThreadState *tstate) +{ + uintptr_t tid = tstate->fast_thread_id; + struct brc_state *brc = brc_state(tstate); + Bucket *bucket = &buckets[tid % NUM_BUCKETS]; + + _PyMutex_lock(&bucket->mutex); + if (brc->bucket_node.next) { + llist_remove(&brc->bucket_node); + _PyObjectQueue_Merge(&brc->local_queue, &brc->queue); + } + _PyMutex_unlock(&bucket->mutex); + + // Process "local_queue" until it's empty + _Py_queue_merge_objects(brc); +} + +void +_Py_queue_after_fork(void) +{ + // Unlock all bucket mutexes. Some of the buckets may be locked because + // locks can be handed off to a parked thread (see lock.c). We don't have + // to worry about consistency here, becuase no thread can be actively + // modifying a bucket, but it might be paused (not yet woken up) on a + // _PyMutex_lock while holding that lock. + for (int i = 0; i < NUM_BUCKETS; i++) { + memset(&buckets[i].mutex, 0, sizeof(buckets[i].mutex)); + } +} diff --git a/Python/pystate.c b/Python/pystate.c index 53d9d928bea..a5f383a5556 100644 --- a/Python/pystate.c +++ b/Python/pystate.c @@ -6,6 +6,7 @@ #include "pycore_code.h" // stats #include "pycore_frame.h" #include "pycore_initconfig.h" +#include "pycore_lock.h" // _PyRawEvent #include "pycore_object.h" // _PyType_InitCache() #include "pycore_pyerrors.h" #include "pycore_pylifecycle.h" @@ -15,6 +16,8 @@ #include "pycore_sysmodule.h" #include "pyatomic.h" +#include "parking_lot.h" + /* -------------------------------------------------------------------------- CAUTION @@ -937,6 +940,7 @@ void _PyThreadState_SetCurrent(PyThreadState *tstate) { tstate->fast_thread_id = _Py_ThreadId(); + _PyParkingLot_InitThread(); _PyGILState_NoteThreadState(&tstate->interp->runtime->gilstate, tstate); } @@ -1154,6 +1158,7 @@ tstate_delete_common(PyThreadState *tstate, if (is_current) { _PyThreadState_SET(NULL); + _PyParkingLot_DeinitThread(); } _PyStackChunk *chunk = tstate->datastack_chunk; tstate->datastack_chunk = NULL; diff --git a/Python/qsbr.c b/Python/qsbr.c new file mode 100644 index 00000000000..cf5f803d923 --- /dev/null +++ b/Python/qsbr.c @@ -0,0 +1,192 @@ +/* + * Implementation of safe memory reclamation scheme using + * quiescent states. + * + * This is dervied from the "GUS" safe memory reclamation technique + * in FreeBSD written by Jeffrey Roberson. The original copyright + * is preserved below. + * + * Copyright (c) 2019,2020 Jeffrey Roberson + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice unmodified, this list of conditions, and the following + * disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "Python.h" +#include "pycore_atomic.h" +#include "pycore_qsbr.h" +#include "pycore_pystate.h" +#include "pyatomic.h" + +#include + +#define QSBR_LT(a, b) ((int64_t)((a)-(b)) < 0) +#define QSBR_LEQ(a, b) ((int64_t)((a)-(b)) <= 0) + +enum { + QSBR_OFFLINE = 0, + QSBR_INITIAL = 1, + QSBR_INCR = 2, +}; + +static struct qsbr * +_Py_qsbr_alloc(struct qsbr_shared *shared) +{ + struct qsbr *qsbr; + + qsbr = (struct qsbr *)PyMem_RawMalloc(sizeof(struct qsbr_pad)); + if (qsbr == NULL) { + return NULL; + } + memset(qsbr, 0, sizeof(*qsbr)); + qsbr->t_shared = shared; + return qsbr; +} + +PyStatus +_Py_qsbr_init(struct qsbr_shared *shared) +{ + memset(shared, 0, sizeof(*shared)); + struct qsbr *head = _Py_qsbr_alloc(shared); + if (head == NULL) { + return _PyStatus_NO_MEMORY(); + } + shared->head = head; + shared->n_free = 1; + shared->s_wr = QSBR_INITIAL; + shared->s_rd_seq = QSBR_INITIAL; + return _PyStatus_OK(); +} + +void +_Py_qsbr_after_fork(struct qsbr_shared *shared, struct qsbr *this_qsbr) +{ + struct qsbr *qsbr = _Py_atomic_load_ptr(&shared->head); + while (qsbr != NULL) { + if (qsbr != this_qsbr) { + _Py_qsbr_unregister(qsbr); + } + qsbr = qsbr->t_next; + } +} + +uint64_t +_Py_qsbr_advance(struct qsbr_shared *shared) +{ + // TODO(sgross): handle potential wrap around + return _Py_atomic_add_uint64(&shared->s_wr, QSBR_INCR) + QSBR_INCR; +} + +uint64_t +_Py_qsbr_poll_scan(struct qsbr_shared *shared) +{ + uint64_t min_seq = _Py_atomic_load_uint64(&shared->s_wr); + struct qsbr *qsbr = _Py_atomic_load_ptr(&shared->head); + while (qsbr != NULL) { + uint64_t seq = _Py_atomic_load_uint64(&qsbr->t_seq); + if (seq != QSBR_OFFLINE && QSBR_LT(seq, min_seq)) { + min_seq = seq; + } + qsbr = qsbr->t_next; + } + + uint64_t rd_seq = _Py_atomic_load_uint64(&shared->s_rd_seq); + if (QSBR_LT(rd_seq, min_seq)) { + _Py_atomic_compare_exchange_uint64(&shared->s_rd_seq, rd_seq, min_seq); + rd_seq = min_seq; + } + return rd_seq; +} + +bool +_Py_qsbr_poll(struct qsbr *qsbr, uint64_t goal) +{ + uint64_t rd_seq = _Py_atomic_load_uint64(&qsbr->t_shared->s_rd_seq); + if (QSBR_LEQ(goal, rd_seq)) { + return true; + } + + rd_seq = _Py_qsbr_poll_scan(qsbr->t_shared); + return QSBR_LEQ(goal, rd_seq); +} + +void +_Py_qsbr_online(struct qsbr *qsbr) +{ + assert(qsbr->t_seq == 0 && "thread is already online"); + + uint64_t seq = _Py_qsbr_shared_current(qsbr->t_shared); + _Py_atomic_store_uint64_relaxed(&qsbr->t_seq, seq); + + /* ensure update to local counter is visible */ + _Py_atomic_fence_seq_cst(); +} + +void +_Py_qsbr_offline(struct qsbr *qsbr) +{ + assert(qsbr->t_seq != 0 && "thread is already offline"); + + /* maybe just release fence... investigate */ + _Py_atomic_fence_release(); + _Py_atomic_store_uint64_relaxed(&qsbr->t_seq, QSBR_OFFLINE); +} + +struct qsbr * +_Py_qsbr_recycle(struct qsbr_shared *shared, PyThreadState *tstate) +{ + if (_Py_atomic_load_uintptr(&shared->n_free) == 0) { + return NULL; + } + struct qsbr *qsbr = _Py_atomic_load_ptr(&shared->head); + while (qsbr != NULL) { + if (_Py_atomic_load_ptr_relaxed(&qsbr->tstate) == NULL && + _Py_atomic_compare_exchange_ptr(&qsbr->tstate, NULL, tstate)) { + + _Py_atomic_add_uintptr(&shared->n_free, -1); + return qsbr; + } + qsbr = qsbr->t_next; + } + return NULL; +} + +struct qsbr * +_Py_qsbr_register(struct qsbr_shared *shared, PyThreadState *tstate, struct qsbr *qsbr) +{ + qsbr->tstate = tstate; + qsbr->t_shared = shared; + struct qsbr *next; + do { + next = _Py_atomic_load_ptr(&shared->head); + qsbr->t_next = next; + } while (!_Py_atomic_compare_exchange_ptr(&shared->head, next, qsbr)); + return qsbr; +} + +void +_Py_qsbr_unregister(struct qsbr *qsbr) +{ + assert(qsbr->t_seq == 0 && "qsbr thread-state must be offline"); + + _Py_atomic_store_ptr_relaxed(&qsbr->tstate, NULL); + _Py_atomic_add_uintptr(&qsbr->t_shared->n_free, 1); +}