blob: 60f526ead8d94a5ac637df36f900c81338e80d0d [file] [log] [blame]
# Copyright 2016 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
"""Implements a client library for reading and writing LUCI_CONTEXT compatible
files.
Due to arcane details of the UNIX process model and environment variables, this
library is unfortunately NOT threadsafe; there's no way to have multiple
LUCI_CONTEXTS live in a process safely at the same time. As such, this library
will raise an exception if any attempt is made to use it improperly (for example
by having multiple threads call 'write' at the same time).
See ../LUCI_CONTEXT.md for details on the LUCI_CONTEXT concept/protocol."""
import contextlib
import copy
import json
import logging
import os
import sys
import tempfile
import threading
import six
# Module-level logger used for all diagnostics emitted by this library.
_LOGGER = logging.getLogger(__name__)
# ENV_KEY is the environment variable that we look for to find out where the
# LUCI context file is.
ENV_KEY = 'LUCI_CONTEXT'
# _CUR_CONTEXT contains the cached LUCI Context that is currently available to
# read. A value of None indicates that the value has not yet been populated.
_CUR_CONTEXT = None
# Guards the lazy, one-time population of _CUR_CONTEXT.
_CUR_CONTEXT_LOCK = threading.Lock()
# Write lock is a recursive mutex which is taken when using the write() method.
# Being recursive, it allows the same thread to nest calls to write(), while a
# non-blocking acquire attempted from any other thread fails.
_WRITE_LOCK = threading.RLock()
@contextlib.contextmanager
def _tf(data, data_raw=False, leak=False, workdir=None):
    """Writes `data` to a new temporary LUCI_CONTEXT file and yields its path.

    Args:
      data - The context to write. It is JSON-encoded (after being passed
        through _to_encodable) unless `data_raw` is set.
      data_raw (bool) - If true, `data` is written verbatim as text. This is
        for testing and allows malformed json to be written.
      leak (bool) - If true, the file is not deleted when the contextmanager
        exits.
      workdir (str) - Optional directory to create the file in. Passed to
        tempfile.NamedTemporaryFile as `dir`.

    Yields:
      The path of the written file. The file is already closed at that point.
    """
    tf = tempfile.NamedTemporaryFile(
        mode='w', prefix='luci_ctx.', suffix='.json', delete=False,
        dir=workdir)
    _LOGGER.debug('Writing LUCI_CONTEXT file %r', tf.name)
    try:
        try:
            if data_raw:
                # for testing, allows malformed json
                tf.write(data)
            else:
                json.dump(_to_encodable(data), tf)
        finally:
            # Close the file so that Windows subprocesses can read it. This is
            # done even when writing fails, so that the handle is not leaked
            # and the unlink below is not blocked by a still-open file.
            tf.close()
        yield tf.name
    finally:
        if not leak:
            try:
                os.unlink(tf.name)
            except OSError as ex:
                _LOGGER.error(
                    'Failed to delete written LUCI_CONTEXT file %r: %s',
                    tf.name, ex)
def _to_utf8(obj):
    """Recursively encodes unicode text found in `obj` to UTF-8 on Python 2.

    Dicts (both keys and values) and lists are rebuilt with their contents
    converted. On Python 3, and for values of any other type, the value is
    returned unchanged.
    """
    if isinstance(obj, dict):
        return dict(
            (_to_utf8(name), _to_utf8(entry)) for name, entry in obj.items())
    if isinstance(obj, list):
        return [_to_utf8(element) for element in obj]
    needs_encoding = six.PY2 and isinstance(obj, six.text_type)
    return obj.encode('utf-8') if needs_encoding else obj
def _to_encodable(obj):
    """Recursively decodes byte strings found in `obj` from UTF-8 to text.

    Dicts (both keys and values) and lists are rebuilt with their contents
    converted. Values of any other type are returned unchanged.
    """
    if isinstance(obj, dict):
        converted = {}
        for name, entry in obj.items():
            new_name = _to_encodable(name)
            converted[new_name] = _to_encodable(entry)
        return converted
    if isinstance(obj, list):
        return [_to_encodable(element) for element in obj]
    if not isinstance(obj, six.binary_type):
        return obj
    return obj.decode('utf-8')
