| # -*- mode: python; coding: utf-8 -*- |
| # Copyright 2014 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Prompts the operator to input a string of data.""" |
| |
| from __future__ import print_function |
| import logging |
| import re |
| import socket |
| import time |
| import unittest |
| |
| import factory_common # pylint: disable=unused-import |
| from cros.factory.device import device_utils |
| from cros.factory.test import event as test_event |
| from cros.factory.test import event_log |
| from cros.factory.test import factory |
| from cros.factory.test.fixture import bft_fixture |
| from cros.factory.test import shopfloor |
| from cros.factory.test import test_ui |
| from cros.factory.test import ui_templates |
| from cros.factory.tools import ghost |
| from cros.factory.utils.arg_utils import Arg |
| from cros.factory.utils import debug_utils |
| from cros.factory.utils import process_utils |
| |
| |
| class Scan(unittest.TestCase): |
| """The main class for this pytest.""" |
| ARGS = [ |
| Arg('aux_table_name', str, |
| 'Name of the auxiliary table containing the device', optional=True), |
| Arg('label_en', str, |
| 'Name of the ID or serial number being scanned, e.g., ' |
| '"MLB serial number"'), |
| Arg('label_zh', (str, unicode), |
| 'Chinese name of the ID or serial number being scanned ' |
| '(defaults to the same as the English label)'), |
| Arg('event_log_key', str, 'Key to use for event log', optional=True), |
| Arg('shared_data_key', str, |
| 'Key to use to store in scanned value in shared data', |
| optional=True), |
| Arg('device_data_key', str, |
| 'Key to use to store in scanned value in device data', |
| optional=True), |
| Arg('dut_data_key', str, |
| 'Key to use to store in scanned value in DUT.', |
| optional=True), |
| Arg('ro_vpd_key', str, |
| 'Key to use to store in scanned value in RO VPD', optional=True), |
| Arg('rw_vpd_key', str, |
| 'Key to use to store in scanned value in RW VPD', optional=True), |
| Arg('regexp', str, 'Regexp that the scanned value must match', |
| optional=True), |
| Arg('check_device_data_key', str, |
| 'Checks that the given value in device data matches the scanned ' |
| 'value', |
| optional=True), |
| Arg('bft_scan_fixture_id', bool, 'True to scan BFT fixture ID.', |
| default=False), |
| Arg('bft_scan_barcode', bool, 'True to trigger BFT barcode scanner.', |
| default=False), |
| Arg('bft_save_barcode', bool, |
| 'True to trigger BFT barcode scanner and save in BFT.', |
| default=False), |
| Arg('bft_get_barcode', bool, |
| 'True to get barcode from BFT. BFT stores barcode in advance so this ' |
| 'obtains barcode immidiately.', default=False), |
| Arg('bft_fixture', dict, bft_fixture.TEST_ARG_HELP, default=None, |
| optional=True), |
| Arg('barcode_scan_interval_secs', (int, float), |
| 'Interval for repeatedly trigger BFT\'s barcode scaner', |
| default=2.0), |
| Arg('match_the_last_few_chars', int, |
| 'This is for OP to manually input last few SN chars based on the\n' |
| 'sticker on machine to make sure SN in VPD matches sticker SN.', |
| default=0), |
| Arg('ignore_case', bool, 'True to ignore case from input.', |
| default=False), |
| Arg('value_assigned', str, |
| 'If not None, use the value to fill the key.', optional=True), |
| Arg('reconnect_ghost', bool, |
| 'Reconnect ghost to update machine ID', default=False, optional=True)] |
| |
| def HandleScanValue(self, event): |
| def SetError(label_en, label_zh=None): |
| logging.info('Scan error: %r', label_en) |
| self.ui.SetHTML('<span class="test-error">' + |
| test_ui.MakeLabel(label_en, label_zh) + |
| '</span>', |
| id='scan-status') |
| self.ui.RunJS('$("scan-value").disabled = false;' |
| '$("scan-value").value = ""') |
| self.ui.SetFocus('scan-value') |
| |
| self.ui.RunJS('$("scan-value").disabled = true') |
| scan_value = event.data.strip() |
| if self.args.ignore_case: |
| scan_value = scan_value.upper() |
| esc_scan_value = test_ui.Escape(scan_value) |
| if not scan_value: |
| return SetError('The scanned value is empty.', |
| label_zh=u'掃描編號是空的。') |
| if self.args.regexp: |
| match = re.match(self.args.regexp, scan_value) |
| if not match or match.group(0) != scan_value: |
| return SetError( |
| 'The scanned value "%s" does not match ' |
| 'the expected format.' % esc_scan_value, |
| label_zh=u'所掃描的編號「%s」格式不對。' % esc_scan_value) |
| |
| if self.args.aux_table_name: |
| try: |
| shopfloor.select_aux_data(self.args.aux_table_name, |
| scan_value) |
| except shopfloor.ServerFault: |
| logging.exception('select_aux_data failed') |
| return SetError( |
| 'The scanned value "%s" is not a known %s.' % ( |
| esc_scan_value, self.args.label_en), |
| label_zh=u'所掃描的編號「%s」不是已知的%s。' % ( |
| esc_scan_value, self.args.label_zh)) |
| except socket.error as e: |
| logging.exception('select_aux_data failed') |
| return SetError( |
| 'Unable to contact shopfloor server: %s' % e, |
| label_zh=u'連不到 shopfloor server: %s' % e) |
| except: # pylint: disable=bare-except |
| logging.exception('select_aux_data failed') |
| return SetError(debug_utils.FormatExceptionOnly()) |
| |
| if self.args.event_log_key: |
| event_log.Log('scan', key=self.args.event_log_key, value=scan_value) |
| |
| if self.args.shared_data_key: |
| factory.set_shared_data(self.args.shared_data_key, |
| scan_value) |
| |
| if self.args.device_data_key: |
| shopfloor.UpdateDeviceData({self.args.device_data_key: scan_value}) |
| |
| if self.args.dut_data_key: |
| self.dut.storage.UpdateDict({self.args.dut_data_key: scan_value}) |
| |
| if self.args.check_device_data_key: |
| expected_value = shopfloor.GetDeviceData().get( |
| self.args.check_device_data_key) |
| |
| if self.args.match_the_last_few_chars != 0: |
| expected_value = expected_value[-self.args.match_the_last_few_chars:] |
| |
| if expected_value != scan_value: |
| logging.error('Expected %r but got %r', expected_value, scan_value) |
| |
| # Show expected value only in engineering mode, so the user |
| # can't fake it. |
| esc_expected_value = test_ui.Escape(expected_value or 'None') |
| return SetError( |
| 'The scanned value "%s" does not match ' |
| 'the expected value' |
| '<span class=test-engineering-mode-only> "%s"</span>.' % ( |
| esc_scan_value, esc_expected_value), |
| label_zh=( |
| u'所掃描的編號「%s」不搭配所期望的編號' |
| u'<span class=test-engineering-mode-only>「%s」</span>。' % ( |
| esc_scan_value, esc_expected_value))) |
| |
| if self.args.rw_vpd_key or self.args.ro_vpd_key: |
| self.ui.SetHTML( |
| ' '.join([test_ui.MakeLabel('Writing to VPD. Please wait...', |
| u'正在写到 VPD,请稍等...'), |
| test_ui.SPINNER_HTML_16x16]), |
| id='scan-status') |
| try: |
| if self.args.rw_vpd_key: |
| self.dut.vpd.rw.Update({self.args.rw_vpd_key: scan_value}) |
| if self.args.ro_vpd_key: |
| self.dut.vpd.ro.Update({self.args.ro_vpd_key: scan_value}) |
| except: # pylint: disable=bare-except |
| logging.exception('Setting VPD failed') |
| return SetError(debug_utils.FormatExceptionOnly()) |
| |
| self.ui.event_client.post_event( |
| test_event.Event(test_event.Event.Type.UPDATE_SYSTEM_INFO)) |
| self.ui.Pass() |
| |
| def setUp(self): |
| self.dut = device_utils.CreateDUTInterface() |
| self.ui = test_ui.UI() |
| self.auto_scan_timer = None |
| self.fixture = None |
| if self.args.bft_fixture: |
| self.fixture = bft_fixture.CreateBFTFixture(**self.args.bft_fixture) |
| |
| def tearDown(self): |
| if self.fixture: |
| self.fixture.Disconnect() |
| |
| if self.auto_scan_timer: |
| self.auto_scan_timer.cancel() |
| |
| if self.args.reconnect_ghost: |
| self.KickGhost() |
| |
| def ScanBarcode(self): |
| while True: |
| self.fixture.ScanBarcode() |
| time.sleep(self.args.barcode_scan_interval_secs) |
| |
| def BFTScanSaveBarcode(self): |
| while True: |
| self.fixture.TriggerScanner() |
| time.sleep(self.args.barcode_scan_interval_secs) |
| |
| def KickGhost(self): |
| server = ghost.GhostRPCServer() |
| try: |
| server.Reconnect() |
| except socket.error as e: |
| logging.exception(str(e)) |
| |
| def runTest(self): |
| template = ui_templates.OneSection(self.ui) |
| |
| if not self.args.label_zh: |
| self.args.label_zh = self.args.label_en |
| |
| # A workaround that some existing test lists do not use unicode |
| # for Chinese string. |
| if type(self.args.label_zh) is str: |
| self.args.label_zh = unicode(self.args.label_zh, encoding='utf-8') |
| |
| template.SetTitle(test_ui.MakeLabel( |
| 'Scan %s' % self.args.label_en.title(), |
| u'扫描%s' % self.args.label_zh)) |
| |
| template.SetState( |
| test_ui.MakeLabel( |
| 'Please scan the %s and press ENTER.' % self.args.label_en, |
| u'请扫描%s後按下 ENTER。' % self.args.label_zh) + |
| '<br><input id="scan-value" type="text" size="20">' |
| '<br> ' |
| '<p id="scan-status"> ') |
| self.ui.RunJS("document.getElementById('scan-value').focus()") |
| self.ui.BindKeyJS( |
| test_ui.ENTER_KEY, |
| 'window.test.sendTestEvent("scan_value",' |
| 'document.getElementById("scan-value").value)') |
| self.ui.AddEventHandler('scan_value', self.HandleScanValue) |
| |
| if self.args.value_assigned is not None: |
| self.ui.RunJS( |
| 'window.test.sendTestEvent("scan_value", "%s")' % |
| self.args.value_assigned) |
| elif self.args.bft_scan_fixture_id: |
| logging.info('Getting fixture ID...') |
| fixture_id = self.fixture.GetFixtureId() |
| self.ui.RunJS( |
| 'window.test.sendTestEvent("scan_value", "%d")' % fixture_id) |
| elif self.args.bft_scan_barcode: |
| logging.info('Triggering barcode scanner...') |
| process_utils.StartDaemonThread(target=self.ScanBarcode) |
| elif self.args.bft_save_barcode: |
| logging.info('Triggering barcode scanner...') |
| process_utils.StartDaemonThread(target=self.BFTScanSaveBarcode) |
| elif self.args.bft_get_barcode: |
| logging.info('Getting barcode from BFT...') |
| barcode = self.fixture.ScanBarcode() |
| self.ui.RunJS( |
| 'window.test.sendTestEvent("scan_value", "%s")' % barcode) |
| |
| self.ui.Run() |