a
    )(b                     @   s  U d Z ddlmZ ddlmZ ddlmZ ddlmZm	Z	 ddl
mZ ddlmZmZmZ ddlmZ dd	lmZmZ dd
lmZmZ ddlmZ ddlmZ ddlmZ dZee ed< edredrddl m!Z! ddl"mZm#Z#m$Z$ ddl%m&Z& ddl'm(Z( nG dd dZ#G dd dZ$G dd de$j)Z*G dd de$j)Z+G dd de$j)Z,G d d! d!e#j-Z.eeG d"d# d#Z/eeG d$d% d%Z0eeG d&d' d'Z1eeG d(d) d)Z2G d*d+ d+ej3Z4G d,d- d-ej3Z5G d.d/ d/ej3Z6G d0d1 d1ej3Z7dS )2zT
Tests for the implementation of the ssh-userauth service.

Maintainer: Paul Swartz
    )
ModuleType)Optional)implementer)
ConchErrorValidPublicKey)ICredentialsChecker)
IAnonymousISSHPrivateKeyIUsernamePassword)UnauthorizedLogin)IRealmPortal)defertask)loopback)requireModule)unittestNkeysZcryptographyZpyasn1)SSHProtocolChecker)r   	transportuserauth)NS)keydatac                   @   s   e Zd ZG dd dZdS )r   c                   @   s   e Zd ZdZdS )ztransport.SSHTransportBaseQ
            A stub class so that later class definitions won't die.
            N__name__
__module____qualname____doc__ r   r   ?lib/python3.9/site-packages/twisted/conch/test/test_userauth.pySSHTransportBase"   s   r!   N)r   r   r   r!   r   r   r   r    r   !   s   r   c                   @   s   e Zd ZG dd dZdS )r   c                   @   s   e Zd ZdZdS )zuserauth.SSHUserAuthClientr   Nr   r   r   r   r    SSHUserAuthClient(   s   r"   N)r   r   r   r"   r   r   r   r    r   '   s   r   c                   @   s2   e Zd ZdZdd Zdd ZdddZd	d
 ZdS )ClientUserAuthz"
    A mock user auth client.
    c                 C   s,   | j rtjtjS ttjtjS dS )z
        If this is the first time we've been called, return a blob for
        the DSA key.  Otherwise, return a blob
        for the RSA key.
        N)	ZlastPublicKeyr   Key
fromStringr   publicRSA_opensshr   succeedpublicDSA_opensshselfr   r   r    getPublicKey3   s    zClientUserAuth.getPublicKeyc                 C   s   t tjtjS )z@
        Return the private key object for the RSA key.
        )r   r'   r   r$   r%   r   privateRSA_opensshr)   r   r   r    getPrivateKey>   s    zClientUserAuth.getPrivateKeyNc                 C   s
   t dS )z/
        Return 'foo' as the password.
           foor   r'   )r*   promptr   r   r    getPasswordD   s    zClientUserAuth.getPasswordc                 C   s
   t dS )z>
        Return 'foo' as the answer to two questions.
        )foor2   r/   )r*   nameZinformationZanswersr   r   r    getGenericAnswersJ   s    z ClientUserAuth.getGenericAnswers)N)r   r   r   r   r+   r-   r1   r4   r   r   r   r    r#   .   s
   
r#   c                   @   s    e Zd ZdZdd Zdd ZdS )OldClientAuthz~
    The old SSHUserAuthClient returned a cryptography key object from
    getPrivateKey() and a string from getPublicKey
    c                 C   s   t tjtjjS N)r   r'   r   r$   r%   r   r,   Z	keyObjectr)   r   r   r    r-   W   s    zOldClientAuth.getPrivateKeyc                 C   s   t jtj S r6   )r   r$   r%   r   r&   blobr)   r   r   r    r+   Z   s    zOldClientAuth.getPublicKeyNr   r   r   r   r-   r+   r   r   r   r    r5   Q   s   r5   c                   @   s    e Zd ZdZdd Zdd ZdS )ClientAuthWithoutPrivateKeyzP
    This client doesn't have a private key, but it does have a public key.
    c                 C   s   d S r6   r   r)   r   r   r    r-   c   s    z)ClientAuthWithoutPrivateKey.getPrivateKeyc                 C   s   t jtjS r6   )r   r$   r%   r   r&   r)   r   r   r    r+   f   s    z(ClientAuthWithoutPrivateKey.getPublicKeyNr8   r   r   r   r    r9   ^   s   r9   c                   @   sL   e Zd ZdZG dd dZG dd dZdd Zdd	 Zd
d Zdd Z	dS )FakeTransporta_  
    L{userauth.SSHUserAuthServer} expects an SSH transport which has a factory
    attribute which has a portal attribute. Because the portal is important for
    testing authentication, we need to be able to provide an interesting portal
    object to the L{SSHUserAuthServer}.

    In addition, we want to be able to capture any packets sent over the
    transport.

    @ivar packets: a list of 2-tuples: (messageType, data).  Each 2-tuple is
        a sent packet.
    @type packets: C{list}
    @param lostConnecion: True if loseConnection has been called on us.
    @type lostConnection: L{bool}
    c                   @   s   e Zd ZdZdZdd ZdS )zFakeTransport.ServicezW
        A mock service, representing the other service offered by the server.
           nancyc                 C   s   d S r6   r   r)   r   r   r    serviceStarted   s    z$FakeTransport.Service.serviceStartedN)r   r   r   r   r3   r<   r   r   r   r    Service{   s   r=   c                   @   s   e Zd ZdZdd ZdS )zFakeTransport.Factoryzg
        A mock factory, representing the factory that spawned this user auth
        service.
        c                 C   s   |dkrt jS dS )z2
            Return our fake service.
               noneN)r:   r=   )r*   r   servicer   r   r    
getService   s    z FakeTransport.Factory.getServiceN)r   r   r   r   r@   r   r   r   r    Factory   s   rA   c                 C   s(   |   | _|| j_d| _| | _g | _d S NF)rA   factoryportallostConnectionr   packets)r*   rD   r   r   r    __init__   s
    
zFakeTransport.__init__c                 C   s   | j ||f dS )z8
        Record the packet sent by the service.
        N)rF   append)r*   ZmessageTypemessager   r   r    
sendPacket   s    zFakeTransport.sendPacketc                 C   s   dS )z
        Pretend that this transport encrypts traffic in both directions. The
        SSHUserAuthServer disables password authentication if the transport
        isn't encrypted.
        Tr   )r*   	directionr   r   r    isEncrypted   s    zFakeTransport.isEncryptedc                 C   s
   d| _ d S NT)rE   r)   r   r   r    loseConnection   s    zFakeTransport.loseConnectionN)
r   r   r   r   r=   rA   rG   rJ   rL   rN   r   r   r   r    r:   j   s   
r:   c                   @   s   e Zd ZdZdd ZdS )Realmz
    A mock realm for testing L{userauth.SSHUserAuthServer}.

    This realm is not actually used in the course of testing, so it returns the
    simplest thing that could possibly work.
    c                 G   s   t |d d dd fS )Nr   c                   S   s   d S r6   r   r   r   r   r    <lambda>       z%Realm.requestAvatar.<locals>.<lambda>r/   )r*   ZavatarIdZmindZ
interfacesr   r   r    requestAvatar   s    zRealm.requestAvatarN)r   r   r   r   rR   r   r   r   r    rO      s   rO   c                   @   s   e Zd ZdZefZdd ZdS )PasswordCheckerz
    A very simple username/password checker which authenticates anyone whose
    password matches their username and rejects all others.
    c                 C   s&   |j |jkrt|j S ttdS )NzInvalid username/password pair)usernameZpasswordr   r'   failr   )r*   credsr   r   r    requestAvatarId   s    zPasswordChecker.requestAvatarIdN)r   r   r   r   r
   credentialInterfacesrW   r   r   r   r    rS      s   rS   c                   @   s   e Zd ZdZefZdd ZdS )PrivateKeyCheckerz
    A very simple public key checker which authenticates anyone whose
    public/private keypair is the same keydata.public/privateRSA_openssh.
    c                 C   sX   |j tjtj  krN|jd urHtj|j }||j|jrN|j	S nt
 t d S r6   )r7   r   r$   r%   r   r&   	signatureZverifysigDatarT   r   r   )r*   rV   objr   r   r    rW      s    
