17072: Fix imports. Use task_queue from cwltool.
[arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8
9 import logging
10 import json
11 import os
12 import urllib.request, urllib.parse, urllib.error
13 import time
14 import datetime
15 import ciso8601
16 import uuid
17 import math
18
19 import arvados_cwl.util
20 import ruamel.yaml as yaml
21
22 from cwltool.errors import WorkflowException
23 from cwltool.process import UnsupportedRequirement, shortname
24 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
25 from cwltool.job import JobBase
26
27 import arvados.collection
28
29 from .arvdocker import arv_docker_get_image
30 from . import done
31 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields
32 from .fsaccess import CollectionFetcher
33 from .pathmapper import NoFollowPathMapper, trim_listing
34 from .perf import Perf
35 from ._version import __version__
36
37 logger = logging.getLogger('arvados.cwl-runner')
38 metrics = logging.getLogger('arvados.cwl-runner.metrics')
39
40 class ArvadosContainer(JobBase):
41     """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""
42
43     def __init__(self, runner, job_runtime,
44                  builder,   # type: Builder
45                  joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
46                  make_path_mapper,  # type: Callable[..., PathMapper]
47                  requirements,      # type: List[Dict[Text, Text]]
48                  hints,     # type: List[Dict[Text, Text]]
49                  name       # type: Text
50     ):
51         super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
52         self.arvrunner = runner
53         self.job_runtime = job_runtime
54         self.running = False
55         self.uuid = None
56
57     def update_pipeline_component(self, r):
58         pass
59
60     def run(self, runtimeContext):
61         # ArvadosCommandTool subclasses from cwltool.CommandLineTool,
62         # which calls makeJobRunner() to get a new ArvadosContainer
63         # object.  The fields that define execution such as
64         # command_line, environment, etc are set on the
65         # ArvadosContainer object by CommandLineTool.job() before
66         # run() is called.
67
68         runtimeContext = self.job_runtime
69
70         container_request = {
71             "command": self.command_line,
72             "name": self.name,
73             "output_path": self.outdir,
74             "cwd": self.outdir,
75             "priority": runtimeContext.priority,
76             "state": "Committed",
77             "properties": {},
78         }
79         runtime_constraints = {}
80
81         if runtimeContext.project_uuid:
82             container_request["owner_uuid"] = runtimeContext.project_uuid
83
84         if self.arvrunner.secret_store.has_secret(self.command_line):
85             raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")
86
87         if self.arvrunner.secret_store.has_secret(self.environment):
88             raise WorkflowException("Secret material leaked in environment, only file literals may contain secrets")
89
90         resources = self.builder.resources
91         if resources is not None:
92             runtime_constraints["vcpus"] = math.ceil(resources.get("cores", 1))
93             runtime_constraints["ram"] = math.ceil(resources.get("ram") * 2**20)
94
95         mounts = {
96             self.outdir: {
97                 "kind": "tmp",
98                 "capacity": math.ceil(resources.get("outdirSize", 0) * 2**20)
99             },
100             self.tmpdir: {
101                 "kind": "tmp",
102                 "capacity": math.ceil(resources.get("tmpdirSize", 0) * 2**20)
103             }
104         }
105         secret_mounts = {}
106         scheduling_parameters = {}
107
108         rf = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
109         rf.sort(key=lambda k: k.resolved)
110         prevdir = None
111         for resolved, target, tp, stg in rf:
112             if not stg:
113                 continue
114             if prevdir and target.startswith(prevdir):
115                 continue
116             if tp == "Directory":
117                 targetdir = target
118             else:
119                 targetdir = os.path.dirname(target)
120             sp = resolved.split("/", 1)
121             pdh = sp[0][5:]   # remove "keep:"
122             mounts[targetdir] = {
123                 "kind": "collection",
124                 "portable_data_hash": pdh
125             }
126             if pdh in self.pathmapper.pdh_to_uuid:
127                 mounts[targetdir]["uuid"] = self.pathmapper.pdh_to_uuid[pdh]
128             if len(sp) == 2:
129                 if tp == "Directory":
130                     path = sp[1]
131                 else:
132                     path = os.path.dirname(sp[1])
133                 if path and path != "/":
134                     mounts[targetdir]["path"] = path
135             prevdir = targetdir + "/"
136
137         with Perf(metrics, "generatefiles %s" % self.name):
138             if self.generatefiles["listing"]:
139                 vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
140                                                     keep_client=self.arvrunner.keep_client,
141                                                     num_retries=self.arvrunner.num_retries)
142                 generatemapper = NoFollowPathMapper(self.generatefiles["listing"], "", "",
143                                                     separateDirs=False)
144
145                 sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target)
146
147                 logger.debug("generatemapper is %s", sorteditems)
148
149                 with Perf(metrics, "createfiles %s" % self.name):
150                     for f, p in sorteditems:
151                         if not p.target:
152                             continue
153
154                         if p.target.startswith("/"):
155                             dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
156                         else:
157                             dst = p.target
158
159                         if p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
160                             if p.resolved.startswith("_:"):
161                                 vwd.mkdirs(dst)
162                             else:
163                                 source, path = self.arvrunner.fs_access.get_collection(p.resolved)
164                                 vwd.copy(path or ".", dst, source_collection=source)
165                         elif p.type == "CreateFile":
166                             if self.arvrunner.secret_store.has_secret(p.resolved):
167                                 mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
168                                 secret_mounts[mountpoint] = {
169                                     "kind": "text",
170                                     "content": self.arvrunner.secret_store.retrieve(p.resolved)
171                                 }
172                             else:
173                                 with vwd.open(dst, "w") as n:
174                                     n.write(p.resolved)
175
176                 def keepemptydirs(p):
177                     if isinstance(p, arvados.collection.RichCollectionBase):
178                         if len(p) == 0:
179                             p.open(".keep", "w").close()
180                         else:
181                             for c in p:
182                                 keepemptydirs(p[c])
183
184                 keepemptydirs(vwd)
185
186                 if not runtimeContext.current_container:
187                     runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
188                 info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
189                 vwd.save_new(name=info["name"],
190                              owner_uuid=runtimeContext.project_uuid,
191                              ensure_unique_name=True,
192                              trash_at=info["trash_at"],
193                              properties=info["properties"])
194
195                 prev = None
196                 for f, p in sorteditems:
197                     if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
198                         (prev is not None and p.target.startswith(prev))):
199                         continue
200                     if p.target.startswith("/"):
201                         dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
202                     else:
203                         dst = p.target
204                     mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
205                     mounts[mountpoint] = {"kind": "collection",
206                                           "portable_data_hash": vwd.portable_data_hash(),
207                                           "path": dst}
208                     if p.type.startswith("Writable"):
209                         mounts[mountpoint]["writable"] = True
210                     prev = p.target + "/"
211
212         container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
213         if self.environment:
214             container_request["environment"].update(self.environment)
215
216         if self.stdin:
217             sp = self.stdin[6:].split("/", 1)
218             mounts["stdin"] = {"kind": "collection",
219                                 "portable_data_hash": sp[0],
220                                 "path": sp[1]}
221
222         if self.stderr:
223             mounts["stderr"] = {"kind": "file",
224                                 "path": "%s/%s" % (self.outdir, self.stderr)}
225
226         if self.stdout:
227             mounts["stdout"] = {"kind": "file",
228                                 "path": "%s/%s" % (self.outdir, self.stdout)}
229
230         (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
231         if not docker_req:
232             docker_req = {"dockerImageId": "arvados/jobs:"+__version__}
233
234         container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
235                                                                     docker_req,
236                                                                     runtimeContext.pull_image,
237                                                                     runtimeContext.project_uuid)
238
239         network_req, _ = self.get_requirement("NetworkAccess")
240         if network_req:
241             runtime_constraints["API"] = network_req["networkAccess"]
242
243         api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
244         if api_req:
245             runtime_constraints["API"] = True
246
247         runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
248         if runtime_req:
249             if "keep_cache" in runtime_req:
250                 runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
251             if "outputDirType" in runtime_req:
252                 if runtime_req["outputDirType"] == "local_output_dir":
253                     # Currently the default behavior.
254                     pass
255                 elif runtime_req["outputDirType"] == "keep_output_dir":
256                     mounts[self.outdir]= {
257                         "kind": "collection",
258                         "writable": True
259                     }
260
261         partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
262         if partition_req:
263             scheduling_parameters["partitions"] = aslist(partition_req["partition"])
264
265         intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
266         if intermediate_output_req:
267             self.output_ttl = intermediate_output_req["outputTTL"]
268         else:
269             self.output_ttl = self.arvrunner.intermediate_output_ttl
270
271         if self.output_ttl < 0:
272             raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % container_request["output_ttl"])
273
274         if self.timelimit is not None and self.timelimit > 0:
275             scheduling_parameters["max_run_time"] = self.timelimit
276
277         extra_submit_params = {}
278         if runtimeContext.submit_runner_cluster:
279             extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
280
281         container_request["output_name"] = "Output for step %s" % (self.name)
282         container_request["output_ttl"] = self.output_ttl
283         container_request["mounts"] = mounts
284         container_request["secret_mounts"] = secret_mounts
285         container_request["runtime_constraints"] = runtime_constraints
286         container_request["scheduling_parameters"] = scheduling_parameters
287
288         enable_reuse = runtimeContext.enable_reuse
289         if enable_reuse:
290             reuse_req, _ = self.get_requirement("WorkReuse")
291             if reuse_req:
292                 enable_reuse = reuse_req["enableReuse"]
293             reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
294             if reuse_req:
295                 enable_reuse = reuse_req["enableReuse"]
296         container_request["use_existing"] = enable_reuse
297
298         if runtimeContext.runnerjob.startswith("arvwf:"):
299             wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
300             wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
301             if container_request["name"] == "main":
302                 container_request["name"] = wfrecord["name"]
303             container_request["properties"]["template_uuid"] = wfuuid
304
305         self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
306
307         try:
308             if runtimeContext.submit_request_uuid:
309                 response = self.arvrunner.api.container_requests().update(
310                     uuid=runtimeContext.submit_request_uuid,
311                     body=container_request,
312                     **extra_submit_params
313                 ).execute(num_retries=self.arvrunner.num_retries)
314             else:
315                 response = self.arvrunner.api.container_requests().create(
316                     body=container_request,
317                     **extra_submit_params
318                 ).execute(num_retries=self.arvrunner.num_retries)
319
320             self.uuid = response["uuid"]
321             self.arvrunner.process_submitted(self)
322
323             if response["state"] == "Final":
324                 logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
325             else:
326                 logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
327         except Exception as e:
328             logger.exception("%s error submitting container\n%s", self.arvrunner.label(self), e)
329             logger.debug("Container request was %s", container_request)
330             self.output_callback({}, "permanentFail")
331
332     def done(self, record):
333         outputs = {}
334         try:
335             container = self.arvrunner.api.containers().get(
336                 uuid=record["container_uuid"]
337             ).execute(num_retries=self.arvrunner.num_retries)
338             if container["state"] == "Complete":
339                 rcode = container["exit_code"]
340                 if self.successCodes and rcode in self.successCodes:
341                     processStatus = "success"
342                 elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
343                     processStatus = "temporaryFail"
344                 elif self.permanentFailCodes and rcode in self.permanentFailCodes:
345                     processStatus = "permanentFail"
346                 elif rcode == 0:
347                     processStatus = "success"
348                 else:
349                     processStatus = "permanentFail"
350             else:
351                 processStatus = "permanentFail"
352
353             if processStatus == "permanentFail" and record["log_uuid"]:
354                 logc = arvados.collection.CollectionReader(record["log_uuid"],
355                                                            api_client=self.arvrunner.api,
356                                                            keep_client=self.arvrunner.keep_client,
357                                                            num_retries=self.arvrunner.num_retries)
358                 label = self.arvrunner.label(self)
359                 done.logtail(
360                     logc, logger.error,
361                     "%s (%s) error log:" % (label, record["uuid"]), maxlen=40)
362
363             if record["output_uuid"]:
364                 if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
365                     # Compute the trash time to avoid requesting the collection record.
366                     trash_at = ciso8601.parse_datetime_as_naive(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
367                     aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
368                     orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
369                     oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
370                     logger.info("%s Intermediate output %s (%s) will be trashed%s%s%s." % (
371                         self.arvrunner.label(self), record["output_uuid"], container["output"], aftertime, orpart, oncomplete))
372                 self.arvrunner.add_intermediate_output(record["output_uuid"])
373
374             if container["output"]:
375                 outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
376         except WorkflowException as e:
377             # Only include a stack trace if in debug mode.
378             # A stack trace may obfuscate more useful output about the workflow.
379             logger.error("%s unable to collect output from %s:\n%s",
380                          self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
381             processStatus = "permanentFail"
382         except Exception:
383             logger.exception("%s while getting output object:", self.arvrunner.label(self))
384             processStatus = "permanentFail"
385         finally:
386             self.output_callback(outputs, processStatus)
387
388
389 class RunnerContainer(Runner):
390     """Submit and manage a container that runs arvados-cwl-runner."""
391
392     def arvados_job_spec(self, runtimeContext):
393         """Create an Arvados container request for this workflow.
394
395         The returned dict can be used to create a container passed as
396         the +body+ argument to container_requests().create().
397         """
398
399         adjustDirObjs(self.job_order, trim_listing)
400         visit_class(self.job_order, ("File", "Directory"), trim_anonymous_location)
401         visit_class(self.job_order, ("File", "Directory"), remove_redundant_fields)
402
403         secret_mounts = {}
404         for param in sorted(self.job_order.keys()):
405             if self.secret_store.has_secret(self.job_order[param]):
406                 mnt = "/secrets/s%d" % len(secret_mounts)
407                 secret_mounts[mnt] = {
408                     "kind": "text",
409                     "content": self.secret_store.retrieve(self.job_order[param])
410                 }
411                 self.job_order[param] = {"$include": mnt}
412
413         container_req = {
414             "name": self.name,
415             "output_path": "/var/spool/cwl",
416             "cwd": "/var/spool/cwl",
417             "priority": self.priority,
418             "state": "Committed",
419             "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image),
420             "mounts": {
421                 "/var/lib/cwl/cwl.input.json": {
422                     "kind": "json",
423                     "content": self.job_order
424                 },
425                 "stdout": {
426                     "kind": "file",
427                     "path": "/var/spool/cwl/cwl.output.json"
428                 },
429                 "/var/spool/cwl": {
430                     "kind": "collection",
431                     "writable": True
432                 }
433             },
434             "secret_mounts": secret_mounts,
435             "runtime_constraints": {
436                 "vcpus": math.ceil(self.submit_runner_cores),
437                 "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
438                 "API": True
439             },
440             "use_existing": False, # Never reuse the runner container - see #15497.
441             "properties": {}
442         }
443
444         if self.embedded_tool.tool.get("id", "").startswith("keep:"):
445             sp = self.embedded_tool.tool["id"].split('/')
446             workflowcollection = sp[0][5:]
447             workflowname = "/".join(sp[1:])
448             workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
449             container_req["mounts"]["/var/lib/cwl/workflow"] = {
450                 "kind": "collection",
451                 "portable_data_hash": "%s" % workflowcollection
452             }
453         else:
454             packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map)
455             workflowpath = "/var/lib/cwl/workflow.json#main"
456             container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
457                 "kind": "json",
458                 "content": packed
459             }
460             if self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
461                 container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
462
463
464         # --local means execute the workflow instead of submitting a container request
465         # --api=containers means use the containers API
466         # --no-log-timestamps means don't add timestamps (the logging infrastructure does this)
467         # --disable-validate because we already validated so don't need to do it again
468         # --eval-timeout is the timeout for javascript invocation
469         # --parallel-task-count is the number of threads to use for job submission
470         # --enable/disable-reuse sets desired job reuse
471         # --collection-cache-size sets aside memory to store collections
472         command = ["arvados-cwl-runner",
473                    "--local",
474                    "--api=containers",
475                    "--no-log-timestamps",
476                    "--disable-validate",
477                    "--disable-color",
478                    "--eval-timeout=%s" % self.arvrunner.eval_timeout,
479                    "--thread-count=%s" % self.arvrunner.thread_count,
480                    "--enable-reuse" if self.enable_reuse else "--disable-reuse",
481                    "--collection-cache-size=%s" % self.collection_cache_size]
482
483         if self.output_name:
484             command.append("--output-name=" + self.output_name)
485             container_req["output_name"] = self.output_name
486
487         if self.output_tags:
488             command.append("--output-tags=" + self.output_tags)
489
490         if runtimeContext.debug:
491             command.append("--debug")
492
493         if runtimeContext.storage_classes != "default":
494             command.append("--storage-classes=" + runtimeContext.storage_classes)
495
496         if self.on_error:
497             command.append("--on-error=" + self.on_error)
498
499         if self.intermediate_output_ttl:
500             command.append("--intermediate-output-ttl=%d" % self.intermediate_output_ttl)
501
502         if self.arvrunner.trash_intermediate:
503             command.append("--trash-intermediate")
504
505         if self.arvrunner.project_uuid:
506             command.append("--project-uuid="+self.arvrunner.project_uuid)
507
508         if self.enable_dev:
509             command.append("--enable-dev")
510
511         command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
512
513         container_req["command"] = command
514
515         return container_req
516
517
518     def run(self, runtimeContext):
519         runtimeContext.keepprefix = "keep:"
520         job_spec = self.arvados_job_spec(runtimeContext)
521         if self.arvrunner.project_uuid:
522             job_spec["owner_uuid"] = self.arvrunner.project_uuid
523
524         extra_submit_params = {}
525         if runtimeContext.submit_runner_cluster:
526             extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
527
528         if runtimeContext.submit_request_uuid:
529             if "cluster_id" in extra_submit_params:
530                 # Doesn't make sense for "update" and actually fails
531                 del extra_submit_params["cluster_id"]
532             response = self.arvrunner.api.container_requests().update(
533                 uuid=runtimeContext.submit_request_uuid,
534                 body=job_spec,
535                 **extra_submit_params
536             ).execute(num_retries=self.arvrunner.num_retries)
537         else:
538             response = self.arvrunner.api.container_requests().create(
539                 body=job_spec,
540                 **extra_submit_params
541             ).execute(num_retries=self.arvrunner.num_retries)
542
543         self.uuid = response["uuid"]
544         self.arvrunner.process_submitted(self)
545
546         logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])
547
548     def done(self, record):
549         try:
550             container = self.arvrunner.api.containers().get(
551                 uuid=record["container_uuid"]
552             ).execute(num_retries=self.arvrunner.num_retries)
553             container["log"] = record["log_uuid"]
554         except Exception:
555             logger.exception("%s while getting runner container", self.arvrunner.label(self))
556             self.arvrunner.output_callback({}, "permanentFail")
557         else:
558             super(RunnerContainer, self).done(container)