a
    ì<+bd  ã                   @   s†   d dl mZ d dlZd dlZd dlZd dlZd dlmZ d dlm	Z	 d dl
mZ d dlmZ d dlmZ dd	„ ZG d
d„ dejƒZdS )é    )ÚStringION)ÚBlockingInProcessKernelClient)ÚInProcessKernelManager)ÚInProcessKernel)Úassemble_output)Úcapture_outputc                  C   sp   t j d¡rlt jdkrltjdk rlddl} zddlm}m} W n tyN   Y n0 t	|  
¡ ƒ|u rl|  |ƒ ¡ dS )a  set default asyncio policy to be compatible with tornado

    Tornado 6 (at least) is not compatible with the default
    asyncio implementation on Windows

    Pick the older SelectorEventLoopPolicy on Windows
    if the known-incompatible default policy is in use.

    do this as early as possible to make it a low priority and overrideable

    ref: https://github.com/tornadoweb/tornado/issues/2608

    FIXME: if/when tornado supports the defaults in asyncio,
           remove and bump tornado requirement for py38
    Úwin)é   é   )é   é   r   N)ÚWindowsProactorEventLoopPolicyÚWindowsSelectorEventLoopPolicy)ÚsysÚplatformÚ
startswithÚversion_infoÚtornadoÚasyncior   r   ÚImportErrorÚtypeZget_event_loop_policyZset_event_loop_policy)r   r   r   © r   úDlib/python3.9/site-packages/ipykernel/inprocess/tests/test_kernel.pyÚ_init_asyncio_patch   s     r   c                   @   s`   e Zd Zdd„ Zdd„ Zdd„ Zejjde	j
v dd	d
d„ ƒZejjdd	dd„ ƒZdd„ ZdS )ÚInProcessKernelTestCasec                 C   s<   t ƒ  tƒ | _| j ¡  | j ¡ | _| j ¡  | j ¡  d S ©N)r   r   ÚkmZstart_kernelZclientÚkcZstart_channelsZwait_for_ready)Úselfr   r   r   ÚsetUp7   s    

zInProcessKernelTestCase.setUpc                 C   s<   t jddd}| j}| d¡ t|jƒ\}}|  d|¡ dS )z*Does %pylab work in the in-process kernel?Ú
matplotlibzThis test requires matplotlib©Úreasonz%pylabN)ÚpytestZimportorskipr   Úexecuter   Úget_iopub_msgZassertIn)r   r    r   ÚoutÚerrr   r   r   Ú
test_pylab?   s
    
z"InProcessKernelTestCase.test_pylabc                 C   sR   t dƒ}tj}|t_z| j d¡ W |t_n|t_0 | jjjj 	d¡dksNJ ‚dS )z@ Does the in-process kernel handle raw_input correctly?
        zfoobar
zx = input()ÚxZfoobarN)
r   r   Ústdinr   r$   r   ÚkernelÚshellZuser_nsÚget)r   ÚioZ	sys_stdinr   r   r   Útest_raw_inputG   s    z&InProcessKernelTestCase.test_raw_inputZ__pypy__zfails on pypyr!   c                 C   sˆ   t ƒ }tƒ }|j d¡ W d  ƒ n1 s.0    Y  |jdksFJ ‚t||jd}|j |¡ | 	d¡ t
|jƒ\}}|dks„J ‚dS )z: Does the in-process kernel correctly capture IO?
        úprint("foo")Núfoo
©r+   Úsessionzprint("bar")zbar
)r   r   r,   Úrun_cellÚstdoutr   r3   Ú	frontendsÚappendr$   r   r%   ©r   r+   r.   r   r&   r'   r   r   r   Útest_stdoutS   s    *
z#InProcessKernelTestCase.test_stdoutzDCurrently don't capture during test as pytest does its own capturingc                 C   s’   t ƒ }tƒ }|j d¡ W d  ƒ n1 s.0    Y  |jdksFJ ‚t||jd}|j |¡ | 	d¡ | 	d¡ t
|jƒ\}}|dksŽJ ‚dS )zDoes correctly capture fdr0   Nr1   r2   z	import oszos.system("echo capfd")zcapfd
)r   r   r,   r4   r5   r   r3   r6   r7   r$   r   Ziopub_channelr8   r   r   r   Ú
test_capfdf   s    *

z"InProcessKernelTestCase.test_capfdc                 C   s&   t ƒ }d|_dd„ |_|jdd dS )z5Tests that kernel getpass accept the stream parameterTc                  _   s   d S r   r   )ÚargsÚkwargsr   r   r   Ú<lambda>|   ó    z=InProcessKernelTestCase.test_getpass_stream.<locals>.<lambda>z	non empty)ÚstreamN)r   Z_allow_stdinZ_input_requestZgetpass)r   r+   r   r   r   Útest_getpass_streamx   s    
z+InProcessKernelTestCase.test_getpass_streamN)Ú__name__Ú
__module__Ú__qualname__r   r(   r/   r#   ZmarkZskipifr   Úbuiltin_module_namesr9   Úskipr:   r@   r   r   r   r   r   5   s   þ
ÿ
r   )r.   r   r   Zunittestr#   r   Zipykernel.inprocess.blockingr   Zipykernel.inprocess.managerr   Zipykernel.inprocess.ipkernelr   Zipykernel.tests.utilsr   ZIPython.utils.ior   r   ZTestCaser   r   r   r   r   Ú<module>   s   !