inp = arvados.CollectionReader(arvados.getjobparam('reads'))
-prog = re.compile(r'(.*?)_1.fastq(.gz)?$')
+manifest_list = []
-manifest_text = ""
+chunking = False #arvados.getjobparam('chunking')
-def readline(reader, start):
- line = ""
+def nextline(reader, start):
n = -1
- while n == -1:
- r = reader.readfrom(start, 1024)
+ while True:
+ r = reader.readfrom(start, 128)
if r == '':
break
n = string.find(r, "\n")
- line += r[0:n]
- start += len(r)
- return line
-
+ if n > -1:
+ break
+ else:
+ start += 128
+ return n
+
+# Chunk a fastq into approximately 64 MiB chunks. Requires that the input data
+# be decompressed ahead of time, such as using decompress-all.py. Generates a
+# new manifest, but doesn't actually move any data around. Handles paired
+# reads by ensuring that each chunk of a pair gets the same number of records.
+#
+# This works, but in practice is so slow that potential gains in alignment
+# performance are lost in the prep time, which is why it is currently disabled.
+#
+# A better algorithm would seek to a file position a bit less than the desired
+# chunk size and then scan ahead for the next record, making sure that record
+# was matched by the read pair.
def splitfastq(p):
for i in xrange(0, len(p)):
p[i]["start"] = 0
p[i]["end"] = 0
- while True:
- recordsize = [0, 0]
+ count = 0
+ recordsize = [0, 0]
- # read 4 lines starting at "start"
- for ln in xrange(0, 4):
- for i in xrange(0, len(p)):
- r = readline(p[i]["reader"], p[i]["start"])
- if r == '':
- return
- recordsize[i] += len(r)
+ global piece
+ finish = False
+ while not finish:
+ for i in xrange(0, len(p)):
+ recordsize[i] = 0
- splitnow = False
+ # read next 4 lines
+ for i in xrange(0, len(p)):
+ for ln in xrange(0, 4):
+ r = nextline(p[i]["reader"], p[i]["end"]+recordsize[i])
+ if r == -1:
+ finish = True
+ break
+ recordsize[i] += (r+1)
+
+ splitnow = finish
for i in xrange(0, len(p)):
- if ((p[i]["end"] - p[i]["start"]) + recordsize[i]) >= arvados.BLOCKSIZE:
+ if ((p[i]["end"] - p[i]["start"]) + recordsize[i]) >= (64*1024*1024):
splitnow = True
if splitnow:
for i in xrange(0, len(p)):
- global piece
- global manifest_text
+ global manifest_list
+ print >>sys.stderr, "Finish piece ./_%s/%s (%s %s)" % (piece, p[i]["reader"].name(), p[i]["start"], p[i]["end"])
manifest = []
- manifest.extend("./_" + str(piece))
- manifest.extend([d[arvados.LOCATOR] for d in p["reader"]._stream._data_locators])
- manifest.extend(["{}:{}:{}".format(seg[arvados.LOCATOR], seg[arvados.BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in arvados.locators_and_ranges(p[i]["reader"].segments, p[i]["start"], p[i]["end"] - p[i]["start"])])
- manifest_text += manifest.join(" ") + "\n"
+ manifest.extend(["./_" + str(piece)])
+ manifest.extend([d[arvados.LOCATOR] for d in p[i]["reader"]._stream._data_locators])
+ manifest.extend(["{}:{}:{}".format(seg[arvados.LOCATOR]+seg[arvados.OFFSET], seg[arvados.SEGMENTSIZE], p[i]["reader"].name().replace(' ', '\\040')) for seg in arvados.locators_and_ranges(p[i]["reader"].segments, p[i]["start"], p[i]["end"] - p[i]["start"])])
+ manifest_list.append(manifest)
p[i]["start"] = p[i]["end"]
+ piece += 1
else:
for i in xrange(0, len(p)):
p[i]["end"] += recordsize[i]
+ count += 1
+ if count % 10000 == 0:
+ print >>sys.stderr, "Record %s at %s" % (count, p[i]["end"])
+prog = re.compile(r'(.*?)(_[12])?\.fastq(\.gz)?$')
+# Look for fastq files
for s in inp.all_streams():
- if s.name() == ".":
- for f in s.all_files():
- result = prog.match(f.name())
- if result != None:
- p = [{}, {}]
- p[0]["reader"] = s.files()[result.group(0)]
- if result.group(2) != None:
- p[1]["reader"] = s.files()[result.group(1) + "_2.fastq" + result.group(2)]
- else:
- p[1]["reader"] = s.files()[result.group(1) + "_2.fastq"]
- splitfastq(p)
- #m0 = p[0]["reader"].as_manifest()[1:]
- #m1 = p[1]["reader"].as_manifest()[1:]
- #manifest_text += "./_" + str(piece) + m0
- #manifest_text += "./_" + str(piece) + m1
- piece += 1
-
-# No pairs found so just put each fastq file into a separate directory
-if manifest_text == "":
- for s in inp.all_streams():
- prog = re.compile("(.*?).fastq(.gz)?$")
- if s.name() == ".":
- for f in s.all_files():
- result = prog.match(f.name())
- if result != None:
- p = [{}]
- p[0]["reader"] = s.files()[result.group(0)]
+ for f in s.all_files():
+ name_pieces = prog.match(f.name())
+ if name_pieces is not None:
+ if s.name() != ".":
+ # The downstream tool (run-command) only iterates over the top
+ # level of directories so if there are fastq files in
+ # directories in the input, the choice is either to forget
+ # there are directories (which might lead to name conflicts) or
+ # just fail.
+ print >>sys.stderr, "fastq must be at the root of the collection"
+ sys.exit(1)
+
+ p = None
+ if name_pieces.group(2) is not None:
+ if name_pieces.group(2) == "_1":
+ p = [{}, {}]
+ p[0]["reader"] = s.files()[name_pieces.group(0)]
+ p[1]["reader"] = s.files()[name_pieces.group(1) + "_2.fastq" + (name_pieces.group(3) if name_pieces.group(3) else '')]
+ else:
+ p = [{}]
+ p[0]["reader"] = s.files()[name_pieces.group(0)]
+
+ if p is not None:
+ if chunking:
splitfastq(p)
- #m0 = p[0]["reader"].as_manifest()[1:]
- #manifest_text += "./_" + str(piece) + m0
+ else:
+ for i in xrange(0, len(p)):
+ m = p[i]["reader"].as_manifest().split()
+ m[0] = "./_" + str(piece)
+ manifest_list.append(m)
piece += 1
+manifest_text = "\n".join(" ".join(m) for m in manifest_list) + "\n"
+
arvados.current_task().set_output(manifest_text)