Merge branch '13547-respect-insecure-flag-when-talking-ssl-to-keep'
[arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import logging
6 import json
7 import os
8 import urllib
9 import time
10 import datetime
11 import ciso8601
12 import uuid
13
14 import ruamel.yaml as yaml
15
16 from cwltool.errors import WorkflowException
17 from cwltool.process import UnsupportedRequirement, shortname
18 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
19 from cwltool.utils import aslist
20 from cwltool.job import JobBase
21
22 import arvados.collection
23
24 from .arvdocker import arv_docker_get_image
25 from . import done
26 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields
27 from .fsaccess import CollectionFetcher
28 from .pathmapper import NoFollowPathMapper, trim_listing
29 from .perf import Perf
30
31 logger = logging.getLogger('arvados.cwl-runner')
32 metrics = logging.getLogger('arvados.cwl-runner.metrics')
33
class ArvadosContainer(JobBase):
    """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""

    def __init__(self, runner,
                 builder,   # type: Builder
                 joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
                 make_path_mapper,  # type: Callable[..., PathMapper]
                 requirements,      # type: List[Dict[Text, Text]]
                 hints,     # type: List[Dict[Text, Text]]
                 name       # type: Text
    ):
        super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
        self.arvrunner = runner  # ArvCwlRunner-like coordinator holding API/keep clients and shared state
        self.running = False     # tracks whether the container has been observed running
        self.uuid = None         # UUID of the submitted container request, set in run()

    def update_pipeline_component(self, r):
        # No-op: pipeline components exist only in the jobs API.  Kept for
        # interface compatibility with the jobs-based implementation.
        pass

    def run(self, runtimeContext):
        """Assemble a container request for this tool invocation and submit it.

        Builds the mounts (input collections, tmp output/scratch dirs,
        generated file literals, secrets, stdin/stdout/stderr redirects),
        runtime constraints and scheduling parameters, then creates the
        container request via the API (or updates an existing one when
        runtimeContext.submit_request_uuid is set).  On any submission
        error, invokes the output callback with "permanentFail".
        """
        # ArvadosCommandTool subclasses from cwltool.CommandLineTool,
        # which calls makeJobRunner() to get a new ArvadosContainer
        # object.  The fields that define execution such as
        # command_line, environment, etc are set on the
        # ArvadosContainer object by CommandLineTool.job() before
        # run() is called.

        container_request = {
            "command": self.command_line,
            "name": self.name,
            "output_path": self.outdir,
            "cwd": self.outdir,
            "priority": runtimeContext.priority,
            "state": "Committed",
            "properties": {},
        }
        runtime_constraints = {}

        if self.arvrunner.project_uuid:
            container_request["owner_uuid"] = self.arvrunner.project_uuid

        # Secrets may only travel via secret_mounts (file literals); refuse to
        # submit if any secret value leaked into the command line or environment.
        if self.arvrunner.secret_store.has_secret(self.command_line):
            raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")

        if self.arvrunner.secret_store.has_secret(self.environment):
            raise WorkflowException("Secret material leaked in environment, only file literals may contain secrets")

        resources = self.builder.resources
        if resources is not None:
            runtime_constraints["vcpus"] = resources.get("cores", 1)
            runtime_constraints["ram"] = resources.get("ram") * 2**20  # MiB -> bytes

        mounts = {
            self.outdir: {
                "kind": "tmp",
                "capacity": resources.get("outdirSize", 0) * 2**20
            },
            self.tmpdir: {
                "kind": "tmp",
                "capacity": resources.get("tmpdirSize", 0) * 2**20
            }
        }
        secret_mounts = {}
        scheduling_parameters = {}

        # Mount each referenced input collection.  Sorting by resolved path
        # lets the prevdir check below skip entries already covered by a
        # previously mounted parent directory.
        rf = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
        rf.sort(key=lambda k: k.resolved)
        prevdir = None
        for resolved, target, tp, stg in rf:
            if not stg:
                continue
            if prevdir and target.startswith(prevdir):
                continue
            if tp == "Directory":
                targetdir = target
            else:
                targetdir = os.path.dirname(target)
            sp = resolved.split("/", 1)
            pdh = sp[0][5:]   # remove "keep:"
            mounts[targetdir] = {
                "kind": "collection",
                "portable_data_hash": pdh
            }
            if len(sp) == 2:
                if tp == "Directory":
                    path = sp[1]
                else:
                    path = os.path.dirname(sp[1])
                if path and path != "/":
                    mounts[targetdir]["path"] = path
            prevdir = targetdir + "/"

        with Perf(metrics, "generatefiles %s" % self.name):
            if self.generatefiles["listing"]:
                # Stage InitialWorkDirRequirement content into a new "virtual
                # working directory" collection, except secrets, which go into
                # secret_mounts instead of a persisted collection.
                vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                    keep_client=self.arvrunner.keep_client,
                                                    num_retries=self.arvrunner.num_retries)
                generatemapper = NoFollowPathMapper([self.generatefiles], "", "",
                                                    separateDirs=False)

                # FIX: original code called sorted(items, None, key=...),
                # passing the Python 2-only positional cmp argument.  cmp=None
                # was a no-op, so dropping it preserves behavior and is also
                # valid on Python 3.
                sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target)

                logger.debug("generatemapper is %s", sorteditems)

                with Perf(metrics, "createfiles %s" % self.name):
                    for f, p in sorteditems:
                        if not p.target:
                            pass
                        elif p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
                            if p.resolved.startswith("_:"):
                                # Anonymous (literal) directory: just create it.
                                vwd.mkdirs(p.target)
                            else:
                                source, path = self.arvrunner.fs_access.get_collection(p.resolved)
                                vwd.copy(path, p.target, source_collection=source)
                        elif p.type == "CreateFile":
                            if self.arvrunner.secret_store.has_secret(p.resolved):
                                secret_mounts["%s/%s" % (self.outdir, p.target)] = {
                                    "kind": "text",
                                    "content": self.arvrunner.secret_store.retrieve(p.resolved)
                                }
                            else:
                                with vwd.open(p.target, "w") as n:
                                    n.write(p.resolved.encode("utf-8"))

                def keepemptydirs(p):
                    # Keep has no first-class empty directories; drop a ".keep"
                    # placeholder file into each empty subcollection.
                    if isinstance(p, arvados.collection.RichCollectionBase):
                        if len(p) == 0:
                            p.open(".keep", "w").close()
                        else:
                            for c in p:
                                keepemptydirs(p[c])

                keepemptydirs(vwd)

                with Perf(metrics, "generatefiles.save_new %s" % self.name):
                    vwd.save_new()

                # Mount each staged item under the output dir; entries covered
                # by an already-mounted parent (prev) and secrets are skipped.
                prev = None
                for f, p in sorteditems:
                    if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
                        (prev is not None and p.target.startswith(prev))):
                        continue
                    mountpoint = "%s/%s" % (self.outdir, p.target)
                    mounts[mountpoint] = {"kind": "collection",
                                          "portable_data_hash": vwd.portable_data_hash(),
                                          "path": p.target}
                    if p.type.startswith("Writable"):
                        mounts[mountpoint]["writable"] = True
                    prev = p.target + "/"

        container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
        if self.environment:
            container_request["environment"].update(self.environment)

        if self.stdin:
            # self.stdin looks like "keep:<pdh>/<path>"; strip the scheme.
            sp = self.stdin[6:].split("/", 1)
            mounts["stdin"] = {"kind": "collection",
                                "portable_data_hash": sp[0],
                                "path": sp[1]}

        if self.stderr:
            mounts["stderr"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stderr)}

        if self.stdout:
            mounts["stdout"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stdout)}

        (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
        if not docker_req:
            docker_req = {"dockerImageId": "arvados/jobs"}

        container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
                                                                     docker_req,
                                                                     runtimeContext.pull_image,
                                                                     self.arvrunner.project_uuid)

        api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
        if api_req:
            runtime_constraints["API"] = True

        runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
        if runtime_req:
            if "keep_cache" in runtime_req:
                runtime_constraints["keep_cache_ram"] = runtime_req["keep_cache"] * 2**20
            if "outputDirType" in runtime_req:
                if runtime_req["outputDirType"] == "local_output_dir":
                    # Currently the default behavior.
                    pass
                elif runtime_req["outputDirType"] == "keep_output_dir":
                    mounts[self.outdir]= {
                        "kind": "collection",
                        "writable": True
                    }

        partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
        if partition_req:
            scheduling_parameters["partitions"] = aslist(partition_req["partition"])

        intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
        if intermediate_output_req:
            self.output_ttl = intermediate_output_req["outputTTL"]
        else:
            self.output_ttl = self.arvrunner.intermediate_output_ttl

        if self.output_ttl < 0:
            # FIX: previously formatted container_request["output_ttl"], which
            # is not assigned until below, so this path raised KeyError instead
            # of the intended WorkflowException.  Use self.output_ttl.
            raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % self.output_ttl)

        if self.timelimit is not None:
            scheduling_parameters["max_run_time"] = self.timelimit

        container_request["output_ttl"] = self.output_ttl
        container_request["mounts"] = mounts
        container_request["secret_mounts"] = secret_mounts
        container_request["runtime_constraints"] = runtime_constraints
        container_request["scheduling_parameters"] = scheduling_parameters

        # A tool-level ReuseRequirement can disable reuse even when it is
        # enabled globally (but cannot re-enable it when globally disabled).
        enable_reuse = runtimeContext.enable_reuse
        if enable_reuse:
            reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        container_request["use_existing"] = enable_reuse

        if runtimeContext.runnerjob.startswith("arvwf:"):
            # Running a registered workflow: label the request with the
            # workflow's name and record its template UUID as a property.
            wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
            wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
            if container_request["name"] == "main":
                container_request["name"] = wfrecord["name"]
            container_request["properties"]["template_uuid"] = wfuuid

        self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)

        try:
            if runtimeContext.submit_request_uuid:
                response = self.arvrunner.api.container_requests().update(
                    uuid=runtimeContext.submit_request_uuid,
                    body=container_request
                ).execute(num_retries=self.arvrunner.num_retries)
            else:
                response = self.arvrunner.api.container_requests().create(
                    body=container_request
                ).execute(num_retries=self.arvrunner.num_retries)

            self.uuid = response["uuid"]
            self.arvrunner.process_submitted(self)

            if response["state"] == "Final":
                # Request was satisfied by reusing a finished container.
                logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
            else:
                logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
        except Exception as e:
            logger.error("%s got error %s" % (self.arvrunner.label(self), str(e)))
            self.output_callback({}, "permanentFail")

    def done(self, record):
        """Handle completion of the container request.

        Maps the container exit code through successCodes /
        temporaryFailCodes / permanentFailCodes, logs the tail of the
        container log on permanent failure, schedules intermediate output
        for trashing when configured, collects outputs, and always invokes
        the output callback with (outputs, processStatus).
        """
        outputs = {}
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            if container["state"] == "Complete":
                rcode = container["exit_code"]
                if self.successCodes and rcode in self.successCodes:
                    processStatus = "success"
                elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
                    processStatus = "temporaryFail"
                elif self.permanentFailCodes and rcode in self.permanentFailCodes:
                    processStatus = "permanentFail"
                elif rcode == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"
            else:
                # Cancelled or otherwise not Complete.
                processStatus = "permanentFail"

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(container["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            if record["output_uuid"]:
                if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
                    # Compute the trash time to avoid requesting the collection record.
                    trash_at = ciso8601.parse_datetime_unaware(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
                    aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
                    orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
                    oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
                    logger.info("%s Intermediate output %s (%s) will be trashed%s%s%s." % (
                        self.arvrunner.label(self), record["output_uuid"], container["output"], aftertime, orpart, oncomplete))
                self.arvrunner.add_intermediate_output(record["output_uuid"])

            if container["output"]:
                outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
        except WorkflowException as e:
            logger.error("%s unable to collect output from %s:\n%s",
                         self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
            processStatus = "permanentFail"
        except Exception as e:
            logger.exception("%s while getting output object: %s", self.arvrunner.label(self), e)
            processStatus = "permanentFail"
        finally:
            self.output_callback(outputs, processStatus)
340
class RunnerContainer(Runner):
    """Submit and manage a container that runs arvados-cwl-runner."""

    def arvados_job_spec(self, runtimeContext):
        """Create an Arvados container request for this workflow.

        The returned dict can be used to create a container passed as
        the +body+ argument to container_requests().create().
        """

        # Strip cached listings, anonymous locations, and redundant fields
        # from the job order before embedding it in the request body.
        adjustDirObjs(self.job_order, trim_listing)
        visit_class(self.job_order, ("File", "Directory"), trim_anonymous_location)
        visit_class(self.job_order, ("File", "Directory"), remove_redundant_fields)

        # Move secret parameter values out of cwl.input.json and into
        # dedicated secret mounts, replacing each with a $include reference.
        secret_mounts = {}
        for param in sorted(self.job_order.keys()):
            if self.secret_store.has_secret(self.job_order[param]):
                mnt = "/secrets/s%d" % len(secret_mounts)
                secret_mounts[mnt] = {
                    "kind": "text",
                    "content": self.secret_store.retrieve(self.job_order[param])
                }
                self.job_order[param] = {"$include": mnt}

        container_req = {
            "name": self.name,
            "output_path": "/var/spool/cwl",
            "cwd": "/var/spool/cwl",
            "priority": self.priority,
            "state": "Committed",
            "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image),
            "mounts": {
                "/var/lib/cwl/cwl.input.json": {
                    "kind": "json",
                    "content": self.job_order
                },
                # The runner writes cwl.output.json to its output dir; mounting
                # it as stdout is how the workflow result gets captured.
                "stdout": {
                    "kind": "file",
                    "path": "/var/spool/cwl/cwl.output.json"
                },
                "/var/spool/cwl": {
                    "kind": "collection",
                    "writable": True
                }
            },
            "secret_mounts": secret_mounts,
            "runtime_constraints": {
                "vcpus": 1,
                "ram": 1024*1024 * self.submit_runner_ram,  # MiB -> bytes
                "API": True
            },
            "use_existing": self.enable_reuse,
            "properties": {}
        }

        if self.tool.tool.get("id", "").startswith("keep:"):
            # Workflow file lives in a Keep collection ("keep:<pdh>/<name>"):
            # mount that collection read-only under /var/lib/cwl/workflow.
            sp = self.tool.tool["id"].split('/')
            workflowcollection = sp[0][5:]
            workflowname = "/".join(sp[1:])
            workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
            container_req["mounts"]["/var/lib/cwl/workflow"] = {
                "kind": "collection",
                "portable_data_hash": "%s" % workflowcollection
            }
        else:
            # Otherwise pack the workflow into a single JSON document and
            # mount it directly.
            packed = packed_workflow(self.arvrunner, self.tool, self.merged_map)
            workflowpath = "/var/lib/cwl/workflow.json#main"
            container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
                "kind": "json",
                "content": packed
            }
            if self.tool.tool.get("id", "").startswith("arvwf:"):
                # Record the registered workflow's UUID (chars 6-33 of the
                # "arvwf:" id) so the request links back to its template.
                container_req["properties"]["template_uuid"] = self.tool.tool["id"][6:33]


        # --local means execute the workflow instead of submitting a container request
        # --api=containers means use the containers API
        # --no-log-timestamps means don't add timestamps (the logging infrastructure does this)
        # --disable-validate because we already validated so don't need to do it again
        # --eval-timeout is the timeout for javascript invocation
        # --parallel-task-count is the number of threads to use for job submission
        # --enable/disable-reuse sets desired job reuse
        command = ["arvados-cwl-runner",
                   "--local",
                   "--api=containers",
                   "--no-log-timestamps",
                   "--disable-validate",
                   "--eval-timeout=%s" % self.arvrunner.eval_timeout,
                   "--thread-count=%s" % self.arvrunner.thread_count,
                   "--enable-reuse" if self.enable_reuse else "--disable-reuse"]

        if self.output_name:
            command.append("--output-name=" + self.output_name)
            container_req["output_name"] = self.output_name

        if self.output_tags:
            command.append("--output-tags=" + self.output_tags)

        if runtimeContext.debug:
            command.append("--debug")

        if runtimeContext.storage_classes != "default":
            command.append("--storage-classes=" + runtimeContext.storage_classes)

        if self.on_error:
            command.append("--on-error=" + self.on_error)

        if self.intermediate_output_ttl:
            command.append("--intermediate-output-ttl=%d" % self.intermediate_output_ttl)

        if self.arvrunner.trash_intermediate:
            command.append("--trash-intermediate")

        if self.arvrunner.project_uuid:
            command.append("--project-uuid="+self.arvrunner.project_uuid)

        # Positional args: the workflow to run and its input object.
        command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])

        container_req["command"] = command

        return container_req


    def run(self, runtimeContext):
        """Submit the runner container request built by arvados_job_spec()."""
        runtimeContext.keepprefix = "keep:"
        job_spec = self.arvados_job_spec(runtimeContext)
        if self.arvrunner.project_uuid:
            job_spec["owner_uuid"] = self.arvrunner.project_uuid

        # When a request UUID was pre-allocated, update that request in place
        # instead of creating a new one.
        if runtimeContext.submit_request_uuid:
            response = self.arvrunner.api.container_requests().update(
                uuid=runtimeContext.submit_request_uuid,
                body=job_spec
            ).execute(num_retries=self.arvrunner.num_retries)
        else:
            response = self.arvrunner.api.container_requests().create(
                body=job_spec
            ).execute(num_retries=self.arvrunner.num_retries)

        self.uuid = response["uuid"]
        self.arvrunner.process_submitted(self)

        logger.info("%s submitted container %s", self.arvrunner.label(self), response["uuid"])

    def done(self, record):
        """Fetch the finished container record and delegate to Runner.done().

        If the container record cannot be fetched, reports permanentFail
        through the runner's output callback instead of raising.
        """
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
        except Exception as e:
            logger.exception("%s while getting runner container: %s", self.arvrunner.label(self), e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            super(RunnerContainer, self).done(container)