class MultipleLUCIContextException(Exception):
    """Error signalling that LUCI_CONTEXT was written from multiple threads."""

    def __init__(self):
        message = 'Attempted to write LUCI_CONTEXT in multiple threads'
        super(MultipleLUCIContextException, self).__init__(message)
def _check_ok(data):
    """Validates the overall shape of a loaded LUCI_CONTEXT.

    The context must be a dict, and every one of its values must be a dict as
    well. Each violation is logged as an error.

    Returns:
      True if `data` has the expected shape, False otherwise.
    """
    if not isinstance(data, dict):
        _LOGGER.error(
            'LUCI_CONTEXT does not contain a dict: %s', type(data).__name__)
        return False

    offenders = [
        (section, content) for section, content in data.items()
        if not isinstance(content, dict)
    ]
    # Report every malformed section, not just the first one.
    for section, content in offenders:
        _LOGGER.error(
            'LUCI_CONTEXT[%r] is not a dict: %s',
            section, type(content).__name__)
    return not offenders
# this is a separate function from _read_full for testing purposes.
def _initial_load():
    """Populates _CUR_CONTEXT from the file named by the ENV_KEY env var.

    _CUR_CONTEXT is set to an empty dict when the variable is unset or empty,
    when the file cannot be opened, read or decoded as JSON (the failure is
    logged), or when the decoded content is rejected by _check_ok.
    """
    global _CUR_CONTEXT

    path = os.environ.get(ENV_KEY)
    if not path:
        _CUR_CONTEXT = {}
        return

    if six.PY2:
        path = path.decode(sys.getfilesystemencoding())
    _LOGGER.debug('Loading LUCI_CONTEXT: %r', path)

    context = {}
    try:
        with open(path, 'r') as ctx_file:
            parsed = _to_utf8(json.load(ctx_file))
            if _check_ok(parsed):
                context = parsed
    except OSError as ex:
        _LOGGER.error('LUCI_CONTEXT failed to open: %s', ex)
    except IOError as ex:
        _LOGGER.error('LUCI_CONTEXT failed to read: %s', ex)
    except ValueError as ex:
        _LOGGER.error('LUCI_CONTEXT failed to decode: %s', ex)
    _CUR_CONTEXT = context
def _read_full():
    """Returns the cached LUCI_CONTEXT dict itself (not a copy).

    The cache is populated through _initial_load() on first use.
    """
    # Fast path: already populated, no need to take the lock.
    if _CUR_CONTEXT is not None:
        return _CUR_CONTEXT
    with _CUR_CONTEXT_LOCK:
        # Check again while holding the lock; the cache may have been
        # populated between the first check and the lock being acquired.
        if _CUR_CONTEXT is None:
            _initial_load()
    return _CUR_CONTEXT
def _mutate(section_values):
    """Applies `section_values` on top of a copy of the current context.

    Args:
      section_values (dict) - A mapping of section key to the new value for
        that section. A value of None removes the section; a dict replaces it.

    Returns:
      A (new_val, changed) tuple: the resulting context dict, and a bool which
      is True if it differs from the current context.

    Raises:
      ValueError if a section value is neither None nor a dict.
    """
    new_val = read_full()
    changed = False
    for section, value in section_values.items():
        if value is None:
            if new_val.pop(section, None) is not None:
                changed = True
        elif isinstance(value, dict):
            if new_val.get(section, None) != value:
                changed = True
                new_val[section] = value
        else:
            # Interpolate the message here; exception constructors don't do
            # logging-style lazy formatting of extra arguments.
            raise ValueError(
                'Bad type for LUCI_CONTEXT[%r]: %s'
                % (section, type(value).__name__))
    return new_val, changed
def read_full():
    """Returns the whole current LUCI_CONTEXT as a dict.

    The result is a deep copy, so the caller may modify it freely without
    affecting the cached context.
    """
    current = _read_full()
    return copy.deepcopy(current)
def read(section_key):
    """Looks up a single top-level section of the LUCI_CONTEXT.

    Args:
      section_key (str) - The top-level key to read from the LUCI_CONTEXT.

    Returns:
      A deep copy of the data stored under `section_key` (as a dict), or None
      if there is no such section.

    Example:
      Given a LUCI_CONTEXT of:
        {
          "swarming": {
            "secret_bytes": <bytes>
          },
          "other_service": {
            "nested": {
              "key": "something"
            }
          }
        }

      read('swarming') -> {'secret_bytes': <bytes>}
      read('doesnt_exist') -> None
    """
    section = _read_full().get(section_key)
    return copy.deepcopy(section)
