Skip to content

Commit 65c4899

Browse files
committed
acquire-release
1 parent 9dd1405 commit 65c4899

File tree

4 files changed

+53
-43
lines changed

4 files changed

+53
-43
lines changed

ddtrace/internal/_threads.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -515,7 +515,8 @@ static PyTypeObject PeriodicThreadType = {
515515

516516
// ----------------------------------------------------------------------------
517517
static PyMethodDef _threads_methods[] = {
518-
{ "reset_locks", (PyCFunction)lock_reset_locks, METH_NOARGS, "Reset all locks (generally after a fork)" },
518+
{ "acquire_all", (PyCFunction)lock_acquire_all, METH_NOARGS, "Acquire all locks (generally before a fork)" },
519+
{ "release_all", (PyCFunction)lock_release_all, METH_NOARGS, "Release all locks (generally after a fork)" },
519520
{ NULL, NULL, 0, NULL } /* Sentinel */
520521
};
521522

ddtrace/internal/_threads.pyi

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ class _BaseLock:
1111
class Lock(_BaseLock): ...
1212
class RLock(_BaseLock): ...
1313

14-
def reset_locks() -> None: ...
14+
def acquire_all() -> None: ...
15+
def release_all() -> None: ...
1516

1617
class PeriodicThread:
1718
name: str

ddtrace/internal/_threads/lock.hpp

Lines changed: 42 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
#include <mutex>
88
#include <set>
99

10-
std::unique_ptr<std::mutex> _lock_set_mutex = std::make_unique<std::mutex>();
10+
std::mutex _lock_set_mutex;
1111

1212
// ----------------------------------------------------------------------------
1313
// Lock class
@@ -23,19 +23,18 @@ typedef struct lock
2323
std::unique_ptr<std::timed_mutex> _mutex = nullptr;
2424
} Lock;
2525

26-
std::set<Lock*> lock_set; // Global set of locks for reset after fork
26+
std::set<Lock*> lock_set;
2727

2828
// ----------------------------------------------------------------------------
2929
static int
30-
Lock_init(Lock* self, PyObject* args, PyObject* kwargs)
30+
Lock_init(Lock* self, PyObject* Py_UNUSED(args), PyObject* Py_UNUSED(kwargs))
3131
{
3232
self->_mutex = std::make_unique<std::timed_mutex>();
3333

34-
// Register the lock for reset after fork
3534
{
3635
AllowThreads _;
3736

38-
std::lock_guard<std::mutex> guard(*_lock_set_mutex);
37+
std::lock_guard<std::mutex> guard(_lock_set_mutex);
3938

4039
lock_set.insert(self);
4140
}
@@ -51,7 +50,7 @@ Lock_dealloc(Lock* self)
5150
{
5251
AllowThreads _;
5352

54-
std::lock_guard<std::mutex> guard(*_lock_set_mutex);
53+
std::lock_guard<std::mutex> guard(_lock_set_mutex);
5554

5655
lock_set.erase(self);
5756
}
@@ -109,7 +108,7 @@ Lock_release(Lock* self)
109108
}
110109

111110
self->_mutex->unlock();
112-
self->_locked = 0; // Reset the lock state
111+
self->_locked = 0;
113112

114113
Py_RETURN_NONE;
115114
}
@@ -127,7 +126,7 @@ Lock_locked(Lock* self)
127126

