af7c02a8f30010bfe85e51a6928e63a5a617d37e
[arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8
9 import logging
10 import json
11 import os
12 import urllib.request, urllib.parse, urllib.error
13 import time
14 import datetime
15 import ciso8601
16 import uuid
17 import math
18
19 import arvados_cwl.util
20 import ruamel.yaml as yaml
21
22 from cwltool.errors import WorkflowException
23 from cwltool.process import UnsupportedRequirement, shortname
24 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
25 from cwltool.utils import aslist
26 from cwltool.job import JobBase
27
28 import arvados.collection
29
30 from .arvdocker import arv_docker_get_image
31 from . import done
32 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields
33 from .fsaccess import CollectionFetcher
34 from .pathmapper import NoFollowPathMapper, trim_listing
35 from .perf import Perf
36
37 logger = logging.getLogger('arvados.cwl-runner')
38 metrics = logging.getLogger('arvados.cwl-runner.metrics')
39
40 class ArvadosContainer(JobBase):
41     """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""
42
43     def __init__(self, runner, job_runtime,
44                  builder,   # type: Builder
45                  joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
46                  make_path_mapper,  # type: Callable[..., PathMapper]
47                  requirements,      # type: List[Dict[Text, Text]]
48                  hints,     # type: List[Dict[Text, Text]]
49                  name       # type: Text
50     ):
51         super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
52         self.arvrunner = runner
53         self.job_runtime = job_runtime
54         self.running = False
55         self.uuid = None
56
57     def update_pipeline_component(self, r):
58         pass
59
60     def run(self, runtimeContext):
61         # ArvadosCommandTool subclasses from cwltool.CommandLineTool,
62         # which calls makeJobRunner() to get a new ArvadosContainer
63         # object.  The fields that define execution such as
64         # command_line, environment, etc are set on the
65         # ArvadosContainer object by CommandLineTool.job() before
66         # run() is called.
67
68         runtimeContext = self.job_runtime
69
70         container_request = {
71             "command": self.command_line,
72             "name": self.name,
73             "output_path": self.outdir,
74             "cwd": self.outdir,
75             "priority": runtimeContext.priority,
76             "state": "Committed",
77             "properties": {},
78         }
79         runtime_constraints = {}
80
81         if runtimeContext.project_uuid:
82             container_request["owner_uuid"] = runtimeContext.project_uuid
83
84         if self.arvrunner.secret_store.has_secret(self.command_line):
85             raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")
86
87         if self.arvrunner.secret_store.has_secret(self.environment):
88             raise WorkflowException("Secret material leaked in environment, only file literals may contain secrets")
89
90         resources = self.builder.resources
91         if resources is not None:
92             runtime_constraints["vcpus"] = math.ceil(resources.get("cores", 1))
93             runtime_constraints["ram"] = math.ceil(resources.get("ram") * 2**20)
94
95         mounts = {
96             self.outdir: {
97                 "kind": "tmp",
98                 "capacity": math.ceil(resources.get("outdirSize", 0) * 2**20)
99             },
100             self.tmpdir: {
101                 "kind": "tmp",
102                 "capacity": math.ceil(resources.get("tmpdirSize", 0) * 2**20)
103             }
104         }
105         secret_mounts = {}
106         scheduling_parameters = {}
107
108         rf = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
109         rf.sort(key=lambda k: k.resolved)
110         prevdir = None
111         for resolved, target, tp, stg in rf:
112             if not stg:
113                 continue
114             if prevdir and target.startswith(prevdir):
115                 continue
116             if tp == "Directory":
117                 targetdir = target
118             else:
119                 targetdir = os.path.dirname(target)
120             sp = resolved.split("/", 1)
121             pdh = sp[0][5:]   # remove "keep:"
122             mounts[targetdir] = {
123                 "kind": "collection",
124                 "portable_data_hash": pdh
125             }
126             if len(sp) == 2:
127                 if tp == "Directory":
128                     path = sp[1]
129                 else:
130                     path = os.path.dirname(sp[1])
131                 if path and path != "/":
132                     mounts[targetdir]["path"] = path
133             prevdir = targetdir + "/"
134
135         with Perf(metrics, "generatefiles %s" % self.name):
136             if self.generatefiles["listing"]:
137                 vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
138                                                     keep_client=self.arvrunner.keep_client,
139                                                     num_retries=self.arvrunner.num_retries)
140                 generatemapper = NoFollowPathMapper(self.generatefiles["listing"], "", "",
141                                                     separateDirs=False)
142
143                 sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target)
144
145                 logger.debug("generatemapper is %s", sorteditems)
146
147                 with Perf(metrics, "createfiles %s" % self.name):
148                     for f, p in sorteditems:
149                         if not p.target:
150                             pass
151                         elif p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
152                             if p.resolved.startswith("_:"):
153                                 vwd.mkdirs(p.target)
154                             else:
155                                 source, path = self.arvrunner.fs_access.get_collection(p.resolved)
156                                 vwd.copy(path, p.target, source_collection=source)
157                         elif p.type == "CreateFile":
158                             if self.arvrunner.secret_store.has_secret(p.resolved):
159                                 secret_mounts["%s/%s" % (self.outdir, p.target)] = {
160                                     "kind": "text",
161                                     "content": self.arvrunner.secret_store.retrieve(p.resolved)
162                                 }
163                             else:
164                                 with vwd.open(p.target, "w") as n:
165                                     n.write(p.resolved)
166
167                 def keepemptydirs(p):
168                     if isinstance(p, arvados.collection.RichCollectionBase):
169                         if len(p) == 0:
170                             p.open(".keep", "w").close()
171                         else:
172                             for c in p:
173                                 keepemptydirs(p[c])
174
175                 keepemptydirs(vwd)
176
177                 if not runtimeContext.current_container:
178                     runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
179                 info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
180                 vwd.save_new(name=info["name"],
181                              owner_uuid=runtimeContext.project_uuid,
182                              ensure_unique_name=True,
183                              trash_at=info["trash_at"],
184                              properties=info["properties"])
185
186                 prev = None
187                 for f, p in sorteditems:
188                     if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
189                         (prev is not None and p.target.startswith(prev))):
190                         continue
191                     mountpoint = "%s/%s" % (self.outdir, p.target)
192                     mounts[mountpoint] = {"kind": "collection",
193                                           "portable_data_hash": vwd.portable_data_hash(),
194                                           "path": p.target}
195                     if p.type.startswith("Writable"):
196                         mounts[mountpoint]["writable"] = True
197                     prev = p.target + "/"
198
199         container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
200         if self.environment:
201             container_request["environment"].update(self.environment)
202
203         if self.stdin:
204             sp = self.stdin[6:].split("/", 1)
205             mounts["stdin"] = {"kind": "collection",
206                                 "portable_data_hash": sp[0],
207                                 "path": sp[1]}
208
209         if self.stderr:
210             mounts["stderr"] = {"kind": "file",
211                                 "path": "%s/%s" % (self.outdir, self.stderr)}
212
213         if self.stdout:
214             mounts["stdout"] = {"kind": "file",
215                                 "path": "%s/%s" % (self.outdir, self.stdout)}
216
217         (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
218         if not docker_req:
219             docker_req = {"dockerImageId": "arvados/jobs"}
220
221         container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
222                                                                     docker_req,
223                                                                     runtimeContext.pull_image,
224                                                                     runtimeContext.project_uuid)
225
226         api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
227         if api_req:
228             runtime_constraints["API"] = True
229
230         runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
231         if runtime_req:
232             if "keep_cache" in runtime_req:
233                 runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
234             if "outputDirType" in runtime_req:
235                 if runtime_req["outputDirType"] == "local_output_dir":
236                     # Currently the default behavior.
237                     pass
238                 elif runtime_req["outputDirType"] == "keep_output_dir":
239                     mounts[self.outdir]= {
240                         "kind": "collection",
241                         "writable": True
242                     }
243
244         partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
245         if partition_req:
246             scheduling_parameters["partitions"] = aslist(partition_req["partition"])
247
248         intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
249         if intermediate_output_req:
250             self.output_ttl = intermediate_output_req["outputTTL"]
251         else:
252             self.output_ttl = self.arvrunner.intermediate_output_ttl
253
254         if self.output_ttl < 0:
255             raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % container_request["output_ttl"])
256
257         if self.timelimit is not None:
258             scheduling_parameters["max_run_time"] = self.timelimit
259
260         extra_submit_params = {}
261         if runtimeContext.submit_runner_cluster:
262             extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
263
264         container_request["output_name"] = "Output for step %s" % (self.name)
265         container_request["output_ttl"] = self.output_ttl
266         container_request["mounts"] = mounts
267         container_request["secret_mounts"] = secret_mounts
268         container_request["runtime_constraints"] = runtime_constraints
269         container_request["scheduling_parameters"] = scheduling_parameters
270
271         enable_reuse = runtimeContext.enable_reuse
272         if enable_reuse:
273             reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
274             if reuse_req:
275                 enable_reuse = reuse_req["enableReuse"]
276         container_request["use_existing"] = enable_reuse
277
278         if runtimeContext.runnerjob.startswith("arvwf:"):
279             wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
280             wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
281             if container_request["name"] == "main":
282                 container_request["name"] = wfrecord["name"]
283             container_request["properties"]["template_uuid"] = wfuuid
284
285         self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
286
287         try:
288             if runtimeContext.submit_request_uuid:
289                 response = self.arvrunner.api.container_requests().update(
290                     uuid=runtimeContext.submit_request_uuid,
291                     body=container_request,
292                     **extra_submit_params
293                 ).execute(num_retries=self.arvrunner.num_retries)
294             else:
295                 response = self.arvrunner.api.container_requests().create(
296                     body=container_request,
297                     **extra_submit_params
298                 ).execute(num_retries=self.arvrunner.num_retries)
299
300             self.uuid = response["uuid"]
301             self.arvrunner.process_submitted(self)
302
303             if response["state"] == "Final":
304                 logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
305             else:
306                 logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
307         except Exception as e:
308             logger.error("%s got error %s" % (self.arvrunner.label(self), str(e)))
309             self.output_callback({}, "permanentFail")
310
311     def done(self, record):
312         outputs = {}
313         try:
314             container = self.arvrunner.api.containers().get(
315                 uuid=record["container_uuid"]
316             ).execute(num_retries=self.arvrunner.num_retries)
317             if container["state"] == "Complete":
318                 rcode = container["exit_code"]
319                 if self.successCodes and rcode in self.successCodes:
320                     processStatus = "success"
321                 elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
322                     processStatus = "temporaryFail"
323                 elif self.permanentFailCodes and rcode in self.permanentFailCodes:
324                     processStatus = "permanentFail"
325                 elif rcode == 0:
326                     processStatus = "success"
327                 else:
328                     processStatus = "permanentFail"
329             else:
330                 processStatus = "permanentFail"
331
332             if processStatus == "permanentFail":
333                 logc = arvados.collection.CollectionReader(container["log"],
334                                                            api_client=self.arvrunner.api,
335                                                            keep_client=self.arvrunner.keep_client,
336                                                            num_retries=self.arvrunner.num_retries)
337                 label = self.arvrunner.label(self)
338                 done.logtail(
339                     logc, logger.error,
340                     "%s (%s) error log:" % (label, record["uuid"]), maxlen=40)
341
342             if record["output_uuid"]:
343                 if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
344                     # Compute the trash time to avoid requesting the collection record.
345                     trash_at = ciso8601.parse_datetime_unaware(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
346                     aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
347                     orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
348                     oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
349                     logger.info("%s Intermediate output %s (%s) will be trashed%s%s%s." % (
350                         self.arvrunner.label(self), record["output_uuid"], container["output"], aftertime, orpart, oncomplete))
351                 self.arvrunner.add_intermediate_output(record["output_uuid"])
352
353             if container["output"]:
354                 outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
355         except WorkflowException as e:
356             logger.error("%s unable to collect output from %s:\n%s",
357                          self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
358             processStatus = "permanentFail"
359         except Exception as e:
360             logger.exception("%s while getting output object: %s", self.arvrunner.label(self), e)
361             processStatus = "permanentFail"
362         finally:
363             self.output_callback(outputs, processStatus)
364
365
366 class RunnerContainer(Runner):
367     """Submit and manage a container that runs arvados-cwl-runner."""
368
369     def arvados_job_spec(self, runtimeContext):
370         """Create an Arvados container request for this workflow.
371
372         The returned dict can be used to create a container passed as
373         the +body+ argument to container_requests().create().
374         """
375
376         adjustDirObjs(self.job_order, trim_listing)
377         visit_class(self.job_order, ("File", "Directory"), trim_anonymous_location)
378         visit_class(self.job_order, ("File", "Directory"), remove_redundant_fields)
379
380         secret_mounts = {}
381         for param in sorted(self.job_order.keys()):
382             if self.secret_store.has_secret(self.job_order[param]):
383                 mnt = "/secrets/s%d" % len(secret_mounts)
384                 secret_mounts[mnt] = {
385                     "kind": "text",
386                     "content": self.secret_store.retrieve(self.job_order[param])
387                 }
388                 self.job_order[param] = {"$include": mnt}
389
390         container_req = {
391             "name": self.name,
392             "output_path": "/var/spool/cwl",
393             "cwd": "/var/spool/cwl",
394             "priority": self.priority,
395             "state": "Committed",
396             "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image),
397             "mounts": {
398                 "/var/lib/cwl/cwl.input.json": {
399                     "kind": "json",
400                     "content": self.job_order
401                 },
402                 "stdout": {
403                     "kind": "file",
404                     "path": "/var/spool/cwl/cwl.output.json"
405                 },
406                 "/var/spool/cwl": {
407                     "kind": "collection",
408                     "writable": True
409                 }
410             },
411             "secret_mounts": secret_mounts,
412             "runtime_constraints": {
413                 "vcpus": math.ceil(self.submit_runner_cores),
414                 "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
415                 "API": True
416             },
417             "use_existing": self.enable_reuse,
418             "properties": {}
419         }
420
421         if self.embedded_tool.tool.get("id", "").startswith("keep:"):
422             sp = self.embedded_tool.tool["id"].split('/')
423             workflowcollection = sp[0][5:]
424             workflowname = "/".join(sp[1:])
425             workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
426             container_req["mounts"]["/var/lib/cwl/workflow"] = {
427                 "kind": "collection",
428                 "portable_data_hash": "%s" % workflowcollection
429             }
430         else:
431             packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map)
432             workflowpath = "/var/lib/cwl/workflow.json#main"
433             container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
434                 "kind": "json",
435                 "content": packed
436             }
437             if self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
438                 container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
439
440
441         # --local means execute the workflow instead of submitting a container request
442         # --api=containers means use the containers API
443         # --no-log-timestamps means don't add timestamps (the logging infrastructure does this)
444         # --disable-validate because we already validated so don't need to do it again
445         # --eval-timeout is the timeout for javascript invocation
446         # --parallel-task-count is the number of threads to use for job submission
447         # --enable/disable-reuse sets desired job reuse
448         # --collection-cache-size sets aside memory to store collections
449         command = ["arvados-cwl-runner",
450                    "--local",
451                    "--api=containers",
452                    "--no-log-timestamps",
453                    "--disable-validate",
454                    "--eval-timeout=%s" % self.arvrunner.eval_timeout,
455                    "--thread-count=%s" % self.arvrunner.thread_count,
456                    "--enable-reuse" if self.enable_reuse else "--disable-reuse",
457                    "--collection-cache-size=%s" % self.collection_cache_size]
458
459         if self.output_name:
460             command.append("--output-name=" + self.output_name)
461             container_req["output_name"] = self.output_name
462
463         if self.output_tags:
464             command.append("--output-tags=" + self.output_tags)
465
466         if runtimeContext.debug:
467             command.append("--debug")
468
469         if runtimeContext.storage_classes != "default":
470             command.append("--storage-classes=" + runtimeContext.storage_classes)
471
472         if self.on_error:
473             command.append("--on-error=" + self.on_error)
474
475         if self.intermediate_output_ttl:
476             command.append("--intermediate-output-ttl=%d" % self.intermediate_output_ttl)
477
478         if self.arvrunner.trash_intermediate:
479             command.append("--trash-intermediate")
480
481         if self.arvrunner.project_uuid:
482             command.append("--project-uuid="+self.arvrunner.project_uuid)
483
484         command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
485
486         container_req["command"] = command
487
488         return container_req
489
490
491     def run(self, runtimeContext):
492         runtimeContext.keepprefix = "keep:"
493         job_spec = self.arvados_job_spec(runtimeContext)
494         if self.arvrunner.project_uuid:
495             job_spec["owner_uuid"] = self.arvrunner.project_uuid
496
497         extra_submit_params = {}
498         if runtimeContext.submit_runner_cluster:
499             extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
500
501         if runtimeContext.submit_request_uuid:
502             if "cluster_id" in extra_submit_params:
503                 # Doesn't make sense for "update" and actually fails
504                 del extra_submit_params["cluster_id"]
505             response = self.arvrunner.api.container_requests().update(
506                 uuid=runtimeContext.submit_request_uuid,
507                 body=job_spec,
508                 **extra_submit_params
509             ).execute(num_retries=self.arvrunner.num_retries)
510         else:
511             response = self.arvrunner.api.container_requests().create(
512                 body=job_spec,
513                 **extra_submit_params
514             ).execute(num_retries=self.arvrunner.num_retries)
515
516         self.uuid = response["uuid"]
517         self.arvrunner.process_submitted(self)
518
519         logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])
520
521     def done(self, record):
522         try:
523             container = self.arvrunner.api.containers().get(
524                 uuid=record["container_uuid"]
525             ).execute(num_retries=self.arvrunner.num_retries)
526         except Exception as e:
527             logger.exception("%s while getting runner container: %s", self.arvrunner.label(self), e)
528             self.arvrunner.output_callback({}, "permanentFail")
529         else:
530             super(RunnerContainer, self).done(container)