blob: 40041033a9d06dc184f7acf42802b3db807aabb8 [file] [log] [blame]
# Copyright (c) 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import contextlib
import logging
import threading
from infra.services.service_manager import service
from infra_libs import ts_mon
# Module-level logger, named after this module via __name__.
LOGGER = logging.getLogger(__name__)
class _State(object):
  """Plain record of what a ServiceThread has been asked to do."""

  def __init__(self):
    # None until a start or stop has been requested, then True or False.
    self.should_run = None
    # True once the controlling thread has been asked to terminate.
    self.exit = False
    # A pending replacement service config, or None when there is none.
    self.new_config = None

  def clone(self):
    """Returns a new _State carrying the same field values as this one.

    The copy is shallow: the new_config object itself is shared, not copied.
    """
    snapshot = _State()
    for field in ('should_run', 'exit', 'new_config'):
      setattr(snapshot, field, getattr(self, field))
    return snapshot
class ServiceThread(threading.Thread):
  """Thread that controls a single Service object.

  The methods on this object (start_service(), stop_service(), etc.) can be
  called from any thread and are asynchronous - they just instruct the thread to
  perform the given action on the Service.

  This thread also polls the service occasionally and restarts it if it crashed.
  """

  # Counters are shared by all ServiceThread instances; each increment is
  # tagged with the name of the service it refers to.
  failures = ts_mon.CounterMetric('service_manager/failures',
      'Number of times each service unexpectedly exited and was restarted by '
      'service manager',
      [ts_mon.StringField('service')])
  reconfigs = ts_mon.CounterMetric('service_manager/reconfigs',
      'Number of times each service was restarted because its configuration '
      'changed',
      [ts_mon.StringField('service')])
  upgrades = ts_mon.CounterMetric('service_manager/upgrades',
      'Number of times each service was restarted because its version '
      'changed',
      [ts_mon.StringField('service')])

  def __init__(self, poll_interval, state_directory, service_config,
               cloudtail,
               wait_condition=None):
    """
    Args:
      poll_interval: Maximum time (in seconds) to wait for a state change
          before checking on the service again.  This bounds how long a failed
          service stays down before it is restarted.
      state_directory: A file will be created in this directory (with the same
          name as the service) when it is running containing its PID and
          starttime.
      service_config: A dictionary containing the service's config.  See README
          for a description of the fields.
      cloudtail: An object that knows how to start cloudtail.
      wait_condition: Condition object used to guard the requested state and
          to wake this thread when it changes.  A new threading.Condition is
          created when this is None.
    """
    super(ServiceThread, self).__init__()

    if wait_condition is None:  # pragma: no cover
      wait_condition = threading.Condition()

    self._poll_interval = poll_interval
    self._state_directory = state_directory
    self._cloudtail = cloudtail
    self._service = service.Service(self._state_directory,
                                    service_config,
                                    self._cloudtail)

    self._condition = wait_condition  # Protects _state.
    self._state = _State()  # _condition must be held.
    self._state_changed = False

    self._started = False  # Whether we started the service already.

  def _wait(self):
    """Waits for a state change or a poll timeout and returns the state.

    Returns:
      A copy of the requested state.  The pending new_config is cleared on the
      shared state after copying, so each new config is returned only once.
    """
    with self._condition:
      # Skip the wait entirely if a change arrived while we were busy.
      if not self._state_changed:  # pragma: no cover
        self._condition.wait(self._poll_interval)

      # Clone the state object so we can release the lock.
      ret = self._state.clone()
      self._state.new_config = None
      self._state_changed = False
      return ret

  @contextlib.contextmanager
  def _change_state(self):
    """Context manager for mutating self._state.

    Holds the condition while the caller's block runs, then flags the state as
    changed and notifies the condition so a waiting _wait() call returns.
    """
    with self._condition:
      yield
      self._state_changed = True
      self._condition.notify()

  def run(self):
    """Thread main loop: applies the requested state until asked to exit.

    Each iteration waits for a state change or a poll timeout and then
    reconfigures, stops, or (re)starts the service as requested.  Exceptions
    raised while doing so are logged and the loop continues.
    """
    while True:
      try:
        state = self._wait()

        if state.exit:
          return
        elif state.new_config is not None:
          # Stop the service if it's currently running.
          self._service.stop()

          # Recreate it with the new config and start it.
          self.reconfigs.increment(fields={'service': self._service.name})
          self._service = service.Service(self._state_directory,
                                          state.new_config,
                                          self._cloudtail)
          self._service.start()
          self._started = True
        elif state.should_run is False:
          # Ensure the service is stopped.
          self._service.stop()
          self._started = False
        elif state.should_run is True:
          try:
            proc_state = self._service.get_running_process_state()
          except service.UnexpectedProcessStateError:
            self.failures.increment(fields={'service': self._service.name})
            # Use the module logger like every other message in this file; the
            # root-level logging.exception() would bypass it.
            LOGGER.exception('Unexpected error getting state for service %s',
                             self._service.name)
          except service.ProcessNotRunning as ex:
            if self._started:
              # We started it last time but it's not running any more.
              self.failures.increment(fields={'service': self._service.name})
              LOGGER.warning('Service %s failed (%r), restarting',
                             self._service.name, ex)
            else:
              # We're about to start it for the first time.
              LOGGER.info('Starting service %s for the first time (%r)',
                          self._service.name, ex)
          else:
            # The process is running: stop it first if it is out of date so
            # the start() below brings up the current version/command.
            if self._service.has_version_changed(proc_state):
              self.upgrades.increment(fields={'service': self._service.name})
              LOGGER.info('Service %s has a new package version, restarting',
                          self._service.name)
              self._service.stop()
            elif self._service.has_cmd_changed(proc_state):
              self.reconfigs.increment(fields={'service': self._service.name})
              LOGGER.info(
                  'Service %s has new command: was %s, restarting with %s',
                  self._service.name, proc_state.cmd, self._service.cmd)
              self._service.stop()

          # Ensure the service is running.
          self._service.start()
          self._started = True

      except Exception:
        LOGGER.exception('Service thread failed for service %s',
                         self._service.name)

  def start_service(self):
    """Asks the thread to keep the service running."""
    with self._change_state():
      self._state.should_run = True

  def stop_service(self):
    """Asks the thread to stop the service."""
    with self._change_state():
      self._state.should_run = False

  def stop(self, join=True):
    """Asks the thread to exit its main loop.

    Args:
      join: If true, block until the thread has finished.
    """
    with self._change_state():
      self._state.exit = True

    if join:  # pragma: no cover
      self.join()

  def restart_with_new_config(self, new_config):
    """Asks the thread to recreate the service with new_config and start it.

    Args:
      new_config: The replacement service config, passed to service.Service.
    """
    with self._change_state():
      self._state.new_config = new_config
      self._state.should_run = True