z!PrivateKeyChecker.requestAvatarIdN)r   r   r   r   r	   rX   rW   r   r   r   r    rY      s   rY   c                   @   s   e Zd ZdZefZdd ZdS )AnonymousCheckerzI
    A simple checker which isn't supported by L{SSHUserAuthServer}.
    c                 C   s   d S r6   r   )r*   Zcredentialsr   r   r    rW      s    z AnonymousChecker.requestAvatarIdN)r   r   r   r   r   rX   rW   r   r   r   r    r]      s   r]   c                   @   s   e Zd ZdZedu rdZdd Zdd Zdd	 Zd
d Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zd d! Zd"d# Zd$d% Zd&d' Zd(d) Zd*d+ ZdS ),SSHUserAuthServerTestsz&
    Tests for SSHUserAuthServer.
    Ncannot run without cryptographyc                 C   sb   t  | _t| j| _| jt  | jt  t | _	t
| j| j	_| j	  | j	j  d S r6   )rO   realmr   rD   registerCheckerrS   rY   r   SSHUserAuthServer
authServerr:   r   r<   supportedAuthenticationssortr)   r   r   r    setUp   s    

zSSHUserAuthServerTests.setUpc                 C   s   | j   d | _ d S r6   )rc   serviceStoppedr)   r   r   r    tearDown   s    
zSSHUserAuthServerTests.tearDownc                 C   s(   |  | jjjd tjtdd f dS )z;
        Check that the authentication has failed.
        s   password,publickey    N)assertEqualrc   r   rF   r   ZMSG_USERAUTH_FAILUREr   r*   ignoredr   r   r    _checkFailed   s    z#SSHUserAuthServerTests._checkFailedc                 C   s,   | j tdtd td }|| jS )z
        A client may request a list of authentication 'method name' values
        that may continue by using the "none" authentication 'method name'.

        See RFC 4252 Section 5.2.
        r.   s   servicer>   )rc   ssh_USERAUTH_REQUESTr   addCallbackrn   )r*   dr   r   r    test_noneAuthentication  s    z.SSHUserAuthServerTests.test_noneAuthenticationc                    sF   d tdtdtddtdg} j|} fdd}||S )z
        When provided with correct password authentication information, the
        server should respond by sending a MSG_USERAUTH_SUCCESS message with
        no other data.

        See RFC 4252, Section 5.1.
        rQ   r.   r>      passwordrj   c                    s      jjjtjdfg d S NrQ   rk   rc   r   rF   r   ZMSG_USERAUTH_SUCCESSrm   r)   r   r    check  s    
zKSSHUserAuthServerTests.test_successfulPasswordAuthentication.<locals>.check)joinr   rc   ro   rp   r*   packetrq   rw   r   r)   r    %test_successfulPasswordAuthentication  s    $z<SSHUserAuthServerTests.test_successfulPasswordAuthenticationc                 C   sh   d tdtdtddtdg}t | j_| j|}| | jjj	g  | jj
d || jS )a;  
        When provided with invalid authentication details, the server should
        respond by sending a MSG_USERAUTH_FAILURE message which states whether
        the authentication was partially successful, and provides other, open
        options for authentication.

        See RFC 4252, Section 5.1.
        rQ   r.   r>   rs   rj      bar   )rx   r   r   Clockrc   clockro   rk   r   rF   advancerp   rn   r*   rz   rq   r   r   r    !test_failedPasswordAuthentication'  s    
$z8SSHUserAuthServerTests.test_failedPasswordAuthenticationc                    s   t jtj }t jtj}tdtd td d t|  t| }d j	j
_|tdttjf | }|t|7 } j	|} fdd}||S )zN
        Test that private key authentication completes successfully,
        r.   r>   	   publickey      testc                    s      jjjtjdfg d S rt   ru   rv   r)   r   r    rw   M  s    
zMSSHUserAuthServerTests.test_successfulPrivateKeyAuthentication.<locals>.check)r   r$   r%   r   r&   r7   r,   r   ZsshTyperc   r   	sessionIDsignbytesr   MSG_USERAUTH_REQUESTro   rp   )r*   r7   r\   rz   rZ   rq   rw   r   r)   r    'test_successfulPrivateKeyAuthentication8  s,    

