Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@
.azure-pipelines/ @AA-Turner

# GitHub & related scripts
.github/ @ezio-melotti @hugovk @AA-Turner
Tools/build/compute-changes.py @AA-Turner
.github/ @ezio-melotti @hugovk @AA-Turner @webknjaz
Tools/build/compute-changes.py @AA-Turner @hugovk @webknjaz
Tools/build/verify_ensurepip_wheels.py @AA-Turner @pfmoore @pradyunsg

# Pre-commit
Expand Down
18 changes: 14 additions & 4 deletions Doc/library/multiprocessing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1234,22 +1234,32 @@ Miscellaneous
.. versionchanged:: 3.11
Accepts a :term:`path-like object`.

.. function:: set_forkserver_preload(module_names)
.. function:: set_forkserver_preload(module_names, *, on_error='ignore')

Set a list of module names for the forkserver main process to attempt to
import so that their already imported state is inherited by forked
processes. Any :exc:`ImportError` when doing so is silently ignored.
This can be used as a performance enhancement to avoid repeated work
in every process.
processes. This can be used as a performance enhancement to avoid repeated
work in every process.

For this to work, it must be called before the forkserver process has been
launched (before creating a :class:`Pool` or starting a :class:`Process`).

The *on_error* parameter controls how :exc:`ImportError` exceptions during
module preloading are handled: ``"ignore"`` (default) silently ignores
failures, ``"warn"`` causes the forkserver subprocess to emit an
:exc:`ImportWarning` to stderr, and ``"fail"`` causes the forkserver
subprocess to exit with the exception traceback on stderr, making
subsequent process creation fail with :exc:`EOFError` or
:exc:`ConnectionError`.

Only meaningful when using the ``'forkserver'`` start method.
See :ref:`multiprocessing-start-methods`.

.. versionadded:: 3.4

.. versionchanged:: next
Added the *on_error* parameter.

.. function:: set_start_method(method, force=False)

Set the method which should be used to start child processes.
Expand Down
2 changes: 1 addition & 1 deletion Include/internal/pycore_opcode_metadata.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Include/internal/pycore_uop_metadata.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions Lib/multiprocessing/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,15 @@ def set_executable(self, executable):
from .spawn import set_executable
set_executable(executable)

def set_forkserver_preload(self, module_names):
def set_forkserver_preload(self, module_names, *, on_error='ignore'):
'''Set list of module names to try to load in forkserver process.
This is really just a hint.
The on_error parameter controls how import failures are handled:
"ignore" (default) silently ignores failures, "warn" emits warnings,
and "fail" raises exceptions breaking the forkserver context.
'''
from .forkserver import set_forkserver_preload
set_forkserver_preload(module_names)
set_forkserver_preload(module_names, on_error=on_error)

def get_context(self, method=None):
if method is None:
Expand Down
110 changes: 87 additions & 23 deletions Lib/multiprocessing/forkserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def __init__(self):
self._inherited_fds = None
self._lock = threading.Lock()
self._preload_modules = ['__main__']
self._preload_on_error = 'ignore'

def _stop(self):
# Method used by unit tests to stop the server
Expand All @@ -64,11 +65,22 @@ def _stop_unlocked(self):
self._forkserver_address = None
self._forkserver_authkey = None

def set_forkserver_preload(self, modules_names):
'''Set list of module names to try to load in forkserver process.'''
def set_forkserver_preload(self, modules_names, *, on_error='ignore'):
'''Set list of module names to try to load in forkserver process.
The on_error parameter controls how import failures are handled:
"ignore" (default) silently ignores failures, "warn" emits warnings,
and "fail" raises exceptions breaking the forkserver context.
'''
if not all(type(mod) is str for mod in modules_names):
raise TypeError('module_names must be a list of strings')
if on_error not in ('ignore', 'warn', 'fail'):
raise ValueError(
f"on_error must be 'ignore', 'warn', or 'fail', "
f"not {on_error!r}"
)
self._preload_modules = modules_names
self._preload_on_error = on_error

