19744: Add --enable/disable-usage-report
[arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8
9 import logging
10 import json
11 import os
12 import urllib.request, urllib.parse, urllib.error
13 import time
14 import datetime
15 import ciso8601
16 import uuid
17 import math
18 import re
19
20 import arvados_cwl.util
21 import ruamel.yaml
22
23 from cwltool.errors import WorkflowException
24 from cwltool.process import UnsupportedRequirement, shortname
25 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
26 from cwltool.job import JobBase
27
28 import arvados.collection
29
30 import crunchstat_summary.summarizer
31 import crunchstat_summary.reader
32
33 from .arvdocker import arv_docker_get_image
34 from . import done
35 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields, make_builder
36 from .fsaccess import CollectionFetcher
37 from .pathmapper import NoFollowPathMapper, trim_listing
38 from .perf import Perf
39 from ._version import __version__
40
41 logger = logging.getLogger('arvados.cwl-runner')
42 metrics = logging.getLogger('arvados.cwl-runner.metrics')
43
def cleanup_name_for_collection(name):
    """Return *name* with each "/" turned into a space.

    Used to derive collection names (e.g. the output collection name of a
    step) from step names, which may contain slashes.
    """
    pieces = name.split("/")
    return " ".join(pieces)
46
class ArvadosContainer(JobBase):
    """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""

    def __init__(self, runner, job_runtime,
                 builder,   # type: Builder
                 joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
                 make_path_mapper,  # type: Callable[..., PathMapper]
                 requirements,      # type: List[Dict[Text, Text]]
                 hints,     # type: List[Dict[Text, Text]]
                 name       # type: Text
    ):
        """Create a container job for one CommandLineTool invocation.

        :param runner: the arvados-cwl-runner executor; this class uses its
            API client, Keep client, secret store, retry count and callbacks.
        :param job_runtime: runtime context consulted by run() and done().

        The remaining parameters are passed through to cwltool's JobBase.
        """
        super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
        self.arvrunner = runner
        self.job_runtime = job_runtime
        self.running = False
        # Container request UUID; set once the request has been created.
        self.uuid = None
        # Number of times a container request was committed for this job.
        # done() only attempts an out-of-memory retry after the first attempt.
        self.attempt_count = 0

    def update_pipeline_component(self, r):
        """No-op hook."""
        pass

    def _required_env(self):
        """Return the HOME and TMPDIR environment variables for the container."""
        env = {}
        env["HOME"] = self.outdir
        env["TMPDIR"] = self.tmpdir
        return env

    def run(self, toplevelRuntimeContext):
        """Build the container request for this tool invocation and submit it.

        The request is assembled from the tool's command line, environment,
        resource requirements and Arvados-specific hints. It is then created
        (or, when ``submit_request_uuid`` is set, the existing request is
        updated) in the Uncommitted state, and finally committed.

        When an OutOfMemoryRetry hint is present, the uncommitted request is
        tried at each RAM multiplier in turn. The loop stops at the first
        multiplier for which the API reports a ``container_uuid``.

        Errors raised during the create/update/commit phase are logged and
        reported to the output callback as "permanentFail". Errors raised
        while assembling the request (secrets leaked on the command line or
        in the environment, a negative output TTL) propagate to the caller.

        :param toplevelRuntimeContext: unused; ``self.job_runtime`` is used.
        """
        # ArvadosCommandTool subclasses from cwltool.CommandLineTool,
        # which calls makeJobRunner() to get a new ArvadosContainer
        # object.  The fields that define execution such as
        # command_line, environment, etc are set on the
        # ArvadosContainer object by CommandLineTool.job() before
        # run() is called.

        runtimeContext = self.job_runtime

        if runtimeContext.submit_request_uuid:
            container_request = self.arvrunner.api.container_requests().get(
                uuid=runtimeContext.submit_request_uuid
            ).execute(num_retries=self.arvrunner.num_retries)
        else:
            container_request = {}

        container_request["command"] = self.command_line
        container_request["name"] = self.name
        container_request["output_path"] = self.outdir
        container_request["cwd"] = self.outdir
        container_request["priority"] = runtimeContext.priority
        container_request["state"] = "Uncommitted"
        container_request.setdefault("properties", {})

        container_request["properties"]["cwl_input"] = self.joborder

        runtime_constraints = {}

        if runtimeContext.project_uuid:
            container_request["owner_uuid"] = runtimeContext.project_uuid

        if self.arvrunner.secret_store.has_secret(self.command_line):
            raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")

        if self.arvrunner.secret_store.has_secret(self.environment):
            raise WorkflowException("Secret material leaked in environment, only file literals may contain secrets")

        # ram / outdirSize / tmpdirSize are converted from MiB to bytes (* 2**20).
        resources = self.builder.resources
        if resources is not None:
            runtime_constraints["vcpus"] = math.ceil(resources.get("cores", 1))
            runtime_constraints["ram"] = math.ceil(resources.get("ram") * 2**20)

        # NOTE(review): the mounts below assume resources is not None.
        mounts = {
            self.outdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("outdirSize", 0) * 2**20)
            },
            self.tmpdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("tmpdirSize", 0) * 2**20)
            }
        }
        secret_mounts = {}
        scheduling_parameters = {}

        # Mount the Keep collection behind each staged input. Entries are
        # sorted by resolved location. A target under the previously mounted
        # directory is skipped, since that mount already covers it.
        rf = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
        rf.sort(key=lambda k: k.resolved)
        prevdir = None
        for resolved, target, tp, stg in rf:
            if not stg:
                continue
            if prevdir and target.startswith(prevdir):
                continue
            if tp == "Directory":
                targetdir = target
            else:
                targetdir = os.path.dirname(target)
            sp = resolved.split("/", 1)
            pdh = sp[0][5:]   # remove "keep:"
            mounts[targetdir] = {
                "kind": "collection",
                "portable_data_hash": pdh
            }
            if pdh in self.pathmapper.pdh_to_uuid:
                mounts[targetdir]["uuid"] = self.pathmapper.pdh_to_uuid[pdh]
            if len(sp) == 2:
                if tp == "Directory":
                    path = sp[1]
                else:
                    path = os.path.dirname(sp[1])
                if path and path != "/":
                    mounts[targetdir]["path"] = path
            prevdir = targetdir + "/"

        with Perf(metrics, "generatefiles %s" % self.name):
            if self.generatefiles["listing"]:
                # Build an intermediate collection holding the
                # InitialWorkDirRequirement listing, then mount it.
                vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                    keep_client=self.arvrunner.keep_client,
                                                    num_retries=self.arvrunner.num_retries)
                generatemapper = NoFollowPathMapper(self.generatefiles["listing"], "", "",
                                                    separateDirs=False)

                sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target)

                logger.debug("generatemapper is %s", sorteditems)

                with Perf(metrics, "createfiles %s" % self.name):
                    for f, p in sorteditems:
                        if not p.target:
                            continue

                        # Path of the entry inside vwd, relative to outdir.
                        if p.target.startswith("/"):
                            dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
                        else:
                            dst = p.target

                        if p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
                            if p.resolved.startswith("_:"):
                                vwd.mkdirs(dst)
                            else:
                                source, path = self.arvrunner.fs_access.get_collection(p.resolved)
                                vwd.copy(path or ".", dst, source_collection=source)
                        elif p.type == "CreateFile":
                            if self.arvrunner.secret_store.has_secret(p.resolved):
                                # Secret content is passed as a secret mount,
                                # never written into the collection.
                                mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
                                secret_mounts[mountpoint] = {
                                    "kind": "text",
                                    "content": self.arvrunner.secret_store.retrieve(p.resolved)
                                }
                            else:
                                with vwd.open(dst, "w") as n:
                                    n.write(p.resolved)

                def keepemptydirs(p):
                    # Put a ".keep" file in every empty directory, presumably
                    # so empty directories survive saving the collection.
                    if isinstance(p, arvados.collection.RichCollectionBase):
                        if len(p) == 0:
                            p.open(".keep", "w").close()
                        else:
                            for c in p:
                                keepemptydirs(p[c])

                keepemptydirs(vwd)

                if not runtimeContext.current_container:
                    runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
                # Computed only after current_container has been looked up.
                # Computing it earlier could pass an unset current_container.
                intermediate_collection_info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
                vwd.save_new(name=intermediate_collection_info["name"],
                             owner_uuid=runtimeContext.project_uuid,
                             ensure_unique_name=True,
                             trash_at=intermediate_collection_info["trash_at"],
                             properties=intermediate_collection_info["properties"])

                # Mount each top-level generated entry from the saved
                # collection. Entries nested under an already-mounted target,
                # and secret files, are skipped.
                prev = None
                for f, p in sorteditems:
                    if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
                        (prev is not None and p.target.startswith(prev))):
                        continue
                    if p.target.startswith("/"):
                        dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
                    else:
                        dst = p.target
                    mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
                    mounts[mountpoint] = {"kind": "collection",
                                          "portable_data_hash": vwd.portable_data_hash(),
                                          "path": dst}
                    if p.type.startswith("Writable"):
                        mounts[mountpoint]["writable"] = True
                    prev = p.target + "/"

        container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
        if self.environment:
            container_request["environment"].update(self.environment)

        if self.stdin:
            # Assumes stdin looks like "/keep/<pdh>/<path>" (the first 6
            # characters are dropped) -- TODO confirm.
            sp = self.stdin[6:].split("/", 1)
            mounts["stdin"] = {"kind": "collection",
                                "portable_data_hash": sp[0],
                                "path": sp[1]}

        if self.stderr:
            mounts["stderr"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stderr)}

        if self.stdout:
            mounts["stdout"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stdout)}

        (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")

        container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
                                                                    docker_req,
                                                                    runtimeContext.pull_image,
                                                                    runtimeContext)

        network_req, _ = self.get_requirement("NetworkAccess")
        if network_req:
            runtime_constraints["API"] = network_req["networkAccess"]

        api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
        if api_req:
            runtime_constraints["API"] = True

        use_disk_cache = (self.arvrunner.api.config()["Containers"].get("DefaultKeepCacheRAM", 0) == 0)

        keep_cache_type_req, _ = self.get_requirement("http://arvados.org/cwl#KeepCacheTypeRequirement")
        if keep_cache_type_req:
            if "keepCacheType" in keep_cache_type_req:
                if keep_cache_type_req["keepCacheType"] == "ram_cache":
                    use_disk_cache = False

        runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
        if runtime_req:
            if "keep_cache" in runtime_req:
                if use_disk_cache:
                    # If DefaultKeepCacheRAM is zero it means we should use disk cache.
                    runtime_constraints["keep_cache_disk"] = math.ceil(runtime_req["keep_cache"] * 2**20)
                else:
                    runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
            if "outputDirType" in runtime_req:
                if runtime_req["outputDirType"] == "local_output_dir":
                    # Currently the default behavior.
                    pass
                elif runtime_req["outputDirType"] == "keep_output_dir":
                    mounts[self.outdir]= {
                        "kind": "collection",
                        "writable": True
                    }

        partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
        if partition_req:
            scheduling_parameters["partitions"] = aslist(partition_req["partition"])

        intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
        if intermediate_output_req:
            self.output_ttl = intermediate_output_req["outputTTL"]
        else:
            self.output_ttl = self.arvrunner.intermediate_output_ttl

        if self.output_ttl < 0:
            # Report the offending value itself. container_request has no
            # "output_ttl" key yet, so formatting it would raise KeyError.
            raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % self.output_ttl)


        if self.arvrunner.api._rootDesc["revision"] >= "20210628":
            storage_class_req, _ = self.get_requirement("http://arvados.org/cwl#OutputStorageClass")
            if storage_class_req and storage_class_req.get("intermediateStorageClass"):
                container_request["output_storage_classes"] = aslist(storage_class_req["intermediateStorageClass"])
            else:
                container_request["output_storage_classes"] = runtimeContext.intermediate_storage_classes.strip().split(",")

        cuda_req, _ = self.get_requirement("http://commonwl.org/cwltool#CUDARequirement")
        if cuda_req:
            runtime_constraints["cuda"] = {
                "device_count": resources.get("cudaDeviceCount", 1),
                "driver_version": cuda_req["cudaVersionMin"],
                "hardware_capability": aslist(cuda_req["cudaComputeCapability"])[0]
            }

        # enable_preemptible is tri-state: False forces off; otherwise an
        # explicit UsePreemptible hint wins over True/None from the context.
        if runtimeContext.enable_preemptible is False:
            scheduling_parameters["preemptible"] = False
        else:
            preemptible_req, _ = self.get_requirement("http://arvados.org/cwl#UsePreemptible")
            if preemptible_req:
                scheduling_parameters["preemptible"] = preemptible_req["usePreemptible"]
            elif runtimeContext.enable_preemptible is True:
                scheduling_parameters["preemptible"] = True
            elif runtimeContext.enable_preemptible is None:
                pass

        if self.timelimit is not None and self.timelimit > 0:
            scheduling_parameters["max_run_time"] = self.timelimit

        extra_submit_params = {}
        if runtimeContext.submit_runner_cluster:
            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster

        container_request["output_name"] = cleanup_name_for_collection("Output from step %s" % (self.name))
        container_request["output_ttl"] = self.output_ttl
        container_request["mounts"] = mounts
        container_request["secret_mounts"] = secret_mounts
        # Same dict object as runtime_constraints. Later edits to "ram" in the
        # submit loop below are therefore reflected in the request body.
        container_request["runtime_constraints"] = runtime_constraints
        container_request["scheduling_parameters"] = scheduling_parameters

        enable_reuse = runtimeContext.enable_reuse
        if enable_reuse:
            reuse_req, _ = self.get_requirement("WorkReuse")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
            reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        container_request["use_existing"] = enable_reuse

        properties_req, _ = self.get_requirement("http://arvados.org/cwl#ProcessProperties")
        if properties_req:
            for pr in properties_req["processProperties"]:
                container_request["properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])

        output_properties_req, _ = self.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
        if output_properties_req:
            if self.arvrunner.api._rootDesc["revision"] >= "20220510":
                container_request["output_properties"] = {}
                for pr in output_properties_req["outputProperties"]:
                    container_request["output_properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
            else:
                logger.warning("%s API revision is %s, revision %s is required to support setting properties on output collections.",
                               self.arvrunner.label(self), self.arvrunner.api._rootDesc["revision"], "20220510")

        # RAM multipliers tried in order. The second entry (if any) comes
        # from OutOfMemoryRetry; "memoryRetryMultipler" is an accepted
        # misspelling.
        ram_multiplier = [1]

        oom_retry_req, _ = self.get_requirement("http://arvados.org/cwl#OutOfMemoryRetry")
        if oom_retry_req:
            if oom_retry_req.get('memoryRetryMultiplier'):
                ram_multiplier.append(oom_retry_req.get('memoryRetryMultiplier'))
            elif oom_retry_req.get('memoryRetryMultipler'):
                ram_multiplier.append(oom_retry_req.get('memoryRetryMultipler'))
            else:
                ram_multiplier.append(2)

        if runtimeContext.runnerjob.startswith("arvwf:"):
            wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
            wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
            if container_request["name"] == "main":
                container_request["name"] = wfrecord["name"]
            container_request["properties"]["template_uuid"] = wfuuid

        # Wrap the callback only once. A retry calls run() again with
        # attempt_count > 0 and must not wrap it twice.
        if self.attempt_count == 0:
            self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)

        try:
            ram = runtime_constraints["ram"]

            self.uuid = runtimeContext.submit_request_uuid

            # Create/update the uncommitted request at each RAM level. Stop
            # at the first one for which the API reports a container_uuid.
            for i in ram_multiplier:
                runtime_constraints["ram"] = ram * i

                if self.uuid:
                    response = self.arvrunner.api.container_requests().update(
                        uuid=self.uuid,
                        body=container_request,
                        **extra_submit_params
                    ).execute(num_retries=self.arvrunner.num_retries)
                else:
                    response = self.arvrunner.api.container_requests().create(
                        body=container_request,
                        **extra_submit_params
                    ).execute(num_retries=self.arvrunner.num_retries)
                    self.uuid = response["uuid"]

                if response["container_uuid"] is not None:
                    break

            # No container found: commit with the RAM for this attempt.
            if response["container_uuid"] is None:
                runtime_constraints["ram"] = ram * ram_multiplier[self.attempt_count]

            container_request["state"] = "Committed"
            response = self.arvrunner.api.container_requests().update(
                uuid=self.uuid,
                body=container_request,
                **extra_submit_params
            ).execute(num_retries=self.arvrunner.num_retries)

            self.arvrunner.process_submitted(self)
            self.attempt_count += 1

            if response["state"] == "Final":
                logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
            else:
                logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
        except Exception as e:
            logger.exception("%s error submitting container\n%s", self.arvrunner.label(self), e)
            logger.debug("Container request was %s", container_request)
            self.output_callback({}, "permanentFail")

    def out_of_memory_retry(self, record, container):
        """Return True if a failed container looks like it ran out of memory.

        This applies only when the tool has an arv:OutOfMemoryRetry hint. The
        container counts as out of memory if it exited with code 137, or if
        the log tail (done.logtail, maxlen=1000) matches the hint's
        ``memoryErrorRegex``. Without that setting, a built-in
        allocation-failure pattern is used, matched case-insensitively.

        :param record: the container request record.
        :param container: the container record.
        """
        oom_retry_req, _ = self.get_requirement("http://arvados.org/cwl#OutOfMemoryRetry")
        if oom_retry_req is None:
            return False

        # Sometimes it gets killed with no warning
        if container["exit_code"] == 137:
            return True

        # No log collection means there is nothing to search.
        if not record["log_uuid"]:
            return False

        logc = arvados.collection.CollectionReader(record["log_uuid"],
                                                   api_client=self.arvrunner.api,
                                                   keep_client=self.arvrunner.keep_client,
                                                   num_retries=self.arvrunner.num_retries)

        # logtail calls the callback with the joined log text as the third
        # argument; keep it for the regex search below.
        loglines = [""]
        def callback(v1, v2, v3):
            loglines[0] = v3

        done.logtail(logc, callback, "", maxlen=1000)

        # Check allocation failure
        oom_matches = oom_retry_req.get('memoryErrorRegex') or r'(bad_alloc|out ?of ?memory|memory ?error|container using over 9.% of memory)'
        if re.search(oom_matches, loglines[0], re.IGNORECASE | re.MULTILINE):
            return True

        return False

    def done(self, record):
        """Handle a finished container request and report its result.

        The process status comes from the container's exit code, checked
        against the tool's success, temporaryFail and permanentFail codes.
        A failure that looks like an out-of-memory error is resubmitted once
        with more RAM. Otherwise this method:

        * logs the error-log tail on failure;
        * registers intermediate outputs;
        * collects the CWL output object and stores it in the request's
          ``cwl_output`` property;
        * unless ``enable_usage_report`` is False, writes
          ``usage_report.html`` into the log collection and logs the
          summarizer's recommendations.

        The output callback is invoked once, unless a retry was started.

        :param record: the container request record (dict) from the API.
        """
        outputs = {}
        retried = False
        rcode = None
        container = None
        # Default so the finally clause always has a status to report.
        processStatus = "permanentFail"
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            if container["state"] == "Complete":
                rcode = container["exit_code"]
                if self.successCodes and rcode in self.successCodes:
                    processStatus = "success"
                elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
                    processStatus = "temporaryFail"
                elif self.permanentFailCodes and rcode in self.permanentFailCodes:
                    processStatus = "permanentFail"
                elif rcode == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"

                if processStatus == "permanentFail" and self.attempt_count == 1 and self.out_of_memory_retry(record, container):
                    logger.warning("%s Container failed with out of memory error, retrying with more RAM.",
                                   self.arvrunner.label(self))
                    # Force run() to create a new request instead of
                    # updating the failed one.
                    self.job_runtime.submit_request_uuid = None
                    self.uuid = None
                    self.run(None)
                    retried = True
                    return

                if rcode == 137:
                    logger.warning("%s Container may have been killed for using too much RAM.  Try resubmitting with a higher 'ramMin' or use the arv:OutOfMemoryRetry feature.",
                                   self.arvrunner.label(self))
            else:
                processStatus = "permanentFail"

            logc = None
            if record["log_uuid"]:
                logc = arvados.collection.Collection(record["log_uuid"],
                                                     api_client=self.arvrunner.api,
                                                     keep_client=self.arvrunner.keep_client,
                                                     num_retries=self.arvrunner.num_retries)

            if processStatus == "permanentFail" and logc is not None:
                label = self.arvrunner.label(self)
                done.logtail(
                    logc, logger.error,
                    "%s (%s) error log:" % (label, record["uuid"]), maxlen=40, include_crunchrun=(rcode is None or rcode > 127))

            if record["output_uuid"]:
                if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
                    # Compute the trash time to avoid requesting the collection record.
                    trash_at = ciso8601.parse_datetime_as_naive(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
                    aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
                    orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
                    oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
                    logger.info("%s Intermediate output %s (%s) will be trashed%s%s%s." % (
                        self.arvrunner.label(self), record["output_uuid"], container["output"], aftertime, orpart, oncomplete))
                self.arvrunner.add_intermediate_output(record["output_uuid"])

            if container["output"]:
                outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")

            properties = record["properties"].copy()
            properties["cwl_output"] = outputs
            self.arvrunner.api.container_requests().update(
                uuid=self.uuid,
                body={"container_request": {"properties": properties}}
            ).execute(num_retries=self.arvrunner.num_retries)

            if logc is not None and self.job_runtime.enable_usage_report is not False:
                try:
                    summarizer = crunchstat_summary.summarizer.ContainerRequestSummarizer(
                        record,
                        collection_object=logc,
                        label=self.name,
                        arv=self.arvrunner.api)
                    summarizer.run()
                    with logc.open("usage_report.html", "wt") as mr:
                        mr.write(summarizer.html_report())
                    logc.save()

                    # Post warnings about nodes that are under-utilized.
                    # (Previously this logged an undefined name, raising
                    # NameError that was reported as a report failure.)
                    for recommendation in summarizer._recommend_gen(lambda x: x):
                        logger.warning("%s", recommendation)

                except Exception as e:
                    logger.warning("%s unable to generate resource usage report",
                                   self.arvrunner.label(self),
                                   exc_info=(e if self.arvrunner.debug else False))

        except WorkflowException as e:
            # Only include a stack trace if in debug mode.
            # A stack trace may obfuscate more useful output about the workflow.
            logger.error("%s unable to collect output from %s:\n%s",
                         self.arvrunner.label(self), container["output"] if container else None, e, exc_info=(e if self.arvrunner.debug else False))
            processStatus = "permanentFail"
        except Exception:
            logger.exception("%s while getting output object:", self.arvrunner.label(self))
            processStatus = "permanentFail"
        finally:
            if not retried:
                self.output_callback(outputs, processStatus)
570
571
572 class RunnerContainer(Runner):
573     """Submit and manage a container that runs arvados-cwl-runner."""
574
575     def arvados_job_spec(self, runtimeContext, git_info):
576         """Create an Arvados container request for this workflow.
577
578         The returned dict can be used to create a container passed as
579         the +body+ argument to container_requests().create().
580         """
581
582         adjustDirObjs(self.job_order, trim_listing)
583         visit_class(self.job_order, ("File", "Directory"), trim_anonymous_location)
584         visit_class(self.job_order, ("File", "Directory"), remove_redundant_fields)
585
586         secret_mounts = {}
587         for param in sorted(self.job_order.keys()):
588             if self.secret_store.has_secret(self.job_order[param]):
589                 mnt = "/secrets/s%d" % len(secret_mounts)
590                 secret_mounts[mnt] = {
591                     "kind": "text",
592                     "content": self.secret_store.retrieve(self.job_order[param])
593                 }
594                 self.job_order[param] = {"$include": mnt}
595
596         container_image = arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext)
597
598         workflow_runner_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
599         if workflow_runner_req and workflow_runner_req.get("acrContainerImage"):
600             container_image = workflow_runner_req.get("acrContainerImage")
601
602         container_req = {
603             "name": self.name,
604             "output_path": "/var/spool/cwl",
605             "cwd": "/var/spool/cwl",
606             "priority": self.priority,
607             "state": "Committed",
608             "container_image": container_image,
609             "mounts": {
610                 "/var/lib/cwl/cwl.input.json": {
611                     "kind": "json",
612                     "content": self.job_order
613                 },
614                 "stdout": {
615                     "kind": "file",
616                     "path": "/var/spool/cwl/cwl.output.json"
617                 },
618                 "/var/spool/cwl": {
619                     "kind": "collection",
620                     "writable": True
621                 }
622             },
623             "secret_mounts": secret_mounts,
624             "runtime_constraints": {
625                 "vcpus": math.ceil(self.submit_runner_cores),
626                 "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
627                 "API": True
628             },
629             "use_existing": self.reuse_runner,
630             "properties": {}
631         }
632
633         if self.embedded_tool.tool.get("id", "").startswith("keep:"):
634             sp = self.embedded_tool.tool["id"].split('/')
635             workflowcollection = sp[0][5:]
636             workflowname = "/".join(sp[1:])
637             workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
638             container_req["mounts"]["/var/lib/cwl/workflow"] = {
639                 "kind": "collection",
640                 "portable_data_hash": "%s" % workflowcollection
641             }
642         elif self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
643             uuid, frg = urllib.parse.urldefrag(self.embedded_tool.tool["id"])
644             workflowpath = "/var/lib/cwl/workflow.json#" + frg
645             packedtxt = self.loadingContext.loader.fetch_text(uuid)
646             yaml = ruamel.yaml.YAML(typ='safe', pure=True)
647             packed = yaml.load(packedtxt)
648             container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
649                 "kind": "json",
650                 "content": packed
651             }
652             container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
653         elif self.embedded_tool.tool.get("id", "").startswith("file:"):
654             raise WorkflowException("Tool id '%s' is a local file but expected keep: or arvwf:" % self.embedded_tool.tool.get("id"))
655         else:
656             main = self.loadingContext.loader.idx["_:main"]
657             if main.get("id") == "_:main":
658                 del main["id"]
659             workflowpath = "/var/lib/cwl/workflow.json#main"
660             container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
661                 "kind": "json",
662                 "content": main
663             }
664
665         container_req["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in git_info.items()})
666
667         properties_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ProcessProperties")
668         if properties_req:
669             builder = make_builder(self.job_order, self.embedded_tool.hints, self.embedded_tool.requirements, runtimeContext, self.embedded_tool.metadata)
670             for pr in properties_req["processProperties"]:
671                 container_req["properties"][pr["propertyName"]] = builder.do_eval(pr["propertyValue"])
672
673         # --local means execute the workflow instead of submitting a container request
674         # --api=containers means use the containers API
675         # --no-log-timestamps means don't add timestamps (the logging infrastructure does this)
676         # --disable-validate because we already validated so don't need to do it again
677         # --eval-timeout is the timeout for javascript invocation
678         # --parallel-task-count is the number of threads to use for job submission
679         # --enable/disable-reuse sets desired job reuse
680         # --collection-cache-size sets aside memory to store collections
681         command = ["arvados-cwl-runner",
682                    "--local",
683                    "--api=containers",
684                    "--no-log-timestamps",
685                    "--disable-validate",
686                    "--disable-color",
687                    "--eval-timeout=%s" % self.arvrunner.eval_timeout,
688                    "--thread-count=%s" % self.arvrunner.thread_count,
689                    "--enable-reuse" if self.enable_reuse else "--disable-reuse",
690                    "--collection-cache-size=%s" % self.collection_cache_size]
691
692         if self.output_name:
693             command.append("--output-name=" + self.output_name)
694             container_req["output_name"] = self.output_name
695
696         if self.output_tags:
697             command.append("--output-tags=" + self.output_tags)
698
699         if runtimeContext.debug:
700             command.append("--debug")
701
702         if runtimeContext.storage_classes != "default" and runtimeContext.storage_classes:
703             command.append("--storage-classes=" + runtimeContext.storage_classes)
704
705         if runtimeContext.intermediate_storage_classes != "default" and runtimeContext.intermediate_storage_classes:
706             command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes)
707
708         if runtimeContext.on_error:
709             command.append("--on-error=" + self.on_error)
710
711         if runtimeContext.intermediate_output_ttl:
712             command.append("--intermediate-output-ttl=%d" % runtimeContext.intermediate_output_ttl)
713
714         if runtimeContext.trash_intermediate:
715             command.append("--trash-intermediate")
716
717         if runtimeContext.project_uuid:
718             command.append("--project-uuid="+runtimeContext.project_uuid)
719
720         if self.enable_dev:
721             command.append("--enable-dev")
722
723         if runtimeContext.enable_preemptible is True:
724             command.append("--enable-preemptible")
725
726         if runtimeContext.enable_preemptible is False:
727             command.append("--disable-preemptible")
728
729         if runtimeContext.varying_url_params:
730             command.append("--varying-url-params="+runtimeContext.varying_url_params)
731
732         if runtimeContext.prefer_cached_downloads:
733             command.append("--prefer-cached-downloads")
734
735         if runtimeContext.enable_usage_report is True:
736             command.append("--enable-usage-report")
737
738         if runtimeContext.enable_usage_report is False:
739             command.append("--disable-usage-report")
740
741         if self.fast_parser:
742             command.append("--fast-parser")
743
744         command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
745
746         container_req["command"] = command
747
748         return container_req
749
750
751     def run(self, runtimeContext):
752         runtimeContext.keepprefix = "keep:"
753         job_spec = self.arvados_job_spec(runtimeContext, self.git_info)
754         if runtimeContext.project_uuid:
755             job_spec["owner_uuid"] = runtimeContext.project_uuid
756
757         extra_submit_params = {}
758         if runtimeContext.submit_runner_cluster:
759             extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
760
761         if runtimeContext.submit_request_uuid:
762             if "cluster_id" in extra_submit_params:
763                 # Doesn't make sense for "update" and actually fails
764                 del extra_submit_params["cluster_id"]
765             response = self.arvrunner.api.container_requests().update(
766                 uuid=runtimeContext.submit_request_uuid,
767                 body=job_spec,
768                 **extra_submit_params
769             ).execute(num_retries=self.arvrunner.num_retries)
770         else:
771             response = self.arvrunner.api.container_requests().create(
772                 body=job_spec,
773                 **extra_submit_params
774             ).execute(num_retries=self.arvrunner.num_retries)
775
776         self.uuid = response["uuid"]
777         self.arvrunner.process_submitted(self)
778
779         logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])
780
781         workbench2 = self.arvrunner.api.config()["Services"]["Workbench2"]["ExternalURL"]
782         if workbench2:
783             url = "{}processes/{}".format(workbench2, response["uuid"])
784             logger.info("Monitor workflow progress at %s", url)
785
786
787     def done(self, record):
788         try:
789             container = self.arvrunner.api.containers().get(
790                 uuid=record["container_uuid"]
791             ).execute(num_retries=self.arvrunner.num_retries)
792             container["log"] = record["log_uuid"]
793         except Exception:
794             logger.exception("%s while getting runner container", self.arvrunner.label(self))
795             self.arvrunner.output_callback({}, "permanentFail")
796         else:
797             super(RunnerContainer, self).done(container)