z>SSHUserAuthServerTests.test_successfulPrivateKeyAuthenticationc                    s   t   dd }dd } fdd}| | jd| | | jd| | | jd	| td
td td td }| j| |  tS )z
        ssh_USERAUTH_REQUEST should raise a ConchError if tryAuth returns
        None. Added to catch a bug noticed by pyflakes.
        c                 S   s   |  d d S )Nz&request should have raised ConochError)rU   rl   r   r   r    mockCbFinishedAuth\  s    zOSSHUserAuthServerTests.test_requestRaisesConchError.<locals>.mockCbFinishedAuthc                 S   s   d S r6   r   )kinduserdatar   r   r    mockTryAuth_  s    zHSSHUserAuthServerTests.test_requestRaisesConchError.<locals>.mockTryAuthc                    s     | j d S r6   )Zerrbackvalue)reasonrq   r   r    mockEbBadAuthb  s    zJSSHUserAuthServerTests.test_requestRaisesConchError.<locals>.mockEbBadAuthtryAuthZ_cbFinishedAuthZ
_ebBadAuths   userr>   s
   public-keys   data)r   ZDeferredpatchrc   r   ro   assertFailurer   )r*   r   r   r   rz   r   r   r    test_requestRaisesConchErrorU  s     z3SSHUserAuthServerTests.test_requestRaisesConchErrorc                    sb   t jtj  tdtd td d td t  }j|} fdd}|	|S )z@
        Test that verifying a valid private key works.
        r.   r>   r   rj      ssh-rsac                    s*    jjjtjtdt  fg d S )Nr   )rk   rc   r   rF   r   MSG_USERAUTH_PK_OKr   rv   r7   r*   r   r    rw   ~  s    z@SSHUserAuthServerTests.test_verifyValidPrivateKey.<locals>.check)
r   r$   r%   r   r&   r7   r   rc   ro   rp   ry   r   r   r    test_verifyValidPrivateKeyo  s     z1SSHUserAuthServerTests.test_verifyValidPrivateKeyc                 C   sV   t jtj }tdtd td d td t| }| j|}|	| j
S )d
        Test that private key authentication fails when the public key
        is invalid.
        r.   r>   r   rj   s   ssh-dsar   r$   r%   r   r(   r7   r   rc   ro   rp   rn   r*   r7   rz   rq   r   r   r    3test_failedPrivateKeyAuthenticationWithoutSignature  s    zJSSHUserAuthServerTests.test_failedPrivateKeyAuthenticationWithoutSignaturec                 C   s|   t jtj }t jtj}tdtd td d td t| t|| }d| j	j
_| j	|}|| jS )r   r.   r>   r   r   r   r   )r   r$   r%   r   r&   r7   r,   r   r   rc   r   r   ro   rp   rn   )r*   r7   r\   rz   rq   r   r   r    0test_failedPrivateKeyAuthenticationWithSignature  s&    	
zGSSHUserAuthServerTests.test_failedPrivateKeyAuthenticationWithSignaturec                 C   sj   t jtj }td|dd  }tdtd td d td t| }| j|}|	| j
S )	z
        Private key authentication fails when the public key type is
        unsupported or the public key is corrupt.
        s   ssh-bad-type   Nr.   r>   r   rj   r   r   r   r   r   r    test_unsupported_publickey  s     z1SSHUserAuthServerTests.test_unsupported_publickeyc                 C   sR   t  }t| j|_| jt  |  |  |j	
  | |j	ddg dS )ah  
        L{SSHUserAuthServer} sets up
        C{SSHUserAuthServer.supportedAuthentications} by checking the portal's
        credentials interfaces and mapping them to SSH authentication method
        strings.  If the Portal advertises an interface that
        L{SSHUserAuthServer} can't map, it should be ignored.  This is a white
        box test.
        rs   r   N)r   rb   r:   rD   r   ra   r]   r<   rg   rd   re   rk   r*   serverr   r   r     test_ignoreUnknownCredInterfaces  s    	
