Merge branch '20035-salt-installer-privkey'. Closes #20035
[arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8
9 import logging
10 import json
11 import os
12 import urllib.request, urllib.parse, urllib.error
13 import time
14 import datetime
15 import ciso8601
16 import uuid
17 import math
18
19 import arvados_cwl.util
20 import ruamel.yaml
21
22 from cwltool.errors import WorkflowException
23 from cwltool.process import UnsupportedRequirement, shortname
24 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
25 from cwltool.job import JobBase
26
27 import arvados.collection
28
29 from .arvdocker import arv_docker_get_image
30 from . import done
31 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields, make_builder
32 from .fsaccess import CollectionFetcher
33 from .pathmapper import NoFollowPathMapper, trim_listing
34 from .perf import Perf
35 from ._version import __version__
36
37 logger = logging.getLogger('arvados.cwl-runner')
38 metrics = logging.getLogger('arvados.cwl-runner.metrics')
39
def cleanup_name_for_collection(name):
    """Return *name* with every "/" replaced by a space.

    Collection names may not contain slashes, so path-like step names
    are flattened before being used as a collection name.
    """
    return " ".join(name.split("/"))
42
43 class ArvadosContainer(JobBase):
44     """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""
45
46     def __init__(self, runner, job_runtime,
47                  builder,   # type: Builder
48                  joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
49                  make_path_mapper,  # type: Callable[..., PathMapper]
50                  requirements,      # type: List[Dict[Text, Text]]
51                  hints,     # type: List[Dict[Text, Text]]
52                  name       # type: Text
53     ):
54         super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
55         self.arvrunner = runner
56         self.job_runtime = job_runtime
57         self.running = False
58         self.uuid = None
59
    def update_pipeline_component(self, r):
        # Intentionally a no-op in this implementation; the method exists
        # so callers can invoke it uniformly.
        pass
62
63     def _required_env(self):
64         env = {}
65         env["HOME"] = self.outdir
66         env["TMPDIR"] = self.tmpdir
67         return env
68
69     def run(self, toplevelRuntimeContext):
70         # ArvadosCommandTool subclasses from cwltool.CommandLineTool,
71         # which calls makeJobRunner() to get a new ArvadosContainer
72         # object.  The fields that define execution such as
73         # command_line, environment, etc are set on the
74         # ArvadosContainer object by CommandLineTool.job() before
75         # run() is called.
76
77         runtimeContext = self.job_runtime
78
79         if runtimeContext.submit_request_uuid:
80             container_request = self.arvrunner.api.container_requests().get(
81                 uuid=runtimeContext.submit_request_uuid
82             ).execute(num_retries=self.arvrunner.num_retries)
83         else:
84             container_request = {}
85
86         container_request["command"] = self.command_line
87         container_request["name"] = self.name
88         container_request["output_path"] = self.outdir
89         container_request["cwd"] = self.outdir
90         container_request["priority"] = runtimeContext.priority
91         container_request["state"] = "Committed"
92         container_request.setdefault("properties", {})
93
94         container_request["properties"]["cwl_input"] = self.joborder
95
96         runtime_constraints = {}
97
98         if runtimeContext.project_uuid:
99             container_request["owner_uuid"] = runtimeContext.project_uuid
100
101         if self.arvrunner.secret_store.has_secret(self.command_line):
102             raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")
103
104         if self.arvrunner.secret_store.has_secret(self.environment):
105             raise WorkflowException("Secret material leaked in environment, only file literals may contain secrets")
106
107         resources = self.builder.resources
108         if resources is not None:
109             runtime_constraints["vcpus"] = math.ceil(resources.get("cores", 1))
110             runtime_constraints["ram"] = math.ceil(resources.get("ram") * 2**20)
111
112         mounts = {
113             self.outdir: {
114                 "kind": "tmp",
115                 "capacity": math.ceil(resources.get("outdirSize", 0) * 2**20)
116             },
117             self.tmpdir: {
118                 "kind": "tmp",
119                 "capacity": math.ceil(resources.get("tmpdirSize", 0) * 2**20)
120             }
121         }
122         secret_mounts = {}
123         scheduling_parameters = {}
124
125         rf = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
126         rf.sort(key=lambda k: k.resolved)
127         prevdir = None
128         for resolved, target, tp, stg in rf:
129             if not stg:
130                 continue
131             if prevdir and target.startswith(prevdir):
132                 continue
133             if tp == "Directory":
134                 targetdir = target
135             else:
136                 targetdir = os.path.dirname(target)
137             sp = resolved.split("/", 1)
138             pdh = sp[0][5:]   # remove "keep:"
139             mounts[targetdir] = {
140                 "kind": "collection",
141                 "portable_data_hash": pdh
142             }
143             if pdh in self.pathmapper.pdh_to_uuid:
144                 mounts[targetdir]["uuid"] = self.pathmapper.pdh_to_uuid[pdh]
145             if len(sp) == 2:
146                 if tp == "Directory":
147                     path = sp[1]
148                 else:
149                     path = os.path.dirname(sp[1])
150                 if path and path != "/":
151                     mounts[targetdir]["path"] = path
152             prevdir = targetdir + "/"
153
154         intermediate_collection_info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
155
156         with Perf(metrics, "generatefiles %s" % self.name):
157             if self.generatefiles["listing"]:
158                 vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
159                                                     keep_client=self.arvrunner.keep_client,
160                                                     num_retries=self.arvrunner.num_retries)
161                 generatemapper = NoFollowPathMapper(self.generatefiles["listing"], "", "",
162                                                     separateDirs=False)
163
164                 sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target)
165
166                 logger.debug("generatemapper is %s", sorteditems)
167
168                 with Perf(metrics, "createfiles %s" % self.name):
169                     for f, p in sorteditems:
170                         if not p.target:
171                             continue
172
173                         if p.target.startswith("/"):
174                             dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
175                         else:
176                             dst = p.target
177
178                         if p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
179                             if p.resolved.startswith("_:"):
180                                 vwd.mkdirs(dst)
181                             else:
182                                 source, path = self.arvrunner.fs_access.get_collection(p.resolved)
183                                 vwd.copy(path or ".", dst, source_collection=source)
184                         elif p.type == "CreateFile":
185                             if self.arvrunner.secret_store.has_secret(p.resolved):
186                                 mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
187                                 secret_mounts[mountpoint] = {
188                                     "kind": "text",
189                                     "content": self.arvrunner.secret_store.retrieve(p.resolved)
190                                 }
191                             else:
192                                 with vwd.open(dst, "w") as n:
193                                     n.write(p.resolved)
194
195                 def keepemptydirs(p):
196                     if isinstance(p, arvados.collection.RichCollectionBase):
197                         if len(p) == 0:
198                             p.open(".keep", "w").close()
199                         else:
200                             for c in p:
201                                 keepemptydirs(p[c])
202
203                 keepemptydirs(vwd)
204
205                 if not runtimeContext.current_container:
206                     runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
207                 vwd.save_new(name=intermediate_collection_info["name"],
208                              owner_uuid=runtimeContext.project_uuid,
209                              ensure_unique_name=True,
210                              trash_at=intermediate_collection_info["trash_at"],
211                              properties=intermediate_collection_info["properties"])
212
213                 prev = None
214                 for f, p in sorteditems:
215                     if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
216                         (prev is not None and p.target.startswith(prev))):
217                         continue
218                     if p.target.startswith("/"):
219                         dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
220                     else:
221                         dst = p.target
222                     mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
223                     mounts[mountpoint] = {"kind": "collection",
224                                           "portable_data_hash": vwd.portable_data_hash(),
225                                           "path": dst}
226                     if p.type.startswith("Writable"):
227                         mounts[mountpoint]["writable"] = True
228                     prev = p.target + "/"
229
230         container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
231         if self.environment:
232             container_request["environment"].update(self.environment)
233
234         if self.stdin:
235             sp = self.stdin[6:].split("/", 1)
236             mounts["stdin"] = {"kind": "collection",
237                                 "portable_data_hash": sp[0],
238                                 "path": sp[1]}
239
240         if self.stderr:
241             mounts["stderr"] = {"kind": "file",
242                                 "path": "%s/%s" % (self.outdir, self.stderr)}
243
244         if self.stdout:
245             mounts["stdout"] = {"kind": "file",
246                                 "path": "%s/%s" % (self.outdir, self.stdout)}
247
248         (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
249
250         container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
251                                                                     docker_req,
252                                                                     runtimeContext.pull_image,
253                                                                     runtimeContext)
254
255         network_req, _ = self.get_requirement("NetworkAccess")
256         if network_req:
257             runtime_constraints["API"] = network_req["networkAccess"]
258
259         api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
260         if api_req:
261             runtime_constraints["API"] = True
262
263         use_disk_cache = (self.arvrunner.api.config()["Containers"].get("DefaultKeepCacheRAM", 0) == 0)
264
265         keep_cache_type_req, _ = self.get_requirement("http://arvados.org/cwl#KeepCacheTypeRequirement")
266         if keep_cache_type_req:
267             if "keepCacheType" in keep_cache_type_req:
268                 if keep_cache_type_req["keepCacheType"] == "ram_cache":
269                     use_disk_cache = False
270
271         runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
272         if runtime_req:
273             if "keep_cache" in runtime_req:
274                 if use_disk_cache:
275                     # If DefaultKeepCacheRAM is zero it means we should use disk cache.
276                     runtime_constraints["keep_cache_disk"] = math.ceil(runtime_req["keep_cache"] * 2**20)
277                 else:
278                     runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
279             if "outputDirType" in runtime_req:
280                 if runtime_req["outputDirType"] == "local_output_dir":
281                     # Currently the default behavior.
282                     pass
283                 elif runtime_req["outputDirType"] == "keep_output_dir":
284                     mounts[self.outdir]= {
285                         "kind": "collection",
286                         "writable": True
287                     }
288
289         partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
290         if partition_req:
291             scheduling_parameters["partitions"] = aslist(partition_req["partition"])
292
293         intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
294         if intermediate_output_req:
295             self.output_ttl = intermediate_output_req["outputTTL"]
296         else:
297             self.output_ttl = self.arvrunner.intermediate_output_ttl
298
299         if self.output_ttl < 0:
300             raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % container_request["output_ttl"])
301
302
303         if self.arvrunner.api._rootDesc["revision"] >= "20210628":
304             storage_class_req, _ = self.get_requirement("http://arvados.org/cwl#OutputStorageClass")
305             if storage_class_req and storage_class_req.get("intermediateStorageClass"):
306                 container_request["output_storage_classes"] = aslist(storage_class_req["intermediateStorageClass"])
307             else:
308                 container_request["output_storage_classes"] = runtimeContext.intermediate_storage_classes.strip().split(",")
309
310         cuda_req, _ = self.get_requirement("http://commonwl.org/cwltool#CUDARequirement")
311         if cuda_req:
312             runtime_constraints["cuda"] = {
313                 "device_count": resources.get("cudaDeviceCount", 1),
314                 "driver_version": cuda_req["cudaVersionMin"],
315                 "hardware_capability": aslist(cuda_req["cudaComputeCapability"])[0]
316             }
317
318         if runtimeContext.enable_preemptible is False:
319             scheduling_parameters["preemptible"] = False
320         else:
321             preemptible_req, _ = self.get_requirement("http://arvados.org/cwl#UsePreemptible")
322             if preemptible_req:
323                 scheduling_parameters["preemptible"] = preemptible_req["usePreemptible"]
324             elif runtimeContext.enable_preemptible is True:
325                 scheduling_parameters["preemptible"] = True
326             elif runtimeContext.enable_preemptible is None:
327                 pass
328
329         if self.timelimit is not None and self.timelimit > 0:
330             scheduling_parameters["max_run_time"] = self.timelimit
331
332         extra_submit_params = {}
333         if runtimeContext.submit_runner_cluster:
334             extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
335
336         container_request["output_name"] = cleanup_name_for_collection("Output from step %s" % (self.name))
337         container_request["output_ttl"] = self.output_ttl
338         container_request["mounts"] = mounts
339         container_request["secret_mounts"] = secret_mounts
340         container_request["runtime_constraints"] = runtime_constraints
341         container_request["scheduling_parameters"] = scheduling_parameters
342
343         enable_reuse = runtimeContext.enable_reuse
344         if enable_reuse:
345             reuse_req, _ = self.get_requirement("WorkReuse")
346             if reuse_req:
347                 enable_reuse = reuse_req["enableReuse"]
348             reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
349             if reuse_req:
350                 enable_reuse = reuse_req["enableReuse"]
351         container_request["use_existing"] = enable_reuse
352
353         properties_req, _ = self.get_requirement("http://arvados.org/cwl#ProcessProperties")
354         if properties_req:
355             for pr in properties_req["processProperties"]:
356                 container_request["properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
357
358         output_properties_req, _ = self.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
359         if output_properties_req:
360             if self.arvrunner.api._rootDesc["revision"] >= "20220510":
361                 container_request["output_properties"] = {}
362                 for pr in output_properties_req["outputProperties"]:
363                     container_request["output_properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
364             else:
365                 logger.warning("%s API revision is %s, revision %s is required to support setting properties on output collections.",
366                                self.arvrunner.label(self), self.arvrunner.api._rootDesc["revision"], "20220510")
367
368         if runtimeContext.runnerjob.startswith("arvwf:"):
369             wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
370             wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
371             if container_request["name"] == "main":
372                 container_request["name"] = wfrecord["name"]
373             container_request["properties"]["template_uuid"] = wfuuid
374
375         self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
376
377         try:
378             if runtimeContext.submit_request_uuid:
379                 response = self.arvrunner.api.container_requests().update(
380                     uuid=runtimeContext.submit_request_uuid,
381                     body=container_request,
382                     **extra_submit_params
383                 ).execute(num_retries=self.arvrunner.num_retries)
384             else:
385                 response = self.arvrunner.api.container_requests().create(
386                     body=container_request,
387                     **extra_submit_params
388                 ).execute(num_retries=self.arvrunner.num_retries)
389
390             self.uuid = response["uuid"]
391             self.arvrunner.process_submitted(self)
392
393             if response["state"] == "Final":
394                 logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
395             else:
396                 logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
397         except Exception as e:
398             logger.exception("%s error submitting container\n%s", self.arvrunner.label(self), e)
399             logger.debug("Container request was %s", container_request)
400             self.output_callback({}, "permanentFail")
401
    def done(self, record):
        """Handle completion of the container request *record*.

        Determines a cwltool process status from the container's exit
        code, surfaces the tail of the log on permanent failure, collects
        the output object, and always reports the result through
        self.output_callback.
        """
        outputs = {}
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            if container["state"] == "Complete":
                rcode = container["exit_code"]
                # Tool-declared success/fail code lists take precedence
                # over the conventional zero/nonzero interpretation.
                if self.successCodes and rcode in self.successCodes:
                    processStatus = "success"
                elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
                    processStatus = "temporaryFail"
                elif self.permanentFailCodes and rcode in self.permanentFailCodes:
                    processStatus = "permanentFail"
                elif rcode == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"

                # Exit code 137 is SIGKILL, commonly the OOM killer.
                if rcode == 137:
                    logger.warning("%s Container may have been killed for using too much RAM.  Try resubmitting with a higher 'ramMin'.",
                                 self.arvrunner.label(self))
            else:
                processStatus = "permanentFail"

            # On failure, show the last lines of the container log to help
            # diagnose the problem without fetching the whole collection.
            if processStatus == "permanentFail" and record["log_uuid"]:
                logc = arvados.collection.CollectionReader(record["log_uuid"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                label = self.arvrunner.label(self)
                done.logtail(
                    logc, logger.error,
                    "%s (%s) error log:" % (label, record["uuid"]), maxlen=40)

            if record["output_uuid"]:
                if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
                    # Compute the trash time to avoid requesting the collection record.
                    trash_at = ciso8601.parse_datetime_as_naive(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
                    aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
                    orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
                    oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
                    logger.info("%s Intermediate output %s (%s) will be trashed%s%s%s." % (
                        self.arvrunner.label(self), record["output_uuid"], container["output"], aftertime, orpart, oncomplete))
                self.arvrunner.add_intermediate_output(record["output_uuid"])

            if container["output"]:
                outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")

            # Record the CWL output object on the request for provenance.
            properties = record["properties"].copy()
            properties["cwl_output"] = outputs
            self.arvrunner.api.container_requests().update(
                uuid=self.uuid,
                body={"container_request": {"properties": properties}}
            ).execute(num_retries=self.arvrunner.num_retries)
        except WorkflowException as e:
            # Only include a stack trace if in debug mode.
            # A stack trace may obfuscate more useful output about the workflow.
            logger.error("%s unable to collect output from %s:\n%s",
                         self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
            processStatus = "permanentFail"
        except Exception:
            logger.exception("%s while getting output object:", self.arvrunner.label(self))
            processStatus = "permanentFail"
        finally:
            # Always report a status, even if output collection failed.
            self.output_callback(outputs, processStatus)
468
469
470 class RunnerContainer(Runner):
471     """Submit and manage a container that runs arvados-cwl-runner."""
472
    def arvados_job_spec(self, runtimeContext, git_info):
        """Create an Arvados container request for this workflow.

        The returned dict can be used to create a container passed as
        the +body+ argument to container_requests().create().

        The request runs arvados-cwl-runner itself inside a container,
        mounting the workflow definition and the input object under
        /var/lib/cwl and writing output to /var/spool/cwl.
        """

        # Strip bulky/redundant fields from the input object before
        # embedding it in the request.
        adjustDirObjs(self.job_order, trim_listing)
        visit_class(self.job_order, ("File", "Directory"), trim_anonymous_location)
        visit_class(self.job_order, ("File", "Directory"), remove_redundant_fields)

        # Replace secret input values with $include references to
        # secret_mounts so the secrets are not stored in the request body.
        secret_mounts = {}
        for param in sorted(self.job_order.keys()):
            if self.secret_store.has_secret(self.job_order[param]):
                mnt = "/secrets/s%d" % len(secret_mounts)
                secret_mounts[mnt] = {
                    "kind": "text",
                    "content": self.secret_store.retrieve(self.job_order[param])
                }
                self.job_order[param] = {"$include": mnt}

        container_req = {
            "name": self.name,
            "output_path": "/var/spool/cwl",
            "cwd": "/var/spool/cwl",
            "priority": self.priority,
            "state": "Committed",
            "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext),
            "mounts": {
                "/var/lib/cwl/cwl.input.json": {
                    "kind": "json",
                    "content": self.job_order
                },
                "stdout": {
                    "kind": "file",
                    "path": "/var/spool/cwl/cwl.output.json"
                },
                "/var/spool/cwl": {
                    "kind": "collection",
                    "writable": True
                }
            },
            "secret_mounts": secret_mounts,
            "runtime_constraints": {
                "vcpus": math.ceil(self.submit_runner_cores),
                "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
                "API": True
            },
            "use_existing": False, # Never reuse the runner container - see #15497.
            "properties": {}
        }

        # Mount the workflow definition, depending on how it is referenced:
        # a Keep collection, a registered Arvados workflow, or the packed
        # document already in the loader index.
        if self.embedded_tool.tool.get("id", "").startswith("keep:"):
            sp = self.embedded_tool.tool["id"].split('/')
            workflowcollection = sp[0][5:]
            workflowname = "/".join(sp[1:])
            workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
            container_req["mounts"]["/var/lib/cwl/workflow"] = {
                "kind": "collection",
                "portable_data_hash": "%s" % workflowcollection
            }
        elif self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
            uuid, frg = urllib.parse.urldefrag(self.embedded_tool.tool["id"])
            workflowpath = "/var/lib/cwl/workflow.json#" + frg
            packedtxt = self.loadingContext.loader.fetch_text(uuid)
            yaml = ruamel.yaml.YAML(typ='safe', pure=True)
            packed = yaml.load(packedtxt)
            container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
                "kind": "json",
                "content": packed
            }
            container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
        else:
            main = self.loadingContext.loader.idx["_:main"]
            if main.get("id") == "_:main":
                del main["id"]
            workflowpath = "/var/lib/cwl/workflow.json#main"
            container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
                "kind": "json",
                "content": main
            }

        # Record git provenance info as arv:-prefixed request properties.
        container_req["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in git_info.items()})

        properties_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ProcessProperties")
        if properties_req:
            builder = make_builder(self.job_order, self.embedded_tool.hints, self.embedded_tool.requirements, runtimeContext, self.embedded_tool.metadata)
            for pr in properties_req["processProperties"]:
                container_req["properties"][pr["propertyName"]] = builder.do_eval(pr["propertyValue"])

        # --local means execute the workflow instead of submitting a container request
        # --api=containers means use the containers API
        # --no-log-timestamps means don't add timestamps (the logging infrastructure does this)
        # --disable-validate because we already validated so don't need to do it again
        # --eval-timeout is the timeout for javascript invocation
        # --parallel-task-count is the number of threads to use for job submission
        # --enable/disable-reuse sets desired job reuse
        # --collection-cache-size sets aside memory to store collections
        command = ["arvados-cwl-runner",
                   "--local",
                   "--api=containers",
                   "--no-log-timestamps",
                   "--disable-validate",
                   "--disable-color",
                   "--eval-timeout=%s" % self.arvrunner.eval_timeout,
                   "--thread-count=%s" % self.arvrunner.thread_count,
                   "--enable-reuse" if self.enable_reuse else "--disable-reuse",
                   "--collection-cache-size=%s" % self.collection_cache_size]

        # Forward relevant submit-time options to the in-container runner.
        if self.output_name:
            command.append("--output-name=" + self.output_name)
            container_req["output_name"] = self.output_name

        if self.output_tags:
            command.append("--output-tags=" + self.output_tags)

        if runtimeContext.debug:
            command.append("--debug")

        if runtimeContext.storage_classes != "default" and runtimeContext.storage_classes:
            command.append("--storage-classes=" + runtimeContext.storage_classes)

        if runtimeContext.intermediate_storage_classes != "default" and runtimeContext.intermediate_storage_classes:
            command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes)

        if runtimeContext.on_error:
            command.append("--on-error=" + self.on_error)

        if runtimeContext.intermediate_output_ttl:
            command.append("--intermediate-output-ttl=%d" % runtimeContext.intermediate_output_ttl)

        if runtimeContext.trash_intermediate:
            command.append("--trash-intermediate")

        if runtimeContext.project_uuid:
            command.append("--project-uuid="+runtimeContext.project_uuid)

        if self.enable_dev:
            command.append("--enable-dev")

        if runtimeContext.enable_preemptible is True:
            command.append("--enable-preemptible")

        if runtimeContext.enable_preemptible is False:
            command.append("--disable-preemptible")

        if runtimeContext.varying_url_params:
            command.append("--varying-url-params="+runtimeContext.varying_url_params)

        if runtimeContext.prefer_cached_downloads:
            command.append("--prefer-cached-downloads")

        if self.fast_parser:
            command.append("--fast-parser")

        command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])

        container_req["command"] = command

        return container_req
633
634
    def run(self, runtimeContext):
        """Submit (or update) the runner container request and log where to watch it."""
        runtimeContext.keepprefix = "keep:"
        job_spec = self.arvados_job_spec(runtimeContext, self.git_info)
        if runtimeContext.project_uuid:
            job_spec["owner_uuid"] = runtimeContext.project_uuid

        extra_submit_params = {}
        if runtimeContext.submit_runner_cluster:
            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster

        if runtimeContext.submit_request_uuid:
            if "cluster_id" in extra_submit_params:
                # Doesn't make sense for "update" and actually fails
                del extra_submit_params["cluster_id"]
            response = self.arvrunner.api.container_requests().update(
                uuid=runtimeContext.submit_request_uuid,
                body=job_spec,
                **extra_submit_params
            ).execute(num_retries=self.arvrunner.num_retries)
        else:
            response = self.arvrunner.api.container_requests().create(
                body=job_spec,
                **extra_submit_params
            ).execute(num_retries=self.arvrunner.num_retries)

        self.uuid = response["uuid"]
        self.arvrunner.process_submitted(self)

        logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])

        # Print a monitoring URL, preferring Workbench2 when configured.
        workbench1 = self.arvrunner.api.config()["Services"]["Workbench1"]["ExternalURL"]
        workbench2 = self.arvrunner.api.config()["Services"]["Workbench2"]["ExternalURL"]
        url = ""
        if workbench2:
            url = "{}processes/{}".format(workbench2, response["uuid"])
        elif workbench1:
            url = "{}container_requests/{}".format(workbench1, response["uuid"])
        if url:
            logger.info("Monitor workflow progress at %s", url)
674
675
676     def done(self, record):
677         try:
678             container = self.arvrunner.api.containers().get(
679                 uuid=record["container_uuid"]
680             ).execute(num_retries=self.arvrunner.num_retries)
681             container["log"] = record["log_uuid"]
682         except Exception:
683             logger.exception("%s while getting runner container", self.arvrunner.label(self))
684             self.arvrunner.output_callback({}, "permanentFail")
685         else:
686             super(RunnerContainer, self).done(container)