X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/57e0202fcbec42bf9f9aabade183c66551be0a88..6f2855585712ef7222f739a5afcef0aa736e01ac:/crunch_scripts/split-fastq.py

diff --git a/crunch_scripts/split-fastq.py b/crunch_scripts/split-fastq.py
index ece593d29e..17aabf2930 100755
--- a/crunch_scripts/split-fastq.py
+++ b/crunch_scripts/split-fastq.py
@@ -14,8 +14,6 @@ manifest_text = ""
 
 inp = arvados.CollectionReader(arvados.getjobparam('reads'))
 
-prog = re.compile(r'(.*?)_1.fastq(.gz)?$')
-
 manifest_list = []
 
 chunking = False #arvados.getjobparam('chunking')
@@ -33,6 +31,17 @@ def nextline(reader, start):
             start += 128
     return n
 
+# Chunk a fastq into approximately 64 MiB chunks. Requires that the input data
+# be decompressed ahead of time, such as using decompress-all.py. Generates a
+# new manifest, but doesn't actually move any data around. Handles paired
+# reads by ensuring that each chunk of a pair gets the same number of records.
+#
+# This works, but in practice is so slow that potential gains in alignment
+# performance are lost in the prep time, which is why it is currently disabled.
+#
+# A better algorithm would seek to a file position a bit less than the desired
+# chunk size and then scan ahead for the next record, making sure that record
+# was matched by the read pair.
 def splitfastq(p):
     for i in xrange(0, len(p)):
         p[i]["start"] = 0
@@ -64,17 +73,12 @@ def splitfastq(p):
         if splitnow:
             for i in xrange(0, len(p)):
                 global manifest_list
-                print "Finish piece ./_%s/%s (%s %s)" % (piece, p[i]["reader"].name(), p[i]["start"], p[i]["end"])
+                print >>sys.stderr, "Finish piece ./_%s/%s (%s %s)" % (piece, p[i]["reader"].name(), p[i]["start"], p[i]["end"])
                 manifest = []
                 manifest.extend(["./_" + str(piece)])
                 manifest.extend([d[arvados.LOCATOR] for d in p[i]["reader"]._stream._data_locators])
-
-                print p[i]
-                print arvados.locators_and_ranges(p[i]["reader"].segments, p[i]["start"], p[i]["end"] - p[i]["start"])
-
                 manifest.extend(["{}:{}:{}".format(seg[arvados.LOCATOR]+seg[arvados.OFFSET], seg[arvados.SEGMENTSIZE], p[i]["reader"].name().replace(' ', '\\040')) for seg in arvados.locators_and_ranges(p[i]["reader"].segments, p[i]["start"], p[i]["end"] - p[i]["start"])])
                 manifest_list.append(manifest)
-                print "Finish piece %s" % (" ".join(manifest))
                 p[i]["start"] = p[i]["end"]
             piece += 1
         else:
@@ -82,45 +86,44 @@ def splitfastq(p):
                 p[i]["end"] += recordsize[i]
             count += 1
             if count % 10000 == 0:
-                print "Record %s at %s" % (count, p[i]["end"])
+                print >>sys.stderr, "Record %s at %s" % (count, p[i]["end"])
 
+prog = re.compile(r'(.*?)(_[12])?\.fastq(\.gz)?$')
+
+# Look for fastq files
 for s in inp.all_streams():
-    if s.name() == ".":
-        for f in s.all_files():
-            result = prog.match(f.name())
-            if result != None:
-                p = [{}, {}]
-                p[0]["reader"] = s.files()[result.group(0)]
-                if result.group(2) != None:
-                    p[1]["reader"] = s.files()[result.group(1) + "_2.fastq" + result.group(2)]
-                else:
-                    p[1]["reader"] = s.files()[result.group(1) + "_2.fastq"]
+    for f in s.all_files():
+        name_pieces = prog.match(f.name())
+        if name_pieces is not None:
+            if s.name() != ".":
+                # The downstream tool (run-command) only iterates over the top
+                # level of directories so if there are fastq files in
+                # directories in the input, the choice is either to forget
+                # there are directories (which might lead to name conflicts) or
+                # just fail.
+                print >>sys.stderr, "fastq must be at the root of the collection"
+                sys.exit(1)
+
+            p = None
+            if name_pieces.group(2) is not None:
+                if name_pieces.group(2) == "_1":
+                    p = [{}, {}]
+                    p[0]["reader"] = s.files()[name_pieces.group(0)]
+                    p[1]["reader"] = s.files()[name_pieces.group(1) + "_2.fastq" + (name_pieces.group(3) if name_pieces.group(3) else '')]
+            else:
+                p = [{}]
+                p[0]["reader"] = s.files()[name_pieces.group(0)]
+
+            if p is not None:
                 if chunking:
                     splitfastq(p)
                 else:
-                    m0 = p[0]["reader"].as_manifest()[1:]
-                    m1 = p[1]["reader"].as_manifest()[1:]
-                    manifest_list.append(["./_" + str(piece), m0[:-1]])
-                    manifest_list.append(["./_" + str(piece), m1[:-1]])
+                    for i in xrange(0, len(p)):
+                        m = p[i]["reader"].as_manifest().split()
+                        m[0] = "./_" + str(piece)
+                        manifest_list.append(m)
                 piece += 1
 
-# No pairs found so just put each fastq file into a separate directory
-if len(manifest_list) == 0:
-    for s in inp.all_streams():
-        prog = re.compile("(.*?).fastq(.gz)?$")
-        if s.name() == ".":
-            for f in s.all_files():
-                result = prog.match(f.name())
-                if result != None:
-                    p = [{}]
-                    p[0]["reader"] = s.files()[result.group(0)]
-                    if chunking:
-                        splitfastq(p)
-                    else:
-                        m0 = p[0]["reader"].as_manifest()[1:]
-                        manifest_list.append(["./_" + str(piece), m0])
-                        piece += 1
-
-manifest_text = "\n".join(" ".join(m) for m in manifest_list)
+manifest_text = "\n".join(" ".join(m) for m in manifest_list) + "\n"
 
 arvados.current_task().set_output(manifest_text)
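
A note on the new pairing regex, with an illustrative snippet (not part of the commit; the filenames are invented): group(1) is the base name, group(2) the optional "_1"/"_2" read-pair tag, and group(3) the optional ".gz" suffix.

import re

prog = re.compile(r'(.*?)(_[12])?\.fastq(\.gz)?$')

for name in ["sample_1.fastq.gz", "sample_2.fastq.gz", "reads.fastq", "notes.txt"]:
    m = prog.match(name)
    if m is None:
        print("%s -> no match" % name)
    else:
        print("%s -> %r %r %r" % (name, m.group(1), m.group(2), m.group(3)))

# sample_1.fastq.gz -> 'sample' '_1' '.gz'
# sample_2.fastq.gz -> 'sample' '_2' '.gz'
# reads.fastq -> 'reads' None None
# notes.txt -> no match

Because the loop only builds a pair when group(2) is "_1" (looking up the "_2" partner by name) and handles unpaired files in the same pass, the old fallback loop over "(.*?).fastq(.gz)?$" is no longer needed; a bare "_2" file is deliberately skipped on its own.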
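
The unchunked path now rewrites each file's single-file manifest instead of slicing the as_manifest() string by hand. A minimal illustration of that rewrite, using an invented Keep manifest line (stream name, block locator, then position:length:filename tokens; real input comes from p[i]["reader"].as_manifest()):

m = ". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:reads.fastq".split()
m[0] = "./_0"   # rename the stream into a numbered piece directory
print(" ".join(m))
# ./_0 37b51d194a7513e45b56f6524f2d51f2+3 0:3:reads.fastq

No data blocks move; only the stream name in the manifest text changes, which is also how splitfastq() assembles its pieces.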
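
The new comment block describes, but does not implement, a faster seek-and-scan chunking strategy. A rough sketch of that idea, assuming a plain uncompressed FASTQ file on the local filesystem; next_record_offset is a hypothetical helper, not anything defined in this repository:

def next_record_offset(f, target):
    """Find the first record boundary at or after byte offset target.

    Heuristic: a line starting with '@' is taken as a record header only
    if the '+' separator line appears two lines later, since '@' can also
    begin a quality line."""
    f.seek(target)
    f.readline()  # discard what is probably a partial line
    while True:
        pos = f.tell()
        line = f.readline()
        if not line:
            return pos  # end of file
        if line.startswith("@"):
            f.readline()  # sequence line
            if f.readline().startswith("+"):
                return pos
            f.seek(pos)
            f.readline()  # false candidate; re-consume it and keep scanning

For example, next_record_offset(open("reads_1.fastq"), 64 << 20) would give a cut point near the 64 MiB target without reading every record before it. The "matched by the read pair" caveat in the comment remains the hard part: after cutting the "_1" file you would still have to count the records in each piece so the "_2" file can be cut after the same record count.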