@contextlib.contextmanager
def write(_leak=False, _tmpdir=None, **section_values):
    """Write is a contextmanager which will write all of the provided section
    details to a new context, copying over the values from any unmentioned
    sections. The new context file will be set in os.environ. When the
    contextmanager exits, it will attempt to delete the context file and it
    restores the previous value of the environment variable.

    Since each call to write produces a new context file on disk, it's
    beneficial to group edits together into a single call to write when
    possible. If the provided values don't change the current context, no file
    is written and the environment is left untouched.

    Calls to read*() within the context of a call to write will read from the
    written value. This written value is held in module-level state (it is
    not thread-local) for the duration of the context.

    NOTE: Because environment variables are per-process and not per-thread, it
    is an error to call write() from multiple threads simultaneously. If this
    is done, this function raises an exception. Nested calls from the same
    thread are allowed.

    Args:
      _leak (bool) - If true, the new LUCI_CONTEXT file won't be deleted after
        contextmanager exits.
      _tmpdir (str) - an optional directory to use for the newly written
        LUCI_CONTEXT file.
      section_values (str -> value) - A mapping of section_key to the new value
        for that section. A value of None will remove that section. Non-None
        values must be of the type 'dict', and must be json serializable.

    Raises:
      MultipleLUCIContextException if called from multiple threads
        simultaneously.
      ValueError if a section value is neither None nor a dict.

    Example:
      Given a LUCI_CONTEXT of:
        {
          "swarming": {
            "secret_bytes": <bytes>
          },
          "other_service": {
            ...
          }
        }

      with write(swarming=None): ...  # deletes 'swarming'
      with write(something={...}): ...  # sets 'something' section to {...}
    """
    global _CUR_CONTEXT

    new_val, changed = _mutate(section_values)
    # If new context remain unchanged, just pass-through
    if not changed:
        yield
        return

    # Non-blocking: a concurrent writer in another thread is an error, not
    # something to wait for.
    got_lock = _WRITE_LOCK.acquire(blocking=False)
    if not got_lock:
        raise MultipleLUCIContextException()
    try:
        with _tf(new_val, leak=_leak, workdir=_tmpdir) as name:
            # Capture the state to restore before touching anything, so the
            # cleanup below never refers to an unassigned name.
            old_value = _CUR_CONTEXT
            old_envvar = os.environ.get(ENV_KEY, None)
            try:
                if six.PY2:
                    os.environ[ENV_KEY] = name.encode(
                        sys.getfilesystemencoding())
                else:
                    os.environ[ENV_KEY] = name
                _CUR_CONTEXT = new_val
                yield
            finally:
                _CUR_CONTEXT = old_value
                if old_envvar is None:
                    # pop() rather than del: the body may have already removed
                    # the variable, and a KeyError here would mask any
                    # exception that is currently propagating.
                    os.environ.pop(ENV_KEY, None)
                else:
                    os.environ[ENV_KEY] = old_envvar
    finally:
        _WRITE_LOCK.release()
@contextlib.contextmanager
def stage(_leak=False, _tmpdir=None, **section_values):
    """Writes a new LUCI_CONTEXT file without touching the env var.

    Unlike 'write', the environment of the current process is left alone. This
    is useful when launching a new process asynchronously in a new
    LUCI_CONTEXT environment, where modifying the current process environment
    may be harmful.

    The body is called with the path to the new LUCI_CONTEXT file. It is called
    with None instead when nothing would change (either 'section_values' is
    empty or it has the exact same values as the current context) and the env
    var is already set; in that case the existing path should be reused (it
    can be accessed via `os.environ[luci_context.ENV_KEY]`).

    Args:
      _leak (bool) - If true, the new LUCI_CONTEXT file won't be deleted after
        contextmanager exits.
      _tmpdir (str) - an optional directory to use for the newly written
        LUCI_CONTEXT file.
      section_values (str -> value) - A mapping of section_key to the new value
        for that section, interpreted the same way as in 'write'.
    """
    staged_ctx, is_modified = _mutate(section_values)
    reuse_existing = ENV_KEY in os.environ and not is_modified
    if reuse_existing:
        yield None
    else:
        with _tf(staged_ctx, leak=_leak, workdir=_tmpdir) as path:
            yield path