8697: Relax version constraints so gem can be used in ruby187/ree projects.
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index 74f36d9ab6b53e0a7522d318c95486caf1d82fea..8f2102c6c32aa6d21f4532f246b1524946878372 100644 (file)
@@ -1,5 +1,7 @@
 #!/usr/bin/env python
 
+# Implement cwl-runner interface for submitting and running jobs on Arvados.
+
 import argparse
 import arvados
 import arvados.events
@@ -35,6 +37,8 @@ keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.ke
 
 
 def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
+    """Check if a Docker image is available in Keep; if not, upload it using arv-keepdocker."""
+
     if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
         dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
 
@@ -52,12 +56,14 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
         if image_tag:
             args.append(image_tag)
         logger.info("Uploading Docker image %s", ":".join(args[1:]))
-        arvados.commands.keepdocker.main(args)
+        arvados.commands.keepdocker.main(args, stdout=sys.stderr)
 
     return dockerRequirement["dockerImageId"]
 
 
 class CollectionFsAccess(cwltool.process.StdFsAccess):
+    """Implement the cwltool FsAccess interface for Arvados Collections."""
+
     def __init__(self, basedir):
         self.collections = {}
         self.basedir = basedir
@@ -114,6 +120,8 @@ class CollectionFsAccess(cwltool.process.StdFsAccess):
             return os.path.exists(self._abs(fn))
 
 class ArvadosJob(object):
+    """Submit and manage a Crunch job for executing a CWL CommandLineTool."""
+
     def __init__(self, runner):
         self.arvrunner = runner
         self.running = False
@@ -160,16 +168,26 @@ class ArvadosJob(object):
             runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
             runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)
 
+        filters = [["repository", "=", "arvados"],
+                   ["script", "=", "crunchrunner"],
+                   ["script_version", "in git", "9e5b98e8f5f4727856b53447191f9c06e3da2ba6"]]
+        if not self.arvrunner.ignore_docker_for_reuse:
+            filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]])
+
         try:
-            response = self.arvrunner.api.jobs().create(body={
-                "owner_uuid": self.arvrunner.project_uuid,
-                "script": "crunchrunner",
-                "repository": "arvados",
-                "script_version": "master",
-                "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
-                "script_parameters": {"tasks": [script_parameters]},
-                "runtime_constraints": runtime_constraints
-            }, find_or_create=kwargs.get("enable_reuse", True)).execute(num_retries=self.arvrunner.num_retries)
+            response = self.arvrunner.api.jobs().create(
+                body={
+                    "owner_uuid": self.arvrunner.project_uuid,
+                    "script": "crunchrunner",
+                    "repository": "arvados",
+                    "script_version": "master",
+                    "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
+                    "script_parameters": {"tasks": [script_parameters]},
+                    "runtime_constraints": runtime_constraints
+                },
+                filters=filters,
+                find_or_create=kwargs.get("enable_reuse", True)
+            ).execute(num_retries=self.arvrunner.num_retries)
 
             self.arvrunner.jobs[response["uuid"]] = self
 
@@ -283,6 +301,8 @@ class ArvadosJob(object):
 
 
 class RunnerJob(object):
+    """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
+
     def __init__(self, runner, tool, job_order, enable_reuse):
         self.arvrunner = runner
         self.tool = tool
@@ -346,6 +366,7 @@ class RunnerJob(object):
         self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]
 
         response = self.arvrunner.api.jobs().create(body={
+            "owner_uuid": self.arvrunner.project_uuid,
             "script": "cwl-runner",
             "script_version": "master",
             "repository": "arvados",
@@ -381,6 +402,8 @@ class RunnerJob(object):
             del self.arvrunner.jobs[record["uuid"]]
 
 class ArvPathMapper(cwltool.pathmapper.PathMapper):
+    """Convert container-local paths to and from Keep collection ids."""
+
     def __init__(self, arvrunner, referenced_files, basedir,
                  collection_pattern, file_pattern, name=None, **kwargs):
         self._pathmap = arvrunner.get_uploaded()
@@ -430,6 +453,8 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
 
 
 class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
+    """Wrap cwltool CommandLineTool to override selected methods."""
+
     def __init__(self, arvrunner, toolpath_object, **kwargs):
         super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
         self.arvrunner = arvrunner
@@ -445,6 +470,9 @@ class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
 
 
 class ArvCwlRunner(object):
+    """Execute a CWL tool or workflow, submit crunch jobs, wait for them to
+    complete, and report output."""
+
     def __init__(self, api_client):
         self.api = api_client
         self.jobs = {}
@@ -521,6 +549,8 @@ class ArvCwlRunner(object):
 
         events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
 
+        self.debug = args.debug
+        self.ignore_docker_for_reuse = args.ignore_docker_for_reuse
         self.fs_access = CollectionFsAccess(input_basedir)
 
         kwargs["fs_access"] = self.fs_access
@@ -593,14 +623,15 @@ class ArvCwlRunner(object):
             return self.final_output
 
 def versionstring():
-    cwlpkg = pkg_resources.require("cwltool")
-    arvpkg = pkg_resources.require("arvados-python-client")
+    """Return a version string of key packages for provenance and debugging."""
+
     arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
+    arvpkg = pkg_resources.require("arvados-python-client")
+    cwlpkg = pkg_resources.require("cwltool")
 
-    return "%s %s, %s %s, %s %s" % (sys.argv[0],
-                                    arvcwlpkg[0].version,
-                                    "arvados-python-client", cwlpkg[0].version,
-                                    "cwltool", arvpkg[0].version)
+    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
+                                    "arvados-python-client", arvpkg[0].version,
+                                    "cwltool", cwlpkg[0].version)
 
 def main(args, stdout, stderr, api_client=None):
     args.insert(0, "--leave-outputs")
@@ -615,17 +646,20 @@ def main(args, stdout, stderr, api_client=None):
                         help="")
 
     parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs")
+    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
+                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
+                        default=False)
 
     exgroup = parser.add_mutually_exclusive_group()
-    exgroup.add_argument("--submit", action="store_true", help="Submit runner job so workflow can run unattended.",
+    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
-    exgroup.add_argument("--local", action="store_false", help="Workflow runner runs on local host and submits jobs.",
+    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
 
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
-    exgroup.add_argument("--no-wait", action="store_false", help="Exit after submitting workflow runner job.",
+    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")
 
     try: