
"""Common models for Tool Contract and Resolved Tool Contract


Author: Michael Kocher
"""
import abc
from collections import OrderedDict
import types
import datetime

import pbcommand

from .common import TaskTypes, ResourceTypes, REGISTERED_FILE_TYPES

__version__ = pbcommand.get_version()


class MalformedToolContractError(ValueError):
    pass


class MalformedResolvedToolContractError(ValueError):
    pass


def _validate_type(value, type_or_types):
    return isinstance(value, type_or_types)


def _validate_or_raise(value, type_or_types):
    if not _validate_type(value, type_or_types):
        _d = dict(x=value, t=type(value), s=type_or_types)
        raise TypeError("Unsupported type for {x} {t}. Expected types {s}".format(**_d))
    return value


def _validate_list_of_or_raise(a_list, t):
    """Validates a list of items of a specific type"""
    if not isinstance(a_list, (list, tuple)):
        raise TypeError("Expected list, got {t}".format(t=type(a_list)))

    for item in a_list:
        if not isinstance(item, t):
            raise TypeError("Expected type {t}, got {x}".format(t=t, x=type(item)))

    return a_list


def _is_empty_list(alist):
    return len(alist) == 0


def __validate_ioputs(msg, alist):
    if _is_empty_list(alist):
        raise MalformedToolContractError(msg)
    return True


def validate_tool_contract(tc):
    """Validate a ToolContract, raising on an empty input/output signature or
    an output default name that already carries its file extension.

    :type tc: ToolContract

    TODO: Expand this out.
    """
    __validate_ioputs("Inputs must have at least 1 input.", tc.task.input_file_types)
    __validate_ioputs("Outputs must have at least 1 output.", tc.task.output_file_types)

    for oft in tc.task.output_file_types:
        file_type = REGISTERED_FILE_TYPES[oft.file_type_id]
        if oft.default_name.endswith(file_type.ext):
            raise ValueError(
                "File {i} default name already has extension: {n}".format(
                    i=oft.label, n=oft.default_name))

    return tc


class _IOFileType(object):
    __metaclass__ = abc.ABCMeta

    def __init__(self, file_type_id, label, display_name, description):
        self.file_type_id = file_type_id
        self.label = label
        self.display_name = display_name
        # short description
        self.description = description

    def __repr__(self):
        _d = dict(i=self.label,
                  n=self.display_name,
                  f=self.file_type_id,
                  k=self.__class__.__name__)
        return "<{k} {f} {i} >".format(**_d)

    @abc.abstractmethod
    def to_dict(self):
        raise NotImplementedError

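# Example (illustrative sketch, not part of the library): a contract whose task
# has an empty input list fails validation. Here `task` and `driver` stand for
# ToolContractTask and ToolDriver instances (defined below) built with
# task.input_file_types == [].
#
#   tc = ToolContract(task, driver)
#   validate_tool_contract(tc)  # raises MalformedToolContractError
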
class InputFileType(_IOFileType):

    def to_dict(self):
        return dict(file_type_id=self.file_type_id,
                    id=self.label,
                    title=self.display_name,
                    description=self.description)

class OutputFileType(_IOFileType):

    def __init__(self, file_type_id, label, display_name, description,
                 default_name):
        super(OutputFileType, self).__init__(file_type_id, label, display_name,
                                             description)
        # Default name of the output file. Should be specified as (base, ext),
        # but "base.ext" is also supported. This should go away.
        self.default_name = default_name

    def to_dict(self):
        return dict(file_type_id=self.file_type_id,
                    id=self.label,
                    title=self.display_name,
                    description=self.description,
                    default_name=self.default_name)

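# Example (illustrative sketch; the file-type id must match an entry in
# REGISTERED_FILE_TYPES, and "PacBio.FileTypes.Fasta" is assumed here; labels
# and names are hypothetical):
#
#   i_ft = InputFileType("PacBio.FileTypes.Fasta", "fasta_in",
#                        "FASTA In", "Input FASTA file")
#   o_ft = OutputFileType("PacBio.FileTypes.Fasta", "fasta_out",
#                         "FASTA Out", "Output FASTA file", "my_output")
#   i_ft.to_dict()["id"]  # "fasta_in"
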
class ToolContractResolvedResource(object):

    def __init__(self, resource_type_id, path):
        assert resource_type_id in ResourceTypes.ALL()
        self.type_id = resource_type_id
        self.path = path

    def __repr__(self):
        _d = dict(k=self.__class__.__name__, i=self.type_id, p=self.path)
        return "<{k} {i} path:{p} >".format(**_d)

    @staticmethod
    def from_d(d):
        return ToolContractResolvedResource(d['resource_type'], d['path'])

    def to_dict(self):
        return dict(resource_type=self.type_id, path=self.path)

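# Example (illustrative sketch; the path is hypothetical). from_d and to_dict
# round-trip a resolved resource:
#
#   r = ToolContractResolvedResource(ResourceTypes.TMP_DIR, "/tmp/job-01")
#   ToolContractResolvedResource.from_d(r.to_dict()).path  # "/tmp/job-01"
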
class ToolDriver(object):

    def __init__(self, driver_exe, env=None, serialization='json'):
        """
        :param driver_exe: Path to the driver
        :param env: path to env to be sourced before it's run?
        :return:
        """
        self.driver_exe = driver_exe
        self.env = {} if env is None else env
        # 'avro' or 'json'
        self.serialization = serialization

    def __repr__(self):
        _d = dict(k=self.__class__.__name__, e=self.driver_exe)
        return "<{k} driver:{e} >".format(**_d)

    def to_dict(self):
        return dict(exe=self.driver_exe,
                    env=self.env,
                    serialization=self.serialization)

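# Example (illustrative sketch; the driver executable string is hypothetical.
# The assumption here is that the caller appends the path to the resolved tool
# contract to this command; that convention is not enforced by this class):
#
#   driver = ToolDriver("python -m my_tool run-rtc")
#   driver.to_dict()
#   # {'exe': 'python -m my_tool run-rtc', 'env': {}, 'serialization': 'json'}
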
class ToolContractTask(object):

    TASK_TYPE_ID = TaskTypes.STANDARD

    def __init__(self, task_id, name, description, version, is_distributed,
                 input_types, output_types, options, nproc, resources):
        """
        Core metadata for a commandline task

        :param task_id: Global id to reference your tool in a pipeline
        :type task_id: str

        :param name: Display name of your tool
        :param description: Short description of your tool
        :param version: semantic-style version string
        :param is_distributed: Whether the task will be distributed (versus run locally)
        :type is_distributed: bool

        :param input_types: list of InputFileType instances
        :param output_types: list of OutputFileType instances
        :param options: list of PacBioOption instances
        :type options: list[PacBioOption]

        :param nproc: number of processors to use
        :param resources: list of resource types (e.g., ResourceTypes.TMP_DIR)
        """
        self.task_id = task_id
        self.name = name
        self.description = description
        self.version = version
        self.is_distributed = is_distributed
        self.input_file_types = input_types
        self.output_file_types = output_types
        # This needs to be a list
        self.options = _validate_or_raise(options, list)
        self.nproc = nproc
        # List of ResourceTypes
        self.resources = resources

    def __repr__(self):
        _d = dict(k=self.__class__.__name__,
                  i=self.task_id,
                  t=self.is_distributed,
                  n=self.name)
        return "<{k} id:{i} {n} >".format(**_d)

    def to_dict(self):
        opts = [x.to_dict() for x in self.options]

        # A timestamp is useful for debugging, but creates too much chatter
        # for production, so it is left blank by default.
        # now = " " + str(datetime.datetime.now())
        now = ""
        comment = "Created by pbcommand {v}{n}".format(v=__version__, n=now)

        _t = dict(tool_contract_id=self.task_id,
                  input_types=[i.to_dict() for i in self.input_file_types],
                  output_types=[i.to_dict() for i in self.output_file_types],
                  task_type=self.TASK_TYPE_ID,
                  is_distributed=self.is_distributed,
                  name=self.name,
                  description=self.description,
                  schema_options=opts,
                  nproc=self.nproc,
                  resource_types=self.resources,
                  _comment=comment)
        return _t

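# Example (illustrative sketch; the task id is hypothetical, and i_ft/o_ft are
# the file types from the InputFileType/OutputFileType example above):
#
#   task = ToolContractTask("pbcommand.tasks.my_task", "My Task",
#                           "An example task", "0.1.0", True,
#                           [i_ft], [o_ft], [], 1, [ResourceTypes.TMP_DIR])
#   task.to_dict()["nproc"]  # 1
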
class ScatterToolContractTask(ToolContractTask):

    TASK_TYPE_ID = TaskTypes.SCATTERED

    def __init__(self, task_id, name, description, version, is_distributed,
                 input_types, output_types, tool_options, nproc, resources,
                 chunk_keys, max_nchunks):
        """Scatter tasks have a special output signature of [FileTypes.CHUNK]

        The chunk keys are expected to be written to the chunk.json file.
        """
        super(ScatterToolContractTask, self).__init__(task_id, name,
                                                      description, version,
                                                      is_distributed,
                                                      input_types, output_types,
                                                      tool_options, nproc,
                                                      resources)
        self.chunk_keys = chunk_keys
        # int or $max_chunks symbol
        self.max_nchunks = max_nchunks

    def to_dict(self):
        s = super(ScatterToolContractTask, self).to_dict()
        s['chunk_keys'] = self.chunk_keys
        s['nchunks'] = self.max_nchunks
        return s

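# Example (illustrative sketch; the chunk key follows the "$chunk.<key>"
# convention, and all ids are hypothetical; i_ft/o_ft as above):
#
#   s = ScatterToolContractTask("pbcommand.tasks.my_scatter", "My Scatter",
#                               "Scatter FASTA", "0.1.0", True, [i_ft], [o_ft],
#                               [], 1, [], ["$chunk.fasta_id"], 24)
#   s.to_dict()["nchunks"]  # 24
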
class GatherToolContractTask(ToolContractTask):
    """Gather tasks have a special input type of [FileTypes.CHUNK]"""
    TASK_TYPE_ID = TaskTypes.GATHERED
    # not completely sure how to handle chunk-keys.

class ToolContract(object):

    # Calling to_dict will always generate a version compliant with this
    # schema spec
    WRITER_SCHEMA_VERSION = "2.0.0"

    def __init__(self, task, driver, schema_version=WRITER_SCHEMA_VERSION):
        """
        :type task: ToolContractTask | ScatterToolContractTask | GatherToolContractTask
        :type driver: ToolDriver

        :param task:
        :param driver:
        :return:
        """
        self.task = task
        self.driver = driver
        self.schema_version = schema_version

    def __repr__(self):
        _d = dict(k=self.__class__.__name__,
                  i=self.task.task_id,
                  t=self.task.is_distributed)
        return "<{k} id:{i} >".format(**_d)

    def to_dict(self):
        validate_tool_contract(self)
        _t = self.task.to_dict()
        _d = dict(version=self.task.version,
                  tool_contract_id=self.task.task_id,
                  driver=self.driver.to_dict(),
                  tool_contract=_t,
                  schema_version=self.WRITER_SCHEMA_VERSION)
        return _d

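# Example (illustrative sketch; uses only the standard library). Emitting the
# contract JSON for the `task` and `driver` built in the examples above:
#
#   import json
#   tc = ToolContract(task, driver)
#   print(json.dumps(tc.to_dict(), indent=2))
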
def _get_resource_by_type(rt, resources):
    # Keep only resources of the requested resource type
    return [r for r in resources if r.type_id == rt]

class ResolvedToolContractTask(object):
    # The interface is the same, but the types are "resolved" and have a
    # different structure
    TASK_TYPE_ID = TaskTypes.STANDARD

    def __init__(self, task_id, is_distributed, input_files, output_files,
                 options, nproc, resources, log_level="INFO"):
        self.task_id = task_id
        self.is_distributed = is_distributed
        self.input_files = input_files
        self.output_files = output_files
        self.options = options
        self.nproc = nproc
        self.resources = resources
        self.log_level = log_level

    @property
    def tmpdir_resources(self):
        return _get_resource_by_type(ResourceTypes.TMP_DIR, self.resources)

    @property
    def tmpfile_resources(self):
        return _get_resource_by_type(ResourceTypes.TMP_FILE, self.resources)

    def __repr__(self):
        _d = dict(k=self.__class__.__name__,
                  i=self.task_id,
                  t=self.is_distributed)
        return "<{k} id:{i} >".format(**_d)

    def to_dict(self):
        comment = "Created by pbcommand v{v}".format(v=pbcommand.get_version())

        tc = dict(input_files=self.input_files,
                  output_files=self.output_files,
                  task_type=self.TASK_TYPE_ID,
                  is_distributed=self.is_distributed,
                  tool_contract_id=self.task_id,
                  nproc=self.nproc,
                  resources=[r.to_dict() for r in self.resources],
                  options=self.options,
                  _comment=comment,
                  log_level=self.log_level)
        return tc

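# Example (illustrative sketch; paths and the task id are hypothetical). The
# tmpdir_resources property filters resources by ResourceTypes.TMP_DIR:
#
#   rtt = ResolvedToolContractTask(
#       "pbcommand.tasks.my_task", False,
#       ["/path/to/in.fasta"], ["/path/to/out.fasta"], {}, 1,
#       [ToolContractResolvedResource(ResourceTypes.TMP_DIR, "/tmp/job-01")])
#   [r.path for r in rtt.tmpdir_resources]  # ["/tmp/job-01"]
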
class ResolvedScatteredToolContractTask(ResolvedToolContractTask):

    TASK_TYPE_ID = TaskTypes.SCATTERED

    def __init__(self, task_id, is_distributed, input_files, output_files,
                 options, nproc, resources, max_nchunks, chunk_keys,
                 log_level="INFO"):
        super(ResolvedScatteredToolContractTask, self).__init__(
            task_id, is_distributed, input_files, output_files, options,
            nproc, resources, log_level)
        self.max_nchunks = max_nchunks
        # these can be used to verify the output chunk.json
        # after the task has been run
        self.chunk_keys = chunk_keys

    def to_dict(self):
        d = super(ResolvedScatteredToolContractTask, self).to_dict()
        d['max_nchunks'] = self.max_nchunks
        d['chunk_keys'] = self.chunk_keys
        return d

class ResolvedGatherToolContractTask(ResolvedToolContractTask):

    TASK_TYPE_ID = TaskTypes.GATHERED

    def __init__(self, task_id, is_distributed, input_files, output_files,
                 options, nproc, resources, chunk_key, log_level="INFO"):
        """
        The chunk key is used to pluck specific chunk values from
        PipelineChunks. This makes gather tasks (e.g., GffGather) generalized.
        """
        super(ResolvedGatherToolContractTask, self).__init__(
            task_id, is_distributed, input_files, output_files, options,
            nproc, resources, log_level)
        self.chunk_key = chunk_key

    def to_dict(self):
        d = super(ResolvedGatherToolContractTask, self).to_dict()
        d['chunk_key'] = self.chunk_key
        return d

class ResolvedToolContract(object):

    def __init__(self, task, driver):
        """
        :type task: ResolvedToolContractTask |
                    ResolvedScatteredToolContractTask |
                    ResolvedGatherToolContractTask
        :type driver: ToolDriver

        :param task:
        :param driver:
        :return:
        """
        self.task = task
        self.driver = driver

    def __repr__(self):
        _d = dict(k=self.__class__.__name__,
                  i=self.task.task_id,
                  t=self.task.is_distributed)
        return "<{k} id:{i} >".format(**_d)

    def to_dict(self):
        return dict(resolved_tool_contract=self.task.to_dict(),
                    driver=self.driver.to_dict())

class PipelinePreset(object):

    def __init__(self, options, task_options, pipeline_id, preset_id, name,
                 description):
        self.options = options
        self.task_options = task_options
        self.pipeline_id = pipeline_id
        self.preset_id = preset_id
        self.name = name
        self.description = description

    def __repr__(self):
        _d = dict(k=self.__class__.__name__)  # self.to_dict()
        return "<{k} >".format(**_d)

    def to_dict(self):
        return OrderedDict([
            ("pipelineId", self.pipeline_id),
            ("presetId", self.preset_id),
            ("name", self.name),
            ("description", self.description),
            ("options", dict(self.options)),
            ("taskOptions", dict(self.task_options))])

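# Example (illustrative sketch; ids and the option value are hypothetical):
#
#   preset = PipelinePreset(
#       {"pbsmrtpipe.options.max_nproc": 8}, {},
#       "pbsmrtpipe.pipelines.my_pipeline", "my_preset", "My Preset",
#       "Preset overriding the default pipeline options")
#   preset.to_dict()["presetId"]  # "my_preset"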