# This file is dual licensed under the terms of the Apache License, Version # 2.0, and the BSD License. See the LICENSE file in the root of this repository # for complete details. import datetime import typing from cryptography import utils, x509 from cryptography.exceptions import UnsupportedAlgorithm from cryptography.hazmat.backends.openssl.decode_asn1 import ( _CRL_ENTRY_REASON_CODE_TO_ENUM, _asn1_integer_to_int, _asn1_string_to_bytes, _decode_x509_name, _obj2txt, _parse_asn1_generalized_time, ) from cryptography.hazmat.backends.openssl.x509 import _Certificate from cryptography.hazmat.primitives import hashes, serialization from cryptography.x509.ocsp import ( OCSPCertStatus, OCSPRequest, OCSPResponse, OCSPResponseStatus, _CERT_STATUS_TO_ENUM, _OIDS_TO_HASH, _RESPONSE_STATUS_TO_ENUM, ) def _issuer_key_hash(backend, cert_id): key_hash = backend._ffi.new("ASN1_OCTET_STRING **") res = backend._lib.OCSP_id_get0_info( backend._ffi.NULL, backend._ffi.NULL, key_hash, backend._ffi.NULL, cert_id, ) backend.openssl_assert(res == 1) backend.openssl_assert(key_hash[0] != backend._ffi.NULL) return _asn1_string_to_bytes(backend, key_hash[0]) def _issuer_name_hash(backend, cert_id): name_hash = backend._ffi.new("ASN1_OCTET_STRING **") res = backend._lib.OCSP_id_get0_info( name_hash, backend._ffi.NULL, backend._ffi.NULL, backend._ffi.NULL, cert_id, ) backend.openssl_assert(res == 1) backend.openssl_assert(name_hash[0] != backend._ffi.NULL) return _asn1_string_to_bytes(backend, name_hash[0]) def _serial_number(backend, cert_id): num = backend._ffi.new("ASN1_INTEGER **") res = backend._lib.OCSP_id_get0_info( backend._ffi.NULL, backend._ffi.NULL, backend._ffi.NULL, num, cert_id ) backend.openssl_assert(res == 1) backend.openssl_assert(num[0] != backend._ffi.NULL) return _asn1_integer_to_int(backend, num[0]) def _hash_algorithm(backend, cert_id): asn1obj = backend._ffi.new("ASN1_OBJECT **") res = backend._lib.OCSP_id_get0_info( backend._ffi.NULL, asn1obj, backend._ffi.NULL, backend._ffi.NULL, cert_id, ) backend.openssl_assert(res == 1) backend.openssl_assert(asn1obj[0] != backend._ffi.NULL) oid = _obj2txt(backend, asn1obj[0]) try: return _OIDS_TO_HASH[oid] except KeyError: raise UnsupportedAlgorithm( "Signature algorithm OID: {} not recognized".format(oid) ) class _OCSPResponse(OCSPResponse): def __init__(self, backend, ocsp_response): self._backend = backend self._ocsp_response = ocsp_response status = self._backend._lib.OCSP_response_status(self._ocsp_response) self._backend.openssl_assert(status in _RESPONSE_STATUS_TO_ENUM) self._status = _RESPONSE_STATUS_TO_ENUM[status] if self._status is OCSPResponseStatus.SUCCESSFUL: basic = self._backend._lib.OCSP_response_get1_basic( self._ocsp_response ) self._backend.openssl_assert(basic != self._backend._ffi.NULL) self._basic = self._backend._ffi.gc( basic, self._backend._lib.OCSP_BASICRESP_free ) num_resp = self._backend._lib.OCSP_resp_count(self._basic) if num_resp != 1: raise ValueError( "OCSP response contains more than one SINGLERESP structure" ", which this library does not support. " "{} found".format(num_resp) ) self._single = self._backend._lib.OCSP_resp_get0(self._basic, 0) self._backend.openssl_assert( self._single != self._backend._ffi.NULL ) self._cert_id = self._backend._lib.OCSP_SINGLERESP_get0_id( self._single ) self._backend.openssl_assert( self._cert_id != self._backend._ffi.NULL ) response_status = utils.read_only_property("_status") def _requires_successful_response(self) -> None: if self.response_status != OCSPResponseStatus.SUCCESSFUL: raise ValueError( "OCSP response status is not successful so the property " "has no value" ) @property def signature_algorithm_oid(self) -> x509.ObjectIdentifier: self._requires_successful_response() alg = self._backend._lib.OCSP_resp_get0_tbs_sigalg(self._basic) self._backend.openssl_assert(alg != self._backend._ffi.NULL) oid = _obj2txt(self._backend, alg.algorithm) return x509.ObjectIdentifier(oid) @property def signature_hash_algorithm( self, ) -> typing.Optional[hashes.HashAlgorithm]: self._requires_successful_response() oid = self.signature_algorithm_oid try: return x509._SIG_OIDS_TO_HASH[oid] except KeyError: raise UnsupportedAlgorithm( "Signature algorithm OID:{} not recognized".format(oid) ) @property def signature(self) -> bytes: self._requires_successful_response() sig = self._backend._lib.OCSP_resp_get0_signature(self._basic) self._backend.openssl_assert(sig != self._backend._ffi.NULL) return _asn1_string_to_bytes(self._backend, sig) @property def tbs_response_bytes(self) -> bytes: self._requires_successful_response() respdata = self._backend._lib.OCSP_resp_get0_respdata(self._basic) self._backend.openssl_assert(respdata != self._backend._ffi.NULL) pp = self._backend._ffi.new("unsigned char **") res = self._backend._lib.i2d_OCSP_RESPDATA(respdata, pp) self._backend.openssl_assert(pp[0] != self._backend._ffi.NULL) pp = self._backend._ffi.gc( pp, lambda pointer: self._backend._lib.OPENSSL_free(pointer[0]) ) self._backend.openssl_assert(res > 0) return self._backend._ffi.buffer(pp[0], res)[:] @property def certificates(self) -> typing.List[x509.Certificate]: self._requires_successful_response() sk_x509 = self._backend._lib.OCSP_resp_get0_certs(self._basic) num = self._backend._lib.sk_X509_num(sk_x509) certs: typing.List[x509.Certificate] = [] for i in range(num): x509_ptr = self._backend._lib.sk_X509_value(sk_x509, i) self._backend.openssl_assert(x509_ptr != self._backend._ffi.NULL) cert = _Certificate(self._backend, x509_ptr) # We need to keep the OCSP response that the certificate came from # alive until the Certificate object itself goes out of scope, so # we give it a private reference. cert._ocsp_resp_ref = self certs.append(cert) return certs @property def responder_key_hash(self) -> typing.Optional[bytes]: self._requires_successful_response() _, asn1_string = self._responder_key_name() if asn1_string == self._backend._ffi.NULL: return None else: return _asn1_string_to_bytes(self._backend, asn1_string) @property def responder_name(self) -> typing.Optional[x509.Name]: self._requires_successful_response() x509_name, _ = self._responder_key_name() if x509_name == self._backend._ffi.NULL: return None else: return _decode_x509_name(self._backend, x509_name) def _responder_key_name(self): asn1_string = self._backend._ffi.new("ASN1_OCTET_STRING **") x509_name = self._backend._ffi.new("X509_NAME **") res = self._backend._lib.OCSP_resp_get0_id( self._basic, asn1_string, x509_name ) self._backend.openssl_assert(res == 1) return x509_name[0], asn1_string[0] @property def produced_at(self) -> datetime.datetime: self._requires_successful_response() produced_at = self._backend._lib.OCSP_resp_get0_produced_at( self._basic ) return _parse_asn1_generalized_time(self._backend, produced_at) @property def certificate_status(self) -> OCSPCertStatus: self._requires_successful_response() status = self._backend._lib.OCSP_single_get0_status( self._single, self._backend._ffi.NULL, self._backend._ffi.NULL, self._backend._ffi.NULL, self._backend._ffi.NULL, ) self._backend.openssl_assert(status in _CERT_STATUS_TO_ENUM) return _CERT_STATUS_TO_ENUM[status] @property def revocation_time(self) -> typing.Optional[datetime.datetime]: self._requires_successful_response() if self.certificate_status is not OCSPCertStatus.REVOKED: return None asn1_time = self._backend._ffi.new("ASN1_GENERALIZEDTIME **") self._backend._lib.OCSP_single_get0_status( self._single, self._backend._ffi.NULL, asn1_time, self._backend._ffi.NULL, self._backend._ffi.NULL, ) self._backend.openssl_assert(asn1_time[0] != self._backend._ffi.NULL) return _parse_asn1_generalized_time(self._backend, asn1_time[0]) @property def revocation_reason(self) -> typing.Optional[x509.ReasonFlags]: self._requires_successful_response() if self.certificate_status is not OCSPCertStatus.REVOKED: return None reason_ptr = self._backend._ffi.new("int *") self._backend._lib.OCSP_single_get0_status( self._single, reason_ptr, self._backend._ffi.NULL, self._backend._ffi.NULL, self._backend._ffi.NULL, ) # If no reason is encoded OpenSSL returns -1 if reason_ptr[0] == -1: return None else: self._backend.openssl_assert( reason_ptr[0] in _CRL_ENTRY_REASON_CODE_TO_ENUM ) return _CRL_ENTRY_REASON_CODE_TO_ENUM[reason_ptr[0]] @property def this_update(self) -> datetime.datetime: self._requires_successful_response() asn1_time = self._backend._ffi.new("ASN1_GENERALIZEDTIME **") self._backend._lib.OCSP_single_get0_status( self._single, self._backend._ffi.NULL, self._backend._ffi.NULL, asn1_time, self._backend._ffi.NULL, ) self._backend.openssl_assert(asn1_time[0] != self._backend._ffi.NULL) return _parse_asn1_generalized_time(self._backend, asn1_time[0]) @property def next_update(self) -> typing.Optional[datetime.datetime]: self._requires_successful_response() asn1_time = self._backend._ffi.new("ASN1_GENERALIZEDTIME **") self._backend._lib.OCSP_single_get0_status( self._single, self._backend._ffi.NULL, self._backend._ffi.NULL, self._backend._ffi.NULL, asn1_time, ) if asn1_time[0] != self._backend._ffi.NULL: return _parse_asn1_generalized_time(self._backend, asn1_time[0]) else: return None @property def issuer_key_hash(self) -> bytes: self._requires_successful_response() return _issuer_key_hash(self._backend, self._cert_id) @property def issuer_name_hash(self) -> bytes: self._requires_successful_response() return _issuer_name_hash(self._backend, self._cert_id) @property def hash_algorithm(self) -> hashes.HashAlgorithm: self._requires_successful_response() return _hash_algorithm(self._backend, self._cert_id) @property def serial_number(self) -> int: self._requires_successful_response() return _serial_number(self._backend, self._cert_id) @utils.cached_property def extensions(self) -> x509.Extensions: self._requires_successful_response() return self._backend._ocsp_basicresp_ext_parser.parse(self._basic) @utils.cached_property def single_extensions(self) -> x509.Extensions: self._requires_successful_response() return self._backend._ocsp_singleresp_ext_parser.parse(self._single) def public_bytes(self, encoding: serialization.Encoding) -> bytes: if encoding is not serialization.Encoding.DER: raise ValueError("The only allowed encoding value is Encoding.DER") bio = self._backend._create_mem_bio_gc() res = self._backend._lib.i2d_OCSP_RESPONSE_bio( bio, self._ocsp_response ) self._backend.openssl_assert(res > 0) return self._backend._read_mem_bio(bio) class _OCSPRequest(OCSPRequest): def __init__(self, backend, ocsp_request): if backend._lib.OCSP_request_onereq_count(ocsp_request) > 1: raise NotImplementedError( "OCSP request contains more than one request" ) self._backend = backend self._ocsp_request = ocsp_request self._request = self._backend._lib.OCSP_request_onereq_get0( self._ocsp_request, 0 ) self._backend.openssl_assert(self._request != self._backend._ffi.NULL) self._cert_id = self._backend._lib.OCSP_onereq_get0_id(self._request) self._backend.openssl_assert(self._cert_id != self._backend._ffi.NULL) @property def issuer_key_hash(self) -> bytes: return _issuer_key_hash(self._backend, self._cert_id) @property def issuer_name_hash(self) -> bytes: return _issuer_name_hash(self._backend, self._cert_id) @property def serial_number(self) -> int: return _serial_number(self._backend, self._cert_id) @property def hash_algorithm(self) -> hashes.HashAlgorithm: return _hash_algorithm(self._backend, self._cert_id) @utils.cached_property def extensions(self) -> x509.Extensions: return self._backend._ocsp_req_ext_parser.parse(self._ocsp_request) def public_bytes(self, encoding: serialization.Encoding) -> bytes: if encoding is not serialization.Encoding.DER: raise ValueError("The only allowed encoding value is Encoding.DER") bio = self._backend._create_mem_bio_gc() res = self._backend._lib.i2d_OCSP_REQUEST_bio(bio, self._ocsp_request) self._backend.openssl_assert(res > 0) return self._backend._read_mem_bio(bio)