10812: Use packed workflows for all run modes.
[arvados.git] / sdk / cwl / arvados_cwl / arvjob.py
index 4bbbda26fb692f40551cce7f88a4396d7a5871ae..a4cd5cd3f8d79491e66f7e6a4aecfe9b852268b9 100644 (file)
@@ -9,11 +9,14 @@ from cwltool.errors import WorkflowException
 from cwltool.draft2tool import revmap_file, CommandLineTool
 from cwltool.load_tool import fetch_document
 from cwltool.builder import Builder
+from cwltool.pathmapper import adjustDirObjs
+
+import ruamel.yaml as yaml
 
 import arvados.collection
 
 from .arvdocker import arv_docker_get_image
-from .runner import Runner, arvados_jobs_image
+from .runner import Runner, arvados_jobs_image, packed_workflow, trim_listing
 from .pathmapper import InitialWorkDirPathMapper
 from .perf import Perf
 from . import done
@@ -231,6 +234,28 @@ class ArvadosJob(object):
 class RunnerJob(Runner):
     """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
 
+    def upload_workflow_collection(self, packed):
+        """Write the packed workflow to a collection and return its hash.
+
+        The round-trip YAML dump of `packed` is written to a file named
+        "workflow.cwl" inside a new in-memory collection.  The API is then
+        asked for collections owned by the runner's project that have the
+        same portable data hash and a name matching this job's name as a
+        LIKE prefix.  When at least one is listed, the first match is only
+        logged; otherwise the new collection is saved with
+        ensure_unique_name set.
+
+        Returns the portable data hash of the workflow collection.
+        """
+        runner = self.arvrunner
+        wf_collection = arvados.collection.Collection(
+            api_client=runner.api,
+            keep_client=runner.keep_client,
+            num_retries=runner.num_retries)
+        with wf_collection.open("workflow.cwl", "w") as wf_file:
+            wf_file.write(yaml.round_trip_dump(packed))
+
+        # Search the target project for a collection with identical content.
+        collections_api = runner.api.collections()
+        lookup_filters = [
+            ["owner_uuid", "=", runner.project_uuid],
+            ["portable_data_hash", "=", wf_collection.portable_data_hash()],
+            ["name", "like", self.name+"%"],
+        ]
+        matches = collections_api.list(filters=lookup_filters).execute(
+            num_retries=runner.num_retries)
+
+        if not matches["items"]:
+            # Nothing suitable is stored yet, so save the new collection.
+            wf_collection.save_new(name=self.name,
+                                   owner_uuid=runner.project_uuid,
+                                   ensure_unique_name=True,
+                                   num_retries=runner.num_retries)
+            logger.info("Uploaded to %s", wf_collection.manifest_locator())
+        else:
+            logger.info("Using collection %s", matches["items"][0]["uuid"])
+
+        return wf_collection.portable_data_hash()
+
     def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
         """Create an Arvados job specification for this workflow.
 
@@ -239,9 +264,14 @@ class RunnerJob(Runner):
         a pipeline template or pipeline instance.
         """
 
-        workflowmapper = super(RunnerJob, self).arvados_job_spec(dry_run=dry_run, pull_image=pull_image, **kwargs)
+        if self.tool.tool["id"].startswith("keep:"):
+            pass
+        else:
+            packed = packed_workflow(self.arvrunner, self.tool)
+            wf_pdh = self.upload_workflow_collection(packed)
+            self.job_order["cwl:tool"] = "%s/workflow.cwl" % wf_pdh
 
-        self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"]).target[5:]
+        adjustDirObjs(self.job_order, trim_listing)
 
         if self.output_name:
             self.job_order["arv:output_name"] = self.output_name