fc61142335c3ff1fe7424f413ee25736a5133fe7
[arvados.git] / crunch_scripts / split-fastq.py
1 #!/usr/bin/python
2
3 import arvados
4 import re
5 import hashlib
6
7 api = arvados.api('v1')
8
9 piece = 0
10 manifest_text = ""
11
12 # Look for paired reads
13
14 inp = arvados.CollectionReader(arvados.getjobparam('reads'))
15
16 prog = re.compile("(.*?)_1.fastq(.gz)?$")
17
18 manifest_text = ""
19
20 def readline(reader, start):
21     line = ""
22     n = -1
23     while n == -1:
24         r = reader.readfrom(start, 1024)
25         if r == '':
26             break
27         n = string.find(r, "\n")
28         line += r[0:n]
29         start += len(r)
30     return line
31
32 def splitfastq(p):
33     for i in xrange(0, len(p)):
34         p[i]["start"] = 0
35         p[i]["end"] = 0
36
37     while True:
38         recordsize = [0, 0]
39
40         # read 4 lines starting at "start"
41         for ln in xrange(0, 4):
42             for i in xrange(0, len(p)):
43                 r = readline(p[i]["reader"], p[i]["start"])
44                 if r == '':
45                     return
46                 recordsize[i] += len(r)
47
48         splitnow = False
49         for i in xrange(0, len(p)):
50             if ((p[i]["end"] - p[i]["start"]) + recordsize[i]) >= arvados.BLOCKSIZE:
51                 splitnow = True
52
53         if splitnow:
54             for i in xrange(0, len(p)):
55                 global piece
56                 global manifest_text
57                 manifest = []
58                 manifest.extend("./_" + str(piece))
59                 manifest.extend([d[LOCATOR] for d in p["reader"]._stream._data_locators])
60                 manifest.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in arvados.locators_and_ranges(p[i]["reader"].segments, p[i]["start"], p[i]["end"] - p[i]["start"])])
61                 manifest_text += manifest.join(" ") + "\n"
62                 p[i]["start"] = p[i]["end"]
63         else:
64             for i in xrange(0, len(p)):
65                 p[i]["end"] += recordsize[i]
66
67
68 for s in inp.all_streams():
69     if s.name() == ".":
70         for f in s.all_files():
71             result = prog.match(f.name())
72             if result != None:
73                 p = [{}, {}]
74                 p[0]["reader"] = s.files()[result.group(0)]
75                 p[1]["reader"] = s.files()[result.group(1) + "_2.fastq" + result.group(2)]
76                 splitfastq(p)
77                 #m0 = p[0]["reader"].as_manifest()[1:]
78                 #m1 = p[1]["reader"].as_manifest()[1:]
79                 #manifest_text += "./_" + str(piece) + m0
80                 #manifest_text += "./_" + str(piece) + m1
81                 piece += 1
82
83 # No pairs found so just put each fastq file into a separate directory
84 if manifest_text == "":
85     for s in inp.all_streams():
86         prog = re.compile("(.*?).fastq(.gz)?$")
87         if s.name() == ".":
88             for f in s.all_files():
89                 result = prog.match(f.name())
90                 if result != None:
91                     p = [{}]
92                     p[0]["reader"] = s.files()[result.group(0)]
93                     splitfastq(p)
94                     #m0 = p[0]["reader"].as_manifest()[1:]
95                     #manifest_text += "./_" + str(piece) + m0
96                     piece += 1
97
98 arvados.current_task().set_output(manifest_text)