def get_inherited_fds(self):
'''Return list of fds inherited from parent process.
Expand Down Expand Up @@ -107,6 +119,14 @@ def connect_to_new_process(self, fds):
wrapped_client, self._forkserver_authkey)
connection.deliver_challenge(
wrapped_client, self._forkserver_authkey)
except (EOFError, ConnectionError, BrokenPipeError) as exc:
if (self._preload_modules and
self._preload_on_error == 'fail'):
exc.add_note(
"Forkserver process may have crashed during module "
"preloading. Check stderr."
)
raise
finally:
wrapped_client._detach()
del wrapped_client
Expand Down Expand Up @@ -154,6 +174,8 @@ def ensure_running(self):
main_kws['main_path'] = data['init_main_from_path']
if 'sys_argv' in data:
main_kws['sys_argv'] = data['sys_argv']
if self._preload_on_error != 'ignore':
main_kws['on_error'] = self._preload_on_error

with socket.socket(socket.AF_UNIX) as listener:
address = connection.arbitrary_address('AF_UNIX')
Expand Down Expand Up @@ -198,8 +220,69 @@ def ensure_running(self):
#
#

def _handle_import_error(on_error, modinfo, exc, *, warn_stacklevel):
"""Handle an import error according to the on_error policy."""
match on_error:
case 'fail':
raise
case 'warn':
warnings.warn(
f"Failed to preload {modinfo}: {exc}",
ImportWarning,
stacklevel=warn_stacklevel + 1
)
case 'ignore':
pass


def _handle_preload(preload, main_path=None, sys_path=None, sys_argv=None,
on_error='ignore'):
"""Handle module preloading with configurable error handling.
Args:
preload: List of module names to preload.
main_path: Path to __main__ module if '__main__' is in preload.
sys_path: sys.path to use for imports (None means use current).
sys_argv: sys.argv to use (None means use current).
on_error: How to handle import errors ("ignore", "warn", or "fail").
"""
if not preload:
return

if sys_argv is not None:
sys.argv[:] = sys_argv
if sys_path is not None:
sys.path[:] = sys_path

if '__main__' in preload and main_path is not None:
process.current_process()._inheriting = True
try:
spawn.import_main_path(main_path)
except Exception as e:
# Catch broad Exception because import_main_path() uses
# runpy.run_path() which executes the script and can raise
# any exception, not just ImportError
_handle_import_error(
on_error, f"__main__ from {main_path!r}", e, warn_stacklevel=2
)
finally:
del process.current_process()._inheriting

for modname in preload:
try:
__import__(modname)
except ImportError as e:
_handle_import_error(
on_error, f"module {modname!r}", e, warn_stacklevel=2
)

# gh-135335: flush stdout/stderr in case any of the preloaded modules
# wrote to them, otherwise children might inherit buffered data
util._flush_std_streams()


def main(listener_fd, alive_r, preload, main_path=None, sys_path=None,
*, sys_argv=None, authkey_r=None):
*, sys_argv=None, authkey_r=None, on_error='ignore'):
"""Run forkserver."""
if authkey_r is not None:
try:
Expand All @@ -210,26 +293,7 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None,
else:
authkey = b''

if preload:
if sys_argv is not None:
sys.argv[:] = sys_argv
if sys_path is not None:
sys.path[:] = sys_path
if '__main__' in preload and main_path is not None:
process.current_process()._inheriting = True
try:
spawn.import_main_path(main_path)
finally:
del process.current_process()._inheriting
for modname in preload:
try:
__import__(modname)
except ImportError:
pass

# gh-135335: flush stdout/stderr in case any of the preloaded modules
# wrote to them, otherwise children might inherit buffered data
util._flush_std_streams()
_handle_preload(preload, main_path, sys_path, sys_argv, on_error)

util._close_stdin()

Expand Down
3 changes: 3 additions & 0 deletions Lib/test/test_multiprocessing_forkserver/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,8 @@
if sys.platform == "win32":
raise unittest.SkipTest("forkserver is not available on Windows")

if not support.has_fork_support:
raise unittest.SkipTest("requires working os.fork()")

def load_tests(*args):
return support.load_package_tests(os.path.dirname(__file__), *args)
Loading
Loading