Skip to content

Commit 6bf6010

Browse files
committed
Suspend Python threads before fork()
Add stop-the-world thread suspension around fork() to prevent deadlocks in the child caused by locks that were held by parent threads which no longer exist after fork(). - Thread states: DETACHED / ATTACHED / SUSPENDED with atomic CAS transitions matching _PyThreadState_{Attach,Detach,Suspend} - stop_the_world / start_the_world: park all non-requester threads before fork, resume after (parent) or reset (child) - allow_threads (Py_BEGIN/END_ALLOW_THREADS): detach around blocking syscalls (os.read/write, waitpid, Lock.acquire, time.sleep) so stop_the_world can force-park via CAS - Acquire/release import lock around fork lifecycle - zero_reinit_after_fork: generic lock reset for parking_lot types - gc_clear_raw: detach dict instead of clearing entries - Lock-free double-check for descriptor cache reads (no read-side seqlock); write-side seqlock retained for writer serialization - fork() returns PyResult, checks PythonFinalizationError, calls sys.audit
1 parent fc1c278 commit 6bf6010

File tree

17 files changed

+950
-93
lines changed

17 files changed

+950
-93
lines changed

.cspell.dict/cpython.txt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,8 @@ NEWLOCALS
127127
newsemlockobject
128128
nfrees
129129
nkwargs
130-
nlocalsplus
131130
nkwelts
131+
nlocalsplus
132132
Nondescriptor
133133
noninteger
134134
nops
@@ -160,6 +160,7 @@ pylifecycle
160160
pymain
161161
pyrepl
162162
PYTHONTRACEMALLOC
163+
PYTHONUTF8
163164
pythonw
164165
PYTHREAD_NAME
165166
releasebuffer
@@ -171,9 +172,11 @@ saveall
171172
scls
172173
setdict
173174
setfunc
175+
setprofileallthreads
174176
SETREF
175177
setresult
176178
setslice
179+
settraceallthreads
177180
SLOTDEFINED
178181
SMALLBUF
179182
SOABI
@@ -190,6 +193,7 @@ subparams
190193
subscr
191194
sval
192195
swappedbytes
196+
sysdict
193197
templatelib
194198
testconsole
195199
ticketer

