19847: Change disk cache size heuristic
[arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from builtins import str

import logging
import json
import os
import urllib.request, urllib.parse, urllib.error
import time
import datetime
import ciso8601
import uuid
import math

import arvados_cwl.util
import ruamel.yaml as yaml

from cwltool.errors import WorkflowException
from cwltool.process import UnsupportedRequirement, shortname
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.job import JobBase

import arvados.collection

from .arvdocker import arv_docker_get_image
from . import done
from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields, make_builder
from .fsaccess import CollectionFetcher
from .pathmapper import NoFollowPathMapper, trim_listing
from .perf import Perf
from ._version import __version__

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

def cleanup_name_for_collection(name):
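    """Replace characters that can cause problems in collection names.

    A "/" in a collection name may be rejected or rewritten by the API
    server (depending on cluster configuration), so it is replaced with a
    space here.
    """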
    return name.replace("/", " ")

class ArvadosContainer(JobBase):
    """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""

    def __init__(self, runner, job_runtime,
                 builder,   # type: Builder
                 joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
                 make_path_mapper,  # type: Callable[..., PathMapper]
                 requirements,      # type: List[Dict[Text, Text]]
                 hints,     # type: List[Dict[Text, Text]]
                 name       # type: Text
    ):
        super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
        self.arvrunner = runner
        self.job_runtime = job_runtime
        self.running = False
        self.uuid = None

    def update_pipeline_component(self, r):
        pass

    def _required_env(self):
        env = {}
        env["HOME"] = self.outdir
        env["TMPDIR"] = self.tmpdir
        return env

    def run(self, toplevelRuntimeContext):
        # ArvadosCommandTool subclasses from cwltool.CommandLineTool,
        # which calls makeJobRunner() to get a new ArvadosContainer
        # object.  The fields that define execution such as
        # command_line, environment, etc are set on the
        # ArvadosContainer object by CommandLineTool.job() before
        # run() is called.

        runtimeContext = self.job_runtime

        if runtimeContext.submit_request_uuid:
            container_request = self.arvrunner.api.container_requests().get(
                uuid=runtimeContext.submit_request_uuid
            ).execute(num_retries=self.arvrunner.num_retries)
        else:
            container_request = {}

        container_request["command"] = self.command_line
        container_request["name"] = self.name
        container_request["output_path"] = self.outdir
        container_request["cwd"] = self.outdir
        container_request["priority"] = runtimeContext.priority
        container_request["state"] = "Committed"
        container_request.setdefault("properties", {})

        container_request["properties"]["cwl_input"] = self.joborder

        runtime_constraints = {}

        if runtimeContext.project_uuid:
            container_request["owner_uuid"] = runtimeContext.project_uuid

        if self.arvrunner.secret_store.has_secret(self.command_line):
            raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")

        if self.arvrunner.secret_store.has_secret(self.environment):
            raise WorkflowException("Secret material leaked in environment, only file literals may contain secrets")

        resources = self.builder.resources
        if resources is not None:
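            # CWL resource sizes (ram, outdirSize, tmpdirSize) are given in
            # MiB, while Arvados runtime constraints and mount capacities are
            # in bytes, hence the 2**20 factor below.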
            runtime_constraints["vcpus"] = math.ceil(resources.get("cores", 1))
            runtime_constraints["ram"] = math.ceil(resources.get("ram") * 2**20)

        mounts = {
            self.outdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("outdirSize", 0) * 2**20)
            },
            self.tmpdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("tmpdirSize", 0) * 2**20)
            }
        }
        secret_mounts = {}
        scheduling_parameters = {}

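        # Mount each referenced "keep:<portable data hash>/<path>" input as a
        # read-only collection at its target directory.  Entries whose target
        # already falls under a previously mounted directory are skipped.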
        rf = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
        rf.sort(key=lambda k: k.resolved)
        prevdir = None
        for resolved, target, tp, stg in rf:
            if not stg:
                continue
            if prevdir and target.startswith(prevdir):
                continue
            if tp == "Directory":
                targetdir = target
            else:
                targetdir = os.path.dirname(target)
            sp = resolved.split("/", 1)
            pdh = sp[0][5:]   # remove "keep:"
            mounts[targetdir] = {
                "kind": "collection",
                "portable_data_hash": pdh
            }
            if pdh in self.pathmapper.pdh_to_uuid:
                mounts[targetdir]["uuid"] = self.pathmapper.pdh_to_uuid[pdh]
            if len(sp) == 2:
                if tp == "Directory":
                    path = sp[1]
                else:
                    path = os.path.dirname(sp[1])
                if path and path != "/":
                    mounts[targetdir]["path"] = path
            prevdir = targetdir + "/"

        intermediate_collection_info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)

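        # Stage InitialWorkDirRequirement entries: files and directories are
        # copied into a new intermediate collection ("vwd"), file literals
        # are written into it directly, and literals containing secrets
        # become secret_mounts so they are never stored in the intermediate
        # collection.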
        with Perf(metrics, "generatefiles %s" % self.name):
            if self.generatefiles["listing"]:
                vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                    keep_client=self.arvrunner.keep_client,
                                                    num_retries=self.arvrunner.num_retries)
                generatemapper = NoFollowPathMapper(self.generatefiles["listing"], "", "",
                                                    separateDirs=False)

                sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target)

                logger.debug("generatemapper is %s", sorteditems)

                with Perf(metrics, "createfiles %s" % self.name):
                    for f, p in sorteditems:
                        if not p.target:
                            continue

                        if p.target.startswith("/"):
                            dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
                        else:
                            dst = p.target

                        if p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
                            if p.resolved.startswith("_:"):
                                vwd.mkdirs(dst)
                            else:
                                source, path = self.arvrunner.fs_access.get_collection(p.resolved)
                                vwd.copy(path or ".", dst, source_collection=source)
                        elif p.type == "CreateFile":
                            if self.arvrunner.secret_store.has_secret(p.resolved):
                                mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
                                secret_mounts[mountpoint] = {
                                    "kind": "text",
                                    "content": self.arvrunner.secret_store.retrieve(p.resolved)
                                }
                            else:
                                with vwd.open(dst, "w") as n:
                                    n.write(p.resolved)

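                # Empty directories are not preserved in a collection
                # manifest, so write a zero-length ".keep" placeholder file
                # into each one to keep the directory structure intact.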
                def keepemptydirs(p):
                    if isinstance(p, arvados.collection.RichCollectionBase):
                        if len(p) == 0:
                            p.open(".keep", "w").close()
                        else:
                            for c in p:
                                keepemptydirs(p[c])

                keepemptydirs(vwd)

                if not runtimeContext.current_container:
                    runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
                vwd.save_new(name=intermediate_collection_info["name"],
                             owner_uuid=runtimeContext.project_uuid,
                             ensure_unique_name=True,
                             trash_at=intermediate_collection_info["trash_at"],
                             properties=intermediate_collection_info["properties"])

                prev = None
                for f, p in sorteditems:
                    if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
                        (prev is not None and p.target.startswith(prev))):
                        continue
                    if p.target.startswith("/"):
                        dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
                    else:
                        dst = p.target
                    mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
                    mounts[mountpoint] = {"kind": "collection",
                                          "portable_data_hash": vwd.portable_data_hash(),
                                          "path": dst}
                    if p.type.startswith("Writable"):
                        mounts[mountpoint]["writable"] = True
                    prev = p.target + "/"

        container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
        if self.environment:
            container_request["environment"].update(self.environment)

        if self.stdin:
            sp = self.stdin[6:].split("/", 1)
            mounts["stdin"] = {"kind": "collection",
                                "portable_data_hash": sp[0],
                                "path": sp[1]}

        if self.stderr:
            mounts["stderr"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stderr)}

        if self.stdout:
            mounts["stdout"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stdout)}

        (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")

        container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
                                                                    docker_req,
                                                                    runtimeContext.pull_image,
                                                                    runtimeContext.project_uuid,
                                                                    runtimeContext.force_docker_pull,
                                                                    runtimeContext.tmp_outdir_prefix,
                                                                    runtimeContext.match_local_docker,
                                                                    runtimeContext.copy_deps)

        network_req, _ = self.get_requirement("NetworkAccess")
        if network_req:
            runtime_constraints["API"] = network_req["networkAccess"]

        api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
        if api_req:
            runtime_constraints["API"] = True

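        # Choose between a disk-backed and a RAM-backed Keep cache.  A
        # non-zero DefaultKeepCacheDisk cluster setting selects the disk
        # cache by default; a tool can force the RAM cache with the
        # arv:KeepCacheTypeRequirement extension.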
        use_disk_cache = (self.arvrunner.api.config()["Containers"].get("DefaultKeepCacheDisk", 0) > 0)

        keep_cache_type_req, _ = self.get_requirement("http://arvados.org/cwl#KeepCacheTypeRequirement")
        if keep_cache_type_req:
            if "keepCacheType" in keep_cache_type_req:
                if keep_cache_type_req["keepCacheType"] == "ram_cache":
                    use_disk_cache = False

        runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
        if runtime_req:
            if "keep_cache" in runtime_req:
                if use_disk_cache:
                    # If DefaultKeepCacheDisk is non-zero it means we should use disk cache.
                    runtime_constraints["keep_cache_disk"] = math.ceil(runtime_req["keep_cache"] * 2**20)
                else:
                    runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
            if "outputDirType" in runtime_req:
                if runtime_req["outputDirType"] == "local_output_dir":
                    # Currently the default behavior.
                    pass
                elif runtime_req["outputDirType"] == "keep_output_dir":
                    mounts[self.outdir] = {
                        "kind": "collection",
                        "writable": True
                    }

        if use_disk_cache and "keep_cache_disk" not in runtime_constraints:
            # The cache size wasn't set explicitly, so default to the size of
            # the RAM request, clamped to the range 2 GiB .. 32 GiB.
            runtime_constraints["keep_cache_disk"] = min(max(2*1024*1024*1024, runtime_constraints["ram"]), 32*1024*1024*1024)
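            # For example, a 16 GiB RAM request gets a 16 GiB disk cache,
            # a 512 MiB request gets the 2 GiB floor, and a 64 GiB request
            # is capped at 32 GiB.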

        partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
        if partition_req:
            scheduling_parameters["partitions"] = aslist(partition_req["partition"])

        intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
        if intermediate_output_req:
            self.output_ttl = intermediate_output_req["outputTTL"]
        else:
            self.output_ttl = self.arvrunner.intermediate_output_ttl

        if self.output_ttl < 0:
            raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % self.output_ttl)


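        # Setting output_storage_classes on a container request requires API
        # server revision 20210628 or later, so skip it on older clusters.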
        if self.arvrunner.api._rootDesc["revision"] >= "20210628":
            storage_class_req, _ = self.get_requirement("http://arvados.org/cwl#OutputStorageClass")
            if storage_class_req and storage_class_req.get("intermediateStorageClass"):
                container_request["output_storage_classes"] = aslist(storage_class_req["intermediateStorageClass"])
            else:
                container_request["output_storage_classes"] = runtimeContext.intermediate_storage_classes.strip().split(",")

        cuda_req, _ = self.get_requirement("http://commonwl.org/cwltool#CUDARequirement")
        if cuda_req:
            runtime_constraints["cuda"] = {
                "device_count": resources.get("cudaDeviceCount", 1),
                "driver_version": cuda_req["cudaVersionMin"],
                "hardware_capability": aslist(cuda_req["cudaComputeCapability"])[0]
            }

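        # Preemptible instance selection: --disable-preemptible on the
        # command line always wins; otherwise an arv:UsePreemptible hint on
        # the step takes precedence over --enable-preemptible, and if neither
        # is given the cluster default applies.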
        if runtimeContext.enable_preemptible is False:
            scheduling_parameters["preemptible"] = False
        else:
            preemptible_req, _ = self.get_requirement("http://arvados.org/cwl#UsePreemptible")
            if preemptible_req:
                scheduling_parameters["preemptible"] = preemptible_req["usePreemptible"]
            elif runtimeContext.enable_preemptible is True:
                scheduling_parameters["preemptible"] = True
            elif runtimeContext.enable_preemptible is None:
                pass

        if self.timelimit is not None and self.timelimit > 0:
            scheduling_parameters["max_run_time"] = self.timelimit

        extra_submit_params = {}
        if runtimeContext.submit_runner_cluster:
            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster

        container_request["output_name"] = cleanup_name_for_collection("Output from step %s" % (self.name))
        container_request["output_ttl"] = self.output_ttl
        container_request["mounts"] = mounts
        container_request["secret_mounts"] = secret_mounts
        container_request["runtime_constraints"] = runtime_constraints
        container_request["scheduling_parameters"] = scheduling_parameters

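        # Container reuse can be switched off by the standard WorkReuse
        # requirement or the Arvados-specific arv:ReuseRequirement; when both
        # are present the Arvados form wins because it is checked last.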
        enable_reuse = runtimeContext.enable_reuse
        if enable_reuse:
            reuse_req, _ = self.get_requirement("WorkReuse")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
            reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        container_request["use_existing"] = enable_reuse

        properties_req, _ = self.get_requirement("http://arvados.org/cwl#ProcessProperties")
        if properties_req:
            for pr in properties_req["processProperties"]:
                container_request["properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])

        output_properties_req, _ = self.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
        if output_properties_req:
            if self.arvrunner.api._rootDesc["revision"] >= "20220510":
                container_request["output_properties"] = {}
                for pr in output_properties_req["outputProperties"]:
                    container_request["output_properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
            else:
                logger.warning("%s API revision is %s, revision %s is required to support setting properties on output collections.",
                               self.arvrunner.label(self), self.arvrunner.api._rootDesc["revision"], "20220510")

        if runtimeContext.runnerjob.startswith("arvwf:"):
            wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
            wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
            if container_request["name"] == "main":
                container_request["name"] = wfrecord["name"]
            container_request["properties"]["template_uuid"] = wfuuid

        self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)

        try:
            if runtimeContext.submit_request_uuid:
                response = self.arvrunner.api.container_requests().update(
                    uuid=runtimeContext.submit_request_uuid,
                    body=container_request,
                    **extra_submit_params
                ).execute(num_retries=self.arvrunner.num_retries)
            else:
                response = self.arvrunner.api.container_requests().create(
                    body=container_request,
                    **extra_submit_params
                ).execute(num_retries=self.arvrunner.num_retries)

            self.uuid = response["uuid"]
            self.arvrunner.process_submitted(self)

            if response["state"] == "Final":
                logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
            else:
                logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
        except Exception as e:
            logger.exception("%s error submitting container\n%s", self.arvrunner.label(self), e)
            logger.debug("Container request was %s", container_request)
            self.output_callback({}, "permanentFail")

    def done(self, record):
        outputs = {}
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
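            # Map the container's final state and exit code to a cwltool
            # process status.  Explicit successCodes / temporaryFailCodes /
            # permanentFailCodes from the tool take precedence over the
            # default mapping (0 is success, anything else permanentFail).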
            if container["state"] == "Complete":
                rcode = container["exit_code"]
                if self.successCodes and rcode in self.successCodes:
                    processStatus = "success"
                elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
                    processStatus = "temporaryFail"
                elif self.permanentFailCodes and rcode in self.permanentFailCodes:
                    processStatus = "permanentFail"
                elif rcode == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"

                if rcode == 137:
                    logger.warning("%s Container may have been killed for using too much RAM.  Try resubmitting with a higher 'ramMin'.",
                                   self.arvrunner.label(self))
            else:
                processStatus = "permanentFail"

            if processStatus == "permanentFail" and record["log_uuid"]:
                logc = arvados.collection.CollectionReader(record["log_uuid"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                label = self.arvrunner.label(self)
                done.logtail(
                    logc, logger.error,
                    "%s (%s) error log:" % (label, record["uuid"]), maxlen=40)

            if record["output_uuid"]:
                if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
                    # Compute the trash time to avoid requesting the collection record.
                    trash_at = ciso8601.parse_datetime_as_naive(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
                    aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
                    orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
                    oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
                    logger.info("%s Intermediate output %s (%s) will be trashed%s%s%s." % (
                        self.arvrunner.label(self), record["output_uuid"], container["output"], aftertime, orpart, oncomplete))
                self.arvrunner.add_intermediate_output(record["output_uuid"])

            if container["output"]:
                outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")

            properties = record["properties"].copy()
            properties["cwl_output"] = outputs
            self.arvrunner.api.container_requests().update(
                uuid=self.uuid,
                body={"container_request": {"properties": properties}}
            ).execute(num_retries=self.arvrunner.num_retries)
        except WorkflowException as e:
            # Only include a stack trace if in debug mode.
            # A stack trace may obfuscate more useful output about the workflow.
            logger.error("%s unable to collect output from %s:\n%s",
                         self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
            processStatus = "permanentFail"
        except Exception:
            logger.exception("%s while getting output object:", self.arvrunner.label(self))
            processStatus = "permanentFail"
        finally:
            self.output_callback(outputs, processStatus)


class RunnerContainer(Runner):
    """Submit and manage a container that runs arvados-cwl-runner."""

    def arvados_job_spec(self, runtimeContext, git_info):
        """Create an Arvados container request for this workflow.

        The returned dict can be passed as the +body+ argument to
        container_requests().create().
        """

        adjustDirObjs(self.job_order, trim_listing)
        visit_class(self.job_order, ("File", "Directory"), trim_anonymous_location)
        visit_class(self.job_order, ("File", "Directory"), remove_redundant_fields)

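        # Move secret values out of the job order and into secret mounts,
        # replacing each one with a $include directive so the secret never
        # appears in the stored cwl.input.json.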
        secret_mounts = {}
        for param in sorted(self.job_order.keys()):
            if self.secret_store.has_secret(self.job_order[param]):
                mnt = "/secrets/s%d" % len(secret_mounts)
                secret_mounts[mnt] = {
                    "kind": "text",
                    "content": self.secret_store.retrieve(self.job_order[param])
                }
                self.job_order[param] = {"$include": mnt}

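        # Container request for the runner itself.  The RAM constraint
        # reserves submit_runner_ram plus collection_cache_size (both given
        # in MiB) for the arvados-cwl-runner process.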
        container_req = {
            "name": self.name,
            "output_path": "/var/spool/cwl",
            "cwd": "/var/spool/cwl",
            "priority": self.priority,
            "state": "Committed",
            "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext),
            "mounts": {
                "/var/lib/cwl/cwl.input.json": {
                    "kind": "json",
                    "content": self.job_order
                },
                "stdout": {
                    "kind": "file",
                    "path": "/var/spool/cwl/cwl.output.json"
                },
                "/var/spool/cwl": {
                    "kind": "collection",
                    "writable": True
                }
            },
            "secret_mounts": secret_mounts,
            "runtime_constraints": {
                "vcpus": math.ceil(self.submit_runner_cores),
                "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
                "API": True
            },
            "use_existing": False, # Never reuse the runner container - see #15497.
            "properties": {}
        }

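        # The workflow definition can come from three places: a collection
        # in Keep (a "keep:" reference), a stored workflow record (an
        # "arvwf:" reference), or, failing those, a packed copy of the
        # workflow embedded directly in the container request.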
        if self.embedded_tool.tool.get("id", "").startswith("keep:"):
            sp = self.embedded_tool.tool["id"].split('/')
            workflowcollection = sp[0][5:]
            workflowname = "/".join(sp[1:])
            workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
            container_req["mounts"]["/var/lib/cwl/workflow"] = {
                "kind": "collection",
                "portable_data_hash": "%s" % workflowcollection
            }
        elif self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
            workflowpath = "/var/lib/cwl/workflow.json#main"
            record = self.arvrunner.api.workflows().get(uuid=self.embedded_tool.tool["id"][6:33]).execute(num_retries=self.arvrunner.num_retries)
            packed = yaml.safe_load(record["definition"])
            container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
                "kind": "json",
                "content": packed
            }
            container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
        else:
            packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext, git_info)
            workflowpath = "/var/lib/cwl/workflow.json#main"
            container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
                "kind": "json",
                "content": packed
            }

        container_req["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in git_info.items()})

        properties_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ProcessProperties")
        if properties_req:
            builder = make_builder(self.job_order, self.embedded_tool.hints, self.embedded_tool.requirements, runtimeContext, self.embedded_tool.metadata)
            for pr in properties_req["processProperties"]:
                container_req["properties"][pr["propertyName"]] = builder.do_eval(pr["propertyValue"])

        # --local means execute the workflow instead of submitting a container request
        # --api=containers means use the containers API
        # --no-log-timestamps means don't add timestamps (the logging infrastructure does this)
        # --disable-validate because we already validated so don't need to do it again
        # --eval-timeout is the timeout for javascript invocation
        # --thread-count is the number of threads to use for job submission
        # --enable/disable-reuse sets desired job reuse
        # --collection-cache-size sets aside memory to store collections
        command = ["arvados-cwl-runner",
                   "--local",
                   "--api=containers",
                   "--no-log-timestamps",
                   "--disable-validate",
                   "--disable-color",
                   "--eval-timeout=%s" % self.arvrunner.eval_timeout,
                   "--thread-count=%s" % self.arvrunner.thread_count,
                   "--enable-reuse" if self.enable_reuse else "--disable-reuse",
                   "--collection-cache-size=%s" % self.collection_cache_size]

        if self.output_name:
            command.append("--output-name=" + self.output_name)
            container_req["output_name"] = self.output_name

        if self.output_tags:
            command.append("--output-tags=" + self.output_tags)

        if runtimeContext.debug:
            command.append("--debug")

        if runtimeContext.storage_classes != "default" and runtimeContext.storage_classes:
            command.append("--storage-classes=" + runtimeContext.storage_classes)

        if runtimeContext.intermediate_storage_classes != "default" and runtimeContext.intermediate_storage_classes:
            command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes)

        if runtimeContext.on_error:
            command.append("--on-error=" + self.on_error)

        if runtimeContext.intermediate_output_ttl:
            command.append("--intermediate-output-ttl=%d" % runtimeContext.intermediate_output_ttl)

        if runtimeContext.trash_intermediate:
            command.append("--trash-intermediate")

        if runtimeContext.project_uuid:
            command.append("--project-uuid="+runtimeContext.project_uuid)

        if self.enable_dev:
            command.append("--enable-dev")

        if runtimeContext.enable_preemptible is True:
            command.append("--enable-preemptible")

        if runtimeContext.enable_preemptible is False:
            command.append("--disable-preemptible")

        if runtimeContext.varying_url_params:
            command.append("--varying-url-params="+runtimeContext.varying_url_params)

        if runtimeContext.prefer_cached_downloads:
            command.append("--prefer-cached-downloads")

        command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])

        container_req["command"] = command

        return container_req


    def run(self, runtimeContext):
        runtimeContext.keepprefix = "keep:"
        job_spec = self.arvados_job_spec(runtimeContext, self.git_info)
        if runtimeContext.project_uuid:
            job_spec["owner_uuid"] = runtimeContext.project_uuid

        extra_submit_params = {}
        if runtimeContext.submit_runner_cluster:
            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster

        if runtimeContext.submit_request_uuid:
            if "cluster_id" in extra_submit_params:
                # Doesn't make sense for "update" and actually fails
                del extra_submit_params["cluster_id"]
            response = self.arvrunner.api.container_requests().update(
                uuid=runtimeContext.submit_request_uuid,
                body=job_spec,
                **extra_submit_params
            ).execute(num_retries=self.arvrunner.num_retries)
        else:
            response = self.arvrunner.api.container_requests().create(
                body=job_spec,
                **extra_submit_params
            ).execute(num_retries=self.arvrunner.num_retries)

        self.uuid = response["uuid"]
        self.arvrunner.process_submitted(self)

        logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])

        workbench1 = self.arvrunner.api.config()["Services"]["Workbench1"]["ExternalURL"]
        workbench2 = self.arvrunner.api.config()["Services"]["Workbench2"]["ExternalURL"]
        url = ""
        if workbench2:
            url = "{}processes/{}".format(workbench2, response["uuid"])
        elif workbench1:
            url = "{}container_requests/{}".format(workbench1, response["uuid"])
        if url:
            logger.info("Monitor workflow progress at %s", url)


    def done(self, record):
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            container["log"] = record["log_uuid"]
        except Exception:
            logger.exception("%s while getting runner container", self.arvrunner.label(self))
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            super(RunnerContainer, self).done(container)