X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/363d128327963a7c1d93992613a741af4d5f55fd..1b8d669706fb952f0e5d1ad5e407073f4815907f:/crunch_scripts/split-fastq.py

Review notes (free text ahead of the "diff --git" header is ignored by
patch tools):
  * Line breaks and indentation were missing from this dump and have been
    restored.  Blank context lines were placed so that each hunk adds up
    to the line counts given in its original @@ header; indentation is
    inferred from the control flow.
  * Amended: the new code calls sys.exit() and prints to sys.stderr, but
    no "import sys" is visible in the import block, so "+import sys" was
    added.  A comment describing nextline() was added as well.  The "+"
    start/count fields of the @@ headers were adjusted to match; the
    "index" blob ids still name the unamended revisions.
  * Not changed, worth a follow-up: nextline() is never called; a "_1"
    file without a matching "_2" raises KeyError; a "_2" file without a
    matching "_1" is silently skipped.

diff --git a/crunch_scripts/split-fastq.py b/crunch_scripts/split-fastq.py
index a279198742..1c7a36871d 100755
--- a/crunch_scripts/split-fastq.py
+++ b/crunch_scripts/split-fastq.py
@@ -3,6 +3,8 @@
 import arvados
 import re
 import hashlib
+import string
+import sys
 
 api = arvados.api('v1')
 
@@ -13,89 +15,58 @@ manifest_text = ""
 
 inp = arvados.CollectionReader(arvados.getjobparam('reads'))
 
-prog = re.compile(r'(.*?)_1.fastq(.gz)?$')
+manifest_list = []
 
-manifest_text = ""
-
-def readline(reader, start):
-    line = ""
+# Read "reader" 128 bytes at a time from offset "start" (presumably
+# readfrom(offset, size)) until a chunk containing "\n" turns up.  Returns the
+# newline's index within that chunk (not within the whole file), or -1 when
+# readfrom() returns '' before any newline is seen.
+def nextline(reader, start):
     n = -1
-    while n == -1:
-        r = reader.readfrom(start, 1024)
+    while True:
+        r = reader.readfrom(start, 128)
         if r == '':
             break
         n = string.find(r, "\n")
-        line += r[0:n]
-        start += len(r)
-    return line
-
-def splitfastq(p):
-    for i in xrange(0, len(p)):
-        p[i]["start"] = 0
-        p[i]["end"] = 0
-
-    while True:
-        recordsize = [0, 0]
-
-        # read 4 lines starting at "start"
-        for ln in xrange(0, 4):
-            for i in xrange(0, len(p)):
-                r = readline(p[i]["reader"], p[i]["start"])
-                if r == '':
-                    return
-                recordsize[i] += len(r)
-
-        splitnow = False
-        for i in xrange(0, len(p)):
-            if ((p[i]["end"] - p[i]["start"]) + recordsize[i]) >= arvados.BLOCKSIZE:
-                splitnow = True
-
-        if splitnow:
-            for i in xrange(0, len(p)):
-                global piece
-                global manifest_text
-                manifest = []
-                manifest.extend("./_" + str(piece))
-                manifest.extend([d[LOCATOR] for d in p["reader"]._stream._data_locators])
-                manifest.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in arvados.locators_and_ranges(p[i]["reader"].segments, p[i]["start"], p[i]["end"] - p[i]["start"])])
-                manifest_text += manifest.join(" ") + "\n"
-                p[i]["start"] = p[i]["end"]
+        if n > -1:
+            break
         else:
-            for i in xrange(0, len(p)):
-                p[i]["end"] += recordsize[i]
+            start += 128
+    return n
 
+prog = re.compile(r'(.*?)(_[12])?\.fastq(\.gz)?$')
 
+# Look for fastq files
 for s in inp.all_streams():
-    if s.name() == ".":
-        for f in s.all_files():
-            result = prog.match(f.name())
-            if result != None:
-                p = [{}, {}]
-                p[0]["reader"] = s.files()[result.group(0)]
-                if result.group(2) != None:
-                    p[1]["reader"] = s.files()[result.group(1) + "_2.fastq" + result.group(2)]
-                else:
-                    p[1]["reader"] = s.files()[result.group(1) + "_2.fastq"
-                splitfastq(p)
-                #m0 = p[0]["reader"].as_manifest()[1:]
-                #m1 = p[1]["reader"].as_manifest()[1:]
-                #manifest_text += "./_" + str(piece) + m0
-                #manifest_text += "./_" + str(piece) + m1
+    for f in s.all_files():
+        name_pieces = prog.match(f.name())
+        if name_pieces is not None:
+            if s.name() != ".":
+                # The downstream tool (run-command) only iterates over the top
+                # level of directories so if there are fastq files in
+                # directories in the input, the choice is either to forget
+                # there are directories (which might lead to name conflicts) or
+                # just fail.
+                print >>sys.stderr, "fastq must be at the root of the collection"
+                sys.exit(1)
+
+            p = None
+            if name_pieces.group(2) is not None:
+                if name_pieces.group(2) == "_1":
+                    p = [{}, {}]
+                    p[0]["reader"] = s.files()[name_pieces.group(0)]
+                    p[1]["reader"] = s.files()[name_pieces.group(1) + "_2.fastq" + (name_pieces.group(3) if name_pieces.group(3) else '')]
+            else:
+                p = [{}]
+                p[0]["reader"] = s.files()[name_pieces.group(0)]
+
+            if p is not None:
+                for i in xrange(0, len(p)):
+                    m = p[i]["reader"].as_manifest().split()
+                    m[0] = "./_" + str(piece)
+                    manifest_list.append(m)
                 piece += 1
 
-# No pairs found so just put each fastq file into a separate directory
-if manifest_text == "":
-    for s in inp.all_streams():
-        prog = re.compile("(.*?).fastq(.gz)?$")
-        if s.name() == ".":
-            for f in s.all_files():
-                result = prog.match(f.name())
-                if result != None:
-                    p = [{}]
-                    p[0]["reader"] = s.files()[result.group(0)]
-                    splitfastq(p)
-                    #m0 = p[0]["reader"].as_manifest()[1:]
-                    #manifest_text += "./_" + str(piece) + m0
-                    piece += 1
+manifest_text = "\n".join(" ".join(m) for m in manifest_list) + "\n"
 
 arvados.current_task().set_output(manifest_text)