Merge pull request #1 from arvados/master
[arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
index 47fbfa5a7910c56e1d427f150d8410a08b23dddd..2b55ce9df5afa6b4a5e7d98ded954be50ae40aa0 100644 (file)
@@ -33,6 +33,7 @@ from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_
 from .fsaccess import CollectionFetcher
 from .pathmapper import NoFollowPathMapper, trim_listing
 from .perf import Perf
+from ._version import __version__
 
 logger = logging.getLogger('arvados.cwl-runner')
 metrics = logging.getLogger('arvados.cwl-runner.metrics')
@@ -123,6 +124,8 @@ class ArvadosContainer(JobBase):
                 "kind": "collection",
                 "portable_data_hash": pdh
             }
+            if pdh in self.pathmapper.pdh_to_uuid:
+                mounts[targetdir]["uuid"] = self.pathmapper.pdh_to_uuid[pdh]
             if len(sp) == 2:
                 if tp == "Directory":
                     path = sp[1]
@@ -140,7 +143,7 @@ class ArvadosContainer(JobBase):
                 generatemapper = NoFollowPathMapper(self.generatefiles["listing"], "", "",
                                                     separateDirs=False)
 
-                sorteditems = sorted(list(generatemapper.items()), key=lambda n: n[1].target)
+                sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target)
 
                 logger.debug("generatemapper is %s", sorteditems)
 
@@ -153,7 +156,7 @@ class ArvadosContainer(JobBase):
                                 vwd.mkdirs(p.target)
                             else:
                                 source, path = self.arvrunner.fs_access.get_collection(p.resolved)
-                                vwd.copy(path, p.target, source_collection=source)
+                                vwd.copy(path or ".", p.target, source_collection=source)
                         elif p.type == "CreateFile":
                             if self.arvrunner.secret_store.has_secret(p.resolved):
                                 secret_mounts["%s/%s" % (self.outdir, p.target)] = {
@@ -162,7 +165,7 @@ class ArvadosContainer(JobBase):
                                 }
                             else:
                                 with vwd.open(p.target, "w") as n:
-                                    n.write(p.resolved.encode("utf-8"))
+                                    n.write(p.resolved)
 
                 def keepemptydirs(p):
                     if isinstance(p, arvados.collection.RichCollectionBase):
@@ -216,13 +219,17 @@ class ArvadosContainer(JobBase):
 
         (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
         if not docker_req:
-            docker_req = {"dockerImageId": "arvados/jobs"}
+            docker_req = {"dockerImageId": "arvados/jobs:"+__version__}
 
         container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
                                                                     docker_req,
                                                                     runtimeContext.pull_image,
                                                                     runtimeContext.project_uuid)
 
+        network_req, _ = self.get_requirement("NetworkAccess")
+        if network_req:
+            runtime_constraints["API"] = network_req["networkAccess"]
+
         api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
         if api_req:
             runtime_constraints["API"] = True
@@ -254,7 +261,7 @@ class ArvadosContainer(JobBase):
         if self.output_ttl < 0:
             raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % container_request["output_ttl"])
 
-        if self.timelimit is not None:
+        if self.timelimit is not None and self.timelimit > 0:
             scheduling_parameters["max_run_time"] = self.timelimit
 
         extra_submit_params = {}
@@ -270,6 +277,9 @@ class ArvadosContainer(JobBase):
 
         enable_reuse = runtimeContext.enable_reuse
         if enable_reuse:
+            reuse_req, _ = self.get_requirement("WorkReuse")
+            if reuse_req:
+                enable_reuse = reuse_req["enableReuse"]
             reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
             if reuse_req:
                 enable_reuse = reuse_req["enableReuse"]
@@ -304,8 +314,8 @@ class ArvadosContainer(JobBase):
                 logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
             else:
                 logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
-        except Exception as e:
-            logger.error("%s got error %s" % (self.arvrunner.label(self), str(e)))
+        except Exception:
+            logger.exception("%s got an error", self.arvrunner.label(self))
             self.output_callback({}, "permanentFail")
 
     def done(self, record):
@@ -329,8 +339,8 @@ class ArvadosContainer(JobBase):
             else:
                 processStatus = "permanentFail"
 
-            if processStatus == "permanentFail":
-                logc = arvados.collection.CollectionReader(container["log"],
+            if processStatus == "permanentFail" and record["log_uuid"]:
+                logc = arvados.collection.CollectionReader(record["log_uuid"],
                                                            api_client=self.arvrunner.api,
                                                            keep_client=self.arvrunner.keep_client,
                                                            num_retries=self.arvrunner.num_retries)
@@ -342,7 +352,7 @@ class ArvadosContainer(JobBase):
             if record["output_uuid"]:
                 if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
                     # Compute the trash time to avoid requesting the collection record.
-                    trash_at = ciso8601.parse_datetime_unaware(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
+                    trash_at = ciso8601.parse_datetime_as_naive(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
                     aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
                     orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
                     oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
@@ -353,11 +363,13 @@ class ArvadosContainer(JobBase):
             if container["output"]:
                 outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
         except WorkflowException as e:
+            # Only include a stack trace if in debug mode.
+            # A stack trace may obfuscate more useful output about the workflow.
             logger.error("%s unable to collect output from %s:\n%s",
                          self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
             processStatus = "permanentFail"
-        except Exception as e:
-            logger.exception("%s while getting output object: %s", self.arvrunner.label(self), e)
+        except Exception:
+            logger.exception("%s while getting output object:", self.arvrunner.label(self))
             processStatus = "permanentFail"
         finally:
             self.output_callback(outputs, processStatus)
@@ -414,7 +426,7 @@ class RunnerContainer(Runner):
                 "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
                 "API": True
             },
-            "use_existing": self.enable_reuse,
+            "use_existing": False, # Never reuse the runner container - see #15497.
             "properties": {}
         }
 
@@ -481,6 +493,9 @@ class RunnerContainer(Runner):
         if self.arvrunner.project_uuid:
             command.append("--project-uuid="+self.arvrunner.project_uuid)
 
+        if self.enable_dev:
+            command.append("--enable-dev")
+
         command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
 
         container_req["command"] = command
@@ -499,6 +514,9 @@ class RunnerContainer(Runner):
             extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
 
         if runtimeContext.submit_request_uuid:
+            if "cluster_id" in extra_submit_params:
+                # Doesn't make sense for "update" and actually fails
+                del extra_submit_params["cluster_id"]
             response = self.arvrunner.api.container_requests().update(
                 uuid=runtimeContext.submit_request_uuid,
                 body=job_spec,
@@ -520,8 +538,9 @@ class RunnerContainer(Runner):
             container = self.arvrunner.api.containers().get(
                 uuid=record["container_uuid"]
             ).execute(num_retries=self.arvrunner.num_retries)
-        except Exception as e:
-            logger.exception("%s while getting runner container: %s", self.arvrunner.label(self), e)
+            container["log"] = record["log_uuid"]
+        except Exception:
+            logger.exception("%s while getting runner container", self.arvrunner.label(self))
             self.arvrunner.output_callback({}, "permanentFail")
         else:
             super(RunnerContainer, self).done(container)