19744: Remove jobs/pipeline templates from crunchstat-summary
[arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8
9 import logging
10 import json
11 import os
12 import urllib.request, urllib.parse, urllib.error
13 import time
14 import datetime
15 import ciso8601
16 import uuid
17 import math
18 import re
19
20 import arvados_cwl.util
21 import ruamel.yaml
22
23 from cwltool.errors import WorkflowException
24 from cwltool.process import UnsupportedRequirement, shortname
25 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
26 from cwltool.job import JobBase
27
28 import arvados.collection
29
30 import crunchstat_summary.summarizer
31 import crunchstat_summary.reader
32
33 from .arvdocker import arv_docker_get_image
34 from . import done
35 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields, make_builder
36 from .fsaccess import CollectionFetcher
37 from .pathmapper import NoFollowPathMapper, trim_listing
38 from .perf import Perf
39 from ._version import __version__
40
# Logger for arvados-cwl-runner messages, plus a dotted child logger that
# receives the timing output produced via Perf(metrics, ...).
logger = logging.getLogger(name='arvados.cwl-runner')
metrics = logging.getLogger(name='arvados.cwl-runner.metrics')
43
def cleanup_name_for_collection(name):
    """Return *name* with every "/" turned into a single space.

    Used when deriving collection names (such as a step's output_name) from
    step names.  Slashes are replaced, presumably because they are not
    acceptable in Arvados collection names.
    """
    return name.translate({ord("/"): " "})
46
class ArvadosContainer(JobBase):
    """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""

    def __init__(self, runner, job_runtime,
                 builder,   # type: Builder
                 joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
                 make_path_mapper,  # type: Callable[..., PathMapper]
                 requirements,      # type: List[Dict[Text, Text]]
                 hints,     # type: List[Dict[Text, Text]]
                 name       # type: Text
    ):
        super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
        # Executor object supplying api, keep_client, secret_store, num_retries, etc.
        self.arvrunner = runner
        # Runtime context for this job; run() reads all of its settings from here.
        self.job_runtime = job_runtime
        self.running = False
        # UUID of the submitted container request (None until submitted).
        self.uuid = None
        # Number of times run() has committed a request; selects the RAM
        # multiplier when retrying after an out-of-memory failure.
        self.attempt_count = 0

    def update_pipeline_component(self, r):
        """No-op: there is no pipeline component to update for a container request."""
        pass

    def _required_env(self):
        """Return the base environment for every step: HOME=outdir, TMPDIR=tmpdir."""
        env = {}
        env["HOME"] = self.outdir
        env["TMPDIR"] = self.tmpdir
        return env

    def _vwd_relative_path(self, target):
        """Map a generated-file target to its path inside the working-directory collection.

        Absolute targets under ``self.outdir`` become relative to outdir, other
        absolute targets lose their leading "/", and relative targets are
        returned unchanged.
        """
        if not target.startswith("/"):
            return target
        outdir_prefix = self.outdir + "/"
        if target.startswith(outdir_prefix):
            return target[len(outdir_prefix):]
        return target[1:]

    def _absolute_target(self, target):
        """Return *target* as an absolute mount point, resolving relative paths against outdir."""
        return target if target.startswith("/") else os.path.join(self.outdir, target)

    def run(self, toplevelRuntimeContext):
        """Build and submit the container request for this CommandLineTool step.

        ArvadosCommandTool subclasses cwltool.CommandLineTool, which calls
        makeJobRunner() to get a new ArvadosContainer object.  The fields that
        define execution (command_line, environment, etc.) are set on this
        object by CommandLineTool.job() before run() is called.

        :param toplevelRuntimeContext: ignored; the per-job context given to
            the constructor (``self.job_runtime``) is used instead.  done()
            relies on this when it retries with ``self.run(None)``.
        :raises WorkflowException: if secret material appears on the command
            line or in the environment, or the intermediate output TTL is
            negative.

        Exceptions raised while creating/updating/committing the request are
        logged and reported to the output callback as "permanentFail".
        """
        runtimeContext = self.job_runtime

        if runtimeContext.submit_request_uuid:
            # Fill in an existing container request instead of creating a new one.
            container_request = self.arvrunner.api.container_requests().get(
                uuid=runtimeContext.submit_request_uuid
            ).execute(num_retries=self.arvrunner.num_retries)
        else:
            container_request = {}

        container_request["command"] = self.command_line
        container_request["name"] = self.name
        container_request["output_path"] = self.outdir
        container_request["cwd"] = self.outdir
        container_request["priority"] = runtimeContext.priority
        # Submitted as Uncommitted first; committed at the end once the RAM
        # constraint has been settled.
        container_request["state"] = "Uncommitted"
        container_request.setdefault("properties", {})

        container_request["properties"]["cwl_input"] = self.joborder

        runtime_constraints = {}

        if runtimeContext.project_uuid:
            container_request["owner_uuid"] = runtimeContext.project_uuid

        if self.arvrunner.secret_store.has_secret(self.command_line):
            raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")

        if self.arvrunner.secret_store.has_secret(self.environment):
            raise WorkflowException("Secret material leaked in environment, only file literals may contain secrets")

        # CWL resource sizes are scaled by 2**20 (MiB -> bytes) for Arvados.
        resources = self.builder.resources
        if resources is not None:
            runtime_constraints["vcpus"] = math.ceil(resources.get("cores", 1))
            runtime_constraints["ram"] = math.ceil(resources.get("ram") * 2**20)

        mounts = {
            self.outdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("outdirSize", 0) * 2**20)
            },
            self.tmpdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("tmpdirSize", 0) * 2**20)
            }
        }
        secret_mounts = {}
        scheduling_parameters = {}

        # Mount the Keep collections holding staged input files.  Entries are
        # sorted by resolved location; an entry whose target lies under the
        # directory mounted just before it is skipped.
        rf = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
        rf.sort(key=lambda k: k.resolved)
        prevdir = None
        for resolved, target, tp, stg in rf:
            if not stg:
                continue
            if prevdir and target.startswith(prevdir):
                continue
            if tp == "Directory":
                targetdir = target
            else:
                targetdir = os.path.dirname(target)
            sp = resolved.split("/", 1)
            pdh = sp[0][5:]   # remove "keep:"
            mounts[targetdir] = {
                "kind": "collection",
                "portable_data_hash": pdh
            }
            if pdh in self.pathmapper.pdh_to_uuid:
                mounts[targetdir]["uuid"] = self.pathmapper.pdh_to_uuid[pdh]
            if len(sp) == 2:
                if tp == "Directory":
                    path = sp[1]
                else:
                    path = os.path.dirname(sp[1])
                if path and path != "/":
                    mounts[targetdir]["path"] = path
            prevdir = targetdir + "/"

        with Perf(metrics, "generatefiles %s" % self.name):
            if self.generatefiles["listing"]:
                # Assemble the generated working-directory files in a new
                # collection, then mount pieces of it into the container.
                vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                    keep_client=self.arvrunner.keep_client,
                                                    num_retries=self.arvrunner.num_retries)
                generatemapper = NoFollowPathMapper(self.generatefiles["listing"], "", "",
                                                    separateDirs=False)

                sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target)

                logger.debug("generatemapper is %s", sorteditems)

                with Perf(metrics, "createfiles %s" % self.name):
                    for f, p in sorteditems:
                        if not p.target:
                            continue

                        dst = self._vwd_relative_path(p.target)

                        if p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
                            if p.resolved.startswith("_:"):
                                vwd.mkdirs(dst)
                            else:
                                source, path = self.arvrunner.fs_access.get_collection(p.resolved)
                                vwd.copy(path or ".", dst, source_collection=source)
                        elif p.type == "CreateFile":
                            if self.arvrunner.secret_store.has_secret(p.resolved):
                                # Secret contents are never written to the
                                # collection; they go in a text secret mount.
                                secret_mounts[self._absolute_target(p.target)] = {
                                    "kind": "text",
                                    "content": self.arvrunner.secret_store.retrieve(p.resolved)
                                }
                            else:
                                with vwd.open(dst, "w") as n:
                                    n.write(p.resolved)

                def keepemptydirs(p):
                    # Put a ".keep" file in each empty subcollection, presumably
                    # so empty directories survive saving the collection.
                    if isinstance(p, arvados.collection.RichCollectionBase):
                        if len(p) == 0:
                            p.open(".keep", "w").close()
                        else:
                            for c in p:
                                keepemptydirs(p[c])

                keepemptydirs(vwd)

                if not runtimeContext.current_container:
                    runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
                # Computed after the lookup above so that the current container
                # is actually passed to get_intermediate_collection_info().
                intermediate_collection_info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
                vwd.save_new(name=intermediate_collection_info["name"],
                             owner_uuid=runtimeContext.project_uuid,
                             ensure_unique_name=True,
                             trash_at=intermediate_collection_info["trash_at"],
                             properties=intermediate_collection_info["properties"])

                # Mount each top-level generated entry from the saved
                # collection.  Entries nested under an already-mounted target
                # are skipped, as are secrets (already in secret_mounts).
                prev = None
                for f, p in sorteditems:
                    if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
                        (prev is not None and p.target.startswith(prev))):
                        continue
                    mountpoint = self._absolute_target(p.target)
                    mounts[mountpoint] = {"kind": "collection",
                                          "portable_data_hash": vwd.portable_data_hash(),
                                          "path": self._vwd_relative_path(p.target)}
                    if p.type.startswith("Writable"):
                        mounts[mountpoint]["writable"] = True
                    prev = p.target + "/"

        container_request["environment"] = self._required_env()
        if self.environment:
            container_request["environment"].update(self.environment)

        if self.stdin:
            # Drop the first 6 characters (presumably a "/keep/" prefix), then
            # split into portable data hash and path within the collection.
            sp = self.stdin[6:].split("/", 1)
            mounts["stdin"] = {"kind": "collection",
                                "portable_data_hash": sp[0],
                                "path": sp[1]}

        if self.stderr:
            mounts["stderr"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stderr)}

        if self.stdout:
            mounts["stdout"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stdout)}

        (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")

        container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
                                                                    docker_req,
                                                                    runtimeContext.pull_image,
                                                                    runtimeContext)

        network_req, _ = self.get_requirement("NetworkAccess")
        if network_req:
            runtime_constraints["API"] = network_req["networkAccess"]

        api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
        if api_req:
            runtime_constraints["API"] = True

        # Disk cache unless the cluster configures a nonzero DefaultKeepCacheRAM
        # or the tool explicitly requests a RAM cache.
        use_disk_cache = (self.arvrunner.api.config()["Containers"].get("DefaultKeepCacheRAM", 0) == 0)

        keep_cache_type_req, _ = self.get_requirement("http://arvados.org/cwl#KeepCacheTypeRequirement")
        if keep_cache_type_req:
            if "keepCacheType" in keep_cache_type_req:
                if keep_cache_type_req["keepCacheType"] == "ram_cache":
                    use_disk_cache = False

        runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
        if runtime_req:
            if "keep_cache" in runtime_req:
                if use_disk_cache:
                    # If DefaultKeepCacheRAM is zero it means we should use disk cache.
                    runtime_constraints["keep_cache_disk"] = math.ceil(runtime_req["keep_cache"] * 2**20)
                else:
                    runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
            if "outputDirType" in runtime_req:
                if runtime_req["outputDirType"] == "local_output_dir":
                    # Currently the default behavior.
                    pass
                elif runtime_req["outputDirType"] == "keep_output_dir":
                    mounts[self.outdir] = {
                        "kind": "collection",
                        "writable": True
                    }

        partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
        if partition_req:
            scheduling_parameters["partitions"] = aslist(partition_req["partition"])

        intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
        if intermediate_output_req:
            self.output_ttl = intermediate_output_req["outputTTL"]
        else:
            self.output_ttl = self.arvrunner.intermediate_output_ttl

        if self.output_ttl < 0:
            # container_request["output_ttl"] is not assigned yet at this point,
            # so report the value actually being validated.
            raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % self.output_ttl)

        # output_storage_classes is only sent to API revision 20210628 or later.
        if self.arvrunner.api._rootDesc["revision"] >= "20210628":
            storage_class_req, _ = self.get_requirement("http://arvados.org/cwl#OutputStorageClass")
            if storage_class_req and storage_class_req.get("intermediateStorageClass"):
                container_request["output_storage_classes"] = aslist(storage_class_req["intermediateStorageClass"])
            else:
                container_request["output_storage_classes"] = runtimeContext.intermediate_storage_classes.strip().split(",")

        cuda_req, _ = self.get_requirement("http://commonwl.org/cwltool#CUDARequirement")
        if cuda_req:
            runtime_constraints["cuda"] = {
                "device_count": resources.get("cudaDeviceCount", 1),
                "driver_version": cuda_req["cudaVersionMin"],
                "hardware_capability": aslist(cuda_req["cudaComputeCapability"])[0]
            }

        # enable_preemptible: False forces non-preemptible; otherwise the
        # UsePreemptible hint wins, then True; None leaves it unset.
        if runtimeContext.enable_preemptible is False:
            scheduling_parameters["preemptible"] = False
        else:
            preemptible_req, _ = self.get_requirement("http://arvados.org/cwl#UsePreemptible")
            if preemptible_req:
                scheduling_parameters["preemptible"] = preemptible_req["usePreemptible"]
            elif runtimeContext.enable_preemptible is True:
                scheduling_parameters["preemptible"] = True
            elif runtimeContext.enable_preemptible is None:
                pass

        if self.timelimit is not None and self.timelimit > 0:
            scheduling_parameters["max_run_time"] = self.timelimit

        extra_submit_params = {}
        if runtimeContext.submit_runner_cluster:
            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster

        container_request["output_name"] = cleanup_name_for_collection("Output from step %s" % (self.name))
        container_request["output_ttl"] = self.output_ttl
        container_request["mounts"] = mounts
        container_request["secret_mounts"] = secret_mounts
        # Same dict object as the local runtime_constraints: later changes to
        # runtime_constraints["ram"] are reflected in what is submitted.
        container_request["runtime_constraints"] = runtime_constraints
        container_request["scheduling_parameters"] = scheduling_parameters

        enable_reuse = runtimeContext.enable_reuse
        if enable_reuse:
            reuse_req, _ = self.get_requirement("WorkReuse")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
            reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        container_request["use_existing"] = enable_reuse

        properties_req, _ = self.get_requirement("http://arvados.org/cwl#ProcessProperties")
        if properties_req:
            for pr in properties_req["processProperties"]:
                container_request["properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])

        output_properties_req, _ = self.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
        if output_properties_req:
            if self.arvrunner.api._rootDesc["revision"] >= "20220510":
                container_request["output_properties"] = {}
                for pr in output_properties_req["outputProperties"]:
                    container_request["output_properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
            else:
                logger.warning("%s API revision is %s, revision %s is required to support setting properties on output collections.",
                               self.arvrunner.label(self), self.arvrunner.api._rootDesc["revision"], "20220510")

        # RAM multipliers: index 0 is used for the first attempt; index 1
        # (present only with OutOfMemoryRetry) for the retry after an OOM.
        ram_multiplier = [1]

        oom_retry_req, _ = self.get_requirement("http://arvados.org/cwl#OutOfMemoryRetry")
        if oom_retry_req:
            # The misspelled key "memoryRetryMultipler" is also accepted.
            ram_multiplier.append(oom_retry_req.get('memoryRetryMultiplier') or
                                  oom_retry_req.get('memoryRetryMultipler') or
                                  2)

        if runtimeContext.runnerjob.startswith("arvwf:"):
            # partition() tolerates an id without a "#fragment", where
            # str.index("#") would raise ValueError.
            wfuuid = runtimeContext.runnerjob[6:].partition("#")[0]
            wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
            if container_request["name"] == "main":
                container_request["name"] = wfrecord["name"]
            container_request["properties"]["template_uuid"] = wfuuid

        if self.attempt_count == 0:
            # Wrap only once; an OOM retry reuses the already-wrapped callback.
            self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)

        try:
            ram = runtime_constraints["ram"]

            self.uuid = runtimeContext.submit_request_uuid

            for multiplier in ram_multiplier:
                # Round up: a fractional multiplier would otherwise yield a
                # non-integer byte count.
                runtime_constraints["ram"] = math.ceil(ram * multiplier)

                if self.uuid:
                    response = self.arvrunner.api.container_requests().update(
                        uuid=self.uuid,
                        body=container_request,
                        **extra_submit_params
                    ).execute(num_retries=self.arvrunner.num_retries)
                else:
                    response = self.arvrunner.api.container_requests().create(
                        body=container_request,
                        **extra_submit_params
                    ).execute(num_retries=self.arvrunner.num_retries)
                    self.uuid = response["uuid"]

                # NOTE(review): a container_uuid on the still-uncommitted
                # request appears to mean a reusable container matched at this
                # RAM size -- confirm against the API server behavior.
                if response["container_uuid"] is not None:
                    break

            if response["container_uuid"] is None:
                # No match: size RAM for this attempt.  The index is clamped so
                # an unexpected extra attempt cannot raise IndexError.
                attempt_index = min(self.attempt_count, len(ram_multiplier) - 1)
                runtime_constraints["ram"] = math.ceil(ram * ram_multiplier[attempt_index])

            container_request["state"] = "Committed"
            response = self.arvrunner.api.container_requests().update(
                uuid=self.uuid,
                body=container_request,
                **extra_submit_params
            ).execute(num_retries=self.arvrunner.num_retries)

            self.arvrunner.process_submitted(self)
            self.attempt_count += 1

            if response["state"] == "Final":
                logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
            else:
                logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
        except Exception as e:
            logger.exception("%s error submitting container\n%s", self.arvrunner.label(self), e)
            logger.debug("Container request was %s", container_request)
            self.output_callback({}, "permanentFail")

    def out_of_memory_retry(self, record, container):
        """Decide whether a failed container should be retried with more RAM.

        :param record: the container request record ("log_uuid" is read).
        :param container: the container record ("exit_code" is read).
        :returns: True only when arv:OutOfMemoryRetry applies and either the
            exit code is 137 or the tail of the log matches ``memoryErrorRegex``
            (or the default out-of-memory pattern); otherwise False.
        """
        oom_retry_req, _ = self.get_requirement("http://arvados.org/cwl#OutOfMemoryRetry")
        if oom_retry_req is None:
            return False

        # Sometimes it gets killed with no warning
        if container["exit_code"] == 137:
            return True

        # No log collection means there is nothing to scan (done() applies the
        # same guard before opening the log).
        if not record.get("log_uuid"):
            return False

        logc = arvados.collection.CollectionReader(record["log_uuid"],
                                                   api_client=self.arvrunner.api,
                                                   keep_client=self.arvrunner.keep_client,
                                                   num_retries=self.arvrunner.num_retries)

        # done.logtail reports through a callback; keep the last text it passes.
        loglines = [""]
        def callback(v1, v2, v3):
            loglines[0] = v3

        done.logtail(logc, callback, "", maxlen=1000)

        # Check allocation failure
        oom_matches = oom_retry_req.get('memoryErrorRegex') or r'(bad_alloc|out ?of ?memory|memory ?error|container using over 9.% of memory)'
        if re.search(oom_matches, loglines[0], re.IGNORECASE | re.MULTILINE):
            return True

        return False

    def done(self, record):
        """Handle the final state of this step's container request.

        Maps the container's exit code to a CWL process status, possibly
        resubmits once with more RAM (arv:OutOfMemoryRetry), logs the tail of
        the error log on failure, registers the intermediate output, collects
        the CWL outputs into the request's "cwl_output" property, writes a
        resource usage report into the log collection, and finally invokes the
        output callback unless a retry was started.

        :param record: the container request record.
        """
        outputs = {}
        retried = False
        rcode = None
        # Default so the finally clause always has a status to report.
        processStatus = "permanentFail"
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            if container["state"] == "Complete":
                rcode = container["exit_code"]
                if self.successCodes and rcode in self.successCodes:
                    processStatus = "success"
                elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
                    processStatus = "temporaryFail"
                elif self.permanentFailCodes and rcode in self.permanentFailCodes:
                    processStatus = "permanentFail"
                elif rcode == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"

                # Retry at most once: only after the first committed attempt.
                if processStatus == "permanentFail" and self.attempt_count == 1 and self.out_of_memory_retry(record, container):
                    logger.warning("%s Container failed with out of memory error, retrying with more RAM.",
                                   self.arvrunner.label(self))
                    self.job_runtime.submit_request_uuid = None
                    self.uuid = None
                    self.run(None)
                    retried = True
                    return

                if rcode == 137:
                    logger.warning("%s Container may have been killed for using too much RAM.  Try resubmitting with a higher 'ramMin' or use the arv:OutOfMemoryRetry feature.",
                                   self.arvrunner.label(self))
            else:
                processStatus = "permanentFail"

            logc = None
            if record["log_uuid"]:
                logc = arvados.collection.Collection(record["log_uuid"],
                                                     api_client=self.arvrunner.api,
                                                     keep_client=self.arvrunner.keep_client,
                                                     num_retries=self.arvrunner.num_retries)

            if processStatus == "permanentFail" and logc is not None:
                label = self.arvrunner.label(self)
                done.logtail(
                    logc, logger.error,
                    "%s (%s) error log:" % (label, record["uuid"]), maxlen=40, include_crunchrun=(rcode is None or rcode > 127))

            if record["output_uuid"]:
                if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
                    # Compute the trash time to avoid requesting the collection record.
                    trash_at = ciso8601.parse_datetime_as_naive(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
                    aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
                    orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
                    oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
                    logger.info("%s Intermediate output %s (%s) will be trashed%s%s%s.",
                                self.arvrunner.label(self), record["output_uuid"], container["output"],
                                aftertime, orpart, oncomplete)
                self.arvrunner.add_intermediate_output(record["output_uuid"])

            if container["output"]:
                outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")

            properties = record["properties"].copy()
            properties["cwl_output"] = outputs
            self.arvrunner.api.container_requests().update(
                uuid=self.uuid,
                body={"container_request": {"properties": properties}}
            ).execute(num_retries=self.arvrunner.num_retries)

            if logc is not None:
                # Best effort: a failed usage report only produces a warning.
                try:
                    summarizer = crunchstat_summary.summarizer.ContainerRequestSummarizer(
                        record,
                        collection_object=logc,
                        label=self.name,
                        arv=self.arvrunner.api)
                    summarizer.run()
                    with logc.open("usage_report.html", "wt") as mr:
                        mr.write(summarizer.html_report())
                    logc.save()
                except Exception as e:
                    logger.warning("%s unable to generate resource usage report",
                                   self.arvrunner.label(self),
                                   exc_info=(e if self.arvrunner.debug else False))

        except WorkflowException as e:
            # Only include a stack trace if in debug mode.
            # A stack trace may obfuscate more useful output about the workflow.
            logger.error("%s unable to collect output from %s:\n%s",
                         self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
            processStatus = "permanentFail"
        except Exception:
            logger.exception("%s while getting output object:", self.arvrunner.label(self))
            processStatus = "permanentFail"
        finally:
            if not retried:
                self.output_callback(outputs, processStatus)
565
566
567 class RunnerContainer(Runner):
568     """Submit and manage a container that runs arvados-cwl-runner."""
569
570     def arvados_job_spec(self, runtimeContext, git_info):
571         """Create an Arvados container request for this workflow.
572
573         The returned dict can be used to create a container passed as
574         the +body+ argument to container_requests().create().
575         """
576
577         adjustDirObjs(self.job_order, trim_listing)
578         visit_class(self.job_order, ("File", "Directory"), trim_anonymous_location)
579         visit_class(self.job_order, ("File", "Directory"), remove_redundant_fields)
580
581         secret_mounts = {}
582         for param in sorted(self.job_order.keys()):
583             if self.secret_store.has_secret(self.job_order[param]):
584                 mnt = "/secrets/s%d" % len(secret_mounts)
585                 secret_mounts[mnt] = {
586                     "kind": "text",
587                     "content": self.secret_store.retrieve(self.job_order[param])
588                 }
589                 self.job_order[param] = {"$include": mnt}
590
591         container_image = arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext)
592
593         workflow_runner_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
594         if workflow_runner_req and workflow_runner_req.get("acrContainerImage"):
595             container_image = workflow_runner_req.get("acrContainerImage")
596
597         container_req = {
598             "name": self.name,
599             "output_path": "/var/spool/cwl",
600             "cwd": "/var/spool/cwl",
601             "priority": self.priority,
602             "state": "Committed",
603             "container_image": container_image,
604             "mounts": {
605                 "/var/lib/cwl/cwl.input.json": {
606                     "kind": "json",
607                     "content": self.job_order
608                 },
609                 "stdout": {
610                     "kind": "file",
611                     "path": "/var/spool/cwl/cwl.output.json"
612                 },
613                 "/var/spool/cwl": {
614                     "kind": "collection",
615                     "writable": True
616                 }
617             },
618             "secret_mounts": secret_mounts,
619             "runtime_constraints": {
620                 "vcpus": math.ceil(self.submit_runner_cores),
621                 "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
622                 "API": True
623             },
624             "use_existing": self.reuse_runner,
625             "properties": {}
626         }
627
628         if self.embedded_tool.tool.get("id", "").startswith("keep:"):
629             sp = self.embedded_tool.tool["id"].split('/')
630             workflowcollection = sp[0][5:]
631             workflowname = "/".join(sp[1:])
632             workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
633             container_req["mounts"]["/var/lib/cwl/workflow"] = {
634                 "kind": "collection",
635                 "portable_data_hash": "%s" % workflowcollection
636             }
637         elif self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
638             uuid, frg = urllib.parse.urldefrag(self.embedded_tool.tool["id"])
639             workflowpath = "/var/lib/cwl/workflow.json#" + frg
640             packedtxt = self.loadingContext.loader.fetch_text(uuid)
641             yaml = ruamel.yaml.YAML(typ='safe', pure=True)
642             packed = yaml.load(packedtxt)
643             container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
644                 "kind": "json",
645                 "content": packed
646             }
647             container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
648         elif self.embedded_tool.tool.get("id", "").startswith("file:"):
649             raise WorkflowException("Tool id '%s' is a local file but expected keep: or arvwf:" % self.embedded_tool.tool.get("id"))
650         else:
651             main = self.loadingContext.loader.idx["_:main"]
652             if main.get("id") == "_:main":
653                 del main["id"]
654             workflowpath = "/var/lib/cwl/workflow.json#main"
655             container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
656                 "kind": "json",
657                 "content": main
658             }
659
660         container_req["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in git_info.items()})
661
662         properties_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ProcessProperties")
663         if properties_req:
664             builder = make_builder(self.job_order, self.embedded_tool.hints, self.embedded_tool.requirements, runtimeContext, self.embedded_tool.metadata)
665             for pr in properties_req["processProperties"]:
666                 container_req["properties"][pr["propertyName"]] = builder.do_eval(pr["propertyValue"])
667
668         # --local means execute the workflow instead of submitting a container request
669         # --api=containers means use the containers API
670         # --no-log-timestamps means don't add timestamps (the logging infrastructure does this)
671         # --disable-validate because we already validated so don't need to do it again
672         # --eval-timeout is the timeout for javascript invocation
673         # --parallel-task-count is the number of threads to use for job submission
674         # --enable/disable-reuse sets desired job reuse
675         # --collection-cache-size sets aside memory to store collections
676         command = ["arvados-cwl-runner",
677                    "--local",
678                    "--api=containers",
679                    "--no-log-timestamps",
680                    "--disable-validate",
681                    "--disable-color",
682                    "--eval-timeout=%s" % self.arvrunner.eval_timeout,
683                    "--thread-count=%s" % self.arvrunner.thread_count,
684                    "--enable-reuse" if self.enable_reuse else "--disable-reuse",
685                    "--collection-cache-size=%s" % self.collection_cache_size]
686
687         if self.output_name:
688             command.append("--output-name=" + self.output_name)
689             container_req["output_name"] = self.output_name
690
691         if self.output_tags:
692             command.append("--output-tags=" + self.output_tags)
693
694         if runtimeContext.debug:
695             command.append("--debug")
696
697         if runtimeContext.storage_classes != "default" and runtimeContext.storage_classes:
698             command.append("--storage-classes=" + runtimeContext.storage_classes)
699
700         if runtimeContext.intermediate_storage_classes != "default" and runtimeContext.intermediate_storage_classes:
701             command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes)
702
703         if runtimeContext.on_error:
704             command.append("--on-error=" + self.on_error)
705
706         if runtimeContext.intermediate_output_ttl:
707             command.append("--intermediate-output-ttl=%d" % runtimeContext.intermediate_output_ttl)
708
709         if runtimeContext.trash_intermediate:
710             command.append("--trash-intermediate")
711
712         if runtimeContext.project_uuid:
713             command.append("--project-uuid="+runtimeContext.project_uuid)
714
715         if self.enable_dev:
716             command.append("--enable-dev")
717
718         if runtimeContext.enable_preemptible is True:
719             command.append("--enable-preemptible")
720
721         if runtimeContext.enable_preemptible is False:
722             command.append("--disable-preemptible")
723
724         if runtimeContext.varying_url_params:
725             command.append("--varying-url-params="+runtimeContext.varying_url_params)
726
727         if runtimeContext.prefer_cached_downloads:
728             command.append("--prefer-cached-downloads")
729
730         if self.fast_parser:
731             command.append("--fast-parser")
732
733         command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
734
735         container_req["command"] = command
736
737         return container_req
738
739
740     def run(self, runtimeContext):
741         runtimeContext.keepprefix = "keep:"
742         job_spec = self.arvados_job_spec(runtimeContext, self.git_info)
743         if runtimeContext.project_uuid:
744             job_spec["owner_uuid"] = runtimeContext.project_uuid
745
746         extra_submit_params = {}
747         if runtimeContext.submit_runner_cluster:
748             extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
749
750         if runtimeContext.submit_request_uuid:
751             if "cluster_id" in extra_submit_params:
752                 # Doesn't make sense for "update" and actually fails
753                 del extra_submit_params["cluster_id"]
754             response = self.arvrunner.api.container_requests().update(
755                 uuid=runtimeContext.submit_request_uuid,
756                 body=job_spec,
757                 **extra_submit_params
758             ).execute(num_retries=self.arvrunner.num_retries)
759         else:
760             response = self.arvrunner.api.container_requests().create(
761                 body=job_spec,
762                 **extra_submit_params
763             ).execute(num_retries=self.arvrunner.num_retries)
764
765         self.uuid = response["uuid"]
766         self.arvrunner.process_submitted(self)
767
768         logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])
769
770         workbench2 = self.arvrunner.api.config()["Services"]["Workbench2"]["ExternalURL"]
771         if workbench2:
772             url = "{}processes/{}".format(workbench2, response["uuid"])
773             logger.info("Monitor workflow progress at %s", url)
774
775
776     def done(self, record):
777         try:
778             container = self.arvrunner.api.containers().get(
779                 uuid=record["container_uuid"]
780             ).execute(num_retries=self.arvrunner.num_retries)
781             container["log"] = record["log_uuid"]
782         except Exception:
783             logger.exception("%s while getting runner container", self.arvrunner.label(self))
784             self.arvrunner.output_callback({}, "permanentFail")
785         else:
786             super(RunnerContainer, self).done(container)