blob: 7d96862b4b3c10ac8e99f822065bde074f0d23e4 [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright 2021 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
import enum
from datetime import datetime
from datetime import timezone
from moblab_common.database_models.job_uploading_status import (
JobUploadingStatus,
)
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
class Status(enum.Enum):
    """Upload states a job result can be in.

    The member *name* (not the numeric value) is what the rest of this
    module exchanges and persists: ``set_job_upload_status`` stores
    ``status.name`` and ``get_jobs_upload_status`` resolves stored strings
    back through ``Status[...]``.
    """

    NOT_READY = 1
    QUEUED = 2
    UPLOADING = 3
    UPLOADED = 4
    UPLOAD_FAILED = 5
    DELETED = 6
# ToDo(ivanbrovkovich): decide on DB access location
class MoblabDb:
    """Access point for the moblab MySQL database.

    The engine and the session factory are class attributes: they are built
    once, when this class body is executed, and are shared by every
    instance.
    """

    # ToDo(ivanbrovkovich): push configuration to env vars
    _engine = create_engine(
        "mysql+pymysql://root:moblab_db_passwd@db:3306/moblab",
        # Have the pool test a connection for liveness before handing it out.
        pool_pre_ping=True,
    )
    _Session = sessionmaker(bind=_engine)

    def get_session(self):
        """Return a new session bound to the shared engine.

        This method never closes the session itself; the caller is expected
        to call ``close()`` on it when done.
        """
        session_factory = MoblabDb._Session
        return session_factory()
class UploadResultState:
    """Represents the known info about a job's upload status.

    Attributes:
        status: name of a ``Status`` member, as a string; defaults to
            ``"NOT_READY"``.
        attempt_number: attempt counter; defaults to 0.
        last_falure_reason: message of the last failed attempt; defaults
            to the empty string.
    """

    def __init__(
        self,
        status=Status.NOT_READY.name,
        attempt_number=0,
        last_falure_reason="",
    ):
        # NOTE: the "falure" spelling is part of the public interface
        # (keyword argument and attribute name), so it is kept as is.
        self.last_falure_reason = last_falure_reason
        self.attempt_number = attempt_number
        self.status = status
class ResultUploadStatusConnector:
    """Abstraction around persistence layer for uploading status by result.

    Each public method obtains its own session from ``MoblabDb`` and closes
    it before returning, whether or not the operation succeeded.
    """

    def __init__(self):
        self.db = MoblabDb()

    def get_jobs_upload_status(self, job_id_list):
        """Returns the upload state of every requested job that has a record.

        Args:
            job_id_list: list of job ids to look up. A falsy value (empty
                list or None) returns an empty dict without opening a
                session.

        Returns:
            dict mapping job id to UploadResultState. Jobs without a stored
            record are absent from the dict.

        Raises:
            KeyError: a stored status string is not the name of a Status
                member.
        """
        upload_status = {}
        if not job_id_list:
            return upload_status

        session = self.db.get_session()
        try:
            results = (
                session.query(JobUploadingStatus)
                .filter(JobUploadingStatus.job_id.in_(job_id_list))
                .all()
            )
            # Convert the rows while the session is still open so attribute
            # access never happens on detached objects.
            for r in results:
                upload_status[r.job_id] = UploadResultState(
                    # Round-trip through the enum to validate the stored name.
                    status=Status[r.status].name,
                    attempt_number=r.upload_attempt_number,
                    last_falure_reason=r.last_attempt_failure_reason,
                )
        finally:
            # Release the session even when the query or conversion fails.
            session.close()
        return upload_status

    def set_job_upload_status(self, job_id, status, failure_reason=""):
        """Updates the upload status for a given job.

        Inserts a record when the job has none yet, otherwise updates the
        existing one. The attempt counter is incremented, and the failure
        reason stored, only when ``status`` is ``Status.UPLOAD_FAILED``.

        Args:
            job_id: (int) job id value
            status: (Status) enum value
            failure_reason: (string) error message in case of failure attempt
        """
        is_failure = status == Status.UPLOAD_FAILED
        session = self.db.get_session()
        try:
            upload_status = (
                session.query(JobUploadingStatus)
                .filter_by(job_id=job_id)
                .first()
            )
            now = datetime.now(timezone.utc)
            if upload_status is None:
                logging.info(
                    'Insert job "%s" upload status "%s"',
                    job_id,
                    status,
                )
                upload_status = JobUploadingStatus(
                    job_id=job_id,
                    status=status.name,
                    upload_attempt_number=1 if is_failure else 0,
                    updated_on=now,
                )
                if is_failure:
                    # Keep the reason of a first-ever failure too, the same
                    # way the update branch does.
                    upload_status.last_attempt_failure_reason = failure_reason
                session.add(upload_status)
            else:
                logging.info(
                    'Updating job "%s" upload status "%s" -> "%s"',
                    job_id,
                    upload_status.status,
                    status,
                )
                upload_status.status = status.name
                if is_failure:
                    # Treat a missing (NULL) counter as zero instead of
                    # failing on None + 1.
                    upload_status.upload_attempt_number = (
                        upload_status.upload_attempt_number or 0
                    ) + 1
                    upload_status.last_attempt_failure_reason = failure_reason
                upload_status.updated_on = now
            session.commit()
        finally:
            session.close()