Merge branch 'master' into 7492-keepproxy-upstream-errors
[arvados.git] / crunch_scripts / arvados_samtools.py
1 import arvados
2 import re
3 import os
4 import sys
5 import fcntl
6 import subprocess
7
8 samtools_path = None
9
10 def samtools_install_path():
11     """
12     Extract the samtools source tree, build the samtools binary, and
13     return the path to the source tree.
14     """
15     global samtools_path
16     if samtools_path:
17         return samtools_path
18     samtools_path = arvados.util.tarball_extract(
19         tarball = arvados.current_job()['script_parameters']['samtools_tgz'],
20         path = 'samtools')
21
22     # build "samtools" binary
23     lockfile = open(os.path.split(samtools_path)[0] + '.samtools-make.lock',
24                     'w')
25     fcntl.flock(lockfile, fcntl.LOCK_EX)
26     arvados.util.run_command(['make', '-j16'], cwd=samtools_path)
27     lockfile.close()
28
29     return samtools_path
30
31 def samtools_binary():
32     """
33     Return the path to the samtools executable.
34     """
35     return os.path.join(samtools_install_path(), 'samtools')
36
37 def run(command, command_args, **kwargs):
38     """
39     Build and run the samtools binary.
40
41     command is the samtools subcommand, e.g., "view" or "sort".
42
43     command_args is a list of additional command line arguments, e.g.,
44     ['-bt', 'ref_list.txt', '-o', 'aln.bam', 'aln.sam.gz']
45
46     It is assumed that we are running in a Crunch job environment, and
47     the job's "samtools_tgz" parameter is a collection containing the
48     samtools source tree in a .tgz file.
49     """
50     execargs = [samtools_binary(),
51                 command]
52     execargs += command_args
53     sys.stderr.write("%s.run: exec %s\n" % (__name__, str(execargs)))
54     arvados.util.run_command(
55         execargs,
56         cwd=arvados.current_task().tmpdir,
57         stdin=kwargs.get('stdin', subprocess.PIPE),
58         stderr=kwargs.get('stderr', sys.stderr),
59         stdout=kwargs.get('stdout', sys.stderr))
60
61 def one_task_per_bam_file(if_sequence=0, and_end_task=True):
62     """
63     Queue one task for each bam file in this job's input collection.
64
65     Each new task will have an "input" parameter: a manifest
66     containing one .bam file and (if available) the corresponding .bai
67     index file.
68
69     Files in the input collection that are not named *.bam or *.bai
70     (as well as *.bai files that do not match any .bam file present)
71     are silently ignored.
72
73     if_sequence and and_end_task arguments have the same significance
74     as in arvados.job_setup.one_task_per_input_file().
75     """
76     if if_sequence != arvados.current_task()['sequence']:
77         return
78     job_input = arvados.current_job()['script_parameters']['input']
79     cr = arvados.CollectionReader(job_input)
80     bam = {}
81     bai = {}
82     for s in cr.all_streams():
83         for f in s.all_files():
84             if re.search(r'\.bam$', f.name()):
85                 bam[s.name(), f.name()] = f
86             elif re.search(r'\.bai$', f.name()):
87                 bai[s.name(), f.name()] = f
88     for ((s_name, f_name), bam_f) in bam.items():
89         bai_f = bai.get((s_name, re.sub(r'bam$', 'bai', f_name)), None)
90         task_input = bam_f.as_manifest()
91         if bai_f:
92             task_input += bai_f.as_manifest()
93         new_task_attrs = {
94             'job_uuid': arvados.current_job()['uuid'],
95             'created_by_job_task_uuid': arvados.current_task()['uuid'],
96             'sequence': if_sequence + 1,
97             'parameters': {
98                 'input': task_input
99                 }
100             }
101         arvados.api().job_tasks().create(body=new_task_attrs).execute()
102     if and_end_task:
103         arvados.api().job_tasks().update(uuid=arvados.current_task()['uuid'],
104                                          body={'success':True}
105                                          ).execute()
106         exit(0)