Skip to content

Commit c55a9ff

Browse files
Reinit IO buffer locks after fork to prevent deadlocks (#7339)
* Reinit IO buffer locks after fork to prevent deadlocks BufferedReader/Writer/TextIOWrapper use PyThreadMutex internally. If a parent thread held one of these locks during fork(), the child would deadlock on any IO operation. Add reinit_after_fork() to RawThreadMutex and call it on sys.stdin/ stdout/stderr in the child process fork handler, analogous to CPython's _PyIO_Reinit(). * Address review: unsafe fn + decoder lock reinit - Mark reinit_std_streams_after_fork as unsafe fn to encode fork-only precondition, update call site in posix.rs - Reinit IncrementalNewlineDecoder's PyThreadMutex via TextIOWrapper's decoder field to prevent child deadlocks * Auto-format: cargo fmt --all --------- Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 3504993 commit c55a9ff

File tree

4 files changed

+102
-0
lines changed

4 files changed

+102
-0
lines changed

crates/common/src/lock.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,3 +96,18 @@ pub unsafe fn reinit_rwlock_after_fork<T: ?Sized>(rwlock: &PyRwLock<T>) {
9696
core::ptr::write_bytes(raw, 0, core::mem::size_of::<RawRwLock>());
9797
}
9898
}
99+
100+
/// Reset a `PyThreadMutex` to its initial (unlocked, unowned) state after `fork()`.
101+
///
102+
/// `PyThreadMutex` is used by buffered IO objects (`BufferedReader`,
103+
/// `BufferedWriter`, `TextIOWrapper`). If a dead parent thread held one of
104+
/// these locks during `fork()`, the child would deadlock on any IO operation.
105+
///
106+
/// # Safety
107+
///
108+
/// Must only be called from the single-threaded child process immediately
109+
/// after `fork()`, before any other thread is created.
110+
#[cfg(unix)]
111+
pub unsafe fn reinit_thread_mutex_after_fork<T: ?Sized>(mutex: &PyThreadMutex<T>) {
112+
unsafe { mutex.raw().reinit_after_fork() }
113+
}

crates/common/src/lock/thread_mutex.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,23 @@ impl<R: RawMutex, G: GetThreadId> RawThreadMutex<R, G> {
7272
}
7373
}
7474

75+
impl<R: RawMutex, G: GetThreadId> RawThreadMutex<R, G> {
76+
/// Reset this mutex to its initial (unlocked, unowned) state after `fork()`.
77+
///
78+
/// # Safety
79+
///
80+
/// Must only be called from the single-threaded child process immediately
81+
/// after `fork()`, before any other thread is created.
82+
#[cfg(unix)]
83+
pub unsafe fn reinit_after_fork(&self) {
84+
self.owner.store(0, Ordering::Relaxed);
85+
unsafe {
86+
let mutex_ptr = &self.mutex as *const R as *mut u8;
87+
core::ptr::write_bytes(mutex_ptr, 0, core::mem::size_of::<R>());
88+
}
89+
}
90+
}
91+
7592
unsafe impl<R: RawMutex + Send, G: GetThreadId + Send> Send for RawThreadMutex<R, G> {}
7693
unsafe impl<R: RawMutex + Sync, G: GetThreadId + Sync> Sync for RawThreadMutex<R, G> {}
7794

