19847: Add calculation for choosing keep disk cache size.
[arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8
9 import logging
10 import json
11 import os
12 import urllib.request, urllib.parse, urllib.error
13 import time
14 import datetime
15 import ciso8601
16 import uuid
17 import math
18
19 import arvados_cwl.util
20 import ruamel.yaml as yaml
21
22 from cwltool.errors import WorkflowException
23 from cwltool.process import UnsupportedRequirement, shortname
24 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
25 from cwltool.job import JobBase
26
27 import arvados.collection
28
29 from .arvdocker import arv_docker_get_image
30 from . import done
31 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields, make_builder
32 from .fsaccess import CollectionFetcher
33 from .pathmapper import NoFollowPathMapper, trim_listing
34 from .perf import Perf
35 from ._version import __version__
36
37 logger = logging.getLogger('arvados.cwl-runner')
38 metrics = logging.getLogger('arvados.cwl-runner.metrics')
39
def cleanup_name_for_collection(name):
    """Return *name* with every "/" replaced by a single space."""
    return " ".join(name.split("/"))
42
43 class ArvadosContainer(JobBase):
44     """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""
45
46     def __init__(self, runner, job_runtime,
47                  builder,   # type: Builder
48                  joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
49                  make_path_mapper,  # type: Callable[..., PathMapper]
50                  requirements,      # type: List[Dict[Text, Text]]
51                  hints,     # type: List[Dict[Text, Text]]
52                  name       # type: Text
53     ):
54         super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
55         self.arvrunner = runner
56         self.job_runtime = job_runtime
57         self.running = False
58         self.uuid = None
59
60     def update_pipeline_component(self, r):
61         pass
62
63     def _required_env(self):
64         env = {}
65         env["HOME"] = self.outdir
66         env["TMPDIR"] = self.tmpdir
67         return env
68
69     def run(self, toplevelRuntimeContext):
70         # ArvadosCommandTool subclasses from cwltool.CommandLineTool,
71         # which calls makeJobRunner() to get a new ArvadosContainer
72         # object.  The fields that define execution such as
73         # command_line, environment, etc are set on the
74         # ArvadosContainer object by CommandLineTool.job() before
75         # run() is called.
76
77         runtimeContext = self.job_runtime
78
79         if runtimeContext.submit_request_uuid:
80             container_request = self.arvrunner.api.container_requests().get(
81                 uuid=runtimeContext.submit_request_uuid
82             ).execute(num_retries=self.arvrunner.num_retries)
83         else:
84             container_request = {}
85
86         container_request["command"] = self.command_line
87         container_request["name"] = self.name
88         container_request["output_path"] = self.outdir
89         container_request["cwd"] = self.outdir
90         container_request["priority"] = runtimeContext.priority
91         container_request["state"] = "Committed"
92         container_request.setdefault("properties", {})
93
94         container_request["properties"]["cwl_input"] = self.joborder
95
96         runtime_constraints = {}
97
98         if runtimeContext.project_uuid:
99             container_request["owner_uuid"] = runtimeContext.project_uuid
100
101         if self.arvrunner.secret_store.has_secret(self.command_line):
102             raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")
103
104         if self.arvrunner.secret_store.has_secret(self.environment):
105             raise WorkflowException("Secret material leaked in environment, only file literals may contain secrets")
106
107         resources = self.builder.resources
108         if resources is not None:
109             runtime_constraints["vcpus"] = math.ceil(resources.get("cores", 1))
110             runtime_constraints["ram"] = math.ceil(resources.get("ram") * 2**20)
111
112         mounts = {
113             self.outdir: {
114                 "kind": "tmp",
115                 "capacity": math.ceil(resources.get("outdirSize", 0) * 2**20)
116             },
117             self.tmpdir: {
118                 "kind": "tmp",
119                 "capacity": math.ceil(resources.get("tmpdirSize", 0) * 2**20)
120             }
121         }
122         secret_mounts = {}
123         scheduling_parameters = {}
124
125         rf = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
126         rf.sort(key=lambda k: k.resolved)
127         prevdir = None
128         for resolved, target, tp, stg in rf:
129             if not stg:
130                 continue
131             if prevdir and target.startswith(prevdir):
132                 continue
133             if tp == "Directory":
134                 targetdir = target
135             else:
136                 targetdir = os.path.dirname(target)
137             sp = resolved.split("/", 1)
138             pdh = sp[0][5:]   # remove "keep:"
139             mounts[targetdir] = {
140                 "kind": "collection",
141                 "portable_data_hash": pdh
142             }
143             if pdh in self.pathmapper.pdh_to_uuid:
144                 mounts[targetdir]["uuid"] = self.pathmapper.pdh_to_uuid[pdh]
145             if len(sp) == 2:
146                 if tp == "Directory":
147                     path = sp[1]
148                 else:
149                     path = os.path.dirname(sp[1])
150                 if path and path != "/":
151                     mounts[targetdir]["path"] = path
152             prevdir = targetdir + "/"
153
154         intermediate_collection_info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
155
156         with Perf(metrics, "generatefiles %s" % self.name):
157             if self.generatefiles["listing"]:
158                 vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
159                                                     keep_client=self.arvrunner.keep_client,
160                                                     num_retries=self.arvrunner.num_retries)
161                 generatemapper = NoFollowPathMapper(self.generatefiles["listing"], "", "",
162                                                     separateDirs=False)
163
164                 sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target)
165
166                 logger.debug("generatemapper is %s", sorteditems)
167
168                 with Perf(metrics, "createfiles %s" % self.name):
169                     for f, p in sorteditems:
170                         if not p.target:
171                             continue
172
173                         if p.target.startswith("/"):
174                             dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
175                         else:
176                             dst = p.target
177
178                         if p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
179                             if p.resolved.startswith("_:"):
180                                 vwd.mkdirs(dst)
181                             else:
182                                 source, path = self.arvrunner.fs_access.get_collection(p.resolved)
183                                 vwd.copy(path or ".", dst, source_collection=source)
184                         elif p.type == "CreateFile":
185                             if self.arvrunner.secret_store.has_secret(p.resolved):
186                                 mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
187                                 secret_mounts[mountpoint] = {
188                                     "kind": "text",
189                                     "content": self.arvrunner.secret_store.retrieve(p.resolved)
190                                 }
191                             else:
192                                 with vwd.open(dst, "w") as n:
193                                     n.write(p.resolved)
194
195                 def keepemptydirs(p):
196                     if isinstance(p, arvados.collection.RichCollectionBase):
197                         if len(p) == 0:
198                             p.open(".keep", "w").close()
199                         else:
200                             for c in p:
201                                 keepemptydirs(p[c])
202
203                 keepemptydirs(vwd)
204
205                 if not runtimeContext.current_container:
206                     runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
207                 vwd.save_new(name=intermediate_collection_info["name"],
208                              owner_uuid=runtimeContext.project_uuid,
209                              ensure_unique_name=True,
210                              trash_at=intermediate_collection_info["trash_at"],
211                              properties=intermediate_collection_info["properties"])
212
213                 prev = None
214                 for f, p in sorteditems:
215                     if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
216                         (prev is not None and p.target.startswith(prev))):
217                         continue
218                     if p.target.startswith("/"):
219                         dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
220                     else:
221                         dst = p.target
222                     mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
223                     mounts[mountpoint] = {"kind": "collection",
224                                           "portable_data_hash": vwd.portable_data_hash(),
225                                           "path": dst}
226                     if p.type.startswith("Writable"):
227                         mounts[mountpoint]["writable"] = True
228                     prev = p.target + "/"
229
230         container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
231         if self.environment:
232             container_request["environment"].update(self.environment)
233
234         if self.stdin:
235             sp = self.stdin[6:].split("/", 1)
236             mounts["stdin"] = {"kind": "collection",
237                                 "portable_data_hash": sp[0],
238                                 "path": sp[1]}
239
240         if self.stderr:
241             mounts["stderr"] = {"kind": "file",
242                                 "path": "%s/%s" % (self.outdir, self.stderr)}
243
244         if self.stdout:
245             mounts["stdout"] = {"kind": "file",
246                                 "path": "%s/%s" % (self.outdir, self.stdout)}
247
248         (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
249
250         container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
251                                                                     docker_req,
252                                                                     runtimeContext.pull_image,
253                                                                     runtimeContext.project_uuid,
254                                                                     runtimeContext.force_docker_pull,
255                                                                     runtimeContext.tmp_outdir_prefix,
256                                                                     runtimeContext.match_local_docker,
257                                                                     runtimeContext.copy_deps)
258
259         network_req, _ = self.get_requirement("NetworkAccess")
260         if network_req:
261             runtime_constraints["API"] = network_req["networkAccess"]
262
263         api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
264         if api_req:
265             runtime_constraints["API"] = True
266
267         use_disk_cache = (self.arvrunner.api.config()["Containers"].get("DefaultKeepCacheDisk", 0) > 0)
268
269         runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
270         if runtime_req:
271             if "keepCacheType" in runtime_req:
272                 if cache_type == "ram_cache":
273                     use_disk_cache = False
274             if "keep_cache" in runtime_req:
275                 if use_disk_cache:
276                     # If DefaultKeepCacheDisk is non-zero it means we should use disk cache.
277                     runtime_constraints["keep_cache_disk"] = math.ceil(runtime_req["keep_cache"] * 2**20)
278                 else:
279                     runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
280             if "outputDirType" in runtime_req:
281                 if runtime_req["outputDirType"] == "local_output_dir":
282                     # Currently the default behavior.
283                     pass
284                 elif runtime_req["outputDirType"] == "keep_output_dir":
285                     mounts[self.outdir]= {
286                         "kind": "collection",
287                         "writable": True
288                     }
289
290         if use_disk_cache and "keep_cache_disk" not in runtime_constraints:
291             # Cache size wasn't explicitly set so calculate a default
292             # based on 2x RAM request or 1 GB per core, whichever is
293             # smaller.  This is to avoid requesting 100s of GB of disk
294             # cache when requesting a node with a huge amount of RAM.
295             runtime_constraints["keep_cache_disk"] = min(runtime_constraints["ram"] * 2, runtime_constraints["vcpus"] * (1024*1024*1024))
296
297         partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
298         if partition_req:
299             scheduling_parameters["partitions"] = aslist(partition_req["partition"])
300
301         intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
302         if intermediate_output_req:
303             self.output_ttl = intermediate_output_req["outputTTL"]
304         else:
305             self.output_ttl = self.arvrunner.intermediate_output_ttl
306
307         if self.output_ttl < 0:
308             raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % container_request["output_ttl"])
309
310
311         if self.arvrunner.api._rootDesc["revision"] >= "20210628":
312             storage_class_req, _ = self.get_requirement("http://arvados.org/cwl#OutputStorageClass")
313             if storage_class_req and storage_class_req.get("intermediateStorageClass"):
314                 container_request["output_storage_classes"] = aslist(storage_class_req["intermediateStorageClass"])
315             else:
316                 container_request["output_storage_classes"] = runtimeContext.intermediate_storage_classes.strip().split(",")
317
318         cuda_req, _ = self.get_requirement("http://commonwl.org/cwltool#CUDARequirement")
319         if cuda_req:
320             runtime_constraints["cuda"] = {
321                 "device_count": resources.get("cudaDeviceCount", 1),
322                 "driver_version": cuda_req["cudaVersionMin"],
323                 "hardware_capability": aslist(cuda_req["cudaComputeCapability"])[0]
324             }
325
326         if runtimeContext.enable_preemptible is False:
327             scheduling_parameters["preemptible"] = False
328         else:
329             preemptible_req, _ = self.get_requirement("http://arvados.org/cwl#UsePreemptible")
330             if preemptible_req:
331                 scheduling_parameters["preemptible"] = preemptible_req["usePreemptible"]
332             elif runtimeContext.enable_preemptible is True:
333                 scheduling_parameters["preemptible"] = True
334             elif runtimeContext.enable_preemptible is None:
335                 pass
336
337         if self.timelimit is not None and self.timelimit > 0:
338             scheduling_parameters["max_run_time"] = self.timelimit
339
340         extra_submit_params = {}
341         if runtimeContext.submit_runner_cluster:
342             extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
343
344         container_request["output_name"] = cleanup_name_for_collection("Output from step %s" % (self.name))
345         container_request["output_ttl"] = self.output_ttl
346         container_request["mounts"] = mounts
347         container_request["secret_mounts"] = secret_mounts
348         container_request["runtime_constraints"] = runtime_constraints
349         container_request["scheduling_parameters"] = scheduling_parameters
350
351         enable_reuse = runtimeContext.enable_reuse
352         if enable_reuse:
353             reuse_req, _ = self.get_requirement("WorkReuse")
354             if reuse_req:
355                 enable_reuse = reuse_req["enableReuse"]
356             reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
357             if reuse_req:
358                 enable_reuse = reuse_req["enableReuse"]
359         container_request["use_existing"] = enable_reuse
360
361         properties_req, _ = self.get_requirement("http://arvados.org/cwl#ProcessProperties")
362         if properties_req:
363             for pr in properties_req["processProperties"]:
364                 container_request["properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
365
366         output_properties_req, _ = self.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
367         if output_properties_req:
368             if self.arvrunner.api._rootDesc["revision"] >= "20220510":
369                 container_request["output_properties"] = {}
370                 for pr in output_properties_req["outputProperties"]:
371                     container_request["output_properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
372             else:
373                 logger.warning("%s API revision is %s, revision %s is required to support setting properties on output collections.",
374                                self.arvrunner.label(self), self.arvrunner.api._rootDesc["revision"], "20220510")
375
376         if runtimeContext.runnerjob.startswith("arvwf:"):
377             wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
378             wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
379             if container_request["name"] == "main":
380                 container_request["name"] = wfrecord["name"]
381             container_request["properties"]["template_uuid"] = wfuuid
382
383         self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
384
385         try:
386             if runtimeContext.submit_request_uuid:
387                 response = self.arvrunner.api.container_requests().update(
388                     uuid=runtimeContext.submit_request_uuid,
389                     body=container_request,
390                     **extra_submit_params
391                 ).execute(num_retries=self.arvrunner.num_retries)
392             else:
393                 response = self.arvrunner.api.container_requests().create(
394                     body=container_request,
395                     **extra_submit_params
396                 ).execute(num_retries=self.arvrunner.num_retries)
397
398             self.uuid = response["uuid"]
399             self.arvrunner.process_submitted(self)
400
401             if response["state"] == "Final":
402                 logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
403             else:
404                 logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
405         except Exception as e:
406             logger.exception("%s error submitting container\n%s", self.arvrunner.label(self), e)
407             logger.debug("Container request was %s", container_request)
408             self.output_callback({}, "permanentFail")
409
410     def done(self, record):
411         outputs = {}
412         try:
413             container = self.arvrunner.api.containers().get(
414                 uuid=record["container_uuid"]
415             ).execute(num_retries=self.arvrunner.num_retries)
416             if container["state"] == "Complete":
417                 rcode = container["exit_code"]
418                 if self.successCodes and rcode in self.successCodes:
419                     processStatus = "success"
420                 elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
421                     processStatus = "temporaryFail"
422                 elif self.permanentFailCodes and rcode in self.permanentFailCodes:
423                     processStatus = "permanentFail"
424                 elif rcode == 0:
425                     processStatus = "success"
426                 else:
427                     processStatus = "permanentFail"
428
429                 if rcode == 137:
430                     logger.warning("%s Container may have been killed for using too much RAM.  Try resubmitting with a higher 'ramMin'.",
431                                  self.arvrunner.label(self))
432             else:
433                 processStatus = "permanentFail"
434
435             if processStatus == "permanentFail" and record["log_uuid"]:
436                 logc = arvados.collection.CollectionReader(record["log_uuid"],
437                                                            api_client=self.arvrunner.api,
438                                                            keep_client=self.arvrunner.keep_client,
439                                                            num_retries=self.arvrunner.num_retries)
440                 label = self.arvrunner.label(self)
441                 done.logtail(
442                     logc, logger.error,
443                     "%s (%s) error log:" % (label, record["uuid"]), maxlen=40)
444
445             if record["output_uuid"]:
446                 if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
447                     # Compute the trash time to avoid requesting the collection record.
448                     trash_at = ciso8601.parse_datetime_as_naive(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
449                     aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
450                     orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
451                     oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
452                     logger.info("%s Intermediate output %s (%s) will be trashed%s%s%s." % (
453                         self.arvrunner.label(self), record["output_uuid"], container["output"], aftertime, orpart, oncomplete))
454                 self.arvrunner.add_intermediate_output(record["output_uuid"])
455
456             if container["output"]:
457                 outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
458
459             properties = record["properties"].copy()
460             properties["cwl_output"] = outputs
461             self.arvrunner.api.container_requests().update(
462                 uuid=self.uuid,
463                 body={"container_request": {"properties": properties}}
464             ).execute(num_retries=self.arvrunner.num_retries)
465         except WorkflowException as e:
466             # Only include a stack trace if in debug mode.
467             # A stack trace may obfuscate more useful output about the workflow.
468             logger.error("%s unable to collect output from %s:\n%s",
469                          self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
470             processStatus = "permanentFail"
471         except Exception:
472             logger.exception("%s while getting output object:", self.arvrunner.label(self))
473             processStatus = "permanentFail"
474         finally:
475             self.output_callback(outputs, processStatus)
476
477
478 class RunnerContainer(Runner):
479     """Submit and manage a container that runs arvados-cwl-runner."""
480
481     def arvados_job_spec(self, runtimeContext, git_info):
482         """Create an Arvados container request for this workflow.
483
484         The returned dict can be used to create a container passed as
485         the +body+ argument to container_requests().create().
486         """
487
488         adjustDirObjs(self.job_order, trim_listing)
489         visit_class(self.job_order, ("File", "Directory"), trim_anonymous_location)
490         visit_class(self.job_order, ("File", "Directory"), remove_redundant_fields)
491
492         secret_mounts = {}
493         for param in sorted(self.job_order.keys()):
494             if self.secret_store.has_secret(self.job_order[param]):
495                 mnt = "/secrets/s%d" % len(secret_mounts)
496                 secret_mounts[mnt] = {
497                     "kind": "text",
498                     "content": self.secret_store.retrieve(self.job_order[param])
499                 }
500                 self.job_order[param] = {"$include": mnt}
501
502         container_req = {
503             "name": self.name,
504             "output_path": "/var/spool/cwl",
505             "cwd": "/var/spool/cwl",
506             "priority": self.priority,
507             "state": "Committed",
508             "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext),
509             "mounts": {
510                 "/var/lib/cwl/cwl.input.json": {
511                     "kind": "json",
512                     "content": self.job_order
513                 },
514                 "stdout": {
515                     "kind": "file",
516                     "path": "/var/spool/cwl/cwl.output.json"
517                 },
518                 "/var/spool/cwl": {
519                     "kind": "collection",
520                     "writable": True
521                 }
522             },
523             "secret_mounts": secret_mounts,
524             "runtime_constraints": {
525                 "vcpus": math.ceil(self.submit_runner_cores),
526                 "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
527                 "API": True
528             },
529             "use_existing": False, # Never reuse the runner container - see #15497.
530             "properties": {}
531         }
532
533         if self.embedded_tool.tool.get("id", "").startswith("keep:"):
534             sp = self.embedded_tool.tool["id"].split('/')
535             workflowcollection = sp[0][5:]
536             workflowname = "/".join(sp[1:])
537             workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
538             container_req["mounts"]["/var/lib/cwl/workflow"] = {
539                 "kind": "collection",
540                 "portable_data_hash": "%s" % workflowcollection
541             }
542         elif self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
543             workflowpath = "/var/lib/cwl/workflow.json#main"
544             record = self.arvrunner.api.workflows().get(uuid=self.embedded_tool.tool["id"][6:33]).execute(num_retries=self.arvrunner.num_retries)
545             packed = yaml.safe_load(record["definition"])
546             container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
547                 "kind": "json",
548                 "content": packed
549             }
550             container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
551         else:
552             packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext, git_info)
553             workflowpath = "/var/lib/cwl/workflow.json#main"
554             container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
555                 "kind": "json",
556                 "content": packed
557             }
558
559         container_req["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in git_info.items()})
560
561         properties_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ProcessProperties")
562         if properties_req:
563             builder = make_builder(self.job_order, self.embedded_tool.hints, self.embedded_tool.requirements, runtimeContext, self.embedded_tool.metadata)
564             for pr in properties_req["processProperties"]:
565                 container_req["properties"][pr["propertyName"]] = builder.do_eval(pr["propertyValue"])
566
567         # --local means execute the workflow instead of submitting a container request
568         # --api=containers means use the containers API
569         # --no-log-timestamps means don't add timestamps (the logging infrastructure does this)
570         # --disable-validate because we already validated so don't need to do it again
571         # --eval-timeout is the timeout for javascript invocation
572         # --parallel-task-count is the number of threads to use for job submission
573         # --enable/disable-reuse sets desired job reuse
574         # --collection-cache-size sets aside memory to store collections
575         command = ["arvados-cwl-runner",
576                    "--local",
577                    "--api=containers",
578                    "--no-log-timestamps",
579                    "--disable-validate",
580                    "--disable-color",
581                    "--eval-timeout=%s" % self.arvrunner.eval_timeout,
582                    "--thread-count=%s" % self.arvrunner.thread_count,
583                    "--enable-reuse" if self.enable_reuse else "--disable-reuse",
584                    "--collection-cache-size=%s" % self.collection_cache_size]
585
586         if self.output_name:
587             command.append("--output-name=" + self.output_name)
588             container_req["output_name"] = self.output_name
589
590         if self.output_tags:
591             command.append("--output-tags=" + self.output_tags)
592
593         if runtimeContext.debug:
594             command.append("--debug")
595
596         if runtimeContext.storage_classes != "default" and runtimeContext.storage_classes:
597             command.append("--storage-classes=" + runtimeContext.storage_classes)
598
599         if runtimeContext.intermediate_storage_classes != "default" and runtimeContext.intermediate_storage_classes:
600             command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes)
601
602         if runtimeContext.on_error:
603             command.append("--on-error=" + self.on_error)
604
605         if runtimeContext.intermediate_output_ttl:
606             command.append("--intermediate-output-ttl=%d" % runtimeContext.intermediate_output_ttl)
607
608         if runtimeContext.trash_intermediate:
609             command.append("--trash-intermediate")
610
611         if runtimeContext.project_uuid:
612             command.append("--project-uuid="+runtimeContext.project_uuid)
613
614         if self.enable_dev:
615             command.append("--enable-dev")
616
617         if runtimeContext.enable_preemptible is True:
618             command.append("--enable-preemptible")
619
620         if runtimeContext.enable_preemptible is False:
621             command.append("--disable-preemptible")
622
623         if runtimeContext.varying_url_params:
624             command.append("--varying-url-params="+runtimeContext.varying_url_params)
625
626         if runtimeContext.prefer_cached_downloads:
627             command.append("--prefer-cached-downloads")
628
629         command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
630
631         container_req["command"] = command
632
633         return container_req
634
635
636     def run(self, runtimeContext):
637         runtimeContext.keepprefix = "keep:"
638         job_spec = self.arvados_job_spec(runtimeContext, self.git_info)
639         if runtimeContext.project_uuid:
640             job_spec["owner_uuid"] = runtimeContext.project_uuid
641
642         extra_submit_params = {}
643         if runtimeContext.submit_runner_cluster:
644             extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
645
646         if runtimeContext.submit_request_uuid:
647             if "cluster_id" in extra_submit_params:
648                 # Doesn't make sense for "update" and actually fails
649                 del extra_submit_params["cluster_id"]
650             response = self.arvrunner.api.container_requests().update(
651                 uuid=runtimeContext.submit_request_uuid,
652                 body=job_spec,
653                 **extra_submit_params
654             ).execute(num_retries=self.arvrunner.num_retries)
655         else:
656             response = self.arvrunner.api.container_requests().create(
657                 body=job_spec,
658                 **extra_submit_params
659             ).execute(num_retries=self.arvrunner.num_retries)
660
661         self.uuid = response["uuid"]
662         self.arvrunner.process_submitted(self)
663
664         logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])
665
666         workbench1 = self.arvrunner.api.config()["Services"]["Workbench1"]["ExternalURL"]
667         workbench2 = self.arvrunner.api.config()["Services"]["Workbench2"]["ExternalURL"]
668         url = ""
669         if workbench2:
670             url = "{}processes/{}".format(workbench2, response["uuid"])
671         elif workbench1:
672             url = "{}container_requests/{}".format(workbench1, response["uuid"])
673         if url:
674             logger.info("Monitor workflow progress at %s", url)
675
676
677     def done(self, record):
678         try:
679             container = self.arvrunner.api.containers().get(
680                 uuid=record["container_uuid"]
681             ).execute(num_retries=self.arvrunner.num_retries)
682             container["log"] = record["log_uuid"]
683         except Exception:
684             logger.exception("%s while getting runner container", self.arvrunner.label(self))
685             self.arvrunner.output_callback({}, "permanentFail")
686         else:
687             super(RunnerContainer, self).done(container)