"""
Helper functions for running tests
"""
import sys
import os
import subprocess as sp
import csv
import json
try:
from .settings import CWL_DIR, CWL_ARGS, TOIL_ARGS, DATA_SETS, KNOWN_FUSIONS_FILE, IMPACT_FILE
except ImportError:
from settings import CWL_DIR, CWL_ARGS, TOIL_ARGS, DATA_SETS, KNOWN_FUSIONS_FILE, IMPACT_FILE
from collections import OrderedDict
import unittest
from tempfile import mkdtemp
import shutil
from pathlib import Path
import getpass
import hashlib
# login name of the user running this module; run_cwl_toil uses it to build its default tmp dir under /scratch
username = getpass.getuser()
class CWLRunner(object):
    """
    Class for running a CWL file with either cwltool (`cwl-runner`) or Toil
    """
    def __init__(self,
        cwl_file, # str or CWLFile
        input, # pipeline input dict to be converted to JSON
        CWL_ARGS = CWL_ARGS,
        print_stdout = False,
        dir = None, # directory to run the CWL in and write output to
        output_dir = None, # directory to save output files to
        input_json_file = None, # path to write input JSON to if you already have one chosen
        input_is_file = False, # if the `input` arg should be treated as a pre-made JSON file and not a Python dict
        verbose = True,
        testcase = None,
        engine = "cwltool", # default engine is cwl-runner cwltool
        print_command = False,
        restart = False,
        jobStore = None,
        debug = False,
        leave_tmpdir = False,
        leave_outputs = False,
        parallel = False
        ):
        """
        Parameters
        ----------
        cwl_file: str | CWLFile
            CWL file path or CWLFile object to run
        input: dict
            data dict to be used as input to the CWL
        CWL_ARGS: list
            list of extra command line arguments to include when running the CWL workflow
        print_stdout: bool
            whether stdout from the CWL workflow should be printed to console
        dir: str
            directory to run the CWL workflow in; if `None`, a default directory named
            after the engine is used (e.g. `cwltool_output`, `toil_output`); the
            directory is created if it does not exist
        output_dir: str
            output directory for the CWL workflow; if `None`, defaults to 'output' inside the `dir` directory
        input_json_file: str
            path to the input.json file to write CWL input data to
        input_is_file: bool
            if the `input` arg should be treated as a pre-made JSON file path and not a Python dict object
        verbose: bool
            include descriptive messages in console stdout when running CWL
        testcase: unittest.TestCase
            `unittest.TestCase` object to use when making assertions when running as part of a unit test
        engine: str
            run the CWL with cwl-runner (`"cwltool"`) or Toil (`"toil"`)
        print_command: bool
            print the fully evaluated command to console before running
        restart: bool
            enable "restart" or "resume" functionality when running the CWL workflow
        jobStore: str
            Toil job store to use when restarting a pipeline
        debug: bool
            whether to include the `--debug` arg with the CWL runner command for extra console output in case of error and debugging
        leave_tmpdir: bool
            do not delete the tmp dir after the CWL workflow finishes
        leave_outputs: bool
            do not delete the output dir after completion
        parallel: bool
            run workflow jobs in parallel; NOTE: make sure all Singularity containers are cached first or it will break

        Examples
        --------
        Example usage::

            runner = CWLRunner(
                cwl_file = "/path/to/tool.cwl",
                input = cwl_input,
                dir = "/run/workflow/here",
                verbose = True)
            output_json, output_dir, output_json_file = runner.run()
        """
        self.cwl_file = cwl_file
        self.input = input
        self.CWL_ARGS = CWL_ARGS
        self.print_stdout = print_stdout
        self.verbose = verbose
        self.input_json_file = input_json_file
        self.testcase = testcase
        self.engine = engine
        self.print_command = print_command
        self.restart = restart
        self.jobStore = jobStore
        self.debug = debug
        self.leave_tmpdir = leave_tmpdir
        self.leave_outputs = leave_outputs
        self.parallel = parallel
        self.output_dir = output_dir
        self.input_is_file = input_is_file

        # pick an engine-specific default run directory when none was requested
        if dir is None:
            if engine == 'cwltool':
                dir = "cwltool_output"
            elif engine == 'toil':
                dir = "toil_output"
            else:
                dir = "pipeline_output"

        # the run directory must exist before input.json / output.json get written into it
        self.dir = os.path.abspath(dir)
        Path(self.dir).mkdir(parents=True, exist_ok=True)

    def run(self):
        """
        Run the CWL workflow object

        Returns
        -------
        tuple
            `(output_json, output_dir, output_json_file)`: the output data loaded from
            the runner, the workflow output directory, and the path to the
            `output.json` file written inside the run directory.
            An empty tuple is returned when `engine` is neither `"cwltool"` nor `"toil"`.
        """
        if self.verbose:
            message = ">>> Running {cwl_file} in {dir}".format(cwl_file = self.cwl_file, dir = self.dir)
            print(message)

        if self.engine == 'cwltool':
            output_json, output_dir = run_cwl(
                testcase = self.testcase,
                tmpdir = self.dir,
                input_json = self.input,
                cwl_file = self.cwl_file,
                CWL_ARGS = self.CWL_ARGS,
                print_stdout = self.print_stdout,
                print_command = self.print_command,
                check_returncode = False,
                input_json_file = self.input_json_file,
                debug = self.debug,
                leave_tmpdir = self.leave_tmpdir,
                leave_outputs = self.leave_outputs,
                parallel = self.parallel,
                output_dir = self.output_dir,
                input_is_file = self.input_is_file
                )
        elif self.engine == 'toil':
            output_json, output_dir = run_cwl_toil(
                input_data = self.input,
                cwl_file = self.cwl_file,
                run_dir = self.dir,
                # forward the requested output dir so it is honored the same way as with cwltool;
                # None keeps run_cwl_toil's own default of 'output' inside the run dir
                output_dir = self.output_dir,
                print_command = self.print_command,
                input_json_file = self.input_json_file,
                restart = self.restart,
                jobStore = self.jobStore,
                input_is_file = self.input_is_file
                )
        else:
            # unrecognized engine; nothing is run
            return ()

        # save a copy of the workflow output data alongside the run
        output_json_file = os.path.join(self.dir, "output.json")
        with open(output_json_file, "w") as fout:
            json.dump(output_json, fout, indent = 4)
        return (output_json, output_dir, output_json_file)