128127
// ----------------------------------------------------------------------------
129128
static PyObject*
130-
Lock_enter(Lock* self, PyObject* args, PyObject* kwargs)
129+
Lock_enter(Lock* self)
131130
{
132131
AllowThreads _;
133132

@@ -140,7 +139,7 @@ Lock_enter(Lock* self, PyObject* args, PyObject* kwargs)
140139

141140
// ----------------------------------------------------------------------------
142141
static PyObject*
143-
Lock_exit(Lock* self, PyObject* args, PyObject* kwargs)
142+
Lock_exit(Lock* self, PyObject* Py_UNUSED(args), PyObject* Py_UNUSED(kwargs))
144143
{
145144
// This method is called when the lock is used in a "with" statement
146145
if (Lock_release(self) == NULL) {
@@ -150,13 +149,6 @@ Lock_exit(Lock* self, PyObject* args, PyObject* kwargs)
150149
Py_RETURN_FALSE;
151150
}
152151

153-
static inline void
154-
Lock_reset(Lock* self)
155-
{
156-
self->_mutex = std::make_unique<std::timed_mutex>();
157-
self->_locked = 0;
158-
}
159-
160152
// ----------------------------------------------------------------------------
161153
static PyMethodDef Lock_methods[] = {
162154
{ "acquire", (PyCFunction)Lock_acquire, METH_VARARGS | METH_KEYWORDS, "Acquire the lock with an optional timeout" },
@@ -200,19 +192,18 @@ typedef struct rlock
200192
std::unique_ptr<std::recursive_timed_mutex> _mutex = nullptr;
201193
} RLock;
202194

203-
std::set<RLock*> rlock_set; // Global set of re-entrant locks for reset after fork
195+
std::set<RLock*> rlock_set;
204196

205197
// ----------------------------------------------------------------------------
206198
static int
207-
RLock_init(RLock* self, PyObject* args, PyObject* kwargs)
199+
RLock_init(RLock* self, PyObject* Py_UNUSED(args), PyObject* Py_UNUSED(kwargs))
208200
{
209201
self->_mutex = std::make_unique<std::recursive_timed_mutex>();
210202

211-
// Register the re-entrant lock for reset after fork
212203
{
213204
AllowThreads _;
214205

215-
std::lock_guard<std::mutex> guard(*_lock_set_mutex);
206+
std::lock_guard<std::mutex> guard(_lock_set_mutex);
216207

217208
rlock_set.insert(self);
218209
}
@@ -227,7 +218,7 @@ RLock_dealloc(RLock* self)
227218
{
228219
AllowThreads _;
229220

230-
std::lock_guard<std::mutex> guard(*_lock_set_mutex);
221+
std::lock_guard<std::mutex> guard(_lock_set_mutex);
231222

232223
rlock_set.erase(self);
233224
}
@@ -303,7 +294,7 @@ RLock_locked(RLock* self)
303294

304295
// ----------------------------------------------------------------------------
305296
static PyObject*
306-
RLock_enter(RLock* self, PyObject* args, PyObject* kwargs)
297+
RLock_enter(RLock* self)
307298
{
308299
AllowThreads _;
309300

@@ -316,7 +307,7 @@ RLock_enter(RLock* self, PyObject* args, PyObject* kwargs)
316307

317308
// ----------------------------------------------------------------------------
318309
static PyObject*
319-
RLock_exit(RLock* self, PyObject* args, PyObject* kwargs)
310+
RLock_exit(RLock* self, PyObject* Py_UNUSED(args), PyObject* Py_UNUSED(kwargs))
320311
{
321312
// This method is called when the lock is used in a "with" statement
322313
if (RLock_release(self) == NULL) {
@@ -326,13 +317,6 @@ RLock_exit(RLock* self, PyObject* args, PyObject* kwargs)
326317
Py_RETURN_FALSE;
327318
}
328319

329-
static inline void
330-
RLock_reset(RLock* self)
331-
{
332-
self->_mutex = std::make_unique<std::recursive_timed_mutex>();
333-
self->_locked = 0;
334-
}
335-
336320
// ----------------------------------------------------------------------------
337321
static PyMethodDef RLock_methods[] = {
338322
{ "acquire",
@@ -367,21 +351,41 @@ static PyTypeObject RLockType = {
367351

368352
// ----------------------------------------------------------------------------
369353
static PyObject*
370-
lock_reset_locks(PyObject* Py_UNUSED(self), PyObject* Py_UNUSED(args))
354+
lock_acquire_all(PyObject* Py_UNUSED(self), PyObject* Py_UNUSED(args))
371355
{
372-
// Reset all locks that have been registered for reset after a fork. This
373-
// MUST be called in a single-thread scenario only, e.g. soon after the
374-
// fork.
356+
{
357+
AllowThreads _;
358+
359+
_lock_set_mutex.lock();
360+
361+
for (Lock* lock : lock_set) {
362+
self->_mutex->lock();
363+
self->_locked = 1;
364+
}
365+
366+
for (RLock* rlock : rlock_set) {
367+
rlock->_mutex->lock();
368+
rlock->_locked++;
369+
}
370+
}
371+
372+
Py_RETURN_NONE;
373+
}
374+
375+
// ----------------------------------------------------------------------------
376+
static PyObject*
377+
lock_release_all(PyObject* Py_UNUSED(self), PyObject* Py_UNUSED(args))
378+
{
379+
375380
for (Lock* lock : lock_set) {
376-
Lock_reset(lock);
381+
Lock_exit(lock, NULL, NULL);
377382
}
378383

379384
for (RLock* rlock : rlock_set) {
380-
RLock_reset(rlock);
385+
RLock_exit(rlock, NULL, NULL);
381386
}
382387

383-
// Reset the lock set mutex too!
384-
_lock_set_mutex = std::make_unique<std::mutex>();
388+
_lock_set_mutex.unlock();
385389

386390
Py_RETURN_NONE;
387391
}

ddtrace/internal/threads.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@
44
from ddtrace.internal._threads import Lock
55
from ddtrace.internal._threads import PeriodicThread
66
from ddtrace.internal._threads import RLock
7+
from ddtrace.internal._threads import acquire_all
78
from ddtrace.internal._threads import periodic_threads
8-
from ddtrace.internal._threads import reset_locks
9+
from ddtrace.internal._threads import release_all
910

1011

1112
__all__ = [
1213
"Lock",
1314
"PeriodicThread",
14-
"periodic_threads",
1515
"RLock",
1616
]
1717

@@ -36,4 +36,8 @@ def _() -> None:
3636
periodic_threads.clear()
3737

3838

39-
forksafe.register(reset_locks)
39+
# Acquire all locks before a fork in the thread that is forking. We then
40+
# release them after the fork in both the parent and child processes.
41+
forksafe.register_before_fork(acquire_all)
42+
forksafe.register(release_all)
43+
forksafe.register_after_parent(release_all)

0 commit comments

Comments
 (0)