Merge branch 'master' into 3373-improve-gatk3-snv-pipeline
[arvados.git] / crunch_scripts / split-fastq.py
1 #!/usr/bin/python
2
3 import arvados
4 import re
5 import hashlib
6 import string
7
8 api = arvados.api('v1')
9
10 piece = 0
11 manifest_text = ""
12
13 # Look for paired reads
14
15 inp = arvados.CollectionReader(arvados.getjobparam('reads'))
16
17 prog = re.compile(r'(.*?)_1.fastq(.gz)?$')
18
19 manifest_list = []
20
21 chunking = False #arvados.getjobparam('chunking')
22
23 def nextline(reader, start):
24     n = -1
25     while True:
26         r = reader.readfrom(start, 128)
27         if r == '':
28             break
29         n = string.find(r, "\n")
30         if n > -1:
31             break
32         else:
33             start += 128
34     return n
35
36 def splitfastq(p):
37     for i in xrange(0, len(p)):
38         p[i]["start"] = 0
39         p[i]["end"] = 0
40
41     count = 0
42     recordsize = [0, 0]
43
44     global piece
45     finish = False
46     while not finish:
47         for i in xrange(0, len(p)):
48             recordsize[i] = 0
49
50         # read next 4 lines
51         for i in xrange(0, len(p)):
52             for ln in xrange(0, 4):
53                 r = nextline(p[i]["reader"], p[i]["end"]+recordsize[i])
54                 if r == -1:
55                     finish = True
56                     break
57                 recordsize[i] += (r+1)
58
59         splitnow = finish
60         for i in xrange(0, len(p)):
61             if ((p[i]["end"] - p[i]["start"]) + recordsize[i]) >= (64*1024*1024):
62                 splitnow = True
63
64         if splitnow:
65             for i in xrange(0, len(p)):
66                 global manifest_list
67                 print "Finish piece ./_%s/%s (%s %s)" % (piece, p[i]["reader"].name(), p[i]["start"], p[i]["end"])
68                 manifest = []
69                 manifest.extend(["./_" + str(piece)])
70                 manifest.extend([d[arvados.LOCATOR] for d in p[i]["reader"]._stream._data_locators])
71
72                 print p[i]
73                 print arvados.locators_and_ranges(p[i]["reader"].segments, p[i]["start"], p[i]["end"] - p[i]["start"])
74
75                 manifest.extend(["{}:{}:{}".format(seg[arvados.LOCATOR]+seg[arvados.OFFSET], seg[arvados.SEGMENTSIZE], p[i]["reader"].name().replace(' ', '\\040')) for seg in arvados.locators_and_ranges(p[i]["reader"].segments, p[i]["start"], p[i]["end"] - p[i]["start"])])
76                 manifest_list.append(manifest)
77                 print "Finish piece %s" % (" ".join(manifest))
78                 p[i]["start"] = p[i]["end"]
79             piece += 1
80         else:
81             for i in xrange(0, len(p)):
82                 p[i]["end"] += recordsize[i]
83             count += 1
84             if count % 10000 == 0:
85                 print "Record %s at %s" % (count, p[i]["end"])
86
87 for s in inp.all_streams():
88     if s.name() == ".":
89         for f in s.all_files():
90             result = prog.match(f.name())
91             if result != None:
92                 p = [{}, {}]
93                 p[0]["reader"] = s.files()[result.group(0)]
94                 if result.group(2) != None:
95                     p[1]["reader"] = s.files()[result.group(1) + "_2.fastq" + result.group(2)]
96                 else:
97                     p[1]["reader"] = s.files()[result.group(1) + "_2.fastq"]
98                 if chunking:
99                     splitfastq(p)
100                 else:
101                     m0 = p[0]["reader"].as_manifest()[1:]
102                     m1 = p[1]["reader"].as_manifest()[1:]
103                     manifest_list.append(["./_" + str(piece), m0[:-1]])
104                     manifest_list.append(["./_" + str(piece), m1[:-1]])
105                     piece += 1
106
107 # No pairs found so just put each fastq file into a separate directory
108 if len(manifest_list) == 0:
109     for s in inp.all_streams():
110         prog = re.compile("(.*?).fastq(.gz)?$")
111         if s.name() == ".":
112             for f in s.all_files():
113                 result = prog.match(f.name())
114                 if result != None:
115                     p = [{}]
116                     p[0]["reader"] = s.files()[result.group(0)]
117                     if chunking:
118                         splitfastq(p)
119                     else:
120                         m0 = p[0]["reader"].as_manifest()[1:]
121                         manifest_list.append(["./_" + str(piece), m0])
122                         piece += 1
123
124 manifest_text = "\n".join(" ".join(m) for m in manifest_list)
125
126 arvados.current_task().set_output(manifest_text)