sdk/cwl/arvados_cwl/arvcontainer.py
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import logging
import json
import os
import urllib
import time
import datetime
import ciso8601
import uuid

import ruamel.yaml as yaml

from cwltool.errors import WorkflowException
from cwltool.process import UnsupportedRequirement, shortname
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.job import JobBase

import arvados.collection

from arvados.errors import ApiError
from .arvdocker import arv_docker_get_image
from . import done
from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields
from .fsaccess import CollectionFetcher
from .pathmapper import NoFollowPathMapper, trim_listing
from .perf import Perf

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

class ArvadosContainer(JobBase):
    """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""

    def __init__(self, runner,
                 builder,   # type: Builder
                 joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
                 make_path_mapper,  # type: Callable[..., PathMapper]
                 requirements,      # type: List[Dict[Text, Text]]
                 hints,     # type: List[Dict[Text, Text]]
                 name       # type: Text
    ):
        super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
        self.arvrunner = runner
        self.running = False
        self.uuid = None

    def update_pipeline_component(self, r):
        pass

    def run(self, runtimeContext):
        # ArvadosCommandTool subclasses cwltool.CommandLineTool, which
        # calls makeJobRunner() to get a new ArvadosContainer object.
        # The fields that define execution, such as command_line and
        # environment, are set on the ArvadosContainer object by
        # CommandLineTool.job() before run() is called.

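        # The request is created directly in the "Committed" state, so it is
        # eligible for scheduling as soon as it is submitted below.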
        container_request = {
            "command": self.command_line,
            "name": self.name,
            "output_path": self.outdir,
            "cwd": self.outdir,
            "priority": runtimeContext.priority,
            "state": "Committed",
            "properties": {},
        }
        runtime_constraints = {}

        if self.arvrunner.project_uuid:
            container_request["owner_uuid"] = self.arvrunner.project_uuid

        if self.arvrunner.secret_store.has_secret(self.command_line):
            raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")

        if self.arvrunner.secret_store.has_secret(self.environment):
            raise WorkflowException("Secret material leaked in environment, only file literals may contain secrets")

        resources = self.builder.resources
        if resources is not None:
            runtime_constraints["vcpus"] = resources.get("cores", 1)
            runtime_constraints["ram"] = resources.get("ram") * 2**20

        mounts = {
            self.outdir: {
                "kind": "tmp",
                "capacity": resources.get("outdirSize", 0) * 2**20
            },
            self.tmpdir: {
                "kind": "tmp",
                "capacity": resources.get("tmpdirSize", 0) * 2**20
            }
        }
        secret_mounts = {}
        scheduling_parameters = {}

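        # Mount the Keep collection behind each referenced file or directory
        # at its target location, skipping any target already covered by a
        # previously mounted parent directory.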
        rf = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
        rf.sort(key=lambda k: k.resolved)
        prevdir = None
        for resolved, target, tp, stg in rf:
            if not stg:
                continue
            if prevdir and target.startswith(prevdir):
                continue
            if tp == "Directory":
                targetdir = target
            else:
                targetdir = os.path.dirname(target)
            sp = resolved.split("/", 1)
            pdh = sp[0][5:]   # remove "keep:"
            mounts[targetdir] = {
                "kind": "collection",
                "portable_data_hash": pdh
            }
            if len(sp) == 2:
                if tp == "Directory":
                    path = sp[1]
                else:
                    path = os.path.dirname(sp[1])
                if path and path != "/":
                    mounts[targetdir]["path"] = path
            prevdir = targetdir + "/"

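        # Stage the entries of self.generatefiles (the tool's initial working
        # directory listing) into a new intermediate collection; file literals
        # that contain secret material become separate "text" secret mounts
        # instead.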
        with Perf(metrics, "generatefiles %s" % self.name):
            if self.generatefiles["listing"]:
                vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                    keep_client=self.arvrunner.keep_client,
                                                    num_retries=self.arvrunner.num_retries)
                generatemapper = NoFollowPathMapper([self.generatefiles], "", "",
                                                    separateDirs=False)

                sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target)

                logger.debug("generatemapper is %s", sorteditems)

                with Perf(metrics, "createfiles %s" % self.name):
                    for f, p in sorteditems:
                        if not p.target:
                            pass
                        elif p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
                            if p.resolved.startswith("_:"):
                                vwd.mkdirs(p.target)
                            else:
                                source, path = self.arvrunner.fs_access.get_collection(p.resolved)
                                vwd.copy(path, p.target, source_collection=source)
                        elif p.type == "CreateFile":
                            if self.arvrunner.secret_store.has_secret(p.resolved):
                                secret_mounts["%s/%s" % (self.outdir, p.target)] = {
                                    "kind": "text",
                                    "content": self.arvrunner.secret_store.retrieve(p.resolved)
                                }
                            else:
                                with vwd.open(p.target, "w") as n:
                                    n.write(p.resolved.encode("utf-8"))

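                # Keep manifests cannot represent empty directories, so drop a
                # placeholder ".keep" file into any directory that would
                # otherwise be empty.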
                def keepemptydirs(p):
                    if isinstance(p, arvados.collection.RichCollectionBase):
                        if len(p) == 0:
                            p.open(".keep", "w").close()
                        else:
                            for c in p:
                                keepemptydirs(p[c])

                keepemptydirs(vwd)

                info = self._get_intermediate_collection_info()
                vwd.save_new(name=info["name"],
                             ensure_unique_name=True,
                             trash_at=info["trash_at"],
                             properties=info["properties"])

                prev = None
                for f, p in sorteditems:
                    if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
                        (prev is not None and p.target.startswith(prev))):
                        continue
                    mountpoint = "%s/%s" % (self.outdir, p.target)
                    mounts[mountpoint] = {"kind": "collection",
                                          "portable_data_hash": vwd.portable_data_hash(),
                                          "path": p.target}
                    if p.type.startswith("Writable"):
                        mounts[mountpoint]["writable"] = True
                    prev = p.target + "/"

        container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
        if self.environment:
            container_request["environment"].update(self.environment)

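        # Wire up the standard streams: stdin is read from a Keep collection,
        # while stdout and stderr are captured to files in the output
        # directory.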
        if self.stdin:
            sp = self.stdin[6:].split("/", 1)
            mounts["stdin"] = {"kind": "collection",
                               "portable_data_hash": sp[0],
                               "path": sp[1]}

        if self.stderr:
            mounts["stderr"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stderr)}

        if self.stdout:
            mounts["stdout"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stdout)}

        (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
        if not docker_req:
            docker_req = {"dockerImageId": "arvados/jobs"}

        container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
                                                                    docker_req,
                                                                    runtimeContext.pull_image,
                                                                    self.arvrunner.project_uuid)

        api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
        if api_req:
            runtime_constraints["API"] = True

        runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
        if runtime_req:
            if "keep_cache" in runtime_req:
                runtime_constraints["keep_cache_ram"] = runtime_req["keep_cache"] * 2**20
            if "outputDirType" in runtime_req:
                if runtime_req["outputDirType"] == "local_output_dir":
                    # Currently the default behavior.
                    pass
                elif runtime_req["outputDirType"] == "keep_output_dir":
                    mounts[self.outdir] = {
                        "kind": "collection",
                        "writable": True
                    }

        partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
        if partition_req:
            scheduling_parameters["partitions"] = aslist(partition_req["partition"])

        intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
        if intermediate_output_req:
            self.output_ttl = intermediate_output_req["outputTTL"]
        else:
            self.output_ttl = self.arvrunner.intermediate_output_ttl

        if self.output_ttl < 0:
            raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % self.output_ttl)

        if self.timelimit is not None:
            scheduling_parameters["max_run_time"] = self.timelimit

        container_request["output_ttl"] = self.output_ttl
        container_request["mounts"] = mounts
        container_request["secret_mounts"] = secret_mounts
        container_request["runtime_constraints"] = runtime_constraints
        container_request["scheduling_parameters"] = scheduling_parameters

        enable_reuse = runtimeContext.enable_reuse
        if enable_reuse:
            reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        container_request["use_existing"] = enable_reuse

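        # When this run was submitted from a registered workflow
        # (arvwf:<uuid>#...), name the "main" container request after the
        # stored workflow record and link it via the template_uuid property.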
        if runtimeContext.runnerjob.startswith("arvwf:"):
            wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
            wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
            if container_request["name"] == "main":
                container_request["name"] = wfrecord["name"]
            container_request["properties"]["template_uuid"] = wfuuid

        self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)

        try:
            if runtimeContext.submit_request_uuid:
                response = self.arvrunner.api.container_requests().update(
                    uuid=runtimeContext.submit_request_uuid,
                    body=container_request
                ).execute(num_retries=self.arvrunner.num_retries)
            else:
                response = self.arvrunner.api.container_requests().create(
                    body=container_request
                ).execute(num_retries=self.arvrunner.num_retries)

            self.uuid = response["uuid"]
            self.arvrunner.process_submitted(self)

            if response["state"] == "Final":
                logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
            else:
                logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
        except Exception as e:
            logger.error("%s got error %s" % (self.arvrunner.label(self), str(e)))
            self.output_callback({}, "permanentFail")

    def done(self, record):
        outputs = {}
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
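            # Map the container exit code onto a CWL process status, honoring
            # the tool's successCodes/temporaryFailCodes/permanentFailCodes.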
            if container["state"] == "Complete":
                rcode = container["exit_code"]
                if self.successCodes and rcode in self.successCodes:
                    processStatus = "success"
                elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
                    processStatus = "temporaryFail"
                elif self.permanentFailCodes and rcode in self.permanentFailCodes:
                    processStatus = "permanentFail"
                elif rcode == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"
            else:
                processStatus = "permanentFail"

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(container["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            if record["output_uuid"]:
                if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
                    # Compute the trash time to avoid requesting the collection record.
                    trash_at = ciso8601.parse_datetime_unaware(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
                    aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
                    orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
                    oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
                    logger.info("%s Intermediate output %s (%s) will be trashed%s%s%s." % (
                        self.arvrunner.label(self), record["output_uuid"], container["output"], aftertime, orpart, oncomplete))
                self.arvrunner.add_intermediate_output(record["output_uuid"])

            if container["output"]:
                outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
        except WorkflowException as e:
            logger.error("%s unable to collect output from %s:\n%s",
                         self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
            processStatus = "permanentFail"
        except Exception as e:
            logger.exception("%s while getting output object: %s", self.arvrunner.label(self), e)
            processStatus = "permanentFail"
        finally:
            self.output_callback(outputs, processStatus)

    def _get_intermediate_collection_info(self):
        trash_time = None
        if self.arvrunner.intermediate_output_ttl > 0:
            trash_time = datetime.datetime.now() + datetime.timedelta(seconds=self.arvrunner.intermediate_output_ttl)

        current_container_uuid = None
        try:
            current_container = self.arvrunner.api.containers().current().execute(num_retries=self.arvrunner.num_retries)
            current_container_uuid = current_container['uuid']
        except ApiError as e:
            # Status code 404 just means we're not running in a container.
            if e.resp.status != 404:
                logger.info("Getting current container: %s", e)
        props = {"type": "Intermediate",
                 "container": current_container_uuid}

        return {"name": "Intermediate collection",
                "trash_at": trash_time,
                "properties": props}


class RunnerContainer(Runner):
    """Submit and manage a container that runs arvados-cwl-runner."""

    def arvados_job_spec(self, runtimeContext):
        """Create an Arvados container request for this workflow.

        The returned dict can be used to create a container passed as
        the +body+ argument to container_requests().create().
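
        A minimal sketch of the intended use (RunnerContainer.run() below
        does essentially this, adding retries and error handling):

            spec = self.arvados_job_spec(runtimeContext)
            self.arvrunner.api.container_requests().create(body=spec).execute()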
        """

        adjustDirObjs(self.job_order, trim_listing)
        visit_class(self.job_order, ("File", "Directory"), trim_anonymous_location)
        visit_class(self.job_order, ("File", "Directory"), remove_redundant_fields)

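        # Replace secret parameter values with $include references to "text"
        # secret mounts under /secrets, so the secret material never appears
        # in the stored job order itself.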
        secret_mounts = {}
        for param in sorted(self.job_order.keys()):
            if self.secret_store.has_secret(self.job_order[param]):
                mnt = "/secrets/s%d" % len(secret_mounts)
                secret_mounts[mnt] = {
                    "kind": "text",
                    "content": self.secret_store.retrieve(self.job_order[param])
                }
                self.job_order[param] = {"$include": mnt}

        container_req = {
            "name": self.name,
            "output_path": "/var/spool/cwl",
            "cwd": "/var/spool/cwl",
            "priority": self.priority,
            "state": "Committed",
            "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image),
            "mounts": {
                "/var/lib/cwl/cwl.input.json": {
                    "kind": "json",
                    "content": self.job_order
                },
                "stdout": {
                    "kind": "file",
                    "path": "/var/spool/cwl/cwl.output.json"
                },
                "/var/spool/cwl": {
                    "kind": "collection",
                    "writable": True
                }
            },
            "secret_mounts": secret_mounts,
            "runtime_constraints": {
                "vcpus": 1,
                "ram": 1024*1024 * self.submit_runner_ram,
                "API": True
            },
            "use_existing": self.enable_reuse,
            "properties": {}
        }

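        # A workflow stored in Keep is mounted from its collection at
        # /var/lib/cwl/workflow; otherwise the packed workflow document is
        # embedded directly in the request as a "json" mount.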
        if self.tool.tool.get("id", "").startswith("keep:"):
            sp = self.tool.tool["id"].split('/')
            workflowcollection = sp[0][5:]
            workflowname = "/".join(sp[1:])
            workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
            container_req["mounts"]["/var/lib/cwl/workflow"] = {
                "kind": "collection",
                "portable_data_hash": "%s" % workflowcollection
            }
        else:
            packed = packed_workflow(self.arvrunner, self.tool, self.merged_map)
            workflowpath = "/var/lib/cwl/workflow.json#main"
            container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
                "kind": "json",
                "content": packed
            }
            if self.tool.tool.get("id", "").startswith("arvwf:"):
                container_req["properties"]["template_uuid"] = self.tool.tool["id"][6:33]


        # --local means execute the workflow instead of submitting a container request
        # --api=containers means use the containers API
        # --no-log-timestamps means don't add timestamps (the logging infrastructure does this)
        # --disable-validate because we already validated so don't need to do it again
        # --eval-timeout is the timeout for javascript invocation
        # --thread-count is the number of threads to use for job submission
        # --enable/disable-reuse sets desired job reuse
        command = ["arvados-cwl-runner",
                   "--local",
                   "--api=containers",
                   "--no-log-timestamps",
                   "--disable-validate",
                   "--eval-timeout=%s" % self.arvrunner.eval_timeout,
                   "--thread-count=%s" % self.arvrunner.thread_count,
                   "--enable-reuse" if self.enable_reuse else "--disable-reuse"]

        if self.output_name:
            command.append("--output-name=" + self.output_name)
            container_req["output_name"] = self.output_name

        if self.output_tags:
            command.append("--output-tags=" + self.output_tags)

        if runtimeContext.debug:
            command.append("--debug")

        if runtimeContext.storage_classes != "default":
            command.append("--storage-classes=" + runtimeContext.storage_classes)

        if self.on_error:
            command.append("--on-error=" + self.on_error)

        if self.intermediate_output_ttl:
            command.append("--intermediate-output-ttl=%d" % self.intermediate_output_ttl)

        if self.arvrunner.trash_intermediate:
            command.append("--trash-intermediate")

        if self.arvrunner.project_uuid:
            command.append("--project-uuid=" + self.arvrunner.project_uuid)

        command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])

        container_req["command"] = command

        return container_req


    def run(self, runtimeContext):
        runtimeContext.keepprefix = "keep:"
        job_spec = self.arvados_job_spec(runtimeContext)
        if self.arvrunner.project_uuid:
            job_spec["owner_uuid"] = self.arvrunner.project_uuid

        if runtimeContext.submit_request_uuid:
            response = self.arvrunner.api.container_requests().update(
                uuid=runtimeContext.submit_request_uuid,
                body=job_spec
            ).execute(num_retries=self.arvrunner.num_retries)
        else:
            response = self.arvrunner.api.container_requests().create(
                body=job_spec
            ).execute(num_retries=self.arvrunner.num_retries)

        self.uuid = response["uuid"]
        self.arvrunner.process_submitted(self)

        logger.info("%s submitted container %s", self.arvrunner.label(self), response["uuid"])

    def done(self, record):
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
        except Exception as e:
            logger.exception("%s while getting runner container: %s", self.arvrunner.label(self), e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            super(RunnerContainer, self).done(container)