class CWLFile(os.PathLike):
    """
    Path-like handle for a CWL file that lives under the CWL directory
    """
    def __init__(self, path, CWL_DIR = CWL_DIR):
        """
        Parameters
        ----------
        path: str
            name of a CWL file relative to `CWL_DIR`
        CWL_DIR: str
            full path to the directory containing CWL files

        Examples
        --------
        Example usage::

            cwl_file = CWLFile("foo.cwl")
        """
        resolved = os.path.join(CWL_DIR, path)
        self.path = resolved

    def __fspath__(self):
        """Return the joined path so the object can be passed to `os` / `open` style APIs"""
        return self.path

    def __str__(self):
        """Return the joined path"""
        return self.path

    def __repr__(self):
        """Return the joined path (same text as `str()`)"""
        return self.path
def run_command(args, testcase = None, validate = False, print_stdout = False):
    """
    Helper function to run a shell command easier

    Parameters
    ----------
    args: list
        a list of shell args to execute
    testcase: unittest.TestCase
        a test case instance for making assertions; optional
    validate: bool
        whether to check that the exit code was 0; the check is made with
        `testcase.assertEqual` when a `testcase` is given, otherwise a
        `subprocess.CalledProcessError` is raised on a non-zero exit code
    print_stdout: bool
        whether to print the command's stdout to console

    Returns
    -------
    int
        the command return code
    str
        the command stdout, stripped of leading and trailing whitespace
    str
        the command stderr, stripped of leading and trailing whitespace

    Raises
    ------
    subprocess.CalledProcessError
        if `validate` is set, no `testcase` was given, and the command exited non-zero

    Examples
    --------
    Example usage::

        command = [ "foo.py", "arg1", "arg2" ]
        returncode, proc_stdout, proc_stderr = run_command(command, testcase = self, validate = True)
    """
    process = sp.Popen(args, stdout = sp.PIPE, stderr = sp.PIPE, universal_newlines = True)
    proc_stdout, proc_stderr = process.communicate()
    returncode = process.returncode
    proc_stdout = proc_stdout.strip()
    proc_stderr = proc_stderr.strip()

    if print_stdout:
        print(proc_stdout)

    # check that it ran successfully
    if validate:
        if returncode != 0:
            # surface the error output before the failure is raised
            print(proc_stderr)
        if testcase is not None:
            testcase.assertEqual(returncode, 0)
        elif returncode != 0:
            # no test case to assert with; fail explicitly instead of
            # crashing on a missing `assertEqual` attribute
            raise sp.CalledProcessError(returncode, args, output = proc_stdout, stderr = proc_stderr)
    return (returncode, proc_stdout, proc_stderr)
