Advanced Task/ToolContract Types

To enable pipeline scaling via “chunking” of files, two new Tool Contract types extend the base Tool Contract data model.

Scattering/Chunking Tool Contract

Tasks/ToolContracts that take any file type(s) and emit a single scatter.chunk.json file.

At a high level, the Scatter Tool Contract data model extends the core Tool Contract model and adds two fields, chunk_keys and nchunks.

  • chunk_keys is the expected key(s) that will be written to the PipelineChunk data model (defined below)
  • nchunks mirrors the nproc model: either the symbol $max_nchunks or an int defines the absolute upper bound on the number of chunks that should be created. If this bound is exceeded, the pipeline engine will immediately fail the execution (see the sketch below).
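
A rough illustration (not the pbcommand implementation; the function name and signature are hypothetical) of how a workflow engine might resolve nchunks and enforce the bound:

def validate_nchunks(chunks_written, nchunks, max_nchunks):
    """Sketch only: resolve the Tool Contract nchunks field
    ($max_nchunks symbol or int) to an absolute bound and fail
    if the scatter task wrote more chunks than allowed."""
    bound = max_nchunks if nchunks == "$max_nchunks" else int(nchunks)
    if chunks_written > bound:
        # the pipeline engine immediately fails the execution
        raise ValueError("Wrote {n} chunks; upper bound is {b}".format(
            n=chunks_written, b=bound))
    return bound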

Example Tool Contract

{
    "version": "0.1.0",
    "driver": {
        "serialization": "json",
        "exe": "python -m pbcommand.cli.examples.dev_scatter_fasta_app --resolved-tool-contract ",
        "env": {}
    },
    "schema_version": "2.0.0",
    "tool_contract": {
        "task_type": "pbsmrtpipe.task_types.scattered",
        "resource_types": [],
        "description": "Scatter a single fasta file to create chunk.json file",
        "schema_options": [
            {
                "optionTypeId": "integer",
                "default": 10,
                "id": "pbcommand.task_options.dev_scatter_fa_nchunks",
                "name": "Number of chunks",
                "description": "Suggested number of chunks. May be overridden by $max_nchunks"
            }
        ],
        "output_types": [
            {
                "title": "Chunk JSON",
                "description": "Scattered/Chunked Fasta Chunk.json",
                "default_name": "fasta.chunks",
                "id": "cjson",
                "file_type_id": "PacBio.FileTypes.CHUNK"
            }
        ],
        "_comment": "Created by pbcommand 0.5.2",
        "nchunks": "$max_nchunks",
        "name": "Fasta Scatter",
        "input_types": [
            {
                "description": "Fasta file to scatter",
                "title": "Fasta In",
                "id": "fasta_in",
                "file_type_id": "PacBio.FileTypes.Fasta"
            }
        ],
        "chunk_keys": [
            "$chunk.fasta_id"
        ],
        "nproc": 1,
        "is_distributed": false,
        "tool_contract_id": "pbcommand.tasks.dev_scatter_fasta"
    },
    "tool_contract_id": "pbcommand.tasks.dev_scatter_fasta"
}

PipelineChunk Data Model

The PipelineChunk data model is defined in pbcommand.models; the companion IO functions (load_pipeline_chunks_from_json and write_pipeline_chunks) are in pbcommand.pb_io.

Each input file must be mapped to a chunk_key that can then be mapped to the input of the original unchunked task.

For example, if there’s a single input file (e.g., FileTypes.FASTA), then the Scatter ToolContract should define a chunk_key of “$chunk.fasta_id”. chunk_key(s) that do NOT start with $chunk. will be considered extra metadata that is passed through. This is useful for adding chunk-specific metadata, such as the number of contigs or average contig length.
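
For instance, a single chunk carrying both a chunk key and pass-through metadata can be constructed directly (paths and values here are illustrative):

from pbcommand.models import PipelineChunk

# "$chunk."-prefixed keys map to task inputs; the remaining keys
# (nrecords, total_bases) are pass-through chunk metadata
chunk = PipelineChunk("scattered-fasta_0",
                      **{"$chunk.fasta_id": "/path/to/chunk-0.fasta",
                         "nrecords": 1234,
                         "total_bases": 567890})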

A minimal example of reading and writing the PipelineChunk data model:

In [1]: from pbcommand.models import PipelineChunk

In [2]: c0 = PipelineChunk("scattered-fasta_0", **{"$chunk.fasta_id":"/path/to/chunk-0.fasta"})

In [3]: c1 = PipelineChunk("scattered-fasta_1", **{"$chunk.fasta_id":"/path/to/chunk-1.fasta"})

In [4]: chunks = [c0, c1]

In [5]: from pbcommand.pb_io import write_pipeline_chunks

In [6]: write_pipeline_chunks(chunks, "test-scatter.chunk.json", "Test comment")

In [7]: from pbcommand.pb_io import load_pipeline_chunks_from_json

In [8]: load_pipeline_chunks_from_json("test-scatter.chunk.json")
Out[8]: 
[<PipelineChunk id='scattered-fasta_0' chunk keys=$chunk.fasta_id >,
 <PipelineChunk id='scattered-fasta_1' chunk keys=$chunk.fasta_id >]

Defining a Scatter Tool Contract

Currently, Python is the only supported language for writing CHUNK JSON files.

The Python Scatter tool contract API closely follows the base Tool Contract API.

Below is a simple example of scattering/chunking a single Fasta file. The notable points are adding the required chunk_keys and nchunks to the scatter-specific pbparser.

"""Example of Generating a Chunk.json file that 'scatters' a pair of fasta files"""
import os
import logging
import sys
import warnings
import math
import datetime
import itertools

from pbcommand.cli import pbparser_runner
from pbcommand.models import get_scatter_pbparser, FileTypes, PipelineChunk
from pbcommand.pb_io import write_pipeline_chunks
from pbcommand.utils import setup_log

log = logging.getLogger(__name__)

TOOL_ID = "pbcommand.tasks.dev_scatter_fasta"
__version__ = '0.1.0'


try:
    from pbcore.io import FastaWriter, FastaReader
except ImportError:
    warnings.warn("Example apps require pbcore. Install from https://github.com/PacificBiosciences/pbcore")


class Constants(object):
    NCHUNKS_OPT = "pbcommand.task_options.dev_scatter_fa_nchunks"
    FA_CHUNK_KEY = "$chunk.fasta_id"


def __get_nrecords_from_reader(reader):
    n = 0
    for _ in reader:
        n += 1
    return n


def write_fasta_records(fastx_writer_klass, records, file_name):

    n = 0
    with fastx_writer_klass(file_name) as w:
        for record in records:
            w.writeRecord(record)
            n += 1

    log.debug("Completed writing {n} fasta records".format(n=n))


def __to_chunked_fastx_files(fastx_reader_klass, fastx_writer_klass, chunk_key, fastx_path, max_total_nchunks, dir_name, base_name, ext):
    """Convert a Fasta/Fastq file to a chunked list of files"""

    # grab the number of records so we can chunk it
    with fastx_reader_klass(fastx_path) as f:
        nrecords = __get_nrecords_from_reader(f)

    max_total_nchunks = min(nrecords, max_total_nchunks)

    # number of records per chunk; the last chunk picks up any remainder
    n = int(math.ceil(float(nrecords) / max_total_nchunks))

    nchunks = 0
    with fastx_reader_klass(fastx_path) as r:
        it = iter(r)
        for i in range(max_total_nchunks):

            chunk_id = "_".join([base_name, str(nchunks)])
            chunk_name = ".".join([chunk_id, ext])
            nchunks += 1
            fasta_chunk_path = os.path.join(dir_name, chunk_name)

            # every chunk but the last takes up to n records; the last takes the rest
            if i != max_total_nchunks - 1:
                records = list(itertools.islice(it, n))
            else:
                records = list(it)

            write_fasta_records(fastx_writer_klass, records, fasta_chunk_path)
            total_bases = sum(len(r.sequence) for r in records)
            d = dict(total_bases=total_bases, nrecords=len(records))
            d[chunk_key] = os.path.abspath(fasta_chunk_path)
            c = PipelineChunk(chunk_id, **d)
            yield c


def to_chunked_fasta_files(fasta_path, max_total_nchunks, dir_name, chunk_key, base_name, ext):
    return __to_chunked_fastx_files(FastaReader, FastaWriter, chunk_key, fasta_path, max_total_nchunks, dir_name, base_name, ext)


def write_chunks_to_json(chunks, chunk_file):
    log.debug("Wrote {n} chunks to {f}.".format(n=len(chunks), f=chunk_file))
    write_pipeline_chunks(chunks, chunk_file, "Chunks written at {d}".format(d=datetime.datetime.now()))
    return 0


def _write_fasta_chunks_to_file(to_chunk_fastx_file_func, chunk_file, fastx_path, max_total_chunks, dir_name, chunk_key, chunk_base_name, chunk_ext):
    chunks = list(to_chunk_fastx_file_func(fastx_path, max_total_chunks, dir_name, chunk_key, chunk_base_name, chunk_ext))
    write_chunks_to_json(chunks, chunk_file)
    return 0


def write_fasta_chunks_to_file(chunk_file, fasta_path, max_total_chunks, dir_name, chunk_key, chunk_base_name, chunk_ext):
    return _write_fasta_chunks_to_file(to_chunked_fasta_files, chunk_file, fasta_path, max_total_chunks, dir_name, chunk_key, chunk_base_name, chunk_ext)


def run_main(fasta_file, chunk_output_json, chunk_key, max_nchunks, chunk_base_name="fasta"):
    """Create a chunk.json file containing at most max_nchunks chunks"""
    output_dir = os.path.dirname(chunk_output_json)
    return write_fasta_chunks_to_file(chunk_output_json, fasta_file, max_nchunks, output_dir, chunk_key, chunk_base_name, "fasta")


def get_parser():

    driver = "python -m pbcommand.cli.examples.dev_scatter_fasta_app --resolved-tool-contract "
    desc = "Scatter a single fasta file to create chunk.json file"
    # chunk keys that **will** be written to the file
    chunk_keys = (Constants.FA_CHUNK_KEY, )
    p = get_scatter_pbparser(TOOL_ID, __version__, "Fasta Scatter",
                             desc, driver, chunk_keys, is_distributed=False)
    p.add_input_file_type(FileTypes.FASTA, "fasta_in", "Fasta In", "Fasta file to scatter")
    p.add_output_file_type(FileTypes.CHUNK, "cjson", "Chunk JSON", "Scattered/Chunked Fasta Chunk.json", "fasta.chunks")
    p.add_int("pbcommand.task_options.dev_scatter_fa_nchunks", "nchunks", 10, "Number of chunks",
              "Suggested number of chunks. May be overridden by $max_nchunks")
    return p


def args_runner(args):
    return run_main(args.fasta_in, args.cjson, Constants.FA_CHUNK_KEY, args.nchunks)


def rtc_runner(rtc):
    return run_main(rtc.task.input_files[0],
                    rtc.task.output_files[0],
                    Constants.FA_CHUNK_KEY,
                    rtc.task.options[Constants.NCHUNKS_OPT])


def main(argv=sys.argv):
    return pbparser_runner(argv[1:],
                           get_parser(),
                           args_runner,
                           rtc_runner,
                           log,
                           setup_log)


if __name__ == '__main__':
    sys.exit(main())

Advanced Scattering/Chunking Patterns

More advanced scattering/chunking use cases, such as chunking a task with multiple input files (e.g., an input signature of SubreadSet and ReferenceSet XML), require writing a chunk key for each input; specifically, $chunk.subreadset_id and $chunk.referenceset_id must be written to the PipelineChunk.

This enables the chunking pattern of a specific task to be completely decoupled from the workflow level. The chunking pattern is communicated via the PipelineChunk(s) written by the chunking task. In this specific pattern, the SubreadSet is chunked into N files, while the ReferenceSet is passed through unchunked.

These chunk keys, combined with the chunk operator (defined in pbsmrtpipe), communicate to the workflow engine how to pass $chunk.subreadset_id to the first input of the N chunked instances of the unchunked task. Similarly, $chunk.referenceset_id is passed to the second input of each chunked task instance. A sketch of writing such chunks follows.
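
A minimal sketch of the chunks such a scatter task would write (paths here are illustrative):

from pbcommand.models import PipelineChunk
from pbcommand.pb_io import write_pipeline_chunks

# The SubreadSet is split into N files, while the same unchunked
# ReferenceSet path is repeated in every chunk, so each chunked
# task instance receives both of its inputs.
reference_set = "/path/to/referenceset.xml"
chunks = [PipelineChunk("chunk_{i}".format(i=i),
                        **{"$chunk.subreadset_id": "/path/to/subreadset.chunk-{i}.xml".format(i=i),
                           "$chunk.referenceset_id": reference_set})
          for i in range(3)]
write_pipeline_chunks(chunks, "subreadset-referenceset.chunk.json", "Example multi-input chunks")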

See the pbsmrtpipe docs and the testkit-data jobs in pbsmrtpipe for more details.

Gather ToolContract

A Gather Tool Contract takes a single CHUNK JSON file as input and emits a single output file of any type.

Example:

{
    "version": "0.1.0",
    "driver": {
        "serialization": "json",
        "exe": "python -m pbcommand.cli.examples.dev_gather_fasta_app --resolved-tool-contract ",
        "env": {}
    },
    "schema_version": "2.0.0",
    "tool_contract": {
        "task_type": "pbsmrtpipe.task_types.gathered",
        "resource_types": [],
        "description": "Gather a fasta resources in a Chunk.json file",
        "schema_options": [],
        "output_types": [
            {
                "title": "Chunk JSON",
                "description": "Output Fasta",
                "default_name": "gathered",
                "id": "output",
                "file_type_id": "PacBio.FileTypes.Fasta"
            }
        ],
        "_comment": "Created by pbcommand 0.5.2",
        "name": "Fasta Chunk Gather",
        "input_types": [
            {
                "description": "Chunked Fasta JSON Out",
                "title": "Chunk JSON",
                "id": "chunk_json",
                "file_type_id": "PacBio.FileTypes.CHUNK"
            }
        ],
        "nproc": 1,
        "is_distributed": false,
        "tool_contract_id": "pbcommand.tasks.dev_gather_fasta"
    },
    "tool_contract_id": "pbcommand.tasks.dev_gather_fasta"
}

The Gather task doesn’t extend the base ToolContract with new properties. However, it restricts the input type to FileTypes.CHUNK, and the output signature must contain exactly one file type.

Example Gather Tool:

"""Example of Gather TC to gather several $chunk.fasta_id in chunk.json file.

"""
import logging
import sys
import warnings

import functools

from pbcommand.cli import pbparser_runner
from pbcommand.models import get_gather_pbparser, FileTypes
from pbcommand.pb_io import load_pipeline_chunks_from_json
from pbcommand.utils import setup_log

from .dev_scatter_fasta_app import Constants

log = logging.getLogger(__name__)

TOOL_ID = "pbcommand.tasks.dev_gather_fasta"
__version__ = '0.1.0'


try:
    from pbcore.io import FastaWriter, FastaReader
except ImportError:
    warnings.warn("Example apps require pbcore. Install from https://github.com/PacificBiosciences/pbcore")


def __gather_fastx(fastx_reader, fastx_writer, fastx_files, output_file):
    # this will work for any Pbcore Reader, Writer classes
    n = 0
    with fastx_writer(output_file) as writer:
        for fastx_file in fastx_files:
            with fastx_reader(fastx_file) as reader:
                for record in reader:
                    n += 1
                    writer.writeRecord(record)

    log.info("Completed gathering {n} files (with {x} records) to {f}".format(n=len(fastx_files), f=output_file, x=n))
    return 0


gather_fasta = functools.partial(__gather_fastx, FastaReader, FastaWriter)


def _get_datum_from_chunks_by_chunk_key(chunks, chunk_key):
    datum = []
    for chunk in chunks:
        if chunk_key in chunk.chunk_keys:
            value = chunk.chunk_d[chunk_key]
            datum.append(value)
        else:
            raise KeyError("Unable to find chunk key '{i}' in {p}".format(i=chunk_key, p=chunk))

    return datum


def __args_gather_runner(func, chunk_json, output_file, chunk_key):
    chunks = load_pipeline_chunks_from_json(chunk_json)

    # Allow looseness: accept a chunk key given without the '$chunk.' prefix
    if not chunk_key.startswith('$chunk.'):
        chunk_key = '$chunk.' + chunk_key
        log.warn("Prepended '$chunk.' to chunk key; using '{c}'".format(c=chunk_key))

    fastx_files = _get_datum_from_chunks_by_chunk_key(chunks, chunk_key)
    _ = func(fastx_files, output_file)
    return 0


def run_main(chunked_json, output_fasta, chunk_key):
    """Create a Chunk.json file with nchunks <= max_nchunks

    Not clear on the nchunks vs max_nchunks.
    """
    return __args_gather_runner(gather_fasta, chunked_json, output_fasta, chunk_key)


def get_parser():

    driver = "python -m pbcommand.cli.examples.dev_gather_fasta_app --resolved-tool-contract "
    desc = "Gather a fasta resources in a Chunk.json file"
    p = get_gather_pbparser(TOOL_ID, __version__, "Fasta Chunk Gather",
                            desc, driver, is_distributed=False)
    p.add_input_file_type(FileTypes.CHUNK, "chunk_json", "Chunk JSON", "Chunked Fasta JSON Out")
    p.add_output_file_type(FileTypes.FASTA, "output", "Gathered Fasta", "Output Fasta", "gathered")
    return p


def args_runner(args):
    return run_main(args.chunk_json, args.output, Constants.FA_CHUNK_KEY)


def rtc_runner(rtc):
    return run_main(rtc.task.input_files[0],
                    rtc.task.output_files[0],
                    Constants.FA_CHUNK_KEY)


def main(argv=sys.argv):
    return pbparser_runner(argv[1:],
                           get_parser(),
                           args_runner,
                           rtc_runner,
                           log,
                           setup_log)


if __name__ == '__main__':
    sys.exit(main())

To gather a task that has N output files, N Gather tasks must be defined, one per output (see the sketch below).
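
For example, if a chunked task emitted both a fasta file and a report per chunk, each output would be collected by its own Gather task keyed on its own chunk key (the report chunk key below is hypothetical):

from pbcommand.pb_io import load_pipeline_chunks_from_json

chunks = load_pipeline_chunks_from_json("task.chunk.json")
# one Gather Tool Contract per output/chunk key
fasta_files = [c.chunk_d["$chunk.fasta_id"] for c in chunks]
report_files = [c.chunk_d["$chunk.report_id"] for c in chunks]  # hypothetical key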

See the pbsmrtpipe docs for details of constructing a chunked pipeline.

More examples of scatter/chunking and gather tasks are in pbcoretools.