# blob: bbb353f2cd40ebb89005c57d0d276fca71d438d2 [file] [log] [blame]
"""
Copyright (c) 2019, OptoFidelity OY
Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
3. All advertising materials mentioning features or use of this software must display the following acknowledgement: This product includes software developed by the OptoFidelity OY.
4. Neither the name of the OptoFidelity OY nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
import cherrypy
from genshi.template import MarkupTemplate
from sqlalchemy.orm import joinedload
import TPPTAnalysisSW.measurementdb as measurementdb
from .plotinfo import *
import TPPTAnalysisSW.analyzers as analyzers
from .settings import settings
from datetime import datetime
import math
import json
import time
import traceback
import TPPTAnalysisSW.plotinfo as plotinfo
from .utils import Timer
from .testbase import TestBase
from .base_page import BasePage
from .info.version import Version
import TPPTAnalysisSW.imagefactory as imagefactory
from TPPTAnalysisSW.test import upload_test
import TPPTAnalysisSW.sqluploader as sqluploader
# Controller for testsession
class TestSession(BasePage):
    """Controller for a single test session.

    GET renders the session page from ``templates/testsession.html``.
    POST handles the ``upload_test`` and ``upload_session`` commands as well
    as JSON encoded commands passed in the ``data`` form field (currently
    only ``set_notes``).
    """

    exposed = True

    @staticmethod
    def _get_testsession_or_404(dbsession, testSessionId):
        """Return the TestSession row with the given id.

        Raises cherrypy.HTTPError (404) when no such row exists, so callers
        never have to deal with a ``None`` session.
        """
        testsession = dbsession.query(measurementdb.TestSession).filter(
            measurementdb.TestSession.id == testSessionId).first()
        if testsession is None:
            raise cherrypy.HTTPError("404", "Test session not found")
        return testsession

    def GET(self, testSessionId=None, command=None):
        """Render the page of the test session ``testSessionId``.

        Raises cherrypy.HTTPError (404) if a command is given, if no session
        id is given, or if the session does not exist in the database.
        """
        # Command is here in case we need to create e.g. session-wide settings
        if command is not None:
            raise cherrypy.HTTPError("404", "Command not found")
        if testSessionId is None:
            raise cherrypy.HTTPError("404", "No testsession given")

        dbsession = measurementdb.get_database().session()
        testsession = self._get_testsession_or_404(dbsession, testSessionId)
        sessioninfo = plotinfo.TestSessionInfo(testsession=testsession)

        tests, tests_in_database, results, _curved = self.sessionResultsFromDB(testsession, dbsession)

        # The upload button of a test is disabled once the test has been uploaded
        button_attributes = [{'disabled': 'true'} if test_uploaded else {}
                             for test_uploaded in tests_in_database]

        # Categorize tests by DUTs, keeping the order in which DUTs first appear
        duts = []  # list of tuples: (dut, [(test, result, button_attributes), ...])
        dut_indices = {}  # Dut id -> duts list index map
        for test_result in zip(tests, results, button_attributes):
            dut = test_result[0].dut
            dut_index = dut_indices.get(dut.id)
            if dut_index is None:
                # Add new dut
                dut_indices[dut.id] = len(duts)
                duts.append((dut, [test_result]))
            else:
                # Append to an older dut
                duts[dut_index][1].append(test_result)

        # Disable session upload button if all the tests have been uploaded to database.
        # Truthiness is used here so that this agrees with the per-test buttons above.
        session_upload_button_attributes = {'disabled': 'true'} if all(tests_in_database) else {}

        with open("templates/testsession.html") as f:
            tmpl = MarkupTemplate(f)

        stream = tmpl.generate(duts=duts, session=sessioninfo,
                               session_upload_button_attributes=session_upload_button_attributes,
                               sql_uploader_initialized=sqluploader.is_sql_uploader_initialized(),
                               version=Version)
        return stream.render('xhtml')

    def POST(self, testSessionId=None, command=None, **kwargs):
        """Handle upload commands and JSON encoded session commands.

        ``command == 'upload_test'`` uploads the test given in
        ``kwargs['test_id']`` and returns the result of ``upload_test``.

        ``command == 'upload_session'`` uploads every test of the session
        given in ``kwargs['session_id']`` that is not yet in the database.
        It returns a JSON encoded status message as bytes, or None as soon
        as ``upload_test`` returns None for one of the tests.

        Any other request must carry a JSON document in ``kwargs['data']``
        with a ``command`` key. ``set_notes`` stores ``data['value']`` as the
        notes of the session ``testSessionId`` and returns the stored notes;
        unknown commands return the string 'No such command'.

        Raises cherrypy.HTTPError (400) when a required field is missing and
        (404) when the session for ``set_notes`` does not exist.
        """
        if command == 'upload_test':
            if 'test_id' not in kwargs:
                raise cherrypy.HTTPError("400", "Expected test_id not received")
            return upload_test(kwargs['test_id'])

        if command == 'upload_session':
            if 'session_id' not in kwargs:
                raise cherrypy.HTTPError("400", "Expected session_id not received")
            session_id = kwargs['session_id']
            dbsession = measurementdb.get_database().session()

            # Get the tests that are part of the session.
            test_items = dbsession.query(measurementdb.TestItem).filter(
                measurementdb.TestItem.testsession_id == session_id).all()

            # All test objects are created before the first upload is started.
            test_classes = [TestBase.create(test_item.id)[0] for test_item in test_items]

            uploaded_ids = []
            # Upload all tests in the session that are not already uploaded
            for test_item, test_class in zip(test_items, test_classes):
                if test_class.exists_in_database_with_start_time(test_item.starttime):
                    continue
                if upload_test(test_item.id) is None:
                    return None
                # Keep track of uploaded test items to inform user which tests were actually uploaded
                uploaded_ids.append(str(test_item.id))

            return json.dumps("Sequence upload successful. Uploaded tests: {}".format(
                ', '.join(uploaded_ids))).encode()

        if 'data' not in kwargs:
            print("No data in kwargs")
            raise cherrypy.HTTPError("400", "Expected data not received")

        data = json.loads(kwargs['data'])
        if 'command' not in data:
            print("No command in data")
            raise cherrypy.HTTPError("400", "Expected command not received")

        return_data = 'No such command'
        if data['command'] == 'set_notes':
            dbsession = measurementdb.get_database().session()
            testsession = self._get_testsession_or_404(dbsession, testSessionId)
            testsession.notes = data['value']
            dbsession.commit()
            return_data = testsession.notes
        return return_data

    @staticmethod
    def session_samples_progs_manus(testsession, dbsession):
        """Summarize the DUT attributes of all tests in ``testsession``.

        Returns a 4-tuple of comma separated, sorted, de-duplicated strings in
        this order: sample ids, programs, manufacturers, batches. Missing or
        empty values are shown as "[Empty]". ``dbsession`` is not used; it is
        kept in the signature for existing callers.
        """
        sample_names = set()
        sample_prog = set()
        sample_manu = set()
        sample_ver = set()

        for test in testsession.test_items:
            dut = test.dut
            # None and empty strings are both reported as "[Empty]"
            sample_names.add(dut.sample_id or "[Empty]")
            sample_manu.add(dut.manufacturer or "[Empty]")
            sample_prog.add(dut.program or "[Empty]")
            sample_ver.add(dut.batch or "[Empty]")

        return (", ".join(sorted(sample_names)),
                ", ".join(sorted(sample_prog)),
                ", ".join(sorted(sample_manu)),
                ", ".join(sorted(sample_ver)))

    @staticmethod
    def sessionResultsFromDB(testsession, dbsession):
        """Collect the tests of ``testsession`` together with their results.

        A test without stored results is analyzed with
        ``TestBase.evaluateresult``; otherwise its first stored result is used.

        Returns ``(tests, tests_in_database, results, curved)`` where the
        first three are parallel lists and ``tests_in_database`` holds the
        value of ``exists_in_database_with_start_time`` for each test.
        ``curved`` is True only when every test has test type id 15.
        ``dbsession`` is not used; it is kept for existing callers.
        """
        tests = []
        tests_in_database = []
        results = []
        test_types = []

        for test in testsession.test_items:
            if test.test_results:
                result = test.test_results[0].result
            else:
                # Run analysis
                result = TestBase.evaluateresult(test.id)

            test_object, _cache = TestBase.create(test.id)
            already_in_database = test_object.exists_in_database_with_start_time(test.starttime)

            tests.append(test)
            tests_in_database.append(already_in_database)
            test_types.append(test.testtype_id)
            results.append(result)

        # NOTE(review): 15 is presumably the id of the curved test type - confirm
        # against the test type table.
        curved = set(test_types) == {15}
        return tests, tests_in_database, results, curved

    @classmethod
    def eval_tests_results(cls, dbsession, testSessionId, recalculate=True):
        """Best-effort wrapper around ``do_evaluation``.

        Returns the ``(tests, results)`` tuple of ``do_evaluation``. If the
        evaluation raises, the error and its traceback are printed and None
        is returned instead of propagating the exception.
        """
        try:
            return cls.do_evaluation(dbsession, testSessionId, recalculate=recalculate)
        except Exception as e:
            print("Error in analysis!", e)
            # The message alone rarely tells where the analysis failed
            traceback.print_exc()
            return None

    @classmethod
    def do_evaluation(cls, dbsession, testSessionId, recalculate=True):
        '''
        Analyzes all test results in test session.

        For every test of the session ``ImageFactory.delete_images`` is called
        with the test id before the test is evaluated with
        ``TestBase.evaluateresult``.

        Returns ``(tests, results)``: the TestItem rows of the session and the
        evaluation result of each, in the same order.
        '''
        tests = dbsession.query(measurementdb.TestItem).options(joinedload('type')). \
            filter(measurementdb.TestItem.testsession_id == testSessionId).all()

        results = []
        s = Timer(2)
        for t in tests:
            imagefactory.ImageFactory.delete_images(str(t.id))
            result = TestBase.evaluateresult(t.id, recalculate=recalculate)
            results.append(result)
            s.Time("Analyzed test %d - result %s" % (t.id, result))
        return tests, results