14198: Support expressions in TargetCluster[clusterID, ownerUUID]
[arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import logging
6 import json
7 import os
8 import urllib
9 import time
10 import datetime
11 import ciso8601
12 import uuid
13 import math
14
15 from arvados_cwl.util import get_current_container, get_intermediate_collection_info
16 import ruamel.yaml as yaml
17
18 from cwltool.errors import WorkflowException
19 from cwltool.process import UnsupportedRequirement, shortname
20 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
21 from cwltool.utils import aslist
22 from cwltool.job import JobBase
23
24 import arvados.collection
25
26 from .arvdocker import arv_docker_get_image
27 from . import done
28 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields
29 from .fsaccess import CollectionFetcher
30 from .pathmapper import NoFollowPathMapper, trim_listing
31 from .perf import Perf
32
33 logger = logging.getLogger('arvados.cwl-runner')
34 metrics = logging.getLogger('arvados.cwl-runner.metrics')
35
class ArvadosContainer(JobBase):
    """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""

    def __init__(self, runner, cluster_target,
                 builder,   # type: Builder
                 joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
                 make_path_mapper,  # type: Callable[..., PathMapper]
                 requirements,      # type: List[Dict[Text, Text]]
                 hints,     # type: List[Dict[Text, Text]]
                 name       # type: Text
    ):
        super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
        self.arvrunner = runner
        self.cluster_target = cluster_target
        self.running = False
        self.uuid = None

    def update_pipeline_component(self, r):
        # Pipeline components are a jobs-API concept; the containers API
        # has nothing to update here.
        pass

    def run(self, runtimeContext):
        """Build a container request from this job's fields and submit it.

        On any submission error, reports "permanentFail" through the
        output callback instead of raising.
        """
        # ArvadosCommandTool subclasses from cwltool.CommandLineTool,
        # which calls makeJobRunner() to get a new ArvadosContainer
        # object.  The fields that define execution such as
        # command_line, environment, etc are set on the
        # ArvadosContainer object by CommandLineTool.job() before
        # run() is called.

        container_request = {
            "command": self.command_line,
            "name": self.name,
            "output_path": self.outdir,
            "cwd": self.outdir,
            "priority": runtimeContext.priority,
            "state": "Committed",
            "properties": {},
        }
        runtime_constraints = {}

        if self.arvrunner.project_uuid:
            container_request["owner_uuid"] = self.arvrunner.project_uuid

        # Secrets may only appear in file literals (mounted as secret_mounts);
        # refuse to leak them through the command line or environment.
        if self.arvrunner.secret_store.has_secret(self.command_line):
            raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")

        if self.arvrunner.secret_store.has_secret(self.environment):
            raise WorkflowException("Secret material leaked in environment, only file literals may contain secrets")

        resources = self.builder.resources
        if resources is not None:
            runtime_constraints["vcpus"] = math.ceil(resources.get("cores", 1))
            runtime_constraints["ram"] = math.ceil(resources.get("ram") * 2**20)
        else:
            # Fall back to an empty dict so the capacity lookups below are
            # safe; previously this crashed with AttributeError when the
            # builder computed no resources.
            resources = {}

        mounts = {
            self.outdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("outdirSize", 0) * 2**20)
            },
            self.tmpdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("tmpdirSize", 0) * 2**20)
            }
        }
        secret_mounts = {}
        scheduling_parameters = {}

        # Mount each referenced Keep collection at the directory level,
        # skipping targets already covered by a previously-mounted parent.
        rf = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
        rf.sort(key=lambda k: k.resolved)
        prevdir = None
        for resolved, target, tp, stg in rf:
            if not stg:
                continue
            if prevdir and target.startswith(prevdir):
                continue
            if tp == "Directory":
                targetdir = target
            else:
                targetdir = os.path.dirname(target)
            sp = resolved.split("/", 1)
            pdh = sp[0][5:]   # remove "keep:"
            mounts[targetdir] = {
                "kind": "collection",
                "portable_data_hash": pdh
            }
            if len(sp) == 2:
                if tp == "Directory":
                    path = sp[1]
                else:
                    path = os.path.dirname(sp[1])
                if path and path != "/":
                    mounts[targetdir]["path"] = path
            prevdir = targetdir + "/"

        with Perf(metrics, "generatefiles %s" % self.name):
            if self.generatefiles["listing"]:
                # Stage InitialWorkDirRequirement content into a new
                # intermediate collection ("vwd" = virtual working directory).
                vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                    keep_client=self.arvrunner.keep_client,
                                                    num_retries=self.arvrunner.num_retries)
                generatemapper = NoFollowPathMapper([self.generatefiles], "", "",
                                                    separateDirs=False)

                # NOTE: the Python-2-only positional cmp argument (None) was
                # removed; sorted() with only key= behaves identically and
                # also works on Python 3.
                sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target)

                logger.debug("generatemapper is %s", sorteditems)

                with Perf(metrics, "createfiles %s" % self.name):
                    for f, p in sorteditems:
                        if not p.target:
                            pass
                        elif p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
                            if p.resolved.startswith("_:"):
                                # Anonymous directory literal.
                                vwd.mkdirs(p.target)
                            else:
                                source, path = self.arvrunner.fs_access.get_collection(p.resolved)
                                vwd.copy(path, p.target, source_collection=source)
                        elif p.type == "CreateFile":
                            if self.arvrunner.secret_store.has_secret(p.resolved):
                                # Secret file literals become secret mounts,
                                # never stored in the intermediate collection.
                                secret_mounts["%s/%s" % (self.outdir, p.target)] = {
                                    "kind": "text",
                                    "content": self.arvrunner.secret_store.retrieve(p.resolved)
                                }
                            else:
                                with vwd.open(p.target, "w") as n:
                                    n.write(p.resolved.encode("utf-8"))

                def keepemptydirs(p):
                    # Keep manifests cannot represent empty directories, so
                    # drop a ".keep" placeholder file into each one.
                    if isinstance(p, arvados.collection.RichCollectionBase):
                        if len(p) == 0:
                            p.open(".keep", "w").close()
                        else:
                            for c in p:
                                keepemptydirs(p[c])

                keepemptydirs(vwd)

                if not runtimeContext.current_container:
                    runtimeContext.current_container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
                info = get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
                vwd.save_new(name=info["name"],
                             owner_uuid=self.arvrunner.project_uuid,
                             ensure_unique_name=True,
                             trash_at=info["trash_at"],
                             properties=info["properties"])

                prev = None
                for f, p in sorteditems:
                    if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
                        (prev is not None and p.target.startswith(prev))):
                        continue
                    mountpoint = "%s/%s" % (self.outdir, p.target)
                    mounts[mountpoint] = {"kind": "collection",
                                          "portable_data_hash": vwd.portable_data_hash(),
                                          "path": p.target}
                    if p.type.startswith("Writable"):
                        mounts[mountpoint]["writable"] = True
                    prev = p.target + "/"

        container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
        if self.environment:
            container_request["environment"].update(self.environment)

        if self.stdin:
            # self.stdin is a "/keep/<pdh>/<path>" string; strip "/keep/".
            sp = self.stdin[6:].split("/", 1)
            mounts["stdin"] = {"kind": "collection",
                                "portable_data_hash": sp[0],
                                "path": sp[1]}

        if self.stderr:
            mounts["stderr"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stderr)}

        if self.stdout:
            mounts["stdout"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stdout)}

        (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
        if not docker_req:
            docker_req = {"dockerImageId": "arvados/jobs"}

        container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
                                                                    docker_req,
                                                                    runtimeContext.pull_image,
                                                                    self.arvrunner.project_uuid)

        api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
        if api_req:
            runtime_constraints["API"] = True

        runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
        if runtime_req:
            if "keep_cache" in runtime_req:
                runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
            if "outputDirType" in runtime_req:
                if runtime_req["outputDirType"] == "local_output_dir":
                    # Currently the default behavior.
                    pass
                elif runtime_req["outputDirType"] == "keep_output_dir":
                    mounts[self.outdir]= {
                        "kind": "collection",
                        "writable": True
                    }

        partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
        if partition_req:
            scheduling_parameters["partitions"] = aslist(partition_req["partition"])

        intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
        if intermediate_output_req:
            self.output_ttl = intermediate_output_req["outputTTL"]
        else:
            self.output_ttl = self.arvrunner.intermediate_output_ttl

        if self.output_ttl < 0:
            # Fixed: this previously indexed container_request["output_ttl"],
            # which is not set until later, so raising produced a KeyError
            # instead of the intended WorkflowException.
            raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % self.output_ttl)

        if self.timelimit is not None:
            scheduling_parameters["max_run_time"] = self.timelimit

        extra_submit_params = {}
        if self.cluster_target is not None:
            if self.cluster_target.cluster_id:
                extra_submit_params["cluster_id"] = self.cluster_target.cluster_id
            if self.cluster_target.owner_uuid:
                container_request["owner_uuid"] = self.cluster_target.owner_uuid

        container_request["output_name"] = "Output for step %s" % (self.name)
        container_request["output_ttl"] = self.output_ttl
        container_request["mounts"] = mounts
        container_request["secret_mounts"] = secret_mounts
        container_request["runtime_constraints"] = runtime_constraints
        container_request["scheduling_parameters"] = scheduling_parameters

        enable_reuse = runtimeContext.enable_reuse
        if enable_reuse:
            # A per-step ReuseRequirement can still disable reuse.
            reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        container_request["use_existing"] = enable_reuse

        if runtimeContext.runnerjob.startswith("arvwf:"):
            wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
            wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
            if container_request["name"] == "main":
                container_request["name"] = wfrecord["name"]
            container_request["properties"]["template_uuid"] = wfuuid

        self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)

        try:
            if runtimeContext.submit_request_uuid:
                response = self.arvrunner.api.container_requests().update(
                    uuid=runtimeContext.submit_request_uuid,
                    body=container_request,
                    **extra_submit_params
                ).execute(num_retries=self.arvrunner.num_retries)
            else:
                response = self.arvrunner.api.container_requests().create(
                    body=container_request,
                    **extra_submit_params
                ).execute(num_retries=self.arvrunner.num_retries)

            self.uuid = response["uuid"]
            self.arvrunner.process_submitted(self)

            if response["state"] == "Final":
                logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
            else:
                logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
        except Exception as e:
            logger.error("%s got error %s" % (self.arvrunner.label(self), str(e)))
            self.output_callback({}, "permanentFail")

    def done(self, record):
        """Map the finished container's exit status to a CWL process status
        and deliver collected outputs through the output callback.
        """
        outputs = {}
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            if container["state"] == "Complete":
                rcode = container["exit_code"]
                if self.successCodes and rcode in self.successCodes:
                    processStatus = "success"
                elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
                    processStatus = "temporaryFail"
                elif self.permanentFailCodes and rcode in self.permanentFailCodes:
                    processStatus = "permanentFail"
                elif rcode == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"
            else:
                processStatus = "permanentFail"

            if processStatus == "permanentFail":
                # Surface the tail of the container log so failures are
                # diagnosable without opening the log collection by hand.
                logc = arvados.collection.CollectionReader(container["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                label = self.arvrunner.label(self)
                done.logtail(
                    logc, logger.error,
                    "%s (%s) error log:" % (label, record["uuid"]), maxlen=40)

            if record["output_uuid"]:
                if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
                    # Compute the trash time to avoid requesting the collection record.
                    trash_at = ciso8601.parse_datetime_unaware(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
                    aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
                    orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
                    oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
                    logger.info("%s Intermediate output %s (%s) will be trashed%s%s%s." % (
                        self.arvrunner.label(self), record["output_uuid"], container["output"], aftertime, orpart, oncomplete))
                self.arvrunner.add_intermediate_output(record["output_uuid"])

            if container["output"]:
                outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
        except WorkflowException as e:
            logger.error("%s unable to collect output from %s:\n%s",
                         self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
            processStatus = "permanentFail"
        except Exception as e:
            logger.exception("%s while getting output object: %s", self.arvrunner.label(self), e)
            processStatus = "permanentFail"
        finally:
            self.output_callback(outputs, processStatus)
362
class RunnerContainer(Runner):
    """Submit and manage a container that runs arvados-cwl-runner."""

    def arvados_job_spec(self, runtimeContext):
        """Create an Arvados container request for this workflow.

        The returned dict can be used to create a container passed as
        the +body+ argument to container_requests().create().
        """

        adjustDirObjs(self.job_order, trim_listing)
        visit_class(self.job_order, ("File", "Directory"), trim_anonymous_location)
        visit_class(self.job_order, ("File", "Directory"), remove_redundant_fields)

        # Replace secret parameter values with $include references to
        # dedicated secret mounts so they never appear in the input JSON.
        secret_mounts = {}
        for param in sorted(self.job_order.keys()):
            value = self.job_order[param]
            if not self.secret_store.has_secret(value):
                continue
            mount_path = "/secrets/s%d" % len(secret_mounts)
            secret_mounts[mount_path] = {
                "kind": "text",
                "content": self.secret_store.retrieve(value)
            }
            self.job_order[param] = {"$include": mount_path}

        container_req = {
            "name": self.name,
            "output_path": "/var/spool/cwl",
            "cwd": "/var/spool/cwl",
            "priority": self.priority,
            "state": "Committed",
            "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image),
            "mounts": {
                "/var/lib/cwl/cwl.input.json": {
                    "kind": "json",
                    "content": self.job_order
                },
                "stdout": {
                    "kind": "file",
                    "path": "/var/spool/cwl/cwl.output.json"
                },
                "/var/spool/cwl": {
                    "kind": "collection",
                    "writable": True
                }
            },
            "secret_mounts": secret_mounts,
            "runtime_constraints": {
                "vcpus": math.ceil(self.submit_runner_cores),
                "ram": math.ceil(1024*1024 * self.submit_runner_ram),
                "API": True
            },
            "use_existing": self.enable_reuse,
            "properties": {}
        }

        tool_id = self.tool.tool.get("id", "")
        if tool_id.startswith("keep:"):
            # The workflow definition lives in a Keep collection; mount it.
            id_parts = self.tool.tool["id"].split('/')
            collection_pdh = id_parts[0][5:]
            workflowpath = "/var/lib/cwl/workflow/%s" % "/".join(id_parts[1:])
            container_req["mounts"]["/var/lib/cwl/workflow"] = {
                "kind": "collection",
                "portable_data_hash": "%s" % collection_pdh
            }
        else:
            # Embed the packed workflow document directly in the request.
            workflowpath = "/var/lib/cwl/workflow.json#main"
            container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
                "kind": "json",
                "content": packed_workflow(self.arvrunner, self.tool, self.merged_map)
            }
            if tool_id.startswith("arvwf:"):
                container_req["properties"]["template_uuid"] = self.tool.tool["id"][6:33]

        # --local means execute the workflow instead of submitting a container request
        # --api=containers means use the containers API
        # --no-log-timestamps means don't add timestamps (the logging infrastructure does this)
        # --disable-validate because we already validated so don't need to do it again
        # --eval-timeout is the timeout for javascript invocation
        # --parallel-task-count is the number of threads to use for job submission
        # --enable/disable-reuse sets desired job reuse
        command = ["arvados-cwl-runner",
                   "--local",
                   "--api=containers",
                   "--no-log-timestamps",
                   "--disable-validate",
                   "--eval-timeout=%s" % self.arvrunner.eval_timeout,
                   "--thread-count=%s" % self.arvrunner.thread_count,
                   "--enable-reuse" if self.enable_reuse else "--disable-reuse"]

        if self.output_name:
            command.append("--output-name=" + self.output_name)
            container_req["output_name"] = self.output_name
        if self.output_tags:
            command.append("--output-tags=" + self.output_tags)
        if runtimeContext.debug:
            command.append("--debug")
        if runtimeContext.storage_classes != "default":
            command.append("--storage-classes=" + runtimeContext.storage_classes)
        if self.on_error:
            command.append("--on-error=" + self.on_error)
        if self.intermediate_output_ttl:
            command.append("--intermediate-output-ttl=%d" % self.intermediate_output_ttl)
        if self.arvrunner.trash_intermediate:
            command.append("--trash-intermediate")
        if self.arvrunner.project_uuid:
            command.append("--project-uuid="+self.arvrunner.project_uuid)

        command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])

        container_req["command"] = command

        return container_req


    def run(self, runtimeContext):
        """Submit (or finalize a pre-created) runner container request."""
        runtimeContext.keepprefix = "keep:"
        body = self.arvados_job_spec(runtimeContext)
        if self.arvrunner.project_uuid:
            body["owner_uuid"] = self.arvrunner.project_uuid

        submit_kwargs = {}
        if runtimeContext.submit_runner_cluster:
            submit_kwargs["cluster_id"] = runtimeContext.submit_runner_cluster

        requests_api = self.arvrunner.api.container_requests()
        if runtimeContext.submit_request_uuid:
            call = requests_api.update(uuid=runtimeContext.submit_request_uuid,
                                       body=body,
                                       **submit_kwargs)
        else:
            call = requests_api.create(body=body, **submit_kwargs)
        response = call.execute(num_retries=self.arvrunner.num_retries)

        self.uuid = response["uuid"]
        self.arvrunner.process_submitted(self)

        logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])

    def done(self, record):
        """Fetch the finished runner container and delegate to the base class."""
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
        except Exception as err:
            logger.exception("%s while getting runner container: %s", self.arvrunner.label(self), err)
            self.arvrunner.output_callback({}, "permanentFail")
            return
        super(RunnerContainer, self).done(container)