11167: Refactored tests to use new helper function.
[arvados.git] / crunch_scripts / arvados_bwa.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import arvados
6 import re
7 import os
8 import sys
9 import fcntl
10 import subprocess
11
12 bwa_install_path = None
13
14 def install_path():
15     """
16     Extract the bwa source tree, build the bwa binary, and return the
17     path to the source tree.
18     """
19     global bwa_install_path
20     if bwa_install_path:
21         return bwa_install_path
22
23     bwa_install_path = arvados.util.tarball_extract(
24         tarball = arvados.current_job()['script_parameters']['bwa_tbz'],
25         path = 'bwa')
26
27     # build "bwa" binary
28     lockfile = open(os.path.split(bwa_install_path)[0] + '.bwa-make.lock',
29                     'w')
30     fcntl.flock(lockfile, fcntl.LOCK_EX)
31     arvados.util.run_command(['make', '-j16'], cwd=bwa_install_path)
32     lockfile.close()
33
34     return bwa_install_path
35
36 def bwa_binary():
37     """
38     Return the path to the bwa executable.
39     """
40     return os.path.join(install_path(), 'bwa')
41
42 def run(command, command_args, **kwargs):
43     """
44     Build and run the bwa binary.
45
46     command is the bwa module, e.g., "index" or "aln".
47
48     command_args is a list of additional command line arguments, e.g.,
49     ['-a', 'bwtsw', 'ref.fasta']
50
51     It is assumed that we are running in a Crunch job environment, and
52     the job's "bwa_tbz" parameter is a collection containing the bwa
53     source tree in a .tbz file.
54     """
55     execargs = [bwa_binary(),
56                 command]
57     execargs += command_args
58     sys.stderr.write("%s.run: exec %s\n" % (__name__, str(execargs)))
59     arvados.util.run_command(
60         execargs,
61         cwd=arvados.current_task().tmpdir,
62         stderr=sys.stderr,
63         stdin=kwargs.get('stdin', subprocess.PIPE),
64         stdout=kwargs.get('stdout', sys.stderr))
65
66 def one_task_per_pair_input_file(if_sequence=0, and_end_task=True):
67     """
68     Queue one task for each pair of fastq files in this job's input
69     collection.
70
71     Each new task will have two parameters, named "input_1" and
72     "input_2", each being a manifest containing a single fastq file.
73
74     A matching pair of files in the input collection is assumed to
75     have names "x_1.y" and "x_2.y".
76
77     Files in the input collection that are not part of a matched pair
78     are silently ignored.
79
80     if_sequence and and_end_task arguments have the same significance
81     as in arvados.job_setup.one_task_per_input_file().
82     """
83     if if_sequence != arvados.current_task()['sequence']:
84         return
85     job_input = arvados.current_job()['script_parameters']['input']
86     cr = arvados.CollectionReader(job_input)
87     all_files = []
88     for s in cr.all_streams():
89         all_files += list(s.all_files())
90     for s in cr.all_streams():
91         for left_file in s.all_files():
92             left_name = left_file.name()
93             right_file = None
94             right_name = re.sub(r'(.*_)1\.', '\g<1>2.', left_name)
95             if right_name == left_name:
96                 continue
97             for f2 in s.all_files():
98                 if right_name == f2.name():
99                     right_file = f2
100             if right_file != None:
101                 new_task_attrs = {
102                     'job_uuid': arvados.current_job()['uuid'],
103                     'created_by_job_task_uuid': arvados.current_task()['uuid'],
104                     'sequence': if_sequence + 1,
105                     'parameters': {
106                         'input_1':left_file.as_manifest(),
107                         'input_2':right_file.as_manifest()
108                         }
109                     }
110                 arvados.api().job_tasks().create(body=new_task_attrs).execute()
111     if and_end_task:
112         arvados.api().job_tasks().update(uuid=arvados.current_task()['uuid'],
113                                    body={'success':True}
114                                    ).execute()
115         exit(0)