Source code for pbcommand.services.models

"""Services Specific Data Models"""
from collections import namedtuple
import json
import uuid

import iso8601

from requests.exceptions import RequestException

from pbcommand.utils import to_ascii

__all__ = ['ServiceJob', 'ServiceEntryPoint', 'JobEntryPoint', 'JobStates', 'JobTypes']


# These are mirrored from the BaseSMRTServer
class LogLevels(object):
    TRACE = "TRACE"
    DEBUG = "DEBUG"
    INFO = "INFO"
    NOTICE = "NOTICE"
    WARN = "WARN"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"
    FATAL = "FATAL"

    ALL = (TRACE, DEBUG, INFO, NOTICE, WARN, ERROR, CRITICAL, FATAL)

    @classmethod
    def is_valid(cls, level):
        return level in cls.ALL


SERVICE_LOGGER_RESOURCE_ID = "pbsmrtpipe"

LogResource = namedtuple("LogResource", "id name description")
LogMessage = namedtuple("LogMessage", "sourceId level message")

PbsmrtpipeLogResource = LogResource(SERVICE_LOGGER_RESOURCE_ID, "Pbsmrtpipe",
                                    "Secondary Analysis Pbsmrtpipe Job logger")


class ServiceJob(object):

    def __init__(self, ix, job_uuid, name, state, path, job_type, created_at,
                 settings, is_active=True, smrtlink_version=None,
                 created_by=None, updated_at=None, error_message=None,
                 imported_at=None, job_updated_at=None, created_by_email=None,
                 is_multi_job=False, tags="", parent_multi_job_id=None,
                 workflow=None, project_id=1, job_started_at=None,
                 job_completed_at=None, sub_job_type_id=None):
        """

        :param ix: Job Integer Id
        :param job_uuid: Globally unique Job UUID
        :param name: Display name of the job
        :param state: Job state
        :param path: Absolute path to the job directory
        :param job_type: Job type
        :param created_at: When the job was created
        :param settings: dict of job-specific settings
        :param is_active: If the job is active (only active jobs are displayed in the SL UI)
        :param smrtlink_version: SMRT Link version (if known)
        :param created_by: User that created the job
        :param updated_at: When the job was last updated
        :param error_message: Error message if the job has failed
        :param job_started_at: Job start time (if the job has started running)
        :param job_completed_at: Job completion time (if the job has completed)

        :type ix: int
        :type job_uuid: str
        :type name: str
        :type state: str
        :type job_type: str
        :type created_at: DateTime
        :type updated_at: DateTime | None
        :type job_updated_at: DateTime | None
        :type settings: dict
        :type is_active: bool
        :type smrtlink_version: str | None
        :type created_by: str | None
        :type created_by_email: str | None
        :type error_message: str | None
        :type is_multi_job: bool
        :type tags: str
        :type workflow: dict | None
        :type project_id: int
        """
        self.id = int(ix)

        # validation
        _ = uuid.UUID(job_uuid)

        self.uuid = job_uuid
        self.name = name
        self.state = state
        self.path = path
        self.job_type = job_type
        self.created_at = created_at
        self.settings = settings
        self.is_active = is_active
        self.smrtlink_version = smrtlink_version
        self.created_by = created_by
        self.created_by_email = created_by_email
        # Is this Option[T] or T?
        self.updated_at = updated_at
        self.error_message = error_message
        self.imported_at = imported_at  # Job was imported from another system
        self.job_updated_at = updated_at if job_updated_at is None else job_updated_at
        self.is_multi_job = is_multi_job
        self.tags = tags
        self.parent_multi_job_id = parent_multi_job_id
        # for MultiJob state
        self.workflow = {} if workflow is None else workflow
        self.project_id = project_id
        self.sub_job_type_id = sub_job_type_id
        # Prior to SL 6.0.X, there was no clear mechanism for communicating the
        # job started-at and completed-at timestamps, so the job created-at
        # time was used. The created_at refers to the data model entity, not
        # when the job was run.
        # Note this is only defined when the job has been completed.
        self.job_started_at = job_started_at
        self.job_completed_at = job_completed_at

    def __repr__(self):
        # truncate the name to avoid having a useless repr
        max_length = 15
        if len(self.name) >= max_length:
            name = self.name[:max_length] + "..."
        else:
            name = self.name

        created_by = "Unknown" if self.created_by is None else self.created_by
        ix = str(self.id).rjust(5)
        state = self.state.rjust(11)
        # simpler format
        created_at = self.created_at.strftime("%m-%d-%Y %I:%M.%S")

        # this should really use humanize. But this would take forever
        # to get into the nightly build
        def _format_dt(n_seconds):
            if n_seconds >= 60:
                # for most cases, you don't really care about the seconds
                return "{m} min ".format(m=int(n_seconds / 60))
            else:
                return "{s:.2f} sec".format(s=n_seconds)

        run_time = "NA" if self.run_time_sec is None else _format_dt(self.run_time_sec)

        _d = dict(k=self.__class__.__name__, i=ix, n=name, c=created_at,
                  u=self.uuid, s=state, b=created_by, r=run_time)
        return "<{k} i:{i} state:{s} created:{c} by:{b} name:{n} runtime: {r} >".format(**_d)

    @property
    def execution_time_sec(self):
        """
        Return the job execution time (in sec) for completed jobs, or None
        for non-completed jobs.

        Note, for jobs from SL < 6.0.0 this was not defined and will always
        return None.

        :rtype: None | float
        """
        if self.job_started_at is not None:
            if self.job_completed_at is not None:
                return (self.job_completed_at - self.job_started_at).total_seconds()
        return None

    @property
    def run_time_sec(self):
        """
        Note, prior to SL 6.0.X, jobs did not have a well-defined job
        start/complete mechanism and the Job entity timestamps were used.
        This assumes that the job is started when it is created, which is
        often not true.

        For completed jobs from SL version >= 6.0.x, use execution_time_sec.

        :rtype: None | float
        """
        if self.job_updated_at is not None:
            return (self.job_updated_at - self.created_at).total_seconds()
        return None
    @staticmethod
    def from_d(d):
        """Convert from Service JSON response to `ServiceJob` instance"""

        def sx(x):
            return d[x]

        def s_or(x, default=None):
            return d.get(x, default)

        # Convert a string key value to ascii
        def se(x):
            return to_ascii(sx(x))

        def se_or(x, default=None):
            v = s_or(x, default=default)
            return v if v is None else to_ascii(v)

        def to_t(x):
            return iso8601.parse_date(se(x))

        def to_d(x):
            # the "jsonSettings" are a string for some stupid reason
            return json.loads(sx(x))

        def to_opt_datetime(k):
            x = s_or(k)
            return iso8601.parse_date(x) if x is not None else None

        ix = int(sx('id'))
        job_uuid = sx('uuid')
        name = se('name')
        state = se('state')
        path = se('path')
        job_type = se('jobTypeId')
        created_at = to_t('createdAt')
        updated_at = to_opt_datetime('updatedAt')
        job_started_at = to_opt_datetime("jobStartedAt")
        job_completed_at = to_opt_datetime("jobCompletedAt")
        job_updated_at = to_opt_datetime('jobUpdatedAt')
        imported_at = to_opt_datetime('importedAt')
        project_id = s_or("projectId", 1)
        smrtlink_version = se_or("smrtlinkVersion")
        error_message = se_or("errorMessage")
        created_by = se_or("createdBy")
        created_by_email = se_or('createdByEmail')
        is_active = d.get('isActive', True)
        settings = to_d('jsonSettings')
        sub_job_type_id = se_or("subJobTypeId")

        is_multi_job = d.get("isMultiJob", False)
        parent_multi_job_id = s_or("parentMultiJobId")
        workflow = json.loads(se_or("workflow", "{}"))
        tags = se_or("tags", "")

        return ServiceJob(ix, job_uuid, name, state, path, job_type,
                          created_at, settings,
                          is_active=is_active,
                          smrtlink_version=smrtlink_version,
                          created_by=created_by,
                          created_by_email=created_by_email,
                          updated_at=updated_at,
                          error_message=error_message,
                          imported_at=imported_at,
                          job_updated_at=job_updated_at,
                          project_id=project_id,
                          tags=tags,
                          is_multi_job=is_multi_job,
                          parent_multi_job_id=parent_multi_job_id,
                          workflow=workflow,
                          job_started_at=job_started_at,
                          job_completed_at=job_completed_at,
                          sub_job_type_id=sub_job_type_id)
    def was_successful(self):
        """
        :rtype: bool
        """
        return self.state == JobStates.SUCCESSFUL
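

# --- Illustrative usage (editor's sketch, not part of the original module) ---
# A minimal service response, assuming the camelCase keys that from_d reads
# above; optional keys (smrtlinkVersion, errorMessage, ...) may be omitted.
# The uuid, path, and timestamps below are made-up example values.
def _example_service_job():
    d = {"id": 1234,
         "uuid": "7066964d-e028-4055-8b2f-abb9d62b4d6a",
         "name": "Example Job",
         "state": "SUCCESSFUL",
         "path": "/path/to/jobs/1234",
         "jobTypeId": "pbsmrtpipe",
         "createdAt": "2018-01-01T12:00:00Z",
         "updatedAt": "2018-01-01T12:30:00Z",
         "jsonSettings": "{}"}  # note: the settings are a JSON-encoded string
    job = ServiceJob.from_d(d)
    # run_time_sec falls back to (job_updated_at - created_at): 30 min here
    return job.was_successful(), job.run_time_sec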


class JobExeError(ValueError):
    """Service Job failed to complete successfully"""
    pass


class SmrtServerConnectionError(RequestException):
    """This is blunt to catch all status related errors"""
    pass


class SMRTServiceBaseError(Exception):
    """Fundamental error data structure in SMRT Server"""

    def __init__(self, http_code, error_type, message, **kwargs):
        self.http_code = http_code
        self.error_type = error_type
        self.msg = message
        message = "Http code={h} msg={m} type={t}".format(h=http_code, m=message, t=error_type)
        super(SMRTServiceBaseError, self).__init__(message)

    @staticmethod
    def from_d(d):
        """Convert from SMRT Link Service Error JSON response to `SMRTServiceBaseError` instance"""
        return SMRTServiceBaseError(d['httpCode'], d['errorType'], d['message'])


# "Job" is the raw output from jobs/1234
JobResult = namedtuple("JobResult", "job run_time errors")


class JobTask(namedtuple("JobTask", "task_uuid job_id task_id task_type name state created_at updated_at error_message")):

    @staticmethod
    def from_d(d):
        return JobTask(d['uuid'], d['jobId'], d['taskId'], d['taskTypeId'],
                       d['name'], d['state'], d['createdAt'], d['updatedAt'],
                       d.get('errorMessage'))


def _to_resource_id(x):
    if isinstance(x, int):
        return x
    try:
        _ = uuid.UUID(x)
        return x
    except ValueError:
        raise ValueError("Resource id '{x}' must be given as int or uuid".format(x=x))
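

# --- Illustrative usage (editor's sketch, not part of the original module) ---
# A failed service call typically returns a JSON error body; from_d above
# converts it into a raisable exception. _to_resource_id accepts either an
# int id or a UUID string. The error body below is a made-up example.
def _example_error_handling():
    err = SMRTServiceBaseError.from_d(
        {"httpCode": 404, "errorType": "NotFound", "message": "Job not found"})
    assert err.http_code == 404

    # Both an int id and a UUID string are accepted; anything else raises
    assert _to_resource_id(1234) == 1234
    assert _to_resource_id("7066964d-e028-4055-8b2f-abb9d62b4d6a") is not None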


class ServiceEntryPoint(object):
    """Entry Points to initialize Pipelines"""

    def __init__(self, entry_id, dataset_type, path_or_uri):
        self.entry_id = entry_id
        self.dataset_type = dataset_type
        # int is the only supported type; UUID or path to an XML dataset will be added
        self._resource = path_or_uri

    @property
    def resource(self):
        """Backwards compatible with path_or_uri"""
        return self._resource

    def __repr__(self):
        _d = dict(k=self.__class__.__name__, e=self.entry_id,
                  r=self._resource, d=self.dataset_type)
        return "<{k} {e} {d} {r} >".format(**_d)
    @staticmethod
    def from_d(d):
        """Convert from Service JSON response to `ServiceEntryPoint` instance"""
        i = _to_resource_id(d['datasetId'])
        return ServiceEntryPoint(to_ascii(d['entryId']), to_ascii(d['fileTypeId']), i)
    def to_d(self):
        return dict(entryId=self.entry_id,
                    fileTypeId=self.dataset_type,
                    datasetId=self.resource)
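

# --- Illustrative usage (editor's sketch, not part of the original module) ---
# Round trip between the service JSON form and ServiceEntryPoint. The entryId
# and fileTypeId values below are example values, not required constants.
def _example_entry_point():
    d = {"entryId": "eid_subread",
         "fileTypeId": "PacBio.DataSet.SubreadSet",
         "datasetId": 1}
    ep = ServiceEntryPoint.from_d(d)
    return ep.to_d() == d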


class JobEntryPoint(namedtuple("JobEntryPoint", "job_id dataset_uuid dataset_metatype")):
    """Returned from the Services /job/1234/entry-points"""

    @staticmethod
    def from_d(d):
        """Convert from Service JSON response to `JobEntryPoint` instance"""
        return JobEntryPoint(d['jobId'], d['datasetUUID'], d['datasetType'])


class JobStates(object):
    """Allowed SMRT Link Service Job states"""
    CREATED = "CREATED"
    SUBMITTED = "SUBMITTED"
    RUNNING = "RUNNING"
    FAILED = "FAILED"
    SUCCESSFUL = "SUCCESSFUL"

    ALL = (RUNNING, CREATED, FAILED, SUCCESSFUL, SUBMITTED)

    # Terminal states
    ALL_COMPLETED = (FAILED, SUCCESSFUL)


class JobTypes(object):
    """SMRT Link Analysis Job Types"""
    IMPORT_DS = "import-dataset"
    IMPORT_DSTORE = "import-datastore"
    MERGE_DS = "merge-datasets"
    PB_PIPE = "pbsmrtpipe"
    MOCK_PB_PIPE = "mock-pbsmrtpipe"
    CONVERT_FASTA = 'convert-fasta-reference'
    @classmethod
    def ALL(cls):
        """All allowed SL Analysis Job Types"""
        return (cls.IMPORT_DS, cls.IMPORT_DSTORE, cls.MERGE_DS,
                cls.PB_PIPE, cls.MOCK_PB_PIPE, cls.CONVERT_FASTA)
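

# --- Illustrative usage (editor's sketch, not part of the original module) ---
# The state and job-type vocabularies above are plain string constants, so a
# poller can compare raw service responses against them directly.
def _example_is_terminal(state, job_type):
    if job_type not in JobTypes.ALL():
        raise ValueError("Unsupported job type '{t}'".format(t=job_type))
    return state in JobStates.ALL_COMPLETED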


class ServiceResourceTypes(object):
    REPORTS = "reports"
    DATASTORE = "datastore"
    ENTRY_POINTS = "entry-points"
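

# --- Illustrative usage (editor's sketch, not part of the original module) ---
# The constants above name per-job sub-resources; a client would join them
# onto a job URL. The base path below is a hypothetical example, not the
# canonical SMRT Link endpoint layout.
def _example_job_resource_url(job_id, resource_type=ServiceResourceTypes.REPORTS):
    base = "http://localhost:8070/smrt-link/job-manager/jobs/pbsmrtpipe"  # hypothetical
    return "{b}/{i}/{r}".format(b=base, i=job_id, r=resource_type)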