| #!/usr/bin/env python |
| # Copyright 2017 The Chromium Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| """Generate a C++ file containing information on all accepted CT logs.""" |
| |
| import base64 |
| import datetime |
| import hashlib |
| import json |
| import math |
| import sys |
| |
| |
| def _write_cpp_header(f): |
| f.write("// This file is auto-generated, DO NOT EDIT.\n\n") |
| |
| |
| def _write_disqualified_log_info_struct_definition(f): |
| f.write( |
| "// Information related to previously-qualified, but now disqualified," |
| "\n" |
| "// CT logs.\n" |
| "struct DisqualifiedCTLogInfo {\n" |
| " // The ID of the log (the SHA-256 hash of |log_info.log_key|.\n" |
| " const char log_id[33];\n" |
| " const CTLogInfo log_info;\n" |
| " // The offset from the Unix Epoch of when the log was disqualified." |
| "\n" |
| " // SCTs embedded in pre-certificates after this date should not" |
| " count\n" |
| " // towards any uniqueness/freshness requirements.\n" |
| " const base::Time disqualification_date;\n" |
| "};\n\n") |
| |
| |
| def _split_and_hexify_binary_data(bin_data): |
| """Pretty-prints, in hex, the given bin_data.""" |
| if sys.version_info.major == 2: |
| hex_data = "".join("\\x%.2x" % ord(c) for c in bin_data) |
| else: |
| hex_data = "".join("\\x%.2x" % c for c in bin_data) |
| # line_width % 4 must be 0 to avoid splitting the hex-encoded data |
| # across '\' which will escape the quotation marks. |
| line_width = 68 |
| assert line_width % 4 == 0 |
| num_splits = int(math.ceil(len(hex_data) / float(line_width))) |
| return [ |
| '"%s"' % hex_data[i * line_width:(i + 1) * line_width] |
| for i in range(num_splits) |
| ] |
| |
| |
| def _get_log_ids_array(log_ids, array_name): |
| num_logs = len(log_ids) |
| log_ids.sort() |
| log_id_length = len(log_ids[0]) + 1 |
| log_id_code = [ |
| "// The list is sorted.\n", |
| "const char %s[][%d] = {\n" % (array_name, log_id_length) |
| ] |
| for i in range(num_logs): |
| split_hex_id = _split_and_hexify_binary_data(log_ids[i]) |
| s = " %s" % ("\n ".join(split_hex_id)) |
| if (i < num_logs - 1): |
| s += ',\n' |
| log_id_code.append(s) |
| log_id_code.append('};\n\n') |
| return log_id_code |
| |
| |
| def _get_log_ids_for_operator(logs_by_operator, operator_name): |
| """Returns a list of Log IDs of logs operated by operator_name.""" |
| for operator in logs_by_operator: |
| if operator["name"] == operator_name: |
| return [ |
| base64.b64decode(log["log_id"]) |
| for log in operator["logs"] |
| if _is_log_once_or_currently_qualified(log) |
| ] |
| return [] |
| |
| |
| def _is_log_disqualified(log): |
| # Disqualified logs are denoted with state="retired" |
| assert (len(log.get("state")) == 1) |
| log_state = list(log.get("state"))[0] |
| return log_state == "retired" |
| |
| |
| def _escape_c_string(s): |
| def _escape_char(c): |
| if ord(c) == 0: |
| raise ValueError( |
| 'String with NUL character cannot be converted to C string') |
| if 32 <= ord(c) <= 126 and c not in '\\"': |
| return c |
| else: |
| return "\\%03o" % ord(c) |
| |
| return "".join(_escape_char(c) for c in s) |
| |
| |
def _to_loginfo_struct(log, index):
    """Convert the given log to CTLogInfo initialization code.

    The initializer lists, in order: the hex-escaped key, the key length in
    bytes, the escaped description, the escaped current operator (an empty
    string when the log has no "current_operator"), and either the name and
    length of the log's kPreviousOperators<index> array or "nullptr, 0".

    Args:
        log: Log dict with "key" (base64) and "description", and optionally
            "current_operator" and "previous_operators".
        index: Number used to name the previous-operators array.

    Returns:
        The initializer as a string.
    """
    key_bytes = base64.b64decode(log["key"])
    key_lines = _split_and_hexify_binary_data(key_bytes)
    description = _escape_c_string(log["description"])
    if "current_operator" in log:
        operator = _escape_c_string(log["current_operator"])
    else:
        operator = ""
    if "previous_operators" in log:
        previous_operators = 'kPreviousOperators%d, %d' % (
            index, len(log["previous_operators"]))
    else:
        previous_operators = 'nullptr, 0'
    pieces = [
        " {",
        "\n ".join(key_lines),
        ',\n %d' % (len(key_bytes)),
        ',\n "%s"' % (description),
        ',\n "',
        operator,
        '",\n ',
        previous_operators,
        '}',
    ]
    return "".join(pieces)
