Merge branch '15028-cwl-v1.1' refs #15028
[arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8
9 import logging
10 import json
11 import os
12 import urllib.request, urllib.parse, urllib.error
13 import time
14 import datetime
15 import ciso8601
16 import uuid
17 import math
18
19 import arvados_cwl.util
20 import ruamel.yaml as yaml
21
22 from cwltool.errors import WorkflowException
23 from cwltool.process import UnsupportedRequirement, shortname
24 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
25 from cwltool.utils import aslist
26 from cwltool.job import JobBase
27
28 import arvados.collection
29
30 from .arvdocker import arv_docker_get_image
31 from . import done
32 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields
33 from .fsaccess import CollectionFetcher
34 from .pathmapper import NoFollowPathMapper, trim_listing
35 from .perf import Perf
36
37 logger = logging.getLogger('arvados.cwl-runner')
38 metrics = logging.getLogger('arvados.cwl-runner.metrics')
39
class ArvadosContainer(JobBase):
    """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""

    def __init__(self, runner, job_runtime,
                 builder,   # type: Builder
                 joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
                 make_path_mapper,  # type: Callable[..., PathMapper]
                 requirements,      # type: List[Dict[Text, Text]]
                 hints,     # type: List[Dict[Text, Text]]
                 name       # type: Text
    ):
        super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
        self.arvrunner = runner         # executor coordinating this workflow run
        self.job_runtime = job_runtime  # RuntimeContext captured for this job; used by run()
        self.running = False
        self.uuid = None                # container request UUID, set after submission

    def update_pipeline_component(self, r):
        # The containers API has no pipeline components to update; this stub
        # keeps the interface expected by the executor.
        pass

    def run(self, runtimeContext):
        """Build a container request for this tool invocation and submit it.

        On success the request UUID is recorded and the process is registered
        with the executor; on submission failure, "permanentFail" is reported
        through output_callback.
        """
        # ArvadosCommandTool subclasses from cwltool.CommandLineTool,
        # which calls makeJobRunner() to get a new ArvadosContainer
        # object.  The fields that define execution such as
        # command_line, environment, etc are set on the
        # ArvadosContainer object by CommandLineTool.job() before
        # run() is called.

        # Deliberately ignore the caller-supplied context in favor of the one
        # captured at construction time (it carries per-job overrides).
        runtimeContext = self.job_runtime

        container_request = {
            "command": self.command_line,
            "name": self.name,
            "output_path": self.outdir,
            "cwd": self.outdir,
            "priority": runtimeContext.priority,
            "state": "Committed",
            "properties": {},
        }
        runtime_constraints = {}

        if runtimeContext.project_uuid:
            container_request["owner_uuid"] = runtimeContext.project_uuid

        # Secrets may only appear as file literals (mounted via secret_mounts
        # below); the command line and environment are stored in the request
        # record, so secret material there would leak.
        if self.arvrunner.secret_store.has_secret(self.command_line):
            raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")

        if self.arvrunner.secret_store.has_secret(self.environment):
            raise WorkflowException("Secret material leaked in environment, only file literals may contain secrets")

        resources = self.builder.resources
        if resources is not None:
            runtime_constraints["vcpus"] = math.ceil(resources.get("cores", 1))
            # CWL ResourceRequirement sizes are in MiB; Arvados wants bytes.
            runtime_constraints["ram"] = math.ceil(resources.get("ram") * 2**20)

        # NOTE(review): `resources` is used below without a None guard;
        # presumably cwltool always populates builder.resources — confirm.
        mounts = {
            self.outdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("outdirSize", 0) * 2**20)
            },
            self.tmpdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("tmpdirSize", 0) * 2**20)
            }
        }
        secret_mounts = {}
        scheduling_parameters = {}

        # Mount each referenced keep collection at its mapped target.
        # Sorting by resolved source and skipping any target nested under the
        # previously mounted directory avoids redundant mounts.
        rf = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
        rf.sort(key=lambda k: k.resolved)
        prevdir = None
        for resolved, target, tp, stg in rf:
            if not stg:
                continue
            if prevdir and target.startswith(prevdir):
                continue
            if tp == "Directory":
                targetdir = target
            else:
                targetdir = os.path.dirname(target)
            sp = resolved.split("/", 1)
            pdh = sp[0][5:]   # remove "keep:"
            mounts[targetdir] = {
                "kind": "collection",
                "portable_data_hash": pdh
            }
            if pdh in self.pathmapper.pdh_to_uuid:
                mounts[targetdir]["uuid"] = self.pathmapper.pdh_to_uuid[pdh]
            if len(sp) == 2:
                if tp == "Directory":
                    path = sp[1]
                else:
                    path = os.path.dirname(sp[1])
                if path and path != "/":
                    mounts[targetdir]["path"] = path
            prevdir = targetdir + "/"

        with Perf(metrics, "generatefiles %s" % self.name):
            if self.generatefiles["listing"]:
                # InitialWorkDirRequirement: stage literal files/directories
                # into a new collection, then mount it under the output dir.
                vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                    keep_client=self.arvrunner.keep_client,
                                                    num_retries=self.arvrunner.num_retries)
                generatemapper = NoFollowPathMapper(self.generatefiles["listing"], "", "",
                                                    separateDirs=False)

                sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target)

                logger.debug("generatemapper is %s", sorteditems)

                with Perf(metrics, "createfiles %s" % self.name):
                    for f, p in sorteditems:
                        if not p.target:
                            pass
                        elif p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
                            if p.resolved.startswith("_:"):
                                # Anonymous directory literal.
                                vwd.mkdirs(p.target)
                            else:
                                source, path = self.arvrunner.fs_access.get_collection(p.resolved)
                                vwd.copy(path or ".", p.target, source_collection=source)
                        elif p.type == "CreateFile":
                            if self.arvrunner.secret_store.has_secret(p.resolved):
                                # Secret file literals go into secret_mounts so
                                # they are never written to the saved collection.
                                secret_mounts["%s/%s" % (self.outdir, p.target)] = {
                                    "kind": "text",
                                    "content": self.arvrunner.secret_store.retrieve(p.resolved)
                                }
                            else:
                                with vwd.open(p.target, "w") as n:
                                    n.write(p.resolved)

                def keepemptydirs(p):
                    # Keep does not store empty directories; add a ".keep"
                    # placeholder file so they survive the round trip.
                    if isinstance(p, arvados.collection.RichCollectionBase):
                        if len(p) == 0:
                            p.open(".keep", "w").close()
                        else:
                            for c in p:
                                keepemptydirs(p[c])

                keepemptydirs(vwd)

                if not runtimeContext.current_container:
                    runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
                info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
                vwd.save_new(name=info["name"],
                             owner_uuid=runtimeContext.project_uuid,
                             ensure_unique_name=True,
                             trash_at=info["trash_at"],
                             properties=info["properties"])

                # Mount each staged item; secrets were diverted above, and
                # targets nested under an already-mounted directory are skipped.
                prev = None
                for f, p in sorteditems:
                    if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
                        (prev is not None and p.target.startswith(prev))):
                        continue
                    mountpoint = "%s/%s" % (self.outdir, p.target)
                    mounts[mountpoint] = {"kind": "collection",
                                          "portable_data_hash": vwd.portable_data_hash(),
                                          "path": p.target}
                    if p.type.startswith("Writable"):
                        mounts[mountpoint]["writable"] = True
                    prev = p.target + "/"

        container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
        if self.environment:
            container_request["environment"].update(self.environment)

        if self.stdin:
            sp = self.stdin[6:].split("/", 1)  # strip "keep:" prefix, split pdh/path
            mounts["stdin"] = {"kind": "collection",
                                "portable_data_hash": sp[0],
                                "path": sp[1]}

        if self.stderr:
            mounts["stderr"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stderr)}

        if self.stdout:
            mounts["stdout"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stdout)}

        (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
        if not docker_req:
            docker_req = {"dockerImageId": "arvados/jobs"}

        container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
                                                                    docker_req,
                                                                    runtimeContext.pull_image,
                                                                    runtimeContext.project_uuid)

        network_req, _ = self.get_requirement("NetworkAccess")
        if network_req:
            runtime_constraints["API"] = network_req["networkAccess"]

        api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
        if api_req:
            runtime_constraints["API"] = True

        runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
        if runtime_req:
            if "keep_cache" in runtime_req:
                runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
            if "outputDirType" in runtime_req:
                if runtime_req["outputDirType"] == "local_output_dir":
                    # Currently the default behavior.
                    pass
                elif runtime_req["outputDirType"] == "keep_output_dir":
                    mounts[self.outdir]= {
                        "kind": "collection",
                        "writable": True
                    }

        partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
        if partition_req:
            scheduling_parameters["partitions"] = aslist(partition_req["partition"])

        intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
        if intermediate_output_req:
            self.output_ttl = intermediate_output_req["outputTTL"]
        else:
            self.output_ttl = self.arvrunner.intermediate_output_ttl

        if self.output_ttl < 0:
            # BUGFIX: previously formatted container_request["output_ttl"],
            # which is not assigned until further down this method, so a
            # negative TTL raised KeyError instead of this error message.
            raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % self.output_ttl)

        if self.timelimit is not None and self.timelimit > 0:
            scheduling_parameters["max_run_time"] = self.timelimit

        extra_submit_params = {}
        if runtimeContext.submit_runner_cluster:
            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster

        container_request["output_name"] = "Output for step %s" % (self.name)
        container_request["output_ttl"] = self.output_ttl
        container_request["mounts"] = mounts
        container_request["secret_mounts"] = secret_mounts
        container_request["runtime_constraints"] = runtime_constraints
        container_request["scheduling_parameters"] = scheduling_parameters

        # Reuse is on unless disabled globally, by the CWL v1.1 WorkReuse
        # hint, or by the legacy arvados ReuseRequirement extension (the
        # legacy extension wins when both are present).
        enable_reuse = runtimeContext.enable_reuse
        if enable_reuse:
            reuse_req, _ = self.get_requirement("WorkReuse")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
            reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        container_request["use_existing"] = enable_reuse

        if runtimeContext.runnerjob.startswith("arvwf:"):
            wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
            wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
            if container_request["name"] == "main":
                container_request["name"] = wfrecord["name"]
            container_request["properties"]["template_uuid"] = wfuuid

        self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)

        try:
            if runtimeContext.submit_request_uuid:
                response = self.arvrunner.api.container_requests().update(
                    uuid=runtimeContext.submit_request_uuid,
                    body=container_request,
                    **extra_submit_params
                ).execute(num_retries=self.arvrunner.num_retries)
            else:
                response = self.arvrunner.api.container_requests().create(
                    body=container_request,
                    **extra_submit_params
                ).execute(num_retries=self.arvrunner.num_retries)

            self.uuid = response["uuid"]
            self.arvrunner.process_submitted(self)

            if response["state"] == "Final":
                logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
            else:
                logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
        except Exception:
            logger.exception("%s got an error", self.arvrunner.label(self))
            self.output_callback({}, "permanentFail")

    def done(self, record):
        """Map the finished container's state/exit code to a CWL process
        status, surface logs for failures, and deliver outputs through
        output_callback (always called, via finally)."""
        outputs = {}
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            if container["state"] == "Complete":
                rcode = container["exit_code"]
                # Tool-declared success/temporaryFail/permanentFail code lists
                # take precedence over the conventional zero/nonzero rule.
                if self.successCodes and rcode in self.successCodes:
                    processStatus = "success"
                elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
                    processStatus = "temporaryFail"
                elif self.permanentFailCodes and rcode in self.permanentFailCodes:
                    processStatus = "permanentFail"
                elif rcode == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"
            else:
                processStatus = "permanentFail"

            if processStatus == "permanentFail" and record["log_uuid"]:
                # Show the tail of the container log to aid diagnosis.
                logc = arvados.collection.CollectionReader(record["log_uuid"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                label = self.arvrunner.label(self)
                done.logtail(
                    logc, logger.error,
                    "%s (%s) error log:" % (label, record["uuid"]), maxlen=40)

            if record["output_uuid"]:
                if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
                    # Compute the trash time to avoid requesting the collection record.
                    trash_at = ciso8601.parse_datetime_as_naive(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
                    aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
                    orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
                    oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
                    # Lazy logger args instead of eager %-formatting, matching
                    # the other logger calls in this module.
                    logger.info("%s Intermediate output %s (%s) will be trashed%s%s%s.",
                                self.arvrunner.label(self), record["output_uuid"], container["output"], aftertime, orpart, oncomplete)
                self.arvrunner.add_intermediate_output(record["output_uuid"])

            if container["output"]:
                outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
        except WorkflowException as e:
            # Only include a stack trace if in debug mode.
            # A stack trace may obfuscate more useful output about the workflow.
            # NOTE(review): if the containers().get() call itself raises
            # WorkflowException, `container` is unbound here — confirm that
            # path cannot occur.
            logger.error("%s unable to collect output from %s:\n%s",
                         self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
            processStatus = "permanentFail"
        except Exception:
            logger.exception("%s while getting output object:", self.arvrunner.label(self))
            processStatus = "permanentFail"
        finally:
            self.output_callback(outputs, processStatus)
375
376
class RunnerContainer(Runner):
    """Submit and manage a container that runs arvados-cwl-runner."""

    def arvados_job_spec(self, runtimeContext):
        """Create an Arvados container request for this workflow.

        The returned dict can be used to create a container passed as
        the +body+ argument to container_requests().create().
        """

        # Strip bulky/derived fields from the job order before embedding it
        # in the request record.
        adjustDirObjs(self.job_order, trim_listing)
        visit_class(self.job_order, ("File", "Directory"), trim_anonymous_location)
        visit_class(self.job_order, ("File", "Directory"), remove_redundant_fields)

        # Divert secret parameter values into secret_mounts and replace them
        # in the job order with $include references to the mounted files.
        secret_mounts = {}
        for param in sorted(self.job_order.keys()):
            if self.secret_store.has_secret(self.job_order[param]):
                mnt = "/secrets/s%d" % len(secret_mounts)
                secret_mounts[mnt] = {
                    "kind": "text",
                    "content": self.secret_store.retrieve(self.job_order[param])
                }
                self.job_order[param] = {"$include": mnt}

        container_req = {
            "name": self.name,
            "output_path": "/var/spool/cwl",
            "cwd": "/var/spool/cwl",
            "priority": self.priority,
            "state": "Committed",
            "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image),
            "mounts": {
                "/var/lib/cwl/cwl.input.json": {
                    "kind": "json",
                    "content": self.job_order
                },
                "stdout": {
                    "kind": "file",
                    "path": "/var/spool/cwl/cwl.output.json"
                },
                "/var/spool/cwl": {
                    "kind": "collection",
                    "writable": True
                }
            },
            "secret_mounts": secret_mounts,
            "runtime_constraints": {
                "vcpus": math.ceil(self.submit_runner_cores),
                # submit_runner_ram and collection_cache_size are in MiB.
                "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
                "API": True
            },
            "use_existing": self.enable_reuse,
            "properties": {}
        }

        if self.embedded_tool.tool.get("id", "").startswith("keep:"):
            # Workflow definition already lives in a keep collection; mount
            # it read-only and point the runner at the file inside it.
            sp = self.embedded_tool.tool["id"].split('/')
            workflowcollection = sp[0][5:]  # strip "keep:" prefix
            workflowname = "/".join(sp[1:])
            workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
            container_req["mounts"]["/var/lib/cwl/workflow"] = {
                "kind": "collection",
                "portable_data_hash": "%s" % workflowcollection
            }
        else:
            # Otherwise embed the packed workflow directly as a json mount.
            packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map)
            workflowpath = "/var/lib/cwl/workflow.json#main"
            container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
                "kind": "json",
                "content": packed
            }
            if self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
                # "arvwf:<uuid>#..." — characters 6..32 are the workflow UUID.
                container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]


        # --local means execute the workflow instead of submitting a container request
        # --api=containers means use the containers API
        # --no-log-timestamps means don't add timestamps (the logging infrastructure does this)
        # --disable-validate because we already validated so don't need to do it again
        # --eval-timeout is the timeout for javascript invocation
        # --parallel-task-count is the number of threads to use for job submission
        # --enable/disable-reuse sets desired job reuse
        # --collection-cache-size sets aside memory to store collections
        command = ["arvados-cwl-runner",
                   "--local",
                   "--api=containers",
                   "--no-log-timestamps",
                   "--disable-validate",
                   "--eval-timeout=%s" % self.arvrunner.eval_timeout,
                   "--thread-count=%s" % self.arvrunner.thread_count,
                   "--enable-reuse" if self.enable_reuse else "--disable-reuse",
                   "--collection-cache-size=%s" % self.collection_cache_size]

        if self.output_name:
            command.append("--output-name=" + self.output_name)
            container_req["output_name"] = self.output_name

        if self.output_tags:
            command.append("--output-tags=" + self.output_tags)

        if runtimeContext.debug:
            command.append("--debug")

        if runtimeContext.storage_classes != "default":
            command.append("--storage-classes=" + runtimeContext.storage_classes)

        if self.on_error:
            command.append("--on-error=" + self.on_error)

        if self.intermediate_output_ttl:
            command.append("--intermediate-output-ttl=%d" % self.intermediate_output_ttl)

        if self.arvrunner.trash_intermediate:
            command.append("--trash-intermediate")

        if self.arvrunner.project_uuid:
            command.append("--project-uuid="+self.arvrunner.project_uuid)

        if self.enable_dev:
            command.append("--enable-dev")

        command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])

        container_req["command"] = command

        return container_req


    def run(self, runtimeContext):
        """Submit (or update) the runner container request built by
        arvados_job_spec() and register this process with the executor."""
        runtimeContext.keepprefix = "keep:"
        job_spec = self.arvados_job_spec(runtimeContext)
        if self.arvrunner.project_uuid:
            job_spec["owner_uuid"] = self.arvrunner.project_uuid

        extra_submit_params = {}
        if runtimeContext.submit_runner_cluster:
            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster

        if runtimeContext.submit_request_uuid:
            if "cluster_id" in extra_submit_params:
                # Doesn't make sense for "update" and actually fails
                del extra_submit_params["cluster_id"]
            response = self.arvrunner.api.container_requests().update(
                uuid=runtimeContext.submit_request_uuid,
                body=job_spec,
                **extra_submit_params
            ).execute(num_retries=self.arvrunner.num_retries)
        else:
            response = self.arvrunner.api.container_requests().create(
                body=job_spec,
                **extra_submit_params
            ).execute(num_retries=self.arvrunner.num_retries)

        self.uuid = response["uuid"]
        self.arvrunner.process_submitted(self)

        logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])

    def done(self, record):
        """Fetch the finished runner container record and delegate to
        Runner.done(); report permanentFail if the record can't be fetched."""
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            # Runner.done() expects the log collection on the container dict.
            container["log"] = record["log_uuid"]
        except Exception:
            logger.exception("%s while getting runner container", self.arvrunner.label(self))
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            super(RunnerContainer, self).done(container)