blob: e36114b15a7575c70fb8bc559848caf1807cef01 [file] [log] [blame]
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""ChromeOS factory shop floor system implementation, using CSV input.
This module provides an easy way to setup and use shop floor system. Use Google
Docs or Excel to create a spreadsheet and export as CSV (comma separated
values), with following fields:
serial_number: The serial number of each device.
hwid: The HWID string assigned for each serial number.
ro_vpd_*: Read-only VPD values. Example: ro_vpd_test_data will be converted to
"test_data" in RO_VPD section.
rw_vpd_*: Read-writeable VPD values, using same syntax described in ro_vpd_*.
To use this module, run following command in factory_setup folder:
shopfloor_server.py -m shopfloor.simple.ShopFloor -c PATH_TO_CSV_FILE.csv
You can find a sample CSV file in in:
factory_setup/test_data/shopfloor/simple.csv
"""
import csv
import logging
import os
import re
import time
from xmlrpclib import Binary
import shopfloor
class ShopFloor(shopfloor.ShopFloorBase):
"""Sample shop floor system, using CSV file as input.
Device data is read from a 'devices.csv' file in the data directory.
"""
NAME = "CSV-file based shop floor system"
VERSION = 4
def Init(self):
devices_csv = os.path.join(self.data_dir, 'devices.csv')
logging.info("Parsing %s...", devices_csv)
self.data_store = LoadCsvData(devices_csv)
logging.info("Loaded %d entries from %s.",
len(self.data_store), devices_csv)
# Put uploaded reports in a "reports" folder inside data_dir.
self.reports_dir = os.path.join(self.data_dir, 'reports')
if not os.path.isdir(self.reports_dir):
os.mkdir(self.reports_dir)
# Try to touch some files inside directory, to make sure the directory is
# writable, and everything I/O system is working fine.
stamp_file = os.path.join(self.reports_dir, ".touch")
with open(stamp_file, "w") as stamp_handle:
stamp_handle.write("%s - VERSION %s" % (self.NAME, self.VERSION))
os.remove(stamp_file)
def _CheckSerialNumber(self, serial):
"""Checks if serial number is valid, otherwise raise ValueError."""
if serial in self.data_store:
return True
message = "Unknown serial number: %s" % serial
logging.error(message)
raise ValueError(message)
def GetHWID(self, serial):
self._CheckSerialNumber(serial)
return self.data_store[serial]['hwid']
def GetVPD(self, serial):
self._CheckSerialNumber(serial)
return self.data_store[serial]['vpd']
def GetRegistrationCodeMap(self, serial):
self._CheckSerialNumber(serial)
registration_code_map = self.data_store[serial]['registration_code_map']
self.LogRegistrationCodeMap(self.data_store[serial]['hwid'],
registration_code_map)
return registration_code_map
def UploadReport(self, serial, report_blob, report_name=None):
def is_gzip_blob(blob):
"""Check (not 100% accurate) if input blob is gzipped."""
GZIP_MAGIC = '\x1f\x8b'
return blob[:len(GZIP_MAGIC)] == GZIP_MAGIC
self._CheckSerialNumber(serial)
if isinstance(report_blob, shopfloor.Binary):
report_blob = report_blob.data
if not report_name:
report_name = ('%s-%s.rpt' % (re.sub('[^a-zA-Z0-9]', '', serial),
time.strftime("%Y%m%d-%H%M%S%z")))
if is_gzip_blob(report_blob):
report_name += ".gz"
report_path = os.path.join(self.reports_dir, report_name)
with open(report_path, "wb") as report_obj:
report_obj.write(report_blob)
def Finalize(self, serial):
# Finalize is currently not implemented.
self._CheckSerialNumber(serial)
logging.info("Finalized: %s", serial)
def LoadCsvData(filename):
"""Loads a CSV file and returns structured shop floor system data."""
# Required fields.
KEY_SERIAL_NUMBER = 'serial_number'
KEY_HWID = 'hwid'
KEY_REGISTRATION_CODE_USER = 'registration_code_user'
KEY_REGISTRATION_CODE_GROUP = 'registration_code_group'
# Optional fields.
PREFIX_RO_VPD = 'ro_vpd_'
PREFIX_RW_VPD = 'rw_vpd_'
VPD_PREFIXES = (PREFIX_RO_VPD, PREFIX_RW_VPD)
REQUIRED_KEYS = (KEY_SERIAL_NUMBER, KEY_HWID)
OPTIONAL_KEYS = (KEY_REGISTRATION_CODE_USER, KEY_REGISTRATION_CODE_GROUP)
OPTIONAL_PREFIXES = VPD_PREFIXES
def check_field_name(name):
"""Checks if argument is an valid input name."""
if name in REQUIRED_KEYS or name in OPTIONAL_KEYS:
return True
for prefix in OPTIONAL_PREFIXES:
if name.startswith(prefix):
return True
return False
def build_vpd(source):
"""Builds VPD structure by input source."""
vpd = {'ro': {}, 'rw': {}}
for key, value in source.items():
for prefix in VPD_PREFIXES:
if not key.startswith(prefix):
continue
# Key format: $type_vpd_$name (ex, ro_vpd_serial_number)
(key_type, _, key_name) = key.split('_', 2)
if value is None:
continue
vpd[key_type][key_name.strip()] = value.strip()
return vpd
def build_registration_code_map(source):
"""Builds registration_code_map structure.
Returns:
A dict containing 'user' and 'group' keys.
"""
return {'user': source.get(KEY_REGISTRATION_CODE_USER),
'group': source.get(KEY_REGISTRATION_CODE_GROUP)}
data = {}
with open(filename, 'rb') as source:
reader = csv.DictReader(source)
row_number = 0
for row in reader:
row_number += 1
if KEY_SERIAL_NUMBER not in row:
raise ValueError("Missing %s in row %d" % (KEY_SERIAL_NUMBER,
row_number))
serial_number = row[KEY_SERIAL_NUMBER].strip()
hwid = row[KEY_HWID].strip()
# Checks data validity.
if serial_number in data:
raise ValueError("Duplicated %s in row %d: %s" %
(KEY_SERIAL_NUMBER, row_number, serial_number))
if None in row:
raise ValueError("Extra fields in row %d: %s" %
(row_number, ','.join(row[None])))
for field in row:
if not check_field_name(field):
raise ValueError("Invalid field: %s" % field)
entry = {'hwid': hwid,
'vpd': build_vpd(row),
'registration_code_map': build_registration_code_map(row)}
data[serial_number] = entry
return data