blob: 8ffba8e2cb3c76a96335677609b6177262e2960b [file] [log] [blame]
# Copyright (c) 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import glob
import json
import logging
import os
import os.path
import sys
import time
from infra.services.service_manager import service
from infra.services.service_manager import service_thread
LOGGER = logging.getLogger(__name__)
def parse_config(content, filename='<unknown>'):
"""Check that the content of a config file is valid.
Args:
content(str): content of config file, as a json string.
Keyword Args:
filename(str): filename from which the content has been read. Used to
display useful messages to the user.
Returns:
config(dict): result of parsing 'content'.
"""
config = json.loads(content)
# Make sure we have all the required keys to help the unwary user.
error_occurred = False
if 'name' not in config:
LOGGER.error("Service name must be specified in the 'name' "
"field. File: %s", filename)
error_occurred = True
if 'cmd' not in config:
LOGGER.error("Command to run must be specified in the 'cmd' "
"field. File: %s", filename)
error_occurred = True
if 'cmd' in config and not isinstance(config['cmd'], list):
LOGGER.error("'cmd' must be a list with executable name and arguments. "
"File: %s", filename)
error_occurred = True
# Make sure all arguments are passed as strings.
if 'cmd' in config:
config['cmd'] = [str(x) for x in config['cmd']]
# 'environment', 'resources', 'stop_time', 'working_directory' are optional.
# We gathered enough errors, bail out.
if error_occurred:
return None
return config
def load_config(filename):
try:
with open(filename) as fh:
config = parse_config(fh.read(), filename=filename)
except Exception:
LOGGER.exception('Error opening or parsing %s', filename)
return None
return config
class _Metadata(object):
def __init__(self, mtime, config=None, thread=None):
self.mtime = mtime
self.config = config
self.thread = thread
class ConfigWatcher(object):
"""Polls a directory for .json files describing services to be run.
Tries to keep the running services in sync with the config files - services
are started immediately when valid configs are added, restarted when their
configs change (adding or removing args for example), and stopped when the
configs are deleted.
"""
def __init__(self, config_directory, config_poll_interval,
service_poll_interval, state_directory, cipd_version_file,
cloudtail, _sleep_fn=time.sleep):
"""
Args:
config_directory(str): Directory containing .json config files to monitor.
config_poll_interval(int): How often (in seconds) to poll config_directory
for changes.
service_poll_interval(int): How often (in seconds) to restart failed
services.
state_directory(str): A file will be created in this directory (with the
same name as the service) when it is running containing its PID and
starttime.
cloudtail (CloudtailFactory): An object that knows how to start cloudtail.
"""
self._config_glob = os.path.join(config_directory, '*.json')
self._config_poll_interval = config_poll_interval
self._service_poll_interval = service_poll_interval
self._state_directory = state_directory
self._cloudtail = cloudtail
self._metadata = {} # Filename -> _Metadata
self._services = {} # Service name -> Filename
self._stop = False
self._sleep_fn = _sleep_fn
self._own_service = service.OwnService(state_directory, cipd_version_file)
def run(self):
"""Runs continuously in this thread until stop() is called."""
if not self._own_service.start(): # pragma: no cover
# Another instance is already running. Exit immediately to prevent the
# ts_mon.close() in BaseApplication from being called.
os._exit(0)
while not self._stop:
self._iteration()
if not self._stop: # pragma: no cover
self._sleep_fn(self._config_poll_interval)
def _iteration(self):
"""Runs one iteration of the loop. Useful for testing."""
own_state = self._own_service.get_running_process_state()
if self._own_service.has_version_changed(own_state):
logging.info("The service_manager's version has changed, exiting")
self.stop()
return
files = set(glob.glob(self._config_glob))
for filename in files:
mtime = os.path.getmtime(filename)
if filename in self._metadata:
metadata = self._metadata[filename]
if mtime == metadata.mtime:
continue
self._config_changed(filename, metadata, mtime)
else:
self._config_added(filename, mtime)
for filename, metadata in self._metadata.iteritems():
if filename not in files and metadata.config is not None:
self._config_removed(metadata)
def stop(self):
"""Signals that run() should stop on its next iteration."""
self._stop = True
for metadata in self._metadata.values():
if metadata.thread is not None:
metadata.thread.stop()
def _config_added(self, filename, mtime):
config = load_config(filename)
if config is None:
# Add a bad metadata entry so we don't call _config_added again every
# time we read it.
self._metadata[filename] = _Metadata(mtime)
return
if config['name'] in self._services:
LOGGER.error('Duplicate service name "%s" (defined in %s and %s)' % (
config['name'], self._services[config['name']], filename))
return
LOGGER.info('Adding new service config for %s', config['name'])
thread = service_thread.ServiceThread(
self._service_poll_interval,
self._state_directory,
config,
self._cloudtail)
thread.start()
thread.start_service()
self._metadata[filename] = _Metadata(mtime, config, thread)
self._services[config['name']] = filename
def _config_changed(self, filename, metadata, new_mtime):
if metadata.config is not None:
del self._services[metadata.config['name']]
metadata.config = load_config(filename)
metadata.mtime = new_mtime
if (metadata.config is not None and
metadata.config['name'] in self._services):
LOGGER.error('Duplicate service name "%s" (defined in %s and %s)' % (
metadata.config['name'],
self._services[metadata.config['name']],
filename))
metadata.config = None
if metadata.config is None:
if metadata.thread is not None:
metadata.thread.stop_service()
return
LOGGER.info('Updating service config for %s', metadata.config['name'])
if metadata.thread is None:
metadata.thread = service_thread.ServiceThread(
self._service_poll_interval,
self._state_directory,
metadata.config,
self._cloudtail)
metadata.thread.start()
metadata.thread.start_service()
else:
metadata.thread.restart_with_new_config(metadata.config)
self._services[metadata.config['name']] = filename
def _config_removed(self, metadata):
LOGGER.info('Removing service config for %s', metadata.config['name'])
del self._services[metadata.config['name']]
metadata.config = None
metadata.mtime = None
metadata.thread.stop_service()