"""Tests for running synchronous callables in worker processes via ``anyio.to_process``."""

import os
import platform
import sys
import time
from functools import partial

import pytest

from anyio import CancelScope, create_task_group, fail_after, to_process, wait_all_tasks_blocked

# Run every coroutine test in this module through the anyio pytest plugin.
pytestmark = pytest.mark.anyio


@pytest.fixture(autouse=True)
def check_compatibility(anyio_backend_name: str) -> None:
    """
    Skip the test when the current backend/platform combination cannot spawn subprocesses.

    :param anyio_backend_name: name of the backend the test is being run against

    """
    if (
        anyio_backend_name == 'asyncio'
        and platform.system() == 'Windows'
        and sys.version_info < (3, 8)
    ):
        pytest.skip('Python < 3.8 uses SelectorEventLoop by default and it does not support '
                    'subprocesses')


async def test_run_sync_in_process_pool() -> None:
    """
    Test that the function runs in a different process, and the same process in both calls.

    """
    worker_pid = await to_process.run_sync(os.getpid)
    assert worker_pid != os.getpid()
    assert await to_process.run_sync(os.getpid) == worker_pid


async def test_identical_sys_path() -> None:
    """Test that the worker process sees the same sys.path as the parent process."""
    # The expression is evaluated inside the worker, so it reports the worker's own sys.path.
    assert await to_process.run_sync(eval, 'sys.path') == sys.path


async def test_partial() -> None:
    """Test that partial() can be used to pass keyword arguments."""
    assert await to_process.run_sync(partial(sorted, reverse=True), ['a', 'b']) == ['b', 'a']


async def test_exception() -> None:
    """Test that exceptions are delivered properly."""
    with pytest.raises(ValueError, match='invalid literal for int'):
        # No assert here: the call is expected to raise before producing a value.
        await to_process.run_sync(int, 'a')


async def test_print() -> None:
    """Test that print() won't interfere with parent-worker communication."""
    worker_pid = await to_process.run_sync(os.getpid)
    await to_process.run_sync(print, 'hello')
    await to_process.run_sync(print, 'world')
    # Getting the same pid back shows the same worker is still answering after the prints.
    assert await to_process.run_sync(os.getpid) == worker_pid


async def test_cancel_before() -> None:
    """
    Test that starting to_process.run_sync() in a cancelled scope does not cause a worker
    process to be reserved.

    """
    with CancelScope() as scope:
        scope.cancel()
        await to_process.run_sync(os.getpid)

    # The lookup must fail, showing that no worker collection was set up for this event loop.
    with pytest.raises(LookupError):
        to_process._process_pool_workers.get()


async def test_cancel_during() -> None:
    """
    Test that cancelling an operation on the worker process causes the process to be killed.

    """
    worker_pid = await to_process.run_sync(os.getpid)
    # The sleep lasts longer than the timeout, so the test fails if cancellation does not
    # interrupt the worker.
    with fail_after(4):
        async with create_task_group() as tg:
            tg.start_soon(partial(to_process.run_sync, cancellable=True), time.sleep, 5)
            # Make sure the task has actually started waiting on the worker before cancelling
            await wait_all_tasks_blocked()
            tg.cancel_scope.cancel()

    # The previous worker was killed so we should get a new one now
    assert await to_process.run_sync(os.getpid) != worker_pid