4042: expand list fix
[arvados.git] / crunch_scripts / run-command
index c5fbcdfc024f5fd3c6dac20e2a8dc89ec5168a66..40fb7128b2558bc0e5ebe2d6063135f5513d1597 100755 (executable)
@@ -91,6 +91,12 @@ def expand_item(p, c):
                 params[var] = i
                 r.extend(expand_list(params, c["command"]))
             return r
+        if "list" in c and "index" in c and "command" in c:
+            var = c["list"]
+            items = get_items(p, p[var])
+            params = copy.copy(p)
+            params[var] = items[int(c["index"])]
+            return expand_list(params, c["command"])
     elif isinstance(c, list):
         return expand_list(p, c)
     elif isinstance(c, str) or isinstance(c, unicode):
@@ -99,9 +105,41 @@ def expand_item(p, c):
     return []
 
 def expand_list(p, l):
-    return [exp for arg in l for exp in expand_item(p, arg)]
+    if isinstance(l, basestring):
+        return [expand_item(p, l)]
+    else:
+        return [exp for arg in l for exp in expand_item(p, arg)]
+
+def add_to_group(gr, match):  # file the full matched text in dict gr under a key built from the captured groups
+    m = ('^_^').join(match.groups(''))  # groups('') maps non-participating optional groups to '' so join never sees None
+    if m not in gr:
+        gr[m] = []  # first match seen for this key
+    gr[m].append(match.group(0))  # group(0) is the whole matched text
 
 def get_items(p, value):
+    if isinstance(value, dict):
+        if "regex" in value:
+            pattern = re.compile(value["regex"])
+            if "filter" in value:
+                items = get_items(p, value["filter"])
+                return [i for i in items if pattern.match(i)]
+            elif "group" in value:
+                items = get_items(p, value["group"])
+                groups = {}
+                for i in items:
+                    p = pattern.match(i)
+                    if p:
+                        add_to_group(groups, p)
+                return [groups[k] for k in groups]
+            elif "extract" in value:
+                items = get_items(p, value["extract"])
+                r = []
+                for i in items:
+                    p = pattern.match(i)
+                    if p:
+                        r.append(p.groups())
+                return r
+
     if isinstance(value, list):
         return expand_list(p, value)
 
@@ -113,7 +151,7 @@ def get_items(p, value):
             items = ["$(dir %s/%s/)" % (prefix, l) for l in os.listdir(fn)]
         elif stat.S_ISREG(mode):
             with open(fn) as f:
-                items = [line for line in f]
+                items = [line.rstrip("\r\n") for line in f]
         return items
     else:
         return None
@@ -124,33 +162,50 @@ stdinname = None
 stdinfile = None
 rcode = 1
 
+def recursive_foreach(params, fvars):
+    """Create one sequence-1 job task per combination of items of the variables named in fvars."""
+    var, remaining = fvars[0], fvars[1:]
+    items = get_items(params, params[var])
+    logger.info("parallelizing on %s with items %s" % (var, items))
+    if items is None:
+        logger.error("parameter %s with value %s in task.foreach yielded no items" % (var, params[var]))
+        sys.exit(1)
+    for item in items:
+        task_params = copy.copy(params)
+        task_params[var] = item
+        if remaining:
+            # More variables are left: expand them with this one bound to item.
+            recursive_foreach(task_params, remaining)
+            continue
+        arvados.api().job_tasks().create(body={
+            'job_uuid': arvados.current_job()['uuid'],
+            'created_by_job_task_uuid': arvados.current_task()['uuid'],
+            'sequence': 1,
+            'parameters': task_params
+            }
+        ).execute()
+
 try:
     if "task.foreach" in jobp:
         if arvados.current_task()['sequence'] == 0:
-            var = jobp["task.foreach"]
-            items = get_items(jobp, jobp[var])
-            logger.info("parallelizing on %s with items %s" % (var, items))
-            if items is not None:
-                for i in items:
-                    params = copy.copy(jobp)
-                    params[var] = i
-                    arvados.api().job_tasks().create(body={
-                        'job_uuid': arvados.current_job()['uuid'],
-                        'created_by_job_task_uuid': arvados.current_task()['uuid'],
-                        'sequence': 1,
-                        'parameters': params
-                        }
-                    ).execute()
-                if "task.vwd" in jobp:
-                    # Base vwd collection will be merged with output fragments from
-                    # the other tasks by crunch.
-                    arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
-                else:
-                    arvados.current_task().set_output(None)
-                sys.exit(0)
-            else:
+            # This is the first task to start the other tasks and exit
+            fvars = jobp["task.foreach"]
+            if isinstance(fvars, basestring):
+                fvars = [fvars]  # normalize a single variable name to a one-element list
+            if not isinstance(fvars, list) or len(fvars) == 0:
+                logger.error("value of task.foreach must be a string or non-empty list")
                 sys.exit(1)
+            recursive_foreach(jobp, fvars)  # pass the normalized list; the raw value may be a string, which would be indexed per character
+            if "task.vwd" in jobp:
+                # Set output of the first task to the base vwd collection so it
+                # will be merged with output fragments from the other tasks by
+                # crunch.
+                arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
+            else:
+                arvados.current_task().set_output(None)
+            sys.exit(0)
     else:
+        # This is the only task so taskp/jobp are the same
         taskp = jobp
 
     if "task.vwd" in taskp: