17698: Merge branch 'main' into 17698-keepstore-concurrent-writes
[arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8
9 import logging
10 import json
11 import os
12 import urllib.request, urllib.parse, urllib.error
13 import time
14 import datetime
15 import ciso8601
16 import uuid
17 import math
18
19 import arvados_cwl.util
20 import ruamel.yaml as yaml
21
22 from cwltool.errors import WorkflowException
23 from cwltool.process import UnsupportedRequirement, shortname
24 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
25 from cwltool.job import JobBase
26
27 import arvados.collection
28
29 from .arvdocker import arv_docker_get_image
30 from . import done
31 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields
32 from .fsaccess import CollectionFetcher
33 from .pathmapper import NoFollowPathMapper, trim_listing
34 from .perf import Perf
35 from ._version import __version__
36
37 logger = logging.getLogger('arvados.cwl-runner')
38 metrics = logging.getLogger('arvados.cwl-runner.metrics')
39
40 class ArvadosContainer(JobBase):
41     """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""
42
43     def __init__(self, runner, job_runtime,
44                  builder,   # type: Builder
45                  joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
46                  make_path_mapper,  # type: Callable[..., PathMapper]
47                  requirements,      # type: List[Dict[Text, Text]]
48                  hints,     # type: List[Dict[Text, Text]]
49                  name       # type: Text
50     ):
51         super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
52         self.arvrunner = runner
53         self.job_runtime = job_runtime
54         self.running = False
55         self.uuid = None
56
57     def update_pipeline_component(self, r):
58         pass
59
60     def run(self, runtimeContext):
61         # ArvadosCommandTool subclasses from cwltool.CommandLineTool,
62         # which calls makeJobRunner() to get a new ArvadosContainer
63         # object.  The fields that define execution such as
64         # command_line, environment, etc are set on the
65         # ArvadosContainer object by CommandLineTool.job() before
66         # run() is called.
67
68         runtimeContext = self.job_runtime
69
70         container_request = {
71             "command": self.command_line,
72             "name": self.name,
73             "output_path": self.outdir,
74             "cwd": self.outdir,
75             "priority": runtimeContext.priority,
76             "state": "Committed",
77             "properties": {},
78         }
79         runtime_constraints = {}
80
81         if runtimeContext.project_uuid:
82             container_request["owner_uuid"] = runtimeContext.project_uuid
83
84         if self.arvrunner.secret_store.has_secret(self.command_line):
85             raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")
86
87         if self.arvrunner.secret_store.has_secret(self.environment):
88             raise WorkflowException("Secret material leaked in environment, only file literals may contain secrets")
89
90         resources = self.builder.resources
91         if resources is not None:
92             runtime_constraints["vcpus"] = math.ceil(resources.get("cores", 1))
93             runtime_constraints["ram"] = math.ceil(resources.get("ram") * 2**20)
94
95         mounts = {
96             self.outdir: {
97                 "kind": "tmp",
98                 "capacity": math.ceil(resources.get("outdirSize", 0) * 2**20)
99             },
100             self.tmpdir: {
101                 "kind": "tmp",
102                 "capacity": math.ceil(resources.get("tmpdirSize", 0) * 2**20)
103             }
104         }
105         secret_mounts = {}
106         scheduling_parameters = {}
107
108         rf = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
109         rf.sort(key=lambda k: k.resolved)
110         prevdir = None
111         for resolved, target, tp, stg in rf:
112             if not stg:
113                 continue
114             if prevdir and target.startswith(prevdir):
115                 continue
116             if tp == "Directory":
117                 targetdir = target
118             else:
119                 targetdir = os.path.dirname(target)
120             sp = resolved.split("/", 1)
121             pdh = sp[0][5:]   # remove "keep:"
122             mounts[targetdir] = {
123                 "kind": "collection",
124                 "portable_data_hash": pdh
125             }
126             if pdh in self.pathmapper.pdh_to_uuid:
127                 mounts[targetdir]["uuid"] = self.pathmapper.pdh_to_uuid[pdh]
128             if len(sp) == 2:
129                 if tp == "Directory":
130                     path = sp[1]
131                 else:
132                     path = os.path.dirname(sp[1])
133                 if path and path != "/":
134                     mounts[targetdir]["path"] = path
135             prevdir = targetdir + "/"
136
137         with Perf(metrics, "generatefiles %s" % self.name):
138             if self.generatefiles["listing"]:
139                 vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
140                                                     keep_client=self.arvrunner.keep_client,
141                                                     num_retries=self.arvrunner.num_retries)
142                 generatemapper = NoFollowPathMapper(self.generatefiles["listing"], "", "",
143                                                     separateDirs=False)
144
145                 sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target)
146
147                 logger.debug("generatemapper is %s", sorteditems)
148
149                 with Perf(metrics, "createfiles %s" % self.name):
150                     for f, p in sorteditems:
151                         if not p.target:
152                             continue
153
154                         if p.target.startswith("/"):
155                             dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
156                         else:
157                             dst = p.target
158
159                         if p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
160                             if p.resolved.startswith("_:"):
161                                 vwd.mkdirs(dst)
162                             else:
163                                 source, path = self.arvrunner.fs_access.get_collection(p.resolved)
164                                 vwd.copy(path or ".", dst, source_collection=source)
165                         elif p.type == "CreateFile":
166                             if self.arvrunner.secret_store.has_secret(p.resolved):
167                                 mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
168                                 secret_mounts[mountpoint] = {
169                                     "kind": "text",
170                                     "content": self.arvrunner.secret_store.retrieve(p.resolved)
171                                 }
172                             else:
173                                 with vwd.open(dst, "w") as n:
174                                     n.write(p.resolved)
175
176                 def keepemptydirs(p):
177                     if isinstance(p, arvados.collection.RichCollectionBase):
178                         if len(p) == 0:
179                             p.open(".keep", "w").close()
180                         else:
181                             for c in p:
182                                 keepemptydirs(p[c])
183
184                 keepemptydirs(vwd)
185
186                 if not runtimeContext.current_container:
187                     runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
188                 info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
189                 vwd.save_new(name=info["name"],
190                              owner_uuid=runtimeContext.project_uuid,
191                              ensure_unique_name=True,
192                              trash_at=info["trash_at"],
193                              properties=info["properties"])
194
195                 prev = None
196                 for f, p in sorteditems:
197                     if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
198                         (prev is not None and p.target.startswith(prev))):
199                         continue
200                     if p.target.startswith("/"):
201                         dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
202                     else:
203                         dst = p.target
204                     mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
205                     mounts[mountpoint] = {"kind": "collection",
206                                           "portable_data_hash": vwd.portable_data_hash(),
207                                           "path": dst}
208                     if p.type.startswith("Writable"):
209                         mounts[mountpoint]["writable"] = True
210                     prev = p.target + "/"
211
212         container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
213         if self.environment:
214             container_request["environment"].update(self.environment)
215
216         if self.stdin:
217             sp = self.stdin[6:].split("/", 1)
218             mounts["stdin"] = {"kind": "collection",
219                                 "portable_data_hash": sp[0],
220                                 "path": sp[1]}
221
222         if self.stderr:
223             mounts["stderr"] = {"kind": "file",
224                                 "path": "%s/%s" % (self.outdir, self.stderr)}
225
226         if self.stdout:
227             mounts["stdout"] = {"kind": "file",
228                                 "path": "%s/%s" % (self.outdir, self.stdout)}
229
230         (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
231         if not docker_req:
232             docker_req = {"dockerImageId": "arvados/jobs:"+__version__}
233
234         container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
235                                                                     docker_req,
236                                                                     runtimeContext.pull_image,
237                                                                     runtimeContext.project_uuid,
238                                                                     runtimeContext.force_docker_pull,
239                                                                     runtimeContext.tmp_outdir_prefix)
240
241         network_req, _ = self.get_requirement("NetworkAccess")
242         if network_req:
243             runtime_constraints["API"] = network_req["networkAccess"]
244
245         api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
246         if api_req:
247             runtime_constraints["API"] = True
248
249         runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
250         if runtime_req:
251             if "keep_cache" in runtime_req:
252                 runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
253             if "outputDirType" in runtime_req:
254                 if runtime_req["outputDirType"] == "local_output_dir":
255                     # Currently the default behavior.
256                     pass
257                 elif runtime_req["outputDirType"] == "keep_output_dir":
258                     mounts[self.outdir]= {
259                         "kind": "collection",
260                         "writable": True
261                     }
262
263         partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
264         if partition_req:
265             scheduling_parameters["partitions"] = aslist(partition_req["partition"])
266
267         intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
268         if intermediate_output_req:
269             self.output_ttl = intermediate_output_req["outputTTL"]
270         else:
271             self.output_ttl = self.arvrunner.intermediate_output_ttl
272
273         if self.output_ttl < 0:
274             raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % container_request["output_ttl"])
275
276         storage_class_req, _ = self.get_requirement("http://arvados.org/cwl#OutputStorageClass")
277         if storage_class_req and storage_class_req.get("intermediateStorageClass"):
278             container_request["output_storage_classes"] = aslist(storage_class_req["intermediateStorageClass"])
279         else:
280             container_request["output_storage_classes"] = runtimeContext.intermediate_storage_classes.strip().split(",")
281
282         if self.timelimit is not None and self.timelimit > 0:
283             scheduling_parameters["max_run_time"] = self.timelimit
284
285         extra_submit_params = {}
286         if runtimeContext.submit_runner_cluster:
287             extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
288
289         container_request["output_name"] = "Output for step %s" % (self.name)
290         container_request["output_ttl"] = self.output_ttl
291         container_request["mounts"] = mounts
292         container_request["secret_mounts"] = secret_mounts
293         container_request["runtime_constraints"] = runtime_constraints
294         container_request["scheduling_parameters"] = scheduling_parameters
295
296         enable_reuse = runtimeContext.enable_reuse
297         if enable_reuse:
298             reuse_req, _ = self.get_requirement("WorkReuse")
299             if reuse_req:
300                 enable_reuse = reuse_req["enableReuse"]
301             reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
302             if reuse_req:
303                 enable_reuse = reuse_req["enableReuse"]
304         container_request["use_existing"] = enable_reuse
305
306         if runtimeContext.runnerjob.startswith("arvwf:"):
307             wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
308             wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
309             if container_request["name"] == "main":
310                 container_request["name"] = wfrecord["name"]
311             container_request["properties"]["template_uuid"] = wfuuid
312
313         self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
314
315         try:
316             if runtimeContext.submit_request_uuid:
317                 response = self.arvrunner.api.container_requests().update(
318                     uuid=runtimeContext.submit_request_uuid,
319                     body=container_request,
320                     **extra_submit_params
321                 ).execute(num_retries=self.arvrunner.num_retries)
322             else:
323                 response = self.arvrunner.api.container_requests().create(
324                     body=container_request,
325                     **extra_submit_params
326                 ).execute(num_retries=self.arvrunner.num_retries)
327
328             self.uuid = response["uuid"]
329             self.arvrunner.process_submitted(self)
330
331             if response["state"] == "Final":
332                 logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
333             else:
334                 logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
335         except Exception as e:
336             logger.exception("%s error submitting container\n%s", self.arvrunner.label(self), e)
337             logger.debug("Container request was %s", container_request)
338             self.output_callback({}, "permanentFail")
339
340     def done(self, record):
341         outputs = {}
342         try:
343             container = self.arvrunner.api.containers().get(
344                 uuid=record["container_uuid"]
345             ).execute(num_retries=self.arvrunner.num_retries)
346             if container["state"] == "Complete":
347                 rcode = container["exit_code"]
348                 if self.successCodes and rcode in self.successCodes:
349                     processStatus = "success"
350                 elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
351                     processStatus = "temporaryFail"
352                 elif self.permanentFailCodes and rcode in self.permanentFailCodes:
353                     processStatus = "permanentFail"
354                 elif rcode == 0:
355                     processStatus = "success"
356                 else:
357                     processStatus = "permanentFail"
358             else:
359                 processStatus = "permanentFail"
360
361             if processStatus == "permanentFail" and record["log_uuid"]:
362                 logc = arvados.collection.CollectionReader(record["log_uuid"],
363                                                            api_client=self.arvrunner.api,
364                                                            keep_client=self.arvrunner.keep_client,
365                                                            num_retries=self.arvrunner.num_retries)
366                 label = self.arvrunner.label(self)
367                 done.logtail(
368                     logc, logger.error,
369                     "%s (%s) error log:" % (label, record["uuid"]), maxlen=40)
370
371             if record["output_uuid"]:
372                 if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
373                     # Compute the trash time to avoid requesting the collection record.
374                     trash_at = ciso8601.parse_datetime_as_naive(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
375                     aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
376                     orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
377                     oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
378                     logger.info("%s Intermediate output %s (%s) will be trashed%s%s%s." % (
379                         self.arvrunner.label(self), record["output_uuid"], container["output"], aftertime, orpart, oncomplete))
380                 self.arvrunner.add_intermediate_output(record["output_uuid"])
381
382             if container["output"]:
383                 outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
384         except WorkflowException as e:
385             # Only include a stack trace if in debug mode.
386             # A stack trace may obfuscate more useful output about the workflow.
387             logger.error("%s unable to collect output from %s:\n%s",
388                          self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
389             processStatus = "permanentFail"
390         except Exception:
391             logger.exception("%s while getting output object:", self.arvrunner.label(self))
392             processStatus = "permanentFail"
393         finally:
394             self.output_callback(outputs, processStatus)
395
396
397 class RunnerContainer(Runner):
398     """Submit and manage a container that runs arvados-cwl-runner."""
399
400     def arvados_job_spec(self, runtimeContext):
401         """Create an Arvados container request for this workflow.
402
403         The returned dict can be used to create a container passed as
404         the +body+ argument to container_requests().create().
405         """
406
407         adjustDirObjs(self.job_order, trim_listing)
408         visit_class(self.job_order, ("File", "Directory"), trim_anonymous_location)
409         visit_class(self.job_order, ("File", "Directory"), remove_redundant_fields)
410
411         secret_mounts = {}
412         for param in sorted(self.job_order.keys()):
413             if self.secret_store.has_secret(self.job_order[param]):
414                 mnt = "/secrets/s%d" % len(secret_mounts)
415                 secret_mounts[mnt] = {
416                     "kind": "text",
417                     "content": self.secret_store.retrieve(self.job_order[param])
418                 }
419                 self.job_order[param] = {"$include": mnt}
420
421         container_req = {
422             "name": self.name,
423             "output_path": "/var/spool/cwl",
424             "cwd": "/var/spool/cwl",
425             "priority": self.priority,
426             "state": "Committed",
427             "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image),
428             "mounts": {
429                 "/var/lib/cwl/cwl.input.json": {
430                     "kind": "json",
431                     "content": self.job_order
432                 },
433                 "stdout": {
434                     "kind": "file",
435                     "path": "/var/spool/cwl/cwl.output.json"
436                 },
437                 "/var/spool/cwl": {
438                     "kind": "collection",
439                     "writable": True
440                 }
441             },
442             "secret_mounts": secret_mounts,
443             "runtime_constraints": {
444                 "vcpus": math.ceil(self.submit_runner_cores),
445                 "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
446                 "API": True
447             },
448             "use_existing": False, # Never reuse the runner container - see #15497.
449             "properties": {}
450         }
451
452         if self.embedded_tool.tool.get("id", "").startswith("keep:"):
453             sp = self.embedded_tool.tool["id"].split('/')
454             workflowcollection = sp[0][5:]
455             workflowname = "/".join(sp[1:])
456             workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
457             container_req["mounts"]["/var/lib/cwl/workflow"] = {
458                 "kind": "collection",
459                 "portable_data_hash": "%s" % workflowcollection
460             }
461         else:
462             packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map)
463             workflowpath = "/var/lib/cwl/workflow.json#main"
464             container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
465                 "kind": "json",
466                 "content": packed
467             }
468             if self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
469                 container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
470
471
472         # --local means execute the workflow instead of submitting a container request
473         # --api=containers means use the containers API
474         # --no-log-timestamps means don't add timestamps (the logging infrastructure does this)
475         # --disable-validate because we already validated so don't need to do it again
476         # --eval-timeout is the timeout for javascript invocation
477         # --parallel-task-count is the number of threads to use for job submission
478         # --enable/disable-reuse sets desired job reuse
479         # --collection-cache-size sets aside memory to store collections
480         command = ["arvados-cwl-runner",
481                    "--local",
482                    "--api=containers",
483                    "--no-log-timestamps",
484                    "--disable-validate",
485                    "--disable-color",
486                    "--eval-timeout=%s" % self.arvrunner.eval_timeout,
487                    "--thread-count=%s" % self.arvrunner.thread_count,
488                    "--enable-reuse" if self.enable_reuse else "--disable-reuse",
489                    "--collection-cache-size=%s" % self.collection_cache_size]
490
491         if self.output_name:
492             command.append("--output-name=" + self.output_name)
493             container_req["output_name"] = self.output_name
494
495         if self.output_tags:
496             command.append("--output-tags=" + self.output_tags)
497
498         if runtimeContext.debug:
499             command.append("--debug")
500
501         if runtimeContext.storage_classes != "default":
502             command.append("--storage-classes=" + runtimeContext.storage_classes)
503
504         if runtimeContext.intermediate_storage_classes != "default":
505             command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes)
506
507         if self.on_error:
508             command.append("--on-error=" + self.on_error)
509
510         if self.intermediate_output_ttl:
511             command.append("--intermediate-output-ttl=%d" % self.intermediate_output_ttl)
512
513         if self.arvrunner.trash_intermediate:
514             command.append("--trash-intermediate")
515
516         if self.arvrunner.project_uuid:
517             command.append("--project-uuid="+self.arvrunner.project_uuid)
518
519         if self.enable_dev:
520             command.append("--enable-dev")
521
522         command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
523
524         container_req["command"] = command
525
526         return container_req
527
528
529     def run(self, runtimeContext):
530         runtimeContext.keepprefix = "keep:"
531         job_spec = self.arvados_job_spec(runtimeContext)
532         if self.arvrunner.project_uuid:
533             job_spec["owner_uuid"] = self.arvrunner.project_uuid
534
535         extra_submit_params = {}
536         if runtimeContext.submit_runner_cluster:
537             extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
538
539         if runtimeContext.submit_request_uuid:
540             if "cluster_id" in extra_submit_params:
541                 # Doesn't make sense for "update" and actually fails
542                 del extra_submit_params["cluster_id"]
543             response = self.arvrunner.api.container_requests().update(
544                 uuid=runtimeContext.submit_request_uuid,
545                 body=job_spec,
546                 **extra_submit_params
547             ).execute(num_retries=self.arvrunner.num_retries)
548         else:
549             response = self.arvrunner.api.container_requests().create(
550                 body=job_spec,
551                 **extra_submit_params
552             ).execute(num_retries=self.arvrunner.num_retries)
553
554         self.uuid = response["uuid"]
555         self.arvrunner.process_submitted(self)
556
557         logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])
558
559         workbench1 = self.arvrunner.api.config()["Services"]["Workbench1"]["ExternalURL"]
560         workbench2 = self.arvrunner.api.config()["Services"]["Workbench2"]["ExternalURL"]
561         url = ""
562         if workbench2:
563             url = "{}processes/{}".format(workbench2, response["uuid"])
564         elif workbench1:
565             url = "{}container_requests/{}".format(workbench1, response["uuid"])
566         if url:
567             logger.info("Monitor workflow progress at %s", url)
568
569
570     def done(self, record):
571         try:
572             container = self.arvrunner.api.containers().get(
573                 uuid=record["container_uuid"]
574             ).execute(num_retries=self.arvrunner.num_retries)
575             container["log"] = record["log_uuid"]
576         except Exception:
577             logger.exception("%s while getting runner container", self.arvrunner.label(self))
578             self.arvrunner.output_callback({}, "permanentFail")
579         else:
580             super(RunnerContainer, self).done(container)