# NOTE(review): this file is a unified-diff fragment ('-'/'+' prefixed
# lines) over an Arvados crunch script, not plain Python; lines are
# annotated as-is without resolving the diff markers.

# Open the input collection named by the job's 'reads' parameter.
inp = arvados.CollectionReader(arvados.getjobparam('reads'))
-prog = re.compile(r'(.*?)_1.fastq(.gz)?$')
-
# Accumulates output manifest rows; each row is a list of tokens that is
# space-joined at the end of the script.
manifest_list = []
-chunking = arvados.getjobparam('chunking')
# Chunking is hard-coded off by this diff (see the rationale comment
# above splitfastq: prep cost outweighed the alignment speedup).
+chunking = False #arvados.getjobparam('chunking')
# NOTE(review): as shown, this helper ignores `reader`, bumps a local
# copy of `start` by 128 (no effect on the caller), and always returns
# -1. The scanning loop that presumably searches `reader` for the next
# newline looks elided by the surrounding diff context -- confirm
# against the full file before relying on this definition.
def nextline(reader, start):
n = -1
start += 128
return n
+# Chunk a fastq into approximately 64 MiB chunks. Requires that the input data
+# be decompressed ahead of time, such as using decompress-all.py. Generates a
+# new manifest, but doesn't actually move any data around. Handles paired
+# reads by ensuring that each chunk of a pair gets the same number of records.
+#
+# This works, but in practice is so slow that potential gains in alignment
+# performance are lost in the prep time, which is why it is currently disabled.
+#
+# A better algorithm would seek to a file position a bit less than the desired
+# chunk size and then scan ahead for the next record, making sure that record
+# was matched by the read pair.
def splitfastq(p):
    """Split the fastq file(s) in `p` into ~64 MiB manifest pieces.

    `p` is a list of dicts, one per file of a read pair (or a single
    unpaired file), each holding a "reader" (Arvados file reader) plus
    "start"/"end" byte cursors advanced record by record so that paired
    files always get the same number of records per piece.

    NOTE(review): reads names not defined in this view (`splitnow`,
    `piece`, `recordsize`, `count`) -- presumably module globals elided
    by the surrounding diff context. Appends finished pieces to the
    global `manifest_list`; no data is moved, only locator ranges.
    """
