HEX
Server: nginx/1.22.0
System: Linux iZuf6jdxbygmf6cco977lcZ 5.10.84-10.4.al8.x86_64 #1 SMP Tue Apr 12 12:31:07 CST 2022 x86_64
User: root (0)
PHP: 7.4.29
Disabled: passthru,exec,system,chroot,chgrp,chown,shell_exec,proc_open,proc_get_status,ini_alter,ini_restore,dl,readlink,symlink,popepassthru,stream_socket_server,fsocket,popen
Upload Files
File: //lib/python3.6/site-packages/asn1crypto/ocsp.py
# coding: utf-8

"""
ASN.1 type classes for the online certificate status protocol (OCSP). Exports
the following items:

 - OCSPRequest()
 - OCSPResponse()

Other type classes are defined that help compose the types listed above.
"""

from __future__ import unicode_literals, division, absolute_import, print_function

from .algos import DigestAlgorithm, SignedDigestAlgorithm
from .core import (
    Boolean,
    Choice,
    Enumerated,
    GeneralizedTime,
    IA5String,
    Integer,
    Null,
    ObjectIdentifier,
    OctetBitString,
    OctetString,
    ParsableOctetString,
    Sequence,
    SequenceOf,
    Void,
)
from .crl import AuthorityInfoAccessSyntax, CRLReason
from .keys import PublicKeyAlgorithm
from .x509 import Certificate, GeneralName, GeneralNames, Name


# The structures in this file are taken from https://tools.ietf.org/html/rfc6960


class Version(Integer):
    """
    Integer whose value 0 is mapped to the name 'v1'
    """

    _map = {
        0: 'v1'
    }


class CertId(Sequence):
    """
    Sequence identifying a certificate by a digest algorithm, two octet
    string hashes (issuer name and issuer key) and an integer serial number
    """

    _fields = [
        ('hash_algorithm', DigestAlgorithm),
        ('issuer_name_hash', OctetString),
        ('issuer_key_hash', OctetString),
        ('serial_number', Integer),
    ]


class ServiceLocator(Sequence):
    """
    Sequence pairing an issuer Name with an AuthorityInfoAccessSyntax
    locator. Registered in this file as the value spec of the
    'service_locator' request extension.
    """

    _fields = [
        ('issuer', Name),
        ('locator', AuthorityInfoAccessSyntax),
    ]


class RequestExtensionId(ObjectIdentifier):
    """
    Object identifier of an extension on a single Request, with a readable
    name for the one OID listed in _map
    """

    _map = {
        '1.3.6.1.5.5.7.48.1.7': 'service_locator',
    }


class RequestExtension(Sequence):
    """
    One extension attached to a single Request: an identifier, a critical
    flag that defaults to False, and the extension value
    """

    _fields = [
        ('extn_id', RequestExtensionId),
        ('critical', Boolean, {'default': False}),
        ('extn_value', ParsableOctetString),
    ]

    # NOTE(review): presumably read by the Sequence base class to pick the
    # spec used to parse 'extn_value' from the name held in 'extn_id' -
    # confirm against asn1crypto.core
    _oid_pair = ('extn_id', 'extn_value')
    _oid_specs = {
        'service_locator': ServiceLocator,
    }


class RequestExtensions(SequenceOf):
    """
    Sequence of RequestExtension values
    """

    _child_spec = RequestExtension


class Request(Sequence):
    """
    A single certificate status request: the CertId being asked about plus
    an optional list of per-request extensions

    The extensions are scanned lazily, the first time one of the extension
    properties is read, and the results are cached on the instance.
    """

    _fields = [
        ('req_cert', CertId),
        ('single_request_extensions', RequestExtensions, {'explicit': 0, 'optional': True}),
    ]

    # Cache state populated by _set_extensions()
    _processed_extensions = False
    _critical_extensions = None
    _service_locator_value = None

    def _set_extensions(self):
        """
        Walks the per-request extensions once, storing the parsed value of
        every extension that has a matching "_<name>_value" attribute and
        collecting the names of the extensions flagged as critical
        """

        self._critical_extensions = set()
        flagged = self._critical_extensions

        for ext in self['single_request_extensions']:
            ext_name = ext['extn_id'].native
            slot = '_%s_value' % ext_name
            # Only extensions with a pre-declared cache attribute are stored
            if hasattr(self, slot):
                setattr(self, slot, ext['extn_value'].parsed)
            if ext['critical'].native:
                flagged.add(ext_name)

        self._processed_extensions = True

    @property
    def critical_extensions(self):
        """
        The names (or OID if not a known extension) of the extensions that
        are marked as critical

        :return:
            A set of unicode strings
        """

        if self._processed_extensions:
            return self._critical_extensions
        self._set_extensions()
        return self._critical_extensions

    @property
    def service_locator_value(self):
        """
        The parsed service locator extension, which is used when talking to
        an OCSP responder that acts as a proxy for OCSP requests

        :return:
            None or a ServiceLocator object
        """

        if self._processed_extensions is not False:
            return self._service_locator_value
        self._set_extensions()
        return self._service_locator_value


class Requests(SequenceOf):
    """
    Sequence of Request values
    """

    _child_spec = Request


class ResponseType(ObjectIdentifier):
    """
    Object identifier naming a response format; the only OID mapped here is
    'basic_ocsp_response'
    """

    _map = {
        '1.3.6.1.5.5.7.48.1.1': 'basic_ocsp_response',
    }


class AcceptableResponses(SequenceOf):
    """
    Sequence of ResponseType object identifiers
    """

    _child_spec = ResponseType


class PreferredSignatureAlgorithm(Sequence):
    """
    Sequence holding a signature algorithm and, optionally, a public key
    algorithm
    """

    _fields = [
        ('sig_identifier', SignedDigestAlgorithm),
        ('cert_identifier', PublicKeyAlgorithm, {'optional': True}),
    ]


class PreferredSignatureAlgorithms(SequenceOf):
    """
    Sequence of PreferredSignatureAlgorithm values
    """

    _child_spec = PreferredSignatureAlgorithm


class TBSRequestExtensionId(ObjectIdentifier):
    """
    Object identifier of an extension on a TBSRequest, with readable names
    for the OIDs listed in _map
    """

    _map = {
        '1.3.6.1.5.5.7.48.1.2': 'nonce',
        '1.3.6.1.5.5.7.48.1.4': 'acceptable_responses',
        '1.3.6.1.5.5.7.48.1.8': 'preferred_signature_algorithms',
    }


class TBSRequestExtension(Sequence):
    """
    One extension attached to a TBSRequest: an identifier, a critical flag
    that defaults to False, and the extension value
    """

    _fields = [
        ('extn_id', TBSRequestExtensionId),
        ('critical', Boolean, {'default': False}),
        ('extn_value', ParsableOctetString),
    ]

    # NOTE(review): presumably read by the Sequence base class to pick the
    # spec used to parse 'extn_value' from the name held in 'extn_id' -
    # confirm against asn1crypto.core
    _oid_pair = ('extn_id', 'extn_value')
    _oid_specs = {
        'nonce': OctetString,
        'acceptable_responses': AcceptableResponses,
        'preferred_signature_algorithms': PreferredSignatureAlgorithms,
    }


class TBSRequestExtensions(SequenceOf):
    """
    Sequence of TBSRequestExtension values
    """

    _child_spec = TBSRequestExtension


class TBSRequest(Sequence):
    """
    The body of an OCSP request: a version that defaults to 'v1', an
    optional requestor name, the list of requests and optional extensions
    """

    _fields = [
        ('version', Version, {'explicit': 0, 'default': 'v1'}),
        ('requestor_name', GeneralName, {'explicit': 1, 'optional': True}),
        ('request_list', Requests),
        ('request_extensions', TBSRequestExtensions, {'explicit': 2, 'optional': True}),
    ]


class Certificates(SequenceOf):
    """
    Sequence of x509 Certificate values
    """

    _child_spec = Certificate


class Signature(Sequence):
    """
    Sequence holding a signature algorithm, the signature itself and an
    optional list of certificates
    """

    _fields = [
        ('signature_algorithm', SignedDigestAlgorithm),
        ('signature', OctetBitString),
        ('certs', Certificates, {'explicit': 0, 'optional': True}),
    ]


class OCSPRequest(Sequence):
    """
    Top-level OCSP request: the TBSRequest body and an optional signature

    The request extensions found in the TBSRequest are scanned lazily, the
    first time one of the extension properties is read, and the results are
    cached on the instance.
    """

    _fields = [
        ('tbs_request', TBSRequest),
        ('optional_signature', Signature, {'explicit': 0, 'optional': True}),
    ]

    # Cache state populated by _set_extensions()
    _processed_extensions = False
    _critical_extensions = None
    _nonce_value = None
    _acceptable_responses_value = None
    _preferred_signature_algorithms_value = None

    def _set_extensions(self):
        """
        Walks the request extensions once, storing the parsed value of every
        extension that has a matching "_<name>_value" attribute and
        collecting the names of the extensions flagged as critical
        """

        self._critical_extensions = set()
        flagged = self._critical_extensions

        request_body = self['tbs_request']
        for ext in request_body['request_extensions']:
            ext_name = ext['extn_id'].native
            slot = '_%s_value' % ext_name
            # Only extensions with a pre-declared cache attribute are stored
            if hasattr(self, slot):
                setattr(self, slot, ext['extn_value'].parsed)
            if ext['critical'].native:
                flagged.add(ext_name)

        self._processed_extensions = True

    @property
    def critical_extensions(self):
        """
        The names (or OID if not a known extension) of the extensions that
        are marked as critical

        :return:
            A set of unicode strings
        """

        if self._processed_extensions:
            return self._critical_extensions
        self._set_extensions()
        return self._critical_extensions

    @property
    def nonce_value(self):
        """
        The parsed nonce extension, which is used to prevent replay attacks
        by including a unique, random value with each request/response pair

        :return:
            None or an OctetString object
        """

        if self._processed_extensions is not False:
            return self._nonce_value
        self._set_extensions()
        return self._nonce_value

    @property
    def acceptable_responses_value(self):
        """
        The parsed acceptable responses extension, which lets the client and
        server communicate with response formats other than just
        basic_ocsp_response, although no other formats are defined in the
        standard.

        :return:
            None or an AcceptableResponses object
        """

        if self._processed_extensions is not False:
            return self._acceptable_responses_value
        self._set_extensions()
        return self._acceptable_responses_value

    @property
    def preferred_signature_algorithms_value(self):
        """
        The parsed preferred signature algorithms extension, which the
        client uses to state which signature algorithms it prefers. This
        covers both the hash algorithm and the public key algorithm, down to
        the public key algorithm parameters, such as curve name.

        :return:
            None or a PreferredSignatureAlgorithms object
        """

        if self._processed_extensions is not False:
            return self._preferred_signature_algorithms_value
        self._set_extensions()
        return self._preferred_signature_algorithms_value


class OCSPResponseStatus(Enumerated):
    """
    Enumerated status of an OCSP response, with a name for each value
    listed in _map
    """

    # The value 4 has no name in this map
    _map = {
        0: 'successful',
        1: 'malformed_request',
        2: 'internal_error',
        3: 'try_later',
        5: 'sign_required',
        6: 'unauthorized',
    }


class ResponderId(Choice):
    """
    Choice identifying a responder either by Name ('by_name', explicit tag
    1) or by an octet string ('by_key', explicit tag 2)
    """

    _alternatives = [
        ('by_name', Name, {'explicit': 1}),
        ('by_key', OctetString, {'explicit': 2}),
    ]


class RevokedInfo(Sequence):
    """
    Sequence holding the time of a revocation and, optionally, the reason
    for it
    """

    _fields = [
        ('revocation_time', GeneralizedTime),
        ('revocation_reason', CRLReason, {'explicit': 0, 'optional': True}),
    ]


class CertStatus(Choice):
    """
    Choice between the alternatives 'good', 'revoked' and 'unknown',
    declared with implicit tags 0, 1 and 2. Only 'revoked' carries a
    structured value (RevokedInfo); the other two are Null.
    """

    _alternatives = [
        ('good', Null, {'implicit': 0}),
        ('revoked', RevokedInfo, {'implicit': 1}),
        ('unknown', Null, {'implicit': 2}),
    ]


class CrlId(Sequence):
    """
    Sequence of three optional fields describing a CRL: a URL, a number and
    a time. Registered in this file as the value spec of the 'crl' single
    response extension.
    """

    _fields = [
        ('crl_url', IA5String, {'explicit': 0, 'optional': True}),
        ('crl_num', Integer, {'explicit': 1, 'optional': True}),
        ('crl_time', GeneralizedTime, {'explicit': 2, 'optional': True}),
    ]


class SingleResponseExtensionId(ObjectIdentifier):
    """
    Object identifier of an extension on a SingleResponse, with readable
    names for the OIDs listed in _map
    """

    _map = {
        '1.3.6.1.5.5.7.48.1.3': 'crl',
        '1.3.6.1.5.5.7.48.1.6': 'archive_cutoff',
        # These are CRLEntryExtension values from
        # https://tools.ietf.org/html/rfc5280
        '2.5.29.21': 'crl_reason',
        '2.5.29.24': 'invalidity_date',
        '2.5.29.29': 'certificate_issuer',
        # https://tools.ietf.org/html/rfc6962.html#page-13
        '1.3.6.1.4.1.11129.2.4.5': 'signed_certificate_timestamp_list',
    }


class SingleResponseExtension(Sequence):
    """
    One extension attached to a SingleResponse: an identifier, a critical
    flag that defaults to False, and the extension value
    """

    _fields = [
        ('extn_id', SingleResponseExtensionId),
        ('critical', Boolean, {'default': False}),
        ('extn_value', ParsableOctetString),
    ]

    # NOTE(review): presumably read by the Sequence base class to pick the
    # spec used to parse 'extn_value' from the name held in 'extn_id' -
    # confirm against asn1crypto.core
    _oid_pair = ('extn_id', 'extn_value')
    _oid_specs = {
        'crl': CrlId,
        'archive_cutoff': GeneralizedTime,
        'crl_reason': CRLReason,
        'invalidity_date': GeneralizedTime,
        'certificate_issuer': GeneralNames,
        'signed_certificate_timestamp_list': OctetString,
    }


class SingleResponseExtensions(SequenceOf):
    """
    Sequence of SingleResponseExtension values
    """

    _child_spec = SingleResponseExtension