z7SSHUserAuthServerTests.test_ignoreUnknownCredInterfacesc                 C   s   |  d| jj t }t| j|_dd |j_|	  |
  | d|j t }t| j|_dd |j_|	  |
  |  d|j dS )z
        Test that the userauth service does not advertise password
        authentication if the password would be send in cleartext.
        rs   c                 S   s   dS rB   r   xr   r   r    rP     rQ   zISSHUserAuthServerTests.test_removePasswordIfUnencrypted.<locals>.<lambda>c                 S   s   | dkS Ninr   r   r   r   r    rP     rQ   N)ZassertInrc   rd   r   rb   r:   rD   r   rL   r<   rg   ZassertNotIn)r*   clearAuthServerhalfAuthServerr   r   r     test_removePasswordIfUnencrypted  s    z7SSHUserAuthServerTests.test_removePasswordIfUnencryptedc                 C   s   t | j}|t  t }t||_dd |j_|	  |
  | |jdg t }t||_dd |j_|	  |
  | |jdg dS )z
        If the L{SSHUserAuthServer} is not advertising passwords, then an
        unencrypted connection should not cause any warnings or exceptions.
        This is a white box test.
        c                 S   s   dS rB   r   r   r   r   r    rP     rQ   zSSSHUserAuthServerTests.test_unencryptedConnectionWithoutPasswords.<locals>.<lambda>r   c                 S   s   | dkS r   r   r   r   r   r    rP     rQ   N)r   r`   ra   rY   r   rb   r:   r   rL   r<   rg   rk   rd   )r*   rD   r   r   r   r   r    *test_unencryptedConnectionWithoutPasswords  s    


zASSHUserAuthServerTests.test_unencryptedConnectionWithoutPasswordsc                 C   s   t  }t |_t| j|_|  |j	d |
  | |jjtjdttjf td td fg | |jj dS )z0
        Test that the login times out.
        鰚        s   you took too longrQ   N)r   rb   r   r~   r   r:   rD   r   r<   r   rg   rk   rF   MSG_DISCONNECTr   )DISCONNECT_NO_MORE_AUTH_METHODS_AVAILABLEr   Z
assertTruerE   r*   ZtimeoutAuthServerr   r   r    test_loginTimeout  s(    

z(SSHUserAuthServerTests.test_loginTimeoutc                 C   s\   t  }t |_t| j|_|  |	  |j
d | |jjg  | |jj dS )zN
        Test that stopping the service also stops the login timeout.
        r   N)r   rb   r   r~   r   r:   rD   r   r<   rg   r   rk   rF   assertFalserE   r   r   r   r    test_cancelLoginTimeout  s    
z.SSHUserAuthServerTests.test_cancelLoginTimeoutc                    sn   d tdtdtddtdg}t  j_tdD ]} j|} jjd q8 fd	d
}|	|S )zm
        Test that the server disconnects if the client fails authentication
        too many times.
        rQ   r.   r>   rs   rj   r|      r}   c                    s<      jjjd tjdttjf td td f d S )Nri   r   s   too many bad authsrQ   )rk   rc   r   rF   r   r   r   r   rv   r)   r   r    rw   1  s    
z:SSHUserAuthServerTests.test_tooManyAttempts.<locals>.check)
rx   r   r   r~   rc   r   rangero   r   rp   )r*   rz   irq   rw   r   r)   r    test_tooManyAttempts&  s    $z+SSHUserAuthServerTests.test_tooManyAttemptsc                 C   sH   t dt d t d d t d }t | j_| j|}|| jS )zo
        If the user requests a service that we don't support, the
        authentication should fail.
        r.   rQ   rs   rj   )r   r   r~   rc   r   ro   rp   rn   r   r   r   r    test_failIfUnknownService?  s    $z0SSHUserAuthServerTests.test_failIfUnknownServicec                    sV   dd }   jd|    jdd  fdd} jddd} |t|S )	aZ  
        tryAuth() has two edge cases that are difficult to reach.

        1) an authentication method auth_* returns None instead of a Deferred.
        2) an authentication type that is defined does not have a matching
           auth_* method.

        Both these cases should return a Deferred which fails with a
        ConchError.
        c                 S   s   d S r6   r   )rz   r   r   r    mockAuthU  s    z>SSHUserAuthServerTests.test_tryAuthEdgeCases.<locals>.mockAuthZauth_publickeyZauth_passwordNc                    s    j dd d } |tS )Nrs   )rc   r   r   r   )rm   Zd2r)   r   r    
secondTest[  s    z@SSHUserAuthServerTests.test_tryAuthEdgeCases.<locals>.secondTestr   )r   rc   r   r   r   rp   )r*   r   r   Zd1r   r)   r    test_tryAuthEdgeCasesI  s    z,SSHUserAuthServerTests.test_tryAuthEdgeCases)r   r   r   r   r   skiprf   rh   rn   rr   r{   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r    r^      s.   	
r^   c                   @   s   e Zd ZdZedu rdZdd Zdd Zdd	 Zd
d Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zd d! ZdS )"SSHUserAuthClientTestsz&
    Tests for SSHUserAuthClient.
    Nr_   c                 C   s4   t dt | _td | j_d| jj_| j  d S )Nr.   r   )r#   r:   r=   
authClientr   r   r<   r)   r   r   r    rf   k  s    
zSSHUserAuthClientTests.setUpc                 C   s   | j   d | _ d S r6   )r   rg   r)   r   r   r    rh   q  s    
zSSHUserAuthClientTests.tearDownc                 C   sT   |  | jjd |  | jjjd |  | jjjtjt	dt	d t	d fg dS )z;
        Test that client is initialized properly.
        r.   r;   r>   N)
rk   r   r   instancer3   r   rF   r   r   r   r)   r   r   r    	test_initu  s    z SSHUserAuthClientTests.test_initc                    s@   dg  fdd}|| j j_| j d |  d | j j dS )z9
        Test that the client succeeds properly.
        Nc                    s   |  d< d S )Nr   r   )r?   r   r   r    stubSetService  s    zDSSHUserAuthClientTests.test_USERAUTH_SUCCESS.<locals>.stubSetServicerQ   r   )r   r   Z
setServiceZssh_USERAUTH_SUCCESSrk   r   )r*   r   r   r   r    test_USERAUTH_SUCCESS  s
    
z,SSHUserAuthClientTests.test_USERAUTH_SUCCESSc              	   C   s  | j tdd  | | j jjd tjtdtd td d td ttj	
tj  f | j tdd  ttj	
tj }| | j jjd tjtdtd td d td | f | j tdttj	
tj   t| j jjttjf td td td d td | }tj	
tj}| | j jjd tjtdtd td d td | t|| f d	S )
zJ
        Test that the client can authenticate with a public key.
        r   rj   ri   r.   r;   s   ssh-dssr      N)r   ssh_USERAUTH_FAILUREr   rk   r   rF   r   r   r   r$   r%   r   r(   r7   r&   ssh_USERAUTH_PK_OKr   r   r,   r   )r*   r7   r[   r\   r   r   r    test_publickey  s    

z%SSHUserAuthClientTests.test_publickeyc                 C   sz   t dt }td|_d|j_|  |d g |j_| |	d | 
|jjtjtdtd td fg dS )z
        If the SSHUserAuthClient doesn't return anything from signData,
        the client should start the authentication over again by requesting
        'none' authentication.
        r.   Nr   r   rQ   r;   r>   )r9   r:   r=   r   r   r<   r   rF   assertIsNoner   rk   r   r   r   )r*   r   r   r   r    !test_publickey_without_privatekey  s    

z8SSHUserAuthClientTests.test_publickey_without_privatekeyc                    s.   dd  j _ j d} fdd}||S )z{
        If there's no public key, auth_publickey should return a Deferred
        called back with a False value.
        c                 S   s   d S r6   r   r   r   r   r    rP     rQ   z:SSHUserAuthClientTests.test_no_publickey.<locals>.<lambda>r   c                    s     |  d S r6   )r   resultr)   r   r    rw     s    z7SSHUserAuthClientTests.test_no_publickey.<locals>.check)r   r+   r   rp   )r*   rq   rw   r   r)   r    test_no_publickey  s    z(SSHUserAuthClientTests.test_no_publickeyc                 C   s   | j tdd  | | j jjd tjtdtd td d td f | j tdtd  | | j jjd tjtdtd td d tdd  f d	S )
zx
        Test that the client can authentication with a password.  This
        includes changing the password.
        rs   rj   ri   r.   r;   rQ   r   r}   N)	r   r   r   rk   r   rF   r   r   r   r)   r   r   r    test_password  s    "&z$SSHUserAuthClientTests.test_passwordc                 C   s"   dd | j _| | j d dS )zK
        If getPassword returns None, tryAuth should return False.
        c                   S   s   d S r6   r   r   r   r   r    rP     rQ   z9SSHUserAuthClientTests.test_no_password.<locals>.<lambda>rs   N)r   r1   r   r   r)   r   r   r    test_no_password  s    z'SSHUserAuthClientTests.test_no_passwordc                 C   s`   | j tdtd td d td d  | | j jjd tjdtd td f dS )	zj
        Make sure that the client can authenticate with the keyboard
        interactive method.
        rQ   s      s
   Password: rj   ri   s      r.   N)r   Z'ssh_USERAUTH_PK_OK_keyboard_interactiver   rk   r   rF   r   ZMSG_USERAUTH_INFO_RESPONSEr)   r   r   r    test_keyboardInteractive  s&    z/SSHUserAuthClientTests.test_keyboardInteractivec                 C   sP   d| j _g | j j_| j d | | j jjtjtdtd td fg dS )z
        If C{SSHUserAuthClient} gets a MSG_USERAUTH_PK_OK packet when it's not
        expecting it, it should fail the current authentication and move on to
        the next type.
        s   unknownrQ   r.   r;   r>   N)	r   ZlastAuthr   rF   r   rk   r   r   r   r)   r   r   r    "test_USERAUTH_PK_OK_unknown_method  s    