for i in xrange(0, len(p)):
p[i]["start"] = 0
if splitnow:
# Current chunk reached the target size: emit one manifest row per
# file in the pair, all under the same "./_<piece>" stream name.
for i in xrange(0, len(p)):
global manifest_list
# This diff redirects progress chatter to stderr so it cannot
# pollute any stdout-based output.
- print "Finish piece ./_%s/%s (%s %s)" % (piece, p[i]["reader"].name(), p[i]["start"], p[i]["end"])
+ print >>sys.stderr, "Finish piece ./_%s/%s (%s %s)" % (piece, p[i]["reader"].name(), p[i]["start"], p[i]["end"])
manifest = []
manifest.extend(["./_" + str(piece)])
# Reuse the file's existing data-block locators verbatim.
# NOTE(review): touches the private `_stream._data_locators`
# attribute of the SDK reader -- fragile across SDK versions.
manifest.extend([d[arvados.LOCATOR] for d in p[i]["reader"]._stream._data_locators])
-
- print p[i]
- print arvados.locators_and_ranges(p[i]["reader"].segments, p[i]["start"], p[i]["end"] - p[i]["start"])
-
# Translate the [start, end) byte range into locator+offset:size:name
# manifest tokens (spaces in file names escaped as \040).
manifest.extend(["{}:{}:{}".format(seg[arvados.LOCATOR]+seg[arvados.OFFSET], seg[arvados.SEGMENTSIZE], p[i]["reader"].name().replace(' ', '\\040')) for seg in arvados.locators_and_ranges(p[i]["reader"].segments, p[i]["start"], p[i]["end"] - p[i]["start"])])
manifest_list.append(manifest)
- print "Finish piece %s" % (" ".join(manifest))
# Next piece starts where this one ended.
p[i]["start"] = p[i]["end"]
piece += 1
else:
# Not splitting yet: extend the current chunk by one record and
# log progress every 10000 records.
p[i]["end"] += recordsize[i]
count += 1
if count % 10000 == 0:
- print "Record %s at %s" % (count, p[i]["end"])
+ print >>sys.stderr, "Record %s at %s" % (count, p[i]["end"])
# This diff replaces the old "_1"-only regex (plus a separate fallback
# pass for unpaired files, removed below) with one unified scan.
# Groups: (1) base name, (2) optional "_1"/"_2" pair suffix,
# (3) optional ".gz" extension.
+prog = re.compile(r'(.*?)(_[12])?\.fastq(\.gz)?$')
+
+# Look for fastq files
for s in inp.all_streams():
- if s.name() == ".":
- for f in s.all_files():
- result = prog.match(f.name())
- if result != None:
- p = [{}, {}]
- p[0]["reader"] = s.files()[result.group(0)]
- if result.group(2) != None:
- p[1]["reader"] = s.files()[result.group(1) + "_2.fastq" + result.group(2)]
- else:
- p[1]["reader"] = s.files()[result.group(1) + "_2.fastq"]
+ for f in s.all_files():
+ name_pieces = prog.match(f.name())
+ if name_pieces != None:
+ if s.name() != ".":
+ # The downstream tool (run-command) only iterates over the top
+ # level of directories so if there are fastq files in
+ # directories in the input, the choice is either to forget
+ # there are directories (which might lead to name conflicts) or
+ # just fail.
+ print >>sys.stderr, "fastq must be at the root of the collection"
+ sys.exit(1)
+
# Only a "_1" file starts a pair; its "_2" mate is looked up by name
# (raising KeyError if absent -- presumably intentional fail-fast).
# Bare "_2" files leave p = None and are skipped here, since they are
# emitted as the mate of their "_1" file; files with no _1/_2 suffix
# are treated as unpaired singletons.
+ p = None
+ if name_pieces.group(2) != None:
+ if name_pieces.group(2) == "_1":
+ p = [{}, {}]
+ p[0]["reader"] = s.files()[name_pieces.group(0)]
+ p[1]["reader"] = s.files()[name_pieces.group(1) + "_2.fastq" + (name_pieces.group(3) if name_pieces.group(3) else '')]
+ else:
+ p = [{}]
+ p[0]["reader"] = s.files()[name_pieces.group(0)]
+
+ if p != None:
if chunking:
splitfastq(p)
else:
# Not chunking: copy each file's existing manifest into its own
# "./_<piece>" stream ("[1:]" drops the original ". " stream
# prefix, "[:-1]" the trailing newline of as_manifest()).
- m0 = p[0]["reader"].as_manifest()[1:]
- m1 = p[1]["reader"].as_manifest()[1:]
- manifest_list.append(["./_" + str(piece), m0[:-1]])
- manifest_list.append(["./_" + str(piece), m1[:-1]])
+ for i in xrange(0, len(p)):
+ m = p[i]["reader"].as_manifest()[1:]
+ manifest_list.append(["./_" + str(piece), m[:-1]])
# NOTE(review): `piece` is read and incremented but never initialized
# in this view -- presumably set to 0 in diff context elided earlier;
# confirm against the full file.
piece += 1
-# No pairs found so just put each fastq file into a separate directory
-if len(manifest_list) == 0:
- for s in inp.all_streams():
- prog = re.compile("(.*?).fastq(.gz)?$")
- if s.name() == ".":
- for f in s.all_files():
- result = prog.match(f.name())
- if result != None:
- p = [{}]
- p[0]["reader"] = s.files()[result.group(0)]
- if chunking:
- splitfastq(p)
- else:
- m0 = p[0]["reader"].as_manifest()[1:]
- manifest_list.append(["./_" + str(piece), m0])
- piece += 1
-
# Join the accumulated rows into one manifest text and record it as this
# task's output -- only block locators are referenced, no data is copied.
manifest_text = "\n".join(" ".join(m) for m in manifest_list)
arvados.current_task().set_output(manifest_text)