# Copyright 2005 Divmod, Inc. See LICENSE file for details # Copyright (c) Twisted Matrix Laboratories. # See LICENSE for details. """ Tests for L{twisted.internet._sslverify}. """ import datetime import itertools import sys from unittest import skipIf from zope.interface import implementer from incremental import Version from twisted.internet import defer, interfaces, protocol, reactor from twisted.internet._idna import _idnaText from twisted.internet.error import CertificateError, ConnectionClosed, ConnectionLost from twisted.python.compat import nativeString from twisted.python.filepath import FilePath from twisted.python.modules import getModule from twisted.python.reflect import requireModule from twisted.test.iosim import connectedServerAndClient from twisted.test.test_twisted import SetAsideModule from twisted.trial import util from twisted.trial.unittest import SkipTest, SynchronousTestCase, TestCase skipSSL = "" skipSNI = "" skipNPN = "" skipALPN = "" if requireModule("OpenSSL"): import ipaddress from OpenSSL import SSL from OpenSSL.crypto import FILETYPE_PEM, TYPE_RSA, X509, PKey, get_elliptic_curves from cryptography import x509 from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives.serialization import ( Encoding, NoEncryption, PrivateFormat, ) from cryptography.x509.oid import NameOID from twisted.internet import ssl try: ctx = SSL.Context(SSL.SSLv23_METHOD) ctx.set_npn_advertise_callback(lambda c: None) except (NotImplementedError, AttributeError): skipNPN = ( "NPN is deprecated (and OpenSSL 1.0.1 or greater required for NPN" " support)" ) try: ctx = SSL.Context(SSL.SSLv23_METHOD) ctx.set_alpn_select_callback(lambda c: None) # type: ignore[misc,arg-type] except NotImplementedError: skipALPN = "OpenSSL 1.0.2 or greater required for ALPN support" else: skipSSL = "OpenSSL is required for SSL tests." skipSNI = skipSSL skipNPN = skipSSL skipALPN = skipSSL if not skipSSL: from twisted.internet import _sslverify as sslverify from twisted.internet.ssl import VerificationError, platformTrust from twisted.protocols.tls import TLSMemoryBIOFactory # A couple of static PEM-format certificates to be used by various tests. A_HOST_CERTIFICATE_PEM = """ -----BEGIN CERTIFICATE----- MIIC2jCCAkMCAjA5MA0GCSqGSIb3DQEBBAUAMIG0MQswCQYDVQQGEwJVUzEiMCAG A1UEAxMZZXhhbXBsZS50d2lzdGVkbWF0cml4LmNvbTEPMA0GA1UEBxMGQm9zdG9u MRwwGgYDVQQKExNUd2lzdGVkIE1hdHJpeCBMYWJzMRYwFAYDVQQIEw1NYXNzYWNo dXNldHRzMScwJQYJKoZIhvcNAQkBFhhub2JvZHlAdHdpc3RlZG1hdHJpeC5jb20x ETAPBgNVBAsTCFNlY3VyaXR5MB4XDTA2MDgxNjAxMDEwOFoXDTA3MDgxNjAxMDEw OFowgbQxCzAJBgNVBAYTAlVTMSIwIAYDVQQDExlleGFtcGxlLnR3aXN0ZWRtYXRy aXguY29tMQ8wDQYDVQQHEwZCb3N0b24xHDAaBgNVBAoTE1R3aXN0ZWQgTWF0cml4 IExhYnMxFjAUBgNVBAgTDU1hc3NhY2h1c2V0dHMxJzAlBgkqhkiG9w0BCQEWGG5v Ym9keUB0d2lzdGVkbWF0cml4LmNvbTERMA8GA1UECxMIU2VjdXJpdHkwgZ8wDQYJ KoZIhvcNAQEBBQADgY0AMIGJAoGBAMzH8CDF/U91y/bdbdbJKnLgnyvQ9Ig9ZNZp 8hpsu4huil60zF03+Lexg2l1FIfURScjBuaJMR6HiMYTMjhzLuByRZ17KW4wYkGi KXstz03VIKy4Tjc+v4aXFI4XdRw10gGMGQlGGscXF/RSoN84VoDKBfOMWdXeConJ VyC4w3iJAgMBAAEwDQYJKoZIhvcNAQEEBQADgYEAviMT4lBoxOgQy32LIgZ4lVCj JNOiZYg8GMQ6y0ugp86X80UjOvkGtNf/R7YgED/giKRN/q/XJiLJDEhzknkocwmO S+4b2XpiaZYxRyKWwL221O7CGmtWYyZl2+92YYmmCiNzWQPfP6BOMlfax0AGLHls fXzCWdG0O/3Lk2SRM0I= -----END CERTIFICATE----- """ A_PEER_CERTIFICATE_PEM = """ -----BEGIN CERTIFICATE----- MIIC3jCCAkcCAjA6MA0GCSqGSIb3DQEBBAUAMIG2MQswCQYDVQQGEwJVUzEiMCAG A1UEAxMZZXhhbXBsZS50d2lzdGVkbWF0cml4LmNvbTEPMA0GA1UEBxMGQm9zdG9u MRwwGgYDVQQKExNUd2lzdGVkIE1hdHJpeCBMYWJzMRYwFAYDVQQIEw1NYXNzYWNo dXNldHRzMSkwJwYJKoZIhvcNAQkBFhpzb21lYm9keUB0d2lzdGVkbWF0cml4LmNv bTERMA8GA1UECxMIU2VjdXJpdHkwHhcNMDYwODE2MDEwMTU2WhcNMDcwODE2MDEw MTU2WjCBtjELMAkGA1UEBhMCVVMxIjAgBgNVBAMTGWV4YW1wbGUudHdpc3RlZG1h dHJpeC5jb20xDzANBgNVBAcTBkJvc3RvbjEcMBoGA1UEChMTVHdpc3RlZCBNYXRy aXggTGFiczEWMBQGA1UECBMNTWFzc2FjaHVzZXR0czEpMCcGCSqGSIb3DQEJARYa c29tZWJvZHlAdHdpc3RlZG1hdHJpeC5jb20xETAPBgNVBAsTCFNlY3VyaXR5MIGf MA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCnm+WBlgFNbMlHehib9ePGGDXF+Nz4 CjGuUmVBaXCRCiVjg3kSDecwqfb0fqTksBZ+oQ1UBjMcSh7OcvFXJZnUesBikGWE JE4V8Bjh+RmbJ1ZAlUPZ40bAkww0OpyIRAGMvKG+4yLFTO4WDxKmfDcrOb6ID8WJ e1u+i3XGkIf/5QIDAQABMA0GCSqGSIb3DQEBBAUAA4GBAD4Oukm3YYkhedUepBEA vvXIQhVDqL7mk6OqYdXmNj6R7ZMC8WWvGZxrzDI1bZuB+4aIxxd1FXC3UOHiR/xg i9cDl1y8P/qRp4aEBNF6rI0D4AxTbfnHQx4ERDAOShJdYZs/2zifPJ6va6YvrEyr yqDtGhklsWW3ZwBzEh5VEOUp -----END CERTIFICATE----- """ A_KEYPAIR = getModule(__name__).filePath.sibling("server.pem").getContent() def counter(counter=itertools.count()): """ Each time we're called, return the next integer in the natural numbers. """ return next(counter) def makeCertificate(**kw): keypair = PKey() keypair.generate_key(TYPE_RSA, 2048) certificate = X509() certificate.gmtime_adj_notBefore(0) certificate.gmtime_adj_notAfter(60 * 60 * 24 * 365) # One year for xname in certificate.get_issuer(), certificate.get_subject(): for (k, v) in kw.items(): setattr(xname, k, nativeString(v)) certificate.set_serial_number(counter()) certificate.set_pubkey(keypair) certificate.sign(keypair, "md5") return keypair, certificate def certificatesForAuthorityAndServer(serviceIdentity="example.com"): """ Create a self-signed CA certificate and server certificate signed by the CA. @param serviceIdentity: The identity (hostname) of the server. @type serviceIdentity: L{unicode} @return: a 2-tuple of C{(certificate_authority_certificate, server_certificate)} @rtype: L{tuple} of (L{sslverify.Certificate}, L{sslverify.PrivateCertificate}) """ commonNameForCA = x509.Name( [x509.NameAttribute(NameOID.COMMON_NAME, "Testing Example CA")] ) commonNameForServer = x509.Name( [x509.NameAttribute(NameOID.COMMON_NAME, "Testing Example Server")] ) oneDay = datetime.timedelta(1, 0, 0) privateKeyForCA = rsa.generate_private_key( public_exponent=65537, key_size=4096, backend=default_backend() ) publicKeyForCA = privateKeyForCA.public_key() caCertificate = ( x509.CertificateBuilder() .subject_name(commonNameForCA) .issuer_name(commonNameForCA) .not_valid_before(datetime.datetime.today() - oneDay) .not_valid_after(datetime.datetime.today() + oneDay) .serial_number(x509.random_serial_number()) .public_key(publicKeyForCA) .add_extension( x509.BasicConstraints(ca=True, path_length=9), critical=True, ) .sign( private_key=privateKeyForCA, algorithm=hashes.SHA256(), backend=default_backend(), ) ) privateKeyForServer = rsa.generate_private_key( public_exponent=65537, key_size=4096, backend=default_backend() ) publicKeyForServer = privateKeyForServer.public_key() try: ipAddress = ipaddress.ip_address(serviceIdentity) except ValueError: subjectAlternativeNames = [ x509.DNSName(serviceIdentity.encode("idna").decode("ascii")) ] else: subjectAlternativeNames = [x509.IPAddress(ipAddress)] serverCertificate = ( x509.CertificateBuilder() .subject_name(commonNameForServer) .issuer_name(commonNameForCA) .not_valid_before(datetime.datetime.today() - oneDay) .not_valid_after(datetime.datetime.today() + oneDay) .serial_number(x509.random_serial_number()) .public_key(publicKeyForServer) .add_extension( x509.BasicConstraints(ca=False, path_length=None), critical=True, ) .add_extension( x509.SubjectAlternativeName(subjectAlternativeNames), critical=True, ) .sign( private_key=privateKeyForCA, algorithm=hashes.SHA256(), backend=default_backend(), ) ) caSelfCert = sslverify.Certificate.loadPEM(caCertificate.public_bytes(Encoding.PEM)) serverCert = sslverify.PrivateCertificate.loadPEM( b"\n".join( [ privateKeyForServer.private_bytes( Encoding.PEM, PrivateFormat.TraditionalOpenSSL, NoEncryption(), ), serverCertificate.public_bytes(Encoding.PEM), ] ) ) return caSelfCert, serverCert def _loopbackTLSConnection(serverOpts, clientOpts): """ Common implementation code for both L{loopbackTLSConnection} and L{loopbackTLSConnectionInMemory}. Creates a loopback TLS connection using the provided server and client context factories. @param serverOpts: An OpenSSL context factory for the server. @type serverOpts: C{OpenSSLCertificateOptions}, or any class with an equivalent API. @param clientOpts: An OpenSSL context factory for the client. @type clientOpts: C{OpenSSLCertificateOptions}, or any class with an equivalent API. @return: 5-tuple of server-tls-protocol, server-inner-protocol, client-tls-protocol, client-inner-protocol and L{IOPump} @rtype: L{tuple} """ class GreetingServer(protocol.Protocol): greeting = b"greetings!" def connectionMade(self): self.transport.write(self.greeting) class ListeningClient(protocol.Protocol): data = b"" lostReason = None def dataReceived(self, data): self.data += data def connectionLost(self, reason): self.lostReason = reason clientWrappedProto = ListeningClient() serverWrappedProto = GreetingServer() plainClientFactory = protocol.Factory() plainClientFactory.protocol = lambda: clientWrappedProto plainServerFactory = protocol.Factory() plainServerFactory.protocol = lambda: serverWrappedProto clientFactory = TLSMemoryBIOFactory( clientOpts, isClient=True, wrappedFactory=plainServerFactory ) serverFactory = TLSMemoryBIOFactory( serverOpts, isClient=False, wrappedFactory=plainClientFactory ) sProto, cProto, pump = connectedServerAndClient( lambda: serverFactory.buildProtocol(None), lambda: clientFactory.buildProtocol(None), ) return sProto, cProto, serverWrappedProto, clientWrappedProto, pump def loopbackTLSConnection(trustRoot, privateKeyFile, chainedCertFile=None): """ Create a loopback TLS connection with the given trust and keys. @param trustRoot: the C{trustRoot} argument for the client connection's context. @type trustRoot: L{sslverify.IOpenSSLTrustRoot} @param privateKeyFile: The name of the file containing the private key. @type privateKeyFile: L{str} (native string; file name) @param chainedCertFile: The name of the chained certificate file. @type chainedCertFile: L{str} (native string; file name) @return: 3-tuple of server-protocol, client-protocol, and L{IOPump} @rtype: L{tuple} """ class ContextFactory: def getContext(self): """ Create a context for the server side of the connection. @return: an SSL context using a certificate and key. @rtype: C{OpenSSL.SSL.Context} """ ctx = SSL.Context(SSL.SSLv23_METHOD) if chainedCertFile is not None: ctx.use_certificate_chain_file(chainedCertFile) ctx.use_privatekey_file(privateKeyFile) # Let the test author know if they screwed something up. ctx.check_privatekey() return ctx serverOpts = ContextFactory() clientOpts = sslverify.OpenSSLCertificateOptions(trustRoot=trustRoot) return _loopbackTLSConnection(serverOpts, clientOpts) def loopbackTLSConnectionInMemory( trustRoot, privateKey, serverCertificate, clientProtocols=None, serverProtocols=None, clientOptions=None, ): """ Create a loopback TLS connection with the given trust and keys. Like L{loopbackTLSConnection}, but using in-memory certificates and keys rather than writing them to disk. @param trustRoot: the C{trustRoot} argument for the client connection's context. @type trustRoot: L{sslverify.IOpenSSLTrustRoot} @param privateKey: The private key. @type privateKey: L{str} (native string) @param serverCertificate: The certificate used by the server. @type chainedCertFile: L{str} (native string) @param clientProtocols: The protocols the client is willing to negotiate using NPN/ALPN. @param serverProtocols: The protocols the server is willing to negotiate using NPN/ALPN. @param clientOptions: The type of C{OpenSSLCertificateOptions} class to use for the client. Defaults to C{OpenSSLCertificateOptions}. @return: 3-tuple of server-protocol, client-protocol, and L{IOPump} @rtype: L{tuple} """ if clientOptions is None: clientOptions = sslverify.OpenSSLCertificateOptions clientCertOpts = clientOptions( trustRoot=trustRoot, acceptableProtocols=clientProtocols ) serverCertOpts = sslverify.OpenSSLCertificateOptions( privateKey=privateKey, certificate=serverCertificate, acceptableProtocols=serverProtocols, ) return _loopbackTLSConnection(serverCertOpts, clientCertOpts) def pathContainingDumpOf(testCase, *dumpables): """ Create a temporary file to store some serializable-as-PEM objects in, and return its name. @param testCase: a test case to use for generating a temporary directory. @type testCase: L{twisted.trial.unittest.TestCase} @param dumpables: arguments are objects from pyOpenSSL with a C{dump} method, taking a pyOpenSSL file-type constant, such as L{OpenSSL.crypto.FILETYPE_PEM} or L{OpenSSL.crypto.FILETYPE_ASN1}. @type dumpables: L{tuple} of L{object} with C{dump} method taking L{int} returning L{bytes} @return: the path to a file where all of the dumpables were dumped in PEM format. @rtype: L{str} """ fname = testCase.mktemp() with open(fname, "wb") as f: for dumpable in dumpables: f.write(dumpable.dump(FILETYPE_PEM)) return fname class DataCallbackProtocol(protocol.Protocol): def dataReceived(self, data): d, self.factory.onData = self.factory.onData, None if d is not None: d.callback(data) def connectionLost(self, reason): d, self.factory.onLost = self.factory.onLost, None if d is not None: d.errback(reason) class WritingProtocol(protocol.Protocol): byte = b"x" def connectionMade(self): self.transport.write(self.byte) def connectionLost(self, reason): self.factory.onLost.errback(reason) class FakeContext: """ Introspectable fake of an C{OpenSSL.SSL.Context}. Saves call arguments for later introspection. Necessary because C{Context} offers poor introspection. cf. this U{pyOpenSSL bug}. @ivar _method: See C{method} parameter of L{__init__}. @ivar _options: L{int} of C{OR}ed values from calls of L{set_options}. @ivar _certificate: Set by L{use_certificate}. @ivar _privateKey: Set by L{use_privatekey}. @ivar _verify: Set by L{set_verify}. @ivar _verifyDepth: Set by L{set_verify_depth}. @ivar _mode: Set by L{set_mode}. @ivar _sessionID: Set by L{set_session_id}. @ivar _extraCertChain: Accumulated L{list} of all extra certificates added by L{add_extra_chain_cert}. @ivar _cipherList: Set by L{set_cipher_list}. @ivar _dhFilename: Set by L{load_tmp_dh}. @ivar _defaultVerifyPathsSet: Set by L{set_default_verify_paths} @ivar _ecCurve: Set by L{set_tmp_ecdh} """ _options = 0 def __init__(self, method): self._method = method self._extraCertChain = [] self._defaultVerifyPathsSet = False self._ecCurve = None # Note that this value is explicitly documented as the default by # https://www.openssl.org/docs/man1.1.1/man3/ # SSL_CTX_set_session_cache_mode.html self._sessionCacheMode = SSL.SESS_CACHE_SERVER def set_options(self, options): self._options |= options def use_certificate(self, certificate): self._certificate = certificate def use_privatekey(self, privateKey): self._privateKey = privateKey def check_privatekey(self): return None def set_mode(self, mode): """ Set the mode. See L{SSL.Context.set_mode}. @param mode: See L{SSL.Context.set_mode}. """ self._mode = mode def set_verify(self, flags, callback): self._verify = flags, callback def set_verify_depth(self, depth): self._verifyDepth = depth def set_session_id(self, sessionIDContext): # This fake should change when the upstream changes: # https://github.com/pyca/pyopenssl/issues/845 self._sessionIDContext = sessionIDContext def set_session_cache_mode(self, cacheMode): """ Set the session cache mode on the context, as per L{SSL.Context.set_session_cache_mode}. """ self._sessionCacheMode = cacheMode def get_session_cache_mode(self): """ Retrieve the session cache mode from the context, as per L{SSL.Context.get_session_cache_mode}. """ return self._sessionCacheMode def add_extra_chain_cert(self, cert): self._extraCertChain.append(cert) def set_cipher_list(self, cipherList): self._cipherList = cipherList def load_tmp_dh(self, dhfilename): self._dhFilename = dhfilename def set_default_verify_paths(self): """ Set the default paths for the platform. """ self._defaultVerifyPathsSet = True def set_tmp_ecdh(self, curve): """ Set an ECDH curve. Should only be called by OpenSSL 1.0.1 code. @param curve: See L{OpenSSL.SSL.Context.set_tmp_ecdh} """ self._ecCurve = curve class ClientOptionsTests(SynchronousTestCase): """ Tests for L{sslverify.optionsForClientTLS}. """ if skipSSL: skip = skipSSL def test_extraKeywords(self): """ When passed a keyword parameter other than C{extraCertificateOptions}, L{sslverify.optionsForClientTLS} raises an exception just like a normal Python function would. """ error = self.assertRaises( TypeError, sslverify.optionsForClientTLS, hostname="alpha", someRandomThing="beta", ) self.assertEqual( str(error), "optionsForClientTLS() got an unexpected keyword argument " "'someRandomThing'", ) def test_bytesFailFast(self): """ If you pass L{bytes} as the hostname to L{sslverify.optionsForClientTLS} it immediately raises a L{TypeError}. """ error = self.assertRaises( TypeError, sslverify.optionsForClientTLS, b"not-actually-a-hostname.com" ) expectedText = ( "optionsForClientTLS requires text for host names, not " + bytes.__name__ ) self.assertEqual(str(error), expectedText) def test_dNSNameHostname(self): """ If you pass a dNSName to L{sslverify.optionsForClientTLS} L{_hostnameIsDnsName} will be True """ options = sslverify.optionsForClientTLS("example.com") self.assertTrue(options._hostnameIsDnsName) def test_IPv4AddressHostname(self): """ If you pass an IPv4 address to L{sslverify.optionsForClientTLS} L{_hostnameIsDnsName} will be False """ options = sslverify.optionsForClientTLS("127.0.0.1") self.assertFalse(options._hostnameIsDnsName) def test_IPv6AddressHostname(self): """ If you pass an IPv6 address to L{sslverify.optionsForClientTLS} L{_hostnameIsDnsName} will be False """ options = sslverify.optionsForClientTLS("::1") self.assertFalse(options._hostnameIsDnsName) class FakeChooseDiffieHellmanEllipticCurve: """ A fake implementation of L{_ChooseDiffieHellmanEllipticCurve} """ def __init__(self, versionNumber, openSSLlib, openSSLcrypto): """ A no-op constructor. """ def configureECDHCurve(self, ctx): """ A null configuration. @param ctx: An L{OpenSSL.SSL.Context} that would be configured. """ class OpenSSLOptionsTestsMixin: """ A mixin for L{OpenSSLOptions} test cases creates client and server certificates, signs them with a CA, and provides a L{loopback} that creates TLS a connections with them. """ if skipSSL: skip = skipSSL serverPort = clientConn = None onServerLost = onClientLost = None def setUp(self): """ Create class variables of client and server certificates. """ self.sKey, self.sCert = makeCertificate( O=b"Server Test Certificate", CN=b"server" ) self.cKey, self.cCert = makeCertificate( O=b"Client Test Certificate", CN=b"client" ) self.caCert1 = makeCertificate(O=b"CA Test Certificate 1", CN=b"ca1")[1] self.caCert2 = makeCertificate(O=b"CA Test Certificate", CN=b"ca2")[1] self.caCerts = [self.caCert1, self.caCert2] self.extraCertChain = self.caCerts def tearDown(self): if self.serverPort is not None: self.serverPort.stopListening() if self.clientConn is not None: self.clientConn.disconnect() L = [] if self.onServerLost is not None: L.append(self.onServerLost) if self.onClientLost is not None: L.append(self.onClientLost) return defer.DeferredList(L, consumeErrors=True) def loopback( self, serverCertOpts, clientCertOpts, onServerLost=None, onClientLost=None, onData=None, ): if onServerLost is None: self.onServerLost = onServerLost = defer.Deferred() if onClientLost is None: self.onClientLost = onClientLost = defer.Deferred() if onData is None: onData = defer.Deferred() serverFactory = protocol.ServerFactory() serverFactory.protocol = DataCallbackProtocol serverFactory.onLost = onServerLost serverFactory.onData = onData clientFactory = protocol.ClientFactory() clientFactory.protocol = WritingProtocol clientFactory.onLost = onClientLost self.serverPort = reactor.listenSSL(0, serverFactory, serverCertOpts) self.clientConn = reactor.connectSSL( "127.0.0.1", self.serverPort.getHost().port, clientFactory, clientCertOpts ) class OpenSSLOptionsTests(OpenSSLOptionsTestsMixin, TestCase): """ Tests for L{sslverify.OpenSSLOptions}. """ def setUp(self): """ Same as L{OpenSSLOptionsTestsMixin.setUp}, but it also patches L{sslverify._ChooseDiffieHellmanEllipticCurve}. """ super().setUp() self.patch( sslverify, "_ChooseDiffieHellmanEllipticCurve", FakeChooseDiffieHellmanEllipticCurve, ) def test_constructorWithOnlyPrivateKey(self): """ C{privateKey} and C{certificate} make only sense if both are set. """ self.assertRaises( ValueError, sslverify.OpenSSLCertificateOptions, privateKey=self.sKey ) def test_constructorWithOnlyCertificate(self): """ C{privateKey} and C{certificate} make only sense if both are set. """ self.assertRaises( ValueError, sslverify.OpenSSLCertificateOptions, certificate=self.sCert ) def test_constructorWithCertificateAndPrivateKey(self): """ Specifying C{privateKey} and C{certificate} initializes correctly. """ opts = sslverify.OpenSSLCertificateOptions( privateKey=self.sKey, certificate=self.sCert ) self.assertEqual(opts.privateKey, self.sKey) self.assertEqual(opts.certificate, self.sCert) self.assertEqual(opts.extraCertChain, []) def test_constructorDoesNotAllowVerifyWithoutCACerts(self): """ C{verify} must not be C{True} without specifying C{caCerts}. """ self.assertRaises( ValueError, sslverify.OpenSSLCertificateOptions, privateKey=self.sKey, certificate=self.sCert, verify=True, ) def test_constructorDoesNotAllowLegacyWithTrustRoot(self): """ C{verify}, C{requireCertificate}, and C{caCerts} must not be specified by the caller (to be I{any} value, even the default!) when specifying C{trustRoot}. """ self.assertRaises( TypeError, sslverify.OpenSSLCertificateOptions, privateKey=self.sKey, certificate=self.sCert, verify=True, trustRoot=None, caCerts=self.caCerts, ) self.assertRaises( TypeError, sslverify.OpenSSLCertificateOptions, privateKey=self.sKey, certificate=self.sCert, trustRoot=None, requireCertificate=True, ) def test_constructorAllowsCACertsWithoutVerify(self): """ It's currently a NOP, but valid. """ opts = sslverify.OpenSSLCertificateOptions( privateKey=self.sKey, certificate=self.sCert, caCerts=self.caCerts ) self.assertFalse(opts.verify) self.assertEqual(self.caCerts, opts.caCerts) def test_constructorWithVerifyAndCACerts(self): """ Specifying C{verify} and C{caCerts} initializes correctly. """ opts = sslverify.OpenSSLCertificateOptions( privateKey=self.sKey, certificate=self.sCert, verify=True, caCerts=self.caCerts, ) self.assertTrue(opts.verify) self.assertEqual(self.caCerts, opts.caCerts) def test_constructorSetsExtraChain(self): """ Setting C{extraCertChain} works if C{certificate} and C{privateKey} are set along with it. """ opts = sslverify.OpenSSLCertificateOptions( privateKey=self.sKey, certificate=self.sCert, extraCertChain=self.extraCertChain, ) self.assertEqual(self.extraCertChain, opts.extraCertChain) def test_constructorDoesNotAllowExtraChainWithoutPrivateKey(self): """ A C{extraCertChain} without C{privateKey} doesn't make sense and is thus rejected. """ self.assertRaises( ValueError, sslverify.OpenSSLCertificateOptions, certificate=self.sCert, extraCertChain=self.extraCertChain, ) def test_constructorDoesNotAllowExtraChainWithOutPrivateKey(self): """ A C{extraCertChain} without C{certificate} doesn't make sense and is thus rejected. """ self.assertRaises( ValueError, sslverify.OpenSSLCertificateOptions, privateKey=self.sKey, extraCertChain=self.extraCertChain, ) def test_extraChainFilesAreAddedIfSupplied(self): """ If C{extraCertChain} is set and all prerequisites are met, the specified chain certificates are added to C{Context}s that get created. """ opts = sslverify.OpenSSLCertificateOptions( privateKey=self.sKey, certificate=self.sCert, extraCertChain=self.extraCertChain, ) opts._contextFactory = FakeContext ctx = opts.getContext() self.assertEqual(self.sKey, ctx._privateKey) self.assertEqual(self.sCert, ctx._certificate) self.assertEqual(self.extraCertChain, ctx._extraCertChain) def test_extraChainDoesNotBreakPyOpenSSL(self): """ C{extraCertChain} doesn't break C{OpenSSL.SSL.Context} creation. """ opts = sslverify.OpenSSLCertificateOptions( privateKey=self.sKey, certificate=self.sCert, extraCertChain=self.extraCertChain, ) ctx = opts.getContext() self.assertIsInstance(ctx, SSL.Context) def test_acceptableCiphersAreAlwaysSet(self): """ If the user doesn't supply custom acceptable ciphers, a shipped secure default is used. We can't check directly for it because the effective cipher string we set varies with platforms. """ opts = sslverify.OpenSSLCertificateOptions( privateKey=self.sKey, certificate=self.sCert, ) opts._contextFactory = FakeContext ctx = opts.getContext() self.assertEqual(opts._cipherString.encode("ascii"), ctx._cipherList) def test_givesMeaningfulErrorMessageIfNoCipherMatches(self): """ If there is no valid cipher that matches the user's wishes, a L{ValueError} is raised. """ self.assertRaises( ValueError, sslverify.OpenSSLCertificateOptions, privateKey=self.sKey, certificate=self.sCert, acceptableCiphers=sslverify.OpenSSLAcceptableCiphers.fromOpenSSLCipherString( "" ), ) def test_honorsAcceptableCiphersArgument(self): """ If acceptable ciphers are passed, they are used. """ @implementer(interfaces.IAcceptableCiphers) class FakeAcceptableCiphers: def selectCiphers(self, _): return [sslverify.OpenSSLCipher("sentinel")] opts = sslverify.OpenSSLCertificateOptions( privateKey=self.sKey, certificate=self.sCert, acceptableCiphers=FakeAcceptableCiphers(), ) opts._contextFactory = FakeContext ctx = opts.getContext() self.assertEqual(b"sentinel", ctx._cipherList) def test_basicSecurityOptionsAreSet(self): """ Every context must have C{OP_NO_SSLv2}, C{OP_NO_COMPRESSION}, and C{OP_CIPHER_SERVER_PREFERENCE} set. """ opts = sslverify.OpenSSLCertificateOptions( privateKey=self.sKey, certificate=self.sCert, ) opts._contextFactory = FakeContext ctx = opts.getContext() options = ( SSL.OP_NO_SSLv2 | SSL.OP_NO_COMPRESSION | SSL.OP_CIPHER_SERVER_PREFERENCE ) self.assertEqual(options, ctx._options & options) def test_modeIsSet(self): """ Every context must be in C{MODE_RELEASE_BUFFERS} mode. """ opts = sslverify.OpenSSLCertificateOptions( privateKey=self.sKey, certificate=self.sCert, ) opts._contextFactory = FakeContext ctx = opts.getContext() self.assertEqual(SSL.MODE_RELEASE_BUFFERS, ctx._mode) def test_singleUseKeys(self): """ If C{singleUseKeys} is set, every context must have C{OP_SINGLE_DH_USE} and C{OP_SINGLE_ECDH_USE} set. """ opts = sslverify.OpenSSLCertificateOptions( privateKey=self.sKey, certificate=self.sCert, enableSingleUseKeys=True, ) opts._contextFactory = FakeContext ctx = opts.getContext() options = SSL.OP_SINGLE_DH_USE | SSL.OP_SINGLE_ECDH_USE self.assertEqual(options, ctx._options & options) def test_methodIsDeprecated(self): """ Passing C{method} to L{sslverify.OpenSSLCertificateOptions} is deprecated. """ sslverify.OpenSSLCertificateOptions( privateKey=self.sKey, certificate=self.sCert, method=SSL.SSLv23_METHOD, ) message = ( "Passing method to twisted.internet.ssl.CertificateOptions " "was deprecated in Twisted 17.1.0. Please use a " "combination of insecurelyLowerMinimumTo, raiseMinimumTo, " "and lowerMaximumSecurityTo instead, as Twisted will " "correctly configure the method." ) warnings = self.flushWarnings([self.test_methodIsDeprecated]) self.assertEqual(1, len(warnings)) self.assertEqual(DeprecationWarning, warnings[0]["category"]) self.assertEqual(message, warnings[0]["message"]) def test_tlsv1ByDefault(self): """ L{sslverify.OpenSSLCertificateOptions} will make the default minimum TLS version v1.0, if no C{method}, or C{insecurelyLowerMinimumTo} is given. """ opts = sslverify.OpenSSLCertificateOptions( privateKey=self.sKey, certificate=self.sCert ) opts._contextFactory = FakeContext ctx = opts.getContext() options = ( SSL.OP_NO_SSLv2 | SSL.OP_NO_COMPRESSION | SSL.OP_CIPHER_SERVER_PREFERENCE | SSL.OP_NO_SSLv3 ) self.assertEqual(options, ctx._options & options) def test_tlsProtocolsAtLeastWithMinimum(self): """ Passing C{insecurelyLowerMinimumTo} along with C{raiseMinimumTo} to L{sslverify.OpenSSLCertificateOptions} will cause it to raise an exception. """ with self.assertRaises(TypeError) as e: sslverify.OpenSSLCertificateOptions( privateKey=self.sKey, certificate=self.sCert, raiseMinimumTo=sslverify.TLSVersion.TLSv1_2, insecurelyLowerMinimumTo=sslverify.TLSVersion.TLSv1_2, ) self.assertIn("raiseMinimumTo", e.exception.args[0]) self.assertIn("insecurelyLowerMinimumTo", e.exception.args[0]) self.assertIn("exclusive", e.exception.args[0]) def test_tlsProtocolsNoMethodWithAtLeast(self): """ Passing C{raiseMinimumTo} along with C{method} to L{sslverify.OpenSSLCertificateOptions} will cause it to raise an exception. """ with self.assertRaises(TypeError) as e: sslverify.OpenSSLCertificateOptions( privateKey=self.sKey, certificate=self.sCert, method=SSL.SSLv23_METHOD, raiseMinimumTo=sslverify.TLSVersion.TLSv1_2, ) self.assertIn("method", e.exception.args[0]) self.assertIn("raiseMinimumTo", e.exception.args[0]) self.assertIn("exclusive", e.exception.args[0]) def test_tlsProtocolsNoMethodWithMinimum(self): """ Passing C{insecurelyLowerMinimumTo} along with C{method} to L{sslverify.OpenSSLCertificateOptions} will cause it to raise an exception. """ with self.assertRaises(TypeError) as e: sslverify.OpenSSLCertificateOptions( privateKey=self.sKey, certificate=self.sCert, method=SSL.SSLv23_METHOD, insecurelyLowerMinimumTo=sslverify.TLSVersion.TLSv1_2, ) self.assertIn("method", e.exception.args[0]) self.assertIn("insecurelyLowerMinimumTo", e.exception.args[0]) self.assertIn("exclusive", e.exception.args[0]) def test_tlsProtocolsNoMethodWithMaximum(self): """ Passing C{lowerMaximumSecurityTo} along with C{method} to L{sslverify.OpenSSLCertificateOptions} will cause it to raise an exception. """ with self.assertRaises(TypeError) as e: sslverify.OpenSSLCertificateOptions( privateKey=self.sKey, certificate=self.sCert, method=SSL.SSLv23_METHOD, lowerMaximumSecurityTo=sslverify.TLSVersion.TLSv1_2, ) self.assertIn("method", e.exception.args[0]) self.assertIn("lowerMaximumSecurityTo", e.exception.args[0]) self.assertIn("exclusive", e.exception.args[0]) def test_tlsVersionRangeInOrder(self): """ Passing out of order TLS versions to C{insecurelyLowerMinimumTo} and C{lowerMaximumSecurityTo} will cause it to raise an exception. """ with self.assertRaises(ValueError) as e: sslverify.OpenSSLCertificateOptions( privateKey=self.sKey, certificate=self.sCert, insecurelyLowerMinimumTo=sslverify.TLSVersion.TLSv1_0, lowerMaximumSecurityTo=sslverify.TLSVersion.SSLv3, ) self.assertEqual( e.exception.args, ( ( "insecurelyLowerMinimumTo needs to be lower than " "lowerMaximumSecurityTo" ), ), ) def test_tlsVersionRangeInOrderAtLeast(self): """ Passing out of order TLS versions to C{raiseMinimumTo} and C{lowerMaximumSecurityTo} will cause it to raise an exception. """ with self.assertRaises(ValueError) as e: sslverify.OpenSSLCertificateOptions( privateKey=self.sKey, certificate=self.sCert, raiseMinimumTo=sslverify.TLSVersion.TLSv1_0, lowerMaximumSecurityTo=sslverify.TLSVersion.SSLv3, ) self.assertEqual( e.exception.args, (("raiseMinimumTo needs to be lower than " "lowerMaximumSecurityTo"),), ) def test_tlsProtocolsreduceToMaxWithoutMin(self): """ When calling L{sslverify.OpenSSLCertificateOptions} with C{lowerMaximumSecurityTo} but no C{raiseMinimumTo} or C{insecurelyLowerMinimumTo} set, and C{lowerMaximumSecurityTo} is below the minimum default, the minimum will be made the new maximum. """ opts = sslverify.OpenSSLCertificateOptions( privateKey=self.sKey, certificate=self.sCert, lowerMaximumSecurityTo=sslverify.TLSVersion.SSLv3, ) opts._contextFactory = FakeContext ctx = opts.getContext() options = ( SSL.OP_NO_SSLv2 | SSL.OP_NO_COMPRESSION | SSL.OP_CIPHER_SERVER_PREFERENCE | SSL.OP_NO_TLSv1 | SSL.OP_NO_TLSv1_1 | SSL.OP_NO_TLSv1_2 | opts._OP_NO_TLSv1_3 ) self.assertEqual(options, ctx._options & options) def test_tlsProtocolsSSLv3Only(self): """ When calling L{sslverify.OpenSSLCertificateOptions} with C{insecurelyLowerMinimumTo} and C{lowerMaximumSecurityTo} set to SSLv3, it will exclude all others. """ opts = sslverify.OpenSSLCertificateOptions( privateKey=self.sKey, certificate=self.sCert, insecurelyLowerMinimumTo=sslverify.TLSVersion.SSLv3, lowerMaximumSecurityTo=sslverify.TLSVersion.SSLv3, ) opts._contextFactory = FakeContext ctx = opts.getContext() options = ( SSL.OP_NO_SSLv2 | SSL.OP_NO_COMPRESSION | SSL.OP_CIPHER_SERVER_PREFERENCE | SSL.OP_NO_TLSv1 | SSL.OP_NO_TLSv1_1 | SSL.OP_NO_TLSv1_2 | opts._OP_NO_TLSv1_3 ) self.assertEqual(options, ctx._options & options) def test_tlsProtocolsTLSv1Point0Only(self): """ When calling L{sslverify.OpenSSLCertificateOptions} with C{insecurelyLowerMinimumTo} and C{lowerMaximumSecurityTo} set to v1.0, it will exclude all others. """ opts = sslverify.OpenSSLCertificateOptions( privateKey=self.sKey, certificate=self.sCert, insecurelyLowerMinimumTo=sslverify.TLSVersion.TLSv1_0, lowerMaximumSecurityTo=sslverify.TLSVersion.TLSv1_0, ) opts._contextFactory = FakeContext ctx = opts.getContext() options = ( SSL.OP_NO_SSLv2 | SSL.OP_NO_COMPRESSION | SSL.OP_CIPHER_SERVER_PREFERENCE | SSL.OP_NO_SSLv3 | SSL.OP_NO_TLSv1_1 | SSL.OP_NO_TLSv1_2 | opts._OP_NO_TLSv1_3 ) self.assertEqual(options, ctx._options & options) def test_tlsProtocolsTLSv1Point1Only(self): """ When calling L{sslverify.OpenSSLCertificateOptions} with C{insecurelyLowerMinimumTo} and C{lowerMaximumSecurityTo} set to v1.1, it will exclude all others. """ opts = sslverify.OpenSSLCertificateOptions( privateKey=self.sKey, certificate=self.sCert, insecurelyLowerMinimumTo=sslverify.TLSVersion.TLSv1_1, lowerMaximumSecurityTo=sslverify.TLSVersion.TLSv1_1, ) opts._contextFactory = FakeContext ctx = opts.getContext() options = ( SSL.OP_NO_SSLv2 | SSL.OP_NO_COMPRESSION | SSL.OP_CIPHER_SERVER_PREFERENCE | SSL.OP_NO_SSLv3 | SSL.OP_NO_TLSv1 | SSL.OP_NO_TLSv1_2 | opts._OP_NO_TLSv1_3 ) self.assertEqual(options, ctx._options & options) def test_tlsProtocolsTLSv1Point2Only(self): """ When calling L{sslverify.OpenSSLCertificateOptions} with C{insecurelyLowerMinimumTo} and C{lowerMaximumSecurityTo} set to v1.2, it will exclude all others. """ opts = sslverify.OpenSSLCertificateOptions( privateKey=self.sKey, certificate=self.sCert, insecurelyLowerMinimumTo=sslverify.TLSVersion.TLSv1_2, lowerMaximumSecurityTo=sslverify.TLSVersion.TLSv1_2, ) opts._contextFactory = FakeContext ctx = opts.getContext() options = ( SSL.OP_NO_SSLv2 | SSL.OP_NO_COMPRESSION | SSL.OP_CIPHER_SERVER_PREFERENCE | SSL.OP_NO_SSLv3 | SSL.OP_NO_TLSv1 | SSL.OP_NO_TLSv1_1 | opts._OP_NO_TLSv1_3 ) self.assertEqual(options, ctx._options & options) def test_tlsProtocolsAllModernTLS(self): """ When calling L{sslverify.OpenSSLCertificateOptions} with C{insecurelyLowerMinimumTo} set to TLSv1.0 and C{lowerMaximumSecurityTo} to TLSv1.2, it will exclude both SSLs and the (unreleased) TLSv1.3. """ opts = sslverify.OpenSSLCertificateOptions( privateKey=self.sKey, certificate=self.sCert, insecurelyLowerMinimumTo=sslverify.TLSVersion.TLSv1_0, lowerMaximumSecurityTo=sslverify.TLSVersion.TLSv1_2, ) opts._contextFactory = FakeContext ctx = opts.getContext() options = ( SSL.OP_NO_SSLv2 | SSL.OP_NO_COMPRESSION | SSL.OP_CIPHER_SERVER_PREFERENCE | SSL.OP_NO_SSLv3 | opts._OP_NO_TLSv1_3 ) self.assertEqual(options, ctx._options & options) def test_tlsProtocolsAtLeastAllSecureTLS(self): """ When calling L{sslverify.OpenSSLCertificateOptions} with C{raiseMinimumTo} set to TLSv1.2, it will ignore all TLSs below 1.2 and SSL. """ opts = sslverify.OpenSSLCertificateOptions( privateKey=self.sKey, certificate=self.sCert, raiseMinimumTo=sslverify.TLSVersion.TLSv1_2, ) opts._contextFactory = FakeContext ctx = opts.getContext() options = ( SSL.OP_NO_SSLv2 | SSL.OP_NO_COMPRESSION | SSL.OP_CIPHER_SERVER_PREFERENCE | SSL.OP_NO_SSLv3 | SSL.OP_NO_TLSv1 | SSL.OP_NO_TLSv1_1 ) self.assertEqual(options, ctx._options & options) def test_tlsProtocolsAtLeastWillAcceptHigherDefault(self): """ When calling L{sslverify.OpenSSLCertificateOptions} with C{raiseMinimumTo} set to a value lower than Twisted's default will cause it to use the more secure default. """ opts = sslverify.OpenSSLCertificateOptions( privateKey=self.sKey, certificate=self.sCert, raiseMinimumTo=sslverify.TLSVersion.SSLv3, ) opts._contextFactory = FakeContext ctx = opts.getContext() # Future maintainer warning: this will break if we change our default # up, so you should change it to add the relevant OP_NO flags when we # do make that change and this test fails. options = ( SSL.OP_NO_SSLv2 | SSL.OP_NO_COMPRESSION | SSL.OP_CIPHER_SERVER_PREFERENCE | SSL.OP_NO_SSLv3 ) self.assertEqual(options, ctx._options & options) self.assertEqual(opts._defaultMinimumTLSVersion, sslverify.TLSVersion.TLSv1_0) def test_tlsProtocolsAllSecureTLS(self): """ When calling L{sslverify.OpenSSLCertificateOptions} with C{insecurelyLowerMinimumTo} set to TLSv1.2, it will ignore all TLSs below 1.2 and SSL. """ opts = sslverify.OpenSSLCertificateOptions( privateKey=self.sKey, certificate=self.sCert, insecurelyLowerMinimumTo=sslverify.TLSVersion.TLSv1_2, ) opts._contextFactory = FakeContext ctx = opts.getContext() options = ( SSL.OP_NO_SSLv2 | SSL.OP_NO_COMPRESSION | SSL.OP_CIPHER_SERVER_PREFERENCE | SSL.OP_NO_SSLv3 | SSL.OP_NO_TLSv1 | SSL.OP_NO_TLSv1_1 ) self.assertEqual(options, ctx._options & options) def test_dhParams(self): """ If C{dhParams} is set, they are loaded into each new context. """ class FakeDiffieHellmanParameters: _dhFile = FilePath(b"dh.params") dhParams = FakeDiffieHellmanParameters() opts = sslverify.OpenSSLCertificateOptions( privateKey=self.sKey, certificate=self.sCert, dhParameters=dhParams, ) opts._contextFactory = FakeContext ctx = opts.getContext() self.assertEqual(FakeDiffieHellmanParameters._dhFile.path, ctx._dhFilename) def test_abbreviatingDistinguishedNames(self): """ Check that abbreviations used in certificates correctly map to complete names. """ self.assertEqual( sslverify.DN(CN=b"a", OU=b"hello"), sslverify.DistinguishedName( commonName=b"a", organizationalUnitName=b"hello" ), ) self.assertNotEqual( sslverify.DN(CN=b"a", OU=b"hello"), sslverify.DN(CN=b"a", OU=b"hello", emailAddress=b"xxx"), ) dn = sslverify.DN(CN=b"abcdefg") self.assertRaises(AttributeError, setattr, dn, "Cn", b"x") self.assertEqual(dn.CN, dn.commonName) dn.CN = b"bcdefga" self.assertEqual(dn.CN, dn.commonName) def testInspectDistinguishedName(self): n = sslverify.DN( commonName=b"common name", organizationName=b"organization name", organizationalUnitName=b"organizational unit name", localityName=b"locality name", stateOrProvinceName=b"state or province name", countryName=b"country name", emailAddress=b"email address", ) s = n.inspect() for k in [ "common name", "organization name", "organizational unit name", "locality name", "state or province name", "country name", "email address", ]: self.assertIn(k, s, f"{k!r} was not in inspect output.") self.assertIn(k.title(), s, f"{k!r} was not in inspect output.") def testInspectDistinguishedNameWithoutAllFields(self): n = sslverify.DN(localityName=b"locality name") s = n.inspect() for k in [ "common name", "organization name", "organizational unit name", "state or province name", "country name", "email address", ]: self.assertNotIn(k, s, f"{k!r} was in inspect output.") self.assertNotIn(k.title(), s, f"{k!r} was in inspect output.") self.assertIn("locality name", s) self.assertIn("Locality Name", s) def test_inspectCertificate(self): """ Test that the C{inspect} method of L{sslverify.Certificate} returns a human-readable string containing some basic information about the certificate. """ c = sslverify.Certificate.loadPEM(A_HOST_CERTIFICATE_PEM) pk = c.getPublicKey() keyHash = pk.keyHash() # Maintenance Note: the algorithm used to compute the "public key hash" # is highly dubious and can differ between underlying versions of # OpenSSL (and across versions of Twisted), since it is not actually # the hash of the public key by itself. If we can get the appropriate # APIs to get the hash of the key itself out of OpenSSL, then we should # be able to make it statically declared inline below again rather than # computing it here. self.assertEqual( c.inspect().split("\n"), [ "Certificate For Subject:", " Common Name: example.twistedmatrix.com", " Country Name: US", " Email Address: nobody@twistedmatrix.com", " Locality Name: Boston", " Organization Name: Twisted Matrix Labs", " Organizational Unit Name: Security", " State Or Province Name: Massachusetts", "", "Issuer:", " Common Name: example.twistedmatrix.com", " Country Name: US", " Email Address: nobody@twistedmatrix.com", " Locality Name: Boston", " Organization Name: Twisted Matrix Labs", " Organizational Unit Name: Security", " State Or Province Name: Massachusetts", "", "Serial Number: 12345", "Digest: C4:96:11:00:30:C3:EC:EE:A3:55:AA:ED:8C:84:85:18", "Public Key with Hash: " + keyHash, ], ) def test_publicKeyMatching(self): """ L{PublicKey.matches} returns L{True} for keys from certificates with the same key, and L{False} for keys from certificates with different keys. """ hostA = sslverify.Certificate.loadPEM(A_HOST_CERTIFICATE_PEM) hostB = sslverify.Certificate.loadPEM(A_HOST_CERTIFICATE_PEM) peerA = sslverify.Certificate.loadPEM(A_PEER_CERTIFICATE_PEM) self.assertTrue(hostA.getPublicKey().matches(hostB.getPublicKey())) self.assertFalse(peerA.getPublicKey().matches(hostA.getPublicKey())) def test_enablingAndDisablingSessions(self): """ The enableSessions argument sets the session cache mode; it defaults to False (at least until https://twistedmatrix.com/trac/ticket/9764 can be resolved). """ options = sslverify.OpenSSLCertificateOptions() self.assertEqual(options.enableSessions, False) ctx = options.getContext() self.assertEqual(ctx.get_session_cache_mode(), SSL.SESS_CACHE_OFF) options = sslverify.OpenSSLCertificateOptions(enableSessions=True) self.assertEqual(options.enableSessions, True) ctx = options.getContext() self.assertEqual(ctx.get_session_cache_mode(), SSL.SESS_CACHE_SERVER) def test_certificateOptionsSerialization(self): """ Test that __setstate__(__getstate__()) round-trips properly. """ firstOpts = sslverify.OpenSSLCertificateOptions( privateKey=self.sKey, certificate=self.sCert, method=SSL.SSLv23_METHOD, verify=True, caCerts=[self.sCert], verifyDepth=2, requireCertificate=False, verifyOnce=False, enableSingleUseKeys=False, enableSessions=False, fixBrokenPeers=True, enableSessionTickets=True, ) context = firstOpts.getContext() self.assertIs(context, firstOpts._context) self.assertIsNotNone(context) state = firstOpts.__getstate__() self.assertNotIn("_context", state) opts = sslverify.OpenSSLCertificateOptions() opts.__setstate__(state) self.assertEqual(opts.privateKey, self.sKey) self.assertEqual(opts.certificate, self.sCert) self.assertEqual(opts.method, SSL.SSLv23_METHOD) self.assertTrue(opts.verify) self.assertEqual(opts.caCerts, [self.sCert]) self.assertEqual(opts.verifyDepth, 2) self.assertFalse(opts.requireCertificate) self.assertFalse(opts.verifyOnce) self.assertFalse(opts.enableSingleUseKeys) self.assertFalse(opts.enableSessions) self.assertTrue(opts.fixBrokenPeers) self.assertTrue(opts.enableSessionTickets) test_certificateOptionsSerialization.suppress = [ # type: ignore[attr-defined] util.suppress( category=DeprecationWarning, message=r"twisted\.internet\._sslverify\.*__[gs]etstate__", ) ] def test_certificateOptionsSessionTickets(self): """ Enabling session tickets should not set the OP_NO_TICKET option. """ opts = sslverify.OpenSSLCertificateOptions(enableSessionTickets=True) ctx = opts.getContext() self.assertEqual(0, ctx.set_options(0) & 0x00004000) def test_certificateOptionsSessionTicketsDisabled(self): """ Enabling session tickets should set the OP_NO_TICKET option. """ opts = sslverify.OpenSSLCertificateOptions(enableSessionTickets=False) ctx = opts.getContext() self.assertEqual(0x00004000, ctx.set_options(0) & 0x00004000) def test_allowedAnonymousClientConnection(self): """ Check that anonymous connections are allowed when certificates aren't required on the server. """ onData = defer.Deferred() self.loopback( sslverify.OpenSSLCertificateOptions( privateKey=self.sKey, certificate=self.sCert, requireCertificate=False ), sslverify.OpenSSLCertificateOptions(requireCertificate=False), onData=onData, ) return onData.addCallback( lambda result: self.assertEqual(result, WritingProtocol.byte) ) def test_refusedAnonymousClientConnection(self): """ Check that anonymous connections are refused when certificates are required on the server. """ onServerLost = defer.Deferred() onClientLost = defer.Deferred() self.loopback( sslverify.OpenSSLCertificateOptions( privateKey=self.sKey, certificate=self.sCert, verify=True, caCerts=[self.sCert], requireCertificate=True, ), sslverify.OpenSSLCertificateOptions(requireCertificate=False), onServerLost=onServerLost, onClientLost=onClientLost, ) d = defer.DeferredList([onClientLost, onServerLost], consumeErrors=True) def afterLost(result): ((cSuccess, cResult), (sSuccess, sResult)) = result self.assertFalse(cSuccess) self.assertFalse(sSuccess) # Win32 fails to report the SSL Error, and report a connection lost # instead: there is a race condition so that's not totally # surprising (see ticket #2877 in the tracker) self.assertIsInstance(cResult.value, (SSL.Error, ConnectionLost)) self.assertIsInstance(sResult.value, SSL.Error) return d.addCallback(afterLost) def test_failedCertificateVerification(self): """ Check that connecting with a certificate not accepted by the server CA fails. """ onServerLost = defer.Deferred() onClientLost = defer.Deferred() self.loopback( sslverify.OpenSSLCertificateOptions( privateKey=self.sKey, certificate=self.sCert, verify=False, requireCertificate=False, ), sslverify.OpenSSLCertificateOptions( verify=True, requireCertificate=False, caCerts=[self.cCert] ), onServerLost=onServerLost, onClientLost=onClientLost, ) d = defer.DeferredList([onClientLost, onServerLost], consumeErrors=True) def afterLost(result): ((cSuccess, cResult), (sSuccess, sResult)) = result self.assertFalse(cSuccess) self.assertFalse(sSuccess) return d.addCallback(afterLost) def test_successfulCertificateVerification(self): """ Test a successful connection with client certificate validation on server side. """ onData = defer.Deferred() self.loopback( sslverify.OpenSSLCertificateOptions( privateKey=self.sKey, certificate=self.sCert, verify=False, requireCertificate=False, ), sslverify.OpenSSLCertificateOptions( verify=True, requireCertificate=True, caCerts=[self.sCert] ), onData=onData, ) return onData.addCallback( lambda result: self.assertEqual(result, WritingProtocol.byte) ) def test_successfulSymmetricSelfSignedCertificateVerification(self): """ Test a successful connection with validation on both server and client sides. """ onData = defer.Deferred() self.loopback( sslverify.OpenSSLCertificateOptions( privateKey=self.sKey, certificate=self.sCert, verify=True, requireCertificate=True, caCerts=[self.cCert], ), sslverify.OpenSSLCertificateOptions( privateKey=self.cKey, certificate=self.cCert, verify=True, requireCertificate=True, caCerts=[self.sCert], ), onData=onData, ) return onData.addCallback( lambda result: self.assertEqual(result, WritingProtocol.byte) ) def test_verification(self): """ Check certificates verification building custom certificates data. """ clientDN = sslverify.DistinguishedName(commonName="client") clientKey = sslverify.KeyPair.generate() clientCertReq = clientKey.certificateRequest(clientDN) serverDN = sslverify.DistinguishedName(commonName="server") serverKey = sslverify.KeyPair.generate() serverCertReq = serverKey.certificateRequest(serverDN) clientSelfCertReq = clientKey.certificateRequest(clientDN) clientSelfCertData = clientKey.signCertificateRequest( clientDN, clientSelfCertReq, lambda dn: True, 132 ) clientSelfCert = clientKey.newCertificate(clientSelfCertData) serverSelfCertReq = serverKey.certificateRequest(serverDN) serverSelfCertData = serverKey.signCertificateRequest( serverDN, serverSelfCertReq, lambda dn: True, 516 ) serverSelfCert = serverKey.newCertificate(serverSelfCertData) clientCertData = serverKey.signCertificateRequest( serverDN, clientCertReq, lambda dn: True, 7 ) clientCert = clientKey.newCertificate(clientCertData) serverCertData = clientKey.signCertificateRequest( clientDN, serverCertReq, lambda dn: True, 42 ) serverCert = serverKey.newCertificate(serverCertData) onData = defer.Deferred() serverOpts = serverCert.options(serverSelfCert) clientOpts = clientCert.options(clientSelfCert) self.loopback(serverOpts, clientOpts, onData=onData) return onData.addCallback( lambda result: self.assertEqual(result, WritingProtocol.byte) ) class OpenSSLOptionsECDHIntegrationTests(OpenSSLOptionsTestsMixin, TestCase): """ ECDH-related integration tests for L{OpenSSLOptions}. """ def test_ellipticCurveDiffieHellman(self): """ Connections use ECDH when OpenSSL supports it. """ if not get_elliptic_curves(): raise SkipTest("OpenSSL does not support ECDH.") onData = defer.Deferred() # TLS 1.3 cipher suites do not specify the key exchange # mechanism: # https://wiki.openssl.org/index.php/TLS1.3#Differences_with_TLS1.2_and_below # # and OpenSSL only supports ECHDE groups with TLS 1.3: # https://wiki.openssl.org/index.php/TLS1.3#Groups # # so TLS 1.3 implies ECDHE. Force this test to use TLS 1.3 to # ensure ECDH is selected when it might not be. self.loopback( sslverify.OpenSSLCertificateOptions( privateKey=self.sKey, certificate=self.sCert, requireCertificate=False, lowerMaximumSecurityTo=sslverify.TLSVersion.TLSv1_3, ), sslverify.OpenSSLCertificateOptions( requireCertificate=False, lowerMaximumSecurityTo=sslverify.TLSVersion.TLSv1_3, ), onData=onData, ) @onData.addCallback def assertECDH(_): self.assertEqual(len(self.clientConn.factory.protocols), 1) [clientProtocol] = self.clientConn.factory.protocols cipher = clientProtocol.getHandle().get_cipher_name() self.assertIn("ECDH", cipher) return onData class DeprecationTests(SynchronousTestCase): """ Tests for deprecation of L{sslverify.OpenSSLCertificateOptions}'s support of the pickle protocol. """ if skipSSL: skip = skipSSL def test_getstateDeprecation(self): """ L{sslverify.OpenSSLCertificateOptions.__getstate__} is deprecated. """ self.callDeprecated( (Version("Twisted", 15, 0, 0), "a real persistence system"), sslverify.OpenSSLCertificateOptions().__getstate__, ) def test_setstateDeprecation(self): """ L{sslverify.OpenSSLCertificateOptions.__setstate__} is deprecated. """ self.callDeprecated( (Version("Twisted", 15, 0, 0), "a real persistence system"), sslverify.OpenSSLCertificateOptions().__setstate__, {}, ) class TrustRootTests(TestCase): """ Tests for L{sslverify.OpenSSLCertificateOptions}' C{trustRoot} argument, L{sslverify.platformTrust}, and their interactions. """ if skipSSL: skip = skipSSL def setUp(self): """ Patch L{sslverify._ChooseDiffieHellmanEllipticCurve}. """ self.patch( sslverify, "_ChooseDiffieHellmanEllipticCurve", FakeChooseDiffieHellmanEllipticCurve, ) def test_caCertsPlatformDefaults(self): """ Specifying a C{trustRoot} of L{sslverify.OpenSSLDefaultPaths} when initializing L{sslverify.OpenSSLCertificateOptions} loads the platform-provided trusted certificates via C{set_default_verify_paths}. """ opts = sslverify.OpenSSLCertificateOptions( trustRoot=sslverify.OpenSSLDefaultPaths(), ) fc = FakeContext(SSL.TLSv1_METHOD) opts._contextFactory = lambda method: fc opts.getContext() self.assertTrue(fc._defaultVerifyPathsSet) def test_trustRootPlatformRejectsUntrustedCA(self): """ Specifying a C{trustRoot} of L{platformTrust} when initializing L{sslverify.OpenSSLCertificateOptions} causes certificates issued by a newly created CA to be rejected by an SSL connection using these options. Note that this test should I{always} pass, even on platforms where the CA certificates are not installed, as long as L{platformTrust} rejects completely invalid / unknown root CA certificates. This is simply a smoke test to make sure that verification is happening at all. """ caSelfCert, serverCert = certificatesForAuthorityAndServer() chainedCert = pathContainingDumpOf(self, serverCert, caSelfCert) privateKey = pathContainingDumpOf(self, serverCert.privateKey) sProto, cProto, sWrapped, cWrapped, pump = loopbackTLSConnection( trustRoot=platformTrust(), privateKeyFile=privateKey, chainedCertFile=chainedCert, ) # No data was received. self.assertEqual(cWrapped.data, b"") # It was an L{SSL.Error}. self.assertEqual(cWrapped.lostReason.type, SSL.Error) # Some combination of OpenSSL and PyOpenSSL is bad at reporting errors. err = cWrapped.lostReason.value self.assertEqual(err.args[0][0][2], "tlsv1 alert unknown ca") def test_trustRootSpecificCertificate(self): """ Specifying a L{Certificate} object for L{trustRoot} will result in that certificate being the only trust root for a client. """ caCert, serverCert = certificatesForAuthorityAndServer() otherCa, otherServer = certificatesForAuthorityAndServer() sProto, cProto, sWrapped, cWrapped, pump = loopbackTLSConnection( trustRoot=caCert, privateKeyFile=pathContainingDumpOf(self, serverCert.privateKey), chainedCertFile=pathContainingDumpOf(self, serverCert), ) pump.flush() self.assertIsNone(cWrapped.lostReason) self.assertEqual(cWrapped.data, sWrapped.greeting) class ServiceIdentityTests(SynchronousTestCase): """ Tests for the verification of the peer's service's identity via the C{hostname} argument to L{sslverify.OpenSSLCertificateOptions}. """ if skipSSL: skip = skipSSL def serviceIdentitySetup( self, clientHostname, serverHostname, serverContextSetup=lambda ctx: None, validCertificate=True, clientPresentsCertificate=False, validClientCertificate=True, serverVerifies=False, buggyInfoCallback=False, fakePlatformTrust=False, useDefaultTrust=False, ): """ Connect a server and a client. @param clientHostname: The I{client's idea} of the server's hostname; passed as the C{hostname} to the L{sslverify.OpenSSLCertificateOptions} instance. @type clientHostname: L{unicode} @param serverHostname: The I{server's own idea} of the server's hostname; present in the certificate presented by the server. @type serverHostname: L{unicode} @param serverContextSetup: a 1-argument callable invoked with the L{OpenSSL.SSL.Context} after it's produced. @type serverContextSetup: L{callable} taking L{OpenSSL.SSL.Context} returning L{None}. @param validCertificate: Is the server's certificate valid? L{True} if so, L{False} otherwise. @type validCertificate: L{bool} @param clientPresentsCertificate: Should the client present a certificate to the server? Defaults to 'no'. @type clientPresentsCertificate: L{bool} @param validClientCertificate: If the client presents a certificate, should it actually be a valid one, i.e. signed by the same CA that the server is checking? Defaults to 'yes'. @type validClientCertificate: L{bool} @param serverVerifies: Should the server verify the client's certificate? Defaults to 'no'. @type serverVerifies: L{bool} @param buggyInfoCallback: Should we patch the implementation so that the C{info_callback} passed to OpenSSL to have a bug and raise an exception (L{ZeroDivisionError})? Defaults to 'no'. @type buggyInfoCallback: L{bool} @param fakePlatformTrust: Should we fake the platformTrust to be the same as our fake server certificate authority, so that we can test it's being used? Defaults to 'no' and we just pass platform trust. @type fakePlatformTrust: L{bool} @param useDefaultTrust: Should we avoid passing the C{trustRoot} to L{ssl.optionsForClientTLS}? Defaults to 'no'. @type useDefaultTrust: L{bool} @return: the client TLS protocol, the client wrapped protocol, the server TLS protocol, the server wrapped protocol and an L{IOPump} which, when its C{pump} and C{flush} methods are called, will move data between the created client and server protocol instances @rtype: 5-L{tuple} of 4 L{IProtocol}s and L{IOPump} """ serverCA, serverCert = certificatesForAuthorityAndServer(serverHostname) other = {} passClientCert = None clientCA, clientCert = certificatesForAuthorityAndServer("client") if serverVerifies: other.update(trustRoot=clientCA) if clientPresentsCertificate: if validClientCertificate: passClientCert = clientCert else: bogusCA, bogus = certificatesForAuthorityAndServer("client") passClientCert = bogus serverOpts = sslverify.OpenSSLCertificateOptions( privateKey=serverCert.privateKey.original, certificate=serverCert.original, **other, ) serverContextSetup(serverOpts.getContext()) if not validCertificate: serverCA, otherServer = certificatesForAuthorityAndServer(serverHostname) if buggyInfoCallback: def broken(*a, **k): """ Raise an exception. @param a: Arguments for an C{info_callback} @param k: Keyword arguments for an C{info_callback} """ 1 / 0 self.patch( sslverify.ClientTLSOptions, "_identityVerifyingInfoCallback", broken, ) signature = {"hostname": clientHostname} if passClientCert: signature.update(clientCertificate=passClientCert) if not useDefaultTrust: signature.update(trustRoot=serverCA) if fakePlatformTrust: self.patch(sslverify, "platformTrust", lambda: serverCA) clientOpts = sslverify.optionsForClientTLS(**signature) class GreetingServer(protocol.Protocol): greeting = b"greetings!" lostReason = None data = b"" def connectionMade(self): self.transport.write(self.greeting) def dataReceived(self, data): self.data += data def connectionLost(self, reason): self.lostReason = reason class GreetingClient(protocol.Protocol): greeting = b"cheerio!" data = b"" lostReason = None def connectionMade(self): self.transport.write(self.greeting) def dataReceived(self, data): self.data += data def connectionLost(self, reason): self.lostReason = reason serverWrappedProto = GreetingServer() clientWrappedProto = GreetingClient() clientFactory = protocol.Factory() clientFactory.protocol = lambda: clientWrappedProto serverFactory = protocol.Factory() serverFactory.protocol = lambda: serverWrappedProto self.serverOpts = serverOpts self.clientOpts = clientOpts clientTLSFactory = TLSMemoryBIOFactory( clientOpts, isClient=True, wrappedFactory=clientFactory ) serverTLSFactory = TLSMemoryBIOFactory( serverOpts, isClient=False, wrappedFactory=serverFactory ) cProto, sProto, pump = connectedServerAndClient( lambda: serverTLSFactory.buildProtocol(None), lambda: clientTLSFactory.buildProtocol(None), ) return cProto, sProto, clientWrappedProto, serverWrappedProto, pump def test_invalidHostname(self): """ When a certificate containing an invalid hostname is received from the server, the connection is immediately dropped. """ cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup( "wrong-host.example.com", "correct-host.example.com", ) self.assertEqual(cWrapped.data, b"") self.assertEqual(sWrapped.data, b"") cErr = cWrapped.lostReason.value sErr = sWrapped.lostReason.value self.assertIsInstance(cErr, VerificationError) self.assertIsInstance(sErr, ConnectionClosed) def test_validHostname(self): """ Whenever a valid certificate containing a valid hostname is received, connection proceeds normally. """ cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup( "valid.example.com", "valid.example.com", ) self.assertEqual(cWrapped.data, b"greetings!") cErr = cWrapped.lostReason sErr = sWrapped.lostReason self.assertIsNone(cErr) self.assertIsNone(sErr) def test_validHostnameInvalidCertificate(self): """ When an invalid certificate containing a perfectly valid hostname is received, the connection is aborted with an OpenSSL error. """ cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup( "valid.example.com", "valid.example.com", validCertificate=False, ) self.assertEqual(cWrapped.data, b"") self.assertEqual(sWrapped.data, b"") cErr = cWrapped.lostReason.value sErr = sWrapped.lostReason.value self.assertIsInstance(cErr, SSL.Error) self.assertIsInstance(sErr, SSL.Error) def test_realCAsBetterNotSignOurBogusTestCerts(self): """ If we use the default trust from the platform, our dinky certificate should I{really} fail. """ cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup( "valid.example.com", "valid.example.com", validCertificate=False, useDefaultTrust=True, ) self.assertEqual(cWrapped.data, b"") self.assertEqual(sWrapped.data, b"") cErr = cWrapped.lostReason.value sErr = sWrapped.lostReason.value self.assertIsInstance(cErr, SSL.Error) self.assertIsInstance(sErr, SSL.Error) def test_butIfTheyDidItWouldWork(self): """ L{ssl.optionsForClientTLS} should be using L{ssl.platformTrust} by default, so if we fake that out then it should trust ourselves again. """ cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup( "valid.example.com", "valid.example.com", useDefaultTrust=True, fakePlatformTrust=True, ) self.assertEqual(cWrapped.data, b"greetings!") cErr = cWrapped.lostReason sErr = sWrapped.lostReason self.assertIsNone(cErr) self.assertIsNone(sErr) def test_clientPresentsCertificate(self): """ When the server verifies and the client presents a valid certificate for that verification by passing it to L{sslverify.optionsForClientTLS}, communication proceeds. """ cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup( "valid.example.com", "valid.example.com", validCertificate=True, serverVerifies=True, clientPresentsCertificate=True, ) self.assertEqual(cWrapped.data, b"greetings!") cErr = cWrapped.lostReason sErr = sWrapped.lostReason self.assertIsNone(cErr) self.assertIsNone(sErr) def test_clientPresentsBadCertificate(self): """ When the server verifies and the client presents an invalid certificate for that verification by passing it to L{sslverify.optionsForClientTLS}, the connection cannot be established with an SSL error. """ cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup( "valid.example.com", "valid.example.com", validCertificate=True, serverVerifies=True, validClientCertificate=False, clientPresentsCertificate=True, ) self.assertEqual(cWrapped.data, b"") cErr = cWrapped.lostReason.value sErr = sWrapped.lostReason.value self.assertIsInstance(cErr, SSL.Error) self.assertIsInstance(sErr, SSL.Error) @skipIf(skipSNI, skipSNI) def test_hostnameIsIndicated(self): """ Specifying the C{hostname} argument to L{CertificateOptions} also sets the U{Server Name Extension } TLS indication field to the correct value. """ names = [] def setupServerContext(ctx): def servername_received(conn): names.append(conn.get_servername().decode("ascii")) ctx.set_tlsext_servername_callback(servername_received) cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup( "valid.example.com", "valid.example.com", setupServerContext ) self.assertEqual(names, ["valid.example.com"]) @skipIf(skipSNI, skipSNI) def test_hostnameEncoding(self): """ Hostnames are encoded as IDNA. """ names = [] hello = "h\N{LATIN SMALL LETTER A WITH ACUTE}llo.example.com" def setupServerContext(ctx): def servername_received(conn): serverIDNA = _idnaText(conn.get_servername()) names.append(serverIDNA) ctx.set_tlsext_servername_callback(servername_received) cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup( hello, hello, setupServerContext ) self.assertEqual(names, [hello]) self.assertEqual(cWrapped.data, b"greetings!") cErr = cWrapped.lostReason sErr = sWrapped.lostReason self.assertIsNone(cErr) self.assertIsNone(sErr) def test_fallback(self): """ L{sslverify.simpleVerifyHostname} checks string equality on the commonName of a connection's certificate's subject, doing nothing if it matches and raising L{VerificationError} if it doesn't. """ name = "something.example.com" class Connection: def get_peer_certificate(self): """ Fake of L{OpenSSL.SSL.Connection.get_peer_certificate}. @return: A certificate with a known common name. @rtype: L{OpenSSL.crypto.X509} """ cert = X509() cert.get_subject().commonName = name return cert conn = Connection() self.assertIs( sslverify.simpleVerifyHostname(conn, "something.example.com"), None ) self.assertRaises( sslverify.SimpleVerificationError, sslverify.simpleVerifyHostname, conn, "nonsense", ) def test_surpriseFromInfoCallback(self): """ pyOpenSSL isn't always so great about reporting errors. If one occurs in the verification info callback, it should be logged and the connection should be shut down (if possible, anyway; the app_data could be clobbered but there's no point testing for that). """ cProto, sProto, cWrapped, sWrapped, pump = self.serviceIdentitySetup( "correct-host.example.com", "correct-host.example.com", buggyInfoCallback=True, ) self.assertEqual(cWrapped.data, b"") self.assertEqual(sWrapped.data, b"") cErr = cWrapped.lostReason.value sErr = sWrapped.lostReason.value self.assertIsInstance(cErr, ZeroDivisionError) self.assertIsInstance(sErr, (ConnectionClosed, SSL.Error)) errors = self.flushLoggedErrors(ZeroDivisionError) self.assertTrue(errors) def negotiateProtocol(serverProtocols, clientProtocols, clientOptions=None): """ Create the TLS connection and negotiate a next protocol. @param serverProtocols: The protocols the server is willing to negotiate. @param clientProtocols: The protocols the client is willing to negotiate. @param clientOptions: The type of C{OpenSSLCertificateOptions} class to use for the client. Defaults to C{OpenSSLCertificateOptions}. @return: A L{tuple} of the negotiated protocol and the reason the connection was lost. """ caCertificate, serverCertificate = certificatesForAuthorityAndServer() trustRoot = sslverify.OpenSSLCertificateAuthorities( [ caCertificate.original, ] ) sProto, cProto, sWrapped, cWrapped, pump = loopbackTLSConnectionInMemory( trustRoot=trustRoot, privateKey=serverCertificate.privateKey.original, serverCertificate=serverCertificate.original, clientProtocols=clientProtocols, serverProtocols=serverProtocols, clientOptions=clientOptions, ) pump.flush() return (cProto.negotiatedProtocol, cWrapped.lostReason) class NPNOrALPNTests(TestCase): """ NPN and ALPN protocol selection. These tests only run on platforms that have a PyOpenSSL version >= 0.15, and OpenSSL version 1.0.1 or later. """ if skipSSL: skip = skipSSL elif skipNPN: skip = skipNPN def test_nextProtocolMechanismsNPNIsSupported(self): """ When at least NPN is available on the platform, NPN is in the set of supported negotiation protocols. """ supportedProtocols = sslverify.protocolNegotiationMechanisms() self.assertTrue(sslverify.ProtocolNegotiationSupport.NPN in supportedProtocols) def test_NPNAndALPNSuccess(self): """ When both ALPN and NPN are used, and both the client and server have overlapping protocol choices, a protocol is successfully negotiated. Further, the negotiated protocol is the first one in the list. """ protocols = [b"h2", b"http/1.1"] negotiatedProtocol, lostReason = negotiateProtocol( clientProtocols=protocols, serverProtocols=protocols, ) self.assertEqual(negotiatedProtocol, b"h2") self.assertIsNone(lostReason) def test_NPNAndALPNDifferent(self): """ Client and server have different protocol lists: only the common element is chosen. """ serverProtocols = [b"h2", b"http/1.1", b"spdy/2"] clientProtocols = [b"spdy/3", b"http/1.1"] negotiatedProtocol, lostReason = negotiateProtocol( clientProtocols=clientProtocols, serverProtocols=serverProtocols, ) self.assertEqual(negotiatedProtocol, b"http/1.1") self.assertIsNone(lostReason) def test_NPNAndALPNNoAdvertise(self): """ When one peer does not advertise any protocols, the connection is set up with no next protocol. """ protocols = [b"h2", b"http/1.1"] negotiatedProtocol, lostReason = negotiateProtocol( clientProtocols=protocols, serverProtocols=[], ) self.assertIsNone(negotiatedProtocol) self.assertIsNone(lostReason) def test_NPNAndALPNNoOverlap(self): """ When the client and server have no overlap of protocols, the connection fails. """ clientProtocols = [b"h2", b"http/1.1"] serverProtocols = [b"spdy/3"] negotiatedProtocol, lostReason = negotiateProtocol( serverProtocols=clientProtocols, clientProtocols=serverProtocols, ) self.assertIsNone(negotiatedProtocol) self.assertEqual(lostReason.type, SSL.Error) class ALPNTests(TestCase): """ ALPN protocol selection. These tests only run on platforms that have a PyOpenSSL version >= 0.15, and OpenSSL version 1.0.2 or later. This covers only the ALPN specific logic, as any platform that has ALPN will also have NPN and so will run the NPNAndALPNTest suite as well. """ if skipSSL: skip = skipSSL elif skipALPN: skip = skipALPN def test_nextProtocolMechanismsALPNIsSupported(self): """ When ALPN is available on a platform, protocolNegotiationMechanisms includes ALPN in the suported protocols. """ supportedProtocols = sslverify.protocolNegotiationMechanisms() self.assertTrue(sslverify.ProtocolNegotiationSupport.ALPN in supportedProtocols) class NPNAndALPNAbsentTests(TestCase): """ NPN/ALPN operations fail on platforms that do not support them. These tests only run on platforms that have a PyOpenSSL version < 0.15, an OpenSSL version earlier than 1.0.1, or an OpenSSL/cryptography built without NPN support. """ if skipSSL: skip = skipSSL elif not skipNPN or not skipALPN: skip = "NPN and/or ALPN is present on this platform" def test_nextProtocolMechanismsNoNegotiationSupported(self): """ When neither NPN or ALPN are available on a platform, there are no supported negotiation protocols. """ supportedProtocols = sslverify.protocolNegotiationMechanisms() self.assertFalse(supportedProtocols) def test_NPNAndALPNNotImplemented(self): """ A NotImplementedError is raised when using acceptableProtocols on a platform that does not support either NPN or ALPN. """ protocols = [b"h2", b"http/1.1"] self.assertRaises( NotImplementedError, negotiateProtocol, serverProtocols=protocols, clientProtocols=protocols, ) def test_NegotiatedProtocolReturnsNone(self): """ negotiatedProtocol return L{None} even when NPN/ALPN aren't supported. This works because, as neither are supported, negotiation isn't even attempted. """ serverProtocols = None clientProtocols = None negotiatedProtocol, lostReason = negotiateProtocol( clientProtocols=clientProtocols, serverProtocols=serverProtocols, ) self.assertIsNone(negotiatedProtocol) self.assertIsNone(lostReason) class _NotSSLTransport: def getHandle(self): return self class _MaybeSSLTransport: def getHandle(self): return self def get_peer_certificate(self): return None def get_host_certificate(self): return None class _ActualSSLTransport: def getHandle(self): return self def get_host_certificate(self): return sslverify.Certificate.loadPEM(A_HOST_CERTIFICATE_PEM).original def get_peer_certificate(self): return sslverify.Certificate.loadPEM(A_PEER_CERTIFICATE_PEM).original class ConstructorsTests(TestCase): if skipSSL: skip = skipSSL def test_peerFromNonSSLTransport(self): """ Verify that peerFromTransport raises an exception if the transport passed is not actually an SSL transport. """ x = self.assertRaises( CertificateError, sslverify.Certificate.peerFromTransport, _NotSSLTransport(), ) self.assertTrue(str(x).startswith("non-TLS")) def test_peerFromBlankSSLTransport(self): """ Verify that peerFromTransport raises an exception if the transport passed is an SSL transport, but doesn't have a peer certificate. """ x = self.assertRaises( CertificateError, sslverify.Certificate.peerFromTransport, _MaybeSSLTransport(), ) self.assertTrue(str(x).startswith("TLS")) def test_hostFromNonSSLTransport(self): """ Verify that hostFromTransport raises an exception if the transport passed is not actually an SSL transport. """ x = self.assertRaises( CertificateError, sslverify.Certificate.hostFromTransport, _NotSSLTransport(), ) self.assertTrue(str(x).startswith("non-TLS")) def test_hostFromBlankSSLTransport(self): """ Verify that hostFromTransport raises an exception if the transport passed is an SSL transport, but doesn't have a host certificate. """ x = self.assertRaises( CertificateError, sslverify.Certificate.hostFromTransport, _MaybeSSLTransport(), ) self.assertTrue(str(x).startswith("TLS")) def test_hostFromSSLTransport(self): """ Verify that hostFromTransport successfully creates the correct certificate if passed a valid SSL transport. """ self.assertEqual( sslverify.Certificate.hostFromTransport( _ActualSSLTransport() ).serialNumber(), 12345, ) def test_peerFromSSLTransport(self): """ Verify that peerFromTransport successfully creates the correct certificate if passed a valid SSL transport. """ self.assertEqual( sslverify.Certificate.peerFromTransport( _ActualSSLTransport() ).serialNumber(), 12346, ) class MultipleCertificateTrustRootTests(TestCase): """ Test the behavior of the trustRootFromCertificates() API call. """ if skipSSL: skip = skipSSL def test_trustRootFromCertificatesPrivatePublic(self): """ L{trustRootFromCertificates} accepts either a L{sslverify.Certificate} or a L{sslverify.PrivateCertificate} instance. """ privateCert = sslverify.PrivateCertificate.loadPEM(A_KEYPAIR) cert = sslverify.Certificate.loadPEM(A_HOST_CERTIFICATE_PEM) mt = sslverify.trustRootFromCertificates([privateCert, cert]) # Verify that the returned object acts correctly when used as a # trustRoot= param to optionsForClientTLS. sProto, cProto, sWrap, cWrap, pump = loopbackTLSConnectionInMemory( trustRoot=mt, privateKey=privateCert.privateKey.original, serverCertificate=privateCert.original, ) # This connection should succeed self.assertEqual(cWrap.data, b"greetings!") self.assertIsNone(cWrap.lostReason) def test_trustRootSelfSignedServerCertificate(self): """ L{trustRootFromCertificates} called with a single self-signed certificate will cause L{optionsForClientTLS} to accept client connections to a server with that certificate. """ key, cert = makeCertificate(O=b"Server Test Certificate", CN=b"server") selfSigned = sslverify.PrivateCertificate.fromCertificateAndKeyPair( sslverify.Certificate(cert), sslverify.KeyPair(key), ) trust = sslverify.trustRootFromCertificates([selfSigned]) # Since we trust this exact certificate, connections to this server # should succeed. sProto, cProto, sWrap, cWrap, pump = loopbackTLSConnectionInMemory( trustRoot=trust, privateKey=selfSigned.privateKey.original, serverCertificate=selfSigned.original, ) self.assertEqual(cWrap.data, b"greetings!") self.assertIsNone(cWrap.lostReason) def test_trustRootCertificateAuthorityTrustsConnection(self): """ L{trustRootFromCertificates} called with certificate A will cause L{optionsForClientTLS} to accept client connections to a server with certificate B where B is signed by A. """ caCert, serverCert = certificatesForAuthorityAndServer() trust = sslverify.trustRootFromCertificates([caCert]) # Since we've listed the CA's certificate as a trusted cert, a # connection to the server certificate it signed should succeed. sProto, cProto, sWrap, cWrap, pump = loopbackTLSConnectionInMemory( trustRoot=trust, privateKey=serverCert.privateKey.original, serverCertificate=serverCert.original, ) self.assertEqual(cWrap.data, b"greetings!") self.assertIsNone(cWrap.lostReason) def test_trustRootFromCertificatesUntrusted(self): """ L{trustRootFromCertificates} called with certificate A will cause L{optionsForClientTLS} to disallow any connections to a server with certificate B where B is not signed by A. """ key, cert = makeCertificate(O=b"Server Test Certificate", CN=b"server") serverCert = sslverify.PrivateCertificate.fromCertificateAndKeyPair( sslverify.Certificate(cert), sslverify.KeyPair(key), ) untrustedCert = sslverify.Certificate( makeCertificate(O=b"CA Test Certificate", CN=b"unknown CA")[1] ) trust = sslverify.trustRootFromCertificates([untrustedCert]) # Since we only trust 'untrustedCert' which has not signed our # server's cert, we should reject this connection sProto, cProto, sWrap, cWrap, pump = loopbackTLSConnectionInMemory( trustRoot=trust, privateKey=serverCert.privateKey.original, serverCertificate=serverCert.original, ) # This connection should fail, so no data was received. self.assertEqual(cWrap.data, b"") # It was an L{SSL.Error}. self.assertEqual(cWrap.lostReason.type, SSL.Error) # Some combination of OpenSSL and PyOpenSSL is bad at reporting errors. err = cWrap.lostReason.value self.assertEqual(err.args[0][0][2], "tlsv1 alert unknown ca") def test_trustRootFromCertificatesOpenSSLObjects(self): """ L{trustRootFromCertificates} rejects any L{OpenSSL.crypto.X509} instances in the list passed to it. """ private = sslverify.PrivateCertificate.loadPEM(A_KEYPAIR) certX509 = private.original exception = self.assertRaises( TypeError, sslverify.trustRootFromCertificates, [certX509], ) self.assertEqual( "certificates items must be twisted.internet.ssl.CertBase " "instances", exception.args[0], ) class OpenSSLCipherTests(TestCase): """ Tests for twisted.internet._sslverify.OpenSSLCipher. """ if skipSSL: skip = skipSSL cipherName = "CIPHER-STRING" def test_constructorSetsFullName(self): """ The first argument passed to the constructor becomes the full name. """ self.assertEqual( self.cipherName, sslverify.OpenSSLCipher(self.cipherName).fullName ) def test_repr(self): """ C{repr(cipher)} returns a valid constructor call. """ cipher = sslverify.OpenSSLCipher(self.cipherName) self.assertEqual( cipher, eval(repr(cipher), {"OpenSSLCipher": sslverify.OpenSSLCipher}) ) def test_eqSameClass(self): """ Equal type and C{fullName} means that the objects are equal. """ cipher1 = sslverify.OpenSSLCipher(self.cipherName) cipher2 = sslverify.OpenSSLCipher(self.cipherName) self.assertEqual(cipher1, cipher2) def test_eqSameNameDifferentType(self): """ If ciphers have the same name but different types, they're still different. """ class DifferentCipher: fullName = self.cipherName self.assertNotEqual( sslverify.OpenSSLCipher(self.cipherName), DifferentCipher(), ) class ExpandCipherStringTests(TestCase): """ Tests for twisted.internet._sslverify._expandCipherString. """ if skipSSL: skip = skipSSL def test_doesNotStumbleOverEmptyList(self): """ If the expanded cipher list is empty, an empty L{list} is returned. """ self.assertEqual( tuple(), sslverify._expandCipherString("", SSL.SSLv23_METHOD, 0) ) def test_doesNotSwallowOtherSSLErrors(self): """ Only no cipher matches get swallowed, every other SSL error gets propagated. """ def raiser(_): # Unfortunately, there seems to be no way to trigger a real SSL # error artificially. raise SSL.Error([["", "", ""]]) ctx = FakeContext(SSL.SSLv23_METHOD) ctx.set_cipher_list = raiser self.patch(sslverify.SSL, "Context", lambda _: ctx) self.assertRaises( SSL.Error, sslverify._expandCipherString, "ALL", SSL.SSLv23_METHOD, 0 ) def test_returnsTupleOfICiphers(self): """ L{sslverify._expandCipherString} always returns a L{tuple} of L{interfaces.ICipher}. """ ciphers = sslverify._expandCipherString("ALL", SSL.SSLv23_METHOD, 0) self.assertIsInstance(ciphers, tuple) bogus = [] for c in ciphers: if not interfaces.ICipher.providedBy(c): bogus.append(c) self.assertEqual([], bogus) class AcceptableCiphersTests(TestCase): """ Tests for twisted.internet._sslverify.OpenSSLAcceptableCiphers. """ if skipSSL: skip = skipSSL def test_selectOnEmptyListReturnsEmptyList(self): """ If no ciphers are available, nothing can be selected. """ ac = sslverify.OpenSSLAcceptableCiphers(tuple()) self.assertEqual(tuple(), ac.selectCiphers(tuple())) def test_selectReturnsOnlyFromAvailable(self): """ Select only returns a cross section of what is available and what is desirable. """ ac = sslverify.OpenSSLAcceptableCiphers( [ sslverify.OpenSSLCipher("A"), sslverify.OpenSSLCipher("B"), ] ) self.assertEqual( (sslverify.OpenSSLCipher("B"),), ac.selectCiphers( [sslverify.OpenSSLCipher("B"), sslverify.OpenSSLCipher("C")] ), ) def test_fromOpenSSLCipherStringExpandsToTupleOfCiphers(self): """ If L{sslverify.OpenSSLAcceptableCiphers.fromOpenSSLCipherString} is called it expands the string to a tuple of ciphers. """ ac = sslverify.OpenSSLAcceptableCiphers.fromOpenSSLCipherString("ALL") self.assertIsInstance(ac._ciphers, tuple) self.assertTrue(all(sslverify.ICipher.providedBy(c) for c in ac._ciphers)) class DiffieHellmanParametersTests(TestCase): """ Tests for twisted.internet._sslverify.OpenSSLDHParameters. """ if skipSSL: skip = skipSSL filePath = FilePath(b"dh.params") def test_fromFile(self): """ Calling C{fromFile} with a filename returns an instance with that file name saved. """ params = sslverify.OpenSSLDiffieHellmanParameters.fromFile(self.filePath) self.assertEqual(self.filePath, params._dhFile) class FakeLibState: """ State for L{FakeLib} @param setECDHAutoRaises: An exception L{FakeLib.SSL_CTX_set_ecdh_auto} should raise; if L{None}, nothing is raised. @ivar ecdhContexts: A list of SSL contexts with which L{FakeLib.SSL_CTX_set_ecdh_auto} was called @type ecdhContexts: L{list} of L{OpenSSL.SSL.Context}s @ivar ecdhValues: A list of boolean values with which L{FakeLib.SSL_CTX_set_ecdh_auto} was called @type ecdhValues: L{list} of L{boolean}s """ __slots__ = ("setECDHAutoRaises", "ecdhContexts", "ecdhValues") def __init__(self, setECDHAutoRaises): self.setECDHAutoRaises = setECDHAutoRaises self.ecdhContexts = [] self.ecdhValues = [] class FakeLib: """ An introspectable fake of cryptography's lib object. @param state: A L{FakeLibState} instance that contains this fake's state. """ def __init__(self, state): self._state = state def SSL_CTX_set_ecdh_auto(self, ctx, value): """ Record the context and value under in the C{_state} instance variable. @see: L{FakeLibState} @param ctx: An SSL context. @type ctx: L{OpenSSL.SSL.Context} @param value: A boolean value @type value: L{bool} """ self._state.ecdhContexts.append(ctx) self._state.ecdhValues.append(value) if self._state.setECDHAutoRaises is not None: raise self._state.setECDHAutoRaises class FakeLibTests(TestCase): """ Tests for L{FakeLib}. """ def test_SSL_CTX_set_ecdh_auto(self): """ L{FakeLib.SSL_CTX_set_ecdh_auto} records context and value it was called with. """ state = FakeLibState(setECDHAutoRaises=None) lib = FakeLib(state) self.assertNot(state.ecdhContexts) self.assertNot(state.ecdhValues) context, value = "CONTEXT", True lib.SSL_CTX_set_ecdh_auto(context, value) self.assertEqual(state.ecdhContexts, [context]) self.assertEqual(state.ecdhValues, [True]) def test_SSL_CTX_set_ecdh_autoRaises(self): """ L{FakeLib.SSL_CTX_set_ecdh_auto} raises the exception provided by its state, while still recording its arguments. """ state = FakeLibState(setECDHAutoRaises=ValueError) lib = FakeLib(state) self.assertNot(state.ecdhContexts) self.assertNot(state.ecdhValues) context, value = "CONTEXT", True self.assertRaises(ValueError, lib.SSL_CTX_set_ecdh_auto, context, value) self.assertEqual(state.ecdhContexts, [context]) self.assertEqual(state.ecdhValues, [True]) class FakeCryptoState: """ State for L{FakeCrypto} @param getEllipticCurveRaises: What L{FakeCrypto.get_elliptic_curve} should raise; L{None} and it won't raise anything @param getEllipticCurveReturns: What L{FakeCrypto.get_elliptic_curve} should return. @ivar getEllipticCurveCalls: The arguments with which L{FakeCrypto.get_elliptic_curve} has been called. @type getEllipticCurveCalls: L{list} """ __slots__ = ( "getEllipticCurveRaises", "getEllipticCurveReturns", "getEllipticCurveCalls", ) def __init__( self, getEllipticCurveRaises, getEllipticCurveReturns, ): self.getEllipticCurveRaises = getEllipticCurveRaises self.getEllipticCurveReturns = getEllipticCurveReturns self.getEllipticCurveCalls = [] class FakeCrypto: """ An introspectable fake of pyOpenSSL's L{OpenSSL.crypto} module. @ivar state: A L{FakeCryptoState} instance """ def __init__(self, state): self._state = state def get_elliptic_curve(self, curve): """ A fake that records the curve with which it was called. @param curve: see L{crypto.get_elliptic_curve} @return: see L{FakeCryptoState.getEllipticCurveReturns} @raises: see L{FakeCryptoState.getEllipticCurveRaises} """ self._state.getEllipticCurveCalls.append(curve) if self._state.getEllipticCurveRaises is not None: raise self._state.getEllipticCurveRaises return self._state.getEllipticCurveReturns class FakeCryptoTests(SynchronousTestCase): """ Tests for L{FakeCrypto}. """ def test_get_elliptic_curveRecordsArgument(self): """ L{FakeCrypto.test_get_elliptic_curve} records the curve with which it was called. """ state = FakeCryptoState( getEllipticCurveRaises=None, getEllipticCurveReturns=None, ) crypto = FakeCrypto(state) crypto.get_elliptic_curve("a curve name") self.assertEqual(state.getEllipticCurveCalls, ["a curve name"]) def test_get_elliptic_curveReturns(self): """ L{FakeCrypto.test_get_elliptic_curve} returns the value specified by its state object and records what it was called with. """ returnValue = "object" state = FakeCryptoState( getEllipticCurveRaises=None, getEllipticCurveReturns=returnValue, ) crypto = FakeCrypto(state) self.assertIs( crypto.get_elliptic_curve("another curve name"), returnValue, ) self.assertEqual(state.getEllipticCurveCalls, ["another curve name"]) def test_get_elliptic_curveRaises(self): """ L{FakeCrypto.test_get_elliptic_curve} raises the exception specified by its state object. """ state = FakeCryptoState( getEllipticCurveRaises=ValueError, getEllipticCurveReturns=None ) crypto = FakeCrypto(state) self.assertRaises( ValueError, crypto.get_elliptic_curve, "yet another curve name", ) self.assertEqual( state.getEllipticCurveCalls, ["yet another curve name"], ) class ChooseDiffieHellmanEllipticCurveTests(SynchronousTestCase): """ Tests for L{sslverify._ChooseDiffieHellmanEllipticCurve}. @cvar OPENSSL_110: A version number for OpenSSL 1.1.0 @cvar OPENSSL_102: A version number for OpenSSL 1.0.2 @cvar OPENSSL_101: A version number for OpenSSL 1.0.1 @see: U{https://wiki.openssl.org/index.php/Manual:OPENSSL_VERSION_NUMBER(3)} """ if skipSSL: skip = skipSSL OPENSSL_110 = 0x1010007F OPENSSL_102 = 0x100020EF OPENSSL_101 = 0x1000114F def setUp(self): self.libState = FakeLibState(setECDHAutoRaises=False) self.lib = FakeLib(self.libState) self.cryptoState = FakeCryptoState( getEllipticCurveReturns=None, getEllipticCurveRaises=None ) self.crypto = FakeCrypto(self.cryptoState) self.context = FakeContext(SSL.SSLv23_METHOD) def test_openSSL110(self): """ No configuration of contexts occurs under OpenSSL 1.1.0 and later, because they create contexts with secure ECDH curves. @see: U{http://twistedmatrix.com/trac/ticket/9210} """ chooser = sslverify._ChooseDiffieHellmanEllipticCurve( self.OPENSSL_110, openSSLlib=self.lib, openSSLcrypto=self.crypto, ) chooser.configureECDHCurve(self.context) self.assertFalse(self.libState.ecdhContexts) self.assertFalse(self.libState.ecdhValues) self.assertFalse(self.cryptoState.getEllipticCurveCalls) self.assertIsNone(self.context._ecCurve) def test_openSSL102(self): """ OpenSSL 1.0.2 does not set ECDH curves by default, but C{SSL_CTX_set_ecdh_auto} requests that a context choose a secure set curves automatically. """ context = SSL.Context(SSL.SSLv23_METHOD) chooser = sslverify._ChooseDiffieHellmanEllipticCurve( self.OPENSSL_102, openSSLlib=self.lib, openSSLcrypto=self.crypto, ) chooser.configureECDHCurve(context) self.assertEqual(self.libState.ecdhContexts, [context._context]) self.assertEqual(self.libState.ecdhValues, [True]) self.assertFalse(self.cryptoState.getEllipticCurveCalls) self.assertIsNone(self.context._ecCurve) def test_openSSL102SetECDHAutoRaises(self): """ An exception raised by C{SSL_CTX_set_ecdh_auto} under OpenSSL 1.0.2 is suppressed because ECDH is best-effort. """ self.libState.setECDHAutoRaises = BaseException context = SSL.Context(SSL.SSLv23_METHOD) chooser = sslverify._ChooseDiffieHellmanEllipticCurve( self.OPENSSL_102, openSSLlib=self.lib, openSSLcrypto=self.crypto, ) chooser.configureECDHCurve(context) self.assertEqual(self.libState.ecdhContexts, [context._context]) self.assertEqual(self.libState.ecdhValues, [True]) self.assertFalse(self.cryptoState.getEllipticCurveCalls) def test_openSSL101(self): """ OpenSSL 1.0.1 does not set ECDH curves by default, nor does it expose L{SSL_CTX_set_ecdh_auto}. Instead, a single ECDH curve can be set with L{OpenSSL.SSL.Context.set_tmp_ecdh}. """ self.cryptoState.getEllipticCurveReturns = curve = "curve object" chooser = sslverify._ChooseDiffieHellmanEllipticCurve( self.OPENSSL_101, openSSLlib=self.lib, openSSLcrypto=self.crypto, ) chooser.configureECDHCurve(self.context) self.assertFalse(self.libState.ecdhContexts) self.assertFalse(self.libState.ecdhValues) self.assertEqual( self.cryptoState.getEllipticCurveCalls, [sslverify._defaultCurveName], ) self.assertIs(self.context._ecCurve, curve) def test_openSSL101SetECDHRaises(self): """ An exception raised by L{OpenSSL.SSL.Context.set_tmp_ecdh} under OpenSSL 1.0.1 is suppressed because ECHDE is best-effort. """ def set_tmp_ecdh(ctx): raise BaseException self.context.set_tmp_ecdh = set_tmp_ecdh chooser = sslverify._ChooseDiffieHellmanEllipticCurve( self.OPENSSL_101, openSSLlib=self.lib, openSSLcrypto=self.crypto, ) chooser.configureECDHCurve(self.context) self.assertFalse(self.libState.ecdhContexts) self.assertFalse(self.libState.ecdhValues) self.assertEqual( self.cryptoState.getEllipticCurveCalls, [sslverify._defaultCurveName], ) def test_openSSL101NoECC(self): """ Contexts created under an OpenSSL 1.0.1 that doesn't support ECC have no configuration applied. """ self.cryptoState.getEllipticCurveRaises = ValueError chooser = sslverify._ChooseDiffieHellmanEllipticCurve( self.OPENSSL_101, openSSLlib=self.lib, openSSLcrypto=self.crypto, ) chooser.configureECDHCurve(self.context) self.assertFalse(self.libState.ecdhContexts) self.assertFalse(self.libState.ecdhValues) self.assertIsNone(self.context._ecCurve) class KeyPairTests(TestCase): """ Tests for L{sslverify.KeyPair}. """ if skipSSL: skip = skipSSL def setUp(self): """ Create test certificate. """ self.sKey = makeCertificate(O=b"Server Test Certificate", CN=b"server")[0] def test_getstateDeprecation(self): """ L{sslverify.KeyPair.__getstate__} is deprecated. """ self.callDeprecated( (Version("Twisted", 15, 0, 0), "a real persistence system"), sslverify.KeyPair(self.sKey).__getstate__, ) def test_setstateDeprecation(self): """ {sslverify.KeyPair.__setstate__} is deprecated. """ state = sslverify.KeyPair(self.sKey).dump() self.callDeprecated( (Version("Twisted", 15, 0, 0), "a real persistence system"), sslverify.KeyPair(self.sKey).__setstate__, state, ) def test_noTrailingNewlinePemCert(self): noTrailingNewlineKeyPemPath = getModule("twisted.test").filePath.sibling( "cert.pem.no_trailing_newline" ) certPEM = noTrailingNewlineKeyPemPath.getContent() ssl.Certificate.loadPEM(certPEM) class SelectVerifyImplementationTests(SynchronousTestCase): """ Tests for L{_selectVerifyImplementation}. """ if skipSSL: skip = skipSSL def test_dependencyMissing(self): """ If I{service_identity} cannot be imported then L{_selectVerifyImplementation} returns L{simpleVerifyHostname} and L{SimpleVerificationError}. """ with SetAsideModule("service_identity"): sys.modules["service_identity"] = None result = sslverify._selectVerifyImplementation() expected = ( sslverify.simpleVerifyHostname, sslverify.simpleVerifyIPAddress, sslverify.SimpleVerificationError, ) self.assertEqual(expected, result) test_dependencyMissing.suppress = [ # type: ignore[attr-defined] util.suppress( message=( "You do not have a working installation of the " "service_identity module" ), ), ] def test_dependencyMissingWarning(self): """ If I{service_identity} cannot be imported then L{_selectVerifyImplementation} emits a L{UserWarning} advising the user of the exact error. """ with SetAsideModule("service_identity"): sys.modules["service_identity"] = None sslverify._selectVerifyImplementation() [warning] = list( warning for warning in self.flushWarnings() if warning["category"] == UserWarning ) importErrors = [ # Python 3.6.3 "'import of service_identity halted; None in sys.modules'", # Python 3 "'import of 'service_identity' halted; None in sys.modules'", # Python 2 "'No module named service_identity'", ] expectedMessages = [] for importError in importErrors: expectedMessages.append( "You do not have a working installation of the " "service_identity module: {message}. Please install it from " " " "and make sure all of its dependencies are satisfied. " "Without the service_identity module, Twisted can perform only" " rudimentary TLS client hostname verification. Many valid " "certificate/hostname mappings may be rejected.".format( message=importError ) ) self.assertIn(warning["message"], expectedMessages) # Make sure we're abusing the warning system to a sufficient # degree: there is no filename or line number that makes sense for # this warning to "blame" for the problem. It is a system # misconfiguration. So the location information should be blank # (or as blank as we can make it). self.assertEqual(warning["filename"], "") self.assertEqual(warning["lineno"], 0)