3959: crunch-job use job locking API
[arvados.git] / crunch_scripts / split-fastq.py
1 #!/usr/bin/python
2
3 import arvados
4 import re
5 import hashlib
6 import string
7
8 api = arvados.api('v1')
9
10 piece = 0
11 manifest_text = ""
12
13 # Look for paired reads
14
15 inp = arvados.CollectionReader(arvados.getjobparam('reads'))
16
17 manifest_list = []
18
19 chunking = False #arvados.getjobparam('chunking')
20
21 def nextline(reader, start):
22     n = -1
23     while True:
24         r = reader.readfrom(start, 128)
25         if r == '':
26             break
27         n = string.find(r, "\n")
28         if n > -1:
29             break
30         else:
31             start += 128
32     return n
33
34 # Chunk a fastq into approximately 64 MiB chunks.  Requires that the input data
35 # be decompressed ahead of time, such as using decompress-all.py.  Generates a
36 # new manifest, but doesn't actually move any data around.  Handles paired
37 # reads by ensuring that each chunk of a pair gets the same number of records.
38 #
39 # This works, but in practice is so slow that potential gains in alignment
40 # performance are lost in the prep time, which is why it is currently disabled.
41 #
42 # A better algorithm would seek to a file position a bit less than the desired
43 # chunk size and then scan ahead for the next record, making sure that record
44 # was matched by the read pair.
45 def splitfastq(p):
46     for i in xrange(0, len(p)):
47         p[i]["start"] = 0
48         p[i]["end"] = 0
49
50     count = 0
51     recordsize = [0, 0]
52
53     global piece
54     finish = False
55     while not finish:
56         for i in xrange(0, len(p)):
57             recordsize[i] = 0
58
59         # read next 4 lines
60         for i in xrange(0, len(p)):
61             for ln in xrange(0, 4):
62                 r = nextline(p[i]["reader"], p[i]["end"]+recordsize[i])
63                 if r == -1:
64                     finish = True
65                     break
66                 recordsize[i] += (r+1)
67
68         splitnow = finish
69         for i in xrange(0, len(p)):
70             if ((p[i]["end"] - p[i]["start"]) + recordsize[i]) >= (64*1024*1024):
71                 splitnow = True
72
73         if splitnow:
74             for i in xrange(0, len(p)):
75                 global manifest_list
76                 print >>sys.stderr, "Finish piece ./_%s/%s (%s %s)" % (piece, p[i]["reader"].name(), p[i]["start"], p[i]["end"])
77                 manifest = []
78                 manifest.extend(["./_" + str(piece)])
79                 manifest.extend([d[arvados.LOCATOR] for d in p[i]["reader"]._stream._data_locators])
80                 manifest.extend(["{}:{}:{}".format(seg[arvados.LOCATOR]+seg[arvados.OFFSET], seg[arvados.SEGMENTSIZE], p[i]["reader"].name().replace(' ', '\\040')) for seg in arvados.locators_and_ranges(p[i]["reader"].segments, p[i]["start"], p[i]["end"] - p[i]["start"])])
81                 manifest_list.append(manifest)
82                 p[i]["start"] = p[i]["end"]
83             piece += 1
84         else:
85             for i in xrange(0, len(p)):
86                 p[i]["end"] += recordsize[i]
87             count += 1
88             if count % 10000 == 0:
89                 print >>sys.stderr, "Record %s at %s" % (count, p[i]["end"])
90
91 prog = re.compile(r'(.*?)(_[12])?\.fastq(\.gz)?$')
92
93 # Look for fastq files
94 for s in inp.all_streams():
95     for f in s.all_files():
96         name_pieces = prog.match(f.name())
97         if name_pieces is not None:
98             if s.name() != ".":
99                 # The downstream tool (run-command) only iterates over the top
100                 # level of directories so if there are fastq files in
101                 # directories in the input, the choice is either to forget
102                 # there are directories (which might lead to name conflicts) or
103                 # just fail.
104                 print >>sys.stderr, "fastq must be at the root of the collection"
105                 sys.exit(1)
106
107             p = None
108             if name_pieces.group(2) is not None:
109                 if name_pieces.group(2) == "_1":
110                     p = [{}, {}]
111                     p[0]["reader"] = s.files()[name_pieces.group(0)]
112                     p[1]["reader"] = s.files()[name_pieces.group(1) + "_2.fastq" + (name_pieces.group(3) if name_pieces.group(3) else '')]
113             else:
114                 p = [{}]
115                 p[0]["reader"] = s.files()[name_pieces.group(0)]
116
117             if p is not None:
118                 if chunking:
119                     splitfastq(p)
120                 else:
121                     for i in xrange(0, len(p)):
122                         m = p[i]["reader"].as_manifest().split()
123                         m[0] = "./_" + str(piece)
124                         manifest_list.append(m)
125                     piece += 1
126
127 manifest_text = "\n".join(" ".join(m) for m in manifest_list) + "\n"
128
129 arvados.current_task().set_output(manifest_text)