factory-utils: Factory update server.

To allow dynamic update of test list and autotests in factory.  The
factory update server is implemented as a thread to be started by shop
floor server.

BUG=chrome-os-partner:9169
TEST=python factory_update_server_unittest.py

Change-Id: I38ef07be79062b7eecc8108c6383257cb263f2da
Reviewed-on: https://gerrit.chromium.org/gerrit/21312
Reviewed-by: Hung-Te Lin <hungte@chromium.org>
Commit-Ready: Chinyue Chen <chinyue@chromium.org>
Tested-by: Chinyue Chen <chinyue@chromium.org>
diff --git a/factory_setup/factory_update_server.py b/factory_setup/factory_update_server.py
new file mode 100755
index 0000000..1ba92bf
--- /dev/null
+++ b/factory_setup/factory_update_server.py
@@ -0,0 +1,138 @@
+# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+'''Factory Update Server.
+
+The factory update server is implemented as a thread to be started by shop floor
+server.  It monitors the given state_dir and detects autotest.tar.bz2 file
+changes and then sets up the new update files into update_dir (under state_dir).
+It also starts an rsync server to serve update_dir for clients to fetch update
+files.
+'''
+
+import logging
+import os
+import pyinotify
+import shutil
+import subprocess
+import threading
+
+
+UPDATE_DIR = 'autotest'
+TARBALL_NAME = 'autotest.tar.bz2'
+LATEST_SYMLINK = 'latest'
+LATEST_MD5SUM = 'latest.md5sum'
+DEFAULT_RSYNCD_PORT = 8083
+RSYNCD_CONFIG_TEMPLATE = '''port = %(port)d
+pid file = %(pidfile)s
+log file = %(logfile)s
+use chroot = no
+[autotest]
+  path = %(update_dir)s
+  read only = yes
+'''
+
+
+def StartRsyncServer(port, state_dir, update_dir):
+  configfile = os.path.join(state_dir, 'rsyncd.conf')
+  pidfile = os.path.join(state_dir, 'rsyncd.pid')
+  logfile = os.path.join(state_dir, 'rsyncd.log')
+  data = RSYNCD_CONFIG_TEMPLATE % dict(port=port,
+                                       pidfile=pidfile,
+                                       logfile=logfile,
+                                       update_dir=update_dir)
+  with open(configfile, 'w') as f:
+    f.write(data)
+
+  p = subprocess.Popen(('rsync', '--daemon', '--no-detach',
+                        '--config=%s' % configfile))
+  logging.debug('Rsync server (pid %d) started on port %d', p.pid, port)
+  return p
+
+
+def StopRsyncServer(rsyncd_process):
+  logging.debug('Stopping rsync server (pid %d)', rsyncd_process.pid)
+  rsyncd_process.terminate()
+  rsyncd_process.wait()
+  logging.debug('Rsync server stopped')
+
+
+def CalculateMd5sum(filename):
+  p = subprocess.Popen(('md5sum', filename), stdout=subprocess.PIPE)
+  output, _ = p.communicate()
+  return output.split()[0]
+
+
+class HandleEvents(pyinotify.ProcessEvent):
+
+  def __init__(self, update_dir):
+    self.update_dir = update_dir
+
+  def process_IN_CLOSE_WRITE(self, event):
+    if event.name == TARBALL_NAME:
+      # Calculate MD5.
+      md5sum = CalculateMd5sum(event.pathname)
+      logging.info('Found new ' + TARBALL_NAME + ' (%s)', md5sum)
+
+      # Create subfolder to hold tarball contents.
+      subfolder = os.path.join(self.update_dir, md5sum)
+      if os.path.exists(subfolder):
+        logging.error('Subfolder %s already exists', subfolder)
+        return
+      try:
+        os.mkdir(subfolder)
+      except Exception:
+        logging.error('Unable to create subfolder %s', subfolder)
+        return
+
+      # Extract tarball.
+      try:
+        subprocess.check_call(('tar', '-xjf', event.pathname, '-C', subfolder))
+      except subprocess.CalledProcessError:
+        logging.error('Failed to extract update files to subfolder %s',
+                      subfolder)
+        shutil.rmtree(subfolder)  # Clean up on error.
+        return
+
+      # Update symlink and latest.md5sum.
+      linkname = os.path.join(self.update_dir, LATEST_SYMLINK)
+      if os.path.islink(linkname):
+        os.remove(linkname)
+      os.symlink(md5sum, linkname)
+      with open(os.path.join(self.update_dir, LATEST_MD5SUM), 'w') as f:
+        f.write(md5sum)
+      logging.info('New update files (%s) setup complete', md5sum)
+
+
+class FactoryUpdateServer(threading.Thread):
+
+  def __init__(self, state_dir, rsyncd_port=DEFAULT_RSYNCD_PORT):
+    threading.Thread.__init__(self)
+    self.state_dir = state_dir
+    self.update_dir = os.path.join(state_dir, UPDATE_DIR)
+    if not os.path.exists(self.update_dir):
+      os.mkdir(self.update_dir)
+    self._stop_event = threading.Event()
+    self._rsyncd = StartRsyncServer(rsyncd_port, state_dir, self.update_dir)
+
+  def run(self):
+    wm = pyinotify.WatchManager()
+    notifier = pyinotify.Notifier(wm, HandleEvents(self.update_dir))
+    mask = pyinotify.IN_CLOSE_WRITE
+    wm.add_watch(self.state_dir, mask)
+    try:
+      while True:
+        notifier.process_events()
+        if self._stop_event.is_set():
+          break
+        if notifier.check_events(500):
+          notifier.read_events()
+    finally:
+      notifier.stop()
+    logging.debug('Factory update server stopped')
+
+  def stop(self):
+    StopRsyncServer(self._rsyncd)
+    self._stop_event.set()
+    self.join()
diff --git a/factory_setup/factory_update_server_unittest.py b/factory_setup/factory_update_server_unittest.py
new file mode 100755
index 0000000..4f6698e
--- /dev/null
+++ b/factory_setup/factory_update_server_unittest.py
@@ -0,0 +1,62 @@
+# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+'''Tests for Factory Update Server.'''
+
+import factory_update_server
+import os
+import shutil
+import sys
+import tempfile
+import time
+import unittest
+
+
+class FactoryUpdateServerTest(unittest.TestCase):
+
+  def setUp(self):
+    self.base_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
+    self.work_dir = tempfile.mkdtemp(prefix='dts')
+    self.update_server = factory_update_server.FactoryUpdateServer(
+        self.work_dir)
+    self.update_server.start()
+
+  def tearDown(self):
+    self.update_server.stop()
+    shutil.rmtree(self.work_dir)
+
+  def testUpdateDirCreated(self):
+    self.assertTrue(os.path.isdir(os.path.join(self.work_dir, 'autotest')))
+
+  def testRsyncServerStarted(self):
+    self.assertTrue(self.update_server._rsyncd.poll() is None)
+
+  def testMd5sumCalculation(self):
+    md5sum = factory_update_server.CalculateMd5sum(
+        os.path.join(self.base_dir, 'testdata/shopfloor/autotest.tar.bz2'))
+    self.assertEqual(md5sum, '36a7e683170c4bf06982746a2de9cbee')
+
+  def testUpdateFilesSetup(self):
+    # No latest.md5sum file at the beginning.
+    md5file = os.path.join(self.work_dir, 'autotest/latest.md5sum')
+    self.assertFalse(os.path.exists(md5file))
+
+    # Put autotest.tar.bz2 into the working folder.
+    tarball = os.path.join(self.base_dir, 'testdata/shopfloor/autotest.tar.bz2')
+    shutil.copy(tarball, self.work_dir)
+    # Wait a little while for update files setup.
+    time.sleep(1)
+
+    # Check that latest.md5sum is created with correct value and update files
+    # extracted.
+    self.assertTrue(os.path.isfile(md5file))
+    with open(md5file, 'r') as f:
+      md5sum = f.readline().strip()
+    self.assertEqual(md5sum, '36a7e683170c4bf06982746a2de9cbee')
+    self.assertTrue(os.path.isdir(os.path.join(
+        self.work_dir, 'autotest/36a7e683170c4bf06982746a2de9cbee')))
+
+
+if __name__ == '__main__':
+  unittest.main()
diff --git a/factory_setup/shopfloor_server.py b/factory_setup/shopfloor_server.py
index e39dc64..2c8f4f5 100755
--- a/factory_setup/shopfloor_server.py
+++ b/factory_setup/shopfloor_server.py
@@ -21,6 +21,8 @@
 import shopfloor
 import SimpleXMLRPCServer
 
+from factory_update_server import FactoryUpdateServer
+
 
 _DEFAULT_SERVER_PORT = 8082
 # By default, this server is supposed to serve on same host running omaha
@@ -119,8 +121,15 @@
     logging.exception('Failed loading module: %s', options.module)
     exit(1)
 
-  logging.debug('Starting server...')
-  _RunAsServer(address=options.address, port=options.port, instance=instance)
+  update_server = FactoryUpdateServer(options.testdir)
+  try:
+    logging.debug('Starting factory update server...')
+    update_server.start()
+
+    logging.debug('Starting RPC server...')
+    _RunAsServer(address=options.address, port=options.port, instance=instance)
+  finally:
+    update_server.stop()
 
 
 if __name__ == '__main__':
diff --git a/factory_setup/shopfloor_unittest.py b/factory_setup/shopfloor_unittest.py
index 57635bc..a2841a7 100755
--- a/factory_setup/shopfloor_unittest.py
+++ b/factory_setup/shopfloor_unittest.py
@@ -49,6 +49,7 @@
   def tearDown(self):
     '''Terminates shop floor server'''
     self.process.terminate()
+    self.process.wait()
     shutil.rmtree(self.work_dir)
 
   def testGetHWID(self):
@@ -138,5 +139,6 @@
   def testGetTestMd5sumWithoutMd5sumFile(self):
     self.assertTrue(self.proxy.GetTestMd5sum() is None)
 
+
 if __name__ == '__main__':
   unittest.main()
diff --git a/factory_setup/testdata/shopfloor/autotest.tar.bz2 b/factory_setup/testdata/shopfloor/autotest.tar.bz2
new file mode 100644
index 0000000..041cd3a
--- /dev/null
+++ b/factory_setup/testdata/shopfloor/autotest.tar.bz2
Binary files differ