10895: Improve reuse for RunInSingleContainer jobs
[arvados.git] / sdk / cwl / arvados_cwl / arvjob.py
index 7b318026d57a0da8ef3caa5a4b91f768e824431a..87bacd02d3096bf7d6e8500630181d0d1d706472 100644 (file)
@@ -18,7 +18,7 @@ import ruamel.yaml as yaml
 import arvados.collection
 
 from .arvdocker import arv_docker_get_image
-from .runner import Runner, arvados_jobs_image, packed_workflow, trim_listing
+from .runner import Runner, arvados_jobs_image, packed_workflow, trim_listing, upload_workflow_collection
 from .pathmapper import InitialWorkDirPathMapper
 from .perf import Perf
 from . import done
@@ -141,11 +141,12 @@ class ArvadosJob(object):
 
             self.update_pipeline_component(response)
 
-            logger.info("%s %s is %s", self.arvrunner.label(self), response["uuid"], response["state"])
-
             if response["state"] in ("Complete", "Failed", "Cancelled"):
+                logger.info("%s reuse job %s", self.arvrunner.label(self), response["uuid"])
                 with Perf(metrics, "done %s" % self.name):
                     self.done(response)
+            else:
+                logger.info("%s %s is %s", self.arvrunner.label(self), response["uuid"], response["state"])
         except Exception as e:
             logger.exception("%s error" % (self.arvrunner.label(self)))
             self.output_callback({}, "permanentFail")
@@ -236,30 +237,6 @@ class ArvadosJob(object):
 class RunnerJob(Runner):
     """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
 
-    def upload_workflow_collection(self, packed):
-        collection = arvados.collection.Collection(api_client=self.arvrunner.api,
-                                                   keep_client=self.arvrunner.keep_client,
-                                                   num_retries=self.arvrunner.num_retries)
-        with collection.open("workflow.cwl", "w") as f:
-            f.write(yaml.round_trip_dump(packed))
-
-        filters = [["portable_data_hash", "=", collection.portable_data_hash()],
-                   ["name", "like", self.name+"%"]]
-        if self.arvrunner.project_uuid:
-            filters.append(["owner_uuid", "=", self.arvrunner.project_uuid])
-        exists = self.arvrunner.api.collections().list(filters=filters).execute(num_retries=self.arvrunner.num_retries)
-
-        if exists["items"]:
-            logger.info("Using collection %s", exists["items"][0]["uuid"])
-        else:
-            collection.save_new(name=self.name,
-                                owner_uuid=self.arvrunner.project_uuid,
-                                ensure_unique_name=True,
-                                num_retries=self.arvrunner.num_retries)
-            logger.info("Uploaded to %s", collection.manifest_locator())
-
-        return collection.portable_data_hash()
-
     def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
         """Create an Arvados job specification for this workflow.
 
@@ -272,7 +249,7 @@ class RunnerJob(Runner):
             self.job_order["cwl:tool"] = self.tool.tool["id"][5:]
         else:
             packed = packed_workflow(self.arvrunner, self.tool)
-            wf_pdh = self.upload_workflow_collection(packed)
+            wf_pdh = upload_workflow_collection(self.arvrunner, self.name, packed)
             self.job_order["cwl:tool"] = "%s/workflow.cwl#main" % wf_pdh
 
         adjustDirObjs(self.job_order, trim_listing)