class SingleResponse(Sequence):
    """
    The status of one certificate inside an OCSP response: the CertId, its
    status, the update times and an optional list of extensions

    The extensions are scanned lazily, the first time one of the extension
    properties is read, and the results are cached on the instance.
    """

    _fields = [
        ('cert_id', CertId),
        ('cert_status', CertStatus),
        ('this_update', GeneralizedTime),
        ('next_update', GeneralizedTime, {'explicit': 0, 'optional': True}),
        ('single_extensions', SingleResponseExtensions, {'explicit': 1, 'optional': True}),
    ]

    # Cache state populated by _set_extensions(). An extension without an
    # attribute here (e.g. signed_certificate_timestamp_list) is not cached.
    _processed_extensions = False
    _critical_extensions = None
    _crl_value = None
    _archive_cutoff_value = None
    _crl_reason_value = None
    _invalidity_date_value = None
    _certificate_issuer_value = None

    def _set_extensions(self):
        """
        Walks the single response extensions once, storing the parsed value
        of every extension that has a matching "_<name>_value" attribute and
        collecting the names of the extensions flagged as critical
        """

        self._critical_extensions = set()
        flagged = self._critical_extensions

        for ext in self['single_extensions']:
            ext_name = ext['extn_id'].native
            slot = '_%s_value' % ext_name
            # Only extensions with a pre-declared cache attribute are stored
            if hasattr(self, slot):
                setattr(self, slot, ext['extn_value'].parsed)
            if ext['critical'].native:
                flagged.add(ext_name)

        self._processed_extensions = True

    @property
    def critical_extensions(self):
        """
        The names (or OID if not a known extension) of the extensions that
        are marked as critical

        :return:
            A set of unicode strings
        """

        if self._processed_extensions:
            return self._critical_extensions
        self._set_extensions()
        return self._critical_extensions

    @property
    def crl_value(self):
        """
        The parsed CRL extension, which is used to locate the CRL that a
        certificate's revocation is contained within.

        :return:
            None or a CrlId object
        """

        if self._processed_extensions is not False:
            return self._crl_value
        self._set_extensions()
        return self._crl_value

    @property
    def archive_cutoff_value(self):
        """
        The parsed archive cutoff extension, which indicates the date at
        which an archived (historical) certificate status entry will no
        longer be available.

        :return:
            None or a GeneralizedTime object
        """

        if self._processed_extensions is not False:
            return self._archive_cutoff_value
        self._set_extensions()
        return self._archive_cutoff_value

    @property
    def crl_reason_value(self):
        """
        The parsed CRL reason extension, which indicates the reason that a
        certificate was revoked.

        :return:
            None or a CRLReason object
        """

        if self._processed_extensions is not False:
            return self._crl_reason_value
        self._set_extensions()
        return self._crl_reason_value

    @property
    def invalidity_date_value(self):
        """
        The parsed invalidity date extension, which indicates the suspected
        date/time the private key was compromised or the certificate became
        invalid. This would usually be before the revocation date, which is
        when the CA processed the revocation.

        :return:
            None or a GeneralizedTime object
        """

        if self._processed_extensions is not False:
            return self._invalidity_date_value
        self._set_extensions()
        return self._invalidity_date_value

    @property
    def certificate_issuer_value(self):
        """
        The parsed certificate issuer extension, which indicates the issuer
        of the certificate in question.

        :return:
            None or an x509.GeneralNames object
        """

        if self._processed_extensions is not False:
            return self._certificate_issuer_value
        self._set_extensions()
        return self._certificate_issuer_value


class Responses(SequenceOf):
    """
    Sequence of SingleResponse values
    """

    _child_spec = SingleResponse


class ResponseDataExtensionId(ObjectIdentifier):
    """
    Object identifier of an extension on ResponseData, with readable names
    for the OIDs listed in _map
    """

    _map = {
        '1.3.6.1.5.5.7.48.1.2': 'nonce',
        '1.3.6.1.5.5.7.48.1.9': 'extended_revoke',
    }


class ResponseDataExtension(Sequence):
    """
    One extension attached to ResponseData: an identifier, a critical flag
    that defaults to False, and the extension value
    """

    _fields = [
        ('extn_id', ResponseDataExtensionId),
        ('critical', Boolean, {'default': False}),
        ('extn_value', ParsableOctetString),
    ]

    # NOTE(review): presumably read by the Sequence base class to pick the
    # spec used to parse 'extn_value' from the name held in 'extn_id' -
    # confirm against asn1crypto.core
    _oid_pair = ('extn_id', 'extn_value')
    _oid_specs = {
        'nonce': OctetString,
        'extended_revoke': Null,
    }


class ResponseDataExtensions(SequenceOf):
    """
    Sequence of ResponseDataExtension values
    """

    _child_spec = ResponseDataExtension


class ResponseData(Sequence):
    """
    The body of a basic OCSP response: a version that defaults to 'v1', the
    responder id, the production time, the list of single responses and
    optional extensions
    """

    _fields = [
        ('version', Version, {'explicit': 0, 'default': 'v1'}),
        ('responder_id', ResponderId),
        ('produced_at', GeneralizedTime),
        ('responses', Responses),
        ('response_extensions', ResponseDataExtensions, {'explicit': 1, 'optional': True}),
    ]


