"""Services Specific Data Models"""
from collections import namedtuple
import json
import uuid
import iso8601
from requests.exceptions import RequestException
from pbcommand.utils import to_ascii
__all__ = ['ServiceJob', 'ServiceEntryPoint', 'JobEntryPoint', 'JobStates', 'JobTypes']
# This are mirrored from the BaseSMRTServer
class LogLevels(object):
TRACE = "TRACE"
DEBUG = "DEBUG"
INFO = "INFO"
NOTICE = "NOTICE"
WARN = "WARN"
ERROR = "ERROR"
CRITICAL = "CRITICAL"
FATAL = "FATAL"
ALL = (TRACE, DEBUG, INFO, NOTICE, WARN, ERROR, CRITICAL, FATAL)
@classmethod
def is_valid(cls, level):
return level in cls.ALL
SERVICE_LOGGER_RESOURCE_ID = "pbsmrtpipe"
LogResource = namedtuple("LogResource", "id name description")
LogMessage = namedtuple("LogMessage", "sourceId level message")
PbsmrtpipeLogResource = LogResource(SERVICE_LOGGER_RESOURCE_ID, "Pbsmrtpipe",
"Secondary Analysis Pbsmrtpipe Job logger")
[docs]class ServiceJob(object):
def __init__(self, ix, job_uuid, name, state, path, job_type, created_at,
settings, is_active=True, smrtlink_version=None,
created_by=None, updated_at=None, error_message=None):
"""
:param ix: Job Integer Id
:param job_uuid: Globally unique Job UUID
:param name: Display name of job
:param state: Job State
:param path: Absolute Path to the job directory
:param job_type: Job Type
:param created_at: when the job was created at
:param settings: dict of job specific settings
:param is_active: If the Job is active (only active jobs are displayed in the SL UI)
:param smrtlink_version: SMRT Link Version (if known)
:param created_by: User that created the job
:param updated_at: when the last update of the job occurred
:param error_message: Error message if the job has failed
:type ix: int
:type job_uuid: str
:type name: str
:type state: str
:type job_type: str
:type created_at: DateTime
:type updated_at: DateTime | None
:type settings: dict
:type is_active: bool
:type smrtlink_version: str | None
:type created_by: str | None
:type error_message: str | None
"""
self.id = int(ix)
# validation
_ = uuid.UUID(job_uuid)
self.uuid = job_uuid
self.name = name
self.state = state
self.path = path
self.job_type = job_type
self.created_at = created_at
self.settings = settings
self.is_active = is_active
self.smrtlink_version = smrtlink_version
self.created_by = created_by
# Is this Option[T] or T?
self.updated_at = updated_at
self.error_message = error_message
if self.updated_at is not None:
dt = self.updated_at - self.created_at
self.run_time_sec = dt.total_seconds()
else:
self.run_time_sec = None
def __repr__(self):
# truncate the name to avoid having a useless repr
max_length = 15
if len(self.name) >= max_length:
name = self.name[:max_length] + "..."
else:
name = self.name
created_by = "Unknown" if self.created_by is None else self.created_by
ix = str(self.id).rjust(5)
state = self.state.rjust(11)
# simpler format
created_at = self.created_at.strftime("%m-%d-%Y %I:%M.%S")
# this should really use humanize. But this would take forever
# to get into the nightly build
def _format_dt(n_seconds):
if n_seconds >= 60:
# for most cases, you don't really don't
# care about the seconds
return "{m} min ".format(m=int(n_seconds / 60))
else:
return "{s:.2f} sec".format(s=n_seconds)
run_time = "NA" if self.run_time_sec is None else _format_dt(self.run_time_sec)
_d = dict(k=self.__class__.__name__,
i=ix,
n=name,
c=created_at,
u=self.uuid,
s=state, b=created_by,
r=run_time)
return "<{k} i:{i} state:{s} created:{c} by:{b} name:{n} runtime: {r} >".format(**_d)
@staticmethod
[docs] def from_d(d):
"""Convert from Service JSON response to `ServiceJob` instance"""
def sx(x):
return d[x]
def s_or(x, default=None):
return d.get(x, default)
# Convert to string key value to ascii
def se(x):
return to_ascii(sx(x))
def se_or(x, default=None):
v = s_or(x, default=default)
return v if v is None else to_ascii(v)
def to_t(x):
return iso8601.parse_date(se(x))
def to_d(x):
# the "jsonSettings" are a string for some stupid reason
return json.loads(sx(x))
def to_opt_datetime(k):
x = s_or(k)
return iso8601.parse_date(x) if x is not None else None
ix = int(sx('id'))
job_uuid = sx('uuid')
name = se('name')
state = se('state')
path = se('path')
job_type = se('jobTypeId')
created_at = to_t('createdAt')
updated_at = to_opt_datetime('updatedAt')
smrtlink_version = se_or("smrtlinkVersion")
error_message = se_or("errorMessage")
created_by = se_or("createdBy")
is_active = d.get('isActive', True)
settings = to_d('jsonSettings')
return ServiceJob(ix, job_uuid, name, state, path, job_type,
created_at,
settings, is_active=is_active,
smrtlink_version=smrtlink_version,
created_by=created_by,
updated_at=updated_at,
error_message=error_message)
[docs] def was_successful(self):
""" :rtype: bool """
return self.state == JobStates.SUCCESSFUL
class JobExeError(ValueError):
"""Service Job failed to complete successfully"""
pass
class SmrtServerConnectionError(RequestException):
"""This is blunt to catch all status related errors"""
pass
class SMRTServiceBaseError(Exception):
"""Fundamental Error datastructure in SMRT Server"""
def __init__(self, http_code, error_type, message, **kwargs):
self.http_code = http_code
self.error_type = error_type
self.msg = message
message = "Http code={h} msg={m} type={t}".format(h=http_code, m=message, t=error_type)
super(Exception, self).__init__(message)
@staticmethod
def from_d(d):
"""Convert from SMRT Link Service Error JSON response to `SMRTServiceBaseError` instance"""
return SMRTServiceBaseError(d['httpCode'], d['errorType'], d['message'])
# "Job" is the raw output from the jobs/1234
JobResult = namedtuple("JobResult", "job run_time errors")
class JobTask(namedtuple("JobTask", "task_uuid job_id task_id task_type name state created_at updated_at error_message")):
@staticmethod
def from_d(d):
return JobTask(d['uuid'], d['jobId'], d['taskId'], d['taskTypeId'],
d['name'], d['state'], d['createdAt'],
d['updatedAt'], d.get('errorMessage'))
def _to_resource_id(x):
if isinstance(x, int):
return x
try:
_ = uuid.UUID(x)
return x
except ValueError as e:
raise ValueError("Resource id '{x}' must be given as int or uuid".format(x=x))
[docs]class ServiceEntryPoint(object):
"""Entry Points to initialize Pipelines"""
def __init__(self, entry_id, dataset_type, path_or_uri):
self.entry_id = entry_id
self.dataset_type = dataset_type
# int (only supported), UUID or path to XML dataset will be added
self._resource = path_or_uri
@property
def resource(self):
"""Backwards compatible with path_or_uri"""
return self._resource
def __repr__(self):
_d = dict(k=self.__class__.__name__, e=self.entry_id,
r=self._resource, d=self.dataset_type)
return "<{k} {e} {d} {r} >".format(**_d)
@staticmethod
[docs] def from_d(d):
"""Convert from Service JSON response to `ServiceEntryPoint` instance"""
i = _to_resource_id(d['datasetId'])
return ServiceEntryPoint(to_ascii(d['entryId']), to_ascii(d['fileTypeId']), i)
[docs] def to_d(self):
return dict(entryId=self.entry_id,
fileTypeId=self.dataset_type,
datasetId=self.resource)
class JobEntryPoint(namedtuple("JobEntryPoint", "job_id dataset_uuid dataset_metatype")):
""" Returned from the Services /job/1234/entry-points """
@staticmethod
def from_d(d):
"""Convert from Service JSON response to `JobEntryPoint` instance"""
return JobEntryPoint(d['jobId'], d['datasetUUID'], d['datasetType'])
[docs]class JobStates(object):
"""Allowed SMRT Link Service Job states"""
CREATED = "CREATED"
SUBMITTED = "SUBMITTED"
RUNNING = "RUNNING"
FAILED = "FAILED"
SUCCESSFUL = "SUCCESSFUL"
ALL = (RUNNING, CREATED, FAILED, SUCCESSFUL, SUBMITTED)
# End points
ALL_COMPLETED = (FAILED, SUCCESSFUL)
[docs]class JobTypes(object):
"""SMRT Link Analysis JOb Types"""
IMPORT_DS = "import-dataset"
IMPORT_DSTORE = "import-datastore"
MERGE_DS = "merge-datasets"
PB_PIPE = "pbsmrtpipe"
MOCK_PB_PIPE = "mock-pbsmrtpipe"
CONVERT_FASTA = 'convert-fasta-reference'
@classmethod
[docs] def ALL(cls):
"""ALL allowed SL Analysis Job Types"""
return (cls.IMPORT_DS, cls.IMPORT_DSTORE, cls.MERGE_DS,
cls.PB_PIPE, cls.MOCK_PB_PIPE, cls.CONVERT_FASTA)
class ServiceResourceTypes(object):
REPORTS = "reports"
DATASTORE = "datastore"
ENTRY_POINTS = "entry-points"