# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
'''Factory Update Server.

The factory update server is implemented as a thread to be started by the
shop floor server. It monitors the given state_dir, detects changes to the
factory.tar.bz2 tarball there, and unpacks the new update files into
factory_dir (under state_dir). It also starts an rsync server that serves
factory_dir so clients can fetch the update files.
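
Example (illustrative only; the state directory path below is hypothetical):

  update_server = FactoryUpdateServer('/var/db/factory_update')
  update_server.Start()
  # ... serve shop floor requests while updates are staged and served ...
  update_server.Stop()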
'''
import errno
import logging
import os
import shutil
import subprocess
import threading
import time
FACTORY_DIR = 'factory'
AUTOTEST_DIR = 'autotest'
TARBALL_NAME = 'factory.tar.bz2'
LATEST_SYMLINK = 'latest'
LATEST_MD5SUM = 'latest.md5sum'
MD5SUM = 'MD5SUM'
DEFAULT_RSYNCD_PORT = 8083
RSYNCD_CONFIG_TEMPLATE = '''port = %(port)d
pid file = %(pidfile)s
log file = %(logfile)s
use chroot = no
[factory]
path = %(factory_dir)s
read only = yes
'''
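# Clients pull updates from the read-only 'factory' rsync module defined
# above.  For illustration only (not part of this module), a client might
# fetch the latest staged update with something like:
#
#   rsync -a rsync://<update server>:<port>/factory/latest/ <destination>
#
# where the server address, port, and destination are placeholders.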
def StartRsyncServer(port, state_dir, factory_dir):
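  '''Starts an rsync daemon serving factory_dir on the given port.

  The rsyncd configuration, pid, and log files are written into state_dir.

  Returns:
    The Popen object for the rsync daemon process.
  '''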
configfile = os.path.join(state_dir, 'rsyncd.conf')
pidfile = os.path.join(state_dir, 'rsyncd.pid')
if os.path.exists(pidfile):
    # rsyncd will not overwrite an existing pid file, so remove any stale one.
os.unlink(pidfile)
logfile = os.path.join(state_dir, 'rsyncd.log')
data = RSYNCD_CONFIG_TEMPLATE % dict(port=port,
pidfile=pidfile,
logfile=logfile,
factory_dir=factory_dir)
with open(configfile, 'w') as f:
f.write(data)
p = subprocess.Popen(('rsync', '--daemon', '--no-detach',
'--config=%s' % configfile))
logging.info('Rsync server (pid %d) started on port %d', p.pid, port)
return p
def StopRsyncServer(rsyncd_process):
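  '''Terminates the rsync daemon.

  If it does not exit within about a second, it is forcibly killed.
  '''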
logging.info('Stopping rsync server (pid %d)', rsyncd_process.pid)
rsyncd_process.terminate()
# If not terminated in a second, send a kill -9.
def WaitAndKill():
time.sleep(1)
    try:
      rsyncd_process.kill()
    except OSError:
      # The daemon may already have exited on its own.
      pass
thread = threading.Thread(target=WaitAndKill)
thread.daemon = True
thread.start()
rsyncd_process.wait()
logging.debug('Rsync server stopped')
def CalculateMd5sum(filename):
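  '''Returns the MD5 hex digest of filename, as reported by md5sum.'''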
p = subprocess.Popen(('md5sum', filename), stdout=subprocess.PIPE)
output, _ = p.communicate()
return output.split()[0]
class FactoryUpdateServer(object):
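  '''Monitors state_dir for factory.tar.bz2 and serves unpacked updates.

  Constructing an instance starts the rsync server immediately; Start()
  launches the monitoring thread, and Stop() shuts both down.
  '''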
def __init__(self, state_dir, rsyncd_port=DEFAULT_RSYNCD_PORT,
poll_interval_sec=1):
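    '''Creates the factory directory if needed and starts the rsync server.

    Args:
      state_dir: Directory monitored for factory.tar.bz2; also holds the
        rsyncd state files and the served factory directory.
      rsyncd_port: Port on which the rsync daemon listens.
      poll_interval_sec: Interval, in seconds, between checks of the tarball.
    '''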
self.state_dir = state_dir
self.factory_dir = os.path.join(state_dir, FACTORY_DIR)
self.rsyncd_port = rsyncd_port
if not os.path.exists(self.factory_dir):
os.mkdir(self.factory_dir)
self.poll_interval_sec = poll_interval_sec
self._stop_event = threading.Event()
self._rsyncd = StartRsyncServer(rsyncd_port, state_dir, self.factory_dir)
self._tarball_path = os.path.join(self.state_dir, TARBALL_NAME)
self._thread = None
self._last_stat = None
self._run_count = 0
self._update_count = 0
self._errors = 0
def Start(self):
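    '''Starts the tarball monitoring thread.'''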
assert not self._thread
self._thread = threading.Thread(target=self.Run)
self._thread.start()
def Stop(self):
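    '''Stops the rsync server and the monitoring thread.'''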
if self._rsyncd:
StopRsyncServer(self._rsyncd)
self._rsyncd = None
self._stop_event.set()
if self._thread:
self._thread.join()
self._thread = None
def _HandleTarball(self):
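    '''Deploys the tarball into a subfolder named after its MD5 checksum.

    Once the update is in place (or was already deployed), the 'latest'
    symlink and latest.md5sum are updated to point at it.
    '''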
new_tarball_path = self._tarball_path + '.new'
    # Copy the tarball first to avoid racing with a writer that may still be
    # uploading it.
shutil.copyfile(self._tarball_path, new_tarball_path)
# Calculate MD5.
md5sum = CalculateMd5sum(new_tarball_path)
    logging.info('Processing tarball %s (md5sum=%s)', self._tarball_path,
                 md5sum)
# Move to a file containing the MD5.
final_tarball_path = self._tarball_path + '.' + md5sum
os.rename(new_tarball_path, final_tarball_path)
# Create subfolder to hold tarball contents.
final_subfolder = os.path.join(self.factory_dir, md5sum)
final_md5sum = os.path.join(final_subfolder, FACTORY_DIR, MD5SUM)
if os.path.exists(final_subfolder):
if not (os.path.exists(final_md5sum) and
open(final_md5sum).read().strip() == md5sum):
logging.warn('Update directory %s appears not to be set up properly '
'(missing or bad MD5SUM); delete it and restart update '
'server?', final_subfolder)
return
logging.info('Update is already deployed into %s', final_subfolder)
else:
new_subfolder = final_subfolder + '.new'
if os.path.exists(new_subfolder):
shutil.rmtree(new_subfolder)
os.mkdir(new_subfolder)
# Extract tarball.
success = False
try:
try:
logging.info('Staging into %s', new_subfolder)
subprocess.check_call(('tar', '-xjf', final_tarball_path,
'-C', new_subfolder))
except subprocess.CalledProcessError:
logging.error('Failed to extract update files to subfolder %s',
new_subfolder)
return
missing_dirs = [
d for d in (FACTORY_DIR, AUTOTEST_DIR)
if not os.path.exists(os.path.join(new_subfolder, d))]
if missing_dirs:
logging.error('Tarball is missing directories: %r', missing_dirs)
return
factory_dir = os.path.join(new_subfolder, FACTORY_DIR)
with open(os.path.join(factory_dir, MD5SUM), 'w') as f:
f.write(md5sum)
# Extracted and verified. Move it in place.
os.rename(new_subfolder, final_subfolder)
logging.info('Moved to final directory %s', final_subfolder)
success = True
self._update_count += 1
finally:
if os.path.exists(new_subfolder):
shutil.rmtree(new_subfolder, ignore_errors=True)
if (not success) and os.path.exists(final_subfolder):
shutil.rmtree(final_subfolder, ignore_errors=True)
# Update symlink and latest.md5sum.
linkname = os.path.join(self.factory_dir, LATEST_SYMLINK)
if os.path.islink(linkname):
os.remove(linkname)
os.symlink(md5sum, linkname)
with open(os.path.join(self.factory_dir, LATEST_MD5SUM), 'w') as f:
f.write(md5sum)
logging.info('Update files (%s) setup complete', md5sum)
def Run(self):
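    '''Calls RunOnce every poll_interval_sec seconds until Stop() is called.'''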
while True:
try:
self.RunOnce()
      except Exception:
        logging.exception('Error in event loop')
self._stop_event.wait(self.poll_interval_sec)
if self._stop_event.is_set():
break
def RunOnce(self):
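    '''Checks the tarball once and deploys it if it is new and intact.'''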
try:
self._run_count += 1
try:
stat = os.stat(self._tarball_path)
except OSError as e:
if e.errno == errno.ENOENT:
# File doesn't exist
return
raise
if (self._last_stat and
((stat.st_mtime, stat.st_size) ==
(self._last_stat.st_mtime, self._last_stat.st_size))):
# No change
return
self._last_stat = stat
try:
with open(os.devnull, "w") as devnull:
logging.info('Verifying integrity of tarball %s', self._tarball_path)
subprocess.check_call(['tar', '-tjf', self._tarball_path],
stdout=devnull, stderr=devnull)
except subprocess.CalledProcessError:
# E.g., still copying
logging.warn('Tarball %s (%d bytes) is corrupt or incomplete',
self._tarball_path, stat.st_size)
return
# Re-stat in case it finished being written while we were
# verifying it.
self._last_stat = os.stat(self._tarball_path)
self._HandleTarball()
except:
self._errors += 1
raise