| #! /usr/bin/env python |
| # Copyright (c) 2013 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| from mtlib.log import FeedbackDownloader, FeedbackLog, Log |
| from mtreplay import MTReplay |
| import fnmatch |
| import json |
| import multiprocessing |
| import os |
| import random |
| import re |
| import traceback |
| import urllib |
| import datetime |
| |
| |
# Prepare the on-disk pool of downloaded log files: ../cache/logs relative
# to this script's location.  Runs at import time so every entry point can
# rely on the directory existing.
script_dir = os.path.dirname(os.path.realpath(__file__))
log_dir = os.path.join(script_dir, '../cache/logs/')
# File listing feedback report IDs known to be malformed, so they are not
# downloaded again (read by GetInvalidIDs, appended to by DownloadFile).
invalid_filename = os.path.join(log_dir, 'invalid')
if not os.path.exists(log_dir):
  # NOTE(review): os.mkdir assumes ../cache already exists; os.makedirs
  # would also create the parent -- confirm the cache dir is pre-created.
  os.mkdir(log_dir)
| |
| |
class Query(object):
  """ Base class for queries run by the QueryEngine.

  The QueryEngine applies a Query to each log file by calling
  FindMatches on the replay results of that file.

  Set capture_logs to True to make the QueryEngine attach the
  activity Log object to each QueryResult it returns.
  """
  def __init__(self):
    # Off by default; subclasses opt in when they need the Log object.
    self.capture_logs = False

  def FindMatches(self, replay_results, filename):
    """ Returns a list of QueryMatch objects """
    # Base implementation matches nothing; subclasses override this.
    return []
| |
| |
class QueryMatch(object):
  """ A single query hit: which file it came from, when it happened
  and the matching line itself. """
  def __init__(self, filename, timestamp, line):
    self.filename = filename
    self.timestamp = timestamp
    self.line = line

  def __str__(self):
    # Guard clause: matches without a timestamp print as the bare line.
    if not self.timestamp:
      return self.line
    return str(self.timestamp) + ": " + self.line

  def __repr__(self):
    return self.__str__()
| |
| |
class QueryResult(object):
  """ The outcome of running a single Query over a single log file.

  Carries every match found in the file, the number of SYN reports
  that were processed and, when the Query requested it, the activity
  Log object. """
  def __init__(self, filename):
    self.filename = filename  # log file this result belongs to
    self.syn_count = 0        # SYN reports counted in the evdev log
    self.matches = []         # list of QueryMatch objects
    self.log = None           # activity Log, only set when matches exist