@@ -103,6 +120,11 @@ impl<R: RawMutex, G: GetThreadId, T> From<T> for ThreadMutex<R, G, T> {
103120
}
104121
}
105122
impl<R: RawMutex, G: GetThreadId, T: ?Sized> ThreadMutex<R, G, T> {
123+
/// Access the underlying raw thread mutex.
124+
pub fn raw(&self) -> &RawThreadMutex<R, G> {
125+
&self.raw
126+
}
127+
106128
pub fn lock(&self) -> Option<ThreadMutexGuard<'_, R, G, T>> {
107129
if self.raw.lock() {
108130
Some(ThreadMutexGuard {

crates/vm/src/stdlib/io.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
* I/O core tools.
33
*/
44
pub(crate) use _io::module_def;
5+
#[cfg(all(unix, feature = "threading"))]
6+
pub(crate) use _io::reinit_std_streams_after_fork;
57

68
cfg_if::cfg_if! {
79
if #[cfg(any(not(target_arch = "wasm32"), target_os = "wasi"))] {
@@ -4985,6 +4987,61 @@ mod _io {
49854987
}
49864988
}
49874989

4990+
/// Reinit per-object IO buffer locks on std streams after `fork()`.
4991+
///
4992+
/// # Safety
4993+
///
4994+
/// Must only be called from the single-threaded child process immediately
4995+
/// after `fork()`, before any other thread is created.
4996+
#[cfg(all(unix, feature = "threading"))]
4997+
pub unsafe fn reinit_std_streams_after_fork(vm: &VirtualMachine) {
4998+
for name in ["stdin", "stdout", "stderr"] {
4999+
let Ok(stream) = vm.sys_module.get_attr(name, vm) else {
5000+
continue;
5001+
};
5002+
reinit_io_locks(&stream);
5003+
}
5004+
}
5005+
5006+
#[cfg(all(unix, feature = "threading"))]
5007+
fn reinit_io_locks(obj: &PyObject) {
5008+
use crate::common::lock::reinit_thread_mutex_after_fork;
5009+
5010+
if let Some(tio) = obj.downcast_ref::<TextIOWrapper>() {
5011+
unsafe { reinit_thread_mutex_after_fork(&tio.data) };
5012+
if let Some(guard) = tio.data.lock() {
5013+
if let Some(ref data) = *guard {
5014+
if let Some(ref decoder) = data.decoder {
5015+
reinit_io_locks(decoder);
5016+
}
5017+
reinit_io_locks(&data.buffer);
5018+
}
5019+
}
5020+
return;
5021+
}
5022+
if let Some(nl) = obj.downcast_ref::<IncrementalNewlineDecoder>() {
5023+
unsafe { reinit_thread_mutex_after_fork(&nl.data) };
5024+
return;
5025+
}
5026+
if let Some(br) = obj.downcast_ref::<BufferedReader>() {
5027+
unsafe { reinit_thread_mutex_after_fork(&br.data) };
5028+
return;
5029+
}
5030+
if let Some(bw) = obj.downcast_ref::<BufferedWriter>() {
5031+
unsafe { reinit_thread_mutex_after_fork(&bw.data) };
5032+
return;
5033+
}
5034+
if let Some(brw) = obj.downcast_ref::<BufferedRandom>() {
5035+
unsafe { reinit_thread_mutex_after_fork(&brw.data) };
5036+
return;
5037+
}
5038+
if let Some(brw) = obj.downcast_ref::<BufferedRWPair>() {
5039+
unsafe { reinit_thread_mutex_after_fork(&brw.read.data) };
5040+
unsafe { reinit_thread_mutex_after_fork(&brw.write.data) };
5041+
return;
5042+
}
5043+
}
5044+
49885045
pub fn io_open(
49895046
file: PyObjectRef,
49905047
mode: Option<&str>,

crates/vm/src/stdlib/posix.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -719,6 +719,14 @@ pub mod module {
719719
#[cfg(feature = "threading")]
720720
reinit_locks_after_fork(vm);
721721

722+
// Reinit per-object IO buffer locks on std streams.
723+
// BufferedReader/Writer/TextIOWrapper use PyThreadMutex which can be
724+
// held by dead parent threads, causing deadlocks on any IO in the child.
725+
#[cfg(feature = "threading")]
726+
unsafe {
727+
crate::stdlib::io::reinit_std_streams_after_fork(vm)
728+
};
729+
722730
// Phase 2: Reset low-level atomic state (no locks needed).
723731
crate::signal::clear_after_fork();
724732
crate::stdlib::signal::_signal::clear_wakeup_fd_after_fork();

0 commit comments

Comments
 (0)