z9SSHUserAuthClientTests.test_USERAUTH_PK_OK_unknown_methodc                    s    fdd} fdd}| j _| j _ j tdd    j jjd tj	tdtd	 td
 d td f  j tdd    j jjdd ddg dS )z
        ssh_USERAUTH_FAILURE should sort the methods by their position
        in SSHUserAuthClient.preferredOrder.  Methods that are not in
        preferredOrder should be sorted at the end of that list.
        c                      s    j jdd d S )N      here is datar   r   rJ   r   r)   r   r    auth_firstmethod2  s    zNSSHUserAuthClientTests.test_USERAUTH_FAILURE_sorting.<locals>.auth_firstmethodc                      s    j jdd dS )N   
   other dataTr   r   r)   r   r    auth_anothermethod5  s    zPSSHUserAuthClientTests.test_USERAUTH_FAILURE_sorting.<locals>.auth_anothermethods   anothermethod,passwordrj   ri   r.   r;   rs   s"   firstmethod,anothermethod,passwordr   N)r   r   )r   r   )
r   r   r   r   r   rk   r   rF   r   r   )r*   r   r   r   r)   r    test_USERAUTH_FAILURE_sorting+  s$    "
z4SSHUserAuthClientTests.test_USERAUTH_FAILURE_sortingc                 C   sT   | j tdd  | j tdd  | | j jjd tjdtd d f dS )	z
        If there are no more available user authentication messages,
        the SSHUserAuthClient should disconnect with code
        DISCONNECT_NO_MORE_AUTH_METHODS_AVAILABLE.
        rs   rj   r   ri   s      s(   no more authentication methods availables       N)r   r   r   rk   r   rF   r   r)   r   r   r    %test_disconnectIfNoMoreAuthenticationO  s    z<SSHUserAuthClientTests.test_disconnectIfNoMoreAuthenticationc                 C   sH   g | j j_| j d | | j jjtjtdtd td fg dS )z
        _ebAuth (the generic authentication error handler) should send
        a request for the 'none' authentication method.
        Nr.   r;   r>   )r   r   rF   Z_ebAuthrk   r   r   r   r)   r   r   r    test_ebAutha  s    
