Merge branch '8879-cwl-runner-job-owner-wip'
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index 511f173eb0008995a60e55652a9c3af6be40bb06..8f2102c6c32aa6d21f4532f246b1524946878372 100644 (file)
@@ -168,16 +168,26 @@ class ArvadosJob(object):
             runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
             runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)
 
+        filters = [["repository", "=", "arvados"],
+                   ["script", "=", "crunchrunner"],
+                   ["script_version", "in git", "9e5b98e8f5f4727856b53447191f9c06e3da2ba6"]]
+        if not self.arvrunner.ignore_docker_for_reuse:
+            filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]])
+
         try:
-            response = self.arvrunner.api.jobs().create(body={
-                "owner_uuid": self.arvrunner.project_uuid,
-                "script": "crunchrunner",
-                "repository": "arvados",
-                "script_version": "master",
-                "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
-                "script_parameters": {"tasks": [script_parameters]},
-                "runtime_constraints": runtime_constraints
-            }, find_or_create=kwargs.get("enable_reuse", True)).execute(num_retries=self.arvrunner.num_retries)
+            response = self.arvrunner.api.jobs().create(
+                body={
+                    "owner_uuid": self.arvrunner.project_uuid,
+                    "script": "crunchrunner",
+                    "repository": "arvados",
+                    "script_version": "master",
+                    "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
+                    "script_parameters": {"tasks": [script_parameters]},
+                    "runtime_constraints": runtime_constraints
+                },
+                filters=filters,
+                find_or_create=kwargs.get("enable_reuse", True)
+            ).execute(num_retries=self.arvrunner.num_retries)
 
             self.arvrunner.jobs[response["uuid"]] = self
 
@@ -356,6 +366,7 @@ class RunnerJob(object):
         self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]
 
         response = self.arvrunner.api.jobs().create(body={
+            "owner_uuid": self.arvrunner.project_uuid,
             "script": "cwl-runner",
             "script_version": "master",
             "repository": "arvados",
@@ -538,6 +549,8 @@ class ArvCwlRunner(object):
 
         events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
 
+        self.debug = args.debug
+        self.ignore_docker_for_reuse = args.ignore_docker_for_reuse
         self.fs_access = CollectionFsAccess(input_basedir)
 
         kwargs["fs_access"] = self.fs_access
@@ -633,6 +646,9 @@ def main(args, stdout, stderr, api_client=None):
                         help="")
 
     parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs")
+    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
+                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
+                        default=False)
 
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",