class BasicOCSPResponse(Sequence):
    """
    Sequence holding the ResponseData, the signature algorithm, the
    signature and an optional list of certificates. Registered in this file
    as the value spec of the 'basic_ocsp_response' response type.
    """

    _fields = [
        ('tbs_response_data', ResponseData),
        ('signature_algorithm', SignedDigestAlgorithm),
        ('signature', OctetBitString),
        ('certs', Certificates, {'explicit': 0, 'optional': True}),
    ]


class ResponseBytes(Sequence):
    """
    Sequence pairing a response type identifier with the encoded response
    """

    _fields = [
        ('response_type', ResponseType),
        ('response', ParsableOctetString),
    ]

    # NOTE(review): presumably read by the Sequence base class to pick the
    # spec used to parse 'response' from the name held in 'response_type' -
    # confirm against asn1crypto.core
    _oid_pair = ('response_type', 'response')
    _oid_specs = {
        'basic_ocsp_response': BasicOCSPResponse,
    }


class OCSPResponse(Sequence):
    """
    Top-level OCSP response: a status plus, optionally, the response bytes

    The extension properties and the shortcut properties all read from the
    'response_bytes' field. That field is optional, so when it is absent
    they report "nothing present" (None, or an empty set for the critical
    extensions) instead of failing on the missing field.

    The response extensions are scanned lazily, the first time one of the
    extension properties is read, and the results are cached on the
    instance.
    """

    _fields = [
        ('response_status', OCSPResponseStatus),
        ('response_bytes', ResponseBytes, {'explicit': 0, 'optional': True}),
    ]

    # Cache state populated by _set_extensions()
    _processed_extensions = False
    _critical_extensions = None
    _nonce_value = None
    _extended_revoke_value = None

    def _set_extensions(self):
        """
        Sets common named extensions to private attributes and creates a list
        of critical extensions. A response without response bytes is treated
        as having no extensions.
        """

        self._critical_extensions = set()

        response_data = self.response_data
        if response_data is not None:
            for extension in response_data['response_extensions']:
                name = extension['extn_id'].native
                attribute_name = '_%s_value' % name
                # Only extensions with a pre-declared cache attribute are
                # stored
                if hasattr(self, attribute_name):
                    setattr(self, attribute_name, extension['extn_value'].parsed)
                if extension['critical'].native:
                    self._critical_extensions.add(name)

        self._processed_extensions = True

    @property
    def critical_extensions(self):
        """
        Returns a set of the names (or OID if not a known extension) of the
        extensions marked as critical

        :return:
            A set of unicode strings - empty if there are no response bytes
        """

        if not self._processed_extensions:
            self._set_extensions()
        return self._critical_extensions

    @property
    def nonce_value(self):
        """
        This extension is used to prevent replay attacks on the request/response
        exchange

        :return:
            None or an OctetString object
        """

        if self._processed_extensions is False:
            self._set_extensions()
        return self._nonce_value

    @property
    def extended_revoke_value(self):
        """
        This extension is used to signal that the responder will return a
        "revoked" status for non-issued certificates.

        :return:
            None or a Null object (if present)
        """

        if self._processed_extensions is False:
            self._set_extensions()
        return self._extended_revoke_value

    @property
    def basic_ocsp_response(self):
        """
        A shortcut into the BasicOCSPResponse sequence

        :return:
            None (if the optional response bytes are absent) or an
            asn1crypto.ocsp.BasicOCSPResponse object
        """

        response_bytes = self['response_bytes']
        # An absent optional field cannot be subscripted, so it must be
        # detected before reaching into it
        if isinstance(response_bytes, Void):
            return None
        return response_bytes['response'].parsed

    @property
    def response_data(self):
        """
        A shortcut into the parsed, ResponseData sequence

        :return:
            None (if the optional response bytes are absent) or an
            asn1crypto.ocsp.ResponseData object
        """

        basic_response = self.basic_ocsp_response
        if basic_response is None:
            return None
        return basic_response['tbs_response_data']