Merge branch '10211-double-close-crash' closes #10211
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
index a4132ca3f762675ff9dccc0129f3f8139b313f1b..054d3530cfe174b9a5da3025d248097ca1062d7a 100644 (file)
@@ -112,45 +112,48 @@ def upload_docker(arvrunner, tool):
         for s in tool.steps:
             upload_docker(arvrunner, s.embedded_tool)
 
+def upload_instance(arvrunner, name, tool, job_order):
+        upload_docker(arvrunner, tool)
+
+        workflowmapper = upload_dependencies(arvrunner,
+                                             name,
+                                             tool.doc_loader,
+                                             tool.tool,
+                                             tool.tool["id"],
+                                             True)
+
+        jobmapper = upload_dependencies(arvrunner,
+                                        os.path.basename(job_order.get("id", "#")),
+                                        tool.doc_loader,
+                                        job_order,
+                                        job_order.get("id", "#"),
+                                        False)
+
+        adjustDirObjs(job_order, trim_listing)
+
+        if "id" in job_order:
+            del job_order["id"]
+
+        return workflowmapper
+
 
 class Runner(object):
-    def __init__(self, runner, tool, job_order, enable_reuse):
+    def __init__(self, runner, tool, job_order, enable_reuse, output_name):
         self.arvrunner = runner
         self.tool = tool
         self.job_order = job_order
         self.running = False
         self.enable_reuse = enable_reuse
         self.uuid = None
+        self.final_output = None
+        self.output_name = output_name
 
     def update_pipeline_component(self, record):
         pass
 
     def arvados_job_spec(self, *args, **kwargs):
-        upload_docker(self.arvrunner, self.tool)
-
         self.name = os.path.basename(self.tool.tool["id"])
-
-        workflowmapper = upload_dependencies(self.arvrunner,
-                                             self.name,
-                                             self.tool.doc_loader,
-                                             self.tool.tool,
-                                             self.tool.tool["id"],
-                                             True)
-
-        jobmapper = upload_dependencies(self.arvrunner,
-                                        os.path.basename(self.job_order.get("id", "#")),
-                                        self.tool.doc_loader,
-                                        self.job_order,
-                                        self.job_order.get("id", "#"),
-                                        False)
-
-        adjustDirObjs(self.job_order, trim_listing)
-
-        if "id" in self.job_order:
-            del self.job_order["id"]
-
-        return workflowmapper
-
+        return upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
 
     def done(self, record):
         if record["state"] == "Complete":
@@ -169,7 +172,11 @@ class Runner(object):
         outputs = None
         try:
             try:
-                outc = arvados.collection.Collection(record["output"])
+                self.final_output = record["output"]
+                outc = arvados.collection.CollectionReader(self.final_output,
+                                                           api_client=self.arvrunner.api,
+                                                           keep_client=self.arvrunner.keep_client,
+                                                           num_retries=self.arvrunner.num_retries)
                 with outc.open("cwl.output.json") as f:
                     outputs = json.load(f)
                 def keepify(fileobj):