#!/usr/bin/python
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
import arvados
import re
manifest_list = []
-chunking = False #arvados.getjobparam('chunking')
-
def nextline(reader, start):
n = -1
while True:
start += 128
return n
-# Chunk a fastq into approximately 64 MiB chunks. Requires that the input data
-# be decompressed ahead of time, such as using decompress-all.py. Generates a
-# new manifest, but doesn't actually move any data around. Handles paired
-# reads by ensuring that each chunk of a pair gets the same number of records.
-#
-# This works, but in practice is so slow that potential gains in alignment
-# performance are lost in the prep time, which is why it is currently disabled.
-#
-# A better algorithm would seek to a file position a bit less than the desired
-# chunk size and then scan ahead for the next record, making sure that record
-# was matched by the read pair.
-def splitfastq(p):
- for i in xrange(0, len(p)):
- p[i]["start"] = 0
- p[i]["end"] = 0
-
- count = 0
- recordsize = [0, 0]
-
- global piece
- finish = False
- while not finish:
- for i in xrange(0, len(p)):
- recordsize[i] = 0
-
- # read next 4 lines
- for i in xrange(0, len(p)):
- for ln in xrange(0, 4):
- r = nextline(p[i]["reader"], p[i]["end"]+recordsize[i])
- if r == -1:
- finish = True
- break
- recordsize[i] += (r+1)
-
- splitnow = finish
- for i in xrange(0, len(p)):
- if ((p[i]["end"] - p[i]["start"]) + recordsize[i]) >= (64*1024*1024):
- splitnow = True
-
- if splitnow:
- for i in xrange(0, len(p)):
- global manifest_list
- print >>sys.stderr, "Finish piece ./_%s/%s (%s %s)" % (piece, p[i]["reader"].name(), p[i]["start"], p[i]["end"])
- manifest = []
- manifest.extend(["./_" + str(piece)])
- manifest.extend([d[arvados.LOCATOR] for d in p[i]["reader"]._stream._data_locators])
- manifest.extend(["{}:{}:{}".format(seg[arvados.LOCATOR]+seg[arvados.OFFSET], seg[arvados.SEGMENTSIZE], p[i]["reader"].name().replace(' ', '\\040')) for seg in arvados.locators_and_ranges(p[i]["reader"].segments, p[i]["start"], p[i]["end"] - p[i]["start"])])
- manifest_list.append(manifest)
- p[i]["start"] = p[i]["end"]
- piece += 1
- else:
- for i in xrange(0, len(p)):
- p[i]["end"] += recordsize[i]
- count += 1
- if count % 10000 == 0:
- print >>sys.stderr, "Record %s at %s" % (count, p[i]["end"])
-
-prog = re.compile(r'(.*?)(_12)?\.fastq(\.gz)?$')
+prog = re.compile(r'(.*?)(_[12])?\.fastq(\.gz)?$')
# Look for fastq files
for s in inp.all_streams():
for f in s.all_files():
name_pieces = prog.match(f.name())
- if name_pieces != None:
+ if name_pieces is not None:
if s.name() != ".":
# The downstream tool (run-command) only iterates over the top
# level of directories so if there are fastq files in
print >>sys.stderr, "fastq must be at the root of the collection"
sys.exit(1)
- if name_pieces.group(2) != None:
+ p = None
+ if name_pieces.group(2) is not None:
if name_pieces.group(2) == "_1":
p = [{}, {}]
p[0]["reader"] = s.files()[name_pieces.group(0)]
- if name_pieces.group(2) != None:
- p[1]["reader"] = s.files()[name_pieces.group(1) + "_2.fastq" + name_pieces.group(2)]
- else:
- p[1]["reader"] = s.files()[name_pieces.group(1) + "_2.fastq"]
+ p[1]["reader"] = s.files()[name_pieces.group(1) + "_2.fastq" + (name_pieces.group(3) if name_pieces.group(3) else '')]
else:
p = [{}]
p[0]["reader"] = s.files()[name_pieces.group(0)]
- if chunking:
- splitfastq(p)
- else:
+ if p is not None:
for i in xrange(0, len(p)):
- m = p[i]["reader"].as_manifest()[1:]
- manifest_list.append(["./_" + str(piece), m[:-1]])
+ m = p[i]["reader"].as_manifest().split()
+ m[0] = "./_" + str(piece)
+ manifest_list.append(m)
piece += 1
-manifest_text = "\n".join(" ".join(m) for m in manifest_list)
+manifest_text = "\n".join(" ".join(m) for m in manifest_list) + "\n"
arvados.current_task().set_output(manifest_text)