.cspell.json

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -152,11 +152,6 @@
152152
"IFEXEC",
153153
// "stat"
154154
"FIRMLINK",
155-
// CPython internal names
156-
"PYTHONUTF",
157-
"sysdict",
158-
"settraceallthreads",
159-
"setprofileallthreads"
160155
],
161156
// flagWords - list of words to be always considered incorrect
162157
"flagWords": [

Lib/test/test_fork1.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020

2121
class ForkTest(ForkWait):
22-
@unittest.expectedFailure # TODO: RUSTPYTHON; AssertionError: process 44587 exited with code 1, but exit code 42 is expected
2322
def test_threaded_import_lock_fork(self):
2423
"""Check fork() in main thread works while a subthread is doing an import"""
2524
import_started = threading.Event()

Lib/test/test_os.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5574,7 +5574,6 @@ def test_fork_warns_when_non_python_thread_exists(self):
55745574
self.assertEqual(err.decode("utf-8"), "")
55755575
self.assertEqual(out.decode("utf-8"), "")
55765576

5577-
@unittest.expectedFailure # TODO: RUSTPYTHON; AssertionError: b"can't fork at interpreter shutdown" not found in b"Exception ignored in: <function AtFinalization.__del__ at 0xc508b30c0>\nAttributeError: 'NoneType' object has no attribute 'fork'\n"
55785577
def test_fork_at_finalization(self):
55795578
code = """if 1:
55805579
import atexit

crates/common/src/lock.rs

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -68,43 +68,45 @@ pub type PyMappedRwLockWriteGuard<'a, T> = MappedRwLockWriteGuard<'a, RawRwLock,
6868

6969
// can add fn const_{mutex,rw_lock}() if necessary, but we probably won't need to
7070

71-
/// Reset a `PyMutex` to its initial (unlocked) state after `fork()`.
71+
/// Reset a lock to its initial (unlocked) state by zeroing its bytes.
7272
///
73-
/// After `fork()`, locks held by dead parent threads would deadlock in the
74-
/// child. This writes `RawMutex::INIT` via the `Mutex::raw()` accessor,
75-
/// bypassing the normal unlock path which may interact with parking_lot's
76-
/// internal waiter queues.
73+
/// After `fork()`, any lock held by a now-dead thread would remain
74+
/// permanently locked. We zero the raw bytes (the unlocked state for all
75+
/// `parking_lot` raw lock types) instead of using the normal unlock path,
76+
/// which would interact with stale waiter queues.
7777
///
7878
/// # Safety
7979
///
8080
/// Must only be called from the single-threaded child process immediately
8181
/// after `fork()`, before any other thread is created.
82-
#[cfg(unix)]
83-
pub unsafe fn reinit_mutex_after_fork<T: ?Sized>(mutex: &PyMutex<T>) {
84-
// Use Mutex::raw() to access the underlying lock without layout assumptions.
85-
// parking_lot::RawMutex (AtomicU8) and RawCellMutex (Cell<bool>) both
86-
// represent the unlocked state as all-zero bytes.
82+
/// The type `T` must represent the unlocked state as all-zero bytes
83+
/// (true for `parking_lot::RawMutex`, `RawRwLock`, `RawReentrantMutex`, etc.).
84+
pub unsafe fn zero_reinit_after_fork<T>(lock: *const T) {
8785
unsafe {
88-
let raw = mutex.raw() as *const RawMutex as *mut u8;
89-
core::ptr::write_bytes(raw, 0, core::mem::size_of::<RawMutex>());
86+
core::ptr::write_bytes(lock as *mut u8, 0, core::mem::size_of::<T>());
9087
}
9188
}
9289

93-
/// Reset a `PyRwLock` to its initial (unlocked) state after `fork()`.
90+
/// Reset a `PyMutex` after `fork()`. See [`zero_reinit_after_fork`].
91+
///
92+
/// # Safety
9493
///
95-
/// Same rationale as [`reinit_mutex_after_fork`] — dead threads' read or
96-
/// write locks would cause permanent deadlock in the child.
94+
/// Must only be called from the single-threaded child process immediately
95+
/// after `fork()`, before any other thread is created.
96+
#[cfg(unix)]
97+
pub unsafe fn reinit_mutex_after_fork<T: ?Sized>(mutex: &PyMutex<T>) {
98+
unsafe { zero_reinit_after_fork(mutex.raw()) }
99+
}
100+
101+
/// Reset a `PyRwLock` after `fork()`. See [`zero_reinit_after_fork`].
97102
///
98103
/// # Safety
99104
///
100105
/// Must only be called from the single-threaded child process immediately
101106
/// after `fork()`, before any other thread is created.
102107
#[cfg(unix)]
103108
pub unsafe fn reinit_rwlock_after_fork<T: ?Sized>(rwlock: &PyRwLock<T>) {
104-
unsafe {
105-
let raw = rwlock.raw() as *const RawRwLock as *mut u8;
106-
core::ptr::write_bytes(raw, 0, core::mem::size_of::<RawRwLock>());
107-
}
109+
unsafe { zero_reinit_after_fork(rwlock.raw()) }
108110
}
109111

110112
/// Reset a `PyThreadMutex` to its initial (unlocked, unowned) state after `fork()`.

crates/compiler-core/src/bytecode.rs

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use core::{
1212
cell::UnsafeCell,
1313
hash, mem,
1414
ops::Deref,
15-
sync::atomic::{AtomicU8, AtomicU16, AtomicUsize, Ordering},
15+
sync::atomic::{AtomicU8, AtomicU16, AtomicU32, AtomicUsize, Ordering},
1616
};
1717
use itertools::Itertools;
1818
use malachite_bigint::BigInt;
@@ -415,6 +415,9 @@ pub struct CodeUnits {
415415
/// Single atomic load/store prevents torn reads when multiple threads
416416
/// specialize the same instruction concurrently.
417417
pointer_cache: Box<[AtomicUsize]>,
418+
/// SeqLock counter per instruction cache base for descriptor payload writes.
419+
/// odd = write in progress, even = quiescent.
420+
descriptor_sequences: Box<[AtomicU32]>,
418421
}
419422

420423
// SAFETY: All cache operations use atomic read/write instructions.
@@ -441,10 +444,16 @@ impl Clone for CodeUnits {
441444
.iter()
442445
.map(|c| AtomicUsize::new(c.load(Ordering::Relaxed)))
443446
.collect();
447+
let descriptor_sequences = self
448+
.descriptor_sequences
449+
.iter()
450+
.map(|c| AtomicU32::new(c.load(Ordering::Relaxed)))
451+
.collect();
444452
Self {
445453
units: UnsafeCell::new(units),
446454
adaptive_counters,
447455
pointer_cache,
456+
descriptor_sequences,
448457
}
449458
}
450459
}
@@ -491,10 +500,15 @@ impl From<Vec<CodeUnit>> for CodeUnits {
491500
.map(|_| AtomicUsize::new(0))
492501
.collect::<Vec<_>>()
493502
.into_boxed_slice();
503+
let descriptor_sequences = (0..len)
504+
.map(|_| AtomicU32::new(0))
505+
.collect::<Vec<_>>()
506+
.into_boxed_slice();
494507
Self {
495508
units: UnsafeCell::new(units),
496509
adaptive_counters,
497510
pointer_cache,
511+
descriptor_sequences,
498512
}
499513
}
500514
}
@@ -641,6 +655,38 @@ impl CodeUnits {
641655
self.pointer_cache[index].load(Ordering::Relaxed)
642656
}
643657

658+
#[inline]
659+
pub fn begin_descriptor_write(&self, index: usize) {
660+
let sequence = &self.descriptor_sequences[index];
661+
let mut seq = sequence.load(Ordering::Acquire);
662+
loop {
663+
while (seq & 1) != 0 {
664+
core::hint::spin_loop();
665+
seq = sequence.load(Ordering::Acquire);
666+
}
667+
match sequence.compare_exchange_weak(
668+
seq,
669+
seq.wrapping_add(1),
670+
Ordering::AcqRel,
671+
Ordering::Acquire,
672+
) {
673+
Ok(_) => {
674+
core::sync::atomic::fence(Ordering::Release);
675+
break;
676+
}
677+
Err(observed) => {
678+
core::hint::spin_loop();
679+
seq = observed;
680+
}
681+
}
682+
}
683+
}
684+
685+
#[inline]
686+
pub fn end_descriptor_write(&self, index: usize) {
687+
self.descriptor_sequences[index].fetch_add(1, Ordering::Release);
688+
}
689+
644690
/// Read adaptive counter bits for instruction at `index`.
645691
/// Uses Relaxed atomic load.
646692
pub fn read_adaptive_counter(&self, index: usize) -> u16 {

crates/stdlib/src/socket.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1105,7 +1105,9 @@ mod _socket {
11051105
loop {
11061106
if deadline.is_some() || matches!(select, SelectKind::Connect) {
11071107
let interval = deadline.as_ref().map(|d| d.time_until()).transpose()?;
1108-
let res = sock_select(&*self.sock()?, select, interval);
1108+
let sock = self.sock()?;
1109+
let res =
1110+
vm.allow_threads(|| sock_select(&*sock, select, interval));
11091111
match res {
11101112
Ok(true) => return Err(IoOrPyException::Timeout),
11111113
Err(e) if e.kind() == io::ErrorKind::Interrupted => {
@@ -1118,8 +1120,9 @@ mod _socket {
11181120
}
11191121

11201122
let err = loop {
1121-
// loop on interrupt
1122-
match f() {
1123+
// Detach thread state around the blocking syscall so
1124+
// stop-the-world can park this thread (e.g. before fork).
1125+
match vm.allow_threads(&mut f) {
11231126
Ok(x) => return Ok(x),
11241127
Err(e) if e.kind() == io::ErrorKind::Interrupted => vm.check_signals()?,
11251128
Err(e) => break e,
@@ -1342,7 +1345,8 @@ mod _socket {
13421345
) -> Result<(), IoOrPyException> {
13431346
let sock_addr = self.extract_address(address, caller, vm)?;
13441347

1345-
let err = match self.sock()?.connect(&sock_addr) {
1348+
let sock = self.sock()?;
1349+
let err = match vm.allow_threads(|| sock.connect(&sock_addr)) {
13461350
Ok(()) => return Ok(()),
13471351
Err(e) => e,
13481352
};

crates/vm/src/frame.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7016,6 +7016,14 @@ impl ExecutingFrame<'_> {
70167016
Ok(None)
70177017
}
70187018

7019+
/// Read a cached descriptor pointer and validate it against the expected
7020+
/// type version, using a lock-free double-check pattern:
7021+
/// 1. read pointer → incref (try_to_owned)
7022+
/// 2. re-read version + pointer and confirm they still match
7023+
///
7024+
/// This matches the read-side pattern used in LOAD_ATTR_METHOD_WITH_VALUES
7025+
/// and friends: no read-side lock, relying on the write side to invalidate
7026+
/// the version tag before swapping the pointer.
70197027
#[inline]
70207028
fn try_read_cached_descriptor(
70217029
&self,
@@ -7026,7 +7034,12 @@ impl ExecutingFrame<'_> {
70267034
if descr_ptr == 0 {
70277035
return None;
70287036
}
7037+
// SAFETY: `descr_ptr` was a valid `*mut PyObject` when the writer
7038+
// stored it, and the writer keeps a strong reference alive in
7039+
// `InlineCacheEntry`. `try_to_owned_from_ptr` performs a
7040+
// conditional incref that fails if the object is already freed.
70297041
let cloned = unsafe { PyObject::try_to_owned_from_ptr(descr_ptr as *mut PyObject) }?;
7042+
// Double-check: version tag still matches AND pointer unchanged.
70307043
if self.code.instructions.read_cache_u32(cache_base + 1) == expected_type_version
70317044
&& self.code.instructions.read_cache_ptr(cache_base + 5) == descr_ptr
70327045
{
@@ -7046,6 +7059,7 @@ impl ExecutingFrame<'_> {
70467059
) {
70477060
// Publish descriptor cache atomically as a tuple:
70487061
// invalidate version first, then write payload, then publish version.
7062+
self.code.instructions.begin_descriptor_write(cache_base);
70497063
unsafe {
70507064
self.code.instructions.write_cache_u32(cache_base + 1, 0);
70517065
self.code
@@ -7055,6 +7069,7 @@ impl ExecutingFrame<'_> {
70557069
.instructions
70567070
.write_cache_u32(cache_base + 1, type_version);
70577071
}
7072+
self.code.instructions.end_descriptor_write(cache_base);
70587073
}
70597074

70607075
#[inline]
@@ -7066,6 +7081,7 @@ impl ExecutingFrame<'_> {
70667081
descr_ptr: usize,
70677082
) {
70687083
// Same publish protocol as write_cached_descriptor(), plus metaclass guard.
7084+
self.code.instructions.begin_descriptor_write(cache_base);
70697085
unsafe {
70707086
self.code.instructions.write_cache_u32(cache_base + 1, 0);
70717087
self.code
@@ -7078,6 +7094,7 @@ impl ExecutingFrame<'_> {
70787094
.instructions
70797095
.write_cache_u32(cache_base + 1, type_version);
70807096
}
7097+
self.code.instructions.end_descriptor_write(cache_base);
70817098
}
70827099

70837100
fn load_attr(&mut self, vm: &VirtualMachine, oparg: LoadAttr) -> FrameResult {

crates/vm/src/object/core.rs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use super::{
1717
};
1818
use crate::object::traverse_object::PyObjVTable;
1919
use crate::{
20-
builtins::{PyDict, PyDictRef, PyType, PyTypeRef},
20+
builtins::{PyDictRef, PyType, PyTypeRef},
2121
common::{
2222
atomic::{Ordering, PyAtomic, Radium},
2323
linked_list::{Link, Pointers},
@@ -916,6 +916,12 @@ impl InstanceDict {
916916
pub fn replace(&self, d: PyDictRef) -> PyDictRef {
917917
core::mem::replace(&mut self.d.write(), d)
918918
}
919+
920+
/// Consume the InstanceDict and return the inner PyDictRef.
921+
#[inline]
922+
pub fn into_inner(self) -> PyDictRef {
923+
self.d.into_inner()
924+
}
919925
}
920926

921927
impl<T: PyPayload> PyInner<T> {
@@ -1668,11 +1674,19 @@ impl PyObject {
16681674
}
16691675

16701676
// 2. Clear dict and member slots (subtype_clear)
1671-
if let Some(ext) = obj.0.ext_ref() {
1672-
if let Some(dict) = ext.dict.as_ref() {
1673-
let dict_ref = dict.get();
1674-
// Clear dict entries to break cycles, then collect the dict itself
1675-
PyDict::clear(&dict_ref);
1677+
// Use mutable access to actually detach the dict, matching CPython's
1678+
// Py_CLEAR(*_PyObject_GetDictPtr(self)) which NULLs the dict pointer
1679+
// without clearing dict contents. This is critical because the dict
1680+
// may still be referenced by other live objects (e.g. function.__globals__).
1681+
if obj.0.has_ext() {
1682+
let self_addr = (ptr as *const u8).addr();
1683+
let ext_ptr = core::ptr::with_exposed_provenance_mut::<ObjExt>(
1684+
self_addr.wrapping_sub(EXT_OFFSET),
1685+
);
1686+
let ext = unsafe { &mut *ext_ptr };
1687+
if let Some(old_dict) = ext.dict.take() {
1688+
// Get the dict ref before dropping InstanceDict
1689+
let dict_ref = old_dict.into_inner();
16761690
result.push(dict_ref.into());
16771691
}
16781692
for slot in ext.slots.iter() {

0 commit comments

Comments
 (0)