"""IO Layer for creating models from files"""
import json
import logging
from avro.datafile import DataFileWriter
from avro.io import DatumWriter
import pbcommand
from pbcommand.schemas import RTC_SCHEMA, TC_SCHEMA, validate_presets
from pbcommand.models import (TaskTypes,
GatherToolContractTask,
ScatterToolContractTask,
MalformedToolContractError,
MalformedResolvedToolContractError,
validate_tool_contract)
from pbcommand.pb_io.common import pacbio_option_from_dict
from pbcommand.models.tool_contract import (ToolDriver,
ToolContractTask,
ToolContract,
ResolvedToolContractTask,
ResolvedToolContract,
InputFileType,
OutputFileType,
ResolvedScatteredToolContractTask,
ResolvedGatherToolContractTask,
ToolContractResolvedResource,
PipelinePreset)
log = logging.getLogger(__name__)
__all__ = ['load_resolved_tool_contract_from',
'load_tool_contract_from',
'load_pipeline_presets_from',
'write_tool_contract',
'write_resolved_tool_contract']
class Constants(object):
TOOL_ID = "tool_contract_id"
TOOL = "tool_contract"
TOOL_TYPE = "task_type"
IS_DIST = 'is_distributed'
# Serialization Format
SERIALIZATION = 'serialization'
    # Scatter TC; analogous to nproc, this mirrors the "nchunks" key in the JSON
NCHUNKS = "nchunks"
RTOOL = "resolved_tool_contract"
# Used in Scattering/Chunking tasks to
# produce chunks with specific $chunk_keys
CHUNK_KEYS = "chunk_keys"
MAX_NCHUNKS = 'max_nchunks'
# Used in Gather Tasks
GATHER_CHUNK_KEY = 'chunk_key'
def load_or_raise(ex_type):
def loader_wrap(func):
def _wrapper(path):
msg = "Failed to load {p}".format(p=path)
try:
return func(path)
except Exception as e:
                msg = msg + " {c}: {e}".format(c=e.__class__.__name__, e=e)
log.error(msg, exc_info=True)
raise ex_type(msg)
return _wrapper
return loader_wrap
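# `load_or_raise(ex_type)` wraps a loader that takes a single path (or dict)
# argument; any exception raised while loading is logged with its traceback
# and re-raised as `ex_type` with the offending path in the message. It is
# applied to the public load_* entry points below.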
def __driver_from_d(d):
driver_exe = d['driver']['exe']
driver_env = d['driver'].get('env', {})
serialization = d['driver'].get(Constants.SERIALIZATION, 'json')
return ToolDriver(driver_exe, env=driver_env, serialization=serialization)
def __core_resolved_tool_contract_task_from_d(d):
def _to_a(x):
return x.encode('ascii', 'ignore')
def _get(attr_name):
return d[Constants.RTOOL][attr_name]
def _get_or(attr_name, default_value):
return d[Constants.RTOOL].get(attr_name, default_value)
def _get_ascii(x_):
return _to_a(_get(x_))
tool_contract_id = _get_ascii(Constants.TOOL_ID)
tool_type = _get_ascii(Constants.TOOL_TYPE)
is_distributed = _get(Constants.IS_DIST)
# list of strings
input_files = [_to_a(x) for x in _get("input_files")]
# list of strings
output_files = [_to_a(x) for x in _get("output_files")]
tool_options = _get("options")
# int
nproc = _get("nproc")
# allow for backward compatibility
log_level = _get_or("log_level", "INFO")
resource_types = [ToolContractResolvedResource.from_d(dx) for dx in _get("resources")]
return tool_contract_id, is_distributed, input_files, output_files, tool_options, nproc, resource_types, log_level
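# __core_resolved_tool_contract_task_from_d extracts the fields shared by all
# Resolved Tool Contract task types; the standard/scatter/gather builders
# below layer the task-type-specific keys (max_nchunks, chunk_keys, chunk_key)
# on top of that tuple.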
def __to_rtc_from_d(d):
def _wrapper(task):
driver = __driver_from_d(d)
rtc = ResolvedToolContract(task, driver)
return rtc
return _wrapper
def _standard_resolved_tool_contract_from_d(d):
"""Load a 'Standard' CLI task type"""
tool_contract_id, is_distributed, input_files, output_files, tool_options, nproc, resource_types, log_level = __core_resolved_tool_contract_task_from_d(d)
task = ResolvedToolContractTask(tool_contract_id, is_distributed,
input_files, output_files,
tool_options, nproc, resource_types,
log_level)
return __to_rtc_from_d(d)(task)
def _scatter_resolved_tool_contract_from_d(d):
"""Load a Gathered Tool Contract """
tool_contract_id, is_distributed, input_files, output_files, tool_options, nproc, resource_types, log_level = __core_resolved_tool_contract_task_from_d(d)
max_nchunks = d[Constants.RTOOL][Constants.MAX_NCHUNKS]
chunk_keys = d[Constants.RTOOL][Constants.CHUNK_KEYS]
task = ResolvedScatteredToolContractTask(tool_contract_id, is_distributed, input_files, output_files, tool_options, nproc, resource_types, max_nchunks, chunk_keys, log_level=log_level)
return __to_rtc_from_d(d)(task)
def _gather_resolved_tool_contract_from_d(d):
    """Load a Gathered Resolved Tool Contract"""
tool_contract_id, is_distributed, input_files, output_files, tool_options, nproc, resource_types, log_level = __core_resolved_tool_contract_task_from_d(d)
chunk_key = d[Constants.RTOOL][Constants.GATHER_CHUNK_KEY]
task = ResolvedGatherToolContractTask(tool_contract_id, is_distributed,
input_files, output_files,
tool_options, nproc, resource_types,
chunk_key, log_level=log_level)
return __to_rtc_from_d(d)(task)
def resolved_tool_contract_from_d(d):
"""Convert a dict to Resolved Tool Contract"""
def _to_a(x):
return x.encode('ascii', 'ignore')
def _get(attr_name):
return d[Constants.RTOOL][attr_name]
def _get_ascii(x_):
return _to_a(_get(x_))
tool_type = _get_ascii(Constants.TOOL_TYPE)
dispatch_funcs = {TaskTypes.STANDARD: _standard_resolved_tool_contract_from_d,
TaskTypes.GATHERED: _gather_resolved_tool_contract_from_d,
TaskTypes.SCATTERED: _scatter_resolved_tool_contract_from_d}
if tool_type in dispatch_funcs:
return dispatch_funcs[tool_type](d)
else:
raise ValueError("Unsupported task type '{x}' Supported task types {t}".format(x=tool_type, t=dispatch_funcs.keys()))
def json_path_or_d(value):
if isinstance(value, dict):
return value
elif isinstance(value, basestring):
with open(value, 'r') as f:
d = json.loads(f.read())
return d
else:
raise ValueError("Unsupported value. Expected dict, or string")
def _json_path_or_d(func):
def _wrapper(value):
return func(json_path_or_d(value))
return _wrapper
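# json_path_or_d/_json_path_or_d let every loader accept either an
# already-parsed dict or a path to a JSON file; the decorated functions only
# ever see a dict.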
@load_or_raise(MalformedResolvedToolContractError)
@_json_path_or_d
def load_resolved_tool_contract_from(path_or_d):
    """Load a Resolved Tool Contract from a path to a JSON file, or from a dict"""
    return resolved_tool_contract_from_d(path_or_d)
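# Minimal usage sketch (the file name is hypothetical):
#
#   rtc = load_resolved_tool_contract_from("resolved_tool_contract.json")
#   # or, starting from an already-parsed dict
#   with open("resolved_tool_contract.json") as f:
#       rtc = load_resolved_tool_contract_from(json.load(f))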
@_json_path_or_d
def __core_tool_contract_task_from(d):
if Constants.TOOL not in d:
        raise MalformedToolContractError("Unable to find root key '{k}'. Found keys: {a}".format(k=Constants.TOOL, a=d.keys()))
def _to_a(x_):
return x_.encode('ascii', 'ignore')
def _get(x_):
        # Get a subkey from within the root Constants.TOOL dict
if x_ not in d[Constants.TOOL]:
raise MalformedToolContractError("Unable to find subkey '{x}' within key '{i}'".format(x=x_, i=Constants.TOOL))
return d[Constants.TOOL][x_]
def _get_or(x_, default):
return d[Constants.TOOL].get(x_, default)
def _get_ascii(x_):
return _to_a(_get(x_))
def _get_ascii_or(x_, default):
return _to_a(_get_or(x_, default))
def _to_in_ft(fd):
fx = lambda s: _to_a(fd[s])
return InputFileType(fx("file_type_id"), fx("id"), fx("title"), fx("description"))
def _to_out_ft(fd):
fx = lambda s: _to_a(fd[s])
return OutputFileType(fx("file_type_id"), fx("id"), fx("title"), fx("description"), fx("default_name"))
task_id = _to_a(d[Constants.TOOL_ID])
display_name = _get_ascii("name")
version = _to_a(d["version"])
default_desc = "PacBio Tool {n}".format(n=display_name)
description = _get_ascii_or("description", default_desc)
is_distributed = _get(Constants.IS_DIST)
input_types = [_to_in_ft(x) for x in _get("input_types")]
output_types = [_to_out_ft(x) for x in _get("output_types")]
tool_options = [pacbio_option_from_dict(opt_d) for opt_d in _get("schema_options")]
nproc = _get("nproc")
resource_types = _get("resource_types")
return task_id, display_name, description, version, is_distributed, input_types, output_types, tool_options, nproc, resource_types
def __to_tc_from_d(d):
def _wrapper(task):
driver = __driver_from_d(d)
schema_version = d.get("schema_version", "UNKNOWN")
tc = ToolContract(task, driver, schema_version)
return tc
return _wrapper
@_json_path_or_d
def _standard_tool_contract_from(path_or_d):
task_id, display_name, description, version, is_distributed, input_types, output_types, tool_options, nproc, resource_types = __core_tool_contract_task_from(path_or_d)
task = ToolContractTask(task_id, display_name, description, version,
is_distributed,
input_types,
output_types,
tool_options, nproc, resource_types)
return __to_tc_from_d(path_or_d)(task)
@_json_path_or_d
def _scattered_tool_contract_from(path_or_d):
task_id, display_name, description, version, is_distributed, input_types, output_types, tool_options, nproc, resource_types = __core_tool_contract_task_from(path_or_d)
chunk_keys = path_or_d[Constants.TOOL][Constants.CHUNK_KEYS]
# int, or SymbolTypes.MAX_NCHUNKS
nchunks = path_or_d[Constants.TOOL][Constants.NCHUNKS]
task = ScatterToolContractTask(task_id, display_name, description, version,
is_distributed,
input_types,
output_types,
tool_options, nproc, resource_types, chunk_keys, nchunks)
return __to_tc_from_d(path_or_d)(task)
@_json_path_or_d
def _gather_tool_contract_from(path_or_d):
task_id, display_name, description, version, is_distributed, input_types, output_types, tool_options, nproc, resource_types = __core_tool_contract_task_from(path_or_d)
task = GatherToolContractTask(task_id, display_name, description, version,
is_distributed,
input_types,
output_types,
tool_options, nproc, resource_types)
return __to_tc_from_d(path_or_d)(task)
@_json_path_or_d
def tool_contract_from_d(d):
"""Load tool contract from dict"""
if Constants.TOOL not in d:
raise KeyError("Tool Contract must have {k}".format(k=Constants.TOOL))
if Constants.TOOL_ID not in d[Constants.TOOL]:
raise KeyError("Tool Contract must have {k}.{v}".format(k=Constants.TOOL, v=Constants.TOOL_ID))
task_type = d[Constants.TOOL][Constants.TOOL_TYPE]
dispatch_funcs = {TaskTypes.SCATTERED: _scattered_tool_contract_from,
TaskTypes.GATHERED: _gather_tool_contract_from,
TaskTypes.STANDARD: _standard_tool_contract_from}
if task_type in dispatch_funcs:
tc = dispatch_funcs[task_type](d)
return validate_tool_contract(tc)
else:
raise ValueError("Unsupported task type {x}".format(x=task_type))
@load_or_raise(MalformedToolContractError)
@_json_path_or_d
def load_tool_contract_from(path_or_d):
    """Load (and validate) a Tool Contract from a path to a JSON file, or from a dict"""
    return tool_contract_from_d(path_or_d)
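# Minimal usage sketch (the file name is hypothetical); the returned
# ToolContract has already been passed through validate_tool_contract():
#
#   tc = load_tool_contract_from("my_tool_contract.json")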
# XXX this could probably be more robust
@_json_path_or_d
def load_pipeline_presets_from(d):
"""
Load pipeline presets from dictionary. This expects a schema where the
options are arrays of type (id,value,optionTypeId), but it will also accept
a shorthand where the options are dictionaries.
"""
validate_presets(d)
options = d['options']
if isinstance(options, list):
options = {o['id']: o['value'] for o in options}
taskOptions = d['taskOptions']
if isinstance(taskOptions, list):
taskOptions = {o['id']: o['value'] for o in taskOptions}
presets = PipelinePreset(
options=options,
task_options=taskOptions,
pipeline_id=d['pipelineId'],
preset_id=d['presetId'],
name=d.get('name', None),
description=d.get('description', None))
return presets
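# Sketch of the two option layouts accepted above (the ids, values and
# optionTypeId shown are illustrative only):
#
#   "taskOptions": [{"id": "some.task.option", "value": 10,
#                    "optionTypeId": "integer"}]
#   "taskOptions": {"some.task.option": 10}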
def _write_json(s, output_file):
with open(output_file, 'w') as f:
f.write(json.dumps(s, indent=4, sort_keys=True, separators=(',', ': ')))
return s
def _write_records_to_avro(schema, _d_or_ds, output_file):
    # FIXME. In practice only a single record is written here; consider a
    # dedicated single-record writer instead of normalizing a dict to a list.
if isinstance(_d_or_ds, dict):
_d_or_ds = [_d_or_ds]
    with open(output_file, 'wb') as outs:
        with DataFileWriter(outs, DatumWriter(), schema) as writer:
            for record in _d_or_ds:
                writer.append(record)
    log.debug("Wrote Avro file to {p}".format(p=output_file))
return _d_or_ds
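# Thin public wrappers over _write_records_to_avro; each takes a model
# instance and an output path (the paths below are hypothetical):
#
#   write_tool_contract_avro(tc, "tool_contract.avro")
#   write_resolved_tool_contract_avro(rtc, "resolved_tool_contract.avro")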
def write_tool_contract_avro(tc, avro_output):
return _write_records_to_avro(TC_SCHEMA, tc.to_dict(), avro_output)
def write_resolved_tool_contract_avro(rtc, avro_output):
return _write_records_to_avro(RTC_SCHEMA, rtc.to_dict(), avro_output)