blob: bff20b369ff046b2f359cc9be2c28fe4e56cf8c6 [file] [log] [blame]
# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Platform-specific utilities."""
from __future__ import print_function
import os
import platform
import time
import six
# Cache to speed up.
_CURRENT_PLATFORM_SYSTEM = platform.system()
# Constants for platform.system(). Note the 'Default' is locally defined for
# internal usage.
# Conditional imports. For syntax checking and cross-compiling, we want to
# ignore conditional import errors.
import fcntl
except ImportError:
# A dictionary to hold declarations from @Provider.
def Provider(api_name, systems):
"""Decorator to provide an API on given platform systems.
api_name: A string for API name.
systems: A list of supported platform systems.
assert not isinstance(systems, six.string_types), "systems must be list."
if api_name not in _PROVIDER_MAP:
_PROVIDER_MAP[api_name] = {}
def ProviderDecorator(func):
_PROVIDER_MAP[api_name].update(dict(((name, func) for name in systems)))
return func
return ProviderDecorator
def GetProvider(api_name, system=None):
"""Finds right provider for given system by API name.
api_name: A string for API name.
system: A string for system name, as defined in platform.system().
The function that implements target API on given system.
NotImplementedError if the given system has no implementation for API.
systems = _PROVIDER_MAP.get(api_name, {})
if system is None:
func = systems.get(system, systems.get(_SYSTEM_DEFAULT, None))
if func is None:
raise NotImplementedError('No implementation on %s for <%s>' % (
system, api_name))
return func
@Provider('MonotonicTime', [_SYSTEM_WINDOWS])
def WindowsMonotonicTime():
# TODO(kitching): Write a MonotonicTime for Windows. See notes written here:
# Fall back to time.time on Windows systems.
return time.time()
_clock_gettime = None
@Provider('MonotonicTime', [_SYSTEM_DEFAULT])
def UnixMonotonicTime():
"""Gets the raw monotonic time.
This function opens with ctypes and call:
int clock_gettime(clockid_t clk_id, struct timespec *tp);
to get raw monotonic time.
The system monotonic time in seconds.
global _clock_gettime
if _clock_gettime:
return _clock_gettime()
# ctypes and ctypes.utils may be not availalbe, especially on Android which
# does not have librt so we have to do delay-loading here.
import ctypes
import ctypes.util
class TimeSpec(ctypes.Structure):
"""A representation of struct timespec in C."""
_fields_ = [
('tv_sec', ctypes.c_long),
('tv_nsec', ctypes.c_long),
librt_name = ctypes.util.find_library('rt')
librt = ctypes.cdll.LoadLibrary(librt_name)
clock_gettime = librt.clock_gettime
clock_gettime.argtypes = [ctypes.c_int, ctypes.POINTER(TimeSpec)]
t = TimeSpec()
def rt_clock_gettime():
if clock_gettime(CLOCK_MONOTONIC_RAW, ctypes.pointer(t)) != 0:
errno = ctypes.get_errno()
raise OSError(errno, os.strerror(errno))
return t.tv_sec + 1e-9 * t.tv_nsec
_clock_gettime = rt_clock_gettime
# Either ctypes or librt failed. Try to provide system time if possible.
_clock_gettime = time.time
return _clock_gettime()
@Provider('FileLock', [_SYSTEM_DEFAULT])
def UnixFileLock(fd, do_lock=True, is_exclusive=True, is_blocking=True):
if do_lock:
fcntl.flock(fd, ((fcntl.LOCK_EX if is_exclusive else 0) |
(0 if is_blocking else fcntl.LOCK_NB)))
fcntl.flock(fd, fcntl.LOCK_UN)
@Provider('FileLock', [_SYSTEM_WINDOWS])
def WindowsFileLock(fd, do_lock=True, is_exclusive=True, is_blocking=True):
# TODO(hungte) Implement file locking on Windows.