16169: Monkey patch load_tool.resolve_and_validate_document to fix bug
[arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8
9 import logging
10 import json
11 import os
12 import urllib.request, urllib.parse, urllib.error
13 import time
14 import datetime
15 import ciso8601
16 import uuid
17 import math
18
19 import arvados_cwl.util
20 import ruamel.yaml as yaml
21
22 from cwltool.errors import WorkflowException
23 from cwltool.process import UnsupportedRequirement, shortname
24 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
25 from cwltool.utils import aslist
26 from cwltool.job import JobBase
27
28 import arvados.collection
29
30 from .arvdocker import arv_docker_get_image
31 from . import done
32 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields
33 from .fsaccess import CollectionFetcher
34 from .pathmapper import NoFollowPathMapper, trim_listing
35 from .perf import Perf
36 from ._version import __version__
37
38 logger = logging.getLogger('arvados.cwl-runner')
39 metrics = logging.getLogger('arvados.cwl-runner.metrics')
40
class ArvadosContainer(JobBase):
    """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""

    def __init__(self, runner, job_runtime,
                 builder,   # type: Builder
                 joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
                 make_path_mapper,  # type: Callable[..., PathMapper]
                 requirements,      # type: List[Dict[Text, Text]]
                 hints,     # type: List[Dict[Text, Text]]
                 name       # type: Text
    ):
        super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
        self.arvrunner = runner          # executor owning the Arvados API client, keep client and callbacks
        self.job_runtime = job_runtime   # per-job RuntimeContext; run() uses this, not its own argument
        self.running = False
        self.uuid = None                 # container request UUID, assigned after submission in run()

    def update_pipeline_component(self, r):
        # Pipeline instances are a jobs-API concept; nothing to update for containers.
        pass

    def run(self, runtimeContext):
        """Build and submit a container request for this command line tool step.

        On success, records the new request UUID on self and registers the
        process with the executor.  On any submission error, logs the
        exception and reports "permanentFail" via the output callback
        instead of raising.
        """
        # ArvadosCommandTool subclasses from cwltool.CommandLineTool,
        # which calls makeJobRunner() to get a new ArvadosContainer
        # object.  The fields that define execution such as
        # command_line, environment, etc are set on the
        # ArvadosContainer object by CommandLineTool.job() before
        # run() is called.

        # Deliberately shadow the argument: the per-job context captured at
        # construction carries this step's settings.
        runtimeContext = self.job_runtime

        container_request = {
            "command": self.command_line,
            "name": self.name,
            "output_path": self.outdir,
            "cwd": self.outdir,
            "priority": runtimeContext.priority,
            "state": "Committed",
            "properties": {},
        }
        runtime_constraints = {}

        if runtimeContext.project_uuid:
            container_request["owner_uuid"] = runtimeContext.project_uuid

        # Secrets may only appear as file literals (mounted via secret_mounts
        # below); refuse to submit if one leaked into argv or the environment.
        if self.arvrunner.secret_store.has_secret(self.command_line):
            raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")

        if self.arvrunner.secret_store.has_secret(self.environment):
            raise WorkflowException("Secret material leaked in environment, only file literals may contain secrets")

        resources = self.builder.resources
        if resources is not None:
            runtime_constraints["vcpus"] = math.ceil(resources.get("cores", 1))
            # ram/outdirSize/tmpdirSize are in MiB per the CWL spec; Arvados wants bytes.
            runtime_constraints["ram"] = math.ceil(resources.get("ram") * 2**20)

        # NOTE(review): resources is dereferenced below without the None guard
        # above; cwltool's builder appears to always supply a dict here — confirm.
        mounts = {
            self.outdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("outdirSize", 0) * 2**20)
            },
            self.tmpdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("tmpdirSize", 0) * 2**20)
            }
        }
        secret_mounts = {}
        scheduling_parameters = {}

        # Mount each referenced input collection.  Sorting by resolved source
        # and skipping targets under an already-mounted directory avoids
        # redundant mounts for files that share a parent collection.
        rf = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
        rf.sort(key=lambda k: k.resolved)
        prevdir = None
        for resolved, target, tp, stg in rf:
            if not stg:
                continue
            if prevdir and target.startswith(prevdir):
                continue
            if tp == "Directory":
                targetdir = target
            else:
                targetdir = os.path.dirname(target)
            sp = resolved.split("/", 1)
            pdh = sp[0][5:]   # remove "keep:"
            mounts[targetdir] = {
                "kind": "collection",
                "portable_data_hash": pdh
            }
            if pdh in self.pathmapper.pdh_to_uuid:
                mounts[targetdir]["uuid"] = self.pathmapper.pdh_to_uuid[pdh]
            if len(sp) == 2:
                if tp == "Directory":
                    path = sp[1]
                else:
                    path = os.path.dirname(sp[1])
                if path and path != "/":
                    mounts[targetdir]["path"] = path
            prevdir = targetdir + "/"

        # Materialize InitialWorkDirRequirement listings into a new
        # intermediate collection and mount its contents under outdir.
        with Perf(metrics, "generatefiles %s" % self.name):
            if self.generatefiles["listing"]:
                vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                    keep_client=self.arvrunner.keep_client,
                                                    num_retries=self.arvrunner.num_retries)
                generatemapper = NoFollowPathMapper(self.generatefiles["listing"], "", "",
                                                    separateDirs=False)

                sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target)

                logger.debug("generatemapper is %s", sorteditems)

                with Perf(metrics, "createfiles %s" % self.name):
                    for f, p in sorteditems:
                        if not p.target:
                            pass
                        elif p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
                            if p.resolved.startswith("_:"):
                                # "_:" marks a directory literal with no backing storage.
                                vwd.mkdirs(p.target)
                            else:
                                source, path = self.arvrunner.fs_access.get_collection(p.resolved)
                                vwd.copy(path or ".", p.target, source_collection=source)
                        elif p.type == "CreateFile":
                            if self.arvrunner.secret_store.has_secret(p.resolved):
                                # Secret file literals go into secret_mounts so
                                # they are never stored in a Keep collection.
                                secret_mounts["%s/%s" % (self.outdir, p.target)] = {
                                    "kind": "text",
                                    "content": self.arvrunner.secret_store.retrieve(p.resolved)
                                }
                            else:
                                with vwd.open(p.target, "w") as n:
                                    n.write(p.resolved)

                def keepemptydirs(p):
                    # Keep collections cannot represent empty directories;
                    # drop a ".keep" placeholder file into each one.
                    if isinstance(p, arvados.collection.RichCollectionBase):
                        if len(p) == 0:
                            p.open(".keep", "w").close()
                        else:
                            for c in p:
                                keepemptydirs(p[c])

                keepemptydirs(vwd)

                if not runtimeContext.current_container:
                    runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
                info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
                vwd.save_new(name=info["name"],
                             owner_uuid=runtimeContext.project_uuid,
                             ensure_unique_name=True,
                             trash_at=info["trash_at"],
                             properties=info["properties"])

                # Mount the saved collection's entries; as above, children of
                # an already-mounted target are skipped.
                prev = None
                for f, p in sorteditems:
                    if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
                        (prev is not None and p.target.startswith(prev))):
                        continue
                    mountpoint = "%s/%s" % (self.outdir, p.target)
                    mounts[mountpoint] = {"kind": "collection",
                                          "portable_data_hash": vwd.portable_data_hash(),
                                          "path": p.target}
                    if p.type.startswith("Writable"):
                        mounts[mountpoint]["writable"] = True
                    prev = p.target + "/"

        container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
        if self.environment:
            container_request["environment"].update(self.environment)

        if self.stdin:
            # stdin is "keep:<pdh>/<path>"; strip the scheme prefix.
            sp = self.stdin[6:].split("/", 1)
            mounts["stdin"] = {"kind": "collection",
                                "portable_data_hash": sp[0],
                                "path": sp[1]}

        if self.stderr:
            mounts["stderr"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stderr)}

        if self.stdout:
            mounts["stdout"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stdout)}

        (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
        if not docker_req:
            # Fall back to the arvados/jobs image matching this SDK version.
            docker_req = {"dockerImageId": "arvados/jobs:"+__version__}

        container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
                                                                    docker_req,
                                                                    runtimeContext.pull_image,
                                                                    runtimeContext.project_uuid)

        network_req, _ = self.get_requirement("NetworkAccess")
        if network_req:
            runtime_constraints["API"] = network_req["networkAccess"]

        api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
        if api_req:
            runtime_constraints["API"] = True

        runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
        if runtime_req:
            if "keep_cache" in runtime_req:
                runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
            if "outputDirType" in runtime_req:
                if runtime_req["outputDirType"] == "local_output_dir":
                    # Currently the default behavior.
                    pass
                elif runtime_req["outputDirType"] == "keep_output_dir":
                    mounts[self.outdir]= {
                        "kind": "collection",
                        "writable": True
                    }

        partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
        if partition_req:
            scheduling_parameters["partitions"] = aslist(partition_req["partition"])

        intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
        if intermediate_output_req:
            self.output_ttl = intermediate_output_req["outputTTL"]
        else:
            self.output_ttl = self.arvrunner.intermediate_output_ttl

        if self.output_ttl < 0:
            # BUGFIX: format self.output_ttl here; container_request["output_ttl"]
            # is not assigned until further below, so referencing it raised
            # KeyError instead of this WorkflowException.
            raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % self.output_ttl)

        if self.timelimit is not None and self.timelimit > 0:
            scheduling_parameters["max_run_time"] = self.timelimit

        extra_submit_params = {}
        if runtimeContext.submit_runner_cluster:
            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster

        container_request["output_name"] = "Output for step %s" % (self.name)
        container_request["output_ttl"] = self.output_ttl
        container_request["mounts"] = mounts
        container_request["secret_mounts"] = secret_mounts
        container_request["runtime_constraints"] = runtime_constraints
        container_request["scheduling_parameters"] = scheduling_parameters

        # Per-step WorkReuse / ReuseRequirement overrides the global setting;
        # the arvados.org extension takes precedence over the standard hint.
        enable_reuse = runtimeContext.enable_reuse
        if enable_reuse:
            reuse_req, _ = self.get_requirement("WorkReuse")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
            reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        container_request["use_existing"] = enable_reuse

        if runtimeContext.runnerjob.startswith("arvwf:"):
            # Running from a stored workflow; label the "main" step with the
            # workflow's name and record the template for provenance.
            wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
            wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
            if container_request["name"] == "main":
                container_request["name"] = wfrecord["name"]
            container_request["properties"]["template_uuid"] = wfuuid

        self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)

        try:
            if runtimeContext.submit_request_uuid:
                # Reuse a pre-created container request (e.g. from a federated
                # submit) instead of creating a new one.
                response = self.arvrunner.api.container_requests().update(
                    uuid=runtimeContext.submit_request_uuid,
                    body=container_request,
                    **extra_submit_params
                ).execute(num_retries=self.arvrunner.num_retries)
            else:
                response = self.arvrunner.api.container_requests().create(
                    body=container_request,
                    **extra_submit_params
                ).execute(num_retries=self.arvrunner.num_retries)

            self.uuid = response["uuid"]
            self.arvrunner.process_submitted(self)

            if response["state"] == "Final":
                logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
            else:
                logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
        except Exception:
            logger.exception("%s got an error", self.arvrunner.label(self))
            self.output_callback({}, "permanentFail")

    def done(self, record):
        """Handle completion of the container request.

        Maps the container exit code onto a cwltool process status (honoring
        successCodes/temporaryFailCodes/permanentFailCodes), tails the log on
        permanent failure, schedules intermediate output for trashing, and
        always invokes the output callback exactly once.
        """
        outputs = {}
        # BUGFIX: initialize container so the WorkflowException handler below
        # cannot hit a NameError when the containers().get call itself fails.
        container = None
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            if container["state"] == "Complete":
                rcode = container["exit_code"]
                if self.successCodes and rcode in self.successCodes:
                    processStatus = "success"
                elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
                    processStatus = "temporaryFail"
                elif self.permanentFailCodes and rcode in self.permanentFailCodes:
                    processStatus = "permanentFail"
                elif rcode == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"
            else:
                # Cancelled or otherwise never completed.
                processStatus = "permanentFail"

            if processStatus == "permanentFail" and record["log_uuid"]:
                # Surface the tail of the container log to help diagnosis.
                logc = arvados.collection.CollectionReader(record["log_uuid"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                label = self.arvrunner.label(self)
                done.logtail(
                    logc, logger.error,
                    "%s (%s) error log:" % (label, record["uuid"]), maxlen=40)

            if record["output_uuid"]:
                if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
                    # Compute the trash time to avoid requesting the collection record.
                    trash_at = ciso8601.parse_datetime_as_naive(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
                    aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
                    orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
                    oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
                    logger.info("%s Intermediate output %s (%s) will be trashed%s%s%s." % (
                        self.arvrunner.label(self), record["output_uuid"], container["output"], aftertime, orpart, oncomplete))
                self.arvrunner.add_intermediate_output(record["output_uuid"])

            if container["output"]:
                outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
        except WorkflowException as e:
            # Only include a stack trace if in debug mode.
            # A stack trace may obfuscate more useful output about the workflow.
            logger.error("%s unable to collect output from %s:\n%s",
                         self.arvrunner.label(self),
                         container["output"] if container else None,
                         e, exc_info=(e if self.arvrunner.debug else False))
            processStatus = "permanentFail"
        except Exception:
            logger.exception("%s while getting output object:", self.arvrunner.label(self))
            processStatus = "permanentFail"
        finally:
            self.output_callback(outputs, processStatus)
376
377
class RunnerContainer(Runner):
    """Submit and manage a container that runs arvados-cwl-runner."""

    def arvados_job_spec(self, runtimeContext):
        """Create an Arvados container request for this workflow.

        The returned dict can be used to create a container passed as
        the +body+ argument to container_requests().create().
        """

        # Strip listings and location noise from the input document before
        # embedding it in the request.
        adjustDirObjs(self.job_order, trim_listing)
        visit_class(self.job_order, ("File", "Directory"), trim_anonymous_location)
        visit_class(self.job_order, ("File", "Directory"), remove_redundant_fields)

        # Move secret parameter values out of the input document into
        # dedicated secret mounts, replacing each with a $include directive
        # that the runner inside the container resolves at startup.
        secret_mounts = {}
        for param in sorted(self.job_order.keys()):
            if self.secret_store.has_secret(self.job_order[param]):
                mnt = "/secrets/s%d" % len(secret_mounts)
                secret_mounts[mnt] = {
                    "kind": "text",
                    "content": self.secret_store.retrieve(self.job_order[param])
                }
                self.job_order[param] = {"$include": mnt}

        container_req = {
            "name": self.name,
            "output_path": "/var/spool/cwl",
            "cwd": "/var/spool/cwl",
            "priority": self.priority,
            "state": "Committed",
            "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image),
            "mounts": {
                # The input document is mounted as JSON for the inner runner.
                "/var/lib/cwl/cwl.input.json": {
                    "kind": "json",
                    "content": self.job_order
                },
                # The inner runner writes the workflow output object to stdout.
                "stdout": {
                    "kind": "file",
                    "path": "/var/spool/cwl/cwl.output.json"
                },
                "/var/spool/cwl": {
                    "kind": "collection",
                    "writable": True
                }
            },
            "secret_mounts": secret_mounts,
            "runtime_constraints": {
                "vcpus": math.ceil(self.submit_runner_cores),
                # RAM request (bytes) covers the runner itself plus its
                # collection cache.
                "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
                "API": True
            },
            "use_existing": False, # Never reuse the runner container - see #15497.
            "properties": {}
        }

        if self.embedded_tool.tool.get("id", "").startswith("keep:"):
            # Workflow lives in a Keep collection: mount it read-only and
            # point the command at the workflow file inside the mount.
            sp = self.embedded_tool.tool["id"].split('/')
            workflowcollection = sp[0][5:]  # strip the "keep:" prefix
            workflowname = "/".join(sp[1:])
            workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
            container_req["mounts"]["/var/lib/cwl/workflow"] = {
                "kind": "collection",
                "portable_data_hash": "%s" % workflowcollection
            }
        else:
            # Otherwise embed the packed workflow document as a JSON mount.
            packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map)
            workflowpath = "/var/lib/cwl/workflow.json#main"
            container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
                "kind": "json",
                "content": packed
            }
            if self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
                # "arvwf:<uuid>#main" — record the stored workflow UUID for
                # provenance (UUIDs are 27 characters).
                container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]


        # --local means execute the workflow instead of submitting a container request
        # --api=containers means use the containers API
        # --no-log-timestamps means don't add timestamps (the logging infrastructure does this)
        # --disable-validate because we already validated so don't need to do it again
        # --eval-timeout is the timeout for javascript invocation
        # --parallel-task-count is the number of threads to use for job submission
        # --enable/disable-reuse sets desired job reuse
        # --collection-cache-size sets aside memory to store collections
        command = ["arvados-cwl-runner",
                   "--local",
                   "--api=containers",
                   "--no-log-timestamps",
                   "--disable-validate",
                   "--eval-timeout=%s" % self.arvrunner.eval_timeout,
                   "--thread-count=%s" % self.arvrunner.thread_count,
                   "--enable-reuse" if self.enable_reuse else "--disable-reuse",
                   "--collection-cache-size=%s" % self.collection_cache_size]

        if self.output_name:
            command.append("--output-name=" + self.output_name)
            container_req["output_name"] = self.output_name

        if self.output_tags:
            command.append("--output-tags=" + self.output_tags)

        if runtimeContext.debug:
            command.append("--debug")

        if runtimeContext.storage_classes != "default":
            command.append("--storage-classes=" + runtimeContext.storage_classes)

        if self.on_error:
            command.append("--on-error=" + self.on_error)

        if self.intermediate_output_ttl:
            command.append("--intermediate-output-ttl=%d" % self.intermediate_output_ttl)

        if self.arvrunner.trash_intermediate:
            command.append("--trash-intermediate")

        if self.arvrunner.project_uuid:
            command.append("--project-uuid="+self.arvrunner.project_uuid)

        if self.enable_dev:
            command.append("--enable-dev")

        # Positional arguments: the workflow document, then the input document.
        command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])

        container_req["command"] = command

        return container_req


    def run(self, runtimeContext):
        """Submit the runner container request and register it with the executor.

        Creates a new container request, or updates the pre-created one when
        submit_request_uuid is set (federated/forwarded submissions).
        """
        runtimeContext.keepprefix = "keep:"
        job_spec = self.arvados_job_spec(runtimeContext)
        if self.arvrunner.project_uuid:
            job_spec["owner_uuid"] = self.arvrunner.project_uuid

        extra_submit_params = {}
        if runtimeContext.submit_runner_cluster:
            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster

        if runtimeContext.submit_request_uuid:
            if "cluster_id" in extra_submit_params:
                # Doesn't make sense for "update" and actually fails
                del extra_submit_params["cluster_id"]
            response = self.arvrunner.api.container_requests().update(
                uuid=runtimeContext.submit_request_uuid,
                body=job_spec,
                **extra_submit_params
            ).execute(num_retries=self.arvrunner.num_retries)
        else:
            response = self.arvrunner.api.container_requests().create(
                body=job_spec,
                **extra_submit_params
            ).execute(num_retries=self.arvrunner.num_retries)

        self.uuid = response["uuid"]
        self.arvrunner.process_submitted(self)

        logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])

    def done(self, record):
        """Fetch the finished container record and delegate to Runner.done().

        On failure to fetch the container, reports "permanentFail" through
        the executor's output callback instead of raising.
        """
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            # Runner.done() expects the log on the container record; copy it
            # over from the container request.
            container["log"] = record["log_uuid"]
        except Exception:
            logger.exception("%s while getting runner container", self.arvrunner.label(self))
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            super(RunnerContainer, self).done(container)