# 13627: Fix tests after rebase
# [arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

# Standard library.
import datetime
import json
import logging
import os
import time
import urllib
import uuid

# Third-party.
import ciso8601
import ruamel.yaml as yaml

from cwltool.errors import WorkflowException
from cwltool.job import JobBase
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.process import UnsupportedRequirement, shortname
from cwltool.utils import aslist

import arvados.collection

# Local package.
from . import done
from .arvdocker import arv_docker_get_image
from .fsaccess import CollectionFetcher
from .pathmapper import NoFollowPathMapper, trim_listing
from .perf import Perf
from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
33
class ArvadosContainer(JobBase):
    """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""

    def __init__(self, runner,
                 builder,   # type: Builder
                 joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
                 make_path_mapper,  # type: Callable[..., PathMapper]
                 requirements,      # type: List[Dict[Text, Text]]
                 hints,     # type: List[Dict[Text, Text]]
                 name       # type: Text
    ):
        super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
        self.arvrunner = runner   # ArvCwlRunner coordinating this workflow run
        self.running = False
        self.uuid = None          # container request uuid, assigned after submission

    def update_pipeline_component(self, r):
        # The containers API has no pipeline components; this is a no-op kept
        # for interface compatibility with the jobs-API implementation.
        pass

    def run(self, runtimeContext):
        """Build and submit a container request for this command line tool.

        ArvadosCommandTool subclasses from cwltool.CommandLineTool,
        which calls makeJobRunner() to get a new ArvadosContainer
        object.  The fields that define execution such as
        command_line, environment, etc are set on the
        ArvadosContainer object by CommandLineTool.job() before
        run() is called.
        """

        container_request = {
            "command": self.command_line,
            "name": self.name,
            "output_path": self.outdir,
            "cwd": self.outdir,
            "priority": runtimeContext.priority,
            "state": "Committed",
            "properties": {},
        }
        runtime_constraints = {}

        if self.arvrunner.project_uuid:
            container_request["owner_uuid"] = self.arvrunner.project_uuid

        # Secrets may only be delivered via secret_mounts (file literals);
        # refuse to submit if one leaked anywhere else.
        if self.arvrunner.secret_store.has_secret(self.command_line):
            raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")

        if self.arvrunner.secret_store.has_secret(self.environment):
            raise WorkflowException("Secret material leaked in environment, only file literals may contain secrets")

        resources = self.builder.resources
        if resources is not None:
            runtime_constraints["vcpus"] = resources.get("cores", 1)
            runtime_constraints["ram"] = resources.get("ram") * 2**20  # MiB -> bytes

        mounts = {
            self.outdir: {
                "kind": "tmp",
                "capacity": resources.get("outdirSize", 0) * 2**20
            },
            self.tmpdir: {
                "kind": "tmp",
                "capacity": resources.get("tmpdirSize", 0) * 2**20
            }
        }
        secret_mounts = {}
        scheduling_parameters = {}

        # Mount each referenced Keep collection at its target path, skipping
        # entries already covered by a previously mounted parent directory
        # (rf is sorted so parents sort before their children).
        rf = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
        rf.sort(key=lambda k: k.resolved)
        prevdir = None
        for resolved, target, tp, stg in rf:
            if not stg:
                continue
            if prevdir and target.startswith(prevdir):
                continue
            if tp == "Directory":
                targetdir = target
            else:
                targetdir = os.path.dirname(target)
            sp = resolved.split("/", 1)
            pdh = sp[0][5:]   # remove "keep:"
            mounts[targetdir] = {
                "kind": "collection",
                "portable_data_hash": pdh
            }
            if len(sp) == 2:
                if tp == "Directory":
                    path = sp[1]
                else:
                    path = os.path.dirname(sp[1])
                if path and path != "/":
                    mounts[targetdir]["path"] = path
            prevdir = targetdir + "/"

        with Perf(metrics, "generatefiles %s" % self.name):
            if self.generatefiles["listing"]:
                vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                    keep_client=self.arvrunner.keep_client,
                                                    num_retries=self.arvrunner.num_retries)
                generatemapper = NoFollowPathMapper([self.generatefiles], "", "",
                                                    separateDirs=False)

                # FIX: sorted() was previously called with an extra positional
                # None (the Python 2 "cmp" argument), which is a TypeError on
                # Python 3; only the key function is needed.
                sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target)

                logger.debug("generatemapper is %s", sorteditems)

                with Perf(metrics, "createfiles %s" % self.name):
                    for f, p in sorteditems:
                        if not p.target:
                            pass
                        elif p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
                            if p.resolved.startswith("_:"):
                                # "_:" marks a directory literal with no backing
                                # collection; just create the directory.
                                vwd.mkdirs(p.target)
                            else:
                                source, path = self.arvrunner.fs_access.get_collection(p.resolved)
                                vwd.copy(path, p.target, source_collection=source)
                        elif p.type == "CreateFile":
                            # File literals: secret content goes in secret_mounts,
                            # everything else is written into the staging collection.
                            if self.arvrunner.secret_store.has_secret(p.resolved):
                                secret_mounts["%s/%s" % (self.outdir, p.target)] = {
                                    "kind": "text",
                                    "content": self.arvrunner.secret_store.retrieve(p.resolved)
                                }
                            else:
                                with vwd.open(p.target, "w") as n:
                                    n.write(p.resolved.encode("utf-8"))

                def keepemptydirs(p):
                    # Add a ".keep" placeholder file so empty directories
                    # survive the round trip through Keep.
                    if isinstance(p, arvados.collection.RichCollectionBase):
                        if len(p) == 0:
                            p.open(".keep", "w").close()
                        else:
                            for c in p:
                                keepemptydirs(p[c])

                keepemptydirs(vwd)

                with Perf(metrics, "generatefiles.save_new %s" % self.name):
                    vwd.save_new()

                # Mount each staged item, skipping secrets (already in
                # secret_mounts) and children of already-mounted directories.
                prev = None
                for f, p in sorteditems:
                    if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
                        (prev is not None and p.target.startswith(prev))):
                        continue
                    mountpoint = "%s/%s" % (self.outdir, p.target)
                    mounts[mountpoint] = {"kind": "collection",
                                          "portable_data_hash": vwd.portable_data_hash(),
                                          "path": p.target}
                    if p.type.startswith("Writable"):
                        mounts[mountpoint]["writable"] = True
                    prev = p.target + "/"

        container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
        if self.environment:
            container_request["environment"].update(self.environment)

        if self.stdin:
            sp = self.stdin[6:].split("/", 1)   # strip "/keep/" prefix -> [pdh, path]
            mounts["stdin"] = {"kind": "collection",
                                "portable_data_hash": sp[0],
                                "path": sp[1]}

        if self.stderr:
            mounts["stderr"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stderr)}

        if self.stdout:
            mounts["stdout"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stdout)}

        (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
        if not docker_req:
            docker_req = {"dockerImageId": "arvados/jobs"}

        container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
                                                                     docker_req,
                                                                     runtimeContext.pull_image,
                                                                     self.arvrunner.project_uuid)

        api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
        if api_req:
            runtime_constraints["API"] = True

        runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
        if runtime_req:
            if "keep_cache" in runtime_req:
                runtime_constraints["keep_cache_ram"] = runtime_req["keep_cache"] * 2**20
            if "outputDirType" in runtime_req:
                if runtime_req["outputDirType"] == "local_output_dir":
                    # Currently the default behavior.
                    pass
                elif runtime_req["outputDirType"] == "keep_output_dir":
                    mounts[self.outdir] = {
                        "kind": "collection",
                        "writable": True
                    }

        partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
        if partition_req:
            scheduling_parameters["partitions"] = aslist(partition_req["partition"])

        intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
        if intermediate_output_req:
            self.output_ttl = intermediate_output_req["outputTTL"]
        else:
            self.output_ttl = self.arvrunner.intermediate_output_ttl

        if self.output_ttl < 0:
            # FIX: the message previously formatted container_request["output_ttl"],
            # a key that is not assigned until after this raise, so reporting the
            # error raised a KeyError instead of the intended WorkflowException.
            raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % self.output_ttl)

        container_request["output_ttl"] = self.output_ttl
        container_request["mounts"] = mounts
        container_request["secret_mounts"] = secret_mounts
        container_request["runtime_constraints"] = runtime_constraints
        container_request["scheduling_parameters"] = scheduling_parameters

        # A tool-level ReuseRequirement can disable reuse even when it is
        # enabled globally.
        enable_reuse = runtimeContext.enable_reuse
        if enable_reuse:
            reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        container_request["use_existing"] = enable_reuse

        if runtimeContext.runnerjob.startswith("arvwf:"):
            wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
            wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
            if container_request["name"] == "main":
                container_request["name"] = wfrecord["name"]
            container_request["properties"]["template_uuid"] = wfuuid

        self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)

        try:
            if runtimeContext.submit_request_uuid:
                response = self.arvrunner.api.container_requests().update(
                    uuid=runtimeContext.submit_request_uuid,
                    body=container_request
                ).execute(num_retries=self.arvrunner.num_retries)
            else:
                response = self.arvrunner.api.container_requests().create(
                    body=container_request
                ).execute(num_retries=self.arvrunner.num_retries)

            self.uuid = response["uuid"]
            self.arvrunner.process_submitted(self)

            if response["state"] == "Final":
                logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
            else:
                logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
        except Exception as e:
            logger.error("%s got error %s" % (self.arvrunner.label(self), str(e)))
            self.output_callback({}, "permanentFail")

    def done(self, record):
        """Map the finished container request to a CWL process status,
        collect outputs, and invoke output_callback exactly once."""
        outputs = {}
        # FIX: default to failure up front so the callback in the finally
        # block never hits an unbound local, even if an except handler
        # itself raises (e.g. before `container` is assigned).
        processStatus = "permanentFail"
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            if container["state"] == "Complete":
                # Explicit success/fail code lists take precedence over the
                # default "0 means success" rule.
                rcode = container["exit_code"]
                if self.successCodes and rcode in self.successCodes:
                    processStatus = "success"
                elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
                    processStatus = "temporaryFail"
                elif self.permanentFailCodes and rcode in self.permanentFailCodes:
                    processStatus = "permanentFail"
                elif rcode == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"
            else:
                processStatus = "permanentFail"

            if processStatus == "permanentFail":
                # Surface the tail of the container log to help diagnose the failure.
                logc = arvados.collection.CollectionReader(container["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            if record["output_uuid"]:
                if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
                    # Compute the trash time to avoid requesting the collection record.
                    trash_at = ciso8601.parse_datetime_unaware(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
                    aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
                    orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
                    oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
                    logger.info("%s Intermediate output %s (%s) will be trashed%s%s%s." % (
                        self.arvrunner.label(self), record["output_uuid"], container["output"], aftertime, orpart, oncomplete))
                self.arvrunner.add_intermediate_output(record["output_uuid"])

            if container["output"]:
                outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
        except WorkflowException as e:
            logger.error("%s unable to collect output from %s:\n%s",
                         self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
            processStatus = "permanentFail"
        except Exception as e:
            logger.exception("%s while getting output object: %s", self.arvrunner.label(self), e)
            processStatus = "permanentFail"
        finally:
            self.output_callback(outputs, processStatus)
336
337
class RunnerContainer(Runner):
    """Submit and manage a container that runs arvados-cwl-runner."""

    def arvados_job_spec(self, runtimeContext):
        """Create an Arvados container request for this workflow.

        The returned dict can be used to create a container passed as
        the +body+ argument to container_requests().create().
        """

        # Strip listings and other fields that should not be submitted
        # with the input document.
        adjustDirObjs(self.job_order, trim_listing)
        visit_class(self.job_order, ("File", "Directory"), trim_anonymous_location)
        visit_class(self.job_order, ("File", "Directory"), remove_redundant_fields)

        # Move secret parameter values out of the input document into
        # secret mounts, leaving $include references behind.
        secret_mounts = {}
        for param in sorted(self.job_order.keys()):
            value = self.job_order[param]
            if not self.secret_store.has_secret(value):
                continue
            mountpoint = "/secrets/s%d" % len(secret_mounts)
            secret_mounts[mountpoint] = {
                "kind": "text",
                "content": self.secret_store.retrieve(value)
            }
            self.job_order[param] = {"$include": mountpoint}

        container_req = {
            "name": self.name,
            "output_path": "/var/spool/cwl",
            "cwd": "/var/spool/cwl",
            "priority": self.priority,
            "state": "Committed",
            "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image),
            "mounts": {
                "/var/lib/cwl/cwl.input.json": {
                    "kind": "json",
                    "content": self.job_order
                },
                "stdout": {
                    "kind": "file",
                    "path": "/var/spool/cwl/cwl.output.json"
                },
                "/var/spool/cwl": {
                    "kind": "collection",
                    "writable": True
                }
            },
            "secret_mounts": secret_mounts,
            "runtime_constraints": {
                "vcpus": 1,
                "ram": 1024*1024 * self.submit_runner_ram,
                "API": True
            },
            "use_existing": self.enable_reuse,
            "properties": {}
        }

        tool_id = self.tool.tool.get("id", "")
        if tool_id.startswith("keep:"):
            # The workflow definition lives in a Keep collection: mount
            # the collection and point the runner at the file inside it.
            idparts = self.tool.tool["id"].split('/')
            workflowcollection = idparts[0][5:]
            workflowname = "/".join(idparts[1:])
            workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
            container_req["mounts"]["/var/lib/cwl/workflow"] = {
                "kind": "collection",
                "portable_data_hash": "%s" % workflowcollection
            }
        else:
            # Otherwise embed the packed workflow document directly as a
            # json mount.
            packed = packed_workflow(self.arvrunner, self.tool, self.merged_map)
            workflowpath = "/var/lib/cwl/workflow.json#main"
            container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
                "kind": "json",
                "content": packed
            }
            if tool_id.startswith("arvwf:"):
                container_req["properties"]["template_uuid"] = self.tool.tool["id"][6:33]

        # --local means execute the workflow instead of submitting a container request
        # --api=containers means use the containers API
        # --no-log-timestamps means don't add timestamps (the logging infrastructure does this)
        # --disable-validate because we already validated so don't need to do it again
        # --eval-timeout is the timeout for javascript invocation
        # --parallel-task-count is the number of threads to use for job submission
        # --enable/disable-reuse sets desired job reuse
        command = ["arvados-cwl-runner",
                   "--local",
                   "--api=containers",
                   "--no-log-timestamps",
                   "--disable-validate",
                   "--eval-timeout=%s" % self.arvrunner.eval_timeout,
                   "--thread-count=%s" % self.arvrunner.thread_count,
                   "--enable-reuse" if self.enable_reuse else "--disable-reuse"]

        # Optional flags, appended in a fixed order.
        if self.output_name:
            command.append("--output-name=" + self.output_name)
            container_req["output_name"] = self.output_name

        if self.output_tags:
            command.append("--output-tags=" + self.output_tags)

        if runtimeContext.debug:
            command.append("--debug")

        if runtimeContext.storage_classes != "default":
            command.append("--storage-classes=" + runtimeContext.storage_classes)

        if self.on_error:
            command.append("--on-error=" + self.on_error)

        if self.intermediate_output_ttl:
            command.append("--intermediate-output-ttl=%d" % self.intermediate_output_ttl)

        if self.arvrunner.trash_intermediate:
            command.append("--trash-intermediate")

        if self.arvrunner.project_uuid:
            command.append("--project-uuid=" + self.arvrunner.project_uuid)

        # Positional arguments: the workflow and its input document.
        command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])

        container_req["command"] = command

        return container_req


    def run(self, runtimeContext):
        """Submit the container request that will run arvados-cwl-runner."""
        runtimeContext.keepprefix = "keep:"
        job_spec = self.arvados_job_spec(runtimeContext)
        if self.arvrunner.project_uuid:
            job_spec["owner_uuid"] = self.arvrunner.project_uuid

        requests = self.arvrunner.api.container_requests()
        if runtimeContext.submit_request_uuid:
            # NOTE(review): a request uuid was supplied — presumably a
            # pre-created request record; update it with the full spec.
            pending = requests.update(uuid=runtimeContext.submit_request_uuid,
                                      body=job_spec)
        else:
            pending = requests.create(body=job_spec)
        response = pending.execute(num_retries=self.arvrunner.num_retries)

        self.uuid = response["uuid"]
        self.arvrunner.process_submitted(self)

        logger.info("%s submitted container %s", self.arvrunner.label(self), response["uuid"])

    def done(self, record):
        """Fetch the runner's container record and hand it to Runner.done()."""
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
        except Exception as e:
            # Could not retrieve the container record; report workflow failure.
            logger.exception("%s while getting runner container: %s", self.arvrunner.label(self), e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            super(RunnerContainer, self).done(container)