3609: Added documentation page. Added to "arv" frontend command. Bug fix to
[arvados.git] / sdk / python / arvados / commands / run.py
index 2d966b64b929e95fa3c06cc6f2fcd0729cea687f..a15a457c5e272cf85d2919d1f08955f16f7320ef 100644 (file)
@@ -10,79 +10,128 @@ import put
 import time
 #import arvados.command.ws as ws
 import subprocess
+import logging
+
+logger = logging.getLogger('arvados.arv-run')
 
 arvrun_parser = argparse.ArgumentParser()
-arvrun_parser.add_argument('--dry-run', action="store_true")
-arvrun_parser.add_argument('--local', action="store_true")
-arvrun_parser.add_argument('--docker-image', type=str, default="arvados/jobs")
-arvrun_parser.add_argument('--git-dir', type=str, default="")
-arvrun_parser.add_argument('command')
+arvrun_parser.add_argument('--dry-run', action="store_true", help="Print out the pipeline that would be submitted and exit")
+arvrun_parser.add_argument('--local', action="store_true", help="Run locally using arv-crunch-job")
+arvrun_parser.add_argument('--docker-image', type=str, default="arvados/jobs", help="Docker image to use, default arvados/jobs")
+arvrun_parser.add_argument('--git-dir', type=str, default="", help="Git repository passed to arv-crunch-job when using --local")
+arvrun_parser.add_argument('--repository', type=str, default="arvados", help="repository field of component, default 'arvados'")
+arvrun_parser.add_argument('--script-version', type=str, default="master", help="script_version field of component, default 'master'")
 arvrun_parser.add_argument('args', nargs=argparse.REMAINDER)
 
-needupload_files = []
-
 class ArvFile(object):
     def __init__(self, prefix, fn):
         self.prefix = prefix
         self.fn = fn
 
-def statfile(prefix, fn, pattern):
+class UploadFile(ArvFile):
+    pass
+
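+# Walk up the directory tree from 'root' looking for a ".arvados#collection"
+# metadata file, which indicates the path is inside a Keep mount.  Returns
+# (portable data hash, path of 'branch' relative to the collection root), or
+# (None, None) if the path is not inside a collection.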
+def is_in_collection(root, branch):
+    if root == "/":
+        return (None, None)
+    fn = os.path.join(root, ".arvados#collection")
+    if os.path.exists(fn):
+        with file(fn, 'r') as f:
+            c = json.load(f)
+        return (c["portable_data_hash"], branch)
+    else:
+        sp = os.path.split(root)
+        return is_in_collection(sp[0], os.path.join(sp[1], branch))
+
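+# Convert a command line argument that may name a local file into the value to
+# substitute into the pipeline: an ArvFile carrying a $(file ...) reference if
+# the file is already in a Keep mount, an UploadFile if it is a regular file
+# that still needs to be uploaded, or the unchanged "prefix+fn" string otherwise.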
+def statfile(prefix, fn):
     absfn = os.path.abspath(fn)
     if os.path.exists(absfn):
-        fn = os.path.abspath(fn)
-        st = os.stat(fn)
+        st = os.stat(absfn)
         if stat.S_ISREG(st.st_mode):
-            mount = os.path.dirname(fn)+"/.arvados#collection"
-            if os.path.exists(mount):
-                with file(mount, 'r') as f:
-                    c = json.load(f)
-                return prefix+"$(file "+c["portable_data_hash"]+"/" + os.path.basename(fn) + ")"
+            sp = os.path.split(absfn)
+            (pdh, branch) = is_in_collection(sp[0], sp[1])
+            if pdh:
+                return ArvFile(prefix, "$(file %s/%s)" % (pdh, branch))
             else:
-                needupload_files.append(fn)
-            return ArvFile(prefix, fn[1:])
+                # trim leading '/' for path prefix test later
+                return UploadFile(prefix, absfn[1:])
     return prefix+fn
 
 def main(arguments=None):
     args = arvrun_parser.parse_args(arguments)
 
+    if len(args.args) == 0:
+        arvrun_parser.print_help()
+        return
+
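+    # Split the arguments into "slots": slots[0] collects the stdout redirect
+    # target (">"), slots[1] collects the stdin sources ("<"), and slots[2:]
+    # hold the commands of the pipeline, one slot per "|"-separated command.
+    # Arguments accumulate in the slot currently being read into, starting
+    # with the first command.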
+    reading_into = 2
+
+    slots = [[], [], []]
+    for c in args.args:
+        if c.startswith('>'):
+            reading_into = 0
+            if len(c) > 1:
+                slots[reading_into].append(c[1:])
+        elif c.startswith('<'):
+            reading_into = 1
+            if len(c) > 1:
+                slots[reading_into].append(c[1:])
+        elif c == '|':
+            reading_into = len(slots)
+            slots.append([])
+        else:
+            slots[reading_into].append(c)
+
+    if len(slots[0]) > 1:
+        logger.error("Can only specify a single stdout file (run-command substitutions are permitted)")
+        return
+
     patterns = [re.compile("(--[^=]+=)(.*)"),
                 re.compile("(-[^=]+=)(.*)"),
                 re.compile("(-.)(.+)")]
 
-    commandargs = []
-
-    for a in args.args:
-        if a[0] == '-':
-            matched = False
-            for p in patterns:
-                m = p.match(a)
-                if m:
-                    commandargs.append(statfile(m.group(1), m.group(2), p))
-                    matched = True
-                    break
-            if not matched:
-                commandargs.append(a)
-        else:
-            commandargs.append(statfile('', a, None))
+    for command in slots[1:]:
+        for i in xrange(0, len(command)):
+            a = command[i]
+            if a[0] == '-':
+                # parameter starts with '-' so it might be a command line
+                # switch with an embedded file name (e.g. --foo=bar.txt);
+                # try the patterns above to extract and test the file part
+                for p in patterns:
+                    m = p.match(a)
+                    if m:
+                        command[i] = statfile(m.group(1), m.group(2))
+                        break
+            else:
+                # parameter might be a file, so test it
+                command[i] = statfile('', a)
 
     n = True
     pathprefix = "/"
-    files = [c for c in commandargs if isinstance(c, ArvFile)]
+    files = [c for command in slots[1:] for c in command if isinstance(c, UploadFile)]
     if len(files) > 0:
+        # Find the smallest path prefix that includes all the files that need to be uploaded.
+        # This starts at the root and iteratively strips off common leading directories
+        # until the remaining file paths no longer share a common parent.
         while n:
             pathstep = None
             for c in files:
                 if pathstep is None:
                     sp = c.fn.split('/')
                     if len(sp) < 2:
+                        # no parent directories left
                         n = False
                         break
+                    # take the next directory component as the candidate common prefix
                     pathstep = sp[0] + "/"
                 else:
+                    # check if pathstep is common prefix for all files
                     if not c.fn.startswith(pathstep):
                         n = False
                         break
             if n:
+                # pathstep is a common parent directory of all the files, so remove
+                # the prefix from each path
                 pathprefix += pathstep
                 for c in files:
                     c.fn = c.fn[len(pathstep):]
@@ -96,48 +145,60 @@ def main(arguments=None):
         else:
             pdh = put.main(["--portable-data-hash"]+[c.fn for c in files])
 
-    commandargs = [("%s$(file %s/%s)" % (c.prefix, pdh, c.fn)) if isinstance(c, ArvFile) else c for c in commandargs]
-
-    cut = None
-    i = -1
-    stdio = [None, None]
-    for j in xrange(0, len(commandargs)):
-        c = commandargs[j]
-        if c == '<':
-            stdio[0] = []
-            i = 0
-            cut = j if cut is None else cut
-        elif c == '>':
-            stdio[1] = []
-            i = 1
-            cut = j if cut is None else cut
-        elif i > -1:
-            stdio[i].append(c)
-
-    if cut is not None:
-        commandargs = commandargs[:cut]
+        for c in files:
+            c.fn = "$(file %s/%s)" % (pdh, c.fn)
+
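+    # Replace ArvFile/UploadFile objects in the stdin slot and in each command
+    # with their final "prefix$(file ...)" string form.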
+    for i in xrange(1, len(slots)):
+        slots[i] = [("%s%s" % (c.prefix, c.fn)) if isinstance(c, ArvFile) else c for c in slots[i]]
 
     component = {
         "script": "run-command",
-        "script_version": "0fdfa049801418ecd1faf33ec1415f3b689ea761",
-        "repository": "arvados",
+        "script_version": args.script_version,
+        "repository": args.repository,
         "script_parameters": {
-            "command": [args.command]+commandargs
         },
         "runtime_constraints": {
             "docker_image": args.docker_image
         }
     }
 
-    if stdio[0]:
-        component["script_parameters"]["task.stdin"] = stdio[0][0]
-    if stdio[1]:
-        component["script_parameters"]["task.stdout"] = stdio[1][0]
+    task_foreach = []
+    group_parser = argparse.ArgumentParser()
+    group_parser.add_argument('--batch-size', type=int)
+    group_parser.add_argument('args', nargs=argparse.REMAINDER)
+
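+    # A literal "--" in a command marks the rest of that command's arguments as
+    # a list of inputs to iterate over with task.foreach; "--batch-size N"
+    # groups the inputs into batches of N.  A "\--" argument is passed through
+    # as a plain "--".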
+    for s in xrange(2, len(slots)):
+        for i in xrange(0, len(slots[s])):
+            if slots[s][i] == '--':
+                inp = "input%i" % (s-2)
+                groupargs = group_parser.parse_args(slots[s][i+1:])
+                if groupargs.batch_size:
+                    component["script_parameters"][inp] = {"batch":groupargs.args, "size":groupargs.batch_size}
+                    slots[s] = slots[s][0:i] + [{"foreach": inp, "command": "$(%s)" % inp}]
+                else:
+                    component["script_parameters"][inp] = groupargs.args
+                    slots[s] = slots[s][0:i] + ["$(%s)" % inp]
+                task_foreach.append(inp)
+                break
+            if slots[s][i] == '\\--':
+                slots[s][i] = '--'
+
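+    # Wire up redirects: a ">" target becomes task.stdout; a "<" source list is
+    # iterated with task.foreach, each task reading its file via task.stdin.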
+    if slots[0]:
+        component["script_parameters"]["task.stdout"] = slots[0][0]
+    if slots[1]:
+        task_foreach.append("stdin")
+        component["script_parameters"]["stdin"] = slots[1]
+        component["script_parameters"]["task.stdin"] = "$(stdin)"\
+
+    if task_foreach:
+        component["script_parameters"]["task.foreach"] = task_foreach
+
+    component["script_parameters"]["command"] = slots[2:]
 
     pipeline = {
         "name": "",
         "components": {
-            args.command: component
+            "command": component
         },
         "state":"RunningOnServer"
     }
@@ -149,6 +210,7 @@ def main(arguments=None):
     else:
         api = arvados.api('v1')
         pi = api.pipeline_instances().create(body=pipeline).execute()
+        print "Running pipeline %s" % pi["uuid"]
         #ws.main(["--pipeline", pi["uuid"]])
 
 if __name__ == '__main__':