| |
| |
| def _get_log_definitions(logs): |
| """Returns a list of strings, each is a CTLogInfo definition.""" |
| return [_to_loginfo_struct(log, index) for index, log in enumerate(logs)] |
| |
| |
| def _timestamp_to_timedelta_since_unixepoch(timestamp): |
| dt = datetime.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%SZ") |
| unix_epoch = datetime.datetime(1970, 1, 1, 0, 0) |
| # Both objects in this operation are intended to represent UTC |
| # time, although they have a tzinfo of None associated with them. |
| return (dt - unix_epoch).total_seconds() |
| |
| |
def _to_disqualified_loginfo_struct(log, index):
    """Convert the given log to DisqualifiedCTLogInfo initialization code.

    The initializer holds the hex-escaped log ID, the log's CTLogInfo
    initializer, and a base::Time built from the timestamp under
    log["state"]["retired"].

    Args:
        log: Log dict with "log_id" (base64), a "retired" state carrying a
            "timestamp", and the fields needed for its CTLogInfo.
        index: Number passed on for naming the previous-operators array.

    Returns:
        The initializer as a string.
    """
    id_lines = _split_and_hexify_binary_data(base64.b64decode(log["log_id"]))
    log_info = _to_loginfo_struct(log, index)
    retired_at = _timestamp_to_timedelta_since_unixepoch(
        log["state"]["retired"]["timestamp"])
    pieces = [
        " {",
        "\n ".join(id_lines),
        ",\n",
        log_info,
        ",\n",
        ' base::Time::FromTimeT(%d)' % (retired_at),
        '}',
    ]
    return "".join(pieces)
| |
| |
| def _get_disqualified_log_definitions(logs, starting_index): |
| """Returns a list of DisqualifiedCTLogInfo definitions.""" |
| return [_to_disqualified_loginfo_struct(log, starting_index + index) |
| for index, log in enumerate(logs)] |
| |
| |
| def _sorted_disqualified_logs(all_logs): |
| return sorted([l for l in all_logs if _is_log_disqualified(l)], |
| key = lambda l: base64.b64decode(l["log_id"])) |
| |
| |
| def _to_previous_operators_struct(log, index): |
| s = "" |
| if "previous_operators" in log: |
| s += 'const PreviousOperatorEntry kPreviousOperators%d[] = {' % index |
| for operator_switch in sorted(log["previous_operators"], key=lambda x: |
| _timestamp_to_timedelta_since_unixepoch( |
| x["end_time"])): |
| s += '\n {"%s", ' % (_escape_c_string(operator_switch["name"])) |
| s += ( |
| 'base::Time::FromTimeT(%d)},' % |
| _timestamp_to_timedelta_since_unixepoch(operator_switch["end_time"])) |
| s += '};\n' |
| return s |
| |
| |
| def _get_previous_operators(logs, starting_index): |
| prev_operators = [] |
| for index, log in enumerate(logs): |
| operators = _to_previous_operators_struct(log, starting_index + index) |
| prev_operators.append(operators) |
| return prev_operators |
| |
| |
def _write_qualifying_logs_loginfo(f, qualifying_logs):
    """Write the kCTLogList array of presently-qualifying CT logs.

    Args:
        f: Writable text stream that receives the generated C++ source.
        qualifying_logs: Logs to list; each becomes one CTLogInfo entry.
    """
    f.write("// The set of all presently-qualifying CT logs.\n")
    f.write("const CTLogInfo kCTLogList[] = {\n")
    definitions = _get_log_definitions(qualifying_logs)
    array_body = ",\n".join(definitions)
    f.write(array_body)
    f.write("\n};\n\n")
