machine learning / bioelectricity / longevity

[Extension: redun] Bioinformatics pipelines from the bottom up

This is the first part of a series of extensions that I will add to my previous post on Bioinformatics pipelines from the bottom up. Learn about the core features of redun by using it to reimplement a toy bioinformatics workflow.

[Extension: redun] Bioinformatics pipelines from the bottom up

This is the first part of a series of extensions that I will add to my previous post on Bioinformatics pipelines from the bottom up. In order for you to be able to follow along, I'd recommend skimming over the other tutorial at least up until the part where we start with Makefiles. Here's what we'll cover in this tutorial:

  • Learn about the core features of redun by using it to reimplement a toy bioinformatics workflow
  • Run redun workflows on AWS Batch
  • Import submodules via pip
  • Emulate Makefile behavior in redun with a custom DSL

Motivation

Simple workflows are a great way to quickly get an in-depth look into the core features and advantages of new tools. The toy workflow we implemented in the first post (and reimplement in this one) consists of the following steps:

  1. Take a set of .fasta protein files
  2. Split each into peptides using a variable number of missed cleavages
  3. Count the number of cysteines in total as well as the number of peptides that contain a cysteine
  4. Generate an output report containing this information in a .tsv file
  5. Create an archive to share with colleagues

In the last post, I covered vanilla bash, Makefiles, and Nextflow as three modes of execution for bioinformatics workflows. Given the size and scale of modern workflows, the two former are rarely a valid option for anyone anymore and Nextflow is just one example of a toolchain that enables developers to run their pipelines at scale in the cloud. There are a lot of others.

For my own work, Python is the main workhorse for all of my data processing and analysis code so naturally, I'm drawn towards something that can natively integrate with it. The most natural integration happens when the toolchain itself is written in Python and I can just annotate my functions with something like a @task operator to be able to chain them into a workflow. A couple of frameworks come to mind here:

In this post, we'll cover redun, a tool written by the data engineering team at Insitro which was open-sourced in November 2021. The Github repo contains the following description:

redun aims to be a more expressive and efficient workflow framework, built on top of the popular Python programming language. It takes the somewhat contrarian view that writing dataflows directly is unnecessarily restrictive, and by doing so we lose abstractions we have come to rely on in most modern high-level languages (control flow, composability, recursion, high order functions, etc). redun's key insight is that workflows can be expressed as lazy expressions, which are then evaluated by a scheduler that performs automatic parallelization, caching, and data provenance logging.

Redun introduces a bunch of interesting features and, in my opinion, it is one of the first workflow tools out there that really nailed it. I highly recommend checking out its very well-written design document as well as reading through the first 4 tutorials.

Setup

We start by cloning the existing GitHub repository and use part_00 as our starting point:

# Fork and clone repository and switch to branch part_00
git clone https://github.com/<your_git_username>/bioinformatics-pipeline-tutorial.git
cd bioinformatics-pipeline-tutorial/
git checkout part_00

The structure of the repository will look like this:

$ tree
.
├── README.md
├── bin
│   ├── 01_digest_protein.py
│   ├── 02_count_amino_acids.py
│   ├── 03a_plot_count.py
│   ├── 03b_get_report.py
│   └── __init__.py
└── fasta
    ├── KLF4.fasta
    ├── MYC.fasta
    ├── PO5F1.fasta
    └── SOX2.fasta

2 directories, 10 files

In order to set up our environment, let's add a requirements.txt file with the following content and run pip install -r requirements.txt (feel free to use virtualenv, conda, or poetry to set up a virtual environment).

redun
plotly
kaleido

First, we create a data/ directory for our workflow output data and touch a workflow.py file which we'll use to write our redun workflow using the existing code from the first tutorial in the bin/ folder as a reference.

mkdir -p data
touch workflow.py

Porting the workflow

A lot of bioinformatics workflows rely on programs that are shipped as binaries and are executed through the command line. Redun supports Script tasks as first-class citizens and we'll initially explore how to use these to write our workflow. The first step in our pipeline is the script in bin/01_digest_protein.py. In order to call it, let's start by adding some code to workflow.py:

"""workflow.py"""
import os

from redun import File, script, task

redun_namespace = "bioinformatics_pipeline_tutorial.script_workflow"


@task()
def digest_protein_task(
    input_fasta: File,
    enzyme_regex: str = "[KR]",
    missed_cleavages: int = 0,
    min_length: int = 4,
    max_length: int = 75,
) -> File:
    protein = input_fasta.basename().split(".")[0]
    output_path = os.path.join(
        os.path.split(input_fasta.dirname())[0], "data", f"{protein}.peptides.txt"
    )
    return script(
        f"""
        bin/01_digest_protein.py \
            {input_fasta.path} \
            {output_path} \
            --enzyme_regex {enzyme_regex} \
            --missed_cleavages {missed_cleavages} \
            --min_length {min_length} \
            --max_length {max_length}
        """,
        outputs=File(output_path),
    )

We can then execute the task like this:

redun run workflow.py digest_protein_task --input-fasta fasta/KLF4.fasta

If successful, you should see the file KLF4.peptides.txt in the data/ directory.

