Merge branch '9570-cwl-runner-fixes' closes #9570 (again)
[arvados.git] / crunch_scripts / arvados_bwa.py
1 import arvados
2 import re
3 import os
4 import sys
5 import fcntl
6 import subprocess
7
# Per-process cache of the extracted (and built) bwa source tree path.
# Stays None until install_path() has completed a build.
bwa_install_path = None

def install_path():
    """
    Extract the bwa source tree, build the bwa binary, and return the
    path to the source tree.

    The path is cached in the module-level bwa_install_path, so the
    extract-and-build work is done at most once per process.  The cache
    is only populated after the build step has returned, so a build
    that raises does not leave later calls returning an unbuilt tree.

    The tarball locator is read from the current job's "bwa_tbz"
    script parameter.
    """
    global bwa_install_path
    if bwa_install_path:
        return bwa_install_path

    source_dir = arvados.util.tarball_extract(
        tarball=arvados.current_job()['script_parameters']['bwa_tbz'],
        path='bwa')

    # build "bwa" binary
    #
    # An exclusive flock is held for the duration of "make" --
    # presumably so that concurrent tasks sharing the same extraction
    # directory do not build on top of each other (TODO confirm).
    # The lock file name is the parent directory of the source tree
    # with '.bwa-make.lock' appended directly (no path separator); it
    # is kept exactly as-is so every process agrees on the same file.
    lock_path = os.path.split(source_dir)[0] + '.bwa-make.lock'
    # Closing the file drops the lock; the with-block guarantees that
    # happens even if the build raises.
    with open(lock_path, 'w') as lockfile:
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        arvados.util.run_command(['make', '-j16'], cwd=source_dir)

    bwa_install_path = source_dir
    return bwa_install_path
31
def bwa_binary():
    """
    Return the path to the bwa executable.

    The executable is the file named "bwa" directly inside the
    directory returned by install_path().
    """
    source_dir = install_path()
    return os.path.join(source_dir, 'bwa')
37
def run(command, command_args, **kwargs):
    """
    Build and run the bwa binary.

    command is the bwa module, e.g., "index" or "aln".

    command_args is an iterable of additional command line arguments,
    e.g., ['-a', 'bwtsw', 'ref.fasta']

    Optional keyword arguments:

    stdin -- passed through to the child process; defaults to
    subprocess.PIPE.

    stdout -- passed through to the child process; defaults to
    sys.stderr.

    The child's stderr always goes to sys.stderr, and its working
    directory is the current task's tmpdir.

    It is assumed that we are running in a Crunch job environment, and
    the job's "bwa_tbz" parameter is a collection containing the bwa
    source tree in a .tbz file.
    """
    argv = [bwa_binary(), command]
    argv.extend(command_args)

    # Log the exact command line before handing it off.
    sys.stderr.write("%s.run: exec %s\n" % (__name__, argv))

    workdir = arvados.current_task().tmpdir
    child_stdin = kwargs.get('stdin', subprocess.PIPE)
    child_stdout = kwargs.get('stdout', sys.stderr)
    arvados.util.run_command(argv,
                             cwd=workdir,
                             stderr=sys.stderr,
                             stdin=child_stdin,
                             stdout=child_stdout)
61
def one_task_per_pair_input_file(if_sequence=0, and_end_task=True):
    """
    Queue one task for each pair of fastq files in this job's input
    collection.

    Each new task will have two parameters, named "input_1" and
    "input_2", each being a manifest containing a single fastq file.

    A matching pair of files in the input collection is assumed to
    have names "x_1.y" and "x_2.y".  Both files of a pair must be in
    the same stream; pairs are not matched across streams.

    Files in the input collection that are not part of a matched pair
    are silently ignored.

    if_sequence and and_end_task arguments have the same significance
    as in arvados.job_setup.one_task_per_input_file(): nothing happens
    unless the current task's sequence equals if_sequence, new tasks
    are queued at sequence if_sequence + 1, and if and_end_task is true
    the current task is marked successful and the process exits with
    status 0.
    """
    if if_sequence != arvados.current_task()['sequence']:
        return
    job_input = arvados.current_job()['script_parameters']['input']
    cr = arvados.CollectionReader(job_input)
    for s in cr.all_streams():
        stream_files = list(s.all_files())
        # Index this stream's files by name once, instead of rescanning
        # the stream for every candidate.  Later entries overwrite
        # earlier ones, so a duplicated name resolves to the last file
        # with that name, as a full scan without early exit would.
        files_by_name = {}
        for f in stream_files:
            files_by_name[f.name()] = f
        for left_file in stream_files:
            left_name = left_file.name()
            # "x_1.y" -> "x_2.y".  Raw strings keep the \g<1> group
            # reference from being read as a string escape.
            right_name = re.sub(r'(.*_)1\.', r'\g<1>2.', left_name)
            if right_name == left_name:
                # Not a "_1." file, so it cannot be the left half.
                continue
            right_file = files_by_name.get(right_name)
            if right_file is None:
                # No partner in this stream: silently ignored.
                continue
            new_task_attrs = {
                'job_uuid': arvados.current_job()['uuid'],
                'created_by_job_task_uuid': arvados.current_task()['uuid'],
                'sequence': if_sequence + 1,
                'parameters': {
                    'input_1': left_file.as_manifest(),
                    'input_2': right_file.as_manifest()
                    }
                }
            arvados.api().job_tasks().create(body=new_task_attrs).execute()
    if and_end_task:
        arvados.api().job_tasks().update(uuid=arvados.current_task()['uuid'],
                                         body={'success': True}
                                         ).execute()
        # sys.exit rather than the exit() builtin, which is only
        # installed by the site module and may be absent.
        sys.exit(0)