97b4e45225f47b42d40711da0224c80fc6889153
[arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8
9 import logging
10 import json
11 import os
12 import urllib.request, urllib.parse, urllib.error
13 import time
14 import datetime
15 import ciso8601
16 import uuid
17 import math
18 import re
19
20 import arvados_cwl.util
21 import ruamel.yaml
22
23 from cwltool.errors import WorkflowException
24 from cwltool.process import UnsupportedRequirement, shortname
25 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
26 from cwltool.job import JobBase
27
28 import arvados.collection
29
30 import crunchstat_summary.summarizer
31
32 from .arvdocker import arv_docker_get_image
33 from . import done
34 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields, make_builder
35 from .fsaccess import CollectionFetcher
36 from .pathmapper import NoFollowPathMapper, trim_listing
37 from .perf import Perf
38 from ._version import __version__
39
40 logger = logging.getLogger('arvados.cwl-runner')
41 metrics = logging.getLogger('arvados.cwl-runner.metrics')
42
def cleanup_name_for_collection(name):
    """Return *name* made safe for use as an Arvados collection name.

    Slashes are not permitted in collection names, so every "/" is
    replaced with a single space.
    """
    return " ".join(name.split("/"))
45
class ArvadosContainer(JobBase):
    """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""

    def __init__(self, runner, job_runtime,
                 builder,   # type: Builder
                 joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
                 make_path_mapper,  # type: Callable[..., PathMapper]
                 requirements,      # type: List[Dict[Text, Text]]
                 hints,     # type: List[Dict[Text, Text]]
                 name       # type: Text
    ):
        super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
        self.arvrunner = runner
        self.job_runtime = job_runtime
        self.running = False
        # UUID of the submitted container request; set in run().
        self.uuid = None
        # Number of times this step has been submitted; used by the
        # arv:OutOfMemoryRetry logic in done()/run().
        self.attempt_count = 0

    def update_pipeline_component(self, r):
        # Required by the runner interface, but the containers API has no
        # pipeline components to update, so this is intentionally a no-op.
        pass

    def _required_env(self):
        # Environment variables that must always be set inside the container.
        env = {}
        env["HOME"] = self.outdir
        env["TMPDIR"] = self.tmpdir
        return env

    def run(self, toplevelRuntimeContext):
        """Build and submit the container request for this tool invocation.

        Populates command, mounts, secret mounts, runtime constraints and
        scheduling parameters from the cwltool Builder state, then creates
        (or updates) the container request and commits it.  On failure the
        output callback is invoked with "permanentFail".
        """
        # ArvadosCommandTool subclasses from cwltool.CommandLineTool,
        # which calls makeJobRunner() to get a new ArvadosContainer
        # object.  The fields that define execution such as
        # command_line, environment, etc are set on the
        # ArvadosContainer object by CommandLineTool.job() before
        # run() is called.

        runtimeContext = self.job_runtime

        if runtimeContext.submit_request_uuid:
            # Reuse an existing (pre-created) container request record.
            container_request = self.arvrunner.api.container_requests().get(
                uuid=runtimeContext.submit_request_uuid
            ).execute(num_retries=self.arvrunner.num_retries)
        else:
            container_request = {}

        container_request["command"] = self.command_line
        container_request["name"] = self.name
        container_request["output_path"] = self.outdir
        container_request["cwd"] = self.outdir
        container_request["priority"] = runtimeContext.priority
        container_request["state"] = "Uncommitted"
        container_request.setdefault("properties", {})

        container_request["properties"]["cwl_input"] = self.joborder

        runtime_constraints = {}

        if runtimeContext.project_uuid:
            container_request["owner_uuid"] = runtimeContext.project_uuid

        # Secrets may only be delivered via file literals (secret_mounts);
        # refuse to submit if they would leak through the command line or
        # environment, which are visible in the container record.
        if self.arvrunner.secret_store.has_secret(self.command_line):
            raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")

        if self.arvrunner.secret_store.has_secret(self.environment):
            raise WorkflowException("Secret material leaked in environment, only file literals may contain secrets")

        resources = self.builder.resources
        if resources is not None:
            runtime_constraints["vcpus"] = math.ceil(resources.get("cores", 1))
            # CWL resources are in MiB; the API expects bytes.
            runtime_constraints["ram"] = math.ceil(resources.get("ram") * 2**20)

        # NOTE(review): the mounts below read `resources` unconditionally;
        # this assumes builder.resources is never None in practice — confirm.
        mounts = {
            self.outdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("outdirSize", 0) * 2**20)
            },
            self.tmpdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("tmpdirSize", 0) * 2**20)
            }
        }
        secret_mounts = {}
        scheduling_parameters = {}

        # Mount each referenced input collection.  Sorting by resolved
        # source lets us skip targets already covered by a parent
        # directory mount (prevdir check).
        rf = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
        rf.sort(key=lambda k: k.resolved)
        prevdir = None
        for resolved, target, tp, stg in rf:
            if not stg:
                continue
            if prevdir and target.startswith(prevdir):
                continue
            if tp == "Directory":
                targetdir = target
            else:
                targetdir = os.path.dirname(target)
            sp = resolved.split("/", 1)
            pdh = sp[0][5:]   # remove "keep:"
            mounts[targetdir] = {
                "kind": "collection",
                "portable_data_hash": pdh
            }
            if pdh in self.pathmapper.pdh_to_uuid:
                mounts[targetdir]["uuid"] = self.pathmapper.pdh_to_uuid[pdh]
            if len(sp) == 2:
                if tp == "Directory":
                    path = sp[1]
                else:
                    path = os.path.dirname(sp[1])
                if path and path != "/":
                    mounts[targetdir]["path"] = path
            prevdir = targetdir + "/"

        intermediate_collection_info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)

        with Perf(metrics, "generatefiles %s" % self.name):
            if self.generatefiles["listing"]:
                # InitialWorkDirRequirement: stage generated files into a
                # new intermediate collection and mount it writably where
                # requested.
                vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                    keep_client=self.arvrunner.keep_client,
                                                    num_retries=self.arvrunner.num_retries)
                generatemapper = NoFollowPathMapper(self.generatefiles["listing"], "", "",
                                                    separateDirs=False)

                sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target)

                logger.debug("generatemapper is %s", sorteditems)

                with Perf(metrics, "createfiles %s" % self.name):
                    for f, p in sorteditems:
                        if not p.target:
                            continue

                        if p.target.startswith("/"):
                            dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
                        else:
                            dst = p.target

                        if p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
                            if p.resolved.startswith("_:"):
                                # Anonymous (literal) directory: just create it.
                                vwd.mkdirs(dst)
                            else:
                                source, path = self.arvrunner.fs_access.get_collection(p.resolved)
                                vwd.copy(path or ".", dst, source_collection=source)
                        elif p.type == "CreateFile":
                            if self.arvrunner.secret_store.has_secret(p.resolved):
                                # Secret file literals go in secret_mounts,
                                # never into a saved collection.
                                mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
                                secret_mounts[mountpoint] = {
                                    "kind": "text",
                                    "content": self.arvrunner.secret_store.retrieve(p.resolved)
                                }
                            else:
                                with vwd.open(dst, "w") as n:
                                    n.write(p.resolved)

                def keepemptydirs(p):
                    # Keep manifests cannot represent empty directories;
                    # drop a ".keep" placeholder file into each one.
                    if isinstance(p, arvados.collection.RichCollectionBase):
                        if len(p) == 0:
                            p.open(".keep", "w").close()
                        else:
                            for c in p:
                                keepemptydirs(p[c])

                keepemptydirs(vwd)

                if not runtimeContext.current_container:
                    runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
                vwd.save_new(name=intermediate_collection_info["name"],
                             owner_uuid=runtimeContext.project_uuid,
                             ensure_unique_name=True,
                             trash_at=intermediate_collection_info["trash_at"],
                             properties=intermediate_collection_info["properties"])

                prev = None
                for f, p in sorteditems:
                    if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
                        (prev is not None and p.target.startswith(prev))):
                        continue
                    if p.target.startswith("/"):
                        dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
                    else:
                        dst = p.target
                    mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
                    mounts[mountpoint] = {"kind": "collection",
                                          "portable_data_hash": vwd.portable_data_hash(),
                                          "path": dst}
                    if p.type.startswith("Writable"):
                        mounts[mountpoint]["writable"] = True
                    prev = p.target + "/"

        container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
        if self.environment:
            container_request["environment"].update(self.environment)

        if self.stdin:
            # self.stdin is a "/keep/<pdh>/<path>" style path; strip the
            # leading "/keep/" (6 chars) to get pdh and collection path.
            sp = self.stdin[6:].split("/", 1)
            mounts["stdin"] = {"kind": "collection",
                                "portable_data_hash": sp[0],
                                "path": sp[1]}

        if self.stderr:
            mounts["stderr"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stderr)}

        if self.stdout:
            mounts["stdout"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stdout)}

        (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")

        container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
                                                                    docker_req,
                                                                    runtimeContext.pull_image,
                                                                    runtimeContext)

        network_req, _ = self.get_requirement("NetworkAccess")
        if network_req:
            runtime_constraints["API"] = network_req["networkAccess"]

        api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
        if api_req:
            runtime_constraints["API"] = True

        # A cluster-level DefaultKeepCacheRAM of zero means "use disk cache".
        use_disk_cache = (self.arvrunner.api.config()["Containers"].get("DefaultKeepCacheRAM", 0) == 0)

        keep_cache_type_req, _ = self.get_requirement("http://arvados.org/cwl#KeepCacheTypeRequirement")
        if keep_cache_type_req:
            if "keepCacheType" in keep_cache_type_req:
                if keep_cache_type_req["keepCacheType"] == "ram_cache":
                    use_disk_cache = False

        runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
        if runtime_req:
            if "keep_cache" in runtime_req:
                if use_disk_cache:
                    # If DefaultKeepCacheRAM is zero it means we should use disk cache.
                    runtime_constraints["keep_cache_disk"] = math.ceil(runtime_req["keep_cache"] * 2**20)
                else:
                    runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
            if "outputDirType" in runtime_req:
                if runtime_req["outputDirType"] == "local_output_dir":
                    # Currently the default behavior.
                    pass
                elif runtime_req["outputDirType"] == "keep_output_dir":
                    mounts[self.outdir]= {
                        "kind": "collection",
                        "writable": True
                    }

        partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
        if partition_req:
            scheduling_parameters["partitions"] = aslist(partition_req["partition"])

        intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
        if intermediate_output_req:
            self.output_ttl = intermediate_output_req["outputTTL"]
        else:
            self.output_ttl = self.arvrunner.intermediate_output_ttl

        if self.output_ttl < 0:
            # Fixed: previously formatted container_request["output_ttl"],
            # which is not assigned until later in this method, so this
            # error path raised KeyError instead of the intended message.
            raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % self.output_ttl)


        if self.arvrunner.api._rootDesc["revision"] >= "20210628":
            storage_class_req, _ = self.get_requirement("http://arvados.org/cwl#OutputStorageClass")
            if storage_class_req and storage_class_req.get("intermediateStorageClass"):
                container_request["output_storage_classes"] = aslist(storage_class_req["intermediateStorageClass"])
            else:
                container_request["output_storage_classes"] = runtimeContext.intermediate_storage_classes.strip().split(",")

        cuda_req, _ = self.get_requirement("http://commonwl.org/cwltool#CUDARequirement")
        if cuda_req:
            runtime_constraints["cuda"] = {
                "device_count": resources.get("cudaDeviceCount", 1),
                "driver_version": cuda_req["cudaVersionMin"],
                "hardware_capability": aslist(cuda_req["cudaComputeCapability"])[0]
            }

        if runtimeContext.enable_preemptible is False:
            # Explicit command-line opt-out overrides the hint.
            scheduling_parameters["preemptible"] = False
        else:
            preemptible_req, _ = self.get_requirement("http://arvados.org/cwl#UsePreemptible")
            if preemptible_req:
                scheduling_parameters["preemptible"] = preemptible_req["usePreemptible"]
            elif runtimeContext.enable_preemptible is True:
                scheduling_parameters["preemptible"] = True
            elif runtimeContext.enable_preemptible is None:
                # No explicit setting anywhere: leave it to the cluster default.
                pass

        if self.timelimit is not None and self.timelimit > 0:
            scheduling_parameters["max_run_time"] = self.timelimit

        extra_submit_params = {}
        if runtimeContext.submit_runner_cluster:
            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster

        container_request["output_name"] = cleanup_name_for_collection("Output from step %s" % (self.name))
        container_request["output_ttl"] = self.output_ttl
        container_request["mounts"] = mounts
        container_request["secret_mounts"] = secret_mounts
        container_request["runtime_constraints"] = runtime_constraints
        container_request["scheduling_parameters"] = scheduling_parameters

        enable_reuse = runtimeContext.enable_reuse
        if enable_reuse:
            # Either the standard WorkReuse or the legacy Arvados
            # ReuseRequirement hint may override reuse per-step; the
            # Arvados-specific hint takes precedence (checked last).
            reuse_req, _ = self.get_requirement("WorkReuse")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
            reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        container_request["use_existing"] = enable_reuse

        properties_req, _ = self.get_requirement("http://arvados.org/cwl#ProcessProperties")
        if properties_req:
            for pr in properties_req["processProperties"]:
                container_request["properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])

        output_properties_req, _ = self.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
        if output_properties_req:
            if self.arvrunner.api._rootDesc["revision"] >= "20220510":
                container_request["output_properties"] = {}
                for pr in output_properties_req["outputProperties"]:
                    container_request["output_properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
            else:
                logger.warning("%s API revision is %s, revision %s is required to support setting properties on output collections.",
                               self.arvrunner.label(self), self.arvrunner.api._rootDesc["revision"], "20220510")

        # RAM multipliers to try in order when the container cannot be
        # scheduled; first attempt always uses the requested RAM (x1).
        ram_multiplier = [1]

        oom_retry_req, _ = self.get_requirement("http://arvados.org/cwl#OutOfMemoryRetry")
        if oom_retry_req:
            if oom_retry_req.get('memoryRetryMultiplier'):
                ram_multiplier.append(oom_retry_req.get('memoryRetryMultiplier'))
            elif oom_retry_req.get('memoryRetryMultipler'):
                # Backward compatibility with the original misspelled field name.
                ram_multiplier.append(oom_retry_req.get('memoryRetryMultipler'))
            else:
                ram_multiplier.append(2)

        if runtimeContext.runnerjob.startswith("arvwf:"):
            # Running a registered workflow: label the main step with the
            # workflow name and record the template UUID.
            wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
            wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
            if container_request["name"] == "main":
                container_request["name"] = wfrecord["name"]
            container_request["properties"]["template_uuid"] = wfuuid

        if self.attempt_count == 0:
            # Only wrap the callback once; retries reuse the wrapped one.
            self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)

        try:
            ram = runtime_constraints["ram"]

            self.uuid = runtimeContext.submit_request_uuid

            # Try each RAM multiplier until the API assigns a container
            # (container_uuid not None means it is schedulable).
            for i in ram_multiplier:
                runtime_constraints["ram"] = ram * i

                if self.uuid:
                    response = self.arvrunner.api.container_requests().update(
                        uuid=self.uuid,
                        body=container_request,
                        **extra_submit_params
                    ).execute(num_retries=self.arvrunner.num_retries)
                else:
                    response = self.arvrunner.api.container_requests().create(
                        body=container_request,
                        **extra_submit_params
                    ).execute(num_retries=self.arvrunner.num_retries)
                    self.uuid = response["uuid"]

                if response["container_uuid"] is not None:
                    break

            if response["container_uuid"] is None:
                # NOTE(review): assumes attempt_count is a valid index into
                # ram_multiplier here — confirm against retry flow.
                runtime_constraints["ram"] = ram * ram_multiplier[self.attempt_count]

            container_request["state"] = "Committed"
            response = self.arvrunner.api.container_requests().update(
                uuid=self.uuid,
                body=container_request,
                **extra_submit_params
            ).execute(num_retries=self.arvrunner.num_retries)

            self.arvrunner.process_submitted(self)
            self.attempt_count += 1

            if response["state"] == "Final":
                logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
            else:
                logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
        except Exception as e:
            logger.exception("%s error submitting container\n%s", self.arvrunner.label(self), e)
            logger.debug("Container request was %s", container_request)
            self.output_callback({}, "permanentFail")

    def out_of_memory_retry(self, record, container):
        """Return True if the failed container looks OOM-killed and the
        arv:OutOfMemoryRetry hint says it should be retried with more RAM.
        """
        oom_retry_req, _ = self.get_requirement("http://arvados.org/cwl#OutOfMemoryRetry")
        if oom_retry_req is None:
            return False

        # Sometimes it gets killed with no warning
        if container["exit_code"] == 137:
            return True

        logc = arvados.collection.CollectionReader(record["log_uuid"],
                                                   api_client=self.arvrunner.api,
                                                   keep_client=self.arvrunner.keep_client,
                                                   num_retries=self.arvrunner.num_retries)

        # Capture the log tail via the callback interface of done.logtail.
        loglines = [""]
        def callback(v1, v2, v3):
            loglines[0] = v3

        done.logtail(logc, callback, "", maxlen=1000)

        # Check allocation failure
        oom_matches = oom_retry_req.get('memoryErrorRegex') or r'(bad_alloc|out ?of ?memory|memory ?error|container using over 9.% of memory)'
        if re.search(oom_matches, loglines[0], re.IGNORECASE | re.MULTILINE):
            return True

        return False

    def done(self, record):
        """Handle completion of the container request.

        Determines process status from the exit code, performs the
        out-of-memory retry if applicable, logs failure output, records
        intermediate output for trashing, collects outputs, attaches a
        resource usage report, and finally invokes the output callback
        (unless the step was resubmitted).
        """
        outputs = {}
        retried = False
        rcode = None
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            if container["state"] == "Complete":
                rcode = container["exit_code"]
                if self.successCodes and rcode in self.successCodes:
                    processStatus = "success"
                elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
                    processStatus = "temporaryFail"
                elif self.permanentFailCodes and rcode in self.permanentFailCodes:
                    processStatus = "permanentFail"
                elif rcode == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"

                # Retry once with more RAM if this looks like an OOM kill.
                if processStatus == "permanentFail" and self.attempt_count == 1 and self.out_of_memory_retry(record, container):
                    logger.warning("%s Container failed with out of memory error, retrying with more RAM.",
                                 self.arvrunner.label(self))
                    self.job_runtime.submit_request_uuid = None
                    self.uuid = None
                    self.run(None)
                    retried = True
                    return

                if rcode == 137:
                    logger.warning("%s Container may have been killed for using too much RAM.  Try resubmitting with a higher 'ramMin' or use the arv:OutOfMemoryRetry feature.",
                                 self.arvrunner.label(self))
            else:
                processStatus = "permanentFail"

            logc = None
            if record["log_uuid"]:
                logc = arvados.collection.Collection(record["log_uuid"],
                                                     api_client=self.arvrunner.api,
                                                     keep_client=self.arvrunner.keep_client,
                                                     num_retries=self.arvrunner.num_retries)

            if processStatus == "permanentFail" and logc is not None:
                label = self.arvrunner.label(self)
                done.logtail(
                    logc, logger.error,
                    "%s (%s) error log:" % (label, record["uuid"]), maxlen=40, include_crunchrun=(rcode is None or rcode > 127))

            if record["output_uuid"]:
                if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
                    # Compute the trash time to avoid requesting the collection record.
                    trash_at = ciso8601.parse_datetime_as_naive(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
                    aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
                    orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
                    oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
                    logger.info("%s Intermediate output %s (%s) will be trashed%s%s%s." % (
                        self.arvrunner.label(self), record["output_uuid"], container["output"], aftertime, orpart, oncomplete))
                self.arvrunner.add_intermediate_output(record["output_uuid"])

            if container["output"]:
                outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")

            # Record the CWL output object on the container request properties.
            properties = record["properties"].copy()
            properties["cwl_output"] = outputs
            self.arvrunner.api.container_requests().update(
                uuid=self.uuid,
                body={"container_request": {"properties": properties}}
            ).execute(num_retries=self.arvrunner.num_retries)

            if logc is not None:
                # Best effort: attach an HTML resource usage report to the log
                # collection; failures here must not fail the step.
                try:
                    summarizer = crunchstat_summary.summarizer.NewSummarizer(self.uuid, arv=self.arvrunner.api)
                    summarizer.run()
                    with logc.open("usage_report.html", "wt") as mr:
                        mr.write(summarizer.html_report())
                    logc.save()
                except Exception as e:
                    logger.error("%s unable to generate resource usage report",
                                 self.arvrunner.label(self),
                                 exc_info=(e if self.arvrunner.debug else False))

        except WorkflowException as e:
            # Only include a stack trace if in debug mode.
            # A stack trace may obfuscate more useful output about the workflow.
            # NOTE(review): `container` is unbound here if the containers().get
            # call above raised — confirm whether that path can occur.
            logger.error("%s unable to collect output from %s:\n%s",
                         self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
            processStatus = "permanentFail"
        except Exception:
            logger.exception("%s while getting output object:", self.arvrunner.label(self))
            processStatus = "permanentFail"
        finally:
            if not retried:
                self.output_callback(outputs, processStatus)
560
561
562 class RunnerContainer(Runner):
563     """Submit and manage a container that runs arvados-cwl-runner."""
564
565     def arvados_job_spec(self, runtimeContext, git_info):
566         """Create an Arvados container request for this workflow.
567
568         The returned dict can be used to create a container passed as
569         the +body+ argument to container_requests().create().
570         """
571
572         adjustDirObjs(self.job_order, trim_listing)
573         visit_class(self.job_order, ("File", "Directory"), trim_anonymous_location)
574         visit_class(self.job_order, ("File", "Directory"), remove_redundant_fields)
575
576         secret_mounts = {}
577         for param in sorted(self.job_order.keys()):
578             if self.secret_store.has_secret(self.job_order[param]):
579                 mnt = "/secrets/s%d" % len(secret_mounts)
580                 secret_mounts[mnt] = {
581                     "kind": "text",
582                     "content": self.secret_store.retrieve(self.job_order[param])
583                 }
584                 self.job_order[param] = {"$include": mnt}
585
586         container_image = arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext)
587
588         workflow_runner_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
589         if workflow_runner_req and workflow_runner_req.get("acrContainerImage"):
590             container_image = workflow_runner_req.get("acrContainerImage")
591
592         container_req = {
593             "name": self.name,
594             "output_path": "/var/spool/cwl",
595             "cwd": "/var/spool/cwl",
596             "priority": self.priority,
597             "state": "Committed",
598             "container_image": container_image,
599             "mounts": {
600                 "/var/lib/cwl/cwl.input.json": {
601                     "kind": "json",
602                     "content": self.job_order
603                 },
604                 "stdout": {
605                     "kind": "file",
606                     "path": "/var/spool/cwl/cwl.output.json"
607                 },
608                 "/var/spool/cwl": {
609                     "kind": "collection",
610                     "writable": True
611                 }
612             },
613             "secret_mounts": secret_mounts,
614             "runtime_constraints": {
615                 "vcpus": math.ceil(self.submit_runner_cores),
616                 "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
617                 "API": True
618             },
619             "use_existing": self.reuse_runner,
620             "properties": {}
621         }
622
623         if self.embedded_tool.tool.get("id", "").startswith("keep:"):
624             sp = self.embedded_tool.tool["id"].split('/')
625             workflowcollection = sp[0][5:]
626             workflowname = "/".join(sp[1:])
627             workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
628             container_req["mounts"]["/var/lib/cwl/workflow"] = {
629                 "kind": "collection",
630                 "portable_data_hash": "%s" % workflowcollection
631             }
632         elif self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
633             uuid, frg = urllib.parse.urldefrag(self.embedded_tool.tool["id"])
634             workflowpath = "/var/lib/cwl/workflow.json#" + frg
635             packedtxt = self.loadingContext.loader.fetch_text(uuid)
636             yaml = ruamel.yaml.YAML(typ='safe', pure=True)
637             packed = yaml.load(packedtxt)
638             container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
639                 "kind": "json",
640                 "content": packed
641             }
642             container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
643         elif self.embedded_tool.tool.get("id", "").startswith("file:"):
644             raise WorkflowException("Tool id '%s' is a local file but expected keep: or arvwf:" % self.embedded_tool.tool.get("id"))
645         else:
646             main = self.loadingContext.loader.idx["_:main"]
647             if main.get("id") == "_:main":
648                 del main["id"]
649             workflowpath = "/var/lib/cwl/workflow.json#main"
650             container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
651                 "kind": "json",
652                 "content": main
653             }
654
655         container_req["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in git_info.items()})
656
657         properties_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ProcessProperties")
658         if properties_req:
659             builder = make_builder(self.job_order, self.embedded_tool.hints, self.embedded_tool.requirements, runtimeContext, self.embedded_tool.metadata)
660             for pr in properties_req["processProperties"]:
661                 container_req["properties"][pr["propertyName"]] = builder.do_eval(pr["propertyValue"])
662
663         # --local means execute the workflow instead of submitting a container request
664         # --api=containers means use the containers API
665         # --no-log-timestamps means don't add timestamps (the logging infrastructure does this)
666         # --disable-validate because we already validated so don't need to do it again
667         # --eval-timeout is the timeout for javascript invocation
668         # --parallel-task-count is the number of threads to use for job submission
669         # --enable/disable-reuse sets desired job reuse
670         # --collection-cache-size sets aside memory to store collections
671         command = ["arvados-cwl-runner",
672                    "--local",
673                    "--api=containers",
674                    "--no-log-timestamps",
675                    "--disable-validate",
676                    "--disable-color",
677                    "--eval-timeout=%s" % self.arvrunner.eval_timeout,
678                    "--thread-count=%s" % self.arvrunner.thread_count,
679                    "--enable-reuse" if self.enable_reuse else "--disable-reuse",
680                    "--collection-cache-size=%s" % self.collection_cache_size]
681
682         if self.output_name:
683             command.append("--output-name=" + self.output_name)
684             container_req["output_name"] = self.output_name
685
686         if self.output_tags:
687             command.append("--output-tags=" + self.output_tags)
688
689         if runtimeContext.debug:
690             command.append("--debug")
691
692         if runtimeContext.storage_classes != "default" and runtimeContext.storage_classes:
693             command.append("--storage-classes=" + runtimeContext.storage_classes)
694
695         if runtimeContext.intermediate_storage_classes != "default" and runtimeContext.intermediate_storage_classes:
696             command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes)
697
698         if runtimeContext.on_error:
699             command.append("--on-error=" + self.on_error)
700
701         if runtimeContext.intermediate_output_ttl:
702             command.append("--intermediate-output-ttl=%d" % runtimeContext.intermediate_output_ttl)
703
704         if runtimeContext.trash_intermediate:
705             command.append("--trash-intermediate")
706
707         if runtimeContext.project_uuid:
708             command.append("--project-uuid="+runtimeContext.project_uuid)
709
710         if self.enable_dev:
711             command.append("--enable-dev")
712
713         if runtimeContext.enable_preemptible is True:
714             command.append("--enable-preemptible")
715
716         if runtimeContext.enable_preemptible is False:
717             command.append("--disable-preemptible")
718
719         if runtimeContext.varying_url_params:
720             command.append("--varying-url-params="+runtimeContext.varying_url_params)
721
722         if runtimeContext.prefer_cached_downloads:
723             command.append("--prefer-cached-downloads")
724
725         if self.fast_parser:
726             command.append("--fast-parser")
727
728         command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
729
730         container_req["command"] = command
731
732         return container_req
733
734
735     def run(self, runtimeContext):
736         runtimeContext.keepprefix = "keep:"
737         job_spec = self.arvados_job_spec(runtimeContext, self.git_info)
738         if runtimeContext.project_uuid:
739             job_spec["owner_uuid"] = runtimeContext.project_uuid
740
741         extra_submit_params = {}
742         if runtimeContext.submit_runner_cluster:
743             extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
744
745         if runtimeContext.submit_request_uuid:
746             if "cluster_id" in extra_submit_params:
747                 # Doesn't make sense for "update" and actually fails
748                 del extra_submit_params["cluster_id"]
749             response = self.arvrunner.api.container_requests().update(
750                 uuid=runtimeContext.submit_request_uuid,
751                 body=job_spec,
752                 **extra_submit_params
753             ).execute(num_retries=self.arvrunner.num_retries)
754         else:
755             response = self.arvrunner.api.container_requests().create(
756                 body=job_spec,
757                 **extra_submit_params
758             ).execute(num_retries=self.arvrunner.num_retries)
759
760         self.uuid = response["uuid"]
761         self.arvrunner.process_submitted(self)
762
763         logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])
764
765         workbench2 = self.arvrunner.api.config()["Services"]["Workbench2"]["ExternalURL"]
766         if workbench2:
767             url = "{}processes/{}".format(workbench2, response["uuid"])
768             logger.info("Monitor workflow progress at %s", url)
769
770
771     def done(self, record):
772         try:
773             container = self.arvrunner.api.containers().get(
774                 uuid=record["container_uuid"]
775             ).execute(num_retries=self.arvrunner.num_retries)
776             container["log"] = record["log_uuid"]
777         except Exception:
778             logger.exception("%s while getting runner container", self.arvrunner.label(self))
779             self.arvrunner.output_callback({}, "permanentFail")
780         else:
781             super(RunnerContainer, self).done(container)