def run_cwl(
    testcase, # 'self' in the unittest.TestCase instance
    tmpdir, # dir where execution is taking place and files are staged & written
    input_json, # CWL input data
    cwl_file, # CWL file to run
    CWL_ARGS = CWL_ARGS, # default cwltool args to use
    print_stdout = False,
    print_command = False,
    check_returncode = True,
    input_json_file = None,
    debug = False,
    leave_tmpdir = False,
    leave_outputs = False,
    parallel = False,
    output_dir = None,
    input_is_file = False # if the `input_json` is actually a path to a pre-existing JSON file
    ):
    """
    Run the CWL with cwltool / cwl-runner

    The input data is written to a JSON file (unless it already is one), the
    `cwl-runner` command is assembled and executed, and its stdout is parsed as JSON.

    Returns
    -------
    dict:
        output data dictionary loaded from CWL stdout JSON
    str:
        path to the `output` directory from the CWL workflow
    """
    if input_is_file:
        # caller handed us a ready-made JSON file; use it directly
        input_json_file = input_json
    else:
        # caller handed us a Python object; serialize it for the runner
        if not input_json_file:
            input_json_file = os.path.join(tmpdir, "input.json")
        with open(input_json_file, "w") as json_out:
            json.dump(input_json, json_out)

    if output_dir is None:
        output_dir = os.path.join(tmpdir, "output")
    tmp_dir = os.path.join(tmpdir, 'tmp', "tmp")
    cache_dir = os.path.join(tmpdir, 'tmp', "cache")

    # start from a copy of the base args and append whichever switches were requested
    runner_args = list(CWL_ARGS)
    switches = (
        (leave_outputs, '--leave-outputs'),
        (leave_tmpdir, '--leave-tmpdir'),
        (debug, '--debug'),
    )
    runner_args.extend(flag for enabled, flag in switches if enabled)
    if parallel:
        # if the containers are not already all pre-pulled then parallel jobs can
        # collide while trying to pull the same container to the same filepath
        print(">>> Running cwl-runner with 'parallel'; make sure all Singularity containers are pre-cached!")
        runner_args.append('--parallel')

    command = ["cwl-runner"]
    command.extend(runner_args)
    command.extend([
        "--outdir", output_dir,
        "--tmpdir-prefix", tmp_dir,
        "--cachedir", cache_dir,
        cwl_file, input_json_file
    ])

    if print_command:
        print(">>> cwl-runner command:")
        print(' '.join(str(part) for part in command))

    returncode, proc_stdout, proc_stderr = run_command(command)

    if print_stdout:
        print(proc_stdout)
    if returncode != 0:
        print(proc_stderr)
    if check_returncode:
        testcase.assertEqual(returncode, 0)

    output_json = json.loads(proc_stdout)
    return output_json, output_dir
