| #!/usr/bin/env python3 |
| # Copyright 2014 The Chromium Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """ |
| This utility takes a JSON input that describes a CRLSet and produces a |
| CRLSet from it. |
| |
| The input is taken on stdin and is a dict with the following keys: |
| - BlockedBySPKI: An array of strings, where each string is a filename |
| containing a PEM certificate, from which an SPKI will be extracted. |
| - BlockedByHash: A dict of string to an array of strings. The dict key is |
| a filename containing a PEM certificate, representing the issuer cert, |
| while the array of strings contain the filenames of PEM format |
| certificates whose serials are blocked. |
| - LimitedSubjects: A dict of string to an array of strings, where the key is |
| a filename containing a PEM format certificate, and the strings are the |
| filenames of PEM format certificates. Certificates that share a Subject |
| with the key will be restricted to the set of SPKIs extracted from the |
| files in the values. |
| - Sequence: An optional integer sequence number to use for the CRLSet. If |
| not present, defaults to 1. |
| |
| For example: |
| |
| { |
| "BlockedBySPKI": ["/tmp/blocked-certificate"], |
| "BlockedByHash": { |
| "/tmp/intermediate-certificate": [1, 2, 3] |
| }, |
| "LimitedSubjects": { |
| "/tmp/limited-certificate": [ |
| "/tmp/limited-certificate", |
| "/tmp/limited-certificate2" |
| ] |
| }, |
| "Sequence": 23 |
| } |
| """ |
| |
| import base64 |
| import collections |
| import hashlib |
| import json |
| import optparse |
| import six |
| import struct |
| import sys |
| |
| |
| def _pem_cert_to_binary(pem_filename): |
| """Decodes the first PEM-encoded certificate in a given file into binary |
| |
| Args: |
| pem_filename: A filename that contains a PEM-encoded certificate. It may |
| contain additional data (keys, textual representation) which will be |
| ignored |
| |
| Returns: |
| A byte array containing the decoded certificate data |
| """ |
| pem_data = "" |
| started = False |
| |
| with open(pem_filename, 'r') as pem_file: |
| for line in pem_file: |
| if not started: |
| if line.startswith('-----BEGIN CERTIFICATE'): |
| started = True |
| else: |
| if line.startswith('-----END CERTIFICATE'): |
| break |
| pem_data += line[:-1].strip() |
| |
| return base64.b64decode(pem_data) |
| |
| |
| def _parse_asn1_element(der_bytes): |
| """Parses a DER-encoded tag/Length/Value into its component parts |
| |
| Args: |
| der_bytes: A DER-encoded ASN.1 data type |
| |
| Returns: |
| A tuple of the ASN.1 tag value, the length of the ASN.1 header that was |
| read, the sequence of bytes for the value, and then any data from der_bytes |
| that was not part of the tag/Length/Value. |
| """ |
| tag = six.indexbytes(der_bytes, 0) |
| length = six.indexbytes(der_bytes, 1) |
| header_length = 2 |
| |
| if length & 0x80: |
| num_length_bytes = length & 0x7f |
| length = 0 |
| for i in range(2, 2 + num_length_bytes): |
| length <<= 8 |
| length += six.indexbytes(der_bytes, i) |
| header_length = 2 + num_length_bytes |
| |
| contents = der_bytes[:header_length + length] |
| rest = der_bytes[header_length + length:] |
| |
| return (tag, header_length, contents, rest) |
| |
| |
| class ASN1Iterator(object): |
| """Iterator that parses and iterates through a ASN.1 DER structure""" |
| |
| def __init__(self, contents): |
| self._tag = 0 |
| self._header_length = 0 |
| self._rest = None |
| self._contents = contents |
| self.step_into() |
| |
| def step_into(self): |
| """Begins processing the inner contents of the next ASN.1 element""" |
| (self._tag, self._header_length, self._contents, self._rest) = ( |
| _parse_asn1_element(self._contents[self._header_length:])) |
| |
| def step_over(self): |
| """Skips/ignores the next ASN.1 element""" |
| (self._tag, self._header_length, self._contents, self._rest) = ( |
| _parse_asn1_element(self._rest)) |
| |
| def tag(self): |
| """Returns the ASN.1 tag of the current element""" |
| return self._tag |
| |
| def contents(self): |
| """Returns the raw data of the current element""" |
| return self._contents |
| |
| def encoded_value(self): |
| """Returns the encoded value of the current element (i.e. without header)""" |
| return self._contents[self._header_length:] |
| |
| |
| def _der_cert_to_spki(der_bytes): |
| """Returns the subjectPublicKeyInfo of a DER-encoded certificate |
| |
| Args: |
| der_bytes: A DER-encoded certificate (RFC 5280) |
| |
| Returns: |
| A byte array containing the subjectPublicKeyInfo |
| """ |
| iterator = ASN1Iterator(der_bytes) |
| iterator.step_into() # enter certificate structure |
| iterator.step_into() # enter TBSCertificate |
| iterator.step_over() # over version |
| iterator.step_over() # over serial |
| iterator.step_over() # over signature algorithm |
| iterator.step_over() # over issuer name |
| iterator.step_over() # over validity |
| iterator.step_over() # over subject name |
| return iterator.contents() |
| |
| |
| def der_cert_to_spki_hash(der_cert): |
| """Gets the SHA-256 hash of the subjectPublicKeyInfo of a DER encoded cert |
| |
| Args: |
| der_cert: A string containing the DER-encoded certificate |
| |
| Returns: |
| The SHA-256 hash of the certificate, as a byte sequence |
| """ |
| return hashlib.sha256(_der_cert_to_spki(der_cert)).digest() |
| |
| |
| def pem_cert_file_to_spki_hash(pem_filename): |
| """Gets the SHA-256 hash of the subjectPublicKeyInfo of a cert in a file |
| |
| Args: |
| pem_filename: A file containing a PEM-encoded certificate. |
| |
| Returns: |
| The SHA-256 hash of the first certificate in the file, as a byte sequence |
| """ |
| return der_cert_to_spki_hash(_pem_cert_to_binary(pem_filename)) |
| |
| |
| def der_cert_to_subject_hash(der_bytes): |
| """Returns SHA256(subject) of a DER-encoded certificate |
| |
| Args: |
| der_bytes: A DER-encoded certificate (RFC 5280) |
| |
| Returns: |
| The SHA-256 hash of the certificate's subject. |
| """ |
| iterator = ASN1Iterator(der_bytes) |
| iterator.step_into() # enter certificate structure |
| iterator.step_into() # enter TBSCertificate |
| iterator.step_over() # over version |
| iterator.step_over() # over serial |
| iterator.step_over() # over signature algorithm |
| iterator.step_over() # over issuer name |
| iterator.step_over() # over validity |
| return hashlib.sha256(iterator.contents()).digest() |
| |
| |
| def pem_cert_file_to_subject_hash(pem_filename): |
| """Gets the SHA-256 hash of the subject of a cert in a file |
| |
| Args: |
| pem_filename: A file containing a PEM-encoded certificate. |
| |
| Returns: |
| The SHA-256 hash of the subject of the first certificate in the file, as a |
| byte sequence |
| """ |
| return der_cert_to_subject_hash(_pem_cert_to_binary(pem_filename)) |
| |
| |
| def der_cert_to_serial(der_bytes): |
| """Gets the serial of a DER-encoded certificate, omitting leading 0x00 |
| |
| Args: |
| der_bytes: A DER-encoded certificates (RFC 5280) |
| |
| Returns: |
| The encoded serial number value (omitting tag and length), and omitting |
| any leading 0x00 used to indicate it is a positive INTEGER. |
| """ |
| iterator = ASN1Iterator(der_bytes) |
| iterator.step_into() # enter certificate structure |
| iterator.step_into() # enter TBSCertificate |
| iterator.step_over() # over version |
| raw_serial = iterator.encoded_value() |
| if six.indexbytes(raw_serial, 0) == 0x00 and len(raw_serial) > 1: |
| raw_serial = raw_serial[1:] |
| return raw_serial |
| |
| |
| def pem_cert_file_to_serial(pem_filename): |
| """Gets the DER-encoded serial of a cert in a file, omitting leading 0x00 |
| |
| Args: |
| pem_filename: A file containing a PEM-encoded certificate. |
| |
| Returns: |
| The DER-encoded serial as a byte sequence |
| """ |
| return der_cert_to_serial(_pem_cert_to_binary(pem_filename)) |
| |
| |
| def main(): |
| parser = optparse.OptionParser(description=sys.modules[__name__].__doc__) |
| parser.add_option('-o', '--output', |
| help='Specifies the output file. The default is stdout.') |
| options, _ = parser.parse_args() |
| outfile = sys.stdout |
| if options.output and options.output != '-': |
| outfile = open(options.output, 'wb') |
| |
| config = json.load(sys.stdin) |
| blocked_spkis = [ |
| base64.b64encode(pem_cert_file_to_spki_hash(pem_file)).decode('ascii') |
| for pem_file in config.get('BlockedBySPKI', []) |
| ] |
| parents = { |
| pem_cert_file_to_spki_hash(pem_file): [ |
| pem_cert_file_to_serial(issued_cert_file) |
| for issued_cert_file in issued_certs |
| ] |
| for pem_file, issued_certs in config.get('BlockedByHash', {}).items() |
| } |
| limited_subjects = { |
| base64.b64encode(pem_cert_file_to_subject_hash(pem_file)).decode('ascii'): |
| [ |
| base64.b64encode(pem_cert_file_to_spki_hash(filename)).decode('ascii') |
| for filename in allowed_pems |
| ] |
| for pem_file, allowed_pems in config.get('LimitedSubjects', {}).items() |
| } |
| known_interception_spkis = [ |
| base64.b64encode(pem_cert_file_to_spki_hash(pem_file)).decode('ascii') |
| for pem_file in config.get('KnownInterceptionSPKIs', []) |
| ] |
| blocked_interception_spkis = [ |
| base64.b64encode(pem_cert_file_to_spki_hash(pem_file)).decode('ascii') |
| for pem_file in config.get('BlockedInterceptionSPKIs', []) |
| ] |
| header_json = { |
| 'Version': 0, |
| 'ContentType': 'CRLSet', |
| 'Sequence': int(config.get("Sequence", 1)), |
| 'NumParents': len(parents), |
| 'BlockedSPKIs': blocked_spkis, |
| 'LimitedSubjects': limited_subjects, |
| 'KnownInterceptionSPKIs': known_interception_spkis, |
| 'BlockedInterceptionSPKIs': blocked_interception_spkis |
| } |
| header = json.dumps(header_json) |
| outfile.write(struct.pack('<H', len(header))) |
| outfile.write(header.encode('utf-8')) |
| for spki, serials in sorted(parents.items()): |
| outfile.write(spki) |
| outfile.write(struct.pack('<I', len(serials))) |
| for serial in serials: |
| raw_serial = [] |
| if not serial: |
| raw_serial = b'\x00' |
| else: |
| raw_serial = serial |
| |
| outfile.write(struct.pack('<B', len(raw_serial))) |
| outfile.write(raw_serial) |
| return 0 |
| |
| |
| if __name__ == '__main__': |
| sys.exit(main()) |