Extract OCSPCertStatus::Status to standalone OCSPRevocationStatus, and add OCSPVerifyResult for tracking stapled OCSP responses cross-platform. OCSPVerifyResult is populated by CertVerifyProc, but is currently unused. In the future, it will be consumed by Expect-Staple reports.
This CL also updates mini-CA and the spawned test server to be able to send a wider range of OCSP responses. Since OCSP responses are short lived, test the new functionality in url_request_unittest.cc and dynamically generate OCSP responses.
BUG=598021
Review-Url: https://codereview.chromium.org/2100303002
Cr-Original-Commit-Position: refs/heads/master@{#406699}
Cr-Mirrored-From: https://chromium.googlesource.com/chromium/src
Cr-Mirrored-Commit: 612337a1d7cadc52d0217b9f399eb1fab445d3e2
diff --git a/minica.py b/minica.py
index acf68fc..7d7b06b 100644
--- a/minica.py
+++ b/minica.py
@@ -3,9 +3,31 @@
# found in the LICENSE file.
import asn1
+import datetime
import hashlib
+import itertools
import os
+import time
+GENERALIZED_TIME_FORMAT = "%Y%m%d%H%M%SZ"
+
+OCSP_STATE_GOOD = 1
+OCSP_STATE_REVOKED = 2
+OCSP_STATE_INVALID_RESPONSE = 3
+OCSP_STATE_UNAUTHORIZED = 4
+OCSP_STATE_UNKNOWN = 5
+OCSP_STATE_TRY_LATER = 6
+OCSP_STATE_INVALID_RESPONSE_DATA = 7
+OCSP_STATE_MISMATCHED_SERIAL = 8
+
+OCSP_DATE_VALID = 1
+OCSP_DATE_OLD = 2
+OCSP_DATE_EARLY = 3
+OCSP_DATE_LONG = 4
+
+OCSP_PRODUCED_VALID = 1
+OCSP_PRODUCED_BEFORE_CERT = 2
+OCSP_PRODUCED_AFTER_CERT = 3
# This file implements very minimal certificate and OCSP generation. It's
# designed to test revocation checking.
@@ -245,15 +267,8 @@
asn1.BitString(privkey.Sign(tbsCert)),
]))
-
-def MakeOCSPResponse(issuer_cn, issuer_key, serial, ocsp_state):
- # https://tools.ietf.org/html/rfc2560
- issuer_name_hash = asn1.OCTETSTRING(
- hashlib.sha1(asn1.ToDER(Name(cn = issuer_cn))).digest())
-
- issuer_key_hash = asn1.OCTETSTRING(
- hashlib.sha1(asn1.ToDER(issuer_key)).digest())
-
+def MakeOCSPSingleResponse(
+ issuer_name_hash, issuer_key_hash, serial, ocsp_state, ocsp_date):
cert_status = None
if ocsp_state == OCSP_STATE_REVOKED:
cert_status = asn1.Explicit(1, asn1.GeneralizedTime("20100101060000Z"))
@@ -261,28 +276,85 @@
cert_status = asn1.Raw(asn1.TagAndLength(0x80 | 2, 0))
elif ocsp_state == OCSP_STATE_GOOD:
cert_status = asn1.Raw(asn1.TagAndLength(0x80 | 0, 0))
+ elif ocsp_state == OCSP_STATE_MISMATCHED_SERIAL:
+ cert_status = asn1.Raw(asn1.TagAndLength(0x80 | 0, 0))
+ serial -= 1
else:
raise ValueError('Bad OCSP state: ' + str(ocsp_state))
+ now = datetime.datetime.fromtimestamp(time.mktime(time.gmtime()))
+ if ocsp_date == OCSP_DATE_VALID:
+ thisUpdate = now - datetime.timedelta(days=1)
+ nextUpdate = thisUpdate + datetime.timedelta(weeks=1)
+ elif ocsp_date == OCSP_DATE_OLD:
+ thisUpdate = now - datetime.timedelta(hours=1, weeks=1)
+ nextUpdate = thisUpdate + datetime.timedelta(weeks=1)
+ elif ocsp_date == OCSP_DATE_EARLY:
+ thisUpdate = now + datetime.timedelta(hours=1)
+ nextUpdate = thisUpdate + datetime.timedelta(weeks=1)
+ elif ocsp_date == OCSP_DATE_LONG:
+ thisUpdate = now - datetime.timedelta(days=365)
+ nextUpdate = thisUpdate + datetime.timedelta(hours=1, days=365)
+ elif ocsp_date == OCSP_DATE_BEFORE_CERT:
+ thisUpdate = now - datetime.timedelta(days=1)
+ nextUpdate = thisUpdate + datetime.timedelta(weeks=1)
+ elif ocsp_date == OCSP_DATE_AFTER_CERT:
+ thisUpdate = now - datetime.timedelta(days=1)
+ nextUpdate = thisUpdate + datetime.timedelta(weeks=1)
+ else:
+ raise ValueError('Bad OCSP date: ' + str(ocsp_date))
+
+ return asn1.SEQUENCE([ # SingleResponse
+ asn1.SEQUENCE([ # CertID
+ asn1.SEQUENCE([ # hashAlgorithm
+ HASH_SHA1,
+ None,
+ ]),
+ issuer_name_hash,
+ issuer_key_hash,
+ serial,
+ ]),
+ cert_status,
+ asn1.GeneralizedTime( # thisUpdate
+ thisUpdate.strftime(GENERALIZED_TIME_FORMAT)
+ ),
+ asn1.Explicit( # nextUpdate
+ 0,
+ asn1.GeneralizedTime(nextUpdate.strftime(GENERALIZED_TIME_FORMAT))
+ ),
+ ])
+
+def MakeOCSPResponse(
+ issuer_cn, issuer_key, serial, ocsp_states, ocsp_dates, ocsp_produced):
+ # https://tools.ietf.org/html/rfc2560
+ issuer_name_hash = asn1.OCTETSTRING(
+ hashlib.sha1(asn1.ToDER(Name(cn = issuer_cn))).digest())
+
+ issuer_key_hash = asn1.OCTETSTRING(
+ hashlib.sha1(asn1.ToDER(issuer_key)).digest())
+
+ now = datetime.datetime.fromtimestamp(time.mktime(time.gmtime()))
+ if ocsp_produced == OCSP_PRODUCED_VALID:
+ producedAt = now - datetime.timedelta(days=1)
+ elif ocsp_produced == OCSP_PRODUCED_BEFORE_CERT:
+ producedAt = datetime.datetime.strptime(
+ "19100101050000Z", GENERALIZED_TIME_FORMAT)
+ elif ocsp_produced == OCSP_PRODUCED_AFTER_CERT:
+ producedAt = datetime.datetime.strptime(
+ "20321201070000Z", GENERALIZED_TIME_FORMAT)
+ else:
+ raise ValueError('Bad OCSP produced: ' + str(ocsp_produced))
+
+ single_responses = [
+ MakeOCSPSingleResponse(issuer_name_hash, issuer_key_hash, serial,
+ ocsp_state, ocsp_date)
+ for ocsp_state, ocsp_date in itertools.izip(ocsp_states, ocsp_dates)
+ ]
+
basic_resp_data_der = asn1.ToDER(asn1.SEQUENCE([
asn1.Explicit(2, issuer_key_hash),
- asn1.GeneralizedTime("20100101060000Z"), # producedAt
- asn1.SEQUENCE([
- asn1.SEQUENCE([ # SingleResponse
- asn1.SEQUENCE([ # CertID
- asn1.SEQUENCE([ # hashAlgorithm
- HASH_SHA1,
- None,
- ]),
- issuer_name_hash,
- issuer_key_hash,
- serial,
- ]),
- cert_status,
- asn1.GeneralizedTime("20100101060000Z"), # thisUpdate
- asn1.Explicit(0, asn1.GeneralizedTime("20300101060000Z")), # nextUpdate
- ]),
- ]),
+ asn1.GeneralizedTime(producedAt.strftime(GENERALIZED_TIME_FORMAT)),
+ asn1.SEQUENCE(single_responses),
]))
basic_resp = asn1.SEQUENCE([
@@ -311,19 +383,15 @@
pem += '-----END CERTIFICATE-----\n'
return pem
-OCSP_STATE_GOOD = 1
-OCSP_STATE_REVOKED = 2
-OCSP_STATE_INVALID = 3
-OCSP_STATE_UNAUTHORIZED = 4
-OCSP_STATE_UNKNOWN = 5
-
# unauthorizedDER is an OCSPResponse with a status of 6:
# SEQUENCE { ENUM(6) }
unauthorizedDER = '30030a0106'.decode('hex')
def GenerateCertKeyAndOCSP(subject = "127.0.0.1",
ocsp_url = "http://127.0.0.1",
- ocsp_state = OCSP_STATE_GOOD,
+ ocsp_states = None,
+ ocsp_dates = None,
+ ocsp_produced = OCSP_PRODUCED_VALID,
serial = 0):
'''GenerateCertKeyAndOCSP returns a (cert_and_key_pem, ocsp_der) where:
* cert_and_key_pem contains a certificate and private key in PEM format
@@ -331,6 +399,11 @@
* ocsp_der contains a DER encoded OCSP response or None if ocsp_url is
None'''
+ if ocsp_states is None:
+ ocsp_states = [OCSP_STATE_GOOD]
+ if ocsp_dates is None:
+ ocsp_dates = [OCSP_DATE_VALID]
+
if serial == 0:
serial = RandomNumber(16)
cert_der = MakeCertificate(ISSUER_CN, bytes(subject), serial, KEY, KEY,
@@ -339,11 +412,35 @@
ocsp_der = None
if ocsp_url is not None:
- if ocsp_state == OCSP_STATE_UNAUTHORIZED:
+ if ocsp_states[0] == OCSP_STATE_UNAUTHORIZED:
ocsp_der = unauthorizedDER
- elif ocsp_state == OCSP_STATE_INVALID:
+ elif ocsp_states[0] == OCSP_STATE_INVALID_RESPONSE:
ocsp_der = '3'
+ elif ocsp_states[0] == OCSP_STATE_TRY_LATER:
+ resp = asn1.SEQUENCE([
+ asn1.ENUMERATED(3),
+ ])
+ ocsp_der = asn1.ToDER(resp)
+ elif ocsp_states[0] == OCSP_STATE_INVALID_RESPONSE_DATA:
+ invalid_data = asn1.ToDER(asn1.OCTETSTRING('not ocsp data'))
+ basic_resp = asn1.SEQUENCE([
+ asn1.Raw(invalid_data),
+ asn1.SEQUENCE([
+ SHA256_WITH_RSA_ENCRYPTION,
+ None,
+ ]),
+ asn1.BitString(KEY.Sign(invalid_data)),
+ ])
+ resp = asn1.SEQUENCE([
+ asn1.ENUMERATED(0),
+ asn1.Explicit(0, asn1.SEQUENCE([
+ OCSP_TYPE_BASIC,
+ asn1.OCTETSTRING(asn1.ToDER(basic_resp)),
+ ])),
+ ])
+ ocsp_der = asn1.ToDER(resp)
else:
- ocsp_der = MakeOCSPResponse(ISSUER_CN, KEY, serial, ocsp_state)
+ ocsp_der = MakeOCSPResponse(
+ ISSUER_CN, KEY, serial, ocsp_states, ocsp_dates, ocsp_produced)
return (cert_pem + KEY_PEM, ocsp_der)
diff --git a/testserver.py b/testserver.py
index 7950f35..97f2b05 100755
--- a/testserver.py
+++ b/testserver.py
@@ -80,6 +80,13 @@
# Default request queue size for WebSocketServer.
_DEFAULT_REQUEST_QUEUE_SIZE = 128
+OCSP_STATES_NO_SINGLE_RESPONSE = {
+ minica.OCSP_STATE_INVALID_RESPONSE,
+ minica.OCSP_STATE_UNAUTHORIZED,
+ minica.OCSP_STATE_TRY_LATER,
+ minica.OCSP_STATE_INVALID_RESPONSE_DATA,
+}
+
class WebSocketOptions:
"""Holds options for WebSocketServer."""
@@ -1896,27 +1903,75 @@
print ('OCSP server started on %s:%d...' %
(host, self.__ocsp_server.server_port))
- ocsp_state = None
+ ocsp_states = list()
+ for ocsp_state_arg in self.options.ocsp.split(':'):
+ if ocsp_state_arg == 'ok':
+ ocsp_state = minica.OCSP_STATE_GOOD
+ elif ocsp_state_arg == 'revoked':
+ ocsp_state = minica.OCSP_STATE_REVOKED
+ elif ocsp_state_arg == 'invalid':
+ ocsp_state = minica.OCSP_STATE_INVALID_RESPONSE
+ elif ocsp_state_arg == 'unauthorized':
+ ocsp_state = minica.OCSP_STATE_UNAUTHORIZED
+ elif ocsp_state_arg == 'unknown':
+ ocsp_state = minica.OCSP_STATE_UNKNOWN
+ elif ocsp_state_arg == 'later':
+ ocsp_state = minica.OCSP_STATE_TRY_LATER
+ elif ocsp_state_arg == 'invalid_data':
+ ocsp_state = minica.OCSP_STATE_INVALID_RESPONSE_DATA
+ elif ocsp_state_arg == "mismatched_serial":
+ ocsp_state = minica.OCSP_STATE_MISMATCHED_SERIAL
+ else:
+ raise testserver_base.OptionError('unknown OCSP status: ' +
+ ocsp_state_arg)
+ ocsp_states.append(ocsp_state)
- if self.options.ocsp == 'ok':
- ocsp_state = minica.OCSP_STATE_GOOD
- elif self.options.ocsp == 'revoked':
- ocsp_state = minica.OCSP_STATE_REVOKED
- elif self.options.ocsp == 'invalid':
- ocsp_state = minica.OCSP_STATE_INVALID
- elif self.options.ocsp == 'unauthorized':
- ocsp_state = minica.OCSP_STATE_UNAUTHORIZED
- elif self.options.ocsp == 'unknown':
- ocsp_state = minica.OCSP_STATE_UNKNOWN
+ if len(ocsp_states) > 1:
+ if set(ocsp_states) & OCSP_STATES_NO_SINGLE_RESPONSE:
+ raise testserver_base.OptionError('Multiple OCSP responses '
+ 'incompatible with states ' + str(ocsp_states))
+
+ ocsp_dates = list()
+ for ocsp_date_arg in self.options.ocsp_date.split(':'):
+ if ocsp_date_arg == 'valid':
+ ocsp_date = minica.OCSP_DATE_VALID
+ elif ocsp_date_arg == 'old':
+ ocsp_date = minica.OCSP_DATE_OLD
+ elif ocsp_date_arg == 'early':
+ ocsp_date = minica.OCSP_DATE_EARLY
+ elif ocsp_date_arg == 'long':
+ ocsp_date = minica.OCSP_DATE_LONG
+ elif ocsp_date_arg == 'before_cert':
+ ocsp_date = minica.OCSP_DATE_AFTER_CERT
+ elif ocsp_date_arg == 'after_cert':
+ ocsp_date = minica.OCSP_DATE_AFTER_CERT
+ else:
+ raise testserver_base.OptionError('unknown OCSP date: ' +
+ ocsp_date_arg)
+ ocsp_dates.append(ocsp_date)
+
+ if len(ocsp_states) != len(ocsp_dates):
+ raise testserver_base.OptionError('mismatched ocsp and ocsp-date '
+ 'count')
+
+ ocsp_produced = None
+ if self.options.ocsp_produced == 'valid':
+ ocsp_produced = minica.OCSP_PRODUCED_VALID
+ elif self.options.ocsp_produced == 'before':
+ ocsp_produced = minica.OCSP_PRODUCED_BEFORE_CERT
+ elif self.options.ocsp_produced == 'after':
+ ocsp_produced = minica.OCSP_PRODUCED_AFTER_CERT
else:
- raise testserver_base.OptionError('unknown OCSP status: ' +
- self.options.ocsp_status)
+ raise testserver_base.OptionError('unknown OCSP produced: ' +
+ self.options.ocsp_produced)
(pem_cert_and_key, ocsp_der) = minica.GenerateCertKeyAndOCSP(
subject = "127.0.0.1",
ocsp_url = ("http://%s:%d/ocsp" %
(host, self.__ocsp_server.server_port)),
- ocsp_state = ocsp_state,
+ ocsp_states = ocsp_states,
+ ocsp_dates = ocsp_dates,
+ ocsp_produced = ocsp_produced,
serial = self.options.cert_serial)
if self.options.ocsp_server_unavailable:
@@ -2088,6 +2143,12 @@
help='The type of OCSP response generated '
'for the automatically generated '
'certificate. One of [ok,revoked,invalid]')
+ self.option_parser.add_option('--ocsp-date', dest='ocsp_date',
+ default='valid', help='The validity of the '
+ 'range between thisUpdate and nextUpdate')
+ self.option_parser.add_option('--ocsp-produced', dest='ocsp_produced',
+ default='valid', help='producedAt relative '
+ 'to certificate expiry')
self.option_parser.add_option('--cert-serial', dest='cert_serial',
default=0, type=int,
help='If non-zero then the generated '