blob: d0a679763f1a62739582b87c02a23d99a7eaff06 [file] [log] [blame]
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from gzip import GzipFile
from mtlib.feedback import FeedbackDownloader
from io import BytesIO
from .cros_remote import CrOSRemote
import base64
import bz2
import os
import os.path
import re
import sys
import tarfile
import zipfile
# path for temporary screenshot
screenshot_filepath = 'extension/screenshot.jpg'
class Options:
def __init__(self, download=False, new=False, screenshot=False,
evdev=None):
self.download = download
self.new = new
self.screenshot = screenshot
self.evdev = evdev
def Log(source=None, options=None, activity=None, evdev=None):
""" Load log from feedback url, device ip or local file path.
Returns a log object containg activity, evdev and system logs.
source can be either an ip address to a device from which to
download, a url pointing to a feedback report to pull or a filename.
"""
if not options:
options = Options()
if source:
if os.path.exists(source):
if source.endswith('.zip') or source.endswith('.bz2'):
return FeedbackLog(source)
return FileLog(source, options.evdev)
else:
match = re.search('Report/([0-9]+)', source)
if match:
return FeedbackLog(match.group(1), options.screenshot,
options.download)
else:
# todo add regex and error message
return DeviceLog(source, options.new)
if activity or evdev:
log = AbstractLog()
if activity:
if os.path.exists(activity):
log.activity = open(activity).read()
else:
log.activity = activity
if evdev:
if os.path.exists(evdev):
log.evdev = open(evdev).read()
else:
log.evdev = evdev
return log
class AbstractLog(object):
""" Common functions for log files
A log file consists of the activity log, the evdev log, a system log, and
possibly a screenshot, which can be saved to disk all together.
"""
def __init__(self):
self.activity = ''
self.evdev = ''
self.system = ''
self.image = None
def SaveAs(self, filename):
open(filename, 'w').write(self.activity)
if self.evdev:
open(filename + '.evdev', 'w').write(self.evdev)
if self.system:
open(filename + '.system', 'w').write(self.system)
if self.image:
open(filename + '.jpg', 'w').write(self.image)
def CleanUp(self):
if os.path.exists(screenshot_filepath):
os.remove(screenshot_filepath)
class FileLog(AbstractLog):
""" Loads log from file.
evdev or system logs are optional.
"""
def __init__(self, filename, evdev=None):
AbstractLog.__init__(self)
self.activity = open(filename).read()
if evdev:
self.evdev = open(evdev).read()
elif os.path.exists(filename + '.evdev'):
self.evdev = open(filename + '.evdev').read()
if os.path.exists(filename + '.system'):
self.system = open(filename + '.system').read()
if os.path.exists(filename + '.jpg'):
self.image = open(filename + '.jpg').read()
file(screenshot_filepath, 'w').write(self.image)
class FeedbackLog(AbstractLog):
""" Download log from feedback url.
Downloads logs and (possibly) screenshot from a feedback id or file name
"""
def __init__(self, id_or_filename, screenshot=False, download=False,
force_latest=None, downloader=None):
AbstractLog.__init__(self)
self.force_latest = force_latest
self.try_screenshot = screenshot
self.report = None
self.downloader = downloader
if id_or_filename.endswith('.zip') or id_or_filename.endswith('.bz2'):
self.report = open(id_or_filename, 'rb').read()
screenshot_filename = id_or_filename[:-4] + '.jpg'
try:
self.image = open(screenshot_filename).read()
except IOError:
# No corresponding screenshot available.
pass
else:
if not self.downloader:
self.downloader = FeedbackDownloader()
self.report = self.downloader.DownloadSystemLog(id_or_filename)
if self.try_screenshot:
self.image = self.downloader.DownloadScreenshot(id_or_filename)
if self.report:
self._ExtractSystemLog()
if self.system:
self._ExtractLogFiles()
# Only write to screenshot.jpg if we will be viewing the screenshot
if not download and self.image:
file(screenshot_filepath, 'w').write(self.image)
def _GetLatestFile(self, match, tar):
# find file name containing match with latest timestamp
names = filter(lambda n: n.find(match) >= 0, tar.getnames())
names.sort()
if not names:
print('Cannot find files named %s in tar file' % match)
print('Tar file contents:', tar.getnames())
sys.exit(-1)
return tar.extractfile(names[-1])
def _ExtractSystemLog(self):
if self.report[0:2] == b'BZ':
system_log = bz2.decompress(self.report)
elif self.report[0:2] == b'PK':
bytes_io = BytesIO(self.report)
zip = zipfile.ZipFile(bytes_io, 'r')
system_log = zip.read(zip.namelist()[0])
zip.extractall('./')
else:
print('Cannot download logs file')
sys.exit(-1)
self.system = system_log.decode('utf-8', 'strict')
def _ExtractLogFiles(self):
# Find embedded and uuencoded activity.tar in system log
def ExtractByInterface(interface):
assert interface == 'pad' or interface == 'screen'
log_index = [
(('hack-33025-touch{0}_activity=\"\"\"\n' +
'begin-base64 644 touch{0}_activity_log.tar\n').format(interface),
'"""'),
(('touch{0}_activity=\"\"\"\n' +
'begin-base64 644 touch{0}_activity_log.tar\n').format(interface),
'"""'),
(('hack-33025-touch{0}_activity=<multiline>\n' +
'---------- START ----------\n' +
'begin-base64 644 touch{0}_activity_log.tar\n').format(interface),
'---------- END ----------'),
]
start_index = end_index = None
for start, end in log_index:
if start in self.system:
start_index = self.system.index(start) + len(start)
end_index = self.system.index(end, start_index)
break
if start_index is None:
return []
activity_tar_enc = self.system[start_index:end_index]
# base64 decode
activity_tar_data = base64.b64decode(activity_tar_enc)
if activity_tar_data == '':
print('touch{0}_activity_log.tar found, but empty'.format(interface))
return []
# untar
activity_tar_file = tarfile.open(fileobj=BytesIO(activity_tar_data))
def ExtractPadFiles(name):
# find matching evdev file
evdev_name = name.replace('touchpad_activity', 'cmt_input_events')
activity_gz = activity_tar_file.extractfile(name)
evdev_gz = activity_tar_file.extractfile(evdev_name)
# decompress log files
return (GzipFile(fileobj=activity_gz).read(),
GzipFile(fileobj=evdev_gz).read())
def ExtractScreenFiles(name):
# Always try for a screenshot with touchscreen view
self.try_screenshot = True
evdev = activity_tar_file.extractfile(name)
return (evdev.read(), None)
extract_func = {
'pad': ExtractPadFiles,
'screen': ExtractScreenFiles,
}
# return latest log files, we don't include cmt files
return [(filename, extract_func[interface])
for filename in activity_tar_file.getnames()
if filename.find('cmt') == -1]
if self.force_latest == 'pad':
logs = ExtractByInterface('pad')
idx = 0
elif self.force_latest == 'screen':
logs = ExtractByInterface('screen')
idx = 0
else:
logs = ExtractByInterface('pad') + ExtractByInterface('screen')
if not logs:
print('Cannot find touchpad_activity_log.tar or '
'touchscreen_activity_log.tar in systems log file.')
sys.exit(-1)
if len(logs) == 1:
idx = 0
else:
while True:
print('Which log file would you like to use?')
for i, (name, extract_func) in enumerate(logs):
if name.startswith('evdev'):
name = 'touchscreen log - ' + name
print(i, name)
print('>')
selection = sys.stdin.readline()
try:
idx = int(selection)
if idx < 0 or idx >= len(logs):
print('Number out of range')
else:
break
except:
print('Not a number')
name, extract_func = logs[idx]
self.activity, self.evdev = extract_func(name)
identity_message = """\
In order to access devices in TPTool, you need to have password-less
auth for chromebooks set up.
Would you like tptool to run the following command for you?
$ %s
Yes/No? (Default: No)"""
class DeviceLog(AbstractLog):
""" Downloads logs from a running chromebook via scp. """
def __init__(self, ip, new=False):
AbstractLog.__init__(self)
self.remote = CrOSRemote(ip)
if new:
self.remote.SafeExecute('/opt/google/input/inputcontrol -t touchpad' +
' --log')
self.activity = self.remote.Read('/var/log/xorg/touchpad_activity_log.txt')
self.evdev = self.remote.Read('/var/log/xorg/cmt_input_events.dat')