W. Lee Pang, PhD

Profiling workflows with the Amazon Genomics CLI

Profiling is an essential part of developing genomics workflows. By identifying and eliminating expensive bottlenecks, users can make workflows performant and cost efficient. A common way to profile a workflow is to generate a timing chart of all tasks executed. The Amazon Genomics CLI (AGC) provides a unified experience for running workflows across multiple workflow engines. In doing so, it also enables a common way to generate timing charts and profile workflows. In this blog I’ll show you how this can be done with AGC and a little Python coding.

As a prerequisite, you’ll need to install AGC and activate your AWS account region for running workflows. This is covered in AGC’s documentation.

To run a workflow with AGC you use commands like the following - in this case I’m running the GATK data processing workflow that is included as an example when you install AGC:

cd ~/amazon-genomics-cli/examples/gatk-best-practices-project

agc context deploy onDemandCtx
agc workflow run -c onDemandCtx gatk4-data-processing

# returns a workflow run-id like:
# 20ceffe1-9664-4f41-a5e9-66ab5eaf57ac

Once the workflow completes, you can get task execution information from the workflow logs using:

agc logs workflow gatk4-data-processing -r 20ceffe1-9664-4f41-a5e9-66ab5eaf57ac

which looks like:

RunId: 20ceffe1-9664-4f41-a5e9-66ab5eaf57ac
State: COMPLETE
Tasks: 
    Name                        JobId                   StartTime               StopTime                ExitCode
    HaplotypeCallerGvcf_GATK4.MergeGVCFs        12d27ad2-df4e-44f7-90ba-6140bde14cf0    2022-03-24 17:50:06.575 +0000 UTC   2022-03-24 18:00:34.439 +0000 UTC   0
    HaplotypeCallerGvcf_GATK4.HaplotypeCaller   39e61999-079a-4671-9625-9a1fada62c6b    2022-03-24 17:29:12.955 +0000 UTC   2022-03-24 17:48:22.475 +0000 UTC   0
    HaplotypeCallerGvcf_GATK4.HaplotypeCaller   ce9a6f75-9c9d-4e55-b4f9-1f46c516a3b5    2022-03-24 17:29:12.961 +0000 UTC   2022-03-24 17:47:25.444 +0000 UTC   0
...

The output from agc logs workflow provides start and stop times for each task run by the workflow. We can use this to generate a simple Gantt chart of the workflow tasks. After dropping the above output into a file, you can parse out the tasks with the following Python code:

#!/usr/bin/env python3

from datetime import datetime
import re

DT_FMTS = [
    '%Y-%m-%dT%H:%M:%SZ', '%Y-%m-%dT%H:%M:%S.%fZ', 
    '%Y-%m-%dT%H:%M:%S+00:00', '%Y-%m-%dT%H:%M:%S.%f+00:00',
    '%Y-%m-%d %H:%M:%S.%f %z %Z', '%Y-%m-%d %H:%M:%S %z %Z',
]

def parse_datetime(s, fmts = DT_FMTS):
    for fmt in fmts:
        try:
            dt = datetime.strptime(s, fmt)
            return dt
        except ValueError as e:
            pass
    else:
        raise ValueError(f"unable to parse datetime from '{s}'")


def parse_agc_log(log_file):
    with open(log_file, 'r') as f:
        lines = f.readlines()
        lines = list(filter(lambda x: x.strip(), lines))

    data = {
        k: lines[i].split(":")[1].strip()
        for i, k in enumerate(['runid', 'state'])
    }

    data['tasks'] = [
        dict(zip(
            ['name', 'job_id', 'start_time', 'end_time', 'exit_code'], 
            re.split('\t+', line.strip())
        )) 
        for line in lines[4:]
    ]

    for i, task in enumerate(data['tasks']):
        for k in ('start_time', 'end_time'):
            task[k] = parse_datetime(task[k])
        data['tasks'][i] = task

    data['tasks'] = sorted(data['tasks'], key=lambda x: (x['start_time'], x['name']))

    return data

Using the parse_agc_log() function above looks like:

parse_agc_log('logs/gatk4-data-processing__20ceffe1-9664-4f41-a5e9-66ab5eaf57ac.agc.log')

which will return something like:

{'runid': '20ceffe1-9664-4f41-a5e9-66ab5eaf57ac',
 'state': 'COMPLETE',
 'tasks': [{'name': 'HaplotypeCallerGvcf_GATK4.MergeGVCFs',
   'job_id': '26fcfea4-cfbc-4f04-95ef-115eb309f3df',
   'start_time': datetime.datetime(2022, 4, 11, 1, 55, 53, 734000, tzinfo=datetime.timezone(datetime.timedelta(0), 'UTC')),
   'end_time': datetime.datetime(2022, 4, 11, 2, 6, 40, 418000, tzinfo=datetime.timezone(datetime.timedelta(0), 'UTC')),
   'exit_code': '0'},
  {'name': 'HaplotypeCallerGvcf_GATK4.HaplotypeCaller',
   'job_id': 'a9b3e5f1-cbd8-482b-87ba-df34e0252571',
   'start_time': datetime.datetime(2022, 4, 11, 1, 36, 5, 426000, tzinfo=datetime.timezone(datetime.timedelta(0), 'UTC')),
   'end_time': datetime.datetime(2022, 4, 11, 1, 52, 42, 457000, tzinfo=datetime.timezone(datetime.timedelta(0), 'UTC')),
   'exit_code': '0'},
   ...

This parses the workflow tasks in most cases. However, when a task name is long, the output swallows the whitespace separator between the task name and the task job-id. For example:

# ok line
    PreProcessingForVariantDiscovery_GATK4.MergeBamAlignment    7e244b7b-e02d-4bb9-bee7-9005204382f3    2022-03-24 10:17:20.673 +0000 UTC   2022-03-24 10:23:00.67 +0000 UTC    0

# bad line
    PreProcessingForVariantDiscovery_GATK4.CreateSequenceGroupingTSV85c5a61b-867e-49ed-a11d-951910444c03    2022-03-24 10:05:53.583 +0000 UTC   2022-03-24 10:11:55.023 +0000 UTC   0

This complicates later steps that rely on the job-id. The JSON formatter for agc is not currently used by the agc logs workflow command, so I created an issue to help the team prioritize adding it. I could probably work around this in the short term with some regular expressions (sketched below), but I’d rather not - there is another way to consistently get machine-parseable output.
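
For completeness, a rough sketch of that regex workaround could look like the following - it assumes the job-id is always a UUID, which AGC doesn’t necessarily guarantee:

import re

# split a task line even when the name runs into the job-id (UUID format assumed)
UUID = r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}'
TASK_LINE = re.compile(
    r'^(?P<name>.+?)\s*(?P<job_id>' + UUID + r')\s+'
    r'(?P<start_time>.+?UTC)\s+(?P<end_time>.+?UTC)\s+(?P<exit_code>\d+)\s*$'
)

def split_task_line(line):
    m = TASK_LINE.match(line.strip())
    if not m:
        raise ValueError(f"unrecognized task line: {line!r}")
    return m.groupdict()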

Under the hood, agc uses the GA4GH Workflow Execution Service (WES) API to communicate with the AWS resources that run workflows. Each AGC context deploys a GA4GH WES endpoint that you can send requests to directly. The endpoint requires AWS SigV4-signed requests, so the easiest way to talk to it is with a tool like awscurl, which does the request signing for you.
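
If you’d rather stay in Python, you can also sign the requests yourself with botocore. Here’s a minimal sketch, assuming the requests package is installed (the endpoint URL and region are whatever your context reports):

import boto3
import requests
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest

def wes_get(url, region='us-east-1'):
    """GET a WES resource with an AWS SigV4-signed request (what awscurl does for you)."""
    creds = boto3.Session().get_credentials().get_frozen_credentials()
    req = AWSRequest(method='GET', url=url)
    SigV4Auth(creds, 'execute-api', region).add_auth(req)
    resp = requests.get(url, headers=dict(req.headers))
    resp.raise_for_status()
    return resp.json()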

To get the WES endpoint for a specific context you can use the agc context describe command:

agc context describe onDemandCtx --format json

which returns something like:

{
    "Name": "onDemandCtx",
    "Status": "STARTED",
    "StatusReason": "",
    "MaxVCpus": 256,
    "RequestSpotInstances": false,
    "InstanceTypes": null,
    "Output": {
        "Url": "s3://agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtx"
    },
    "WesEndpoint": {
        "Url": "https://yrtxeqnhkb.execute-api.us-east-1.amazonaws.com/prod/"
    }
}

Note the WES endpoint Url needs to have “ga4gh/wes/v1” appended to it. We can extract this value using jq.

wes=$(agc context describe onDemandCtx --format json | jq -r '.WesEndpoint.Url + "ga4gh/wes/v1"')

We can now use the WesEndpoint to send a GA4GH WES GetRunLog request for the workflow:

awscurl ${wes}/runs/20ceffe1-9664-4f41-a5e9-66ab5eaf57ac

which will return something like the following:

{
  "outputs": {
    "id": "20ceffe1-9664-4f41-a5e9-66ab5eaf57ac",
    "outputs": {
      "HaplotypeCallerGvcf_GATK4.output_vcf": "s3://agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-MergeGVCFs/NA12878.g.vcf.gz",
      "HaplotypeCallerGvcf_GATK4.output_vcf_index": "s3://agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-MergeGVCFs/NA12878.g.vcf.gz.tbi"
    }
  },
  "request": {
    "workflow_params": {},
    "workflow_type": "WDL",
    "workflow_type_version": "1.0"
  },
  "run_id": "0612738f-06cd-4d96-9438-a69c432dc59e",
  "state": "COMPLETE",
  "task_logs": [
    {
      "cmd": [
        "set -e\n\n/gatk/gatk --java-options \"-Xmx2G\"  \\\nMergeVcfs \\\n--INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-0/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-1/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-2/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-3/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-4/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-5/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-6/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-7/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-733263974272-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-8/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-9/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-10/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-11/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-12/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-13/NA12878.g.vcf.gz --INPUT 
/cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-14/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-15/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-16/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-17/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-18/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-19/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-20/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-21/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-22/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-23/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-733263974272-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-24/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-25/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-26/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-27/NA12878.g.vcf.gz --INPUT 
/cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-28/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-29/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-30/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-31/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-32/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-33/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-34/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-35/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-36/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-37/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-38/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-39/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-40/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-41/NA12878.g.vcf.gz --INPUT 
/cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-42/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-43/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-44/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-45/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-46/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-47/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-48/NA12878.g.vcf.gz --INPUT /cromwell_root/agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-49/NA12878.g.vcf.gz \\\n--OUTPUT NA12878.g.vcf.gz"
      ],
      "end_time": "2022-04-11T02:06:40.418Z",
      "exit_code": "0",
      "name": "HaplotypeCallerGvcf_GATK4.MergeGVCFs|26fcfea4-cfbc-4f04-95ef-115eb309f3df",
      "start_time": "2022-04-11T01:55:53.734Z",
      "stderr": "s3://agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-MergeGVCFs/MergeGVCFs-stderr.log",
      "stdout": "s3://agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-MergeGVCFs/MergeGVCFs-stdout.log"
    },
    {
      "cmd": [
        "set -e\n\n/gatk/gatk --java-options \"-Xmx6G -XX:GCTimeLimit=50 -XX:GCHeapFreeLimit=10\" \\\nHaplotypeCaller \\\n-R /cromwell_root/broad-references/hg38/v0/Homo_sapiens_assembly38.fasta \\\n-I s3://agc-733263974272-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-CramToBamTask/NA12878.bam \\\n-L /cromwell_root/gatk-test-data/intervals/hg38_wgs_scattered_calling_intervals/temp_0001_of_50/scattered.interval_list \\\n-O NA12878.g.vcf.gz \\\n-contamination 0 \\\n-G StandardAnnotation -G StandardHCAnnotation -G AS_StandardAnnotation \\\n-GQB 10 -GQB 20 -GQB 30 -GQB 40 -GQB 50 -GQB 60 -GQB 70 -GQB 80 -GQB 90 \\\n-ERC GVCF \\\n \\\n\n\n# Cromwell doesn't like optional task outputs, so we have to touch this file.\ntouch NA12878.g.vcf.gz.bamout.bam"
      ],
      "end_time": "2022-04-11T01:52:42.457Z",
      "exit_code": "0",
      "name": "HaplotypeCallerGvcf_GATK4.HaplotypeCaller|a9b3e5f1-cbd8-482b-87ba-df34e0252571",
      "start_time": "2022-04-11T01:36:05.426Z",
      "stderr": "s3://agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-0/HaplotypeCaller-0-stderr.log",
      "stdout": "s3://agc-111122223333-us-east-1/project/GATKwdl/userid/user33IfmN/context/onDemandCtxCromwell1/cromwell-execution/HaplotypeCallerGvcf_GATK4/0612738f-06cd-4d96-9438-a69c432dc59e/call-HaplotypeCaller/shard-0/HaplotypeCaller-0-stdout.log"
    },
    ...
  ],
  ...
}

If we capture this output to a file, we can process it with the following Python code:

#!/usr/bin/env python3
# reuses parse_datetime() defined earlier in this script

import json

def parse_wes_runlog(log_file):
    with open(log_file, 'r') as f:
        runlog = json.load(f)
        tasks = runlog['task_logs']

    if runlog.get('run_log'):
        for prop in ('start_time', 'end_time'):
            runlog["run_log"][prop] = parse_datetime(runlog["run_log"][prop])

    for i, task in enumerate(tasks):
        for k in ('start_time', 'end_time'):
            tasks[i][k] = parse_datetime(task[k])

        name, job_id = task['name'].split("|")
        tasks[i]['job_name'] = name
        tasks[i]['job_id'] = job_id

    runlog['task_logs'] = sorted(tasks, key=lambda x: (x['start_time'], x['name']))

    return runlog

Now that we can get task timing information in a consistent manner, let’s do some plotting. For this, I’m going to use Bokeh which generates nice interactive plots.

#!/usr/bin/env python3

from bokeh.models import ColumnDataSource
from bokeh.plotting import figure, output_notebook, show

TIME_SCALE_FACTORS = {
    "sec": 1, "min": 1/60, "hr": 1/3600
}

def plot_tasks_bokeh(tasks, time_units='min'):
    """plots task timing using engine reported timing (wes runlog)"""
    time_scale_factor = TIME_SCALE_FACTORS[time_units]

    tare = min([task['start_time'] for task in tasks])
    wall_time = (max([task['end_time'] for task in tasks]) - tare).total_seconds() * time_scale_factor

    stats = {
        'num_tasks': len(tasks),
        'timing': {
            'wallclock': wall_time,
            'units': time_units
        }
    }

    stats_text = (
        f"tasks: {stats['num_tasks']}, "
        f"wall_time: {stats['timing']['wallclock']:.2f} {stats['timing']['units']}"
    )

    source = ColumnDataSource(data=dict(
        y=range(len(tasks)),
        names=[task['job_name'] for task in tasks],
        running_left=[(task['start_time'] - tare).total_seconds() * time_scale_factor for task in tasks],
        running_right=[(task['end_time'] - tare).total_seconds() * time_scale_factor for task in tasks],
        text_x=[((task['end_time'] - tare).total_seconds() + 30) * time_scale_factor for task in tasks],
    ))

    output_notebook()

    p = figure(width=1280, height=800)
    p.hbar(y='y', left='running_left', right='running_right', height=0.8, source=source)
    p.text(x='text_x', y='y', text='names', alpha=0.4, text_baseline='middle', text_font_size='1.5ex', source=source)

    p.y_range.flipped = True
    p.xaxis.axis_label = f"task execution time ({time_units})"
    p.title.text = stats_text

    show(p)

    return p

Using this plotting function:

#!/usr/bin/env python3
runlog = parse_wes_runlog('runlog.json')  # path to the saved GetRunLog response
p = plot_tasks_bokeh(runlog['task_logs'])

we get a plot that looks like this:

Simple task timing plot

This gives us a great view of what order tasks are run in, how long each task takes, how many shards a parallel task creates, and how each task contributes to the overall workflow run time. This is a good start, but we can go deeper.

AGC schedules workflow tasks to AWS Batch. Looking at each task name, we can see that the AWS Batch job-id is encoded in it - which we’ve already split out in parse_wes_runlog() as job_name and job_id. Using these job-ids, we can query AWS Batch for additional job details. To do that we’ll add the following Python functions to our script:

#!/usr/bin/env python3

import boto3
from concurrent.futures import ThreadPoolExecutor, as_completed

def get_job_descriptions(batch, job_ids):
    jobs = []
    job_ids_sets = chunks(job_ids, 100)

    with ThreadPoolExecutor(max_workers=10) as executor:
        future_jobs = {
            executor.submit(batch.describe_jobs, jobs=job_ids_set): job_ids_set
            for job_ids_set in job_ids_sets
        }

        for future in as_completed(future_jobs):
            try:
                response = future.result()
                jobs += response["jobs"]
            except Exception as e:
                print(f"error retrieving job descriptions: {e}")

    descriptions = {job['jobId']:job for job in jobs}
    return descriptions


def chunks(l, n):
    """split list l into chunks of size n"""
    for i in range(0, len(l), n):
        yield l[i : i + n]

These functions gather AWS Batch job descriptions concurrently in groups of 100 to accommodate the limit of the AWS Batch DescribeJobs API call.

We can now update our parse_wes_runlog() function to include retrieving the corresponding AWS Batch job description for each task by adding:

def parse_wes_runlog(log_file, batch=boto3.client('batch')):
    # ...
    descriptions = get_job_descriptions(batch, [task['job_id'] for task in tasks])
    for i, task in enumerate(tasks):
        tasks[i]['job_description'] = descriptions.get(task['job_id'])
    # ...

This gives us the following additional metrics for each task:

  • The time interval from when the job was created in AWS Batch to when it actually started running - i.e. how long the job spent waiting in a job queue.
  • The completion status for each task - i.e. “SUCCEEDED” or “FAILED”.
  • The type and amount of compute resources the job requested, specifically the number of vCPUs and GiB of memory (a small helper for extracting these is sketched below).

Note: AWS Batch only stores job information for 48 hours after a job reaches a “completed” state (e.g. SUCCEEDED or FAILED). It is a good idea to get this information as soon as you can and cache it somewhere.
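
The plotting updates below also use a get_job_resources() helper that isn’t shown in this post. A minimal sketch of what it could look like, assuming container jobs that report their requests via resourceRequirements in the job description:

def get_job_resources(job):
    """Pull requested vCPUs and memory (MiB) out of an AWS Batch job description."""
    container = job.get('container', {})
    requirements = {
        req['type'].lower(): float(req['value'])
        for req in container.get('resourceRequirements', [])
    }
    return {
        'vcpus': requirements.get('vcpu', container.get('vcpus', 0)),
        'memory': requirements.get('memory', container.get('memory', 0)),
    }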

Combining task start and stop times with vCPU and memory details, you can get a rough estimate of how much each task costs. From the AWS EC2 console, you can export a table of instance types and their Linux On-Demand per-hour cost. Doing this for non-(a, d, n) C, M, and R instance types with x86_64 architecture, and fitting a multilinear model to the data, I get the following formula, which I’ve encapsulated in a Python function:

#!/usr/bin/env python3

def approx_instance_cost_per_hr(vcpu, mem_gib):
    # rough OLS fit model based on c, m, r instance pricing filtered for x86_64 and non-(a,d,n)
    # returns linux on-demand per hour cost in USD
    usd = 0.03550716*vcpu + 0.003633091*mem_gib + 0.00718333

    return usd
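
For example, a task that requested 4 vCPUs and 8 GiB of memory and ran for 20 minutes would come out to roughly:

cost = approx_instance_cost_per_hr(4, 8) * (20 / 60)
# (0.0355 * 4 + 0.0036 * 8 + 0.0072) * 0.33 ≈ 0.06 USD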

Note: AWS prices vary by region and are updated regularly, so your model constants may differ.
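
If you want to refit the model for your own region, a rough sketch with pandas and numpy might look like this - the CSV path and column names are assumptions about how you’ve cleaned the exported pricing table:

import numpy as np
import pandas as pd

def fit_cost_model(pricing_csv):
    """OLS fit of per-hour cost ~ vcpus + memory; expects numeric columns."""
    df = pd.read_csv(pricing_csv)  # e.g. columns: vcpus, memory_gib, usd_per_hr
    X = np.column_stack([df['vcpus'], df['memory_gib'], np.ones(len(df))])
    coef, *_ = np.linalg.lstsq(X, df['usd_per_hr'].to_numpy(), rcond=None)
    return coef  # [usd per vCPU-hour, usd per GiB-hour, intercept]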

We can now update our plotting function in the following ways. First we’ll add more data to our ColumnDataSource:

    millis_to_sec = 1/1000
    mebi_to_gebi = 1/1024

    jobs = [task['job_description'] for task in tasks]
    tare = min([job['createdAt'] for job in jobs])

    colors={'SUCCEEDED': 'cornflowerblue', 'FAILED': 'orangered'}

    source = ColumnDataSource(data=dict(
        y = range(len(jobs)),
        names=[job['jobName'] for job in jobs],
        color=[colors[job['status']] for job in jobs],
        cost=[
            approx_instance_cost_per_hr(
                get_job_resources(job)['vcpus'], 
                get_job_resources(job)['memory'] * mebi_to_gebi
            ) * ((job['stoppedAt'] - job['startedAt']) * millis_to_sec / 3600)
            for job in jobs],
        cpus=[get_job_resources(job)['vcpus'] for job in jobs],
        memory=[get_job_resources(job)['memory'] * mebi_to_gebi for job in jobs],
        queued_left=[(job['createdAt'] - tare) * millis_to_sec * time_scale_factor for job in jobs],
        queued_right=[(job['startedAt'] - tare) * millis_to_sec * time_scale_factor for job in jobs],
        running_left=[(job['startedAt'] - tare) * millis_to_sec * time_scale_factor for job in jobs],
        running_right=[(job['stoppedAt'] - tare) * millis_to_sec * time_scale_factor for job in jobs],
        text_x=[((job['stoppedAt'] - tare) * millis_to_sec + 30) * time_scale_factor for job in jobs],
    ))

A few things to note. AWS Batch reports times as millisecond timestamps and memory in mebibytes (MiB), hence the millis_to_sec and mebi_to_gebi conversion factors. I’ve also added data columns for task completion state (color) and for the time each task spent in queue (queued_left and queued_right).
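
As a quick illustration, converting one of these Batch timestamps back into a datetime (using a job description from the jobs list above) looks like:

from datetime import datetime, timezone

# AWS Batch reports createdAt/startedAt/stoppedAt as epoch milliseconds
started_at = datetime.fromtimestamp(job['startedAt'] / 1000, tz=timezone.utc)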

Our main task timing plot is the same as before with the addition of an extra set of bars to show how long tasks were in queue:

p.hbar(y='y', left='queued_left', right='queued_right', height=0.8, color='lightgrey', source=source, legend_label="queued")

We can also add linked plots for the per-task vCPUs requested (Range1d below comes from bokeh.models):

p_cpu = figure(width=160, y_range=p.y_range, sizing_mode="stretch_height")
p_cpu.hbar(y='y', right='cpus', height=0.8, color="darkgrey", source=source)
p_cpu.x_range = Range1d(-1, 32)
p_cpu.xaxis.axis_label = "vcpus"
p_cpu.title.text = f"max cpus: {max(source.data['cpus'])}"
p_cpu.yaxis.visible = False

per task memory requested:

p_mem = figure(width=160, y_range=p.y_range, sizing_mode="stretch_height")
p_mem.hbar(y='y', right='memory', height=0.8, color="darkgrey", source=source)
p_mem.x_range = Range1d(-1, 32)
p_mem.xaxis.axis_label = "memory (GiB)"
p_mem.title.text = f"max mem: {max(source.data['memory']):.2f} GiB"
p_mem.yaxis.visible = False

and per task cost estimates:

p_usd = figure(width=160, y_range=p.y_range, sizing_mode="stretch_height")
p_usd.hbar(y='y', right='cost', height=0.8, color='limegreen', source=source)
x_max = 0.1
x_min = -(x_max * 0.05)
p_usd.x_range = Range1d(x_min, x_max*1.05)
p_usd.xaxis.axis_label = "task cost (USD)"
p_usd.title.text = f"total: {total_cost:.2f} USD"

We can also compute some overall workflow summary statistics like total_cpu_time and importantly total_cost:

cpus = [get_job_resources(job)['vcpus'] for job in jobs]
total_cpu_time = sum([(job['stoppedAt'] - job['startedAt']) * cpu for job, cpu in zip(jobs, cpus)]) * millis_to_sec * time_scale_factor
total_cost = sum(source.data['cost'])

Pulling the above together using a Bokeh gridplot layout, we get a plot that looks like the following:

Detailed workflow profile plot with per task timing, compute resource utilization, and cost

Note: The input for the above workflow is a small dataset used for testing and demo purposes. The overall cost shown is not representative of a workflow processing a full 30x human whole genome sample.
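
The layout step itself isn’t shown above. A minimal sketch, assuming the figure objects p, p_cpu, p_mem, and p_usd from the earlier snippets (and that show(p) has been removed from plot_tasks_bokeh()), could be:

from bokeh.layouts import gridplot
from bokeh.plotting import show

grid = gridplot([[p, p_cpu, p_mem, p_usd]], toolbar_location='right')
show(grid)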

With the plot above, you can see at a glance which task(s) impact the workflow the most in terms of execution time and cost. You can also see where tasks might be spending too much time in queue waiting for available resources. In both cases, you now have some insight into how the workflow can be optimized - e.g. by tuning individual task resource requirements, or by adjusting overall compute availability through the Max vCPUs allowed for AWS Batch compute environments in AGC’s context configuration.

Importantly, the method above can be used for any workflow that AGC can run, so you don’t have to remember engine-specific details of how to get such profiling information. This opens up the possibility of comparing the performance of a workflow across workflow engines, which is something I’ll explore further in a future blog post. In the meantime, I recommend generating some plots of your own! All the Python code I used above is available on GitHub.
