# 15028: More conformance fixes
# [arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8
9 import logging
10 import json
11 import os
12 import urllib.request, urllib.parse, urllib.error
13 import time
14 import datetime
15 import ciso8601
16 import uuid
17 import math
18
19 import arvados_cwl.util
20 import ruamel.yaml as yaml
21
22 from cwltool.errors import WorkflowException
23 from cwltool.process import UnsupportedRequirement, shortname
24 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
25 from cwltool.utils import aslist
26 from cwltool.job import JobBase
27
28 import arvados.collection
29
30 from .arvdocker import arv_docker_get_image
31 from . import done
32 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields
33 from .fsaccess import CollectionFetcher
34 from .pathmapper import NoFollowPathMapper, trim_listing
35 from .perf import Perf
36
# Module-level loggers shared by this file: `logger` for general progress
# and error reporting, `metrics` as a separate channel for timing data
# emitted via the Perf context manager.
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

class ArvadosContainer(JobBase):
    """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""

    def __init__(self, runner, job_runtime,
                 builder,   # type: Builder
                 joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
                 make_path_mapper,  # type: Callable[..., PathMapper]
                 requirements,      # type: List[Dict[Text, Text]]
                 hints,     # type: List[Dict[Text, Text]]
                 name       # type: Text
    ):
        super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
        self.arvrunner = runner
        # Runtime context captured at construction; run() uses this, not
        # the context passed in by cwltool (see comment in run()).
        self.job_runtime = job_runtime
        self.running = False
        # Container request UUID, set after successful submission.
        self.uuid = None

    def update_pipeline_component(self, r):
        # No-op: pipeline components belong to the legacy jobs API; this
        # method exists only for interface compatibility.
        pass

    def run(self, runtimeContext):
        """Build and submit a container request executing this command.

        Assembles mounts (tmp dirs, referenced keep collections,
        generated file literals, stdin/stdout/stderr), runtime
        constraints and scheduling parameters, then creates (or updates)
        the container request via the Arvados API.  On success, stores
        the request UUID in self.uuid and registers with the runner for
        state polling; on any submission error, reports "permanentFail"
        through output_callback.
        """
        # ArvadosCommandTool subclasses from cwltool.CommandLineTool,
        # which calls makeJobRunner() to get a new ArvadosContainer
        # object.  The fields that define execution such as
        # command_line, environment, etc are set on the
        # ArvadosContainer object by CommandLineTool.job() before
        # run() is called.

        # Deliberately shadow the argument: the context bound at
        # construction time is authoritative for this job.
        runtimeContext = self.job_runtime

        container_request = {
            "command": self.command_line,
            "name": self.name,
            "output_path": self.outdir,
            "cwd": self.outdir,
            "priority": runtimeContext.priority,
            "state": "Committed",
            "properties": {},
        }
        runtime_constraints = {}

        if runtimeContext.project_uuid:
            container_request["owner_uuid"] = runtimeContext.project_uuid

        # Secrets may only appear as file literals (routed to
        # secret_mounts below); anywhere else they would be stored in
        # plain text in the container request record.
        if self.arvrunner.secret_store.has_secret(self.command_line):
            raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")

        if self.arvrunner.secret_store.has_secret(self.environment):
            raise WorkflowException("Secret material leaked in environment, only file literals may contain secrets")

        resources = self.builder.resources
        if resources is not None:
            runtime_constraints["vcpus"] = math.ceil(resources.get("cores", 1))
            runtime_constraints["ram"] = math.ceil(resources.get("ram") * 2**20)
        else:
            # BUGFIX: previously the mount capacity lookups below called
            # resources.get() unconditionally and crashed when the
            # builder supplied no resource estimates.
            resources = {}

        mounts = {
            self.outdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("outdirSize", 0) * 2**20)
            },
            self.tmpdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("tmpdirSize", 0) * 2**20)
            }
        }
        secret_mounts = {}
        scheduling_parameters = {}

        # Mount each referenced keep collection at directory level.
        # Sorting by resolved path lets us skip any entry whose target is
        # already covered by a previously mounted parent directory.
        rf = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
        rf.sort(key=lambda k: k.resolved)
        prevdir = None
        for resolved, target, tp, stg in rf:
            if not stg:
                continue
            if prevdir and target.startswith(prevdir):
                continue
            if tp == "Directory":
                targetdir = target
            else:
                targetdir = os.path.dirname(target)
            sp = resolved.split("/", 1)
            pdh = sp[0][5:]   # remove "keep:"
            mounts[targetdir] = {
                "kind": "collection",
                "portable_data_hash": pdh
            }
            if pdh in self.pathmapper.pdh_to_uuid:
                mounts[targetdir]["uuid"] = self.pathmapper.pdh_to_uuid[pdh]
            if len(sp) == 2:
                if tp == "Directory":
                    path = sp[1]
                else:
                    path = os.path.dirname(sp[1])
                if path and path != "/":
                    mounts[targetdir]["path"] = path
            prevdir = targetdir + "/"

        with Perf(metrics, "generatefiles %s" % self.name):
            if self.generatefiles["listing"]:
                # Build a "virtual working directory" collection holding
                # InitialWorkDirRequirement file literals and copies.
                vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                    keep_client=self.arvrunner.keep_client,
                                                    num_retries=self.arvrunner.num_retries)
                generatemapper = NoFollowPathMapper(self.generatefiles["listing"], "", "",
                                                    separateDirs=False)

                sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target)

                logger.debug("generatemapper is %s", sorteditems)

                with Perf(metrics, "createfiles %s" % self.name):
                    for f, p in sorteditems:
                        if not p.target:
                            pass
                        elif p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
                            if p.resolved.startswith("_:"):
                                vwd.mkdirs(p.target)
                            else:
                                source, path = self.arvrunner.fs_access.get_collection(p.resolved)
                                vwd.copy(path, p.target, source_collection=source)
                        elif p.type == "CreateFile":
                            if self.arvrunner.secret_store.has_secret(p.resolved):
                                # Secret file literals go in secret_mounts,
                                # never into the saved collection.
                                secret_mounts["%s/%s" % (self.outdir, p.target)] = {
                                    "kind": "text",
                                    "content": self.arvrunner.secret_store.retrieve(p.resolved)
                                }
                            else:
                                with vwd.open(p.target, "w") as n:
                                    n.write(p.resolved)

                def keepemptydirs(p):
                    # Keep manifests cannot represent empty directories,
                    # so drop an empty ".keep" file into each one.
                    if isinstance(p, arvados.collection.RichCollectionBase):
                        if len(p) == 0:
                            p.open(".keep", "w").close()
                        else:
                            for c in p:
                                keepemptydirs(p[c])

                keepemptydirs(vwd)

                if not runtimeContext.current_container:
                    runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
                info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
                vwd.save_new(name=info["name"],
                             owner_uuid=runtimeContext.project_uuid,
                             ensure_unique_name=True,
                             trash_at=info["trash_at"],
                             properties=info["properties"])

                # Mount the saved collection's entries under outdir,
                # skipping secrets and anything under an already-mounted
                # parent directory.
                prev = None
                for f, p in sorteditems:
                    if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
                        (prev is not None and p.target.startswith(prev))):
                        continue
                    mountpoint = "%s/%s" % (self.outdir, p.target)
                    mounts[mountpoint] = {"kind": "collection",
                                          "portable_data_hash": vwd.portable_data_hash(),
                                          "path": p.target}
                    if p.type.startswith("Writable"):
                        mounts[mountpoint]["writable"] = True
                    prev = p.target + "/"

        container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
        if self.environment:
            container_request["environment"].update(self.environment)

        if self.stdin:
            # self.stdin is "keep:<pdh>/<path>"; strip the scheme prefix.
            sp = self.stdin[6:].split("/", 1)
            mounts["stdin"] = {"kind": "collection",
                                "portable_data_hash": sp[0],
                                "path": sp[1]}

        if self.stderr:
            mounts["stderr"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stderr)}

        if self.stdout:
            mounts["stdout"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stdout)}

        (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
        if not docker_req:
            # Fall back to the stock arvados/jobs image.
            docker_req = {"dockerImageId": "arvados/jobs"}

        container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
                                                                    docker_req,
                                                                    runtimeContext.pull_image,
                                                                    runtimeContext.project_uuid)

        # Both NetworkAccess and APIRequirement map onto the "API"
        # runtime constraint, which grants the container network access
        # to the Arvados API.
        network_req, _ = self.get_requirement("NetworkAccess")
        if network_req:
            runtime_constraints["API"] = network_req["networkAccess"]

        api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
        if api_req:
            runtime_constraints["API"] = True

        runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
        if runtime_req:
            if "keep_cache" in runtime_req:
                # keep_cache is expressed in MiB; the API expects bytes.
                runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
            if "outputDirType" in runtime_req:
                if runtime_req["outputDirType"] == "local_output_dir":
                    # Currently the default behavior.
                    pass
                elif runtime_req["outputDirType"] == "keep_output_dir":
                    mounts[self.outdir]= {
                        "kind": "collection",
                        "writable": True
                    }

        partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
        if partition_req:
            scheduling_parameters["partitions"] = aslist(partition_req["partition"])

        intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
        if intermediate_output_req:
            self.output_ttl = intermediate_output_req["outputTTL"]
        else:
            self.output_ttl = self.arvrunner.intermediate_output_ttl

        if self.output_ttl < 0:
            # BUGFIX: formerly formatted container_request["output_ttl"],
            # which is not assigned until later, so this raised KeyError
            # instead of the intended WorkflowException.
            raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % self.output_ttl)

        if self.timelimit is not None:
            scheduling_parameters["max_run_time"] = self.timelimit

        extra_submit_params = {}
        if runtimeContext.submit_runner_cluster:
            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster

        container_request["output_name"] = "Output for step %s" % (self.name)
        container_request["output_ttl"] = self.output_ttl
        container_request["mounts"] = mounts
        container_request["secret_mounts"] = secret_mounts
        container_request["runtime_constraints"] = runtime_constraints
        container_request["scheduling_parameters"] = scheduling_parameters

        enable_reuse = runtimeContext.enable_reuse
        if enable_reuse:
            # A per-step ReuseRequirement can disable reuse even when it
            # is enabled globally.
            reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        container_request["use_existing"] = enable_reuse

        if runtimeContext.runnerjob.startswith("arvwf:"):
            wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
            wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
            if container_request["name"] == "main":
                container_request["name"] = wfrecord["name"]
            container_request["properties"]["template_uuid"] = wfuuid

        self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)

        try:
            if runtimeContext.submit_request_uuid:
                # Reuse a pre-created request record (federated submit).
                response = self.arvrunner.api.container_requests().update(
                    uuid=runtimeContext.submit_request_uuid,
                    body=container_request,
                    **extra_submit_params
                ).execute(num_retries=self.arvrunner.num_retries)
            else:
                response = self.arvrunner.api.container_requests().create(
                    body=container_request,
                    **extra_submit_params
                ).execute(num_retries=self.arvrunner.num_retries)

            self.uuid = response["uuid"]
            self.arvrunner.process_submitted(self)

            if response["state"] == "Final":
                logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
            else:
                logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
        except Exception:
            logger.exception("%s got an error", self.arvrunner.label(self))
            self.output_callback({}, "permanentFail")

    def done(self, record):
        """Handle a finished container request record.

        Fetches the underlying container, maps its exit code through the
        CWL successCodes/temporaryFailCodes/permanentFailCodes lists,
        logs a tail of the container log on permanent failure, registers
        intermediate output for trashing, collects outputs, and always
        reports the result via output_callback.
        """
        outputs = {}
        # BUGFIX: default the status up front so the finally clause cannot
        # hit an unbound local if the containers().get call itself fails
        # in a way that also breaks the exception handlers.
        processStatus = "permanentFail"
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            if container["state"] == "Complete":
                rcode = container["exit_code"]
                if self.successCodes and rcode in self.successCodes:
                    processStatus = "success"
                elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
                    processStatus = "temporaryFail"
                elif self.permanentFailCodes and rcode in self.permanentFailCodes:
                    processStatus = "permanentFail"
                elif rcode == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"
            else:
                # Cancelled or any other non-Complete terminal state.
                processStatus = "permanentFail"

            if processStatus == "permanentFail" and record["log_uuid"]:
                logc = arvados.collection.CollectionReader(record["log_uuid"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                label = self.arvrunner.label(self)
                done.logtail(
                    logc, logger.error,
                    "%s (%s) error log:" % (label, record["uuid"]), maxlen=40)

            if record["output_uuid"]:
                if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
                    # Compute the trash time to avoid requesting the collection record.
                    trash_at = ciso8601.parse_datetime_as_naive(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
                    aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
                    orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
                    oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
                    logger.info("%s Intermediate output %s (%s) will be trashed%s%s%s." % (
                        self.arvrunner.label(self), record["output_uuid"], container["output"], aftertime, orpart, oncomplete))
                self.arvrunner.add_intermediate_output(record["output_uuid"])

            if container["output"]:
                outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
        except WorkflowException as e:
            # Only include a stack trace if in debug mode.
            # A stack trace may obfuscate more useful output about the workflow.
            logger.error("%s unable to collect output from %s:\n%s",
                         self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
            processStatus = "permanentFail"
        except Exception:
            logger.exception("%s while getting output object:", self.arvrunner.label(self))
            processStatus = "permanentFail"
        finally:
            self.output_callback(outputs, processStatus)
372
373
class RunnerContainer(Runner):
    """Submit and manage a container that runs arvados-cwl-runner.

    The submitted container re-invokes arvados-cwl-runner with --local
    inside the cluster to orchestrate the workflow's step containers.
    """

    def arvados_job_spec(self, runtimeContext):
        """Create an Arvados container request for this workflow.

        The returned dict can be used to create a container passed as
        the +body+ argument to container_requests().create().
        """

        # Strip fields from the input object that should not be stored
        # in the request record (embedded listings, anonymous locations,
        # redundant metadata).
        adjustDirObjs(self.job_order, trim_listing)
        visit_class(self.job_order, ("File", "Directory"), trim_anonymous_location)
        visit_class(self.job_order, ("File", "Directory"), remove_redundant_fields)

        # Replace secret parameter values with $include references to
        # secret mounts so the secrets never appear in the stored input
        # JSON.  Sorted for deterministic /secrets/sN numbering.
        secret_mounts = {}
        for param in sorted(self.job_order.keys()):
            if self.secret_store.has_secret(self.job_order[param]):
                mnt = "/secrets/s%d" % len(secret_mounts)
                secret_mounts[mnt] = {
                    "kind": "text",
                    "content": self.secret_store.retrieve(self.job_order[param])
                }
                self.job_order[param] = {"$include": mnt}

        container_req = {
            "name": self.name,
            "output_path": "/var/spool/cwl",
            "cwd": "/var/spool/cwl",
            "priority": self.priority,
            "state": "Committed",
            "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image),
            "mounts": {
                "/var/lib/cwl/cwl.input.json": {
                    "kind": "json",
                    "content": self.job_order
                },
                "stdout": {
                    "kind": "file",
                    "path": "/var/spool/cwl/cwl.output.json"
                },
                "/var/spool/cwl": {
                    "kind": "collection",
                    "writable": True
                }
            },
            "secret_mounts": secret_mounts,
            "runtime_constraints": {
                "vcpus": math.ceil(self.submit_runner_cores),
                # RAM constraint is in bytes; submit_runner_ram and
                # collection_cache_size are configured in MiB.
                "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
                # The runner needs API access to submit step containers.
                "API": True
            },
            "use_existing": self.enable_reuse,
            "properties": {}
        }

        if self.embedded_tool.tool.get("id", "").startswith("keep:"):
            # Workflow definition lives in a keep collection: mount it
            # read-only and point the command at the file inside it.
            sp = self.embedded_tool.tool["id"].split('/')
            workflowcollection = sp[0][5:]
            workflowname = "/".join(sp[1:])
            workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
            container_req["mounts"]["/var/lib/cwl/workflow"] = {
                "kind": "collection",
                "portable_data_hash": "%s" % workflowcollection
            }
        else:
            # Otherwise embed the packed workflow document directly as a
            # JSON mount.
            packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map)
            workflowpath = "/var/lib/cwl/workflow.json#main"
            container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
                "kind": "json",
                "content": packed
            }
            if self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
                # "arvwf:<uuid>#..." — slice [6:33] extracts the 27-char
                # workflow UUID after the scheme prefix.
                container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]


        # --local means execute the workflow instead of submitting a container request
        # --api=containers means use the containers API
        # --no-log-timestamps means don't add timestamps (the logging infrastructure does this)
        # --disable-validate because we already validated so don't need to do it again
        # --eval-timeout is the timeout for javascript invocation
        # --parallel-task-count is the number of threads to use for job submission
        # --enable/disable-reuse sets desired job reuse
        # --collection-cache-size sets aside memory to store collections
        command = ["arvados-cwl-runner",
                   "--local",
                   "--api=containers",
                   "--no-log-timestamps",
                   "--disable-validate",
                   "--eval-timeout=%s" % self.arvrunner.eval_timeout,
                   "--thread-count=%s" % self.arvrunner.thread_count,
                   "--enable-reuse" if self.enable_reuse else "--disable-reuse",
                   "--collection-cache-size=%s" % self.collection_cache_size]

        if self.output_name:
            command.append("--output-name=" + self.output_name)
            container_req["output_name"] = self.output_name

        if self.output_tags:
            command.append("--output-tags=" + self.output_tags)

        if runtimeContext.debug:
            command.append("--debug")

        if runtimeContext.storage_classes != "default":
            command.append("--storage-classes=" + runtimeContext.storage_classes)

        if self.on_error:
            command.append("--on-error=" + self.on_error)

        if self.intermediate_output_ttl:
            command.append("--intermediate-output-ttl=%d" % self.intermediate_output_ttl)

        if self.arvrunner.trash_intermediate:
            command.append("--trash-intermediate")

        if self.arvrunner.project_uuid:
            command.append("--project-uuid="+self.arvrunner.project_uuid)

        if self.enable_dev:
            command.append("--enable-dev")

        # Positional arguments: the workflow document, then the input
        # object mounted above.
        command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])

        container_req["command"] = command

        return container_req


    def run(self, runtimeContext):
        """Submit the runner container request built by arvados_job_spec.

        Stores the new request's UUID in self.uuid and registers it with
        the runner for state polling.
        """
        runtimeContext.keepprefix = "keep:"
        job_spec = self.arvados_job_spec(runtimeContext)
        if self.arvrunner.project_uuid:
            job_spec["owner_uuid"] = self.arvrunner.project_uuid

        extra_submit_params = {}
        if runtimeContext.submit_runner_cluster:
            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster

        if runtimeContext.submit_request_uuid:
            # Updating a pre-created request record.
            if "cluster_id" in extra_submit_params:
                # Doesn't make sense for "update" and actually fails
                del extra_submit_params["cluster_id"]
            response = self.arvrunner.api.container_requests().update(
                uuid=runtimeContext.submit_request_uuid,
                body=job_spec,
                **extra_submit_params
            ).execute(num_retries=self.arvrunner.num_retries)
        else:
            response = self.arvrunner.api.container_requests().create(
                body=job_spec,
                **extra_submit_params
            ).execute(num_retries=self.arvrunner.num_retries)

        self.uuid = response["uuid"]
        self.arvrunner.process_submitted(self)

        logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])

    def done(self, record):
        """Handle completion of the runner's container request.

        Fetches the container record, attaches the request's log
        collection, and delegates result processing to Runner.done();
        reports "permanentFail" if the container cannot be fetched.
        """
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            # Runner.done() expects the log collection on the container
            # record; only the request carries log_uuid.
            container["log"] = record["log_uuid"]
        except Exception:
            logger.exception("%s while getting runner container", self.arvrunner.label(self))
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            super(RunnerContainer, self).done(container)