16811: Add a test that system users/groups can't be deleted.
[arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8
9 import logging
10 import json
11 import os
12 import urllib.request, urllib.parse, urllib.error
13 import time
14 import datetime
15 import ciso8601
16 import uuid
17 import math
18
19 import arvados_cwl.util
20 import ruamel.yaml as yaml
21
22 from cwltool.errors import WorkflowException
23 from cwltool.process import UnsupportedRequirement, shortname
24 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
25 from cwltool.utils import aslist
26 from cwltool.job import JobBase
27
28 import arvados.collection
29
30 from .arvdocker import arv_docker_get_image
31 from . import done
32 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields
33 from .fsaccess import CollectionFetcher
34 from .pathmapper import NoFollowPathMapper, trim_listing
35 from .perf import Perf
36 from ._version import __version__
37
# Module logger for arvados-cwl-runner messages.
logger = logging.getLogger("arvados.cwl-runner")
# Child logger ("arvados.cwl-runner.metrics") that receives the Perf timing output.
metrics = logger.getChild("metrics")
40
class ArvadosContainer(JobBase):
    """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""

    def __init__(self, runner, job_runtime,
                 builder,   # type: Builder
                 joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
                 make_path_mapper,  # type: Callable[..., PathMapper]
                 requirements,      # type: List[Dict[Text, Text]]
                 hints,     # type: List[Dict[Text, Text]]
                 name       # type: Text
    ):
        """Set up a runner for one CommandLineTool invocation.

        :param runner: the Arvados executor; this class uses its ``api``,
            ``keep_client``, ``num_retries``, ``secret_store``, ``fs_access``
            and bookkeeping methods (``process_submitted``, ``label``, ...).
        :param job_runtime: runtime context that :meth:`run` uses in place of
            the context passed to ``run()`` itself.

        The remaining parameters are forwarded unchanged to ``JobBase``.
        """
        super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
        self.arvrunner = runner
        self.job_runtime = job_runtime
        self.running = False
        # UUID of the submitted container request; set by run().
        self.uuid = None

    def update_pipeline_component(self, r):
        """Intentionally a no-op for container-based jobs."""
        pass

    def _vwd_path(self, target):
        """Return the location of *target* inside the generated-files collection.

        Absolute targets under the output directory become relative to it,
        other absolute targets just lose their leading "/", and relative
        targets are returned unchanged.
        """
        if target.startswith("/"):
            if target.startswith(self.outdir + "/"):
                return target[len(self.outdir)+1:]
            return target[1:]
        return target

    def _container_path(self, target):
        """Return *target* as an absolute path inside the container (relative = under outdir)."""
        return target if target.startswith("/") else os.path.join(self.outdir, target)

    def run(self, runtimeContext):
        """Build a container request for this tool invocation and submit it.

        The request is created, or, when ``submit_request_uuid`` is set, an
        existing request is updated.

        Note: the ``runtimeContext`` argument is ignored; ``self.job_runtime``
        (given at construction) is used instead.

        :raises WorkflowException: if secrets leak into the command line or
            environment, or if the output TTL is negative.

        Errors while submitting the request are logged and reported through
        ``self.output_callback({}, "permanentFail")`` rather than raised.
        """
        # ArvadosCommandTool subclasses from cwltool.CommandLineTool,
        # which calls makeJobRunner() to get a new ArvadosContainer
        # object.  The fields that define execution such as
        # command_line, environment, etc are set on the
        # ArvadosContainer object by CommandLineTool.job() before
        # run() is called.

        runtimeContext = self.job_runtime

        container_request = {
            "command": self.command_line,
            "name": self.name,
            "output_path": self.outdir,
            "cwd": self.outdir,
            "priority": runtimeContext.priority,
            "state": "Committed",
            "properties": {},
        }
        runtime_constraints = {}

        if runtimeContext.project_uuid:
            container_request["owner_uuid"] = runtimeContext.project_uuid

        if self.arvrunner.secret_store.has_secret(self.command_line):
            raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")

        if self.arvrunner.secret_store.has_secret(self.environment):
            raise WorkflowException("Secret material leaked in environment, only file literals may contain secrets")

        resources = self.builder.resources
        if resources is None:
            # Only the runtime constraints used to be guarded; the tmp mount
            # capacities below then crashed with AttributeError.  Treat a
            # missing resource evaluation as "no explicit sizes".
            resources = {}
        else:
            # Resource sizes are multiplied by 2**20 (MiB -> bytes) for Arvados.
            runtime_constraints["vcpus"] = math.ceil(resources.get("cores", 1))
            runtime_constraints["ram"] = math.ceil(resources.get("ram") * 2**20)

        mounts = {
            self.outdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("outdirSize", 0) * 2**20)
            },
            self.tmpdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("tmpdirSize", 0) * 2**20)
            }
        }
        secret_mounts = {}
        scheduling_parameters = {}

        # Mount the collection behind each staged input at the directory
        # holding it.  Entries are visited in sorted order, and anything whose
        # target lies under the most recently mounted directory is skipped.
        rf = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
        rf.sort(key=lambda k: k.resolved)
        prevdir = None
        for resolved, target, tp, stg in rf:
            if not stg:
                continue
            if prevdir and target.startswith(prevdir):
                continue
            if tp == "Directory":
                targetdir = target
            else:
                targetdir = os.path.dirname(target)
            sp = resolved.split("/", 1)
            pdh = sp[0][5:]   # remove "keep:"
            mounts[targetdir] = {
                "kind": "collection",
                "portable_data_hash": pdh
            }
            if pdh in self.pathmapper.pdh_to_uuid:
                mounts[targetdir]["uuid"] = self.pathmapper.pdh_to_uuid[pdh]
            if len(sp) == 2:
                if tp == "Directory":
                    path = sp[1]
                else:
                    path = os.path.dirname(sp[1])
                if path and path != "/":
                    mounts[targetdir]["path"] = path
            prevdir = targetdir + "/"

        with Perf(metrics, "generatefiles %s" % self.name):
            if self.generatefiles["listing"]:
                # Assemble InitialWorkDir entries in a new collection ("vwd")
                # that is then mounted into the container.
                vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                    keep_client=self.arvrunner.keep_client,
                                                    num_retries=self.arvrunner.num_retries)
                generatemapper = NoFollowPathMapper(self.generatefiles["listing"], "", "",
                                                    separateDirs=False)

                sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target)

                logger.debug("generatemapper is %s", sorteditems)

                with Perf(metrics, "createfiles %s" % self.name):
                    for _, p in sorteditems:
                        if not p.target:
                            continue

                        dst = self._vwd_path(p.target)

                        if p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
                            if p.resolved.startswith("_:"):
                                vwd.mkdirs(dst)
                            else:
                                source, path = self.arvrunner.fs_access.get_collection(p.resolved)
                                vwd.copy(path or ".", dst, source_collection=source)
                        elif p.type == "CreateFile":
                            if self.arvrunner.secret_store.has_secret(p.resolved):
                                # Literal contents holding secrets go into a
                                # secret mount, not into the saved collection.
                                secret_mounts[self._container_path(p.target)] = {
                                    "kind": "text",
                                    "content": self.arvrunner.secret_store.retrieve(p.resolved)
                                }
                            else:
                                with vwd.open(dst, "w") as n:
                                    n.write(p.resolved)

                def keepemptydirs(p):
                    # Write a ".keep" file into every empty (sub)collection,
                    # presumably so empty directories survive saving.
                    if isinstance(p, arvados.collection.RichCollectionBase):
                        if len(p) == 0:
                            p.open(".keep", "w").close()
                        else:
                            for c in p:
                                keepemptydirs(p[c])

                keepemptydirs(vwd)

                if not runtimeContext.current_container:
                    runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
                info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
                vwd.save_new(name=info["name"],
                             owner_uuid=runtimeContext.project_uuid,
                             ensure_unique_name=True,
                             trash_at=info["trash_at"],
                             properties=info["properties"])

                # Mount each top-level generated entry from the saved
                # collection; entries nested under a previous one (and
                # secret-bearing ones) are skipped.
                prev = None
                for _, p in sorteditems:
                    if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
                        (prev is not None and p.target.startswith(prev))):
                        continue
                    mountpoint = self._container_path(p.target)
                    mounts[mountpoint] = {"kind": "collection",
                                          "portable_data_hash": vwd.portable_data_hash(),
                                          "path": self._vwd_path(p.target)}
                    if p.type.startswith("Writable"):
                        mounts[mountpoint]["writable"] = True
                    prev = p.target + "/"

        container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
        if self.environment:
            container_request["environment"].update(self.environment)

        if self.stdin:
            # Assumes stdin looks like "/keep/<pdh>/<path>"; [6:] drops the
            # 6-character "/keep/" prefix.
            sp = self.stdin[6:].split("/", 1)
            mounts["stdin"] = {"kind": "collection",
                                "portable_data_hash": sp[0],
                                "path": sp[1]}

        if self.stderr:
            mounts["stderr"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stderr)}

        if self.stdout:
            mounts["stdout"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stdout)}

        (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
        if not docker_req:
            docker_req = {"dockerImageId": "arvados/jobs:"+__version__}

        container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
                                                                    docker_req,
                                                                    runtimeContext.pull_image,
                                                                    runtimeContext.project_uuid)

        network_req, _ = self.get_requirement("NetworkAccess")
        if network_req:
            runtime_constraints["API"] = network_req["networkAccess"]

        api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
        if api_req:
            runtime_constraints["API"] = True

        runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
        if runtime_req:
            if "keep_cache" in runtime_req:
                runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
            if "outputDirType" in runtime_req:
                if runtime_req["outputDirType"] == "local_output_dir":
                    # Currently the default behavior.
                    pass
                elif runtime_req["outputDirType"] == "keep_output_dir":
                    mounts[self.outdir]= {
                        "kind": "collection",
                        "writable": True
                    }

        partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
        if partition_req:
            scheduling_parameters["partitions"] = aslist(partition_req["partition"])

        intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
        if intermediate_output_req:
            self.output_ttl = intermediate_output_req["outputTTL"]
        else:
            self.output_ttl = self.arvrunner.intermediate_output_ttl

        if self.output_ttl < 0:
            # container_request["output_ttl"] is not assigned yet at this
            # point, so report the value we actually validated.
            raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % self.output_ttl)

        if self.timelimit is not None and self.timelimit > 0:
            scheduling_parameters["max_run_time"] = self.timelimit

        extra_submit_params = {}
        if runtimeContext.submit_runner_cluster:
            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster

        container_request["output_name"] = "Output for step %s" % (self.name)
        container_request["output_ttl"] = self.output_ttl
        container_request["mounts"] = mounts
        container_request["secret_mounts"] = secret_mounts
        container_request["runtime_constraints"] = runtime_constraints
        container_request["scheduling_parameters"] = scheduling_parameters

        # Reuse is on only if globally enabled; a WorkReuse or Arvados
        # ReuseRequirement can then override it (the Arvados one wins).
        enable_reuse = runtimeContext.enable_reuse
        if enable_reuse:
            reuse_req, _ = self.get_requirement("WorkReuse")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
            reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        container_request["use_existing"] = enable_reuse

        if runtimeContext.runnerjob.startswith("arvwf:"):
            wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
            wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
            if container_request["name"] == "main":
                container_request["name"] = wfrecord["name"]
            container_request["properties"]["template_uuid"] = wfuuid

        self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)

        try:
            if runtimeContext.submit_request_uuid:
                # cluster_id doesn't make sense for "update" and makes the
                # call fail, so it is only passed when creating a request.
                extra_submit_params.pop("cluster_id", None)
                response = self.arvrunner.api.container_requests().update(
                    uuid=runtimeContext.submit_request_uuid,
                    body=container_request,
                    **extra_submit_params
                ).execute(num_retries=self.arvrunner.num_retries)
            else:
                response = self.arvrunner.api.container_requests().create(
                    body=container_request,
                    **extra_submit_params
                ).execute(num_retries=self.arvrunner.num_retries)

            self.uuid = response["uuid"]
            self.arvrunner.process_submitted(self)

            if response["state"] == "Final":
                logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
            else:
                logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
        except Exception:
            logger.exception("%s got an error", self.arvrunner.label(self))
            logger.debug("Container request was %s", container_request)
            self.output_callback({}, "permanentFail")

    def done(self, record):
        """Report the outcome of a finished container request.

        :param record: the container request record.  The code reads its
            ``container_uuid``, ``log_uuid``, ``uuid``, ``output_uuid`` and
            ``modified_at`` fields.

        The container's exit code is mapped through successCodes /
        temporaryFailCodes / permanentFailCodes, with 0 as the default
        success.  On permanent failure the log tail is printed.
        ``self.output_callback(outputs, processStatus)`` is always called.
        """
        outputs = {}
        # Assume failure so the ``finally`` clause always has a status to
        # report, even if something unexpected escapes the handlers below.
        processStatus = "permanentFail"
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            if container["state"] == "Complete":
                rcode = container["exit_code"]
                if self.successCodes and rcode in self.successCodes:
                    processStatus = "success"
                elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
                    processStatus = "temporaryFail"
                elif self.permanentFailCodes and rcode in self.permanentFailCodes:
                    processStatus = "permanentFail"
                elif rcode == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"
            else:
                processStatus = "permanentFail"

            if processStatus == "permanentFail" and record["log_uuid"]:
                logc = arvados.collection.CollectionReader(record["log_uuid"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                label = self.arvrunner.label(self)
                done.logtail(
                    logc, logger.error,
                    "%s (%s) error log:" % (label, record["uuid"]), maxlen=40)

            if record["output_uuid"]:
                if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
                    # Compute the trash time to avoid requesting the collection record.
                    trash_at = ciso8601.parse_datetime_as_naive(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
                    aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
                    orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
                    oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
                    logger.info("%s Intermediate output %s (%s) will be trashed%s%s%s." % (
                        self.arvrunner.label(self), record["output_uuid"], container["output"], aftertime, orpart, oncomplete))
                self.arvrunner.add_intermediate_output(record["output_uuid"])

            if container["output"]:
                outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
        except WorkflowException as e:
            # Only include a stack trace if in debug mode.
            # A stack trace may obfuscate more useful output about the workflow.
            logger.error("%s unable to collect output from %s:\n%s",
                         self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
            processStatus = "permanentFail"
        except Exception:
            logger.exception("%s while getting output object:", self.arvrunner.label(self))
            processStatus = "permanentFail"
        finally:
            self.output_callback(outputs, processStatus)
387             self.output_callback(outputs, processStatus)
388
389
class RunnerContainer(Runner):
    """Submit and manage a container that runs arvados-cwl-runner."""

    def arvados_job_spec(self, runtimeContext):
        """Return the container request body that runs this workflow.

        The dict is meant to be passed as the +body+ argument to
        container_requests().create() (run() also uses it for update()).

        Side effect: self.job_order is cleaned up in place.  Every secret
        input value in it is replaced by a {"$include": <mountpoint>}
        reference to an entry of the returned "secret_mounts".
        """
        adjustDirObjs(self.job_order, trim_listing)
        for cleanup in (trim_anonymous_location, remove_redundant_fields):
            visit_class(self.job_order, ("File", "Directory"), cleanup)

        # Move secret inputs into numbered secret mounts, in sorted parameter order.
        secret_mounts = {}
        for param in sorted(self.job_order.keys()):
            value = self.job_order[param]
            if not self.secret_store.has_secret(value):
                continue
            mountpoint = "/secrets/s%d" % len(secret_mounts)
            secret_mounts[mountpoint] = {
                "kind": "text",
                "content": self.secret_store.retrieve(value)
            }
            self.job_order[param] = {"$include": mountpoint}

        image = arvados_jobs_image(self.arvrunner, self.jobs_image)

        mounts = {}
        mounts["/var/lib/cwl/cwl.input.json"] = {"kind": "json", "content": self.job_order}
        # The runner writes its output object to stdout, captured into cwl.output.json.
        mounts["stdout"] = {"kind": "file", "path": "/var/spool/cwl/cwl.output.json"}
        mounts["/var/spool/cwl"] = {"kind": "collection", "writable": True}

        cores = math.ceil(self.submit_runner_cores)
        # Runner RAM plus the collection cache, both in MiB, converted to bytes.
        ram_mib = math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)

        container_req = {
            "name": self.name,
            "output_path": "/var/spool/cwl",
            "cwd": "/var/spool/cwl",
            "priority": self.priority,
            "state": "Committed",
            "container_image": image,
            "mounts": mounts,
            "secret_mounts": secret_mounts,
            "runtime_constraints": {"vcpus": cores, "ram": 1024*1024 * ram_mib, "API": True},
            "use_existing": False, # Never reuse the runner container - see #15497.
            "properties": {}
        }

        if not self.embedded_tool.tool.get("id", "").startswith("keep:"):
            # Ship the packed workflow inline as a JSON mount.
            packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map)
            workflowpath = "/var/lib/cwl/workflow.json#main"
            mounts["/var/lib/cwl/workflow.json"] = {"kind": "json", "content": packed}
            if self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
                # Characters 6..33 are the 27-character workflow UUID after "arvwf:".
                container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
        else:
            # The workflow already lives in a collection: mount it directly.
            id_parts = self.embedded_tool.tool["id"].split('/')
            workflowpath = "/var/lib/cwl/workflow/%s" % "/".join(id_parts[1:])
            mounts["/var/lib/cwl/workflow"] = {
                "kind": "collection",
                "portable_data_hash": "%s" % id_parts[0][5:]  # strip "keep:"
            }

        # Fixed arguments of the runner invocation:
        #   --local                  execute the workflow instead of submitting a runner
        #   --api=containers         use the containers API
        #   --no-log-timestamps      the logging infrastructure adds timestamps
        #   --disable-validate       validation already happened before submission
        #   --eval-timeout           timeout for javascript evaluation
        #   --thread-count           number of threads used for job submission
        #   --enable/disable-reuse   desired job reuse
        #   --collection-cache-size  memory set aside for caching collections
        command = [
            "arvados-cwl-runner",
            "--local",
            "--api=containers",
            "--no-log-timestamps",
            "--disable-validate",
            "--eval-timeout=%s" % self.arvrunner.eval_timeout,
            "--thread-count=%s" % self.arvrunner.thread_count,
            "--enable-reuse" if self.enable_reuse else "--disable-reuse",
            "--collection-cache-size=%s" % self.collection_cache_size,
        ]

        # Optional arguments, forwarded only when set.
        if self.output_name:
            command.append("--output-name=" + self.output_name)
            container_req["output_name"] = self.output_name
        if self.output_tags:
            command.append("--output-tags=" + self.output_tags)
        if runtimeContext.debug:
            command.append("--debug")
        if runtimeContext.storage_classes != "default":
            command.append("--storage-classes=" + runtimeContext.storage_classes)
        if self.on_error:
            command.append("--on-error=" + self.on_error)
        if self.intermediate_output_ttl:
            command.append("--intermediate-output-ttl=%d" % self.intermediate_output_ttl)
        if self.arvrunner.trash_intermediate:
            command.append("--trash-intermediate")
        if self.arvrunner.project_uuid:
            command.append("--project-uuid="+self.arvrunner.project_uuid)
        if self.enable_dev:
            command.append("--enable-dev")

        command += [workflowpath, "/var/lib/cwl/cwl.input.json"]
        container_req["command"] = command

        return container_req

    def run(self, runtimeContext):
        """Submit the arvados-cwl-runner container request.

        The request given by runtimeContext.submit_request_uuid is updated
        when that is set.  Otherwise a new request is created, on
        runtimeContext.submit_runner_cluster when one is given.
        """
        runtimeContext.keepprefix = "keep:"
        job_spec = self.arvados_job_spec(runtimeContext)
        if self.arvrunner.project_uuid:
            job_spec["owner_uuid"] = self.arvrunner.project_uuid

        target_cluster = runtimeContext.submit_runner_cluster

        if runtimeContext.submit_request_uuid:
            # cluster_id doesn't make sense for "update" and actually fails,
            # so it is never passed on this path.
            request = self.arvrunner.api.container_requests().update(
                uuid=runtimeContext.submit_request_uuid,
                body=job_spec
            )
        else:
            create_params = {"cluster_id": target_cluster} if target_cluster else {}
            request = self.arvrunner.api.container_requests().create(
                body=job_spec,
                **create_params
            )
        response = request.execute(num_retries=self.arvrunner.num_retries)

        self.uuid = response["uuid"]
        self.arvrunner.process_submitted(self)

        logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])

    def done(self, record):
        """Fetch the runner's container record and hand it to Runner.done().

        The container record gets a "log" field copied from the request's
        log_uuid.  If the lookup fails, the error is logged and the workflow
        is reported as permanentFail through the executor's output callback.
        """
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            container["log"] = record["log_uuid"]
        except Exception:
            logger.exception("%s while getting runner container", self.arvrunner.label(self))
            self.arvrunner.output_callback({}, "permanentFail")
            return
        super(RunnerContainer, self).done(container)