Start working on adding local unix pipe support to run-command.
[arvados.git] / crunch_scripts / split-fastq.py
index ece593d29e5014e5cc225bc30160bf6e6398d1bd..17aabf2930393a48d3539483d5198cab5af35631 100755 (executable)
@@ -14,8 +14,6 @@ manifest_text = ""
 
 inp = arvados.CollectionReader(arvados.getjobparam('reads'))
 
-prog = re.compile(r'(.*?)_1.fastq(.gz)?$')
-
 manifest_list = []
 
 chunking = False #arvados.getjobparam('chunking')
@@ -33,6 +31,17 @@ def nextline(reader, start):
             start += 128
     return n
 
+# Chunk a fastq into approximately 64 MiB chunks.  Requires that the input data
+# be decompressed ahead of time, such as using decompress-all.py.  Generates a
+# new manifest, but doesn't actually move any data around.  Handles paired
+# reads by ensuring that each chunk of a pair gets the same number of records.
+#
+# This works, but in practice is so slow that potential gains in alignment
+# performance are lost in the prep time, which is why it is currently disabled.
+#
+# A better algorithm would seek to a file position a bit less than the desired
+# chunk size and then scan ahead for the next record, making sure that record
+# was matched by the read pair.
 def splitfastq(p):
     for i in xrange(0, len(p)):
         p[i]["start"] = 0
@@ -64,17 +73,12 @@ def splitfastq(p):
         if splitnow:
             for i in xrange(0, len(p)):
                 global manifest_list
-                print "Finish piece ./_%s/%s (%s %s)" % (piece, p[i]["reader"].name(), p[i]["start"], p[i]["end"])
+                print >>sys.stderr, "Finish piece ./_%s/%s (%s %s)" % (piece, p[i]["reader"].name(), p[i]["start"], p[i]["end"])
                 manifest = []
                 manifest.extend(["./_" + str(piece)])
                 manifest.extend([d[arvados.LOCATOR] for d in p[i]["reader"]._stream._data_locators])
-
-                print p[i]
-                print arvados.locators_and_ranges(p[i]["reader"].segments, p[i]["start"], p[i]["end"] - p[i]["start"])
-
                 manifest.extend(["{}:{}:{}".format(seg[arvados.LOCATOR]+seg[arvados.OFFSET], seg[arvados.SEGMENTSIZE], p[i]["reader"].name().replace(' ', '\\040')) for seg in arvados.locators_and_ranges(p[i]["reader"].segments, p[i]["start"], p[i]["end"] - p[i]["start"])])
                 manifest_list.append(manifest)
-                print "Finish piece %s" % (" ".join(manifest))
                 p[i]["start"] = p[i]["end"]
             piece += 1
         else:
@@ -82,45 +86,44 @@ def splitfastq(p):
                 p[i]["end"] += recordsize[i]
             count += 1
             if count % 10000 == 0:
-                print "Record %s at %s" % (count, p[i]["end"])
+                print >>sys.stderr, "Record %s at %s" % (count, p[i]["end"])
 
+prog = re.compile(r'(.*?)(_[12])?\.fastq(\.gz)?$')
+
+# Look for fastq files
 for s in inp.all_streams():
-    if s.name() == ".":
-        for f in s.all_files():
-            result = prog.match(f.name())
-            if result != None:
-                p = [{}, {}]
-                p[0]["reader"] = s.files()[result.group(0)]
-                if result.group(2) != None:
-                    p[1]["reader"] = s.files()[result.group(1) + "_2.fastq" + result.group(2)]
-                else:
-                    p[1]["reader"] = s.files()[result.group(1) + "_2.fastq"]
+    for f in s.all_files():
+        name_pieces = prog.match(f.name())
+        if name_pieces is not None:
+            if s.name() != ".":
+                # The downstream tool (run-command) only iterates over the top
+                # level of directories so if there are fastq files in
+                # directories in the input, the choice is either to forget
+                # there are directories (which might lead to name conflicts) or
+                # just fail.
+                print >>sys.stderr, "fastq must be at the root of the collection"
+                sys.exit(1)
+
+            p = None
+            if name_pieces.group(2) is not None:
+                if name_pieces.group(2) == "_1":
+                    p = [{}, {}]
+                    p[0]["reader"] = s.files()[name_pieces.group(0)]
+                    p[1]["reader"] = s.files()[name_pieces.group(1) + "_2.fastq" + (name_pieces.group(3) if name_pieces.group(3) else '')]
+            else:
+                p = [{}]
+                p[0]["reader"] = s.files()[name_pieces.group(0)]
+
+            if p is not None:
                 if chunking:
                     splitfastq(p)
                 else:
-                    m0 = p[0]["reader"].as_manifest()[1:]
-                    m1 = p[1]["reader"].as_manifest()[1:]
-                    manifest_list.append(["./_" + str(piece), m0[:-1]])
-                    manifest_list.append(["./_" + str(piece), m1[:-1]])
+                    for i in xrange(0, len(p)):
+                        m = p[i]["reader"].as_manifest().split()
+                        m[0] = "./_" + str(piece)
+                        manifest_list.append(m)
                     piece += 1
 
-# No pairs found so just put each fastq file into a separate directory
-if len(manifest_list) == 0:
-    for s in inp.all_streams():
-        prog = re.compile("(.*?).fastq(.gz)?$")
-        if s.name() == ".":
-            for f in s.all_files():
-                result = prog.match(f.name())
-                if result != None:
-                    p = [{}]
-                    p[0]["reader"] = s.files()[result.group(0)]
-                    if chunking:
-                        splitfastq(p)
-                    else:
-                        m0 = p[0]["reader"].as_manifest()[1:]
-                        manifest_list.append(["./_" + str(piece), m0])
-                        piece += 1
-
-manifest_text = "\n".join(" ".join(m) for m in manifest_list)
+manifest_text = "\n".join(" ".join(m) for m in manifest_list) + "\n"
 
 arvados.current_task().set_output(manifest_text)