def run_cwl_toil(
    input_data,
    cwl_file,
    run_dir,
    output_dir = None,
    workDir = None,
    jobStore = None,
    tmpDir = None,
    logFile = None,
    input_json_file = None,
    print_command = False,
    restart = False,
    TOIL_ARGS = TOIL_ARGS,
    input_is_file = False # if the `input_data` is actually a path to a pre-existing JSON file
    ):
    """
    Run a CWL using Toil

    Paths that are not supplied default to locations inside `run_dir`, except
    the tmp dir which defaults to `/scratch/<username>`. The process exits with
    status 1 if the job store state does not match the `restart` setting.

    Returns
    -------
    dict:
        output data loaded from the `toil-cwl-runner` stdout JSON
    str:
        path to the workflow output directory
    """
    run_dir = os.path.abspath(run_dir)

    if restart:
        # resuming a run: an existing job store must be supplied
        if not jobStore:
            print(">>> ERROR: jobStore must be provided")
            sys.exit(1)
        jobStore = os.path.abspath(jobStore)
        if not os.path.exists(jobStore):
            print(">>> ERROR: jobStore does not exist; ", jobStore)
            sys.exit(1)
        # need to add extra restart args
        TOIL_ARGS = [ *TOIL_ARGS, '--restart', '--jobStore', jobStore ]
    else:
        # fresh run: the job store must not be there yet
        if not jobStore:
            jobStore = os.path.join(run_dir, "jobstore")
        if os.path.exists(jobStore):
            print(">>> ERROR: Job store already exists; ", jobStore)
            sys.exit(1)
        TOIL_ARGS = [ *TOIL_ARGS, '--jobStore', jobStore ]

    if input_is_file:
        # input_data is the path to a pre-existing JSON file
        input_json_file = input_data
    else:
        # input_data is a Python object to be dumped to a JSON file
        if input_json_file is None:
            input_json_file = os.path.join(run_dir, "input.json")
        with open(input_json_file, "w") as json_out:
            json.dump(input_data, json_out)

    # fill in defaults for any locations the caller did not choose
    if output_dir is None:
        output_dir = os.path.join(run_dir, "output")
    if workDir is None:
        workDir = os.path.join(run_dir, "work")
    if logFile is None:
        logFile = os.path.join(run_dir, "toil.log")
    if tmpDir is None:
        tmpDir = os.path.join('/scratch', username)
    tmpDirPrefix = os.path.join(tmpDir, "tmp")

    for needed_dir in (workDir, tmpDir):
        Path(needed_dir).mkdir(parents=True, exist_ok=True)

    command = ["toil-cwl-runner"]
    command.extend(TOIL_ARGS)
    command.extend([
        "--logFile", logFile,
        "--outdir", output_dir,
        '--workDir', workDir,
        '--tmpdir-prefix', tmpDirPrefix,
        cwl_file, input_json_file
    ])

    if print_command:
        print(">>> toil-cwl-runner command:")
        print(' '.join(str(part) for part in command))

    returncode, proc_stdout, proc_stderr = run_command(command)

    try:
        output_data = json.loads(proc_stdout)
    except json.decoder.JSONDecodeError:
        # stdout that is not valid JSON means the run did not finish correctly;
        # show what the runner produced before re-raising
        print(proc_stdout)
        print(proc_stderr)
        raise
    return output_data, output_dir
def load_mutations(filename):
    """
    Read the header comments and the tab-delimited records from a .maf file

    Parameters
    ----------
    filename: str
        path to input file

    Returns
    -------
    list
        a list of comment lines from the file
    list
        a list of dicts containing the records from the file

    Examples
    --------
    Example usage::

        >>> row1 = {'Hugo_Symbol': 'SOX9', 'Chromosome': '1'}
        >>> row2 = {'Hugo_Symbol': 'BRCA', 'Chromosome': '7'}
        >>> comments = [ ['# version 2.4'] ]
        >>> lines = dicts2lines([row1, row2], comments)
        >>> output_path = write_table('.', 'output.maf', lines)
        >>> comments, mutations = load_mutations(output_path)
        >>> comments
        ['# version 2.4']
        >>> mutations
        [OrderedDict([('Hugo_Symbol', 'SOX9'), ('Chromosome', '1')]), OrderedDict([('Hugo_Symbol', 'BRCA'), ('Chromosome', '7')])]
    """
    comments, start_line = parse_header_comments(filename)
    with open(filename) as fin:
        # advance past the leading comment lines so the reader starts on the column header
        lines_to_skip = start_line
        while lines_to_skip > 0:
            next(fin)
            lines_to_skip -= 1
        mutations = list(csv.DictReader(fin, delimiter = '\t'))
    return comments, mutations
def write_table(tmpdir, filename, lines, delimiter = '\t', filepath = None):
    """
    Write rows of fields to a delimited text file

    Parameters
    ----------
    tmpdir: str
        path to parent directory to save the file to
    filename: str
        basename for the file to write to
    lines: list
        a list of lists, containing each field to write
    delimiter: str
        character to join the line elements on
    filepath: str
        full path to write the output file to; overrides tmpdir and filename

    Returns
    -------
    str
        path to output file
    """
    destination = filepath if filepath else os.path.join(tmpdir, filename)
    with open(destination, "w") as fout:
        # one output line per row, fields joined on the delimiter
        fout.writelines(delimiter.join(fields) + '\n' for fields in lines)
    return destination
