Merge branch '21361-excise-old-distros'
[arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8
9 import logging
10 import json
11 import os
12 import urllib.request, urllib.parse, urllib.error
13 import time
14 import datetime
15 import ciso8601
16 import uuid
17 import math
18 import re
19
20 import arvados_cwl.util
21 import ruamel.yaml
22
23 from cwltool.errors import WorkflowException
24 from cwltool.process import UnsupportedRequirement, shortname
25 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
26 from cwltool.job import JobBase
27
28 import arvados.collection
29
30 from .arvdocker import arv_docker_get_image
31 from . import done
32 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields, make_builder
33 from .fsaccess import CollectionFetcher
34 from .pathmapper import NoFollowPathMapper, trim_listing
35 from .perf import Perf
36 from ._version import __version__
37
# Main logger for arvados-cwl-runner messages.
logger = logging.getLogger('arvados.cwl-runner')
# Child logger ('arvados.cwl-runner.metrics') handed to Perf for timing output.
metrics = logger.getChild('metrics')
40
def cleanup_name_for_collection(name):
    """Return *name* with every "/" replaced by a single space."""
    return " ".join(name.split("/"))
43
44 class ArvadosContainer(JobBase):
45     """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""
46
47     def __init__(self, runner, job_runtime,
48                  builder,   # type: Builder
49                  joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
50                  make_path_mapper,  # type: Callable[..., PathMapper]
51                  requirements,      # type: List[Dict[Text, Text]]
52                  hints,     # type: List[Dict[Text, Text]]
53                  name       # type: Text
54     ):
55         super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
56         self.arvrunner = runner
57         self.job_runtime = job_runtime
58         self.running = False
59         self.uuid = None
60         self.attempt_count = 0
61
62     def update_pipeline_component(self, r):
63         pass
64
65     def _required_env(self):
66         env = {}
67         env["HOME"] = self.outdir
68         env["TMPDIR"] = self.tmpdir
69         return env
70
71     def run(self, toplevelRuntimeContext):
72         # ArvadosCommandTool subclasses from cwltool.CommandLineTool,
73         # which calls makeJobRunner() to get a new ArvadosContainer
74         # object.  The fields that define execution such as
75         # command_line, environment, etc are set on the
76         # ArvadosContainer object by CommandLineTool.job() before
77         # run() is called.
78
79         runtimeContext = self.job_runtime
80
81         if runtimeContext.submit_request_uuid:
82             container_request = self.arvrunner.api.container_requests().get(
83                 uuid=runtimeContext.submit_request_uuid
84             ).execute(num_retries=self.arvrunner.num_retries)
85         else:
86             container_request = {}
87
88         container_request["command"] = self.command_line
89         container_request["name"] = self.name
90         container_request["output_path"] = self.outdir
91         container_request["cwd"] = self.outdir
92         container_request["priority"] = runtimeContext.priority
93         container_request["state"] = "Uncommitted"
94         container_request.setdefault("properties", {})
95
96         container_request["properties"]["cwl_input"] = self.joborder
97
98         runtime_constraints = {}
99
100         if runtimeContext.project_uuid:
101             container_request["owner_uuid"] = runtimeContext.project_uuid
102
103         if self.arvrunner.secret_store.has_secret(self.command_line):
104             raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")
105
106         if self.arvrunner.secret_store.has_secret(self.environment):
107             raise WorkflowException("Secret material leaked in environment, only file literals may contain secrets")
108
109         resources = self.builder.resources
110         if resources is not None:
111             runtime_constraints["vcpus"] = math.ceil(resources.get("cores", 1))
112             runtime_constraints["ram"] = math.ceil(resources.get("ram") * 2**20)
113
114         mounts = {
115             self.outdir: {
116                 "kind": "tmp",
117                 "capacity": math.ceil(resources.get("outdirSize", 0) * 2**20)
118             },
119             self.tmpdir: {
120                 "kind": "tmp",
121                 "capacity": math.ceil(resources.get("tmpdirSize", 0) * 2**20)
122             }
123         }
124         secret_mounts = {}
125         scheduling_parameters = {}
126
127         rf = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
128         rf.sort(key=lambda k: k.resolved)
129         prevdir = None
130         for resolved, target, tp, stg in rf:
131             if not stg:
132                 continue
133             if prevdir and target.startswith(prevdir):
134                 continue
135             if tp == "Directory":
136                 targetdir = target
137             else:
138                 targetdir = os.path.dirname(target)
139             sp = resolved.split("/", 1)
140             pdh = sp[0][5:]   # remove "keep:"
141             mounts[targetdir] = {
142                 "kind": "collection",
143                 "portable_data_hash": pdh
144             }
145             if pdh in self.pathmapper.pdh_to_uuid:
146                 mounts[targetdir]["uuid"] = self.pathmapper.pdh_to_uuid[pdh]
147             if len(sp) == 2:
148                 if tp == "Directory":
149                     path = sp[1]
150                 else:
151                     path = os.path.dirname(sp[1])
152                 if path and path != "/":
153                     mounts[targetdir]["path"] = path
154             prevdir = targetdir + "/"
155
156         intermediate_collection_info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
157
158         with Perf(metrics, "generatefiles %s" % self.name):
159             if self.generatefiles["listing"]:
160                 vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
161                                                     keep_client=self.arvrunner.keep_client,
162                                                     num_retries=self.arvrunner.num_retries)
163                 generatemapper = NoFollowPathMapper(self.generatefiles["listing"], "", "",
164                                                     separateDirs=False)
165
166                 sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target)
167
168                 logger.debug("generatemapper is %s", sorteditems)
169
170                 with Perf(metrics, "createfiles %s" % self.name):
171                     for f, p in sorteditems:
172                         if not p.target:
173                             continue
174
175                         if p.target.startswith("/"):
176                             dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
177                         else:
178                             dst = p.target
179
180                         if p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
181                             if p.resolved.startswith("_:"):
182                                 vwd.mkdirs(dst)
183                             else:
184                                 source, path = self.arvrunner.fs_access.get_collection(p.resolved)
185                                 vwd.copy(path or ".", dst, source_collection=source)
186                         elif p.type == "CreateFile":
187                             if self.arvrunner.secret_store.has_secret(p.resolved):
188                                 mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
189                                 secret_mounts[mountpoint] = {
190                                     "kind": "text",
191                                     "content": self.arvrunner.secret_store.retrieve(p.resolved)
192                                 }
193                             else:
194                                 with vwd.open(dst, "w") as n:
195                                     n.write(p.resolved)
196
197                 def keepemptydirs(p):
198                     if isinstance(p, arvados.collection.RichCollectionBase):
199                         if len(p) == 0:
200                             p.open(".keep", "w").close()
201                         else:
202                             for c in p:
203                                 keepemptydirs(p[c])
204
205                 keepemptydirs(vwd)
206
207                 if not runtimeContext.current_container:
208                     runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
209                 vwd.save_new(name=intermediate_collection_info["name"],
210                              owner_uuid=runtimeContext.project_uuid,
211                              ensure_unique_name=True,
212                              trash_at=intermediate_collection_info["trash_at"],
213                              properties=intermediate_collection_info["properties"])
214
215                 prev = None
216                 for f, p in sorteditems:
217                     if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
218                         (prev is not None and p.target.startswith(prev))):
219                         continue
220                     if p.target.startswith("/"):
221                         dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
222                     else:
223                         dst = p.target
224                     mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
225                     mounts[mountpoint] = {"kind": "collection",
226                                           "portable_data_hash": vwd.portable_data_hash(),
227                                           "path": dst}
228                     if p.type.startswith("Writable"):
229                         mounts[mountpoint]["writable"] = True
230                     prev = p.target + "/"
231
232         container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
233         if self.environment:
234             container_request["environment"].update(self.environment)
235
236         if self.stdin:
237             sp = self.stdin[6:].split("/", 1)
238             mounts["stdin"] = {"kind": "collection",
239                                 "portable_data_hash": sp[0],
240                                 "path": sp[1]}
241
242         if self.stderr:
243             mounts["stderr"] = {"kind": "file",
244                                 "path": "%s/%s" % (self.outdir, self.stderr)}
245
246         if self.stdout:
247             mounts["stdout"] = {"kind": "file",
248                                 "path": "%s/%s" % (self.outdir, self.stdout)}
249
250         (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
251
252         container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
253                                                                     docker_req,
254                                                                     runtimeContext.pull_image,
255                                                                     runtimeContext)
256
257         network_req, _ = self.get_requirement("NetworkAccess")
258         if network_req:
259             runtime_constraints["API"] = network_req["networkAccess"]
260
261         api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
262         if api_req:
263             runtime_constraints["API"] = True
264
265         use_disk_cache = (self.arvrunner.api.config()["Containers"].get("DefaultKeepCacheRAM", 0) == 0)
266
267         keep_cache_type_req, _ = self.get_requirement("http://arvados.org/cwl#KeepCacheTypeRequirement")
268         if keep_cache_type_req:
269             if "keepCacheType" in keep_cache_type_req:
270                 if keep_cache_type_req["keepCacheType"] == "ram_cache":
271                     use_disk_cache = False
272
273         runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
274         if runtime_req:
275             if "keep_cache" in runtime_req:
276                 if use_disk_cache:
277                     # If DefaultKeepCacheRAM is zero it means we should use disk cache.
278                     runtime_constraints["keep_cache_disk"] = math.ceil(runtime_req["keep_cache"] * 2**20)
279                 else:
280                     runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
281             if "outputDirType" in runtime_req:
282                 if runtime_req["outputDirType"] == "local_output_dir":
283                     # Currently the default behavior.
284                     pass
285                 elif runtime_req["outputDirType"] == "keep_output_dir":
286                     mounts[self.outdir]= {
287                         "kind": "collection",
288                         "writable": True
289                     }
290
291         partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
292         if partition_req:
293             scheduling_parameters["partitions"] = aslist(partition_req["partition"])
294
295         intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
296         if intermediate_output_req:
297             self.output_ttl = intermediate_output_req["outputTTL"]
298         else:
299             self.output_ttl = self.arvrunner.intermediate_output_ttl
300
301         if self.output_ttl < 0:
302             raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % container_request["output_ttl"])
303
304
305         if self.arvrunner.api._rootDesc["revision"] >= "20210628":
306             storage_class_req, _ = self.get_requirement("http://arvados.org/cwl#OutputStorageClass")
307             if storage_class_req and storage_class_req.get("intermediateStorageClass"):
308                 container_request["output_storage_classes"] = aslist(storage_class_req["intermediateStorageClass"])
309             else:
310                 container_request["output_storage_classes"] = runtimeContext.intermediate_storage_classes.strip().split(",")
311
312         cuda_req, _ = self.get_requirement("http://commonwl.org/cwltool#CUDARequirement")
313         if cuda_req:
314             runtime_constraints["cuda"] = {
315                 "device_count": resources.get("cudaDeviceCount", 1),
316                 "driver_version": cuda_req["cudaVersionMin"],
317                 "hardware_capability": aslist(cuda_req["cudaComputeCapability"])[0]
318             }
319
320         if runtimeContext.enable_preemptible is False:
321             scheduling_parameters["preemptible"] = False
322         else:
323             preemptible_req, _ = self.get_requirement("http://arvados.org/cwl#UsePreemptible")
324             if preemptible_req:
325                 scheduling_parameters["preemptible"] = preemptible_req["usePreemptible"]
326             elif runtimeContext.enable_preemptible is True:
327                 scheduling_parameters["preemptible"] = True
328             elif runtimeContext.enable_preemptible is None:
329                 pass
330
331         if self.timelimit is not None and self.timelimit > 0:
332             scheduling_parameters["max_run_time"] = self.timelimit
333
334         extra_submit_params = {}
335         if runtimeContext.submit_runner_cluster:
336             extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
337
338         container_request["output_name"] = cleanup_name_for_collection("Output from step %s" % (self.name))
339         container_request["output_ttl"] = self.output_ttl
340         container_request["mounts"] = mounts
341         container_request["secret_mounts"] = secret_mounts
342         container_request["runtime_constraints"] = runtime_constraints
343         container_request["scheduling_parameters"] = scheduling_parameters
344
345         enable_reuse = runtimeContext.enable_reuse
346         if enable_reuse:
347             reuse_req, _ = self.get_requirement("WorkReuse")
348             if reuse_req:
349                 enable_reuse = reuse_req["enableReuse"]
350             reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
351             if reuse_req:
352                 enable_reuse = reuse_req["enableReuse"]
353         container_request["use_existing"] = enable_reuse
354
355         properties_req, _ = self.get_requirement("http://arvados.org/cwl#ProcessProperties")
356         if properties_req:
357             for pr in properties_req["processProperties"]:
358                 container_request["properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
359
360         output_properties_req, _ = self.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
361         if output_properties_req:
362             if self.arvrunner.api._rootDesc["revision"] >= "20220510":
363                 container_request["output_properties"] = {}
364                 for pr in output_properties_req["outputProperties"]:
365                     container_request["output_properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
366             else:
367                 logger.warning("%s API revision is %s, revision %s is required to support setting properties on output collections.",
368                                self.arvrunner.label(self), self.arvrunner.api._rootDesc["revision"], "20220510")
369
370         ram_multiplier = [1]
371
372         oom_retry_req, _ = self.get_requirement("http://arvados.org/cwl#OutOfMemoryRetry")
373         if oom_retry_req:
374             if oom_retry_req.get('memoryRetryMultiplier'):
375                 ram_multiplier.append(oom_retry_req.get('memoryRetryMultiplier'))
376             elif oom_retry_req.get('memoryRetryMultipler'):
377                 ram_multiplier.append(oom_retry_req.get('memoryRetryMultipler'))
378             else:
379                 ram_multiplier.append(2)
380
381         if runtimeContext.runnerjob.startswith("arvwf:"):
382             wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
383             wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
384             if container_request["name"] == "main":
385                 container_request["name"] = wfrecord["name"]
386             container_request["properties"]["template_uuid"] = wfuuid
387
388         if self.attempt_count == 0:
389             self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
390
391         try:
392             ram = runtime_constraints["ram"]
393
394             self.uuid = runtimeContext.submit_request_uuid
395
396             for i in ram_multiplier:
397                 runtime_constraints["ram"] = ram * i
398
399                 if self.uuid:
400                     response = self.arvrunner.api.container_requests().update(
401                         uuid=self.uuid,
402                         body=container_request,
403                         **extra_submit_params
404                     ).execute(num_retries=self.arvrunner.num_retries)
405                 else:
406                     response = self.arvrunner.api.container_requests().create(
407                         body=container_request,
408                         **extra_submit_params
409                     ).execute(num_retries=self.arvrunner.num_retries)
410                     self.uuid = response["uuid"]
411
412                 if response["container_uuid"] is not None:
413                     break
414
415             if response["container_uuid"] is None:
416                 runtime_constraints["ram"] = ram * ram_multiplier[self.attempt_count]
417
418             container_request["state"] = "Committed"
419             response = self.arvrunner.api.container_requests().update(
420                 uuid=self.uuid,
421                 body=container_request,
422                 **extra_submit_params
423             ).execute(num_retries=self.arvrunner.num_retries)
424
425             self.arvrunner.process_submitted(self)
426             self.attempt_count += 1
427
428             if response["state"] == "Final":
429                 logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
430             else:
431                 logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
432         except Exception as e:
433             logger.exception("%s error submitting container\n%s", self.arvrunner.label(self), e)
434             logger.debug("Container request was %s", container_request)
435             self.output_callback({}, "permanentFail")
436
437     def out_of_memory_retry(self, record, container):
438         oom_retry_req, _ = self.get_requirement("http://arvados.org/cwl#OutOfMemoryRetry")
439         if oom_retry_req is None:
440             return False
441
442         # Sometimes it gets killed with no warning
443         if container["exit_code"] == 137:
444             return True
445
446         logc = arvados.collection.CollectionReader(record["log_uuid"],
447                                                    api_client=self.arvrunner.api,
448                                                    keep_client=self.arvrunner.keep_client,
449                                                    num_retries=self.arvrunner.num_retries)
450
451         loglines = [""]
452         def callback(v1, v2, v3):
453             loglines[0] = v3
454
455         done.logtail(logc, callback, "", maxlen=1000)
456
457         # Check allocation failure
458         oom_matches = oom_retry_req.get('memoryErrorRegex') or r'(bad_alloc|out ?of ?memory|memory ?error|container using over 9.% of memory)'
459         if re.search(oom_matches, loglines[0], re.IGNORECASE | re.MULTILINE):
460             return True
461
462         return False
463
464     def done(self, record):
465         outputs = {}
466         retried = False
467         rcode = None
468         try:
469             container = self.arvrunner.api.containers().get(
470                 uuid=record["container_uuid"]
471             ).execute(num_retries=self.arvrunner.num_retries)
472             if container["state"] == "Complete":
473                 rcode = container["exit_code"]
474                 if self.successCodes and rcode in self.successCodes:
475                     processStatus = "success"
476                 elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
477                     processStatus = "temporaryFail"
478                 elif self.permanentFailCodes and rcode in self.permanentFailCodes:
479                     processStatus = "permanentFail"
480                 elif rcode == 0:
481                     processStatus = "success"
482                 else:
483                     processStatus = "permanentFail"
484
485                 if processStatus == "permanentFail" and self.attempt_count == 1 and self.out_of_memory_retry(record, container):
486                     logger.warning("%s Container failed with out of memory error, retrying with more RAM.",
487                                  self.arvrunner.label(self))
488                     self.job_runtime.submit_request_uuid = None
489                     self.uuid = None
490                     self.run(None)
491                     retried = True
492                     return
493
494                 if rcode == 137:
495                     logger.warning("%s Container may have been killed for using too much RAM.  Try resubmitting with a higher 'ramMin' or use the arv:OutOfMemoryRetry feature.",
496                                  self.arvrunner.label(self))
497             else:
498                 processStatus = "permanentFail"
499
500             if processStatus == "permanentFail" and record["log_uuid"]:
501                 logc = arvados.collection.CollectionReader(record["log_uuid"],
502                                                            api_client=self.arvrunner.api,
503                                                            keep_client=self.arvrunner.keep_client,
504                                                            num_retries=self.arvrunner.num_retries)
505                 label = self.arvrunner.label(self)
506                 done.logtail(
507                     logc, logger.error,
508                     "%s (%s) error log:" % (label, record["uuid"]), maxlen=40, include_crunchrun=(rcode is None or rcode > 127))
509
510             if record["output_uuid"]:
511                 if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
512                     # Compute the trash time to avoid requesting the collection record.
513                     trash_at = ciso8601.parse_datetime_as_naive(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
514                     aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
515                     orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
516                     oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
517                     logger.info("%s Intermediate output %s (%s) will be trashed%s%s%s." % (
518                         self.arvrunner.label(self), record["output_uuid"], container["output"], aftertime, orpart, oncomplete))
519                 self.arvrunner.add_intermediate_output(record["output_uuid"])
520
521             if container["output"]:
522                 outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
523
524             properties = record["properties"].copy()
525             properties["cwl_output"] = outputs
526             self.arvrunner.api.container_requests().update(
527                 uuid=self.uuid,
528                 body={"container_request": {"properties": properties}}
529             ).execute(num_retries=self.arvrunner.num_retries)
530         except WorkflowException as e:
531             # Only include a stack trace if in debug mode.
532             # A stack trace may obfuscate more useful output about the workflow.
533             logger.error("%s unable to collect output from %s:\n%s",
534                          self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
535             processStatus = "permanentFail"
536         except Exception:
537             logger.exception("%s while getting output object:", self.arvrunner.label(self))
538             processStatus = "permanentFail"
539         finally:
540             if not retried:
541                 self.output_callback(outputs, processStatus)
542
543
544 class RunnerContainer(Runner):
545     """Submit and manage a container that runs arvados-cwl-runner."""
546
547     def arvados_job_spec(self, runtimeContext, git_info):
548         """Create an Arvados container request for this workflow.
549
550         The returned dict can be used to create a container passed as
551         the +body+ argument to container_requests().create().
552         """
553
554         adjustDirObjs(self.job_order, trim_listing)
555         visit_class(self.job_order, ("File", "Directory"), trim_anonymous_location)
556         visit_class(self.job_order, ("File", "Directory"), remove_redundant_fields)
557
558         secret_mounts = {}
559         for param in sorted(self.job_order.keys()):
560             if self.secret_store.has_secret(self.job_order[param]):
561                 mnt = "/secrets/s%d" % len(secret_mounts)
562                 secret_mounts[mnt] = {
563                     "kind": "text",
564                     "content": self.secret_store.retrieve(self.job_order[param])
565                 }
566                 self.job_order[param] = {"$include": mnt}
567
568         container_image = arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext)
569
570         workflow_runner_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
571         if workflow_runner_req and workflow_runner_req.get("acrContainerImage"):
572             container_image = workflow_runner_req.get("acrContainerImage")
573
574         container_req = {
575             "name": self.name,
576             "output_path": "/var/spool/cwl",
577             "cwd": "/var/spool/cwl",
578             "priority": self.priority,
579             "state": "Committed",
580             "container_image": container_image,
581             "mounts": {
582                 "/var/lib/cwl/cwl.input.json": {
583                     "kind": "json",
584                     "content": self.job_order
585                 },
586                 "stdout": {
587                     "kind": "file",
588                     "path": "/var/spool/cwl/cwl.output.json"
589                 },
590                 "/var/spool/cwl": {
591                     "kind": "collection",
592                     "writable": True
593                 }
594             },
595             "secret_mounts": secret_mounts,
596             "runtime_constraints": {
597                 "vcpus": math.ceil(self.submit_runner_cores),
598                 "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
599                 "API": True
600             },
601             "use_existing": self.reuse_runner,
602             "properties": {}
603         }
604
605         if self.embedded_tool.tool.get("id", "").startswith("keep:"):
606             sp = self.embedded_tool.tool["id"].split('/')
607             workflowcollection = sp[0][5:]
608             workflowname = "/".join(sp[1:])
609             workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
610             container_req["mounts"]["/var/lib/cwl/workflow"] = {
611                 "kind": "collection",
612                 "portable_data_hash": "%s" % workflowcollection
613             }
614         elif self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
615             uuid, frg = urllib.parse.urldefrag(self.embedded_tool.tool["id"])
616             workflowpath = "/var/lib/cwl/workflow.json#" + frg
617             packedtxt = self.loadingContext.loader.fetch_text(uuid)
618             yaml = ruamel.yaml.YAML(typ='safe', pure=True)
619             packed = yaml.load(packedtxt)
620             container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
621                 "kind": "json",
622                 "content": packed
623             }
624             container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
625         elif self.embedded_tool.tool.get("id", "").startswith("file:"):
626             raise WorkflowException("Tool id '%s' is a local file but expected keep: or arvwf:" % self.embedded_tool.tool.get("id"))
627         else:
628             main = self.loadingContext.loader.idx["_:main"]
629             if main.get("id") == "_:main":
630                 del main["id"]
631             workflowpath = "/var/lib/cwl/workflow.json#main"
632             container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
633                 "kind": "json",
634                 "content": main
635             }
636
637         container_req["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in git_info.items()})
638
639         properties_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ProcessProperties")
640         if properties_req:
641             builder = make_builder(self.job_order, self.embedded_tool.hints, self.embedded_tool.requirements, runtimeContext, self.embedded_tool.metadata)
642             for pr in properties_req["processProperties"]:
643                 container_req["properties"][pr["propertyName"]] = builder.do_eval(pr["propertyValue"])
644
645         # --local means execute the workflow instead of submitting a container request
646         # --api=containers means use the containers API
647         # --no-log-timestamps means don't add timestamps (the logging infrastructure does this)
648         # --disable-validate because we already validated so don't need to do it again
649         # --eval-timeout is the timeout for javascript invocation
650         # --parallel-task-count is the number of threads to use for job submission
651         # --enable/disable-reuse sets desired job reuse
652         # --collection-cache-size sets aside memory to store collections
653         command = ["arvados-cwl-runner",
654                    "--local",
655                    "--api=containers",
656                    "--no-log-timestamps",
657                    "--disable-validate",
658                    "--disable-color",
659                    "--eval-timeout=%s" % self.arvrunner.eval_timeout,
660                    "--thread-count=%s" % self.arvrunner.thread_count,
661                    "--enable-reuse" if self.enable_reuse else "--disable-reuse",
662                    "--collection-cache-size=%s" % self.collection_cache_size]
663
664         if self.output_name:
665             command.append("--output-name=" + self.output_name)
666             container_req["output_name"] = self.output_name
667
668         if self.output_tags:
669             command.append("--output-tags=" + self.output_tags)
670
671         if runtimeContext.debug:
672             command.append("--debug")
673
674         if runtimeContext.storage_classes != "default" and runtimeContext.storage_classes:
675             command.append("--storage-classes=" + runtimeContext.storage_classes)
676
677         if runtimeContext.intermediate_storage_classes != "default" and runtimeContext.intermediate_storage_classes:
678             command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes)
679
680         if runtimeContext.on_error:
681             command.append("--on-error=" + self.on_error)
682
683         if runtimeContext.intermediate_output_ttl:
684             command.append("--intermediate-output-ttl=%d" % runtimeContext.intermediate_output_ttl)
685
686         if runtimeContext.trash_intermediate:
687             command.append("--trash-intermediate")
688
689         if runtimeContext.project_uuid:
690             command.append("--project-uuid="+runtimeContext.project_uuid)
691
692         if self.enable_dev:
693             command.append("--enable-dev")
694
695         if runtimeContext.enable_preemptible is True:
696             command.append("--enable-preemptible")
697
698         if runtimeContext.enable_preemptible is False:
699             command.append("--disable-preemptible")
700
701         if runtimeContext.varying_url_params:
702             command.append("--varying-url-params="+runtimeContext.varying_url_params)
703
704         if runtimeContext.prefer_cached_downloads:
705             command.append("--prefer-cached-downloads")
706
707         if self.fast_parser:
708             command.append("--fast-parser")
709
710         command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
711
712         container_req["command"] = command
713
714         return container_req
715
716
717     def run(self, runtimeContext):
718         runtimeContext.keepprefix = "keep:"
719         job_spec = self.arvados_job_spec(runtimeContext, self.git_info)
720         if runtimeContext.project_uuid:
721             job_spec["owner_uuid"] = runtimeContext.project_uuid
722
723         extra_submit_params = {}
724         if runtimeContext.submit_runner_cluster:
725             extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
726
727         if runtimeContext.submit_request_uuid:
728             if "cluster_id" in extra_submit_params:
729                 # Doesn't make sense for "update" and actually fails
730                 del extra_submit_params["cluster_id"]
731             response = self.arvrunner.api.container_requests().update(
732                 uuid=runtimeContext.submit_request_uuid,
733                 body=job_spec,
734                 **extra_submit_params
735             ).execute(num_retries=self.arvrunner.num_retries)
736         else:
737             response = self.arvrunner.api.container_requests().create(
738                 body=job_spec,
739                 **extra_submit_params
740             ).execute(num_retries=self.arvrunner.num_retries)
741
742         self.uuid = response["uuid"]
743         self.arvrunner.process_submitted(self)
744
745         logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])
746
747         workbench2 = self.arvrunner.api.config()["Services"]["Workbench2"]["ExternalURL"]
748         if workbench2:
749             url = "{}processes/{}".format(workbench2, response["uuid"])
750             logger.info("Monitor workflow progress at %s", url)
751
752
753     def done(self, record):
754         try:
755             container = self.arvrunner.api.containers().get(
756                 uuid=record["container_uuid"]
757             ).execute(num_retries=self.arvrunner.num_retries)
758             container["log"] = record["log_uuid"]
759         except Exception:
760             logger.exception("%s while getting runner container", self.arvrunner.label(self))
761             self.arvrunner.output_callback({}, "permanentFail")
762         else:
763             super(RunnerContainer, self).done(container)