# Lint as: python2, python3
# Copyright 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Facade to access the display-related functionality."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import logging
import multiprocessing
import numpy
import os
import re
import shutil
import time
import json
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import utils as common_utils
from autotest_lib.client.common_lib.cros import retry
from autotest_lib.client.cros import constants
from autotest_lib.client.cros.graphics import graphics_utils
from autotest_lib.client.cros.multimedia import facade_resource
from autotest_lib.client.cros.multimedia import image_generator
from autotest_lib.client.cros.power import sys_power
from six.moves import range
from telemetry.internal.browser import web_contents

class TimeoutException(Exception):
    """Exception type used to represent a timeout."""


# Overall time budget, in seconds, used to derive the retry timeout below.
_FLAKY_CALL_RETRY_TIMEOUT_SEC = 60
# Value, in seconds, passed to retry.retry as delay_sec.
_FLAKY_DISPLAY_CALL_RETRY_DELAY_SEC = 2

# Decorator applied to display calls in this file. It is built by retry.retry
# from the exception types (KeyError, error.CmdError) -- presumably the ones
# that trigger a retry; confirm against retry.retry. The timeout is converted
# from seconds to minutes because the keyword is timeout_min.
_retry_display_call = retry.retry(
        (KeyError, error.CmdError),
        timeout_min=_FLAKY_CALL_RETRY_TIMEOUT_SEC / 60.0,
        delay_sec=_FLAKY_DISPLAY_CALL_RETRY_DELAY_SEC)