def dicts2lines(dict_list, comment_list = None):
    """
    Helper function to convert a list of dicts into a list of lines to use with write_table

    The header line is the union of the keys of all dicts, in first-seen order.
    Each record line lists its values in header order, so every value lines up
    with its column even when dicts order their keys differently; a key that is
    missing from a dict is written as an empty string.

    Parameters
    ----------
    dict_list: list
        a list of dictionaries with data to be written
    comment_list: list
        a list of comment lines to prepend to the file data

    Returns
    -------
    list
        list of lines in the format::

            [ ['# comment1'], ['col1', 'col2'], ['val1', 'val2'], ... ]

    Note
    ----
    Dict values must be type `str`

    Examples
    --------
    Example usage::

        >>> comments = [ ['# foo'] ]
        >>> row1 = { 'a':'1', 'b':'2' }
        >>> row2 = { 'a':'6', 'b':'7' }
        >>> lines = dicts2lines(dict_list = [row1, row2], comment_list = comments)
        >>> lines
        [['# foo'], ['a', 'b'], ['1', '2'], ['6', '7']]
        >>> output_path = write_table(tmpdir = '.', filename = 'output.txt', lines = lines)
    """
    # collect the fieldnames, using the OrderedDict as an ordered set
    seen_fields = OrderedDict()
    for row in dict_list:
        for key in row:
            seen_fields[key] = ''
    fieldnames = list(seen_fields)

    # comment lines first, then the header, then one line per record
    lines = list(comment_list) if comment_list else []
    lines.append(fieldnames)
    for row in dict_list:
        # look each value up by fieldname so columns stay aligned with the header
        lines.append([ row.get(name, '') for name in fieldnames ])
    return lines
def md5_file(filename):
    """
    Compute the md5 checksum of a file, feeding it to the hasher in 8192-byte
    blocks so that large files do not have to be held in memory all at once.

    Parameters
    ----------
    filename: str
        path to input file

    Returns
    -------
    str
        hash for the file
    """
    digest = hashlib.md5()
    with open(filename, "rb") as fin:
        # iter() with a sentinel stops at the first empty read, i.e. end of file
        for block in iter(lambda: fin.read(8192), b""):
            digest.update(block)
    return digest.hexdigest()
class TableReader(object):
    """
    Reader for a delimited table that is preceded by comment lines

    Records are streamed from disk on demand, so file attributes and rows can
    be inspected without loading the whole file into memory

    Note
    ----
    Input file must have column headers!
    """
    def __init__(self, filename, comment_char = '#', delimiter = '\t'):
        """
        Parameters
        ----------
        filename: str
            path to input file
        comment_char: str
            comment character for the file
        delimiter: str
            file field delimiter

        Examples
        --------
        Example usage::

            table_reader = TableReader(input_maf_file)
            comment_lines = table_reader.comment_lines
            fieldnames = table_reader.get_fieldnames()
            records = [ rec for rec in table_reader.read() ]
        """
        self.filename = filename
        self.comment_char = comment_char
        self.delimiter = delimiter
        # collect the leading comments and locate the line holding the table header
        header_comments, header_line = parse_header_comments(filename, comment_char = self.comment_char)
        self.comments = header_comments
        self.start_line = header_line
        # same comments, newline-terminated
        self.comment_lines = [ comment + '\n' for comment in self.comments ]

    def get_reader(self, fin):
        """
        returns the csv.DictReader for the table rows, skipping the comments

        `fin` is an open file handle positioned at the start of the file
        """
        lines_to_skip = self.start_line
        while lines_to_skip > 0:
            next(fin)
            lines_to_skip -= 1
        return csv.DictReader(fin, delimiter = self.delimiter)

    def get_fieldnames(self):
        """
        returns the list of fieldnames for the table
        """
        with open(self.filename, 'r') as fin:
            return self.get_reader(fin).fieldnames

    def read(self):
        """
        iterable to get the record rows from the table, skipping the comments
        """
        with open(self.filename, 'r') as fin:
            yield from self.get_reader(fin)

    def count(self):
        """
        Return the total number of records in the table
        """
        return sum(1 for _ in self.read())
