a
    4_b[&                     @   sV  d dl mZmZ d dlZd dlT d dlZd dlZd dlmZ d dl	m
Z
 d dlZd dlZd dlZd dlZd dlZd dlZd dlmZ d dlZd dlZejd  dkrejd d	krejd
ree  dZdejd< dZG dd deZG dd dejjZG dd deZ G dd deZ!G dd deZ"G dd deZ#e$dkrRe%  dS )    )absolute_importprint_functionN)*)	HTTPError)IOLoop)platform         wing      ?Z20ZASYNC_TEST_TIMEOUTc                   @   s\   e Zd ZdZdZdd Zdd ZefddZd	d
 Z	efddZ
dd Zdd Zdd ZdS )TestTermClientz%Test connection to a terminal managerFc                 C   s   || _ d | _d S N)wspending_read)self	websocket r   9lib/python3.9/site-packages/terminado/tests/basic_test.py__init__*   s    zTestTermClient.__init__c                    s:   | j d u r| j | _ | j I d H }d | _ |r6t|}|S r   )r   r   Zread_messagejsonloads)r   responser   r   r   read_msg.   s    

zTestTermClient.read_msgc                    s\   g }t j|d}z |  }tj||I dH }W n tjjyJ   | Y S 0 || qdS )z"Read messages until read times out)ZsecondsN)datetimeZ	timedeltar   tornadogenZwith_timeoutTimeoutErrorappend)r   timeoutmsglistZdeltaZmfmsgr   r   r   read_all_msg;   s    
zTestTermClient.read_all_msgc                    s   | j t|I d H  d S r   )r   Zwrite_messager   dumps)r   r    r   r   r   	write_msgH   s    zTestTermClient.write_msgc                    s:   |  |I dH }ddd |D }dd |D }||fS )zkRead standard output until timeout read reached,
           return stdout and any non-stdout msgs received.N c                 S   s    g | ]}|d  dkr|d qS )r   stdoutr	   r   .0r    r   r   r   
<listcomp>O       z.TestTermClient.read_stdout.<locals>.<listcomp>c                 S   s   g | ]}|d  dkr|qS )r   r%   r   r&   r   r   r   r(   P   r)   )r!   join)r   r   r   r%   Zothermsgr   r   r   read_stdoutK   s    zTestTermClient.read_stdoutc                    s   |  d|gI dH  dS )zWrite to terminal stdinstdinN)r#   )r   datar   r   r   write_stdinS   s    zTestTermClient.write_stdinc                    s   |   I dH  | dI dH  |   I dH \}}tjdkrtt| tdt|}|du rntdt|}|du rtdt|}t|	 d }n"td||f  t|
d	d
 }|S )z(Get process ID of terminal shell processNzecho $$ntz(echo \$\$\\x1b\[71X\\x1b\[71C\\r\\n(\d+)zecho \$\$ \\r\\n(\d+)z-echo \$\$ \\r\\n\\x1b\[\?25h\\x1b\[\?25l(\d+)r   zstdout=%r, extra=%r
r	   )r+   r.   osnameprintreprresearchintgroupssplit)r   r%   Zextramatchpidr   r   r   get_pidW   s"    
zTestTermClient.get_pidc                 C   s   | j   d S r   )r   closer   r   r   r   r=   k   s    zTestTermClient.closeN)__name__
__module____qualname____doc__Z__test__r   r   DONE_TIMEOUTr!   r#   r+   r.   r<   r=   r   r   r   r   r   &   s   r   c                       sT   e Zd Zdd Zdd Zdd Z fddZd	d
 Zdej	dkrDdne
  Z  ZS )TermTestCasec                    sF   |   }d||f }tjj|dd| id}tj|I d H }t|S )Nzws://127.0.0.1:%d%sZOriginzhttp://127.0.0.1:%d)headers)Zget_http_portr   Z
httpclientZHTTPRequestr   Zwebsocket_connectr   )r   pathZporturlZrequestr   r   r   r   get_term_clientr   s    
zTermTestCase.get_term_clientc                    s   t j fdd|D  I d H S )Nc                 3   s   | ]}  |V  qd S r   )rH   )r'   rF   r>   r   r   	<genexpr>|   r)   z0TermTestCase.get_term_clients.<locals>.<genexpr>)asyncioZgather)r   pathsr   r>   r   get_term_clients{   s    zTermTestCase.get_term_clientsc                    s*   g }|D ]}|  I d H }|| q|S r   )r<   r   )r   Ztm_listpidstmr;   r   r   r   get_pids~   s
    zTermTestCase.get_pidsc                    s<   t  j}|| jj || jj || jj t   d S r   )	r   ZcurrentZrun_syncnamed_tmZkill_all	single_tm	unique_tmsupertearDown)r   run	__class__r   r   rT      s
    
