Merge branch '14885-ciso-and-conda-packaging-pr'
[arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8
9 import logging
10 import json
11 import os
12 import urllib.request, urllib.parse, urllib.error
13 import time
14 import datetime
15 import ciso8601
16 import uuid
17 import math
18
19 import arvados_cwl.util
20 import ruamel.yaml as yaml
21
22 from cwltool.errors import WorkflowException
23 from cwltool.process import UnsupportedRequirement, shortname
24 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
25 from cwltool.utils import aslist
26 from cwltool.job import JobBase
27
28 import arvados.collection
29
30 from .arvdocker import arv_docker_get_image
31 from . import done
32 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields
33 from .fsaccess import CollectionFetcher
34 from .pathmapper import NoFollowPathMapper, trim_listing
35 from .perf import Perf
36
# Module-level loggers.  General progress and error messages go to
# ``logger``; ``metrics`` is its ".metrics" child and is the logger handed
# to the Perf timing blocks in this module.
logger = logging.getLogger('arvados.cwl-runner')
metrics = logger.getChild('metrics')
39
class ArvadosContainer(JobBase):
    """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""

    def __init__(self, runner, job_runtime,
                 builder,   # type: Builder
                 joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
                 make_path_mapper,  # type: Callable[..., PathMapper]
                 requirements,      # type: List[Dict[Text, Text]]
                 hints,     # type: List[Dict[Text, Text]]
                 name       # type: Text
    ):
        super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
        self.arvrunner = runner
        # run() uses this runtime context instead of the one it is passed.
        self.job_runtime = job_runtime
        self.running = False
        # UUID of the submitted container request; None until run() submits.
        self.uuid = None

    def update_pipeline_component(self, r):
        """No-op for container requests."""
        pass

    def run(self, runtimeContext):
        """Build the container request for this tool invocation and submit it.

        Mounts the collections holding staged inputs and the files listed
        in self.generatefiles, maps CWL and Arvados-specific requirements
        onto runtime constraints and scheduling parameters, then creates
        the container request.  It updates the existing request instead
        when the job's runtime context has submit_request_uuid set.

        Args:
            runtimeContext: ignored; self.job_runtime is used instead.

        Raises:
            WorkflowException: if secret material would appear on the
                command line or in the environment, or if the intermediate
                output TTL is negative.

        Errors raised while submitting the request are logged and reported
        through self.output_callback({}, "permanentFail") rather than raised.
        """
        # ArvadosCommandTool subclasses from cwltool.CommandLineTool,
        # which calls makeJobRunner() to get a new ArvadosContainer
        # object.  The fields that define execution such as
        # command_line, environment, etc are set on the
        # ArvadosContainer object by CommandLineTool.job() before
        # run() is called.

        runtimeContext = self.job_runtime

        container_request = {
            "command": self.command_line,
            "name": self.name,
            "output_path": self.outdir,
            "cwd": self.outdir,
            "priority": runtimeContext.priority,
            "state": "Committed",
            "properties": {},
        }
        runtime_constraints = {}

        if runtimeContext.project_uuid:
            container_request["owner_uuid"] = runtimeContext.project_uuid

        if self.arvrunner.secret_store.has_secret(self.command_line):
            raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")

        if self.arvrunner.secret_store.has_secret(self.environment):
            raise WorkflowException("Secret material leaked in environment, only file literals may contain secrets")

        resources = self.builder.resources
        if resources is not None:
            # Sizes are scaled by 2**20 (MiB -> bytes) for Arvados.
            runtime_constraints["vcpus"] = math.ceil(resources.get("cores", 1))
            runtime_constraints["ram"] = math.ceil(resources.get("ram") * 2**20)
        else:
            # No resource information: still build the tmp mounts below
            # (with zero capacity) instead of failing on None.get().
            resources = {}

        mounts = {
            self.outdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("outdirSize", 0) * 2**20)
            },
            self.tmpdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("tmpdirSize", 0) * 2**20)
            }
        }
        secret_mounts = {}
        scheduling_parameters = {}

        # Mount the collections that hold the staged input files.  Entries
        # are sorted by resolved location.  A target lying under the
        # directory mounted just before it is already covered by that mount,
        # so it is skipped.
        rf = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
        rf.sort(key=lambda k: k.resolved)
        prevdir = None
        for resolved, target, tp, stg in rf:
            if not stg:
                continue
            if prevdir and target.startswith(prevdir):
                continue
            if tp == "Directory":
                targetdir = target
            else:
                targetdir = os.path.dirname(target)
            sp = resolved.split("/", 1)
            pdh = sp[0][5:]   # remove "keep:"
            mounts[targetdir] = {
                "kind": "collection",
                "portable_data_hash": pdh
            }
            if len(sp) == 2:
                if tp == "Directory":
                    path = sp[1]
                else:
                    path = os.path.dirname(sp[1])
                if path and path != "/":
                    mounts[targetdir]["path"] = path
            prevdir = targetdir + "/"

        with Perf(metrics, "generatefiles %s" % self.name):
            if self.generatefiles["listing"]:
                # Assemble every generated file into one new collection
                # (vwd), save it, then mount parts of it under outdir.
                vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                    keep_client=self.arvrunner.keep_client,
                                                    num_retries=self.arvrunner.num_retries)
                generatemapper = NoFollowPathMapper(self.generatefiles["listing"], "", "",
                                                    separateDirs=False)

                sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target)

                logger.debug("generatemapper is %s", sorteditems)

                with Perf(metrics, "createfiles %s" % self.name):
                    for _, p in sorteditems:
                        if not p.target:
                            continue
                        if p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
                            if p.resolved.startswith("_:"):
                                vwd.mkdirs(p.target)
                            else:
                                source, path = self.arvrunner.fs_access.get_collection(p.resolved)
                                vwd.copy(path, p.target, source_collection=source)
                        elif p.type == "CreateFile":
                            if self.arvrunner.secret_store.has_secret(p.resolved):
                                # Secret contents never go into the saved
                                # collection; they become a secret text mount.
                                secret_mounts["%s/%s" % (self.outdir, p.target)] = {
                                    "kind": "text",
                                    "content": self.arvrunner.secret_store.retrieve(p.resolved)
                                }
                            else:
                                with vwd.open(p.target, "w") as n:
                                    n.write(p.resolved)

                def keepemptydirs(p):
                    # Write a ".keep" placeholder into every empty
                    # subcollection, presumably so that empty directories
                    # are preserved when the collection is saved.
                    if isinstance(p, arvados.collection.RichCollectionBase):
                        if len(p) == 0:
                            p.open(".keep", "w").close()
                        else:
                            for c in p:
                                keepemptydirs(p[c])

                keepemptydirs(vwd)

                if not runtimeContext.current_container:
                    runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
                info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
                vwd.save_new(name=info["name"],
                             owner_uuid=runtimeContext.project_uuid,
                             ensure_unique_name=True,
                             trash_at=info["trash_at"],
                             properties=info["properties"])

                # Mount each generated entry from the saved collection.
                # Skip entries without a target, entries holding secrets
                # (already secret mounts), and entries nested under the
                # previously mounted target.
                prev = None
                for _, p in sorteditems:
                    if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
                        (prev is not None and p.target.startswith(prev))):
                        continue
                    mountpoint = "%s/%s" % (self.outdir, p.target)
                    mounts[mountpoint] = {"kind": "collection",
                                          "portable_data_hash": vwd.portable_data_hash(),
                                          "path": p.target}
                    if p.type.startswith("Writable"):
                        mounts[mountpoint]["writable"] = True
                    prev = p.target + "/"

        container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
        if self.environment:
            container_request["environment"].update(self.environment)

        if self.stdin:
            # Assumes self.stdin has the form "/keep/<pdh>/<path>"; the first
            # 6 characters ("/keep/") are dropped.
            sp = self.stdin[6:].split("/", 1)
            mounts["stdin"] = {"kind": "collection",
                                "portable_data_hash": sp[0],
                                "path": sp[1]}

        if self.stderr:
            mounts["stderr"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stderr)}

        if self.stdout:
            mounts["stdout"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stdout)}

        docker_req, _ = self.get_requirement("DockerRequirement")
        if not docker_req:
            docker_req = {"dockerImageId": "arvados/jobs"}

        container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
                                                                    docker_req,
                                                                    runtimeContext.pull_image,
                                                                    runtimeContext.project_uuid)

        api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
        if api_req:
            runtime_constraints["API"] = True

        runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
        if runtime_req:
            if "keep_cache" in runtime_req:
                runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
            if "outputDirType" in runtime_req:
                if runtime_req["outputDirType"] == "local_output_dir":
                    # Currently the default behavior.
                    pass
                elif runtime_req["outputDirType"] == "keep_output_dir":
                    mounts[self.outdir]= {
                        "kind": "collection",
                        "writable": True
                    }

        partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
        if partition_req:
            scheduling_parameters["partitions"] = aslist(partition_req["partition"])

        intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
        if intermediate_output_req:
            self.output_ttl = intermediate_output_req["outputTTL"]
        else:
            self.output_ttl = self.arvrunner.intermediate_output_ttl

        if self.output_ttl < 0:
            # Report the offending value from self.output_ttl: the request's
            # "output_ttl" key has not been populated yet at this point.
            raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % self.output_ttl)

        if self.timelimit is not None:
            scheduling_parameters["max_run_time"] = self.timelimit

        extra_submit_params = {}
        if runtimeContext.submit_runner_cluster:
            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster

        container_request["output_name"] = "Output for step %s" % (self.name)
        container_request["output_ttl"] = self.output_ttl
        container_request["mounts"] = mounts
        container_request["secret_mounts"] = secret_mounts
        container_request["runtime_constraints"] = runtime_constraints
        container_request["scheduling_parameters"] = scheduling_parameters

        enable_reuse = runtimeContext.enable_reuse
        if enable_reuse:
            reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        container_request["use_existing"] = enable_reuse

        if runtimeContext.runnerjob.startswith("arvwf:"):
            # runnerjob looks like "arvwf:<workflow uuid>#<fragment>"; keep
            # only the uuid (tolerating a missing "#fragment").
            wfuuid = runtimeContext.runnerjob[6:].split("#", 1)[0]
            wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
            if container_request["name"] == "main":
                container_request["name"] = wfrecord["name"]
            container_request["properties"]["template_uuid"] = wfuuid

        self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)

        try:
            if runtimeContext.submit_request_uuid:
                # cluster_id only applies when creating a request; passing
                # it to update() doesn't make sense and actually fails.
                response = self.arvrunner.api.container_requests().update(
                    uuid=runtimeContext.submit_request_uuid,
                    body=container_request
                ).execute(num_retries=self.arvrunner.num_retries)
            else:
                response = self.arvrunner.api.container_requests().create(
                    body=container_request,
                    **extra_submit_params
                ).execute(num_retries=self.arvrunner.num_retries)

            self.uuid = response["uuid"]
            self.arvrunner.process_submitted(self)

            if response["state"] == "Final":
                logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
            else:
                logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
        except Exception:
            logger.exception("%s got an error", self.arvrunner.label(self))
            self.output_callback({}, "permanentFail")

    def done(self, record):
        """Collect the results of a finished container request.

        Fetches the container named by record["container_uuid"] and derives
        the CWL process status from its state and exit code.  A
        tool-declared successCodes, temporaryFailCodes or permanentFailCodes
        list takes precedence over the default (exit code 0 means success).
        On permanent failure the tail of the container log is logged.

        Args:
            record: container request record.  The fields read are
                "container_uuid", "uuid", "output_uuid" and "modified_at".

        self.output_callback is called from a finally clause with the
        collected outputs (empty on failure) and the process status.
        """
        outputs = {}
        # Defaults so that the error handlers and the finally clause never
        # see unbound names, whatever fails inside the try.
        processStatus = "permanentFail"
        container = None
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            if container["state"] == "Complete":
                rcode = container["exit_code"]
                if self.successCodes and rcode in self.successCodes:
                    processStatus = "success"
                elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
                    processStatus = "temporaryFail"
                elif self.permanentFailCodes and rcode in self.permanentFailCodes:
                    processStatus = "permanentFail"
                elif rcode == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"
            else:
                processStatus = "permanentFail"

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(container["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                label = self.arvrunner.label(self)
                done.logtail(
                    logc, logger.error,
                    "%s (%s) error log:" % (label, record["uuid"]), maxlen=40)

            if record["output_uuid"]:
                if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
                    # Compute the trash time to avoid requesting the collection record.
                    trash_at = ciso8601.parse_datetime(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
                    aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
                    orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
                    oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
                    logger.info("%s Intermediate output %s (%s) will be trashed%s%s%s." % (
                        self.arvrunner.label(self), record["output_uuid"], container["output"], aftertime, orpart, oncomplete))
                self.arvrunner.add_intermediate_output(record["output_uuid"])

            if container["output"]:
                outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
        except WorkflowException as e:
            # Only include a stack trace if in debug mode.
            # A stack trace may obfuscate more useful output about the workflow.
            logger.error("%s unable to collect output from %s:\n%s",
                         self.arvrunner.label(self),
                         container["output"] if container else None,
                         e, exc_info=(e if self.arvrunner.debug else False))
            processStatus = "permanentFail"
        except Exception:
            logger.exception("%s while getting output object:", self.arvrunner.label(self))
            processStatus = "permanentFail"
        finally:
            self.output_callback(outputs, processStatus)
365             self.output_callback(outputs, processStatus)
366
367
class RunnerContainer(Runner):
    """Submit and manage a container that runs arvados-cwl-runner."""

    def arvados_job_spec(self, runtimeContext):
        """Create an Arvados container request for this workflow.

        The returned dict can be used to create a container passed as
        the +body+ argument to container_requests().create().

        Note: this rewrites self.job_order in place.  File/Directory objects
        are trimmed.  Every parameter whose value holds secret material is
        replaced by {"$include": "/secrets/s<N>"}, and the secret value is
        moved into the request's secret_mounts under that path.
        """

        adjustDirObjs(self.job_order, trim_listing)
        visit_class(self.job_order, ("File", "Directory"), trim_anonymous_location)
        visit_class(self.job_order, ("File", "Directory"), remove_redundant_fields)

        secret_mounts = {}
        for param in sorted(self.job_order.keys()):
            if self.secret_store.has_secret(self.job_order[param]):
                mnt = "/secrets/s%d" % len(secret_mounts)
                secret_mounts[mnt] = {
                    "kind": "text",
                    "content": self.secret_store.retrieve(self.job_order[param])
                }
                self.job_order[param] = {"$include": mnt}

        container_req = {
            "name": self.name,
            "output_path": "/var/spool/cwl",
            "cwd": "/var/spool/cwl",
            "priority": self.priority,
            "state": "Committed",
            "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image),
            "mounts": {
                "/var/lib/cwl/cwl.input.json": {
                    "kind": "json",
                    "content": self.job_order
                },
                "stdout": {
                    "kind": "file",
                    "path": "/var/spool/cwl/cwl.output.json"
                },
                "/var/spool/cwl": {
                    "kind": "collection",
                    "writable": True
                }
            },
            "secret_mounts": secret_mounts,
            "runtime_constraints": {
                "vcpus": math.ceil(self.submit_runner_cores),
                # Runner RAM plus the collection cache, both scaled by
                # 1024*1024 (MiB -> bytes).
                "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
                "API": True
            },
            "use_existing": self.enable_reuse,
            "properties": {}
        }

        if self.embedded_tool.tool.get("id", "").startswith("keep:"):
            # The workflow already lives in a collection: mount that
            # collection and run the workflow file from inside it.
            sp = self.embedded_tool.tool["id"].split('/')
            workflowcollection = sp[0][5:]
            workflowname = "/".join(sp[1:])
            workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
            container_req["mounts"]["/var/lib/cwl/workflow"] = {
                "kind": "collection",
                "portable_data_hash": "%s" % workflowcollection
            }
        else:
            # Otherwise embed the packed workflow as a JSON mount.
            packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map)
            workflowpath = "/var/lib/cwl/workflow.json#main"
            container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
                "kind": "json",
                "content": packed
            }
            if self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
                # Characters 6..33 presumably hold the 27-character
                # workflow uuid that follows "arvwf:".
                container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]


        # --local means execute the workflow instead of submitting a container request
        # --api=containers means use the containers API
        # --no-log-timestamps means don't add timestamps (the logging infrastructure does this)
        # --disable-validate because we already validated so don't need to do it again
        # --eval-timeout is the timeout for javascript invocation
        # --thread-count is the number of threads to use for job submission
        # --enable/disable-reuse sets desired job reuse
        # --collection-cache-size sets aside memory to store collections
        command = ["arvados-cwl-runner",
                   "--local",
                   "--api=containers",
                   "--no-log-timestamps",
                   "--disable-validate",
                   "--eval-timeout=%s" % self.arvrunner.eval_timeout,
                   "--thread-count=%s" % self.arvrunner.thread_count,
                   "--enable-reuse" if self.enable_reuse else "--disable-reuse",
                   "--collection-cache-size=%s" % self.collection_cache_size]

        if self.output_name:
            command.append("--output-name=" + self.output_name)
            container_req["output_name"] = self.output_name

        if self.output_tags:
            command.append("--output-tags=" + self.output_tags)

        if runtimeContext.debug:
            command.append("--debug")

        if runtimeContext.storage_classes != "default":
            command.append("--storage-classes=" + runtimeContext.storage_classes)

        if self.on_error:
            command.append("--on-error=" + self.on_error)

        if self.intermediate_output_ttl:
            command.append("--intermediate-output-ttl=%d" % self.intermediate_output_ttl)

        if self.arvrunner.trash_intermediate:
            command.append("--trash-intermediate")

        if self.arvrunner.project_uuid:
            command.append("--project-uuid="+self.arvrunner.project_uuid)

        command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])

        container_req["command"] = command

        return container_req


    def run(self, runtimeContext):
        """Submit the container request that runs arvados-cwl-runner.

        Creates a new request, or updates runtimeContext.submit_request_uuid
        when that is set.  Records the resulting request UUID in self.uuid.
        """
        runtimeContext.keepprefix = "keep:"
        job_spec = self.arvados_job_spec(runtimeContext)
        if self.arvrunner.project_uuid:
            job_spec["owner_uuid"] = self.arvrunner.project_uuid

        extra_submit_params = {}
        if runtimeContext.submit_runner_cluster:
            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster

        if runtimeContext.submit_request_uuid:
            # cluster_id doesn't make sense for "update" and actually fails
            extra_submit_params.pop("cluster_id", None)
            response = self.arvrunner.api.container_requests().update(
                uuid=runtimeContext.submit_request_uuid,
                body=job_spec,
                **extra_submit_params
            ).execute(num_retries=self.arvrunner.num_retries)
        else:
            response = self.arvrunner.api.container_requests().create(
                body=job_spec,
                **extra_submit_params
            ).execute(num_retries=self.arvrunner.num_retries)

        self.uuid = response["uuid"]
        self.arvrunner.process_submitted(self)

        logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])

    def done(self, record):
        """Fetch the runner's container record and pass it to Runner.done().

        If the container cannot be fetched, the error is logged and
        "permanentFail" is reported through self.arvrunner.output_callback.
        """
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
        except Exception:
            logger.exception("%s while getting runner container", self.arvrunner.label(self))
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            super(RunnerContainer, self).done(container)
531         else:
532             super(RunnerContainer, self).done(container)