13108: Add test for taskqueue
[arvados.git] / sdk / cwl / arvados_cwl / arvjob.py
index 737c9580fb8ceef04576168a15a53d991f9cf496..e222152a168abc248568e84d3a5d80761b37ad99 100644 (file)
@@ -10,7 +10,7 @@ import time
 
 from cwltool.process import get_feature, shortname, UnsupportedRequirement
 from cwltool.errors import WorkflowException
-from cwltool.draft2tool import revmap_file, CommandLineTool
+from cwltool.command_line_tool import revmap_file, CommandLineTool
 from cwltool.load_tool import fetch_document
 from cwltool.builder import Builder
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
@@ -134,6 +134,8 @@ class ArvadosJob(object):
             if reuse_req:
                 enable_reuse = reuse_req["enableReuse"]
 
+        self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
+
         try:
             with Perf(metrics, "create %s" % self.name):
                 response = self.arvrunner.api.jobs().create(
@@ -150,7 +152,8 @@ class ArvadosJob(object):
                     find_or_create=enable_reuse
                 ).execute(num_retries=self.arvrunner.num_retries)
 
-            self.arvrunner.processes[response["uuid"]] = self
+            self.uuid = response["uuid"]
+            self.arvrunner.process_submitted(self)
 
             self.update_pipeline_component(response)
 
@@ -263,8 +266,8 @@ class ArvadosJob(object):
                 processStatus = "permanentFail"
         finally:
             self.output_callback(outputs, processStatus)
-            if record["uuid"] in self.arvrunner.processes:
-                del self.arvrunner.processes[record["uuid"]]
+            self.arvrunner.process_done(record["uuid"])
+
 
 class RunnerJob(Runner):
     """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
@@ -280,7 +283,7 @@ class RunnerJob(Runner):
         if self.tool.tool["id"].startswith("keep:"):
             self.job_order["cwl:tool"] = self.tool.tool["id"][5:]
         else:
-            packed = packed_workflow(self.arvrunner, self.tool)
+            packed = packed_workflow(self.arvrunner, self.tool, self.merged_map)
             wf_pdh = upload_workflow_collection(self.arvrunner, self.name, packed)
             self.job_order["cwl:tool"] = "%s/workflow.cwl#main" % wf_pdh
 
@@ -299,6 +302,9 @@ class RunnerJob(Runner):
         if self.on_error:
             self.job_order["arv:on_error"] = self.on_error
 
+        if kwargs.get("debug"):
+            self.job_order["arv:debug"] = True
+
         return {
             "script": "cwl-runner",
             "script_version": "master",
@@ -311,8 +317,8 @@ class RunnerJob(Runner):
             }
         }
 
-    def run(self, *args, **kwargs):
-        job_spec = self.arvados_job_spec(*args, **kwargs)
+    def run(self, **kwargs):
+        job_spec = self.arvados_job_spec(**kwargs)
 
         job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
 
@@ -348,7 +354,7 @@ class RunnerJob(Runner):
             return
 
         self.uuid = job["uuid"]
-        self.arvrunner.processes[self.uuid] = self
+        self.arvrunner.process_submitted(self)
 
         if job["state"] in ("Complete", "Failed", "Cancelled"):
             self.done(job)
@@ -367,7 +373,7 @@ class RunnerTemplate(object):
     }
 
     def __init__(self, runner, tool, job_order, enable_reuse, uuid,
-                 submit_runner_ram=0, name=None):
+                 submit_runner_ram=0, name=None, merged_map=None):
         self.runner = runner
         self.tool = tool
         self.job = RunnerJob(
@@ -378,7 +384,8 @@ class RunnerTemplate(object):
             output_name=None,
             output_tags=None,
             submit_runner_ram=submit_runner_ram,
-            name=name)
+            name=name,
+            merged_map=merged_map)
         self.uuid = uuid
 
     def pipeline_component_spec(self):