13108: Add test for taskqueue
[arvados.git] / sdk / cwl / arvados_cwl / arvjob.py
index 2731b2694422fcf8a986057266efe23354830c46..e222152a168abc248568e84d3a5d80761b37ad99 100644 (file)
@@ -10,7 +10,7 @@ import time
 
 from cwltool.process import get_feature, shortname, UnsupportedRequirement
 from cwltool.errors import WorkflowException
-from cwltool.draft2tool import revmap_file, CommandLineTool
+from cwltool.command_line_tool import revmap_file, CommandLineTool
 from cwltool.load_tool import fetch_document
 from cwltool.builder import Builder
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
@@ -134,6 +134,8 @@ class ArvadosJob(object):
             if reuse_req:
                 enable_reuse = reuse_req["enableReuse"]
 
+        self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
+
         try:
             with Perf(metrics, "create %s" % self.name):
                 response = self.arvrunner.api.jobs().create(
@@ -150,7 +152,8 @@ class ArvadosJob(object):
                     find_or_create=enable_reuse
                 ).execute(num_retries=self.arvrunner.num_retries)
 
-            self.arvrunner.processes[response["uuid"]] = self
+            self.uuid = response["uuid"]
+            self.arvrunner.process_submitted(self)
 
             self.update_pipeline_component(response)
 
@@ -263,8 +266,8 @@ class ArvadosJob(object):
                 processStatus = "permanentFail"
         finally:
             self.output_callback(outputs, processStatus)
-            if record["uuid"] in self.arvrunner.processes:
-                del self.arvrunner.processes[record["uuid"]]
+            self.arvrunner.process_done(record["uuid"])
+
 
 class RunnerJob(Runner):
     """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
@@ -314,8 +317,8 @@ class RunnerJob(Runner):
             }
         }
 
-    def run(self, *args, **kwargs):
-        job_spec = self.arvados_job_spec(*args, **kwargs)
+    def run(self, **kwargs):
+        job_spec = self.arvados_job_spec(**kwargs)
 
         job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
 
@@ -351,7 +354,7 @@ class RunnerJob(Runner):
             return
 
         self.uuid = job["uuid"]
-        self.arvrunner.processes[self.uuid] = self
+        self.arvrunner.process_submitted(self)
 
         if job["state"] in ("Complete", "Failed", "Cancelled"):
             self.done(job)