Merge branch '16669-oidc-access-token'
[arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8
9 import logging
10 import json
11 import os
12 import urllib.request, urllib.parse, urllib.error
13 import time
14 import datetime
15 import ciso8601
16 import uuid
17 import math
18
19 import arvados_cwl.util
20 import ruamel.yaml as yaml
21
22 from cwltool.errors import WorkflowException
23 from cwltool.process import UnsupportedRequirement, shortname
24 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
25 from cwltool.job import JobBase
26
27 import arvados.collection
28
29 from .arvdocker import arv_docker_get_image
30 from . import done
31 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields
32 from .fsaccess import CollectionFetcher
33 from .pathmapper import NoFollowPathMapper, trim_listing
34 from .perf import Perf
35 from ._version import __version__
36
37 logger = logging.getLogger('arvados.cwl-runner')
38 metrics = logging.getLogger('arvados.cwl-runner.metrics')
39
class ArvadosContainer(JobBase):
    """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""

    def __init__(self, runner, job_runtime,
                 builder,   # type: Builder
                 joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
                 make_path_mapper,  # type: Callable[..., PathMapper]
                 requirements,      # type: List[Dict[Text, Text]]
                 hints,     # type: List[Dict[Text, Text]]
                 name       # type: Text
    ):
        """Initialize the job and remember the runner and runtime context.

        runner is stored as self.arvrunner and job_runtime as
        self.job_runtime; the remaining arguments are passed through
        unchanged to the JobBase constructor.
        """
        super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
        self.arvrunner = runner
        self.job_runtime = job_runtime
        self.running = False
        # Set to the container request uuid once run() has submitted it.
        self.uuid = None

    def update_pipeline_component(self, r):
        """Do nothing; this implementation ignores the record."""
        pass

    def _outdir_relative(self, target):
        """Return target as a path relative to the output directory.

        Relative targets are returned unchanged.  Absolute targets
        under self.outdir have that prefix removed; any other absolute
        target just loses its leading slash.
        """
        if not target.startswith("/"):
            return target
        if target.startswith(self.outdir+"/"):
            return target[len(self.outdir)+1:]
        return target[1:]

    def _mountpoint_for(self, target):
        """Return the absolute mount point for target (relative targets are placed under self.outdir)."""
        if target.startswith("/"):
            return target
        return os.path.join(self.outdir, target)

    def _add_referenced_file_mounts(self, mounts):
        """Add a collection mount to mounts for each staged input file or directory."""
        referenced = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
        referenced.sort(key=lambda k: k.resolved)
        prevdir = None
        for resolved, target, tp, stg in referenced:
            if not stg:
                continue
            if prevdir and target.startswith(prevdir):
                # Already covered by the mount added for a parent directory.
                continue
            if tp == "Directory":
                targetdir = target
            else:
                targetdir = os.path.dirname(target)
            sp = resolved.split("/", 1)
            pdh = sp[0][5:]   # remove "keep:"
            mount = {
                "kind": "collection",
                "portable_data_hash": pdh
            }
            if pdh in self.pathmapper.pdh_to_uuid:
                mount["uuid"] = self.pathmapper.pdh_to_uuid[pdh]
            if len(sp) == 2:
                if tp == "Directory":
                    path = sp[1]
                else:
                    path = os.path.dirname(sp[1])
                if path and path != "/":
                    mount["path"] = path
            mounts[targetdir] = mount
            prevdir = targetdir + "/"

    def _stage_generated_files(self, runtimeContext, mounts, secret_mounts):
        """Save the generated file listing to a new collection and mount its entries.

        Does nothing when self.generatefiles["listing"] is empty.
        Otherwise the listed files and directories are written into a
        new collection, which is saved, and a mount for each top-level
        entry is added to mounts.  "CreateFile" entries whose content
        is held by the secret store are not written to the collection;
        they are added to secret_mounts as "text" mounts instead.
        """
        with Perf(metrics, "generatefiles %s" % self.name):
            if not self.generatefiles["listing"]:
                return

            vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                keep_client=self.arvrunner.keep_client,
                                                num_retries=self.arvrunner.num_retries)
            generatemapper = NoFollowPathMapper(self.generatefiles["listing"], "", "",
                                                separateDirs=False)

            sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target)

            logger.debug("generatemapper is %s", sorteditems)

            with Perf(metrics, "createfiles %s" % self.name):
                for _, p in sorteditems:
                    if not p.target:
                        continue

                    dst = self._outdir_relative(p.target)

                    if p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
                        if p.resolved.startswith("_:"):
                            vwd.mkdirs(dst)
                        else:
                            source, path = self.arvrunner.fs_access.get_collection(p.resolved)
                            vwd.copy(path or ".", dst, source_collection=source)
                    elif p.type == "CreateFile":
                        if self.arvrunner.secret_store.has_secret(p.resolved):
                            secret_mounts[self._mountpoint_for(p.target)] = {
                                "kind": "text",
                                "content": self.arvrunner.secret_store.retrieve(p.resolved)
                            }
                        else:
                            with vwd.open(dst, "w") as n:
                                n.write(p.resolved)

            def keepemptydirs(p):
                # Put a ".keep" file in every empty subcollection,
                # recursing into the non-empty ones.
                if isinstance(p, arvados.collection.RichCollectionBase):
                    if len(p) == 0:
                        p.open(".keep", "w").close()
                    else:
                        for c in p:
                            keepemptydirs(p[c])

            keepemptydirs(vwd)

            if not runtimeContext.current_container:
                runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
            info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
            vwd.save_new(name=info["name"],
                         owner_uuid=runtimeContext.project_uuid,
                         ensure_unique_name=True,
                         trash_at=info["trash_at"],
                         properties=info["properties"])

            prev = None
            for _, p in sorteditems:
                if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
                    (prev is not None and p.target.startswith(prev))):
                    continue
                mountpoint = self._mountpoint_for(p.target)
                mounts[mountpoint] = {"kind": "collection",
                                      "portable_data_hash": vwd.portable_data_hash(),
                                      "path": self._outdir_relative(p.target)}
                if p.type.startswith("Writable"):
                    mounts[mountpoint]["writable"] = True
                prev = p.target + "/"

    def run(self, runtimeContext):
        """Build the container request for this tool invocation and submit it.

        The runtimeContext argument is ignored: it is replaced by the
        job_runtime context given to the constructor.

        On successful submission the request uuid is stored in
        self.uuid and the runner is notified via process_submitted().
        Any exception raised while submitting is logged and reported
        through self.output_callback({}, "permanentFail").

        Raises WorkflowException if secret material appears in the
        command line or environment, or if the output TTL is negative.
        """
        # ArvadosCommandTool subclasses from cwltool.CommandLineTool,
        # which calls makeJobRunner() to get a new ArvadosContainer
        # object.  The fields that define execution such as
        # command_line, environment, etc are set on the
        # ArvadosContainer object by CommandLineTool.job() before
        # run() is called.

        runtimeContext = self.job_runtime

        container_request = {
            "command": self.command_line,
            "name": self.name,
            "output_path": self.outdir,
            "cwd": self.outdir,
            "priority": runtimeContext.priority,
            "state": "Committed",
            "properties": {},
        }
        runtime_constraints = {}

        if runtimeContext.project_uuid:
            container_request["owner_uuid"] = runtimeContext.project_uuid

        if self.arvrunner.secret_store.has_secret(self.command_line):
            raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")

        if self.arvrunner.secret_store.has_secret(self.environment):
            raise WorkflowException("Secret material leaked in environment, only file literals may contain secrets")

        resources = self.builder.resources
        if resources is not None:
            runtime_constraints["vcpus"] = math.ceil(resources.get("cores", 1))
            runtime_constraints["ram"] = math.ceil(resources.get("ram") * 2**20)

        # NOTE(review): resources is dereferenced unconditionally here
        # even though it is checked against None just above -- confirm
        # that builder.resources can never actually be None.
        mounts = {
            self.outdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("outdirSize", 0) * 2**20)
            },
            self.tmpdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("tmpdirSize", 0) * 2**20)
            }
        }
        secret_mounts = {}
        scheduling_parameters = {}

        self._add_referenced_file_mounts(mounts)
        self._stage_generated_files(runtimeContext, mounts, secret_mounts)

        container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
        if self.environment:
            container_request["environment"].update(self.environment)

        if self.stdin:
            # Drop the first six characters of the stdin path, then
            # split the rest into portable data hash and file path.
            sp = self.stdin[6:].split("/", 1)
            mounts["stdin"] = {"kind": "collection",
                                "portable_data_hash": sp[0],
                                "path": sp[1]}

        if self.stderr:
            mounts["stderr"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stderr)}

        if self.stdout:
            mounts["stdout"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stdout)}

        (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
        if not docker_req:
            docker_req = {"dockerImageId": "arvados/jobs:"+__version__}

        container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
                                                                    docker_req,
                                                                    runtimeContext.pull_image,
                                                                    runtimeContext.project_uuid,
                                                                    runtimeContext.force_docker_pull,
                                                                    runtimeContext.tmp_outdir_prefix)

        network_req, _ = self.get_requirement("NetworkAccess")
        if network_req:
            runtime_constraints["API"] = network_req["networkAccess"]

        api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
        if api_req:
            runtime_constraints["API"] = True

        runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
        if runtime_req:
            if "keep_cache" in runtime_req:
                runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
            if "outputDirType" in runtime_req:
                if runtime_req["outputDirType"] == "local_output_dir":
                    # Currently the default behavior.
                    pass
                elif runtime_req["outputDirType"] == "keep_output_dir":
                    mounts[self.outdir] = {
                        "kind": "collection",
                        "writable": True
                    }

        partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
        if partition_req:
            scheduling_parameters["partitions"] = aslist(partition_req["partition"])

        intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
        if intermediate_output_req:
            self.output_ttl = intermediate_output_req["outputTTL"]
        else:
            self.output_ttl = self.arvrunner.intermediate_output_ttl

        if self.output_ttl < 0:
            # container_request["output_ttl"] is not assigned until
            # further down, so report the value from self.output_ttl.
            raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % self.output_ttl)

        if self.timelimit is not None and self.timelimit > 0:
            scheduling_parameters["max_run_time"] = self.timelimit

        extra_submit_params = {}
        if runtimeContext.submit_runner_cluster:
            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster

        container_request["output_name"] = "Output for step %s" % (self.name)
        container_request["output_ttl"] = self.output_ttl
        container_request["mounts"] = mounts
        container_request["secret_mounts"] = secret_mounts
        container_request["runtime_constraints"] = runtime_constraints
        container_request["scheduling_parameters"] = scheduling_parameters

        enable_reuse = runtimeContext.enable_reuse
        if enable_reuse:
            reuse_req, _ = self.get_requirement("WorkReuse")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
            reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        container_request["use_existing"] = enable_reuse

        if runtimeContext.runnerjob.startswith("arvwf:"):
            wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
            wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
            if container_request["name"] == "main":
                container_request["name"] = wfrecord["name"]
            container_request["properties"]["template_uuid"] = wfuuid

        self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)

        try:
            if runtimeContext.submit_request_uuid:
                # cluster_id doesn't make sense for "update" and
                # actually fails, so only pass it to "create".
                extra_submit_params.pop("cluster_id", None)
                response = self.arvrunner.api.container_requests().update(
                    uuid=runtimeContext.submit_request_uuid,
                    body=container_request,
                    **extra_submit_params
                ).execute(num_retries=self.arvrunner.num_retries)
            else:
                response = self.arvrunner.api.container_requests().create(
                    body=container_request,
                    **extra_submit_params
                ).execute(num_retries=self.arvrunner.num_retries)

            self.uuid = response["uuid"]
            self.arvrunner.process_submitted(self)

            if response["state"] == "Final":
                logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
            else:
                logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
        except Exception as e:
            logger.exception("%s error submitting container\n%s", self.arvrunner.label(self), e)
            logger.debug("Container request was %s", container_request)
            self.output_callback({}, "permanentFail")

    def done(self, record):
        """Collect the result of a finished container request and report it.

        record is the container request record; its container is
        fetched to read the state, exit code and output.  The exit code
        is mapped to a process status using successCodes,
        temporaryFailCodes and permanentFailCodes, falling back to
        "success" for 0 and "permanentFail" otherwise.  A container
        that is not "Complete" is a "permanentFail".

        self.output_callback(outputs, processStatus) is always called,
        including when collecting the outputs raised an exception.
        """
        outputs = {}
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            if container["state"] == "Complete":
                rcode = container["exit_code"]
                if self.successCodes and rcode in self.successCodes:
                    processStatus = "success"
                elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
                    processStatus = "temporaryFail"
                elif self.permanentFailCodes and rcode in self.permanentFailCodes:
                    processStatus = "permanentFail"
                elif rcode == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"
            else:
                processStatus = "permanentFail"

            if processStatus == "permanentFail" and record["log_uuid"]:
                # Show the tail of the container log to explain the failure.
                logc = arvados.collection.CollectionReader(record["log_uuid"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                label = self.arvrunner.label(self)
                done.logtail(
                    logc, logger.error,
                    "%s (%s) error log:" % (label, record["uuid"]), maxlen=40)

            if record["output_uuid"]:
                if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
                    # Compute the trash time to avoid requesting the collection record.
                    trash_at = ciso8601.parse_datetime_as_naive(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
                    aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
                    orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
                    oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
                    logger.info("%s Intermediate output %s (%s) will be trashed%s%s%s.",
                                self.arvrunner.label(self), record["output_uuid"], container["output"],
                                aftertime, orpart, oncomplete)
                self.arvrunner.add_intermediate_output(record["output_uuid"])

            if container["output"]:
                outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
        except WorkflowException as e:
            # Only include a stack trace if in debug mode.
            # A stack trace may obfuscate more useful output about the workflow.
            logger.error("%s unable to collect output from %s:\n%s",
                         self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
            processStatus = "permanentFail"
        except Exception:
            logger.exception("%s while getting output object:", self.arvrunner.label(self))
            processStatus = "permanentFail"
        finally:
            self.output_callback(outputs, processStatus)
389
390
class RunnerContainer(Runner):
    """Submit and manage a container that runs arvados-cwl-runner."""

    def arvados_job_spec(self, runtimeContext):
        """Build the container request that runs this workflow.

        Returns a dict suitable for use as the +body+ argument to
        container_requests().create().  Input parameters held by the
        secret store are moved into secret mounts and replaced in the
        job order by {"$include": <mount path>} references.
        """

        adjustDirObjs(self.job_order, trim_listing)
        for cleanup in (trim_anonymous_location, remove_redundant_fields):
            visit_class(self.job_order, ("File", "Directory"), cleanup)

        secret_mounts = {}
        for key in sorted(self.job_order):
            value = self.job_order[key]
            if not self.secret_store.has_secret(value):
                continue
            mountpath = "/secrets/s%d" % len(secret_mounts)
            secret_mounts[mountpath] = {
                "kind": "text",
                "content": self.secret_store.retrieve(value)
            }
            self.job_order[key] = {"$include": mountpath}

        container_req = {
            "name": self.name,
            "output_path": "/var/spool/cwl",
            "cwd": "/var/spool/cwl",
            "priority": self.priority,
            "state": "Committed",
            "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image),
        }
        container_req["mounts"] = {
            "/var/lib/cwl/cwl.input.json": {
                "kind": "json",
                "content": self.job_order
            },
            "stdout": {
                "kind": "file",
                "path": "/var/spool/cwl/cwl.output.json"
            },
            "/var/spool/cwl": {
                "kind": "collection",
                "writable": True
            }
        }
        container_req["secret_mounts"] = secret_mounts
        container_req["runtime_constraints"] = {
            "vcpus": math.ceil(self.submit_runner_cores),
            "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
            "API": True
        }
        # Never reuse the runner container - see #15497.
        container_req["use_existing"] = False
        container_req["properties"] = {}

        tool_id = self.embedded_tool.tool.get("id", "")
        if tool_id.startswith("keep:"):
            # The workflow lives in a collection: mount that collection
            # and run the workflow file from inside the mount.
            keepref, _, workflowname = tool_id.partition("/")
            workflowcollection = keepref[5:]
            workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
            container_req["mounts"]["/var/lib/cwl/workflow"] = {
                "kind": "collection",
                "portable_data_hash": "%s" % workflowcollection
            }
        else:
            # Otherwise mount the packed workflow document as JSON.
            packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map)
            workflowpath = "/var/lib/cwl/workflow.json#main"
            container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
                "kind": "json",
                "content": packed
            }
            if self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
                container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]

        # --local means execute the workflow instead of submitting a container request
        # --api=containers means use the containers API
        # --no-log-timestamps means don't add timestamps (the logging infrastructure does this)
        # --disable-validate because we already validated so don't need to do it again
        # --eval-timeout is the timeout for javascript invocation
        # --thread-count is the number of threads to use for job submission
        # --enable/disable-reuse sets desired job reuse
        # --collection-cache-size sets aside memory to store collections
        reuse_flag = "--enable-reuse" if self.enable_reuse else "--disable-reuse"
        command = [
            "arvados-cwl-runner",
            "--local",
            "--api=containers",
            "--no-log-timestamps",
            "--disable-validate",
            "--disable-color",
            "--eval-timeout=%s" % self.arvrunner.eval_timeout,
            "--thread-count=%s" % self.arvrunner.thread_count,
            reuse_flag,
            "--collection-cache-size=%s" % self.collection_cache_size,
        ]

        # Optional flags, appended only when the matching setting is in effect.
        if self.output_name:
            command.append("--output-name=" + self.output_name)
            container_req["output_name"] = self.output_name

        if self.output_tags:
            command.append("--output-tags=" + self.output_tags)

        if runtimeContext.debug:
            command.append("--debug")

        if runtimeContext.storage_classes != "default":
            command.append("--storage-classes=" + runtimeContext.storage_classes)

        if self.on_error:
            command.append("--on-error=" + self.on_error)

        if self.intermediate_output_ttl:
            command.append("--intermediate-output-ttl=%d" % self.intermediate_output_ttl)

        if self.arvrunner.trash_intermediate:
            command.append("--trash-intermediate")

        if self.arvrunner.project_uuid:
            command.append("--project-uuid="+self.arvrunner.project_uuid)

        if self.enable_dev:
            command.append("--enable-dev")

        # The workflow and its input document always come last.
        command.append(workflowpath)
        command.append("/var/lib/cwl/cwl.input.json")

        container_req["command"] = command

        return container_req


    def run(self, runtimeContext):
        """Submit the runner container request and log where to monitor it.

        Updates the existing request when
        runtimeContext.submit_request_uuid is set, otherwise creates a
        new one.  The resulting request uuid is stored in self.uuid.
        """
        runtimeContext.keepprefix = "keep:"
        job_spec = self.arvados_job_spec(runtimeContext)
        if self.arvrunner.project_uuid:
            job_spec["owner_uuid"] = self.arvrunner.project_uuid

        extra_submit_params = {}
        if runtimeContext.submit_runner_cluster:
            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster

        if runtimeContext.submit_request_uuid:
            # cluster_id doesn't make sense for "update" and actually fails
            extra_submit_params.pop("cluster_id", None)
            request = self.arvrunner.api.container_requests().update(
                uuid=runtimeContext.submit_request_uuid,
                body=job_spec,
                **extra_submit_params
            )
        else:
            request = self.arvrunner.api.container_requests().create(
                body=job_spec,
                **extra_submit_params
            )
        response = request.execute(num_retries=self.arvrunner.num_retries)

        self.uuid = response["uuid"]
        self.arvrunner.process_submitted(self)

        logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])

        # Prefer the Workbench2 URL, falling back to Workbench1.
        workbench1 = self.arvrunner.api.config()["Services"]["Workbench1"]["ExternalURL"]
        workbench2 = self.arvrunner.api.config()["Services"]["Workbench2"]["ExternalURL"]
        if workbench2:
            url = "{}processes/{}".format(workbench2, response["uuid"])
        elif workbench1:
            url = "{}container_requests/{}".format(workbench1, response["uuid"])
        else:
            url = ""
        if url:
            logger.info("Monitor workflow progress at %s", url)


    def done(self, record):
        """Fetch the runner's container and pass it to Runner.done().

        The container's "log" field is set from the request record's
        log_uuid first.  If the container cannot be fetched, the error
        is logged and the workflow is reported as "permanentFail".
        """
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            container["log"] = record["log_uuid"]
        except Exception:
            logger.exception("%s while getting runner container", self.arvrunner.label(self))
            self.arvrunner.output_callback({}, "permanentFail")
            return

        super(RunnerContainer, self).done(container)