Now, a major reason (at least for me) to use redun is that we can natively define workflows in Python. If you're  interested to see a working example of the complete workflow using script tasks, check out the scripts_workflow.py file in the final redun branch. In the following part, we will follow a different, Python-native approach of defining tasks. Execute these commands to rename workflow.py and touch a new file.

mv workflow.py scripts_workflow.py
touch workflow.py

The first step is to copy over the three functions needed for the digest_protein task: load_fasta(), save_peptides() and digest_protein() (from bin/01_digest_protein.py. Only three small changes were made: we added type annotations as a good practice, added a redun_namespace variable at the top to define the namespace in which we run our workflow, and lastly, we adapted the save_peptides() function to return a redun File object after saving the results to it. For reference, the updated workflow.py file:

"""workflow.py"""
import os
import re
from typing import List, Tuple

from redun import File, task

redun_namespace = "bioinformatics_pipeline_tutorial.workflow"


def load_fasta(input_file: File) -> Tuple[str, str]:
    """
    Load a protein with its metadata from a given .fasta file.
    """
    with input_file.open("r") as fasta_file:
        lines = fasta_file.read().splitlines()
    metadata = lines[0]
    sequence = "".join(lines[1:])
    return metadata, sequence


def save_peptides(filename: str, peptides: List[str]) -> File:
    """
    Write out the list of given peptides to a .txt file. Each line is a different peptide.
    """
    output_file = File(filename)
    with output_file.open("w") as out:
        for peptide in peptides:
            out.write("{}\n".format(peptide))
    return output_file


def digest_protein(
    protein_sequence: str,
    enzyme_regex: str = "[KR]",
    missed_cleavages: int = 0,
    min_length: int = 4,
    max_length: int = 75,
) -> List[str]:
    """
    Digest a protein into peptides using a given enzyme. Defaults to trypsin.
    """
    # Find the cleavage sites
    enzyme_regex = re.compile(enzyme_regex)
    sites = (
        [0]
        + [m.end() for m in enzyme_regex.finditer(protein_sequence)]
        + [len(protein_sequence)]
    )

    peptides = set()

    # Do the digest
    for start_idx, start_site in enumerate(sites):
        for diff_idx in range(1, missed_cleavages + 2):
            end_idx = start_idx + diff_idx
            if end_idx >= len(sites):
                continue
            end_site = sites[end_idx]
            peptide = protein_sequence[start_site:end_site]
            if len(peptide) < min_length or len(peptide) > max_length:
                continue
            peptides.add(peptide)
    return peptides

As you can see we now have what is basically the equivalent to our previous bin/01_digest_protein.py file without the main() function and parameter options (we'll add these later). We can now basically take the main() function from bin/01_digest_protein.py and add the @task operator to it:

"""workflow.py"""

[...]

@task()
def digest_protein_task(input_fasta: File, output_file: File) -> File:
    _, protein_sequence = load_fasta(input_fasta)
    peptides = digest_protein(protein_sequence)
    peptides_file = save_peptides(output_file.path, peptides)
    return peptides_file

In redun, there is no extra @workflow decorator. A workflow gets assembled when a task is called and that task has other tasks that it is dependent on. We will later see what that looks like. The cool thing about this is that it enables us to execute any task by itself (which is not possible with a lot of tools because they only let you execute workflows as a whole). To run the digest_protein_task(), we call:

redun run workflow.py digest_protein_task --input-fasta fasta/KLF4.fasta --output-file data/KLF4.peptides.txt

In order to not have to pass the --output-file path as an extra argument, we make it dependent on the input file by changing the code to the following:

"""workflow.py"""

[...]

@task()
def digest_protein_task(input_fasta: File) -> File:
    _, protein_sequence = load_fasta(input_fasta)
    peptides = digest_protein(protein_sequence)
    protein = input_fasta.basename().split(".")[0]
    output_path = os.path.join(
        os.path.split(input_fasta.dirname())[0], "data", f"{protein}.peptides.txt"
    )
    peptides_file = save_peptides(output_path, peptides)
    return peptides_file

Try rerunning it without the `--output-file` flag:

redun run workflow.py digest_protein_task --input-fasta fasta/KLF4.fasta 

Now, as you might remember, the input we're dealing with is a list of input files, not just a single file. To make our workflow compatible with that, we add a main() task that takes as input a list of files.

"""workflow.py"""
[...]
from redun import File, task
from redun.file import glob_file

[...]

@task()
def main(input_dir: str) -> List[File]:
    input_fastas = [File(f) for f in glob_file(f"{input_dir}/*.fasta")]
    peptide_files = [digest_protein_task(fasta) for fasta in input_fastas]
    return peptide_files

Try running:

redun run workflow.py main --input-dir fasta/

Now, the digest_protein_task() should have been executed for all files in the fasta/ folder. We can ensure this by using redun's logging functionality. The command redun log - shows the execution of the most recent run. Alternatively one can note down the execution ID (see below) when launching a new run and use either the full string or everything up to the first -: redun log 1091e19e-b5b7-412b-bdf0-b703a9f79cd5 or redun log 1091e19e.

$ redun run workflow.py main --input-dir fasta/                                                                
[redun] redun :: version 0.8.7
[redun] config dir: /Users/ricomeinl/Downloads/bioinformatics-pipeline-tutorial/.redun
[redun] Start Execution 1091e19e-b5b7-412b-bdf0-b703a9f79cd5:  redun run workflow.py main --input-dir fasta/
[...]

By running either of the above, we can observe that redun indeed ran five tasks: the main tasks and the digest_protein task for all four files.

$ redun log -
Exec 1091e19e-b5b7-412b-bdf0-b703a9f79cd5 [ DONE ] 2022-05-11 18:17:15:  run workflow.py main --input-dir fasta/ (git_commit=785ccac738c29bb27efa5fe8e950c23018961621, git_origin_url=https://github.com/ricomnl/bioinformatics-pipel..., project=bioinformatics_pipeline_tutorial.workflow, redun.version=0.8.7, user=ricomeinl)
Duration: 0:00:00.15

Jobs: 5 (DONE: 5, CACHED: 0, FAILED: 0)
--------------------------------------------------------------------------------
Job acaf05c6 [ DONE ] 2022-05-11 18:17:15:  bioinformatics_pipeline_tutorial.workflow.main(input_dir='fasta/') 
  Job 9620645d [ DONE ] 2022-05-11 18:17:15:  bioinformatics_pipeline_tutorial.workflow.digest_protein_task(File(path=fasta/SOX2.fasta, hash=621d4a48)) 
  Job efb908dc [ DONE ] 2022-05-11 18:17:15:  bioinformatics_pipeline_tutorial.workflow.digest_protein_task(File(path=fasta/KLF4.fasta, hash=10761e8a)) 
  Job fdb4f1fc [ DONE ] 2022-05-11 18:17:15:  bioinformatics_pipeline_tutorial.workflow.digest_protein_task(File(path=fasta/PO5F1.fasta, hash=341326f2)) 
  Job f2bc3668 [ DONE ] 2022-05-11 18:17:15:  bioinformatics_pipeline_tutorial.workflow.digest_protein_task(File(path=fasta/MYC.fasta, hash=daeb9045)) 

Great stuff! Let's add the next task count_amino_acids().

We start by adding the helper functions from bin/02_count_amino_acids.py to our workflow.py file (with some small changes akin to the ones mentioned above).

"""workflow.py"""

[...]

def load_peptides(input_file: File) -> List[str]:
    """
    Load peptides from a .txt file as a list.
    """
    with input_file.open("r") as peptide_file:
        lines = peptide_file.read().splitlines()
    return lines


def save_counts(filename: str, peptide_counts: List[int]) -> File:
    """
    Write out the peptide counts to a .tsv file using tabs as a separator.
    """
    output_file = File(filename)
    with output_file.open("w") as out:
        out.write("{}\n".format("\t".join([str(c) for c in peptide_counts])))
    return output_file


def num_peptides(peptides: List[str]) -> int:
    """
    Retrieve the number of peptides in a given list.
    """
    return len(peptides)


def num_peptides_with_aa(peptides: List[str], amino_acid: str = "C") -> int:
    """
    Count the number of peptides in a given list that contain a given amino acid. 
    Defaults to cysteine.
    """
    return sum([1 if amino_acid in peptide else 0 for peptide in peptides])


def total_num_aa_in_protein(protein: str) -> int:
    """
    Count the total number of amino acids in a given protein string.
    """
    return len(protein)


def num_aa_in_protein(protein: str, amino_acid: str = "C") -> int:
    """
    Count the number of times a given amino acid occurs in a given protein.
    Defaults to cysteine.
    """
    return protein.count(amino_acid)
    

@task()
def digest_protein_task(input_fasta: File) -> File: ...

[...]

After that, we again port the main() function, this time from bin/02_count_amino_acids.py, to a new function decorated with @task():

"""workflow.py"""

[...]

@task()
def count_amino_acids_task(
    input_fasta: File, input_peptides: File, amino_acid: str = "C"
) -> File:
    """
    Count the number of times a given amino acid appears in a protein as well
    as its peptides after digestion.
    """
    _, protein_sequence = load_fasta(input_fasta)
    peptides = load_peptides(input_peptides)
    n_peptides = num_peptides(peptides)
    n_peptides_with_aa = num_peptides_with_aa(peptides, amino_acid=amino_acid)
    total_aa_in_protein = total_num_aa_in_protein(protein_sequence)
    aa_in_protein = num_aa_in_protein(protein_sequence, amino_acid=amino_acid)
    protein = input_fasta.basename().split(".")[0]
    output_path = os.path.join(
        os.path.split(input_fasta.dirname())[0], "data", f"{protein}.count.tsv"
    )
    aa_count_file = save_counts(
        output_path,
        [
            amino_acid,
            n_peptides,
            n_peptides_with_aa,
            total_aa_in_protein,
            aa_in_protein,
        ],
    )
    return aa_count_file


@task()
def main(input_dir: str) -> List[File]: ...

It's easy to test our task by itself:

redun run workflow.py count_amino_acids_task --input-fasta fasta/KLF4.fasta --input-peptides data/KLF4.peptides.txt

If successful, the task should have created a file called KLF4.count.tsv in the data/ folder. We can now combine the two tasks in our main() function and execute it with:

"""workflow.py"""

[...]

@task()
def main(input_dir: str) -> List[File]:
	input_fastas = [File(f) for f in glob_file(f"{input_dir}/*.fasta")]
    peptide_files = [digest_protein_task(fasta) for fasta in input_fastas]
    aa_count_files = [count_amino_acids_task(fasta, peptides) for (fasta, peptides) in zip(input_fastas, peptide_files)]
    return aa_count_files
redun run workflow.py main --input-dir fasta/

Running redun log - again will show that this time the first four tasks were cached because neither the code of the tasks nor the generated files were changed.

$ redun log -
Exec 8c438a1b-c24d-49c0-9c4c-cdf71e2504a8 [ DONE ] 2022-05-11 20:52:22:  run workflow.py main --input-dir fasta/ (git_commit=785ccac738c29bb27efa5fe8e950c23018961621, git_origin_url=https://github.com/ricomnl/bioinformatics-pipel..., project=bioinformatics_pipeline_tutorial.workflow, redun.version=0.8.7, user=ricomeinl)
Duration: 0:00:00.18

Jobs: 9 (DONE: 5, CACHED: 4, FAILED: 0)
--------------------------------------------------------------------------------
Job 72959d5d [ DONE ] 2022-05-11 20:52:22:  bioinformatics_pipeline_tutorial.workflow.main(input_dir='fasta/') 
  Job 344ede72 [CACHED] 2022-05-11 20:52:22:  bioinformatics_pipeline_tutorial.workflow.digest_protein_task(File(path=fasta/SOX2.fasta, hash=621d4a48)) 
  Job 0e853ce6 [CACHED] 2022-05-11 20:52:22:  bioinformatics_pipeline_tutorial.workflow.digest_protein_task(File(path=fasta/KLF4.fasta, hash=10761e8a)) 
  Job d8a5ea59 [CACHED] 2022-05-11 20:52:22:  bioinformatics_pipeline_tutorial.workflow.digest_protein_task(File(path=fasta/PO5F1.fasta, hash=341326f2)) 
  Job 80151743 [CACHED] 2022-05-11 20:52:22:  bioinformatics_pipeline_tutorial.workflow.digest_protein_task(File(path=fasta/MYC.fasta, hash=daeb9045)) 
  Job 60d89b74 [ DONE ] 2022-05-11 20:52:22:  bioinformatics_pipeline_tutorial.workflow.count_amino_acids_task(File(path=fasta/SOX2.fasta, hash=621d4a48), File(path=data/SOX2.peptides.txt, hash=de981d55), amino_acid='C') 
  Job 1271c054 [ DONE ] 2022-05-11 20:52:22:  bioinformatics_pipeline_tutorial.workflow.count_amino_acids_task(File(path=fasta/KLF4.fasta, hash=10761e8a), File(path=data/KLF4.peptides.txt, hash=365eea97), amino_acid='C') 
  Job 92e3bbe5 [ DONE ] 2022-05-11 20:52:22:  bioinformatics_pipeline_tutorial.workflow.count_amino_acids_task(File(path=fasta/PO5F1.fasta, hash=341326f2), File(path=data/PO5F1.peptides.txt, hash=cf7b5a5e), amino_acid='C') 
  Job ad2298f6 [ DONE ] 2022-05-11 20:52:22:  bioinformatics_pipeline_tutorial.workflow.count_amino_acids_task(File(path=fasta/MYC.fasta, hash=daeb9045), File(path=data/MYC.peptides.txt, hash=06a265e1), amino_acid='C')

I now encourage you to add the two final tasks yourself. Remember from the last post, we want to create plots for the generated counts of each protein .fasta file (bin/03a_plot_count.py ) and finally, generate an output report for the results (bin/03b_get_report.py). Below you can find the final code for the main() and the archive_results_task(). Try to fill in the code for plot_count_task() and get_report_task().

"""workflow.py"""


[...]


@task()
def plot_count_task(input_count: File) -> File:
    """
    Load the calculated counts and create a plot.
    """
    # TODO
    pass


@task()
def get_report_task(input_counts: List[File]) -> File:
    """
    Get a list of input files from a given folder and create a report.
    """
    # TODO
    pass


@task()
def archive_results_task(inputs_plots: List[File], input_report: File) -> File:
    output_path = os.path.join(
        os.path.split(input_report.dirname())[0], "data", f"results.tgz"
    )
    tar_file = File(output_path)
    with tar_file.open("wb") as out:
        with tarfile.open(fileobj=out, mode="w|gz") as tar:
            for file_path in inputs_plots + [input_report]:
                if get_filesystem_class(url=file_path.path).name == "s3":
                    tmp_file = File(os.path.basename(file_path.path))
                else:
                    tmp_file = file_path
                output_file = file_path.copy_to(tmp_file, skip_if_exists=True)
                tar.add(output_file.path)
    return tar_file


@task()
def main(
    input_dir: str,
    amino_acid: str = "C",
    enzyme_regex: str = "[KR]",
    missed_cleavages: int = 0,
    min_length: int = 4,
    max_length: int = 75,
) -> List[File]:
    input_fastas = [File(f) for f in glob_file(f"{input_dir}/*.fasta")]
    peptide_files = [
        digest_protein_task(
            fasta,
            enzyme_regex=enzyme_regex,
            missed_cleavages=missed_cleavages,
            min_length=min_length,
            max_length=max_length,
        )
        for fasta in input_fastas
    ]
    aa_count_files = [
        count_amino_acids_task(
            fasta, peptides, amino_acid=amino_acid
        )
        for (fasta, peptides) in zip(input_fastas, peptide_files)
    ]
    count_plots = [
        plot_count_task(aa_count)
        for aa_count in aa_count_files
    ]
    report_file = get_report_task(aa_count_files)
    results_archive = archive_results_task(
        count_plots, report_file
    )
    return results_archive

Hint: In order to port over bin/03a_plot_count.py the plot_counts() function needs to be adjusted to use plotly instead of matplotlib because redun parallelizes tasks across multiple threads and matplotlib will throw an error when it's run outside the main thread.

💡
Update: The code using matplotlib should still work when using the process-based instead of the thread-based executor.

Hence, here is the updated function using plotly:

"""workflow.py"""
import os
import re
from typing import List, Tuple

from plotly.subplots import make_subplots
import plotly.graph_objects as go
from redun import task, File
from redun.file import glob_file, get_filesystem_class

[...]

def plot_counts(filename: str, counts: List[str]) -> File:
    """
    Plot the calculated counts.
    """
    (
        amino_acid,
        n_peptides,
        n_peptides_with_aa,
        total_aa_in_peptides,
        aa_in_peptides,
    ) = counts
    labels_n_peptides = ["No. of Peptides", "No. of Peptides w/ {}".format(amino_acid)]
    labels_n_aa = ["Total No. of Amino Acids", "No. of {}'s".format(amino_acid)]
    colors = ["#001425", "#308AAD"]
    fig = make_subplots(rows=1, cols=2)
    fig.add_trace(
        go.Bar(
            x=labels_n_peptides,
            y=[int(n_peptides_with_aa), int(n_peptides)],
            marker_color=colors[0],
        ),
        row=1,
        col=1,
    )
    fig.add_trace(
        go.Bar(
            x=labels_n_aa,
            y=[int(aa_in_peptides), int(total_aa_in_peptides)],
            marker_color=colors[1],
        ),
        row=1,
        col=2,
    )
    fig.update_layout(
        height=600,
        width=800,
        title_text="{}'s in Peptides and Amino Acids".format(amino_acid),
        showlegend=False,
    )
    if get_filesystem_class(url=filename).name == "s3":
        tmp_file = File(os.path.basename(filename))
    else:
        tmp_file = File(filename)
    fig.write_image(tmp_file.path)
    output_file = tmp_file.copy_to(File(filename), skip_if_exists=True)
    return output_file

[...]

If you've made it all there way up until here, you can check your solution against the working version on the redun branch of the Github repository.

If you run redun run workflow.py main --input-dir fasta/, your local data/ repository should be populated with these files:

$ tree data 
data
├── KLF4.count.plot.png
├── KLF4.count.tsv
├── KLF4.peptides.txt
├── MYC.count.plot.png
├── MYC.count.tsv
├── MYC.peptides.txt
├── PO5F1.count.plot.png
├── PO5F1.count.tsv
├── PO5F1.peptides.txt
├── SOX2.count.plot.png
├── SOX2.count.tsv
├── SOX2.peptides.txt
├── protein_report.tsv
└── results.tgz

0 directories, 14 files

Taking it to the cloud

To define where a @task will run, we can specify a task executor like this:

@task(executor="my_executor")
def digest_protein_task():
    # ...

The executor my_executor then has to be defined in the redun configuration .redun/redun.ini. If you go and open it up you can see the default executor defined already:

# redun configuration.

[backend]
db_uri = sqlite:///redun.db

[executors.default]
type = local
max_workers = 20

As of now, redun supports AWS Batch and Glue executors that will run tasks in the cloud. A Kubernetes executor is currently in the making. We'll walk through how to create one for AWS Batch below.

I'm not going to go through all the steps on how to set up AWS Batch as there are a lot of great tutorials online. If you want to follow along make sure you have Docker installed and an existing AWS CLI setup.

You'll need the following AWS resources:

  • S3 Bucket
  • Push access to Elastic Container Registry (ECR)
  • An AWS Batch queue that we can publish jobs to

To get started, we need to create a Dockerfile like this:

FROM ubuntu:20.04

# Install OS-level libraries.
RUN apt-get update -y && DEBIAN_FRONTEND="noninteractive" apt-get install -y \
    python3 \
    python3-pip && \
    apt-get clean

WORKDIR /code

# Install our python code dependencies.
COPY requirements.txt .
RUN pip3 install --upgrade pip
RUN pip3 install -r requirements.txt

We'll also create a Makefile to simplify the process of building and pushing our Docker image:

IMAGE=bioinformatics_pipeline_tutorial
ACCOUNT=$(shell aws ecr describe-registry --query registryId --output text)
REGION=$(shell aws configure get region)
REGISTRY=$(ACCOUNT).dkr.ecr.$(REGION).amazonaws.com

login:
	aws ecr get-login-password --region $(REGION) | docker login --username AWS --password-stdin $(REGISTRY)

build:
	docker build -t $(REGISTRY)/$(IMAGE) --build-arg REGISTRY=$(REGISTRY) .

build-local:
	docker build -t $(IMAGE) --build-arg REGISTRY=$(REGISTRY) .

create-repo:
	aws ecr create-repository --repository=$(IMAGE)

push:
	docker push $(REGISTRY)/$(IMAGE)

bash:
	docker run --rm -it $(REGISTRY)/$(IMAGE) bash

bash-local:
	docker run --rm -it $(IMAGE) bash

To build and test the Docker image locally, run:

make build-local
docker run --rm -it bioinformatics_pipeline_tutorial pip list | grep "redun"

If the output is redun                    0.8.7, the Dockerfile has built and installed the dependencies correctly.  

Now, we can use the following make command to build our Docker image:

make login
make build

After the image builds, we need to publish it to ECR so that it is accessible by AWS Batch. There are several steps for doing that, which are covered in these make commands:

# If the docker repo does not exist yet.
make create-repo

# Push the locally built image to ECR.
make push

You might be wondering: how will our Python code get into the container? We didn't add our workflow.py file to the Docker image. The answer lies in redun's code packaging feature, which essentially packages all the Python code in the current directory into a tar file and copies it to our S3 scratch directory. From here, it will be downloaded into the running AWS Batch job. This makes it a lot faster to iterate, without having to rebuild the Docker image for every code change.

Let's add our custom AWS Batch executor to our .redun/redun.ini config in the current working directory:

[...]

[executors.batch]
type = aws_batch

# Required:
image = YOUR_ACCOUNT_ID.dkr.ecr.us-west-2.amazonaws.com/bioinformatics_pipeline_tutorial
queue = YOUR_QUEUE_NAME
s3_scratch = s3://YOUR_BUCKET/redun/

# Optional:
role = arn:aws:iam::YOUR_ACCOUNT_ID:role/YOUR_ROLE
job_name_prefix = redun-example
debug = False

To get a working example, you'll need to replace all caps variables YOUR_ACCOUNT_ID, YOUR_QUEUE_NAME, and YOUR_BUCKET, with your own AWS Account ID, AWS Batch queue name, and S3 bucket, respectively.

Next up, make sure that all the tasks are equipped with our shiny new AWS Batch executor.

"""workflow.py"""

[...]

@task(executor="batch")
def main(...)

Now we can execute our pipeline as usual, with the difference that redun will now run each task as a separate AWS Batch job and use the input data stored in the given S3 bucket (you'll need to upload the fasta/ folder to the S3 bucket you want to use).

redun run workflow.py main --input-dir s3://YOUR_BUCKET/fasta/

Note: I ran into the following error and used the fix detailed in this post by AWS to set up my role correctly.

ECS was unable to assume the role 'arn:aws:iam::***:role/role-name' that was provided for this task. Please verify that the role being passed has the proper trust relationship and permissions and that your IAM user has permissions to pass this role.

Redun provides a nice debug functionality through which the tasks run locally in Docker containers and the data is still pulled from S3. To enable it change the debug field in the .redun/redun.ini config:

[...]

[executors.batch]

[...]

debug = True

Then, in order to jump into a running task, you can add the familiar import pdb; pdb.set_trace() statement to debug.

Importing submodules via pip

Ok, so we've written a workflow that consists of five tasks that are connected together through our main() task. The workflow itself might be quite specific but it's easy to imagine that many individual tasks could be reused by other workflows. Redun solves this very elegantly and it's something that's hard to get right (e.g. Nextflow only very recently added this feature with the release of their DSL2 and it's still bumpy IMO).

We're going to create a src/ folder and put all of our reusable tasks in there so that someone who wants to use them can just pip install our Github repository (or released Python package).

mkdir -p bioinformatics_pipeline_tutorial/
touch bioinformatics_pipeline_tutorial/__init__.py
touch bioinformatics_pipeline_tutorial/lib.py

Now copy everything except the main() function into bioinformatics_pipeline_tutorial/lib.py. In the workflow.py file, import the task functions.

"""workflow.py"""
[...]

from bioinformatics_pipeline_tutorial.lib import (
    digest_protein_task,
    count_amino_acids_task,
    plot_count_task,
    get_report_task,
    archive_results_task,
)

[...]

Finally, let's add a setup.py file to make the Github repository installable. Feel free to try and publish your package on pip and install it for another project.

"""setup.py"""
from setuptools import setup


setup(
    name="bioinformatics_pipeline_tutorial",
    version="0.0.1",
    packages=["bioinformatics_pipeline_tutorial"],
    install_requires=["redun", "plotly", "kaleido"],
)

You can try it out by installing the finished module via:

pip install git+https://github.com/<your_git_username>/bioinformatics-pipeline-tutorial@redun

Then open up a Python console by calling python and try to import the digest_protein_task().

$ python
Python 3.8.5 (default, Sep 27 2020, 11:35:15) 
[Clang 12.0.0 (clang-1200.0.32.2)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> from bioinformatics_pipeline_tutorial.lib import digest_protein_task
>>>

Check out the final state of this part of the tutorial by pulling the redun branch.

Redun Makefiles

The bonus round section of the second tutorial in the redun Github repository shows how redun can emulate Makefile behavior. Those of you who recall the original blog post will remember that in part_02 we created a Makefile to execute our pipeline. Let's have some fun and try to rewrite the Makefile in redun. This part will also showcase redun's ability to do recursions.

First, check out part_02 and run make all to make sure it's still working:

git checkout part_02

As you will recall from the last post, a Makefile is made up of recipes that specify how to build target files. The structure of each recipe is this:

targets: prerequisites
	command

This is how we would specify a recipe for how to create the file KLF4.peptides.txt in the data/ folder (the target) using the bin/01_digest_protein.py script (the command) with fasta/KLF4.fasta being the only prerequisite.

data/KLF4.peptides.txt: fasta/KLF4.fasta
	bin/01_digest_protein.py fasta/KLF4.fasta data/KLF4.peptides.txt

We can then call make data/KLF4.peptides.txt to generate the target file.

To emulate this behavior in redun, we start by creating a file make_workflow.py.

touch make_workflow.py

We start by defining the first rule with a custom DSL:

"""make_workflow.py"""

redun_namespace = "bioinformatics_pipeline_tutorial.make_workflow"


# Custom DSL for describing targets, dependencies (deps), and commands.
rules = {
    "data/KLF4.peptides.txt": {
        "deps": ["fasta/KLF4.fasta"],
        "command": "bin/01_digest_protein.py fasta/KLF4.fasta data/KLF4.peptides.txt"
    },
}

Next, we copy the two functions run_command() and make() from the redun tutorial. The run_command() function takes as input a shell command specified as a string which it runs to generate the target file. The make() function generates the target by recursively creating all its dependencies (if needed).

"""make_workflow.py"""
import os
from typing import List, Optional

from redun import task, File
from redun.functools import const


redun_namespace = "bioinformatics_pipeline_tutorial.make_workflow"


# Custom DSL for describing targets, dependencies (deps), and commands.
rules = {
    "data/KLF4.peptides.txt": {
        "deps": ["fasta/KLF4.fasta"],
        "command": "bin/01_digest_protein.py fasta/KLF4.fasta data/KLF4.peptides.txt"
    },
}


@task()
def run_command(command: str, inputs: List[File], output_path: str) -> File:
    """
    Run a shell command to produce a target.
    """
    # Ignore inputs. We pass it as an argument to simply force a dependency.
    assert os.system(command) == 0
    return File(output_path)


@task()
def make(target: str = "all", rules: dict = rules) -> Optional[File]:
    """
    Make a target (file) using a series of rules.
    """
    rule = rules.get(target)
    if not rule:
        # No rule. See if target already exists.
        file = File(target)
        if not file.exists():
            raise ValueError(f"No rule for target: {target}")
        return file

    # Recursively make dependencies.
    inputs = [
        make(dep, rules=rules)
        for dep in rule.get("deps", [])
    ]

    # Run command, if needed.
    if "command" in rule:
        return run_command(rule["command"], inputs, target)
    else:
        return const(None, inputs)

We can generate a target by calling:

redun run make_workflow.py make --target data/KLF4.peptides.txt

Let's add some more rules:

"""make_workflow.py"""

[...]

# Custom DSL for describing targets, dependencies (deps), and commands.
rules = {
    "data/KLF4.peptides.txt": {
        "deps": ["fasta/KLF4.fasta"],
        "command": "bin/01_digest_protein.py fasta/KLF4.fasta data/KLF4.peptides.txt"
    },
    "data/KLF4.count.tsv": {
        "deps": ["fasta/KLF4.fasta", "data/KLF4.peptides.txt"],
        "command": "bin/02_count_amino_acids.py fasta/KLF4.fasta data/KLF4.peptides.txt data/KLF4.count.tsv"
    },
    "data/KLF4.plot.png": {
        "deps": ["data/KLF4.count.tsv"],
        "command": "bin/03a_plot_count.py data/KLF4.count.tsv data/KLF4.plot.png"
    },
    "data/protein_report.tsv": {
        "deps": ["data/KLF4.count.tsv"],
        "command": "bin/03b_get_report.py data/KLF4.count.tsv --output_file=data/protein_report.tsv"
    },
    "data/results.tgz": {
        "deps": ["data/KLF4.plot.png", "data/protein_report.tsv"],
        "command": """rm -rf results
                      mkdir results
                      cp data/KLF4.plot.png data/protein_report.tsv results/
                      tar -czf data/results.tgz results
                      rm -r results"""
    },
}

[...]

Now we can run the command below and it will generate the files data/KLF4.plot.png, data/KLF4.count.tsv, and data/KLF4.peptides.txt.

redun run make_workflow.py make --target data/KLF4.plot.png

But what if we wanted to add rules for new proteins? If you read the last post, you already know the answer and it's not: add separate rules for each file. We'll use pattern matching. Now, the initial implementation from the redun examples doesn't support pattern matching. Therefore, let's add a more advanced match_target() function and use it within the make() function. We'll also need to adjust our rules accordingly and use % as a wild card.

"""make_workflow.py"""
import os
from typing import Dict, List, Optional

[...]

rules = {
    "data/%.peptides.txt": {
        "deps": ["fasta/%.fasta"],
        "command": "bin/01_digest_protein.py fasta/%.fasta data/%.peptides.txt"
    },
    "data/%.count.tsv": {
        "deps": ["fasta/%.fasta", "data/%.peptides.txt"],
        "command": "bin/02_count_amino_acids.py fasta/%.fasta data/%.peptides.txt data/%.count.tsv"
    },
    "data/%.plot.png": {
        "deps": ["data/%.count.tsv"],
        "command": "bin/03a_plot_count.py data/%.count.tsv data/%.plot.png"
    },
    "data/protein_report.tsv": {
        "deps": ["data/%.count.tsv"],
        "command": "bin/03b_get_report.py data/%.count.tsv --output_file=data/protein_report.tsv"
    },
    "data/results.tgz": {
        "deps": ["data/%.plot.png", "data/protein_report.tsv"],
        "command": """rm -rf results
                      mkdir results
                      cp data/%.plot.png data/protein_report.tsv results/
                      tar -czf data/results.tgz results
                      rm -r results"""
    },
}

def match_target(target: str = "all", rules: dict = rules) -> Optional[Dict[str, Dict]]:
    """
    Emulate GNU make pattern matching described here: 
    https://www.gnu.org/software/make/manual/html_node/Pattern-Match.html#Pattern-Match
    """
    rule = rules.get(target)
    if not rule:
        _, tbase = os.path.split(target)
        for rkey, rval in rules.items():
            _, rbase = os.path.split(rkey)
            if not "%" in rbase: continue
            pre, post = rbase.split("%")
            if tbase.startswith(pre) and tbase.endswith(post):
                stem = tbase[len(pre):-len(post)]
                rule = {
                    "deps": [dep.replace("%", stem) for dep in rval.get("deps", [])],
                    "command": rval.get("command", "").replace("%", stem),
                }
                break
    return rule
    
[...]

@task()
def make(target: str = "all", rules: dict = rules) -> Optional[File]:
    """
    Make a target (file) using a series of rules.
    """
    rule = match_target(target, rules) if not "%" in target else None
    [...]

We can now generate the target for any protein and our workflow will substitute the % with a matched stem if it finds one. Try running:

redun run make_workflow.py make --target data/MYC.plot.png

However, if you try to generate either one of the last two targets data/protein_report.tsv and data/results.tgz, you'll run into the following issue:

$ redun run make_workflow.py make --target data/protein_report.tsv
[...]
ValueError: No rule for target: data/%.count.tsv

This is the same behavior we'd get if we were to run the same command with make as seen in the last post:

make: *** No rule to make target `data/%.count.tsv', needed by `data/%.plot.png'. Stop.

This occurs because when trying to generate the target data/protein_report.tsv, one of its dependencies is data/%.count.tsv and there is no way for redun (or make) to know which stem to replace the wildcard with. Hence, at some point in our program, we need to define a list of target files that we want to generate. We insert two variables at the top and use them for the last two rules. We also add recipes for all and clean.

"""make_workflow.py"""
[...]

COUNT = ["data/KLF4.count.tsv", "data/MYC.count.tsv", "data/PO5F1.count.tsv", "data/SOX2.count.tsv"]
PLOT = ["data/KLF4.plot.png", "data/MYC.plot.png", "data/PO5F1.plot.png", "data/SOX2.plot.png"]


# Custom DSL for describing targets, dependencies (deps), and commands.
rules = {
    "all": {
        "deps": ["data/results.tgz"],
    },
    "clean": {
        "command": "rm -rf data/*",
    },
    "data/%.peptides.txt": {
        "deps": ["fasta/%.fasta"],
        "command": "bin/01_digest_protein.py fasta/%.fasta data/%.peptides.txt"
    },
    "data/%.count.tsv": {
        "deps": ["fasta/%.fasta", "data/%.peptides.txt"],
        "command": "bin/02_count_amino_acids.py fasta/%.fasta data/%.peptides.txt data/%.count.tsv"
    },
    "data/%.plot.png": {
        "deps": ["data/%.count.tsv"],
        "command": "bin/03a_plot_count.py data/%.count.tsv data/%.plot.png"
    },
    "data/protein_report.tsv": {
        "deps": COUNT,
        "command": "bin/03b_get_report.py {COUNT} --output_file=data/protein_report.tsv".format(COUNT=" ".join(COUNT))
    },
    "data/results.tgz": {
        "deps": PLOT + ["data/protein_report.tsv"],
        "command": """rm -rf results
                      mkdir results
                      cp {PLOT} data/protein_report.tsv results/
                      tar -czf data/results.tgz results
                      rm -r results""".format(PLOT=" ".join(PLOT))
    },
}

[...]

You should now be able to clean up all past files that were generated with:

redun run make_workflow.py make --target clean

Finally, run the following and check whether it actually generates all of our target files in the data/ folder:

redun run make_workflow.py make --target all
# Or:
redun run make_workflow.py make

You can browse the final state of this part of the tutorial in the redun branch.

Conclusion

That's it! Thanks for sticking with me all the way until the end, I hope it was fun and you got to explore some of redun's functionality. I linked some further resources below. Just to recap, here's what we covered:

  • Core features of redun
  • Run redun workflows on AWS Batch
  • Import submodules via pip
  • Emulate Makefile behavior in redun

For follow-up questions or feedback on this article, you can submit an issue through the accompanying GitHub repository or reach me on Twitter.

Huge thanks to Alex Trapp and Matt Rasmussen for their thoughts and feedback on the draft.

Resources