z"SSHUserAuthClientTests.test_ebAuthc                    s`   t dt      fdd} fdddd   }|j	|S )z
        getPublicKey() should return None.  getPrivateKey() should return a
        failed Deferred.  getPassword() should return a failed Deferred.
        getGenericAnswers() should return a failed Deferred.
        r.   c                    s$   |  t   }|jS r6   )trapNotImplementedErrorr1   rp   rU   
addErrbackr   rq   )r   check2r*   r   r    rw   v  s    
z3SSHUserAuthClientTests.test_defaults.<locals>.checkc                    s*   |  t  d d d }|jS r6   )r   r   r4   rp   rU   r   r   )r   check3r*   r   r    r   {  s    
z4SSHUserAuthClientTests.test_defaults.<locals>.check2c                 S   s   |  t d S r6   )r   r   r   r   r   r    r     s    z4SSHUserAuthClientTests.test_defaults.<locals>.check3)
r   r"   r:   r=   r   r+   r-   rp   rU   r   )r*   rw   rq   r   )r   r   r   r*   r    test_defaultsm  s    z$SSHUserAuthClientTests.test_defaults)r   r   r   r   r   r   rf   rh   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r    r   c  s$   >$r   c                   @   s.   e Zd Zedu rdZG dd dZdd ZdS )LoopbackTestsN)cannot run without cryptography or PyASN1c                   @   s"   e Zd ZG dd dZdd ZdS )zLoopbackTests.Factoryc                   @   s    e Zd ZdZdd Zdd ZdS )zLoopbackTests.Factory.Service   TestServicec                 C   s   | j   d S r6   )r   rN   r)   r   r   r    r<     s    z,LoopbackTests.Factory.Service.serviceStartedc                 C   s   d S r6   r   r)   r   r   r    rg     s    z,LoopbackTests.Factory.Service.serviceStoppedN)r   r   r   r3   r<   rg   r   r   r   r    r=     s   r=   c                 C   s   | j S r6   )r=   )r*   Zavatarr3   r   r   r    r@     s    z LoopbackTests.Factory.getServiceN)r   r   r   r=   r@   r   r   r   r    rA     s   	rA   c                    s   t  tdj }t _j_dd j_t |_||j_d j_	|j_	dd  j_
|j_
 j_d_t }t|}t   t   t   fdd _|  |jj_tj|j}dd jj_d	d |jj_  |  fd