class DisplayFacadeNative(object):
    """Facade to access the display-related functionality.

    The methods inside this class only accept Python native types.
    """

    # Path of the calibration image: written by load_calibration_image() and
    # copied elsewhere by save_calibration_image().
    CALIBRATION_IMAGE_PATH = '/tmp/calibration.png'
    # Lowest refresh rate assumed when load_color_sequence() computes how long
    # to wait for the whole color sequence to be painted.
    MINIMUM_REFRESH_RATE_EXPECTED = 25.0
    # Extra time added on top of the computed load_color_sequence() timeout.
    DELAY_TIME = 3
    # Exclusive upper bound of the Type-C port indices probed by
    # reset_connector_if_applicable().
    MAX_TYPEC_PORT = 6

    def __init__(self, resource):
        """Initializes a DisplayFacadeNative.

        @param resource: A FacadeResource object; stored and used by the
                         other methods to reach extensions and tabs.
        """
        self._resource = resource
        # Generator used by load_calibration_image() to render the
        # calibration image.
        self._image_generator = image_generator.ImageGenerator()


    @facade_resource.retry_chrome_call
    def get_display_info(self):
        """Gets the display info from the chrome.system.display API.

        The asynchronous getInfo() callback stores its result in a variable
        on the display test extension page; that variable is polled until it
        is no longer null (or until the poll gives up).

        @return array of dict for display info, i.e. whatever the extension
                page holds in window.__display_info when polling stops.
        """
        extension = self._resource.get_extension(
                constants.DISPLAY_TEST_EXTENSION)
        # Clear any result left over from a previous call so the poll below
        # can only succeed on data produced by this call.
        extension.ExecuteJavaScript('window.__display_info = null;')
        extension.ExecuteJavaScript(
                "chrome.system.display.getInfo(function(info) {"
                "window.__display_info = info;})")
        utils.wait_for_value(lambda: (
                extension.EvaluateJavaScript("window.__display_info")
                is not None),
                expected_value=True)
        return extension.EvaluateJavaScript("window.__display_info")


    @facade_resource.retry_chrome_call
    def get_window_info(self):
        """Gets the current window info from the chrome.windows API.

        Calls chrome.windows.getCurrent() on the extension page and polls the
        variable its callback fills in until it is no longer null (or until
        the poll gives up).

        @return a dict for the information of the current window, i.e.
                whatever the extension page holds in window.__window_info
                when polling stops.
        """
        extension = self._resource.get_extension()
        # Clear any result left over from a previous call so the poll below
        # can only succeed on data produced by this call.
        extension.ExecuteJavaScript('window.__window_info = null;')
        extension.ExecuteJavaScript(
                "chrome.windows.getCurrent(function(info) {"
                "window.__window_info = info;})")
        utils.wait_for_value(lambda: (
                extension.EvaluateJavaScript("window.__window_info")
                is not None),
                expected_value=True)
        return extension.EvaluateJavaScript("window.__window_info")


    @facade_resource.retry_chrome_call
    def create_window(self, url='chrome://newtab'):
        """Creates a new window using the chrome.windows.create API.

        @param url: Optional URL for the new window.

        @return Identifier for the new window.

        @raise Whatever extension.WaitForJavaScriptCondition raises
               (presumably a timeout error) if the window id is not set
               within web_contents.DEFAULT_WEB_CONTENTS_TIMEOUT.
        """
        extension = self._resource.get_extension()

        # Serialize the URL with json.dumps so that quotes or backslashes in
        # it cannot terminate the JavaScript string literal early.
        extension.ExecuteJavaScript(
                """
                var __new_window_id = null;
                chrome.windows.create(
                        {url: %s},
                        function(win) {
                            __new_window_id = win.id});
                """ % json.dumps(url)
        )
        extension.WaitForJavaScriptCondition(
                "__new_window_id !== null",
                timeout=web_contents.DEFAULT_WEB_CONTENTS_TIMEOUT)

        return extension.EvaluateJavaScript("__new_window_id")


    @facade_resource.retry_chrome_call
    def update_window(self, window_id, state=None, bounds=None):
        """Updates an existing window using the chrome.windows.update API.

        @param window_id: Identifier for the window to update.
        @param state: Optional string to set the state such as 'normal',
                      'maximized', or 'fullscreen'.
        @param bounds: Optional dictionary with keys top, left, width, and
                       height to reposition the window. When given, all four
                       keys must be present.

        @return True if success, or if there was nothing to update.

        @raise KeyError if bounds is given but lacks one of the four keys.
        @raise Whatever extension.WaitForJavaScriptCondition raises
               (presumably a timeout error) if the update callback does not
               run within web_contents.DEFAULT_WEB_CONTENTS_TIMEOUT.
        """
        extension = self._resource.get_extension()
        params = {}

        if state:
            params['state'] = state
        if bounds:
            for key in ('top', 'left', 'width', 'height'):
                params[key] = bounds[key]

        if not params:
            logging.info('Nothing to update for window_id=%s', window_id)
            return True

        # params only holds JSON-serializable values, so json.dumps yields a
        # valid JavaScript object literal for the updateInfo argument.
        extension.ExecuteJavaScript(
                """
                var __status = 'Running';
                chrome.windows.update(%d, %s,
                        function(win) {
                            __status = 'Done'});
                """ % (window_id, json.dumps(params))
        )
        extension.WaitForJavaScriptCondition(
                "__status == 'Done'",
                timeout=web_contents.DEFAULT_WEB_CONTENTS_TIMEOUT)

        return True


    def _get_display_by_id(self, display_id):
        """Gets a display by ID.

        @param display_id: id of the display.

        @return: A dict of various display info.
        """
        for display in self.get_display_info():
            if display['id'] == display_id:
                return display
        raise RuntimeError('Cannot find display ' + display_id)


    def get_display_modes(self, display_id):
        """Gets all the display modes for the specified display.

        @param display_id: id of the display to get modes from.

        @return: A list of DisplayMode dicts.
        """
        display = self._get_display_by_id(display_id)
        return display['modes']


    def get_display_rotation(self, display_id):
        """Gets the display rotation for the specified display.

        @param display_id: id of the display to get modes from.

        @return: Degree of rotation.
        """
        display = self._get_display_by_id(display_id)
        return display['rotation']


    def get_display_notifications(self):
        """Gets the display notifications

        @return: Returns a list of display related notifications only.
        """
        display_notifications = []
        for notification in self._resource.get_visible_notifications():
            if notification['id'] == 'chrome://settings/display':
                display_notifications.append(notification)
        return display_notifications


    def set_display_rotation(self, display_id, rotation,
                             delay_before_rotation=0, delay_after_rotation=0):
        """Sets the display rotation for the specified display.

        @param display_id: id of the display to rotate.
        @param rotation: degree of rotation
        @param delay_before_rotation: time in second for delay before rotation
        @param delay_after_rotation: time in second for delay after rotation

        @raise RuntimeError if the extension does not report "success", which
               includes the case where no result was reported before the
               poll gave up.
        """
        time.sleep(delay_before_rotation)
        extension = self._resource.get_extension(
                constants.DISPLAY_TEST_EXTENSION)
        # The callback records "success" or "failure" in a page variable that
        # is reset to null first, so a stale result is never picked up.
        extension.ExecuteJavaScript(
                """
                window.__set_display_rotation_has_error = null;
                chrome.system.display.setDisplayProperties('%(id)s',
                    {"rotation": %(rotation)d}, () => {
                    if (chrome.runtime.lastError) {
                        console.error('Failed to set display rotation',
                            chrome.runtime.lastError);
                        window.__set_display_rotation_has_error = "failure";
                    } else {
                        window.__set_display_rotation_has_error = "success";
                    }
                });
                """
                % {'id': display_id, 'rotation': rotation}
        )
        utils.wait_for_value(lambda: (
                extension.EvaluateJavaScript(
                    'window.__set_display_rotation_has_error')
                is not None),
                expected_value=True)
        time.sleep(delay_after_rotation)
        result = extension.EvaluateJavaScript(
                'window.__set_display_rotation_has_error')
        if result != 'success':
            raise RuntimeError('Failed to set display rotation: %r' % result)


    def get_available_resolutions(self, display_id):
        """Gets the resolutions from the specified display.

        @return a list of (width, height) tuples.
        """
        display = self._get_display_by_id(display_id)
        modes = display['modes']
        if 'widthInNativePixels' not in modes[0]:
            raise RuntimeError('Cannot find widthInNativePixels attribute')
        if display['isInternal']:
            logging.info("Getting resolutions of internal display")
            return list(set([(mode['width'], mode['height']) for mode in
                             modes]))
        return list(set([(mode['widthInNativePixels'],
                          mode['heightInNativePixels']) for mode in modes]))


    def has_internal_display(self):
        """Returns whether the device has an internal display.

        @return whether the device has an internal display
        """
        return len([d for d in self.get_display_info() if d['isInternal']]) > 0


    def get_internal_display_id(self):
        """Gets the internal display id.

        @return the id of the internal display.
        """
        for display in self.get_display_info():
            if display['isInternal']:
                return display['id']
        raise RuntimeError('Cannot find internal display')


    def get_first_external_display_id(self):
        """Gets the first external display id.

        @return the id of the first external display; -1 if not found.
        """
        # Get the first external and enabled display
        for display in self.get_display_info():
            if display['isEnabled'] and not display['isInternal']:
                return display['id']
        return -1


    def set_resolution(self, display_id, width, height, timeout=3):
        """Sets the resolution of the specified display.

        The extension page looks up the display mode whose width and height
        match, applies it with setDisplayProperties, and records the outcome
        in window.__set_resolution_progress, which is then polled.

        @param display_id: id of the display to set resolution for.
        @param width: width of the resolution
        @param height: height of the resolution
        @param timeout: Currently unused by this implementation; kept so that
                existing callers passing it keep working. The poll below
                relies on the default timeout of utils.wait_for_value.
        @raise RuntimeError if the reported outcome is not "succeeded" (mode
                not found, setDisplayProperties failed, or no outcome was
                reported before the poll gave up).
        """

        extension = self._resource.get_extension(
                constants.DISPLAY_TEST_EXTENSION)
        extension.ExecuteJavaScript(
                """
                window.__set_resolution_progress = null;
                chrome.system.display.getInfo((info_array) => {
                    var mode;
                    for (var info of info_array) {
                        if (info['id'] == '%(id)s') {
                            for (var m of info['modes']) {
                                if (m['width'] == %(width)d &&
                                    m['height'] == %(height)d) {
                                    mode = m;
                                    break;
                                }
                            }
                            break;
                        }
                    }
                    if (mode === undefined) {
                        console.error('Failed to select the resolution ' +
                            '%(width)dx%(height)d');
                        window.__set_resolution_progress = "mode not found";
                        return;
                    }

                    chrome.system.display.setDisplayProperties('%(id)s',
                        {'displayMode': mode}, () => {
                            if (chrome.runtime.lastError) {
                                window.__set_resolution_progress = "failed: " +
                                    chrome.runtime.lastError.message;
                            } else {
                                window.__set_resolution_progress = "succeeded";
                            }
                        }
                    );
                });
                """
                % {'id': display_id, 'width': width, 'height': height}
        )
        utils.wait_for_value(lambda: (
                extension.EvaluateJavaScript(
                    'window.__set_resolution_progress') is not None),
                expected_value=True)
        result = extension.EvaluateJavaScript(
                'window.__set_resolution_progress')
        if result != 'succeeded':
            raise RuntimeError('Failed to set resolution: %r' % result)


    @_retry_display_call
    def get_external_resolution(self):
        """Gets the resolution of the external screen.

        Delegates to graphics_utils.get_external_resolution().

        @return The resolution tuple (width, height)
        """
        return graphics_utils.get_external_resolution()

    def get_internal_resolution(self):
        """Gets the resolution of the internal screen.

        @return The resolution tuple (width, height) or None if internal screen
                is not available
        """
        for display in self.get_display_info():
            if display['isInternal']:
                bounds = display['bounds']
                return (bounds['width'], bounds['height'])
        return None


    def set_content_protection(self, state):
        """Applies a content protection state to the external connector.

        The connector is the one returned by get_external_connector_name().

        @param state: One of the states 'Undesired', 'Desired', or 'Enabled'
        """
        graphics_utils.set_content_protection(
                self.get_external_connector_name(), state)


    def get_content_protection(self):
        """Reads the content protection state of the external connector.

        The connector is the one returned by get_external_connector_name().

        @return: A string of the state, like 'Undesired', 'Desired', or 'Enabled'.
                 False if not supported.
        """
        external_connector = self.get_external_connector_name()
        state = graphics_utils.get_content_protection(external_connector)
        return state


    def get_external_crtc_id(self):
        """Gets the external crtc.

        Delegates to graphics_utils.get_external_crtc_id().

        @return The id of the external crtc.
        """
        return graphics_utils.get_external_crtc_id()


    def get_internal_crtc_id(self):
        """Gets the internal crtc.

        Delegates to graphics_utils.get_internal_crtc_id().

        @return The id of the internal crtc.
        """
        return graphics_utils.get_internal_crtc_id()


    def take_internal_screenshot(self, path):
        """Takes internal screenshot.

        @param path: path to image file.
        """
        self.take_screenshot_crtc(path, self.get_internal_crtc_id())


    def take_external_screenshot(self, path):
        """Takes external screenshot.

        @param path: path to image file.
        """
        self.take_screenshot_crtc(path, self.get_external_crtc_id())


    def take_screenshot_crtc(self, path, id):
        """Captures the DUT screenshot, use id for selecting screen.

        Delegates to graphics_utils.take_screenshot_crop() with crtc_id=id.

        @param path: path to image file.
        @param id: The id of the crtc to screenshot.

        @return True once the graphics_utils call has returned.
        """

        graphics_utils.take_screenshot_crop(path, crtc_id=id)
        return True


    def save_calibration_image(self, path):
        """Save the calibration image to the given path.

        @param path: path to image file.
        """
        shutil.copy(self.CALIBRATION_IMAGE_PATH, path)
        return True


    def take_tab_screenshot(self, output_path, url_pattern=None):
        """Takes a screenshot of the tab specified by the given url pattern.

        @param output_path: A path of the output file.
        @param url_pattern: A string of url pattern used to search for tabs.
                            Default is to look for .svg image.
        """
        if url_pattern is None:
            # If no URL pattern is provided, defaults to capture the first
            # tab that shows SVG image.
            url_pattern = '.svg'

        tabs = self._resource.get_tabs()
        for i in range(0, len(tabs)):
            if url_pattern in tabs[i].url:
                data = tabs[i].Screenshot(timeout=5)
                # Flip the colors from BGR to RGB.
                data = numpy.fliplr(data.reshape(-1, 3)).reshape(data.shape)
                data.tofile(output_path)
                break
        return True


    def toggle_mirrored(self):
        """Toggles mirrored.

        Delegates to graphics_utils.screen_toggle_mirrored().

        @return True once the graphics_utils call has returned.
        """
        graphics_utils.screen_toggle_mirrored()
        return True


    def hide_cursor(self):
        """Hides mouse cursor.

        Delegates to graphics_utils.hide_cursor().

        @return True once the graphics_utils call has returned.
        """
        graphics_utils.hide_cursor()
        return True


    def hide_typing_cursor(self):
        """Hides typing cursor.

        Delegates to graphics_utils.hide_typing_cursor().

        @return True once the graphics_utils call has returned.
        """
        graphics_utils.hide_typing_cursor()
        return True


    def is_mirrored_enabled(self):
        """Checks the mirrored state.

        @return True if mirrored mode is enabled.
        """
        return bool(self.get_display_info()[0]['mirroringSourceId'])


    def set_mirrored(self, is_mirrored):
        """Sets mirrored mode.

        @param is_mirrored: True or False to indicate mirrored state.
        @return True if success, False otherwise.
        """
        if self.is_mirrored_enabled() == is_mirrored:
            return True

        retries = 4
        while retries > 0:
            self.toggle_mirrored()
            result = utils.wait_for_value(self.is_mirrored_enabled,
                                          expected_value=is_mirrored,
                                          timeout_sec=3)
            if result == is_mirrored:
                return True
            retries -= 1
        return False


    def is_display_primary(self, internal=True):
        """Checks if internal screen is primary display.

        @param internal: is internal/external screen primary status requested
        @return boolean True if internal display is primary.
        """
        for info in self.get_display_info():
            if info['isInternal'] == internal and info['isPrimary']:
                return True
        return False


    def suspend_resume(self, suspend_time=10):
        """Suspends the DUT for a given time in second.

        Delegates to sys_power.do_suspend().

        @param suspend_time: Suspend time in second.

        @return True once sys_power.do_suspend() has returned.
        """
        sys_power.do_suspend(suspend_time)
        return True


    def suspend_resume_bg(self, suspend_time=10):
        """Runs suspend_resume() in a separate process and returns at once.

        The child process is started but not joined here.

        @param suspend_time: Suspend time in second.

        @return True after the background process has been started.
        """
        worker = multiprocessing.Process(
                target=self.suspend_resume, args=(suspend_time,))
        worker.start()
        return True


    @_retry_display_call
    def get_external_connector_name(self):
        """Gets the name of the external output connector.

        Delegates to graphics_utils.get_external_connector_name().

        @return The external output connector name as a string, if any.
                Otherwise, return False.
        """
        return graphics_utils.get_external_connector_name()


    def get_internal_connector_name(self):
        """Gets the name of the internal output connector.

        Delegates to graphics_utils.get_internal_connector_name().

        @return The internal output connector name as a string, if any.
                Otherwise, return False.
        """
        return graphics_utils.get_internal_connector_name()


    def wait_external_display_connected(self, display):
        """Polls the external connector name until it matches the expectation.

        @param display: The display name as a string, like 'HDMI1', or
                        False if no external display is expected.
        @return: True if the last polled connector name equals display;
                 False otherwise.
        """
        last_seen = utils.wait_for_value(
                self.get_external_connector_name, expected_value=display)
        return last_seen == display


    @facade_resource.retry_chrome_call
    def move_to_display(self, display_id):
        """Moves the current window to the indicated display.

        The window is moved to the left/top corner of the target display's
        bounds and put into the 'normal' state.

        @param display_id: The id of the indicated display.
        @return True if success.

        @raise RuntimeError if the indicated display is not enabled.
        @raise Whatever extension.WaitForJavaScriptCondition raises
               (presumably a timeout error) if the window does not end up at
               the requested position in the 'normal' state.
        """
        target = self._get_display_by_id(display_id)
        if not target['isEnabled']:
            raise RuntimeError('Cannot find the indicated display')
        bounds = target['bounds']
        left = bounds['left']
        top = bounds['top']

        extension = self._resource.get_extension()
        # An empty area (width and height of zero) lets the window_sizer pick
        # a visible area that fits on the screen; see
        # chrome/browser/ui/window_sizer.cc for details.
        # The state has to be set to 'normal' as well: in the 'minimized',
        # 'maximized' or 'fullscreen' states the 'left', 'top', 'width' and
        # 'height' settings are ignored; see
        # chrome/browser/extensions/api/tabs/tabs_api.cc for details.
        script = (
                """
                var __status = 'Running';
                chrome.windows.update(
                        chrome.windows.WINDOW_ID_CURRENT,
                        {left: %d, top: %d, width: 0, height: 0,
                         state: 'normal'},
                        function(info) {
                            if (info.left == %d && info.top == %d &&
                                info.state == 'normal')
                                __status = 'Done'; });
                """
                % (left, top, left, top)
        )
        extension.ExecuteJavaScript(script)
        extension.WaitForJavaScriptCondition(
                "__status == 'Done'",
                timeout=web_contents.DEFAULT_WEB_CONTENTS_TIMEOUT)
        return True


    def is_fullscreen_enabled(self):
        """Checks the fullscreen state.

        @return True if fullscreen mode is enabled.
        """
        return self.get_window_info()['state'] == 'fullscreen'


    def set_fullscreen(self, is_fullscreen):
        """Sets the current window to full screen.

        @param is_fullscreen: True or False to indicate fullscreen state.
        @return True if success, False otherwise.
        """
        extension = self._resource.get_extension()
        if not extension:
            raise RuntimeError('Autotest extension not found')

        if is_fullscreen:
            window_state = "fullscreen"
        else:
            window_state = "normal"
        extension.ExecuteJavaScript(
                """
                var __status = 'Running';
                chrome.windows.update(
                        chrome.windows.WINDOW_ID_CURRENT,
                        {state: '%s'},
                        function() { __status = 'Done'; });
                """
                % window_state)
        utils.wait_for_value(lambda: (
                extension.EvaluateJavaScript('__status') == 'Done'),
                expected_value=True)
        return self.is_fullscreen_enabled() == is_fullscreen


    def load_url(self, url):
        """Loads the given url in a new tab. The new tab will be active.

        Delegates to the load_url() method of the facade resource.

        @param url: The url to load as a string.
        @return a str, the tab descriptor of the opened tab.
        """
        return self._resource.load_url(url)


    def load_calibration_image(self, resolution):
        """Opens a new tab and loads a full screen calibration
           image from the HTTP server.

        @param resolution: A tuple (width, height) of resolution.
        @return a str, the tab descriptor of the opened tab.
        """
        path = self.CALIBRATION_IMAGE_PATH
        self._image_generator.generate_image(resolution[0], resolution[1], path)
        os.chmod(path, 0o644)
        tab_descriptor = self.load_url('file://%s' % path)
        return tab_descriptor


    def load_color_sequence(self, tab_descriptor, color_sequence):
        """Displays a series of colors on full screen on the tab.
        tab_descriptor is returned by any open tab API of display facade.
        e.g.,
        tab_descriptor = load_url('about:blank')
        load_color_sequence(tab_descriptor, color)

        Each color is applied as the page's background color from a
        window.requestAnimationFrame callback, and the timestamp passed to
        every callback invocation is recorded.

        @param tab_descriptor: Indicate which tab to test.
        @param color_sequence: An integer list for switching colors. Each
                               value is rendered as a '#RRGGBB'-style string
                               via '#%06X'.
        @return A list of the timestamp for each switch.
        """
        tab = self._resource.get_tab_by_descriptor(tab_descriptor)
        # Build the JavaScript array literal holding the colors as hex strings.
        color_sequence_for_java_script = (
                'var color_sequence = [' +
                ','.join("'#%06X'" % x for x in color_sequence) +
                '];')
        # Paints are synchronized to the refresh rate of the screen by
        # window.requestAnimationFrame.
        tab.ExecuteJavaScript(color_sequence_for_java_script + """
            function render(timestamp) {
                window.timestamp_list.push(timestamp);
                if (window.count < color_sequence.length) {
                    document.body.style.backgroundColor =
                            color_sequence[count];
                    window.count++;
                    window.requestAnimationFrame(render);
                }
            }
            window.count = 0;
            window.timestamp_list = [];
            window.requestAnimationFrame(render);
            """)

        # Waiting time is decided by following concerns:
        # 1. MINIMUM_REFRESH_RATE_EXPECTED: the minimum refresh rate
        #    we expect it to be. Real refresh rate is related to
        #    not only hardware devices but also drivers and browsers.
        #    Most graphics devices support at least 60fps for a single
        #    monitor, and under mirror mode, since the both frames
        #    buffers need to be updated for an input frame, the refresh
        #    rate will decrease by half, so here we set it to be a
        #    little less than 30 (= 60/2) to make it more tolerant.
        # 2. DELAY_TIME: extra wait time for timeout.
        tab.WaitForJavaScriptCondition(
                'window.count == color_sequence.length',
                timeout=(
                    (len(color_sequence) / self.MINIMUM_REFRESH_RATE_EXPECTED)
                    + self.DELAY_TIME))
        return tab.EvaluateJavaScript("window.timestamp_list")


    def close_tab(self, tab_descriptor):
        """Disables fullscreen and closes the tab of the given tab descriptor.
        tab_descriptor is returned by any open tab API of display facade.
        e.g.,
        1.
        tab_descriptor = load_url(url)
        close_tab(tab_descriptor)

        2.
        tab_descriptor = load_calibration_image(resolution)
        close_tab(tab_descriptor)

        @param tab_descriptor: Indicate which tab to be closed.
        """
        if tab_descriptor:
            # set_fullscreen(False) is necessary here because currently there
            # is a bug in tabs.Close(). If the current state is fullscreen and
            # we call close_tab() without setting state back to normal, it will
            # cancel fullscreen mode without changing system configuration, and
            # so that the next time someone calls set_fullscreen(True), the
            # function will find that current state is already 'fullscreen'
            # (though it is not) and do nothing, which will break all the
            # following tests.
            self.set_fullscreen(False)
            self._resource.close_tab(tab_descriptor)
        else:
            logging.error('close_tab: not a valid tab_descriptor')

        return True


    def reset_connector_if_applicable(self, connector_type):
        """Resets Type-C video connector from host end if applicable.

        It's the workaround sequence since sometimes Type-C dongle becomes
        corrupted and needs to be re-plugged.

        @param connector_type: A string, like "VGA", "DVI", "HDMI", or "DP".
        """
        if connector_type != 'HDMI' and connector_type != 'DP':
            return
        # Decide if we need to add --name=cros_pd
        usbpd_command = 'ectool --name=cros_pd usbpd'
        try:
            common_utils.run('%s 0' % usbpd_command)
        except error.CmdError:
            usbpd_command = 'ectool usbpd'

        port = 0
        while port < self.MAX_TYPEC_PORT:
            # We use usbpd to get Role information and then power cycle the
            # SRC one.
            command = '%s %d' % (usbpd_command, port)
            try:
                output = common_utils.run(command).stdout
                if re.compile('Role.*SRC').search(output):
                    logging.info('power-cycle Type-C port %d', port)
                    common_utils.run('%s sink' % command)
                    common_utils.run('%s auto' % command)
                port += 1
            except error.CmdError:
                break