zTermTestCase.tearDownc                    s   t dgtd| _tdgd| _tdgtd| _| j G  fdddtjj	}tjj
d|fdtd| jifd	td| jifd
td| jifgddS )NZbash)shell_commandZmax_terminals)rX   c                       s   e Zd ZdZ fddZdS )z0TermTestCase.get_app.<locals>.NewTerminalHandlerz,Create a new named terminal, return redirectc                    s"      \}}| jd| dd d S )Nz/named/F)Z	permanent)Znew_named_terminalZredirect)r   r2   ZterminalrP   r   r   get   s    z4TermTestCase.get_app.<locals>.NewTerminalHandler.getN)r?   r@   rA   rB   rZ   r   rY   r   r   NewTerminalHandler   s   r[   /newz/named/(\w+)Zterm_manager/single/uniqueT)debug)ZNamedTermManager	MAX_TERMSrP   ZSingleTermManagerrQ   ZUniqueTermManagerrR   r   ZwebZRequestHandlerZApplicationZ
TermSocket)r   r[   r   rY   r   get_app   s&    zTermTestCase.get_app)z/named/term1r^   r/   )r]   )r?   r@   rA   rH   rL   rO   rT   ra   r1   r2   tuple	test_urls__classcell__r   r   rV   r   rD   n   s   	rD   c                   @   s,   e Zd Zejjdd Zejjdd ZdS )CommonTestsc                    sx   | j D ]l}| |I d H }| I d H }| |di g | I d H }| |d d | t|d d |  qd S )Nsetupr   r%   r	   )rc   rH   r   assertEqualZassertGreaterlenr=   )r   rG   rN   r   r   r   r   
test_basic   s    
zCommonTests.test_basicc                    s   | j D ]~}| |I d H }| I d H  |dI d H  | I d H \}}tjdkrbd|v spJ n|dspJ |g ks|J |  qd S )Nzwhoami
r/   ZwhoamiZwho)	rc   rH   r!   r.   r+   r1   r2   
startswithr=   )r   rG   rN   r%   otherr   r   r   test_basic_command   s    

zCommonTests.test_basic_commandN)r?   r@   rA   r   testinggen_testri   rl   r   r   r   r   re      s   
re   c                   @   sH   e Zd Zdd Zejjdd Zejjej	j
devdddd	 Zd
S )NamedTermTestsc                 C   sH   | j ddd}| |jd |jd }|dd }| || jj d S )Nr\   F)Zfollow_redirectsi.  ZLocation/   )Zfetchrg   coderE   r9   ZassertInrP   Z	terminals)r   r   rG   r2   r   r   r   test_new   s
    
zNamedTermTests.test_newc                    st   dgd dgd  }|  |I d H }| |I d H }| |d |d  | |d |d  | |d |d  d S )Nz/named/1rq   z/named/2r   r	   r   )rL   rO   rg   assertNotEqual)r   namestmsrM   r   r   r   test_namespace   s    zNamedTermTests.test_namespacelinuxIt only works on Linuxreasonc                    sp   dd t td D }| |d t I d H }| |I d H }| |t I d H }| I d H }| |d  d S )Nc                 S   s   g | ]}d | qS )z	/named/%dr   )r'   ir   r   r   r(      r)   z5NamedTermTests.test_max_terminals.<locals>.<listcomp>r	   )ranger`   rL   rO   rH   r   rg   )r   Zurlsrv   rM   rN   r    r   r   r   test_max_terminals   s    z!NamedTermTests.test_max_terminalsN)r?   r@   rA   rs   r   rm   rn   rw   pytestmarkskipifr   r~   r   r   r   r   ro      s   	
	ro   c                   @   s   e Zd Zejjdd ZdS )SingleTermTestsc                    s<   |  ddgI d H }| |I d H }| |d |d  d S )Nr]   r   r	   )rL   rO   rg   r   rv   rM   r   r   r   test_single_process   s    z#SingleTermTests.test_single_processN)r?   r@   rA   r   rm   rn   r   r   r   r   r   r      s   r   c                   @   s@   e Zd Zejjdd Zejjejj	de
vdddd ZdS )	UniqueTermTestsc                    s<   |  ddgI d H }| |I d H }| |d |d  d S )Nr^   r   r	   )rL   rO   rt   r   r   r   r   test_unique_processes   s    z%UniqueTermTests.test_unique_processesrx   ry   rz   c                    s   |  dgt I d H }| |I d H }| tt|t | dI d H }| I d H }| |d  |d   |d  I d H }| |d  | dI d H }| I d H }| |d d d S )Nr^   r   rf   )	rL   r`   rO   rg   rh   setrH   r   r=   )r   rv   rM   rN   r    r   r   r   r~      s    z"UniqueTermTests.test_max_terminalsN)r?   r@   rA   r   rm   rn   r   r   r   r   r   r~   r   r   r   r   r      s
   
r   __main__)&Z
__future__r   r   ZunittestZ	terminador   Ztornado.httpserverZtornado.httpclientr   Ztornado.ioloopr   Ztornado.testingr   r   r1   r5   r   sysr   rJ   version_inforj   Zset_event_loop_policyZWindowsSelectorEventLoopPolicyrC   environr`   objectr   rm   ZAsyncHTTPTestCaserD   re   ro   r   r   r?   mainr   r   r   r   <module>   s6   (
H= 