| |
| |
def _write_previous_operator_info(f, qualifying_logs, disqualified_logs):
    """Write the previous-operator arrays for both groups of logs.

    Qualifying logs are numbered from 0. Disqualified logs continue the
    numbering from len(qualifying_logs). Each group is introduced by a
    comment line and followed by a blank line.

    Args:
        f: Writable text stream that receives the generated C++ source.
        qualifying_logs: Presently-qualifying logs.
        disqualified_logs: Disqualified logs.
    """
    sections = (
        ("// Previous operators for presently-qualifying CT logs.\n",
         qualifying_logs, 0),
        ("// Previous operators for disqualified CT logs.\n",
         disqualified_logs, len(qualifying_logs)),
    )
    for heading, logs, first_index in sections:
        f.write(heading)
        for definition in _get_previous_operators(logs, first_index):
            f.write(definition)
        f.write("\n")
| |
| |
| |
| def _is_log_once_or_currently_qualified(log): |
| assert (len(log.get("state")) == 1) |
| return list(log.get("state"))[0] not in ("pending", "rejected") |
| |
def _generate_log_list_timestamp(timestamp):
    """Return the C++ definition of kLogListTimestamp.

    Args:
        timestamp: Time the log list was last updated, in the string form
            accepted by _timestamp_to_timedelta_since_unixepoch.

    Returns:
        A comment line and the constant definition, followed by a blank
        line.
    """
    seconds = _timestamp_to_timedelta_since_unixepoch(timestamp)
    # Adjacent literals are concatenated before the % operator is applied.
    return (
        "// The time at which this log list was last updated.\n"
        "const base::Time kLogListTimestamp = "
        "base::Time::FromTimeT(%d);\n\n" % (seconds))
| |
| |
def generate_cpp_file(input_file, f):
    """Generate a header file of known logs to be included by Chromium.

    Sections are written in this order: the auto-generation notice, the
    log list timestamp, the previous-operator arrays, the list of
    qualifying logs, the IDs of the logs operated by Google, the
    DisqualifiedCTLogInfo struct definition, and the list of disqualified
    logs.

    Each accepted log dict loaded from the JSON gains a "current_operator"
    key holding its operator's name.

    Args:
        input_file: Readable stream containing the JSON log list.
        f: Writable text stream that receives the generated C++ source.
    """
    log_list = json.load(input_file)
    _write_cpp_header(f)

    # Logs with pending/rejected should not be considered by Chromium.
    operators = log_list["operators"]
    accepted_logs = []
    for operator in operators:
        for log in operator["logs"]:
            if not _is_log_once_or_currently_qualified(log):
                continue
            log["current_operator"] = operator["name"]
            accepted_logs.append(log)

    # Timestamp of the log list itself.
    f.write(_generate_log_list_timestamp(log_list["log_list_timestamp"]))

    # Split the accepted logs into currently-qualifying and disqualified.
    qualifying = [
        log for log in accepted_logs if not _is_log_disqualified(log)
    ]
    disqualified = _sorted_disqualified_logs(accepted_logs)
    # Disqualified logs are numbered after the qualifying ones.
    first_disqualified_index = len(qualifying)

    # Previous operator information, then the currently-qualifying logs.
    _write_previous_operator_info(f, qualifying, disqualified)
    _write_qualifying_logs_loginfo(f, qualifying)

    # IDs of all CT logs operated by Google.
    google_ids = _get_log_ids_for_operator(operators, "Google")
    f.writelines(_get_log_ids_array(google_ids, "kGoogleLogIDs"))

    # The list of all disqualified logs.
    _write_disqualified_log_info_struct_definition(f)
    f.write("// The set of all disqualified logs, sorted by |log_id|.\n")
    f.write("constexpr DisqualifiedCTLogInfo kDisqualifiedCTLogList[] = {\n")
    disqualified_definitions = _get_disqualified_log_definitions(
        disqualified, first_disqualified_index)
    f.write(",\n".join(disqualified_definitions))
    f.write("\n};\n")
| |
| |
def main():
    """Command-line entry point.

    Expects exactly two arguments: the path of the JSON log list to read
    and the path of the file to write.

    Returns:
        0 on success, or 1 after printing a usage message when the number
        of arguments is wrong.
    """
    arguments = sys.argv[1:]
    if len(arguments) != 2:
        print("usage: %s in_loglist_json out_header" % sys.argv[0])
        return 1
    json_path, output_path = arguments
    with open(json_path, "r") as infile, open(output_path, "w") as outfile:
        generate_cpp_file(infile, outfile)
    return 0
| |
| |
# Run main() only when this file is executed as a script, and use its return
# value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())