Merge branch '8784-dir-listings'
[arvados.git] / crunch_scripts / arvados_samtools.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import arvados
6 import re
7 import os
8 import sys
9 import fcntl
10 import subprocess
11
12 samtools_path = None
13
14 def samtools_install_path():
15     """
16     Extract the samtools source tree, build the samtools binary, and
17     return the path to the source tree.
18     """
19     global samtools_path
20     if samtools_path:
21         return samtools_path
22     samtools_path = arvados.util.tarball_extract(
23         tarball = arvados.current_job()['script_parameters']['samtools_tgz'],
24         path = 'samtools')
25
26     # build "samtools" binary
27     lockfile = open(os.path.split(samtools_path)[0] + '.samtools-make.lock',
28                     'w')
29     fcntl.flock(lockfile, fcntl.LOCK_EX)
30     arvados.util.run_command(['make', '-j16'], cwd=samtools_path)
31     lockfile.close()
32
33     return samtools_path
34
35 def samtools_binary():
36     """
37     Return the path to the samtools executable.
38     """
39     return os.path.join(samtools_install_path(), 'samtools')
40
41 def run(command, command_args, **kwargs):
42     """
43     Build and run the samtools binary.
44
45     command is the samtools subcommand, e.g., "view" or "sort".
46
47     command_args is a list of additional command line arguments, e.g.,
48     ['-bt', 'ref_list.txt', '-o', 'aln.bam', 'aln.sam.gz']
49
50     It is assumed that we are running in a Crunch job environment, and
51     the job's "samtools_tgz" parameter is a collection containing the
52     samtools source tree in a .tgz file.
53     """
54     execargs = [samtools_binary(),
55                 command]
56     execargs += command_args
57     sys.stderr.write("%s.run: exec %s\n" % (__name__, str(execargs)))
58     arvados.util.run_command(
59         execargs,
60         cwd=arvados.current_task().tmpdir,
61         stdin=kwargs.get('stdin', subprocess.PIPE),
62         stderr=kwargs.get('stderr', sys.stderr),
63         stdout=kwargs.get('stdout', sys.stderr))
64
65 def one_task_per_bam_file(if_sequence=0, and_end_task=True):
66     """
67     Queue one task for each bam file in this job's input collection.
68
69     Each new task will have an "input" parameter: a manifest
70     containing one .bam file and (if available) the corresponding .bai
71     index file.
72
73     Files in the input collection that are not named *.bam or *.bai
74     (as well as *.bai files that do not match any .bam file present)
75     are silently ignored.
76
77     if_sequence and and_end_task arguments have the same significance
78     as in arvados.job_setup.one_task_per_input_file().
79     """
80     if if_sequence != arvados.current_task()['sequence']:
81         return
82     job_input = arvados.current_job()['script_parameters']['input']
83     cr = arvados.CollectionReader(job_input)
84     bam = {}
85     bai = {}
86     for s in cr.all_streams():
87         for f in s.all_files():
88             if re.search(r'\.bam$', f.name()):
89                 bam[s.name(), f.name()] = f
90             elif re.search(r'\.bai$', f.name()):
91                 bai[s.name(), f.name()] = f
92     for ((s_name, f_name), bam_f) in bam.items():
93         bai_f = bai.get((s_name, re.sub(r'bam$', 'bai', f_name)), None)
94         task_input = bam_f.as_manifest()
95         if bai_f:
96             task_input += bai_f.as_manifest()
97         new_task_attrs = {
98             'job_uuid': arvados.current_job()['uuid'],
99             'created_by_job_task_uuid': arvados.current_task()['uuid'],
100             'sequence': if_sequence + 1,
101             'parameters': {
102                 'input': task_input
103                 }
104             }
105         arvados.api().job_tasks().create(body=new_task_attrs).execute()
106     if and_end_task:
107         arvados.api().job_tasks().update(uuid=arvados.current_task()['uuid'],
108                                          body={'success':True}
109                                          ).execute()
110         exit(0)