d}||S )zW
        Test that the userauth server and client play nicely with each other.
        r.   c                 S   s   dS rM   r   r   r   r   r    rP     rQ   z-LoopbackTests.test_loopback.<locals>.<lambda>rQ   c                   S   s   d S r6   r   r   r   r   r    rP     rQ   r   c                    s   t  j|  dkS )Nr}   )lenZsuccessfulCredentials)ZaId)checkerr   r    rP     rQ   c                   S   s   dS )NZ_ServerLoopbackr   r   r   r   r    rP     rQ   c                   S   s   dS )NZ_ClientLoopbackr   r   r   r   r    rP     rQ   c                    s     jjjd d S )Nr   )rk   r   r?   r3   rv   r   r   r    rw     s    z*LoopbackTests.test_loopback.<locals>.check)r   rb   r#   rA   r=   r   r!   r?   rL   r   ZsendKexInitrC   ZpasswordDelayrO   r   r   ra   rS   rY   ZareDonerD   r   ZloopbackAsyncZ	logPrefixr<   rp   )r*   Zclientr`   rD   rq   rw   r   )r   r*   r   r    test_loopback  s4    



zLoopbackTests.test_loopback)r   r   r   r   r   rA   r   r   r   r   r    r     s   r   c                   @   s    e Zd Zedu rdZdd ZdS )ModuleInitializationTestsNr   c                 C   s,   |  tjjd d |  tjjd d d S )N<   r   )rk   r   rb   ZprotocolMessagesr"   r)   r   r   r    test_messages  s    z'ModuleInitializationTests.test_messages)r   r   r   r   r   r   r   r   r   r    r     s   r   )8r   typesr   typingr   Zzope.interfacer   Ztwisted.conch.errorr   r   Ztwisted.cred.checkersr   Ztwisted.cred.credentialsr   r	   r
   Ztwisted.cred.errorr   Ztwisted.cred.portalr   r   Ztwisted.internetr   r   Ztwisted.protocolsr   Ztwisted.python.reflectr   Ztwisted.trialr   r   __annotations__Ztwisted.conch.checkersr   Ztwisted.conch.sshr   r   Ztwisted.conch.ssh.commonr   Ztwisted.conch.testr   r"   r#   r5   r9   r!   r:   rO   rS   rY   r]   ZTestCaser^   r   r   r   r   r   r   r    <module>   sP   #A  }  &<