5476: Better implementation of connection timeout scaling. Updated docstring
[arvados.git] / sdk / python / arvados / commands / run.py
index 962d6a806c63d59d811c457680b68a739e5ac3ed..f2bf0f353bbd146ece4775da7b6094d654ed0d04 100644 (file)
@@ -1,6 +1,7 @@
 #!/usr/bin/env python
 
 import arvados
+import arvados.commands.ws as ws
 import argparse
 import json
 import re
@@ -8,13 +9,13 @@ import os
 import stat
 import put
 import time
-import arvados.commands.ws as ws
 import subprocess
 import logging
+import arvados.commands._util as arv_cmd
 
 logger = logging.getLogger('arvados.arv-run')
 
-arvrun_parser = argparse.ArgumentParser()
+arvrun_parser = argparse.ArgumentParser(parents=[arv_cmd.retry_opt])
 arvrun_parser.add_argument('--dry-run', action="store_true", help="Print out the pipeline that would be submitted and exit")
 arvrun_parser.add_argument('--local', action="store_true", help="Run locally using arv-run-pipeline-instance")
 arvrun_parser.add_argument('--docker-image', type=str, default="arvados/jobs", help="Docker image to use, default arvados/jobs")
@@ -35,6 +36,9 @@ class ArvFile(object):
 class UploadFile(ArvFile):
     pass
 
+# Determine if a file is in a collection, and return a tuple consisting of the
+# portable data hash and the path relative to the root of the collection.
+# Return None if the path isn't with an arv-mount collection or there was is error.
 def is_in_collection(root, branch):
     try:
         if root == "/":
@@ -47,9 +51,12 @@ def is_in_collection(root, branch):
         else:
             sp = os.path.split(root)
             return is_in_collection(sp[0], os.path.join(sp[1], branch))
-    except:
+    except IOError, OSError:
         return (None, None)
 
+# Determine the project to place the output of this command by searching upward
+# for arv-mount psuedofile indicating the project.  If the cwd isn't within
+# an arv-mount project or there is an error, return current_user.
 def determine_project(root, current_user):
     try:
         if root == "/":
@@ -65,9 +72,14 @@ def determine_project(root, current_user):
         else:
             sp = os.path.split(root)
             return determine_project(sp[0], current_user)
-    except:
+    except IOError, OSError:
         return current_user
 
+# Determine if string corresponds to a file, and if that file is part of a
+# arv-mounted collection or only local to the machine.  Returns one of
+# ArvFile() (file already exists in a collection), UploadFile() (file needs to
+# be uploaded to a collection), or simply returns prefix+fn (which yields the
+# original parameter string).
 def statfile(prefix, fn):
     absfn = os.path.abspath(fn)
     if os.path.exists(absfn):
@@ -80,6 +92,12 @@ def statfile(prefix, fn):
             else:
                 # trim leading '/' for path prefix test later
                 return UploadFile(prefix, absfn[1:])
+        if stat.S_ISDIR(st.st_mode):
+            sp = os.path.split(absfn)
+            (pdh, branch) = is_in_collection(sp[0], sp[1])
+            if pdh:
+                return ArvFile(prefix, "$(dir %s/%s/)" % (pdh, branch))
+
     return prefix+fn
 
 def main(arguments=None):
@@ -93,6 +111,17 @@ def main(arguments=None):
 
     reading_into = 2
 
+    # Parse the command arguments into 'slots'.
+    # All words following '>' are output arguments and are collected into slots[0].
+    # All words following '<' are input arguments and are collected into slots[1].
+    # slots[2..] store the parameters of each command in the pipeline.
+    #
+    # e.g. arv-run foo arg1 arg2 '|' bar arg3 arg4 '<' input1 input2 input3 '>' output.txt
+    # will be parsed into:
+    #   [['output.txt'],
+    #    ['input1', 'input2', 'input3'],
+    #    ['foo', 'arg1', 'arg2'],
+    #    ['bar', 'arg3', 'arg4']]
     slots = [[], [], []]
     for c in args.args:
         if c.startswith('>'):
@@ -120,9 +149,11 @@ def main(arguments=None):
         else:
             project = determine_project(os.getcwd(), api.users().current().execute()["uuid"])
 
+    # Identify input files.  Look at each parameter and test to see if there is
+    # a file by that name.  This uses 'patterns' to look for within
+    # command line arguments, such as --foo=file.txt or -lfile.txt
     patterns = [re.compile("([^=]+=)(.*)"),
                 re.compile("(-[A-Za-z])(.+)")]
-
     for j, command in enumerate(slots[1:]):
         for i, a in enumerate(command):
             if j > 0 and i == 0:
@@ -133,17 +164,19 @@ def main(arguments=None):
                 # if it starts with a \ then don't do any interpretation
                 command[i] = a[1:]
             else:
-                # Do some pattern matching
-                matched = False
-                for p in patterns:
-                    m = p.match(a)
-                    if m:
-                        command[i] = statfile(m.group(1), m.group(2))
-                        matched = True
-                        break
-                if not matched:
-                    # parameter might be a file, so test it
-                    command[i] = statfile('', a)
+                # See if it looks like a file
+                command[i] = statfile('', a)
+
+                # If a file named command[i] was found, it would now be an
+                # ArvFile or UploadFile.  If command[i] is a basestring, that
+                # means it doesn't correspond exactly to a file, so do some
+                # pattern matching.
+                if isinstance(command[i], basestring):
+                    for p in patterns:
+                        m = p.match(a)
+                        if m:
+                            command[i] = statfile(m.group(1), m.group(2))
+                            break
 
     n = True
     pathprefix = "/"
@@ -181,11 +214,11 @@ def main(arguments=None):
         print("Upload local files: \"%s\"" % '" "'.join([c.fn for c in files]))
 
         if args.dry_run:
-            print("cd %s" % pathprefix)
+            print("$(input) is %s" % pathprefix.rstrip('/'))
             pdh = "$(input)"
         else:
             files = sorted(files, key=lambda x: x.fn)
-            collection = arvados.CollectionWriter(api, num_retries=3)
+            collection = arvados.CollectionWriter(api, num_retries=args.retries)
             stream = None
             for f in files:
                 sp = os.path.split(f.fn)
@@ -242,7 +275,7 @@ def main(arguments=None):
     if slots[1]:
         task_foreach.append("stdin")
         component["script_parameters"]["stdin"] = slots[1]
-        component["script_parameters"]["task.stdin"] = "$(stdin)"\
+        component["script_parameters"]["task.stdin"] = "$(stdin)"
 
     if task_foreach:
         component["script_parameters"]["task.foreach"] = task_foreach
@@ -264,7 +297,7 @@ def main(arguments=None):
         print(json.dumps(pipeline, indent=4))
     else:
         pipeline["owner_uuid"] = project
-        pi = api.pipeline_instances().create(body=pipeline).execute()
+        pi = api.pipeline_instances().create(body=pipeline, ensure_unique_name=True).execute()
         print "Running pipeline %s" % pi["uuid"]
 
         if args.local: