# 19847: Fix KeepCacheTypeRequirement
# [arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8
9 import logging
10 import json
11 import os
12 import urllib.request, urllib.parse, urllib.error
13 import time
14 import datetime
15 import ciso8601
16 import uuid
17 import math
18
19 import arvados_cwl.util
20 import ruamel.yaml as yaml
21
22 from cwltool.errors import WorkflowException
23 from cwltool.process import UnsupportedRequirement, shortname
24 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
25 from cwltool.job import JobBase
26
27 import arvados.collection
28
29 from .arvdocker import arv_docker_get_image
30 from . import done
31 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields, make_builder
32 from .fsaccess import CollectionFetcher
33 from .pathmapper import NoFollowPathMapper, trim_listing
34 from .perf import Perf
35 from ._version import __version__
36
# Module-level loggers: `logger` carries general progress and error
# messages; `metrics` carries timing data emitted via the Perf() helper.
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
39
def cleanup_name_for_collection(name):
    """Return *name* with every "/" replaced by a space.

    Collection names may not contain slashes, so step names that include
    path-like components are flattened before being used as a name.
    """
    return " ".join(name.split("/"))
42
class ArvadosContainer(JobBase):
    """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""

    def __init__(self, runner, job_runtime,
                 builder,   # type: Builder
                 joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
                 make_path_mapper,  # type: Callable[..., PathMapper]
                 requirements,      # type: List[Dict[Text, Text]]
                 hints,     # type: List[Dict[Text, Text]]
                 name       # type: Text
    ):
        super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
        self.arvrunner = runner
        self.job_runtime = job_runtime
        self.running = False
        # UUID of the submitted container request; set in run() on success.
        self.uuid = None

    def update_pipeline_component(self, r):
        # The containers API has no pipeline-component concept; this stub
        # keeps the interface expected by the surrounding runner code.
        pass

    def _required_env(self):
        """Return environment variables that must always be set in the container."""
        env = {}
        env["HOME"] = self.outdir
        env["TMPDIR"] = self.tmpdir
        return env

    def run(self, toplevelRuntimeContext):
        """Build and submit the container request for this tool invocation.

        Translates the fields cwltool set on this object (command_line,
        environment, generatefiles, stdin/stdout/stderr, requirements and
        hints) into an Arvados container request body, then creates or
        updates the request via the API.  On failure, reports
        "permanentFail" through the output callback instead of raising.
        """
        # ArvadosCommandTool subclasses from cwltool.CommandLineTool,
        # which calls makeJobRunner() to get a new ArvadosContainer
        # object.  The fields that define execution such as
        # command_line, environment, etc are set on the
        # ArvadosContainer object by CommandLineTool.job() before
        # run() is called.

        runtimeContext = self.job_runtime

        if runtimeContext.submit_request_uuid:
            # Updating an existing (pre-created) request: start from its
            # current state so unrelated fields are preserved.
            container_request = self.arvrunner.api.container_requests().get(
                uuid=runtimeContext.submit_request_uuid
            ).execute(num_retries=self.arvrunner.num_retries)
        else:
            container_request = {}

        container_request["command"] = self.command_line
        container_request["name"] = self.name
        container_request["output_path"] = self.outdir
        container_request["cwd"] = self.outdir
        container_request["priority"] = runtimeContext.priority
        container_request["state"] = "Committed"
        container_request.setdefault("properties", {})

        # Record the CWL input object on the request for provenance.
        container_request["properties"]["cwl_input"] = self.joborder

        runtime_constraints = {}

        if runtimeContext.project_uuid:
            container_request["owner_uuid"] = runtimeContext.project_uuid

        # Secrets may only appear as file literals (mounted via
        # secret_mounts below); refuse to leak them anywhere visible.
        if self.arvrunner.secret_store.has_secret(self.command_line):
            raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")

        if self.arvrunner.secret_store.has_secret(self.environment):
            raise WorkflowException("Secret material leaked in environment, only file literals may contain secrets")

        resources = self.builder.resources
        if resources is not None:
            runtime_constraints["vcpus"] = math.ceil(resources.get("cores", 1))
            # CWL resource sizes are in MiB; the API wants bytes.
            runtime_constraints["ram"] = math.ceil(resources.get("ram") * 2**20)

        mounts = {
            self.outdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("outdirSize", 0) * 2**20)
            },
            self.tmpdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("tmpdirSize", 0) * 2**20)
            }
        }
        secret_mounts = {}
        scheduling_parameters = {}

        # Mount each referenced input collection.  Sorting and the
        # `prevdir` check skip paths already covered by a parent mount.
        rf = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
        rf.sort(key=lambda k: k.resolved)
        prevdir = None
        for resolved, target, tp, stg in rf:
            if not stg:
                continue
            if prevdir and target.startswith(prevdir):
                continue
            if tp == "Directory":
                targetdir = target
            else:
                targetdir = os.path.dirname(target)
            sp = resolved.split("/", 1)
            pdh = sp[0][5:]   # remove "keep:"
            mounts[targetdir] = {
                "kind": "collection",
                "portable_data_hash": pdh
            }
            if pdh in self.pathmapper.pdh_to_uuid:
                mounts[targetdir]["uuid"] = self.pathmapper.pdh_to_uuid[pdh]
            if len(sp) == 2:
                if tp == "Directory":
                    path = sp[1]
                else:
                    path = os.path.dirname(sp[1])
                if path and path != "/":
                    mounts[targetdir]["path"] = path
            prevdir = targetdir + "/"

        intermediate_collection_info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)

        with Perf(metrics, "generatefiles %s" % self.name):
            if self.generatefiles["listing"]:
                # InitialWorkDirRequirement: stage generated files into a
                # new intermediate collection and mount it read/write as
                # requested.
                vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                    keep_client=self.arvrunner.keep_client,
                                                    num_retries=self.arvrunner.num_retries)
                generatemapper = NoFollowPathMapper(self.generatefiles["listing"], "", "",
                                                    separateDirs=False)

                sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target)

                logger.debug("generatemapper is %s", sorteditems)

                with Perf(metrics, "createfiles %s" % self.name):
                    for f, p in sorteditems:
                        if not p.target:
                            continue

                        if p.target.startswith("/"):
                            dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
                        else:
                            dst = p.target

                        if p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
                            if p.resolved.startswith("_:"):
                                # Anonymous (literal) directory: just create it.
                                vwd.mkdirs(dst)
                            else:
                                source, path = self.arvrunner.fs_access.get_collection(p.resolved)
                                vwd.copy(path or ".", dst, source_collection=source)
                        elif p.type == "CreateFile":
                            if self.arvrunner.secret_store.has_secret(p.resolved):
                                # Secret file literals go through secret_mounts
                                # so the content is never stored in a collection.
                                mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
                                secret_mounts[mountpoint] = {
                                    "kind": "text",
                                    "content": self.arvrunner.secret_store.retrieve(p.resolved)
                                }
                            else:
                                with vwd.open(dst, "w") as n:
                                    n.write(p.resolved)

                def keepemptydirs(p):
                    # Keep manifests cannot represent empty directories;
                    # drop a ".keep" placeholder file into each one.
                    if isinstance(p, arvados.collection.RichCollectionBase):
                        if len(p) == 0:
                            p.open(".keep", "w").close()
                        else:
                            for c in p:
                                keepemptydirs(p[c])

                keepemptydirs(vwd)

                if not runtimeContext.current_container:
                    runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
                vwd.save_new(name=intermediate_collection_info["name"],
                             owner_uuid=runtimeContext.project_uuid,
                             ensure_unique_name=True,
                             trash_at=intermediate_collection_info["trash_at"],
                             properties=intermediate_collection_info["properties"])

                prev = None
                for f, p in sorteditems:
                    # Skip unmapped entries, secrets (already mounted
                    # above), and paths covered by a parent mount.
                    if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
                        (prev is not None and p.target.startswith(prev))):
                        continue
                    if p.target.startswith("/"):
                        dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
                    else:
                        dst = p.target
                    mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
                    mounts[mountpoint] = {"kind": "collection",
                                          "portable_data_hash": vwd.portable_data_hash(),
                                          "path": dst}
                    if p.type.startswith("Writable"):
                        mounts[mountpoint]["writable"] = True
                    prev = p.target + "/"

        container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
        if self.environment:
            container_request["environment"].update(self.environment)

        if self.stdin:
            # self.stdin is "keep:<pdh>/<path>"; strip the scheme prefix.
            sp = self.stdin[6:].split("/", 1)
            mounts["stdin"] = {"kind": "collection",
                                "portable_data_hash": sp[0],
                                "path": sp[1]}

        if self.stderr:
            mounts["stderr"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stderr)}

        if self.stdout:
            mounts["stdout"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stdout)}

        (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")

        container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
                                                                    docker_req,
                                                                    runtimeContext.pull_image,
                                                                    runtimeContext.project_uuid,
                                                                    runtimeContext.force_docker_pull,
                                                                    runtimeContext.tmp_outdir_prefix,
                                                                    runtimeContext.match_local_docker,
                                                                    runtimeContext.copy_deps)

        network_req, _ = self.get_requirement("NetworkAccess")
        if network_req:
            runtime_constraints["API"] = network_req["networkAccess"]

        api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
        if api_req:
            runtime_constraints["API"] = True

        # Cluster-wide default: use the disk keep cache when the cluster
        # configures a nonzero DefaultKeepCacheDisk.
        use_disk_cache = (self.arvrunner.api.config()["Containers"].get("DefaultKeepCacheDisk", 0) > 0)

        keep_cache_type_req, _ = self.get_requirement("http://arvados.org/cwl#KeepCacheTypeRequirement")
        if keep_cache_type_req:
            if "keepCacheType" in keep_cache_type_req:
                if keep_cache_type_req["keepCacheType"] == "ram_cache":
                    use_disk_cache = False
                elif keep_cache_type_req["keepCacheType"] == "disk_cache":
                    # FIX: an explicit disk_cache request previously had no
                    # effect when the cluster default was the RAM cache.
                    use_disk_cache = True

        runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
        if runtime_req:
            if "keep_cache" in runtime_req:
                if use_disk_cache:
                    # If DefaultKeepCacheDisk is non-zero it means we should use disk cache.
                    runtime_constraints["keep_cache_disk"] = math.ceil(runtime_req["keep_cache"] * 2**20)
                else:
                    runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
            if "outputDirType" in runtime_req:
                if runtime_req["outputDirType"] == "local_output_dir":
                    # Currently the default behavior.
                    pass
                elif runtime_req["outputDirType"] == "keep_output_dir":
                    mounts[self.outdir]= {
                        "kind": "collection",
                        "writable": True
                    }

        if use_disk_cache and "keep_cache_disk" not in runtime_constraints:
            # Cache size wasn't explicitly set so calculate a default
            # based on 2x RAM request or 1 GB per core, whichever is
            # smaller.  This is to avoid requesting 100s of GB of disk
            # cache when requesting a node with a huge amount of RAM.
            runtime_constraints["keep_cache_disk"] = min(runtime_constraints["ram"] * 2, runtime_constraints["vcpus"] * (1024*1024*1024))

        partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
        if partition_req:
            scheduling_parameters["partitions"] = aslist(partition_req["partition"])

        intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
        if intermediate_output_req:
            self.output_ttl = intermediate_output_req["outputTTL"]
        else:
            self.output_ttl = self.arvrunner.intermediate_output_ttl

        if self.output_ttl < 0:
            # FIX: previously formatted container_request["output_ttl"],
            # which is not set until later, so this raised KeyError
            # instead of the intended WorkflowException.
            raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % self.output_ttl)


        if self.arvrunner.api._rootDesc["revision"] >= "20210628":
            storage_class_req, _ = self.get_requirement("http://arvados.org/cwl#OutputStorageClass")
            if storage_class_req and storage_class_req.get("intermediateStorageClass"):
                container_request["output_storage_classes"] = aslist(storage_class_req["intermediateStorageClass"])
            else:
                container_request["output_storage_classes"] = runtimeContext.intermediate_storage_classes.strip().split(",")

        cuda_req, _ = self.get_requirement("http://commonwl.org/cwltool#CUDARequirement")
        if cuda_req:
            runtime_constraints["cuda"] = {
                "device_count": resources.get("cudaDeviceCount", 1),
                "driver_version": cuda_req["cudaVersionMin"],
                "hardware_capability": aslist(cuda_req["cudaComputeCapability"])[0]
            }

        if runtimeContext.enable_preemptible is False:
            scheduling_parameters["preemptible"] = False
        else:
            # Workflow-level UsePreemptible hint wins over the command
            # line flag; None means leave it to the cluster default.
            preemptible_req, _ = self.get_requirement("http://arvados.org/cwl#UsePreemptible")
            if preemptible_req:
                scheduling_parameters["preemptible"] = preemptible_req["usePreemptible"]
            elif runtimeContext.enable_preemptible is True:
                scheduling_parameters["preemptible"] = True
            elif runtimeContext.enable_preemptible is None:
                pass

        if self.timelimit is not None and self.timelimit > 0:
            scheduling_parameters["max_run_time"] = self.timelimit

        extra_submit_params = {}
        if runtimeContext.submit_runner_cluster:
            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster

        container_request["output_name"] = cleanup_name_for_collection("Output from step %s" % (self.name))
        container_request["output_ttl"] = self.output_ttl
        container_request["mounts"] = mounts
        container_request["secret_mounts"] = secret_mounts
        container_request["runtime_constraints"] = runtime_constraints
        container_request["scheduling_parameters"] = scheduling_parameters

        enable_reuse = runtimeContext.enable_reuse
        if enable_reuse:
            # Either the standard WorkReuse requirement or the legacy
            # Arvados-specific ReuseRequirement may disable reuse.
            reuse_req, _ = self.get_requirement("WorkReuse")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
            reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        container_request["use_existing"] = enable_reuse

        properties_req, _ = self.get_requirement("http://arvados.org/cwl#ProcessProperties")
        if properties_req:
            for pr in properties_req["processProperties"]:
                container_request["properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])

        output_properties_req, _ = self.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
        if output_properties_req:
            if self.arvrunner.api._rootDesc["revision"] >= "20220510":
                container_request["output_properties"] = {}
                for pr in output_properties_req["outputProperties"]:
                    container_request["output_properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
            else:
                logger.warning("%s API revision is %s, revision %s is required to support setting properties on output collections.",
                               self.arvrunner.label(self), self.arvrunner.api._rootDesc["revision"], "20220510")

        if runtimeContext.runnerjob.startswith("arvwf:"):
            # Running from a registered workflow: link the request to the
            # workflow record and use its name for the top-level step.
            wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
            wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
            if container_request["name"] == "main":
                container_request["name"] = wfrecord["name"]
            container_request["properties"]["template_uuid"] = wfuuid

        self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)

        try:
            if runtimeContext.submit_request_uuid:
                response = self.arvrunner.api.container_requests().update(
                    uuid=runtimeContext.submit_request_uuid,
                    body=container_request,
                    **extra_submit_params
                ).execute(num_retries=self.arvrunner.num_retries)
            else:
                response = self.arvrunner.api.container_requests().create(
                    body=container_request,
                    **extra_submit_params
                ).execute(num_retries=self.arvrunner.num_retries)

            self.uuid = response["uuid"]
            self.arvrunner.process_submitted(self)

            if response["state"] == "Final":
                logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
            else:
                logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
        except Exception as e:
            logger.exception("%s error submitting container\n%s", self.arvrunner.label(self), e)
            logger.debug("Container request was %s", container_request)
            self.output_callback({}, "permanentFail")

    def done(self, record):
        """Process a finished container request and report outputs.

        Maps the container exit code through success/temporaryFail/
        permanentFail code lists, logs the tail of the container log on
        permanent failure, registers intermediate output for trashing,
        collects the CWL output object, and always invokes the output
        callback exactly once.
        """
        outputs = {}
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            if container["state"] == "Complete":
                rcode = container["exit_code"]
                if self.successCodes and rcode in self.successCodes:
                    processStatus = "success"
                elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
                    processStatus = "temporaryFail"
                elif self.permanentFailCodes and rcode in self.permanentFailCodes:
                    processStatus = "permanentFail"
                elif rcode == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"

                if rcode == 137:
                    # 137 = 128 + SIGKILL, the usual signature of the OOM killer.
                    logger.warning("%s Container may have been killed for using too much RAM.  Try resubmitting with a higher 'ramMin'.",
                                 self.arvrunner.label(self))
            else:
                processStatus = "permanentFail"

            if processStatus == "permanentFail" and record["log_uuid"]:
                logc = arvados.collection.CollectionReader(record["log_uuid"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                label = self.arvrunner.label(self)
                done.logtail(
                    logc, logger.error,
                    "%s (%s) error log:" % (label, record["uuid"]), maxlen=40)

            if record["output_uuid"]:
                if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
                    # Compute the trash time to avoid requesting the collection record.
                    trash_at = ciso8601.parse_datetime_as_naive(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
                    aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
                    orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
                    oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
                    logger.info("%s Intermediate output %s (%s) will be trashed%s%s%s." % (
                        self.arvrunner.label(self), record["output_uuid"], container["output"], aftertime, orpart, oncomplete))
                self.arvrunner.add_intermediate_output(record["output_uuid"])

            if container["output"]:
                outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")

            # Record the CWL output object on the request for provenance.
            properties = record["properties"].copy()
            properties["cwl_output"] = outputs
            self.arvrunner.api.container_requests().update(
                uuid=self.uuid,
                body={"container_request": {"properties": properties}}
            ).execute(num_retries=self.arvrunner.num_retries)
        except WorkflowException as e:
            # Only include a stack trace if in debug mode.
            # A stack trace may obfuscate more useful output about the workflow.
            logger.error("%s unable to collect output from %s:\n%s",
                         self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
            processStatus = "permanentFail"
        except Exception:
            logger.exception("%s while getting output object:", self.arvrunner.label(self))
            processStatus = "permanentFail"
        finally:
            self.output_callback(outputs, processStatus)
479
480
class RunnerContainer(Runner):
    """Submit and manage a container that runs arvados-cwl-runner."""

    def arvados_job_spec(self, runtimeContext, git_info):
        """Create an Arvados container request for this workflow.

        The returned dict can be used to create a container passed as
        the +body+ argument to container_requests().create().
        """

        adjustDirObjs(self.job_order, trim_listing)
        visit_class(self.job_order, ("File", "Directory"), trim_anonymous_location)
        visit_class(self.job_order, ("File", "Directory"), remove_redundant_fields)

        # Replace secret input values with $(include) references to files
        # under /secrets, delivered via the secret_mounts API so the values
        # are never stored in the visible container request.
        secret_mounts = {}
        for param in sorted(self.job_order.keys()):
            if self.secret_store.has_secret(self.job_order[param]):
                mnt = "/secrets/s%d" % len(secret_mounts)
                secret_mounts[mnt] = {
                    "kind": "text",
                    "content": self.secret_store.retrieve(self.job_order[param])
                }
                self.job_order[param] = {"$include": mnt}

        container_req = {
            "name": self.name,
            "output_path": "/var/spool/cwl",
            "cwd": "/var/spool/cwl",
            "priority": self.priority,
            "state": "Committed",
            "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext),
            "mounts": {
                # The job order is mounted as JSON for the inner
                # arvados-cwl-runner invocation to consume.
                "/var/lib/cwl/cwl.input.json": {
                    "kind": "json",
                    "content": self.job_order
                },
                "stdout": {
                    "kind": "file",
                    "path": "/var/spool/cwl/cwl.output.json"
                },
                "/var/spool/cwl": {
                    "kind": "collection",
                    "writable": True
                }
            },
            "secret_mounts": secret_mounts,
            "runtime_constraints": {
                "vcpus": math.ceil(self.submit_runner_cores),
                # RAM request covers the runner itself plus its collection cache.
                "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
                "API": True
            },
            "use_existing": False, # Never reuse the runner container - see #15497.
            "properties": {}
        }

        if self.embedded_tool.tool.get("id", "").startswith("keep:"):
            # Workflow definition lives in a Keep collection: mount it and
            # point the runner at the file inside the mount.
            sp = self.embedded_tool.tool["id"].split('/')
            workflowcollection = sp[0][5:]
            workflowname = "/".join(sp[1:])
            workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
            container_req["mounts"]["/var/lib/cwl/workflow"] = {
                "kind": "collection",
                "portable_data_hash": "%s" % workflowcollection
            }
        elif self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
            # Registered Arvados workflow: fetch its stored definition and
            # mount the parsed document as JSON.
            workflowpath = "/var/lib/cwl/workflow.json#main"
            record = self.arvrunner.api.workflows().get(uuid=self.embedded_tool.tool["id"][6:33]).execute(num_retries=self.arvrunner.num_retries)
            packed = yaml.safe_load(record["definition"])
            container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
                "kind": "json",
                "content": packed
            }
            container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
        else:
            # Local workflow: pack it into a single document and mount as JSON.
            packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext, git_info)
            workflowpath = "/var/lib/cwl/workflow.json#main"
            container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
                "kind": "json",
                "content": packed
            }

        # Record git provenance using the shorter "arv:" prefix.
        container_req["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in git_info.items()})

        properties_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ProcessProperties")
        if properties_req:
            builder = make_builder(self.job_order, self.embedded_tool.hints, self.embedded_tool.requirements, runtimeContext, self.embedded_tool.metadata)
            for pr in properties_req["processProperties"]:
                container_req["properties"][pr["propertyName"]] = builder.do_eval(pr["propertyValue"])

        # --local means execute the workflow instead of submitting a container request
        # --api=containers means use the containers API
        # --no-log-timestamps means don't add timestamps (the logging infrastructure does this)
        # --disable-validate because we already validated so don't need to do it again
        # --eval-timeout is the timeout for javascript invocation
        # --parallel-task-count is the number of threads to use for job submission
        # --enable/disable-reuse sets desired job reuse
        # --collection-cache-size sets aside memory to store collections
        command = ["arvados-cwl-runner",
                   "--local",
                   "--api=containers",
                   "--no-log-timestamps",
                   "--disable-validate",
                   "--disable-color",
                   "--eval-timeout=%s" % self.arvrunner.eval_timeout,
                   "--thread-count=%s" % self.arvrunner.thread_count,
                   "--enable-reuse" if self.enable_reuse else "--disable-reuse",
                   "--collection-cache-size=%s" % self.collection_cache_size]

        if self.output_name:
            command.append("--output-name=" + self.output_name)
            container_req["output_name"] = self.output_name

        if self.output_tags:
            command.append("--output-tags=" + self.output_tags)

        if runtimeContext.debug:
            command.append("--debug")

        if runtimeContext.storage_classes != "default" and runtimeContext.storage_classes:
            command.append("--storage-classes=" + runtimeContext.storage_classes)

        if runtimeContext.intermediate_storage_classes != "default" and runtimeContext.intermediate_storage_classes:
            command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes)

        if runtimeContext.on_error:
            # NOTE(review): the condition tests runtimeContext.on_error but
            # appends self.on_error — presumably these are kept in sync by
            # the caller; confirm before relying on either alone.
            command.append("--on-error=" + self.on_error)

        if runtimeContext.intermediate_output_ttl:
            command.append("--intermediate-output-ttl=%d" % runtimeContext.intermediate_output_ttl)

        if runtimeContext.trash_intermediate:
            command.append("--trash-intermediate")

        if runtimeContext.project_uuid:
            command.append("--project-uuid="+runtimeContext.project_uuid)

        if self.enable_dev:
            command.append("--enable-dev")

        if runtimeContext.enable_preemptible is True:
            command.append("--enable-preemptible")

        if runtimeContext.enable_preemptible is False:
            command.append("--disable-preemptible")

        if runtimeContext.varying_url_params:
            command.append("--varying-url-params="+runtimeContext.varying_url_params)

        if runtimeContext.prefer_cached_downloads:
            command.append("--prefer-cached-downloads")

        command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])

        container_req["command"] = command

        return container_req


    def run(self, runtimeContext):
        """Submit the runner container request and log a monitoring URL."""
        runtimeContext.keepprefix = "keep:"
        job_spec = self.arvados_job_spec(runtimeContext, self.git_info)
        if runtimeContext.project_uuid:
            job_spec["owner_uuid"] = runtimeContext.project_uuid

        extra_submit_params = {}
        if runtimeContext.submit_runner_cluster:
            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster

        if runtimeContext.submit_request_uuid:
            if "cluster_id" in extra_submit_params:
                # Doesn't make sense for "update" and actually fails
                del extra_submit_params["cluster_id"]
            response = self.arvrunner.api.container_requests().update(
                uuid=runtimeContext.submit_request_uuid,
                body=job_spec,
                **extra_submit_params
            ).execute(num_retries=self.arvrunner.num_retries)
        else:
            response = self.arvrunner.api.container_requests().create(
                body=job_spec,
                **extra_submit_params
            ).execute(num_retries=self.arvrunner.num_retries)

        self.uuid = response["uuid"]
        self.arvrunner.process_submitted(self)

        logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])

        # Prefer a Workbench 2 link when configured; fall back to Workbench 1.
        workbench1 = self.arvrunner.api.config()["Services"]["Workbench1"]["ExternalURL"]
        workbench2 = self.arvrunner.api.config()["Services"]["Workbench2"]["ExternalURL"]
        url = ""
        if workbench2:
            url = "{}processes/{}".format(workbench2, response["uuid"])
        elif workbench1:
            url = "{}container_requests/{}".format(workbench1, response["uuid"])
        if url:
            logger.info("Monitor workflow progress at %s", url)


    def done(self, record):
        """Handle completion of the runner container request.

        Fetches the underlying container record, attaches the request's
        log collection, and delegates final processing to Runner.done().
        """
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            container["log"] = record["log_uuid"]
        except Exception:
            logger.exception("%s while getting runner container", self.arvrunner.label(self))
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            super(RunnerContainer, self).done(container)