| |
| |
class QueryEngine(object):
  """ This class allows queries to be executed on a large number of log files.

  It manages a pool of log files, allows more log files to be downloaded and
  can execute queries in parallel on this pool of log files.
  """

  def ExecuteSingle(self, filename, query):
    """ Executes a query on a single log file.

    filename: path to the log file to process.
    query: Query object whose FindMatches is run on the replay results.
    Returns a QueryResult.  On failure (no platform found, or the replay
    raised) the result simply carries no matches.
    """
    log = Log(filename)
    replay = MTReplay()
    result = QueryResult(filename)

    # find platform for file
    platform = replay.PlatformOf(log)
    if not platform:
      print 'No platform for %s' % os.path.basename(filename)
      return result

    # count the number of syn reports in log file
    result.syn_count = len(tuple(re.finditer("0000 0000 0", log.evdev)))

    # run replay
    try:
      replay_result = replay.Replay(log)
    except:
      # Deliberate best-effort: a log that fails to replay yields an empty
      # result instead of aborting the whole (possibly parallel) batch.
      return result

    result.matches = query.FindMatches(replay_result, filename)
    if result.matches:
      # Only keep the (potentially large) log when something matched.
      result.log = replay_result.log

    return result

  def Execute(self, filenames, queries, parallel=True):
    """ Executes a query on a list of log files.

    filenames: list of filenames to execute
    queries: either a single query object for all files,
        or a dictionary mapping filenames to query objects.
    parallel: set to False to execute sequentially.
    Returns a dictionary mapping filename -> QueryResult, containing only
    the files that produced at least one match.
    """

    print "Processing %d log files" % len(filenames)

    # Duck-typed check: a single Query (anything with FindMatches) is
    # expanded into a per-filename dictionary.
    if hasattr(queries, 'FindMatches'):
      queries = dict([(filename, queries) for filename in filenames])

    # arguments for QuerySubprocess
    parameters = [(name, queries[name])
                  for name in filenames if name in queries]

    # process all files either in parallel or sequential
    if parallel:
      pool = multiprocessing.Pool()
      results = pool.map(ExecuteSingleSubprocess, parameters)
      pool.terminate()
    else:
      results = map(ExecuteSingleSubprocess, parameters)

    # count syn reports
    syn_count = sum([result.syn_count for result in results])

    # create dict of results by filename
    result_dict = dict([(result.filename, result)
                        for result in results
                        if result.matches])

    # syn reports are coming at approx 60 Hz on most platforms
    syn_per_second = 60.0
    hours = syn_count / syn_per_second / 60.0 / 60.0
    print "Processed ~%.2f hours of interaction" % hours

    return result_dict

  def SelectFiles(self, number, platform=None):
    """ Returns a random selection of files from the pool.

    number: how many files to sample; None selects all available files.
    platform: optional fnmatch pattern; when given, only files whose
        detected platform name matches the pattern are considered.
    """
    # list all log files; downloaded reports are named by their numeric id
    files = [os.path.abspath(os.path.join(log_dir, f))
             for f in os.listdir(log_dir)
             if f.isdigit()]

    if platform:
      print "Filtering files by platform. This may take a while."
      # NOTE(review): this MTReplay instance appears unused -- the worker
      # processes (GetPlatformSubprocess) each create their own.
      replay = MTReplay()
      pool = multiprocessing.Pool()
      platforms = pool.map(GetPlatformSubprocess, files)
      pool.terminate()

      # platforms is a list of (filename, platform_name_or_None) tuples.
      # NOTE: lambda tuple-parameter unpacking is Python 2 only syntax.
      filtered = filter(lambda (f, p): p and fnmatch.fnmatch(p, platform),
                        platforms)
      files = map(lambda (f, p): f, filtered)
      print "found", len(files), "log files matching", platform

    # randomly select subset of files
    if number is not None:
      files = random.sample(files, number)
    return files

  def GetInvalidIDs(self):
    """Return the list of feedback IDs previously recorded as invalid.

    Reads the 'invalid' file in the log pool; missing file means no
    known-invalid IDs yet.
    """
    if not os.path.exists(invalid_filename):
      return []
    return [x.strip() for x in open(invalid_filename).readlines()]

  def DownloadFile(self, id, downloader, invalid_ids):
    """Download one feedback log into the pool.

    id: feedback report id to fetch.
    downloader: FeedbackDownloader instance to fetch with.
    invalid_ids: iterable of ids known to be bad; these are skipped.
    Return 1 if successful, 0 if not.
    """
    filename = os.path.join(log_dir, id)
    if os.path.exists(filename):
      print 'Skipping existing report', id
      return 0
    if id in invalid_ids:
      print 'Skipping invalid report', id
      return 0

    print 'Downloading new report', id
    try:
      # might throw IO/Tar/Zip/etc exceptions
      report = FeedbackLog(id, force_latest='pad', downloader=downloader)
      # Test parse. Will throw exception on malformed log
      json.loads(report.activity)
    except:
      # Remember bad ids in the invalid file so they are never re-fetched.
      print 'Invalid report', id
      with open(invalid_filename, 'a') as f:
        f.write(id + '\n')
      return 0

    # check if report contains logs and actual events
    if report.activity and report.evdev and 'E:' in report.evdev:
      report.SaveAs(filename)
      return 1
    else:
      print 'Invalid report %s' % id
      with open(invalid_filename, 'a') as f:
        f.write(id + '\n')
      return 0

  def Download(self, num_to_download, parallel=True):
    """Download 'num_to_download' new feedback logs into the pool.

    Keeps requesting pages of report ids (5x the outstanding count per
    iteration, capped at 500) until enough valid reports were saved.
    parallel: set to False to download sequentially.
    """
    downloader = FeedbackDownloader()

    # end_time: 'now' as an epoch-based integer passed to DownloadIDs.
    # NOTE(review): seconds are scaled to milliseconds (*1000) but
    # microseconds are divided by 10, not 1000 -- confirm which unit
    # DownloadIDs actually expects.
    dt = datetime.datetime.utcnow() - datetime.datetime(1970, 1, 1)
    end_time = (((dt.days * 24 * 60 * 60 + dt.seconds) * 1000) +
                (dt.microseconds / 10))
    num_to_download = int(num_to_download)
    num_downloaded = 0
    invalid_ids = self.GetInvalidIDs()
    page_token = ''

    while num_to_download > num_downloaded:
      # Download list of feedback report id's.  Over-request because many
      # reports turn out to be invalid or already present.
      num_this_iteration = min((num_to_download - num_downloaded) * 5, 500)
      page_token, report_ids = downloader.DownloadIDs(
          num_this_iteration, end_time, page_token)

      # Download and check each report
      parameters = [(r_id, downloader, invalid_ids) for r_id in report_ids]
      if parallel:
        pool = multiprocessing.Pool()
        results = sum(pool.map(DownloadFileSubprocess, parameters))
        pool.terminate()
      else:
        results = sum(map(DownloadFileSubprocess, parameters))
      num_downloaded += results
      print "--------------------"
      print "%d/%d reports found" % (num_downloaded, num_to_download)
      print "--------------------"
| |
| |
| def GetPlatformSubprocess(filename): |
| replay = MTReplay() |
| log = Log(filename) |
| detected_platform = replay.PlatformOf(log) |
| if detected_platform: |
| print filename + ": " + detected_platform.name |
| return filename, detected_platform.name |
| else: |
| return filename, None |
| |
def ExecuteSingleSubprocess(args):
  """ Wrapper for subprocesses to run ExecuteSingle.

  args: (filename, query) tuple, as built by QueryEngine.Execute.
  Exceptions are printed before propagating, since tracebacks raised
  inside multiprocessing workers are otherwise hard to see.
  """
  try:
    return QueryEngine().ExecuteSingle(args[0], args[1])
  except Exception:
    traceback.print_exc()
    # Bare raise preserves the original traceback; 'raise e' would
    # re-raise from here and lose the worker's stack.
    raise
| |
def DownloadFileSubprocess(args):
  """ Wrapper for subprocesses to run DownloadFile.

  args: (id, downloader, invalid_ids) tuple, as built by
  QueryEngine.Download.  Exceptions are printed before propagating,
  since tracebacks raised inside multiprocessing workers are
  otherwise hard to see.
  """
  try:
    return QueryEngine().DownloadFile(args[0], args[1], args[2])
  except Exception:
    traceback.print_exc()
    # Bare raise preserves the original traceback; 'raise e' would
    # re-raise from here and lose the worker's stack.
    raise