b0dff491f575634e5947637be099c9c570615451
[arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8
9 import logging
10 import json
11 import os
12 import urllib.request, urllib.parse, urllib.error
13 import time
14 import datetime
15 import ciso8601
16 import uuid
17 import math
18
19 import arvados_cwl.util
20 import ruamel.yaml as yaml
21
22 from cwltool.errors import WorkflowException
23 from cwltool.process import UnsupportedRequirement, shortname
24 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
25 from cwltool.utils import aslist
26 from cwltool.job import JobBase
27
28 import arvados.collection
29
30 from .arvdocker import arv_docker_get_image
31 from . import done
32 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields
33 from .fsaccess import CollectionFetcher
34 from .pathmapper import NoFollowPathMapper, trim_listing
35 from .perf import Perf
36
37 logger = logging.getLogger('arvados.cwl-runner')
38 metrics = logging.getLogger('arvados.cwl-runner.metrics')
39
class ArvadosContainer(JobBase):
    """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""

    def __init__(self, runner, job_runtime,
                 builder,   # type: Builder
                 joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
                 make_path_mapper,  # type: Callable[..., PathMapper]
                 requirements,      # type: List[Dict[Text, Text]]
                 hints,     # type: List[Dict[Text, Text]]
                 name       # type: Text
    ):
        super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
        self.arvrunner = runner          # executor owning the API client, keep client and callbacks
        self.job_runtime = job_runtime   # RuntimeContext captured when this job was created
        self.running = False
        self.uuid = None                 # container request uuid, set after submission in run()

    def update_pipeline_component(self, r):
        # No-op: the containers API has no pipeline components to update.
        # Kept for interface compatibility with the jobs API code path.
        pass

    def run(self, runtimeContext):
        """Translate this CommandLineTool job into a container request and submit it.

        Builds the mounts (inputs, tmp/output dirs, stdin/stdout/stderr,
        InitialWorkDir staging collection, secrets), runtime constraints and
        scheduling parameters, then creates (or updates) the container
        request via the Arvados API.  On success ``self.uuid`` is set and the
        request is registered with the executor; on any submission error the
        output callback is invoked with "permanentFail".

        :raises WorkflowException: if secret material appears outside file
            literals, or the configured output TTL is negative.
        """
        # ArvadosCommandTool subclasses from cwltool.CommandLineTool,
        # which calls makeJobRunner() to get a new ArvadosContainer
        # object.  The fields that define execution such as
        # command_line, environment, etc are set on the
        # ArvadosContainer object by CommandLineTool.job() before
        # run() is called.

        # Deliberately shadow the argument: use the runtime context captured
        # at job-creation time, not the one cwltool passes in.
        runtimeContext = self.job_runtime

        container_request = {
            "command": self.command_line,
            "name": self.name,
            "output_path": self.outdir,
            "cwd": self.outdir,
            "priority": runtimeContext.priority,
            "state": "Committed",
            "properties": {},
        }
        runtime_constraints = {}

        if runtimeContext.project_uuid:
            container_request["owner_uuid"] = runtimeContext.project_uuid

        # Secrets may only be delivered through secret_mounts (file
        # literals); refuse to submit if they leaked anywhere else.
        if self.arvrunner.secret_store.has_secret(self.command_line):
            raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")

        if self.arvrunner.secret_store.has_secret(self.environment):
            raise WorkflowException("Secret material leaked in environment, only file literals may contain secrets")

        resources = self.builder.resources
        if resources is not None:
            runtime_constraints["vcpus"] = math.ceil(resources.get("cores", 1))
            # CWL "ram" is in MiB; default to 0 instead of crashing with
            # TypeError (None * 2**20) if it is somehow absent.
            runtime_constraints["ram"] = math.ceil(resources.get("ram", 0) * 2**20)
        else:
            # cwltool normally populates builder.resources; fall back to an
            # empty dict so the capacity lookups below don't raise
            # AttributeError (previously this path crashed).
            resources = {}

        mounts = {
            self.outdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("outdirSize", 0) * 2**20)
            },
            self.tmpdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("tmpdirSize", 0) * 2**20)
            }
        }
        secret_mounts = {}
        scheduling_parameters = {}

        # Mount each referenced input collection at its target directory.
        # Sorting by resolved source and skipping any target nested under an
        # already-mounted prefix avoids redundant mounts.
        rf = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
        rf.sort(key=lambda k: k.resolved)
        prevdir = None
        for resolved, target, tp, stg in rf:
            if not stg:
                continue
            if prevdir and target.startswith(prevdir):
                continue
            if tp == "Directory":
                targetdir = target
            else:
                targetdir = os.path.dirname(target)
            sp = resolved.split("/", 1)
            pdh = sp[0][5:]   # remove "keep:"
            mounts[targetdir] = {
                "kind": "collection",
                "portable_data_hash": pdh
            }
            if pdh in self.pathmapper.pdh_to_uuid:
                mounts[targetdir]["uuid"] = self.pathmapper.pdh_to_uuid[pdh]
            if len(sp) == 2:
                if tp == "Directory":
                    path = sp[1]
                else:
                    path = os.path.dirname(sp[1])
                if path and path != "/":
                    mounts[targetdir]["path"] = path
            prevdir = targetdir + "/"

        with Perf(metrics, "generatefiles %s" % self.name):
            if self.generatefiles["listing"]:
                # InitialWorkDirRequirement: stage copied files/directories
                # and non-secret file literals into a new collection, then
                # mount the relevant paths under the output directory.
                vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                    keep_client=self.arvrunner.keep_client,
                                                    num_retries=self.arvrunner.num_retries)
                generatemapper = NoFollowPathMapper(self.generatefiles["listing"], "", "",
                                                    separateDirs=False)

                sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target)

                logger.debug("generatemapper is %s", sorteditems)

                with Perf(metrics, "createfiles %s" % self.name):
                    for f, p in sorteditems:
                        if not p.target:
                            pass
                        elif p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
                            if p.resolved.startswith("_:"):
                                # Anonymous (literal) directory: just create it.
                                vwd.mkdirs(p.target)
                            else:
                                source, path = self.arvrunner.fs_access.get_collection(p.resolved)
                                vwd.copy(path, p.target, source_collection=source)
                        elif p.type == "CreateFile":
                            if self.arvrunner.secret_store.has_secret(p.resolved):
                                # Secret file literals bypass the staging
                                # collection and are delivered only through
                                # secret_mounts.
                                secret_mounts["%s/%s" % (self.outdir, p.target)] = {
                                    "kind": "text",
                                    "content": self.arvrunner.secret_store.retrieve(p.resolved)
                                }
                            else:
                                with vwd.open(p.target, "w") as n:
                                    n.write(p.resolved)

                def keepemptydirs(p):
                    # Keep drops empty directories, so recursively drop a
                    # placeholder ".keep" file into each one.
                    if isinstance(p, arvados.collection.RichCollectionBase):
                        if len(p) == 0:
                            p.open(".keep", "w").close()
                        else:
                            for c in p:
                                keepemptydirs(p[c])

                keepemptydirs(vwd)

                if not runtimeContext.current_container:
                    runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
                info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
                vwd.save_new(name=info["name"],
                             owner_uuid=runtimeContext.project_uuid,
                             ensure_unique_name=True,
                             trash_at=info["trash_at"],
                             properties=info["properties"])

                # Mount the staged collection paths, again skipping targets
                # nested under an already-mounted prefix and any secrets.
                prev = None
                for f, p in sorteditems:
                    if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
                        (prev is not None and p.target.startswith(prev))):
                        continue
                    mountpoint = "%s/%s" % (self.outdir, p.target)
                    mounts[mountpoint] = {"kind": "collection",
                                          "portable_data_hash": vwd.portable_data_hash(),
                                          "path": p.target}
                    if p.type.startswith("Writable"):
                        mounts[mountpoint]["writable"] = True
                    prev = p.target + "/"

        container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
        if self.environment:
            container_request["environment"].update(self.environment)

        if self.stdin:
            # self.stdin looks like "/keep/<pdh>/<path>": [6:] strips the
            # six-character "/keep/" prefix -- TODO confirm against the
            # pathmapper's staging prefix ("/keep" is also used in done()).
            sp = self.stdin[6:].split("/", 1)
            mounts["stdin"] = {"kind": "collection",
                                "portable_data_hash": sp[0],
                                "path": sp[1]}

        if self.stderr:
            mounts["stderr"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stderr)}

        if self.stdout:
            mounts["stdout"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stdout)}

        # Fall back to the default arvados/jobs image when the tool does not
        # declare a DockerRequirement.
        (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
        if not docker_req:
            docker_req = {"dockerImageId": "arvados/jobs"}

        container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
                                                                    docker_req,
                                                                    runtimeContext.pull_image,
                                                                    runtimeContext.project_uuid)

        api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
        if api_req:
            runtime_constraints["API"] = True

        runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
        if runtime_req:
            if "keep_cache" in runtime_req:
                # keep_cache is in MiB.
                runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
            if "outputDirType" in runtime_req:
                if runtime_req["outputDirType"] == "local_output_dir":
                    # Currently the default behavior.
                    pass
                elif runtime_req["outputDirType"] == "keep_output_dir":
                    mounts[self.outdir]= {
                        "kind": "collection",
                        "writable": True
                    }

        partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
        if partition_req:
            scheduling_parameters["partitions"] = aslist(partition_req["partition"])

        intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
        if intermediate_output_req:
            self.output_ttl = intermediate_output_req["outputTTL"]
        else:
            self.output_ttl = self.arvrunner.intermediate_output_ttl

        if self.output_ttl < 0:
            # FIX: previously interpolated container_request["output_ttl"],
            # which is not assigned until later, so this path raised
            # KeyError instead of the intended WorkflowException.
            raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % self.output_ttl)

        if self.timelimit is not None:
            scheduling_parameters["max_run_time"] = self.timelimit

        extra_submit_params = {}
        if runtimeContext.submit_runner_cluster:
            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster

        container_request["output_name"] = "Output for step %s" % (self.name)
        container_request["output_ttl"] = self.output_ttl
        container_request["mounts"] = mounts
        container_request["secret_mounts"] = secret_mounts
        container_request["runtime_constraints"] = runtime_constraints
        container_request["scheduling_parameters"] = scheduling_parameters

        # Container reuse can be disabled globally or per-step via the
        # ReuseRequirement hint.
        enable_reuse = runtimeContext.enable_reuse
        if enable_reuse:
            reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        container_request["use_existing"] = enable_reuse

        # Guard against runnerjob being None (previously an AttributeError).
        if runtimeContext.runnerjob and runtimeContext.runnerjob.startswith("arvwf:"):
            wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
            wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
            if container_request["name"] == "main":
                container_request["name"] = wfrecord["name"]
            container_request["properties"]["template_uuid"] = wfuuid

        self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)

        try:
            if runtimeContext.submit_request_uuid:
                # A request uuid was pre-allocated (e.g. cross-cluster
                # submission); update it instead of creating a new one.
                response = self.arvrunner.api.container_requests().update(
                    uuid=runtimeContext.submit_request_uuid,
                    body=container_request,
                    **extra_submit_params
                ).execute(num_retries=self.arvrunner.num_retries)
            else:
                response = self.arvrunner.api.container_requests().create(
                    body=container_request,
                    **extra_submit_params
                ).execute(num_retries=self.arvrunner.num_retries)

            self.uuid = response["uuid"]
            self.arvrunner.process_submitted(self)

            if response["state"] == "Final":
                # Server satisfied the request immediately by reusing a
                # previously-run container.
                logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
            else:
                logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
        except Exception:
            logger.exception("%s got an error", self.arvrunner.label(self))
            self.output_callback({}, "permanentFail")

    def done(self, record):
        """Handle completion of the container request.

        Maps the container exit code to a cwltool process status (honoring
        successCodes/temporaryFailCodes/permanentFailCodes), logs the tail of
        the container log on permanent failure, registers intermediate
        output for trashing, collects outputs, and always invokes the output
        callback.

        :param record: the finished container request record.
        """
        outputs = {}
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            if container["state"] == "Complete":
                rcode = container["exit_code"]
                if self.successCodes and rcode in self.successCodes:
                    processStatus = "success"
                elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
                    processStatus = "temporaryFail"
                elif self.permanentFailCodes and rcode in self.permanentFailCodes:
                    processStatus = "permanentFail"
                elif rcode == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"
            else:
                # Cancelled or otherwise never completed.
                processStatus = "permanentFail"

            if processStatus == "permanentFail" and record["log_uuid"]:
                # Surface the tail of the container log to aid diagnosis.
                logc = arvados.collection.CollectionReader(record["log_uuid"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                label = self.arvrunner.label(self)
                done.logtail(
                    logc, logger.error,
                    "%s (%s) error log:" % (label, record["uuid"]), maxlen=40)

            if record["output_uuid"]:
                if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
                    # Compute the trash time to avoid requesting the collection record.
                    trash_at = ciso8601.parse_datetime_as_naive(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
                    aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
                    orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
                    oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
                    logger.info("%s Intermediate output %s (%s) will be trashed%s%s%s." % (
                        self.arvrunner.label(self), record["output_uuid"], container["output"], aftertime, orpart, oncomplete))
                self.arvrunner.add_intermediate_output(record["output_uuid"])

            if container["output"]:
                outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
        except WorkflowException as e:
            # Only include a stack trace if in debug mode.
            # A stack trace may obfuscate more useful output about the workflow.
            logger.error("%s unable to collect output from %s:\n%s",
                         self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
            processStatus = "permanentFail"
        except Exception:
            logger.exception("%s while getting output object:", self.arvrunner.label(self))
            processStatus = "permanentFail"
        finally:
            # Always report a result to cwltool, even on failure.
            self.output_callback(outputs, processStatus)
368
369
class RunnerContainer(Runner):
    """Submit and manage a container that runs arvados-cwl-runner."""

    def arvados_job_spec(self, runtimeContext):
        """Create an Arvados container request for this workflow.

        The returned dict can be used to create a container passed as
        the +body+ argument to container_requests().create().
        """

        # Strip fields from the job order that don't need to travel with it.
        adjustDirObjs(self.job_order, trim_listing)
        visit_class(self.job_order, ("File", "Directory"), trim_anonymous_location)
        visit_class(self.job_order, ("File", "Directory"), remove_redundant_fields)

        # Replace secret parameter values with $include references to files
        # delivered via secret_mounts, so secrets never appear in the stored
        # job order.  Keys are sorted so mount names are deterministic.
        secret_mounts = {}
        for param in sorted(self.job_order.keys()):
            if self.secret_store.has_secret(self.job_order[param]):
                mnt = "/secrets/s%d" % len(secret_mounts)
                secret_mounts[mnt] = {
                    "kind": "text",
                    "content": self.secret_store.retrieve(self.job_order[param])
                }
                self.job_order[param] = {"$include": mnt}

        container_req = {
            "name": self.name,
            "output_path": "/var/spool/cwl",
            "cwd": "/var/spool/cwl",
            "priority": self.priority,
            "state": "Committed",
            "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image),
            "mounts": {
                # The job order is delivered as a JSON mount...
                "/var/lib/cwl/cwl.input.json": {
                    "kind": "json",
                    "content": self.job_order
                },
                # ...the runner's stdout becomes the workflow output...
                "stdout": {
                    "kind": "file",
                    "path": "/var/spool/cwl/cwl.output.json"
                },
                # ...and the working directory is a writable collection.
                "/var/spool/cwl": {
                    "kind": "collection",
                    "writable": True
                }
            },
            "secret_mounts": secret_mounts,
            "runtime_constraints": {
                "vcpus": math.ceil(self.submit_runner_cores),
                # RAM request covers the runner itself plus its collection
                # cache; both values are in MiB.
                "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
                "API": True
            },
            "use_existing": self.enable_reuse,
            "properties": {}
        }

        if self.embedded_tool.tool.get("id", "").startswith("keep:"):
            # Workflow already lives in Keep: mount the collection read-only
            # and run the named workflow file from it.
            sp = self.embedded_tool.tool["id"].split('/')
            workflowcollection = sp[0][5:]  # strip "keep:" prefix
            workflowname = "/".join(sp[1:])
            workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
            container_req["mounts"]["/var/lib/cwl/workflow"] = {
                "kind": "collection",
                "portable_data_hash": "%s" % workflowcollection
            }
        else:
            # Otherwise pack the workflow into a single JSON document and
            # deliver it as a json mount.
            packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map)
            workflowpath = "/var/lib/cwl/workflow.json#main"
            container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
                "kind": "json",
                "content": packed
            }
            if self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
                # "arvwf:<uuid>#..." -- chars 6:33 are the workflow uuid.
                container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]


        # --local means execute the workflow instead of submitting a container request
        # --api=containers means use the containers API
        # --no-log-timestamps means don't add timestamps (the logging infrastructure does this)
        # --disable-validate because we already validated so don't need to do it again
        # --eval-timeout is the timeout for javascript invocation
        # --parallel-task-count is the number of threads to use for job submission
        # --enable/disable-reuse sets desired job reuse
        # --collection-cache-size sets aside memory to store collections
        command = ["arvados-cwl-runner",
                   "--local",
                   "--api=containers",
                   "--no-log-timestamps",
                   "--disable-validate",
                   "--eval-timeout=%s" % self.arvrunner.eval_timeout,
                   "--thread-count=%s" % self.arvrunner.thread_count,
                   "--enable-reuse" if self.enable_reuse else "--disable-reuse",
                   "--collection-cache-size=%s" % self.collection_cache_size]

        if self.output_name:
            command.append("--output-name=" + self.output_name)
            container_req["output_name"] = self.output_name

        if self.output_tags:
            command.append("--output-tags=" + self.output_tags)

        if runtimeContext.debug:
            command.append("--debug")

        if runtimeContext.storage_classes != "default":
            command.append("--storage-classes=" + runtimeContext.storage_classes)

        if self.on_error:
            command.append("--on-error=" + self.on_error)

        if self.intermediate_output_ttl:
            command.append("--intermediate-output-ttl=%d" % self.intermediate_output_ttl)

        if self.arvrunner.trash_intermediate:
            command.append("--trash-intermediate")

        if self.arvrunner.project_uuid:
            command.append("--project-uuid="+self.arvrunner.project_uuid)

        if self.enable_dev:
            command.append("--enable-dev")

        # Positional arguments: the workflow to run and its job order.
        command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])

        container_req["command"] = command

        return container_req


    def run(self, runtimeContext):
        """Submit the runner container request built by arvados_job_spec()."""
        runtimeContext.keepprefix = "keep:"
        job_spec = self.arvados_job_spec(runtimeContext)
        if self.arvrunner.project_uuid:
            job_spec["owner_uuid"] = self.arvrunner.project_uuid

        extra_submit_params = {}
        if runtimeContext.submit_runner_cluster:
            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster

        if runtimeContext.submit_request_uuid:
            # Update a pre-allocated request instead of creating a new one.
            if "cluster_id" in extra_submit_params:
                # Doesn't make sense for "update" and actually fails
                del extra_submit_params["cluster_id"]
            response = self.arvrunner.api.container_requests().update(
                uuid=runtimeContext.submit_request_uuid,
                body=job_spec,
                **extra_submit_params
            ).execute(num_retries=self.arvrunner.num_retries)
        else:
            response = self.arvrunner.api.container_requests().create(
                body=job_spec,
                **extra_submit_params
            ).execute(num_retries=self.arvrunner.num_retries)

        self.uuid = response["uuid"]
        self.arvrunner.process_submitted(self)

        logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])

    def done(self, record):
        """Handle completion of the runner container request.

        Fetches the underlying container record and delegates to
        Runner.done(); on any fetch error, reports "permanentFail" through
        the executor's output callback instead of raising.
        """
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            # Runner.done() expects the log on the container record; the
            # containers API keeps it on the request, so copy it over.
            container["log"] = record["log_uuid"]
        except Exception:
            logger.exception("%s while getting runner container", self.arvrunner.label(self))
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            super(RunnerContainer, self).done(container)