blob: 2294dc50c699f279e590cc6b207a7a870506235d [file] [log] [blame]
#!/usr/bin/env python
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
'''Factory Update Server.
The factory update server is implemented as a thread to be started by
shop floor server. It monitors the given state_dir and detects
factory.tar.bz2 file changes and then sets up the new update files
into factory_dir (under state_dir). It also starts an rsync server
to serve factory_dir for clients to fetch update files.
'''
import glob
import logging
import os
import optparse
import shutil
import signal
import subprocess
import threading
import factory_common # pylint: disable=W0611
from cros.factory.utils.process_utils import Spawn
from cros.factory.utils.server_utils import (RsyncModule, StartRsyncServer,
StopRsyncServer)
FACTORY_DIR = 'factory'
AUTOTEST_DIR = 'autotest'
TARBALL_NAME = 'factory.tar.bz2'
BLACKLIST_NAME = 'blacklist'
LATEST_SYMLINK = 'latest'
LATEST_MD5SUM = 'latest.md5sum'
DEFAULT_UPDATE_DIR = '/var/db/factory/updates/'
MD5SUM = 'MD5SUM'
DEFAULT_RSYNCD_ADDR = '0.0.0.0'
DEFAULT_RSYNCD_PORT = 8083
def CalculateMd5sum(filename):
p = subprocess.Popen(('md5sum', filename), stdout=subprocess.PIPE)
output, _ = p.communicate()
return output.split()[0]
class ChangeDetector(object):
"""Detects changes in a file.
Properties:
path: The last path detected to the file.
"""
def __init__(self, pattern):
"""Constructor:
Args:
pattern: The path (or pattern) to monitor.
"""
self.pattern = pattern
self.path = None
self.stat = None
def HasChanged(self):
"""Returns True if the file has changed since the last invocation."""
paths = glob.glob(self.pattern)
last_path = self.path
last_stat = self.stat
if len(paths) != 1:
if not paths:
# No files; that's OK
pass
else:
# Multiple files; that's a problem!
logging.warn('Multiple files match pattern %s', self.pattern)
self.path = None
self.stat = None
return self.path != last_path
self.path = paths[0]
self.stat = os.stat(self.path)
return (self.path != last_path or
not last_stat or
((self.stat.st_mtime, self.stat.st_size) !=
(last_stat.st_mtime, last_stat.st_size)))
class FactoryUpdateServer():
"""The class to handle factory update bundle
Properties:
state_dir: Update state directory (generally shopfloor_data/update)
factory_dir: Updater bundle directory to hold previous and current contents
or factory bundles.
rsyncd_addr: Address on which to open rsyncd.
rsyncd_port: Port on which to open rsyncd.
hwid_path: The path of hwid bundle.
on_idle: If non-None, a function to call on idle (generally every second).
_stop_event: Event to stop thread from running
_rsyncd: Process of rsync server daemon.
_tarball_path: The path to factory tarball file.
_blacklist_path: The path to blacklist file.
_blacklist: A list of md5sum which updater will not trigger update
unless device explicitly want to get an update(generally in
engineering mode).
_thread: The thread to run detectors and handlers in a loop.
_factory_detector: The detector to detect a new updater bundle has been
placed.
_hwid_detector: The detector to detect a new hwid bundle has been placed.
_blacklist_detector: The detector to detect a new blacklist has been
placed.
"""
poll_interval_sec = 1
def __init__(self, state_dir, rsyncd_addr=DEFAULT_RSYNCD_ADDR,
rsyncd_port=DEFAULT_RSYNCD_PORT, on_idle=None):
"""Constructor.
Args:
state_dir: Update state directory (generally shopfloor_data/update).
rsyncd_addr: Address on which to open rsyncd.
rsyncd_port: Port on which to open rsyncd.
on_idle: If non-None, a function to call on idle (generally every
second).
"""
self.state_dir = state_dir
self.factory_dir = os.path.join(state_dir, FACTORY_DIR)
self.rsyncd_addr = rsyncd_addr
self.rsyncd_port = rsyncd_port
if not os.path.exists(self.factory_dir):
os.mkdir(self.factory_dir)
self.hwid_path = None
self.on_idle = on_idle
self._stop_event = threading.Event()
self._rsyncd = StartRsyncServer(
rsyncd_port, state_dir,
[RsyncModule(module='factory', path=self.factory_dir, read_only=True)],
address=self.rsyncd_addr)
self._tarball_path = os.path.join(self.state_dir, TARBALL_NAME)
self._blacklist_path = os.path.join(state_dir, BLACKLIST_NAME)
self._blacklist = []
self._thread = None
self._run_count = 0
self._update_count = 0
self._errors = 0
self._factory_detector = ChangeDetector(self._tarball_path)
self._hwid_detector = ChangeDetector(
os.path.join(state_dir, 'hwid_*.sh'))
self._blacklist_detector = ChangeDetector(self._blacklist_path)
def __del__(self):
self.Stop()
def Start(self):
assert not self._thread
self._thread = threading.Thread(target=self.Run)
self._thread.start()
def Stop(self):
if self._rsyncd:
StopRsyncServer(self._rsyncd)
self._rsyncd = None
self._stop_event.set()
if self._thread:
self._thread.join()
self._thread = None
def GetTestMd5sum(self):
"""Returns the MD5SUM of the current update tarball."""
if not self._factory_detector.path:
return None
md5file = os.path.join(self.state_dir, FACTORY_DIR, LATEST_MD5SUM)
if not os.path.isfile(md5file):
return None
with open(md5file, 'r') as f:
return f.readline().strip()
def NeedsUpdate(self, device_md5sum):
"""Checks if the device with device_md5sum needs to get the update of current_md5sum subjected to blacklist.
Args:
device_md5sum: The md5sum of factory environment on device.
Returns:
False: device_md5sum is in blacklist or
there is no new updater bundle.
True: device_md5sum is not in blacklist and
there is a new updater bundle.
"""
if device_md5sum in self._blacklist:
logging.info('Get device md5sum %s in blacklist', device_md5sum)
return False
current_md5sum = self.GetTestMd5sum()
return current_md5sum and (current_md5sum != device_md5sum)
def _HandleBlacklist(self):
"""Reads blacklist from file"""
with open(self._blacklist_path, 'r') as f:
self._blacklist = f.read().splitlines()
logging.info('Blacklist md5sums: %s', ', '.join(self._blacklist))
def _HandleTarball(self):
new_tarball_path = self._tarball_path + '.new'
# Copy the tarball to avoid possible race condition.
shutil.copyfile(self._tarball_path, new_tarball_path)
# Calculate MD5.
md5sum = CalculateMd5sum(new_tarball_path)
logging.info('Processing tarball ' + self._tarball_path + ' (md5sum=%s)',
md5sum)
# Move to a file containing the MD5.
final_tarball_path = self._tarball_path + '.' + md5sum
os.rename(new_tarball_path, final_tarball_path)
# Create subfolder to hold tarball contents.
final_subfolder = os.path.join(self.factory_dir, md5sum)
final_md5sum = os.path.join(final_subfolder, FACTORY_DIR, MD5SUM)
if os.path.exists(final_subfolder):
if not (os.path.exists(final_md5sum) and
open(final_md5sum).read().strip() == md5sum):
logging.warn('Update directory %s appears not to be set up properly '
'(missing or bad MD5SUM); delete it and restart update '
'server?', final_subfolder)
return
logging.info('Update is already deployed into %s', final_subfolder)
else:
new_subfolder = final_subfolder + '.new'
if os.path.exists(new_subfolder):
shutil.rmtree(new_subfolder)
os.mkdir(new_subfolder)
# Extract tarball.
success = False
try:
try:
logging.info('Staging into %s', new_subfolder)
subprocess.check_call(('tar', '-xjf', final_tarball_path,
'-C', new_subfolder))
except subprocess.CalledProcessError:
logging.error('Failed to extract update files to subfolder %s',
new_subfolder)
return
missing_dirs = [
d for d in (FACTORY_DIR, AUTOTEST_DIR)
if not os.path.exists(os.path.join(new_subfolder, d))]
if missing_dirs:
logging.error('Tarball is missing directories: %r', missing_dirs)
return
factory_dir = os.path.join(new_subfolder, FACTORY_DIR)
with open(os.path.join(factory_dir, MD5SUM), 'w') as f:
f.write(md5sum)
# Extracted and verified. Move it in place.
os.rename(new_subfolder, final_subfolder)
logging.info('Moved to final directory %s', final_subfolder)
success = True
self._update_count += 1
finally:
if os.path.exists(new_subfolder):
shutil.rmtree(new_subfolder, ignore_errors=True)
if (not success) and os.path.exists(final_subfolder):
shutil.rmtree(final_subfolder, ignore_errors=True)
# Update symlink and latest.md5sum.
linkname = os.path.join(self.factory_dir, LATEST_SYMLINK)
if os.path.islink(linkname):
os.remove(linkname)
os.symlink(md5sum, linkname)
with open(os.path.join(self.factory_dir, LATEST_MD5SUM), 'w') as f:
f.write(md5sum)
logging.info('Update files (%s) setup complete', md5sum)
def Run(self):
while True:
try:
self.RunOnce()
except: # pylint: disable=W0702
logging.exception('Error in event loop')
self._stop_event.wait(self.poll_interval_sec)
if self._stop_event.is_set():
break
def RunOnce(self):
try:
self._run_count += 1
if self._factory_detector.HasChanged():
if self._factory_detector.path:
logging.info('Verifying integrity of tarball %s',
self._factory_detector.path)
process = Spawn(['tar', '-tjf', self._tarball_path],
ignore_stdout=True, ignore_stderr=True, call=True)
if process.returncode == 0:
# Re-stat in case it finished being written while we were
# verifying it.
self._factory_detector.HasChanged()
self._HandleTarball()
else:
logging.warn(
'Tarball %s (%d bytes) is corrupt or incomplete',
self._tarball_path, os.path.getsize(self._tarball_path))
else:
# It's disappeared!
logging.warn('Tarball %s has disappeared',
self._tarball_path)
if self._hwid_detector.HasChanged():
self.hwid_path = self._hwid_detector.path
if self.hwid_path is None:
logging.warn('HWID bundle %s is no longer valid',
self._hwid_detector.pattern)
else:
logging.info('Found new HWID bundle %s (MD5SUM %s); serving it',
self.hwid_path, CalculateMd5sum(self.hwid_path))
if self._blacklist_detector.HasChanged():
if self._blacklist_detector.path is None:
self._blacklist = []
logging.warn('Updater blacklist %s is no longer valid,'
'so clear blacklist.', self._blacklist_detector.pattern)
else:
self._HandleBlacklist()
logging.info('Found new updater blacklist %s (MD5SUM %s); serving it',
self._blacklist_path,
CalculateMd5sum(self._blacklist_path))
if self.on_idle:
try:
self.on_idle()
except: # pylint: disable=W0702
logging.exception('Exception in idle hook')
except:
self._errors += 1
raise
def main():
"""Starts factory update server in single process standalone mode.
Shopfloor v1 instantiates factory update server in a thread. The states
are shared between threads by accessing object methods and variables. This
standalone command line entry is added for migrating to v2.
Command line parameters:
--port: Rsync daemon port.
--dir: Update state directory.
--verbose: Log verbosity.
"""
parser = optparse.OptionParser()
parser.add_option('-d', '--dir', dest='state_dir', metavar='STATE_DIR',
default=DEFAULT_UPDATE_DIR,
help='update state directory. (default: %default)')
parser.add_option('-p', '--port', dest='port', metavar='PORT', type='int',
default=DEFAULT_RSYNCD_PORT,
help='rsync daemon port. (default: %default)')
parser.add_option('-v', '--verbose', action='count', dest='verbose',
help='increase log verbosity')
(options, args) = parser.parse_args()
if args:
parser.error('Invalid args: %s' % ' '.join(args))
log_format = '%(message)s'
if options.verbose:
verbosity = logging.DEBUG
else:
verbosity = logging.INFO
logging.basicConfig(level=verbosity, format=log_format)
update_server = FactoryUpdateServer(
options.state_dir, rsyncd_port=options.port)
# Hook SIGTERM,SIGINT and enter the polling loop.
def SignalHandler(unused_signum, unused_frame):
update_server.Stop()
raise SystemExit
signal.signal(signal.SIGTERM, SignalHandler)
signal.signal(signal.SIGINT, SignalHandler)
update_server.Run()
if __name__ == '__main__':
main()