16353: Support CWL 1.2 behavior for mounting at arbitrary container path
[arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8
9 import logging
10 import json
11 import os
12 import urllib.request, urllib.parse, urllib.error
13 import time
14 import datetime
15 import ciso8601
16 import uuid
17 import math
18
19 import arvados_cwl.util
20 import ruamel.yaml as yaml
21
22 from cwltool.errors import WorkflowException
23 from cwltool.process import UnsupportedRequirement, shortname
24 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
25 from cwltool.utils import aslist
26 from cwltool.job import JobBase
27
28 import arvados.collection
29
30 from .arvdocker import arv_docker_get_image
31 from . import done
32 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields
33 from .fsaccess import CollectionFetcher
34 from .pathmapper import NoFollowPathMapper, trim_listing
35 from .perf import Perf
36 from ._version import __version__
37
# Logger for general arvados-cwl-runner progress and error messages.
logger = logging.getLogger('arvados.cwl-runner')
# Separate logger handed to Perf(...) context managers in this module
# (presumably for timing/metrics output).
metrics = logging.getLogger('arvados.cwl-runner.metrics')
40
class ArvadosContainer(JobBase):
    """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""

    def __init__(self, runner, job_runtime,
                 builder,   # type: Builder
                 joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
                 make_path_mapper,  # type: Callable[..., PathMapper]
                 requirements,      # type: List[Dict[Text, Text]]
                 hints,     # type: List[Dict[Text, Text]]
                 name       # type: Text
    ):
        """Create a container-backed job runner for one tool invocation.

        runner -- the arvados-cwl-runner executor.  This class uses its
            api, keep_client, num_retries, secret_store, fs_access and
            related attributes.
        job_runtime -- runtime context used by run().  The context passed
            to run() itself is ignored in favour of this one.

        The remaining arguments are forwarded unchanged to
        cwltool.job.JobBase.
        """
        super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
        self.arvrunner = runner
        self.job_runtime = job_runtime
        self.running = False
        # Set to the container request uuid once it has been submitted.
        self.uuid = None

    def update_pipeline_component(self, r):
        """No-op in this implementation."""
        pass

    def run(self, runtimeContext):
        """Build and submit a container request for this tool invocation.

        ArvadosCommandTool subclasses cwltool.CommandLineTool, which calls
        makeJobRunner() to get a new ArvadosContainer object.  The fields
        that define execution, such as command_line, environment,
        generatefiles, stdin/stdout/stderr, etc., are set on this object by
        CommandLineTool.job() before run() is called.

        The runtimeContext argument is ignored.  self.job_runtime (captured
        at construction) is used instead.

        Raises WorkflowException if secret material would appear on the
        command line or in the environment, or if the output TTL is
        negative.  Errors while submitting the request are logged and
        reported through self.output_callback as "permanentFail" rather
        than raised.
        """
        runtimeContext = self.job_runtime

        container_request = {
            "command": self.command_line,
            "name": self.name,
            "output_path": self.outdir,
            "cwd": self.outdir,
            "priority": runtimeContext.priority,
            "state": "Committed",
            "properties": {},
        }
        runtime_constraints = {}

        if runtimeContext.project_uuid:
            container_request["owner_uuid"] = runtimeContext.project_uuid

        # Secrets may only reach the container through file literals
        # (secret_mounts below).  Refuse to put them in plain-text fields.
        if self.arvrunner.secret_store.has_secret(self.command_line):
            raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")

        if self.arvrunner.secret_store.has_secret(self.environment):
            raise WorkflowException("Secret material leaked in environment, only file literals may contain secrets")

        resources = self.builder.resources
        if resources is not None:
            # ram/outdirSize/tmpdirSize are scaled by 2**20 (MiB -> bytes).
            runtime_constraints["vcpus"] = math.ceil(resources.get("cores", 1))
            runtime_constraints["ram"] = math.ceil(resources.get("ram") * 2**20)
        else:
            # No evaluated resources: use capacity 0 for the tmp mounts
            # (the value already used when the size keys are absent)
            # instead of failing on None.get().
            resources = {}

        # The output and temporary directories are scratch ("tmp") mounts.
        mounts = {
            self.outdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("outdirSize", 0) * 2**20)
            },
            self.tmpdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("tmpdirSize", 0) * 2**20)
            }
        }
        secret_mounts = {}
        scheduling_parameters = {}

        # Mount the Keep collections that hold staged input files.  Entries
        # are processed in order of resolved location.  A target lying under
        # the most recently mounted directory is skipped, because that mount
        # already covers it.
        rf = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
        rf.sort(key=lambda k: k.resolved)
        prevdir = None
        for resolved, target, tp, stg in rf:
            if not stg:
                continue
            if prevdir and target.startswith(prevdir):
                continue
            if tp == "Directory":
                targetdir = target
            else:
                targetdir = os.path.dirname(target)
            # resolved has the form "keep:<pdh>[/<path inside collection>]".
            sp = resolved.split("/", 1)
            pdh = sp[0][5:]   # remove "keep:"
            mounts[targetdir] = {
                "kind": "collection",
                "portable_data_hash": pdh
            }
            if pdh in self.pathmapper.pdh_to_uuid:
                mounts[targetdir]["uuid"] = self.pathmapper.pdh_to_uuid[pdh]
            if len(sp) == 2:
                if tp == "Directory":
                    path = sp[1]
                else:
                    path = os.path.dirname(sp[1])
                if path and path != "/":
                    mounts[targetdir]["path"] = path
            prevdir = targetdir + "/"

        with Perf(metrics, "generatefiles %s" % self.name):
            if self.generatefiles["listing"]:
                # Materialize InitialWorkDir entries into a new collection
                # ("vwd"), then mount pieces of it at their container targets.
                vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                    keep_client=self.arvrunner.keep_client,
                                                    num_retries=self.arvrunner.num_retries)
                generatemapper = NoFollowPathMapper(self.generatefiles["listing"], "", "",
                                                    separateDirs=False)

                sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target)

                logger.debug("generatemapper is %s", sorteditems)

                with Perf(metrics, "createfiles %s" % self.name):
                    for f, p in sorteditems:
                        if not p.target:
                            continue

                        if not p.target.startswith("/"):
                            raise Exception("Expected mount target to start with '/'")

                        # Targets inside the output directory are stored
                        # relative to it.  Any other absolute target (CWL 1.2
                        # allows arbitrary container paths) is stored
                        # relative to "/".
                        dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]

                        if p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
                            if p.resolved.startswith("_:"):
                                vwd.mkdirs(dst)
                            else:
                                source, path = self.arvrunner.fs_access.get_collection(p.resolved)
                                vwd.copy(path or ".", dst, source_collection=source)
                        elif p.type == "CreateFile":
                            if self.arvrunner.secret_store.has_secret(p.resolved):
                                # Secret file contents go into secret_mounts,
                                # never into the saved collection.
                                secret_mounts[p.target] = {
                                    "kind": "text",
                                    "content": self.arvrunner.secret_store.retrieve(p.resolved)
                                }
                            else:
                                with vwd.open(dst, "w") as n:
                                    n.write(p.resolved)

                def keepemptydirs(p):
                    # Write a zero-length ".keep" file into every empty
                    # subcollection, presumably so empty directories
                    # survive saving the collection.
                    if isinstance(p, arvados.collection.RichCollectionBase):
                        if len(p) == 0:
                            p.open(".keep", "w").close()
                        else:
                            for c in p:
                                keepemptydirs(p[c])

                keepemptydirs(vwd)

                if not runtimeContext.current_container:
                    runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
                info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
                vwd.save_new(name=info["name"],
                             owner_uuid=runtimeContext.project_uuid,
                             ensure_unique_name=True,
                             trash_at=info["trash_at"],
                             properties=info["properties"])

                # Mount each top-level generated entry from the saved
                # collection.  Entries nested under an already-mounted target
                # and secret files (delivered via secret_mounts) are skipped.
                prev = None
                for f, p in sorteditems:
                    if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
                        (prev is not None and p.target.startswith(prev))):
                        continue
                    dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
                    mounts[p.target] = {"kind": "collection",
                                          "portable_data_hash": vwd.portable_data_hash(),
                                          "path": dst}
                    if p.type.startswith("Writable"):
                        mounts[p.target]["writable"] = True
                    prev = p.target + "/"

        container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
        if self.environment:
            container_request["environment"].update(self.environment)

        if self.stdin:
            # Strip the leading "/keep/" to get "<pdh>/<path>".
            sp = self.stdin[6:].split("/", 1)
            mounts["stdin"] = {"kind": "collection",
                                "portable_data_hash": sp[0],
                                "path": sp[1]}

        if self.stderr:
            mounts["stderr"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stderr)}

        if self.stdout:
            mounts["stdout"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stdout)}

        (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
        if not docker_req:
            docker_req = {"dockerImageId": "arvados/jobs:"+__version__}

        container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
                                                                    docker_req,
                                                                    runtimeContext.pull_image,
                                                                    runtimeContext.project_uuid)

        network_req, _ = self.get_requirement("NetworkAccess")
        if network_req:
            runtime_constraints["API"] = network_req["networkAccess"]

        api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
        if api_req:
            runtime_constraints["API"] = True

        runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
        if runtime_req:
            if "keep_cache" in runtime_req:
                runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
            if "outputDirType" in runtime_req:
                if runtime_req["outputDirType"] == "local_output_dir":
                    # Currently the default behavior.
                    pass
                elif runtime_req["outputDirType"] == "keep_output_dir":
                    mounts[self.outdir]= {
                        "kind": "collection",
                        "writable": True
                    }

        partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
        if partition_req:
            scheduling_parameters["partitions"] = aslist(partition_req["partition"])

        intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
        if intermediate_output_req:
            self.output_ttl = intermediate_output_req["outputTTL"]
        else:
            self.output_ttl = self.arvrunner.intermediate_output_ttl

        if self.output_ttl < 0:
            # Report self.output_ttl: container_request["output_ttl"] is not
            # assigned yet at this point.
            raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % self.output_ttl)

        if self.timelimit is not None and self.timelimit > 0:
            scheduling_parameters["max_run_time"] = self.timelimit

        extra_submit_params = {}
        if runtimeContext.submit_runner_cluster:
            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster

        container_request["output_name"] = "Output for step %s" % (self.name)
        container_request["output_ttl"] = self.output_ttl
        container_request["mounts"] = mounts
        container_request["secret_mounts"] = secret_mounts
        container_request["runtime_constraints"] = runtime_constraints
        container_request["scheduling_parameters"] = scheduling_parameters

        # Reuse is on only if the runtime allows it.  A standard WorkReuse
        # requirement, and then the Arvados-specific ReuseRequirement, may
        # override it.
        enable_reuse = runtimeContext.enable_reuse
        if enable_reuse:
            reuse_req, _ = self.get_requirement("WorkReuse")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
            reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        container_request["use_existing"] = enable_reuse

        if runtimeContext.runnerjob.startswith("arvwf:"):
            wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
            wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
            if container_request["name"] == "main":
                container_request["name"] = wfrecord["name"]
            container_request["properties"]["template_uuid"] = wfuuid

        self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)

        try:
            if runtimeContext.submit_request_uuid:
                response = self.arvrunner.api.container_requests().update(
                    uuid=runtimeContext.submit_request_uuid,
                    body=container_request,
                    **extra_submit_params
                ).execute(num_retries=self.arvrunner.num_retries)
            else:
                response = self.arvrunner.api.container_requests().create(
                    body=container_request,
                    **extra_submit_params
                ).execute(num_retries=self.arvrunner.num_retries)

            self.uuid = response["uuid"]
            self.arvrunner.process_submitted(self)

            if response["state"] == "Final":
                logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
            else:
                logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
        except Exception:
            logger.exception("%s got an error", self.arvrunner.label(self))
            logger.debug("Container request was %s", container_request)
            self.output_callback({}, "permanentFail")

    def done(self, record):
        """Handle completion of this step's container request.

        record -- the container request record.  Reads container_uuid,
            log_uuid, output_uuid, modified_at and uuid.

        The process status is derived from the container's exit code:
        successCodes, then temporaryFailCodes, then permanentFailCodes, then
        0 = success, otherwise permanentFail.  A container that did not
        reach "Complete" is a permanentFail.  On permanentFail the tail of
        the log collection is logged.  Any output collection is registered
        as intermediate output.  self.output_callback(outputs, processStatus)
        is always invoked, even when collecting outputs fails.
        """
        outputs = {}
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            if container["state"] == "Complete":
                rcode = container["exit_code"]
                if self.successCodes and rcode in self.successCodes:
                    processStatus = "success"
                elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
                    processStatus = "temporaryFail"
                elif self.permanentFailCodes and rcode in self.permanentFailCodes:
                    processStatus = "permanentFail"
                elif rcode == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"
            else:
                processStatus = "permanentFail"

            if processStatus == "permanentFail" and record["log_uuid"]:
                logc = arvados.collection.CollectionReader(record["log_uuid"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                label = self.arvrunner.label(self)
                done.logtail(
                    logc, logger.error,
                    "%s (%s) error log:" % (label, record["uuid"]), maxlen=40)

            if record["output_uuid"]:
                if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
                    # Compute the trash time to avoid requesting the collection record.
                    trash_at = ciso8601.parse_datetime_as_naive(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
                    aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
                    orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
                    oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
                    logger.info("%s Intermediate output %s (%s) will be trashed%s%s%s." % (
                        self.arvrunner.label(self), record["output_uuid"], container["output"], aftertime, orpart, oncomplete))
                self.arvrunner.add_intermediate_output(record["output_uuid"])

            if container["output"]:
                outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
        except WorkflowException as e:
            # Only include a stack trace if in debug mode.
            # A stack trace may obfuscate more useful output about the workflow.
            logger.error("%s unable to collect output from %s:\n%s",
                         self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
            processStatus = "permanentFail"
        except Exception:
            logger.exception("%s while getting output object:", self.arvrunner.label(self))
            processStatus = "permanentFail"
        finally:
            self.output_callback(outputs, processStatus)
383
384
class RunnerContainer(Runner):
    """Submit and manage a container that runs arvados-cwl-runner."""

    def arvados_job_spec(self, runtimeContext):
        """Create an Arvados container request for this workflow.

        The returned dict can be used to create a container passed as
        the +body+ argument to container_requests().create().

        Side effects: self.job_order is cleaned in place.  Listings are
        trimmed, and anonymous locations and redundant fields are removed.
        Every top-level input holding secret material is replaced with an
        {"$include": <mount path>} reference to a text entry added to the
        request's secret_mounts.
        """
        # Tidy the input object before embedding it in the request.
        adjustDirObjs(self.job_order, trim_listing)
        for cleanup in (trim_anonymous_location, remove_redundant_fields):
            visit_class(self.job_order, ("File", "Directory"), cleanup)

        # Move secret inputs out of the JSON input document into
        # secret_mounts, leaving an $include pointer in their place.
        secret_mounts = {}
        for key in sorted(self.job_order):
            value = self.job_order[key]
            if not self.secret_store.has_secret(value):
                continue
            mount_path = "/secrets/s%d" % len(secret_mounts)
            secret_mounts[mount_path] = {
                "kind": "text",
                "content": self.secret_store.retrieve(value),
            }
            self.job_order[key] = {"$include": mount_path}

        jobs_image = arvados_jobs_image(self.arvrunner, self.jobs_image)

        mounts = {
            "/var/lib/cwl/cwl.input.json": {
                "kind": "json",
                "content": self.job_order,
            },
            # The runner prints its output object on stdout.
            "stdout": {
                "kind": "file",
                "path": "/var/spool/cwl/cwl.output.json",
            },
            "/var/spool/cwl": {
                "kind": "collection",
                "writable": True,
            },
        }

        runtime_constraints = {
            "vcpus": math.ceil(self.submit_runner_cores),
            "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
            "API": True,
        }

        container_req = {
            "name": self.name,
            "output_path": "/var/spool/cwl",
            "cwd": "/var/spool/cwl",
            "priority": self.priority,
            "state": "Committed",
            "container_image": jobs_image,
            "mounts": mounts,
            "secret_mounts": secret_mounts,
            "runtime_constraints": runtime_constraints,
            "use_existing": False,  # Never reuse the runner container - see #15497.
            "properties": {},
        }

        tool_id = self.embedded_tool.tool.get("id", "")
        if tool_id.startswith("keep:"):
            # The workflow already lives in Keep.  Mount its collection and
            # point at the file inside it.
            pdh_part, _slash, inner_path = tool_id.partition('/')
            workflowpath = "/var/lib/cwl/workflow/%s" % inner_path
            mounts["/var/lib/cwl/workflow"] = {
                "kind": "collection",
                "portable_data_hash": "%s" % pdh_part[5:],
            }
        else:
            # Otherwise ship a packed copy of the workflow as a JSON mount.
            packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map)
            workflowpath = "/var/lib/cwl/workflow.json#main"
            mounts["/var/lib/cwl/workflow.json"] = {
                "kind": "json",
                "content": packed,
            }
            if self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
                container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]

        # Fixed runner arguments:
        #   --local                  execute the workflow here rather than submitting a runner
        #   --api=containers         use the containers API
        #   --no-log-timestamps      the logging infrastructure adds timestamps already
        #   --disable-validate       the workflow was validated before submission
        #   --eval-timeout           timeout for javascript evaluation
        #   --thread-count           number of threads used for job submission
        #   --enable/disable-reuse   desired job reuse
        #   --collection-cache-size  memory set aside for caching collections
        reuse_flag = "--enable-reuse" if self.enable_reuse else "--disable-reuse"
        command = [
            "arvados-cwl-runner",
            "--local",
            "--api=containers",
            "--no-log-timestamps",
            "--disable-validate",
            "--eval-timeout=%s" % self.arvrunner.eval_timeout,
            "--thread-count=%s" % self.arvrunner.thread_count,
            reuse_flag,
            "--collection-cache-size=%s" % self.collection_cache_size,
        ]

        # Optional arguments, appended only when configured.
        if self.output_name:
            command += ["--output-name=" + self.output_name]
            container_req["output_name"] = self.output_name
        if self.output_tags:
            command += ["--output-tags=" + self.output_tags]
        if runtimeContext.debug:
            command += ["--debug"]
        if runtimeContext.storage_classes != "default":
            command += ["--storage-classes=" + runtimeContext.storage_classes]
        if self.on_error:
            command += ["--on-error=" + self.on_error]
        if self.intermediate_output_ttl:
            command += ["--intermediate-output-ttl=%d" % self.intermediate_output_ttl]
        if self.arvrunner.trash_intermediate:
            command += ["--trash-intermediate"]
        if self.arvrunner.project_uuid:
            command += ["--project-uuid="+self.arvrunner.project_uuid]
        if self.enable_dev:
            command += ["--enable-dev"]

        # Positional arguments: workflow document, then input object.
        command += [workflowpath, "/var/lib/cwl/cwl.input.json"]

        container_req["command"] = command
        return container_req


    def run(self, runtimeContext):
        """Create (or update) the runner's container request and register it.

        If runtimeContext.submit_request_uuid is set, that existing request
        is updated.  Otherwise a new request is created, optionally on the
        cluster given by runtimeContext.submit_runner_cluster.
        """
        runtimeContext.keepprefix = "keep:"
        job_spec = self.arvados_job_spec(runtimeContext)
        if self.arvrunner.project_uuid:
            job_spec["owner_uuid"] = self.arvrunner.project_uuid

        requests_api = self.arvrunner.api.container_requests()
        if runtimeContext.submit_request_uuid:
            # cluster_id doesn't make sense for "update" and actually fails,
            # so it is never passed here.
            pending = requests_api.update(
                uuid=runtimeContext.submit_request_uuid,
                body=job_spec,
            )
        else:
            cluster_args = {}
            if runtimeContext.submit_runner_cluster:
                cluster_args["cluster_id"] = runtimeContext.submit_runner_cluster
            pending = requests_api.create(body=job_spec, **cluster_args)
        response = pending.execute(num_retries=self.arvrunner.num_retries)

        self.uuid = response["uuid"]
        self.arvrunner.process_submitted(self)

        logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])

    def done(self, record):
        """Fetch the finished runner container and hand it to Runner.done().

        The container record is annotated with the request's log_uuid under
        "log".  If fetching fails, the failure is logged and the executor's
        output_callback is called with "permanentFail" instead.
        """
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            container["log"] = record["log_uuid"]
        except Exception:
            logger.exception("%s while getting runner container", self.arvrunner.label(self))
            self.arvrunner.output_callback({}, "permanentFail")
            return
        super(RunnerContainer, self).done(container)