blob: ff092f8888ea860506c4f32b9a474b8be87d99ae [file] [log] [blame]
# Copyright 2019 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""
The notarization module manages uploading artifacts for notarization, polling
for results, and stapling Apple Notary notarization tickets.
"""
import collections
import enum
import os
import plistlib
import subprocess
import time
from signing import commands, invoker, logger, model
_LOG_FILE_URL = 'LogFileURL'
_NOTARY_SERVICE_MAX_RETRIES = 3
class NotarizationError(Exception):
pass
class Invoker(invoker.Base):
@staticmethod
def register_arguments(parser):
parser.add_argument(
'--notary-arg',
action='append',
default=[],
help='Specifies additional arguments to pass to the notarization '
'tool. If specified multiple times, the arguments are passed in '
'the given order. These are passed to every invocation of the '
'notarization tool and are intended to specify authentication '
'parameters.')
def __init__(self, args, config):
self._notary_args = args.notary_arg
@property
def notary_args(self):
return self._notary_args
def submit(self, path, config):
command = [
'xcrun',
'notarytool',
'submit',
path,
'--no-wait',
'--output-format',
'plist',
] + self.notary_args
output = commands.run_command_output(command)
try:
plist = plistlib.loads(output)
return plist['id']
except:
raise NotarizationError(
'xcrun notarytool returned output that could not be parsed: {}'
.format(output))
def get_result(self, uuid, config):
command = [
'xcrun',
'notarytool',
'info',
uuid,
'--output-format',
'plist',
] + self.notary_args
output = commands.run_command_output(command)
plist = plistlib.loads(output)
status = plist['status']
if status == 'In Progress':
return NotarizationResult(Status.IN_PROGRESS, status, output, None)
if status == 'Accepted':
return NotarizationResult(Status.SUCCESS, status, output, None)
# notarytool does not provide log file URLs, so instead try to fetch
# the log on failure.
try:
log = self._get_log(uuid, config).decode('utf8')
except Exception as e:
logger.error('Failed to get the notarization log data', e)
log = None
return NotarizationResult(Status.ERROR, status, output, log)
def _get_log(self, uuid, config):
command = ['xcrun', 'notarytool', 'log', uuid] + self.notary_args
return commands.run_command_output(command)
def submit(path, config):
"""Submits an artifact to Apple for notarization.
Args:
path: The path to the artifact that will be uploaded for notarization.
config: The |config.CodeSignConfig| for the artifact.
Returns:
A UUID from the notary service that represents the request.
"""
uuid = config.invoker.notarizer.submit(path, config)
logger.info('Submitted %s for notarization, request UUID: %s.', path, uuid)
return uuid
class Status(enum.Enum):
"""Enum representing the state of a notarization request."""
SUCCESS = enum.auto()
IN_PROGRESS = enum.auto()
ERROR = enum.auto()
"""Tuple object that contains the status and result information of a
notarization request.
"""
NotarizationResult = collections.namedtuple(
'NotarizationResult', ['status', 'status_string', 'output', 'log_file'])
def wait_for_results(uuids, config):
"""Waits for results from the notarization service. This iterates the list
of UUIDs and checks the status of each one. For each successful result, the
function yields to the caller. If a request failed, this raises a
NotarizationError. If no requests are ready, this operation blocks and
retries until a result is ready. After a certain amount of time, the
operation will time out with a NotarizationError if no results are
produced.
Args:
uuids: List of UUIDs to check for results. The list must not be empty.
config: The |config.CodeSignConfig| object.
Yields:
The UUID of a successful notarization request.
"""
assert len(uuids)
wait_set = set(uuids)
sleep_time_seconds = 5
total_sleep_time_seconds = 0
while len(wait_set) > 0:
for uuid in list(wait_set):
result = config.invoker.notarizer.get_result(uuid, config)
if result.status == Status.IN_PROGRESS:
continue
elif result.status == Status.SUCCESS:
logger.info('Successfully notarized request %s. Log file: %s',
uuid, result.log_file)
wait_set.remove(uuid)
yield uuid
else:
logger.error(
'Failed to notarize request %s.\n'
'Output:\n%s\n'
'Log file:\n%s', uuid, result.output, result.log_file)
raise NotarizationError(
'Notarization request {} failed with status: "{}".'.format(
uuid,
result.status_string,
))
if len(wait_set) > 0:
# Do not wait more than 60 minutes for all the operations to
# complete.
if total_sleep_time_seconds < 60 * 60:
# No results were available, so wait and try again in some
# number of seconds. Do not wait more than 1 minute for any
# iteration.
time.sleep(sleep_time_seconds)
total_sleep_time_seconds += sleep_time_seconds
sleep_time_seconds = min(sleep_time_seconds * 2, 60)
else:
raise NotarizationError(
'Timed out waiting for notarization requests: {}'.format(
wait_set))
def staple_bundled_parts(parts, paths):
"""Staples all the bundled executable components of the app bundle.
Args:
parts: A list of |model.CodeSignedProduct|.
paths: A |model.Paths| object.
"""
# Only staple the signed, bundled executables.
part_paths = [
part.path
for part in parts
if part.path[-4:] in ('.app', '.xpc')
]
# Reverse-sort the paths so that more nested paths are stapled before
# less-nested ones.
part_paths.sort(reverse=True)
for part_path in part_paths:
staple(os.path.join(paths.work, part_path))
def staple(path):
"""Staples a notarization ticket to the artifact at |path|. The
notarization must have been performed with submit() and then completed by
Apple's notary service, which can be checked with wait_for_one_result().
Args:
path: The path to the artifact that had previously been submitted for
notarization and is now ready for stapling.
"""
retry = Retry('staple', sleep_before_retry=True)
while retry.keep_going():
try:
commands.run_command(
['xcrun', 'stapler', 'staple', '--verbose', path])
return
except subprocess.CalledProcessError as e:
# Known bad codes:
bad_codes = (
65, # CloudKit query failed due to "(null)"
68, # A server with the specified hostname could not be found.
)
if e.returncode in bad_codes and retry.failed_should_retry(
f'Output: {e.output}'):
continue
raise e
class Retry(object):
"""Retry is a helper class that manages retrying notarization operations
that may fail due to transient issues. Usage:
retry = Retry('staple')
while retry.keep_going():
try:
return operation()
except Exception as e:
if is_transient(e) and retry.failed_should_retry():
continue
raise e
"""
def __init__(self, desc, sleep_before_retry=False):
"""Creates a retry state object.
Args:
desc: A short description of the operation to retry.
sleep_before_retry: If True, will sleep before proceeding with
a retry.
"""
self._attempt = 0
self._desc = desc
self._sleep_before_retry = sleep_before_retry
def keep_going(self):
"""Used as the condition for a retry loop."""
if self._attempt < _NOTARY_SERVICE_MAX_RETRIES:
return True
raise RuntimeError(
'Loop should have terminated at failed_should_retry()')
def failed_should_retry(self, msg=''):
"""If the operation failed and the caller wants to retry it, this
method increments the attempt count and determines if another
attempt should be made.
Args:
msg: An optional message to describe the failure.
Returns:
True if the retry loop should continue, and False if the loop
should terminate with an error.
"""
self._attempt += 1
if self._attempt < _NOTARY_SERVICE_MAX_RETRIES:
retry_when_message = ('after 30 seconds' if self._sleep_before_retry
else 'immediately')
logger.warning(f'Error during notarization command {self._desc}. ' +
f'Retrying {retry_when_message}. {msg}')
if self._sleep_before_retry:
time.sleep(30)
return True
return False