class PlutoTestCase(unittest.TestCase):
    """
    A `unittest.TestCase` subclass that bundles a per-test tmpdir, a CWLRunner
    invocation, and thin wrappers around this module's helper functions, to make
    it easier to create and run unit tests and integration tests for CWL workflows

    Note
    ----
    For usage examples, see https://github.com/mskcc/pluto-cwl/tree/master/tests

    Examples
    --------
    Example usage::

        class TestAddImpactCWL(PlutoTestCase):
            cwl_file = CWLFile('add_af.cwl')

            def test_add_af(self):
                '''Test that the af col gets calculated and added to the maf file correctly'''
                maf_lines = [
                    ['# comment 1'],
                    ['# comment 2'],
                    ['Hugo_Symbol', 't_depth', 't_alt_count'],
                    ['SUFU', '100', '75'],
                    ['GOT1', '100', '1'],
                    ['SOX9', '100', '0'],
                ]
                input_maf = self.write_table(tmpdir = self.tmpdir, filename = 'input.maf', lines = maf_lines)
                self.input = {
                    "input_file": {
                        "class": "File",
                        "path": input_maf
                    },
                    "output_filename": 'output.maf',
                }
                output_json, output_dir = self.run_cwl()
                expected_output = {
                    'output_file': {
                        'location': 'file://' + os.path.join(output_dir, 'output.maf'),
                        'basename': 'output.maf',
                        'class': 'File',
                        'checksum': 'sha1$39de59ad5d736db692504012ce86d3395685112e',
                        'size': 109,
                        'path': os.path.join(output_dir, 'output.maf')
                    }
                }
                self.assertDictEqual(output_json, expected_output)
                comments, mutations = self.load_mutations(output_json['output_file']['path'])
                expected_comments = ['# comment 1', '# comment 2']
                self.assertEqual(comments, expected_comments)
                expected_mutations = [
                    {'Hugo_Symbol': 'SUFU', 't_depth': '100', 't_alt_count':'75', 't_af': '0.75'},
                    {'Hugo_Symbol': 'GOT1', 't_depth': '100', 't_alt_count':'1', 't_af': '0.01'},
                    {'Hugo_Symbol': 'SOX9', 't_depth': '100', 't_alt_count':'0', 't_af': '0.0'}
                ]
                self.assertEqual(mutations, expected_mutations)
    """
    # settings shared by every test in the class
    cwl_file = None # subclasses must set this before using the runner
    DATA_SETS = DATA_SETS
    KNOWN_FUSIONS_FILE = KNOWN_FUSIONS_FILE
    IMPACT_FILE = IMPACT_FILE
    # extra keyword arguments handed to CWLRunner by run_cwl()
    runner_args = {
        'leave_outputs': False,
        'leave_tmpdir': False,
        'debug': False,
        'parallel': False,
    }

    def setUp(self):
        """
        This gets automatically run before each test case

        Note
        ----
        This method will set up `self.preserve`, `self.tmpdir`, and `self.input`
        """
        self.preserve = False # set to True to keep the tmpdir after the test
        self.tmpdir = mkdtemp()
        self.input = {} # the CWL input data for the test goes here

    def tearDown(self):
        """
        This gets automatically run after each test case

        Note
        ----
        This method will delete `self.tmpdir` unless `self.preserve` is `True`
        """
        if self.preserve:
            return
        shutil.rmtree(self.tmpdir)

    def run_cwl(self, input = None, cwl_file = None):
        """
        Run the CWL specified for the test case

        Parameters
        ----------
        input: dict
            data dict to be used as input to the CWL; defaults to `self.input`
        cwl_file: str | CWLFile
            the CWLFile object or path to CWL file to run; defaults to the class `cwl_file`

        Returns
        -------
        tuple
            the `(output_json, output_dir)` pair taken from the runner's result
        """
        chosen_input = self.input if input is None else input
        chosen_cwl = CWLFile(self.cwl_file) if cwl_file is None else cwl_file
        runner = CWLRunner(
            cwl_file = chosen_cwl,
            input = chosen_input,
            verbose = False,
            dir = self.tmpdir,
            testcase = self,
            **self.runner_args)
        # the third item is the path to the saved output.json, which is not passed on
        output_json, output_dir, _ = runner.run()
        return output_json, output_dir

    def run_command(self, *args, **kwargs):
        """
        Run a shell command. Wrapper around :func:`~pluto.run_command`
        """
        returncode, proc_stdout, proc_stderr = run_command(*args, **kwargs)
        return returncode, proc_stdout, proc_stderr

    # wrappers around other functions in this module to reduce imports needed
    def write_table(self, *args, **kwargs):
        """
        Wrapper around :func:`~pluto.write_table`
        """
        return write_table(*args, **kwargs)

    def read_table(self, input_file):
        """
        Simple loading of tabular lines in a file

        Parameters
        ----------
        input_file: str
            path to file

        Returns
        -------
        list
            a list of file lines split on whitespace
        """
        with open(input_file) as fin:
            return [ text.strip().split() for text in fin ]

    def load_mutations(self, *args, **kwargs):
        """
        Wrapper around :func:`~pluto.load_mutations`
        """
        comments, mutations = load_mutations(*args, **kwargs)
        return comments, mutations

    def dicts2lines(self, *args, **kwargs):
        """
        Wrapper around :func:`~pluto.dicts2lines`
        """
        return dicts2lines(*args, **kwargs)