19699: Pass through --varying-url-params to runner container
[arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8
9 import logging
10 import json
11 import os
12 import urllib.request, urllib.parse, urllib.error
13 import time
14 import datetime
15 import ciso8601
16 import uuid
17 import math
18
19 import arvados_cwl.util
20 import ruamel.yaml as yaml
21
22 from cwltool.errors import WorkflowException
23 from cwltool.process import UnsupportedRequirement, shortname
24 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
25 from cwltool.job import JobBase
26
27 import arvados.collection
28
29 from .arvdocker import arv_docker_get_image
30 from . import done
31 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields, make_builder
32 from .fsaccess import CollectionFetcher
33 from .pathmapper import NoFollowPathMapper, trim_listing
34 from .perf import Perf
35 from ._version import __version__
36
# Module logger for general arvados-cwl-runner messages.
logger = logging.getLogger('arvados.cwl-runner')
# Separate logger handed to Perf() for timing measurements.
metrics = logging.getLogger('arvados.cwl-runner.metrics')
39
def cleanup_name_for_collection(name):
    """Return *name* with every "/" replaced by a single space."""
    pieces = name.split("/")
    return " ".join(pieces)
42
43 class ArvadosContainer(JobBase):
44     """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""
45
46     def __init__(self, runner, job_runtime,
47                  builder,   # type: Builder
48                  joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
49                  make_path_mapper,  # type: Callable[..., PathMapper]
50                  requirements,      # type: List[Dict[Text, Text]]
51                  hints,     # type: List[Dict[Text, Text]]
52                  name       # type: Text
53     ):
54         super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
55         self.arvrunner = runner
56         self.job_runtime = job_runtime
57         self.running = False
58         self.uuid = None
59
60     def update_pipeline_component(self, r):
61         pass
62
63     def _required_env(self):
64         env = {}
65         env["HOME"] = self.outdir
66         env["TMPDIR"] = self.tmpdir
67         return env
68
69     def run(self, toplevelRuntimeContext):
70         # ArvadosCommandTool subclasses from cwltool.CommandLineTool,
71         # which calls makeJobRunner() to get a new ArvadosContainer
72         # object.  The fields that define execution such as
73         # command_line, environment, etc are set on the
74         # ArvadosContainer object by CommandLineTool.job() before
75         # run() is called.
76
77         runtimeContext = self.job_runtime
78
79         if runtimeContext.submit_request_uuid:
80             container_request = self.arvrunner.api.container_requests().get(
81                 uuid=runtimeContext.submit_request_uuid
82             ).execute(num_retries=self.arvrunner.num_retries)
83         else:
84             container_request = {}
85
86         container_request["command"] = self.command_line
87         container_request["name"] = self.name
88         container_request["output_path"] = self.outdir
89         container_request["cwd"] = self.outdir
90         container_request["priority"] = runtimeContext.priority
91         container_request["state"] = "Committed"
92         container_request.setdefault("properties", {})
93
94         container_request["properties"]["cwl_input"] = self.joborder
95
96         runtime_constraints = {}
97
98         if runtimeContext.project_uuid:
99             container_request["owner_uuid"] = runtimeContext.project_uuid
100
101         if self.arvrunner.secret_store.has_secret(self.command_line):
102             raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")
103
104         if self.arvrunner.secret_store.has_secret(self.environment):
105             raise WorkflowException("Secret material leaked in environment, only file literals may contain secrets")
106
107         resources = self.builder.resources
108         if resources is not None:
109             runtime_constraints["vcpus"] = math.ceil(resources.get("cores", 1))
110             runtime_constraints["ram"] = math.ceil(resources.get("ram") * 2**20)
111
112         mounts = {
113             self.outdir: {
114                 "kind": "tmp",
115                 "capacity": math.ceil(resources.get("outdirSize", 0) * 2**20)
116             },
117             self.tmpdir: {
118                 "kind": "tmp",
119                 "capacity": math.ceil(resources.get("tmpdirSize", 0) * 2**20)
120             }
121         }
122         secret_mounts = {}
123         scheduling_parameters = {}
124
125         rf = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
126         rf.sort(key=lambda k: k.resolved)
127         prevdir = None
128         for resolved, target, tp, stg in rf:
129             if not stg:
130                 continue
131             if prevdir and target.startswith(prevdir):
132                 continue
133             if tp == "Directory":
134                 targetdir = target
135             else:
136                 targetdir = os.path.dirname(target)
137             sp = resolved.split("/", 1)
138             pdh = sp[0][5:]   # remove "keep:"
139             mounts[targetdir] = {
140                 "kind": "collection",
141                 "portable_data_hash": pdh
142             }
143             if pdh in self.pathmapper.pdh_to_uuid:
144                 mounts[targetdir]["uuid"] = self.pathmapper.pdh_to_uuid[pdh]
145             if len(sp) == 2:
146                 if tp == "Directory":
147                     path = sp[1]
148                 else:
149                     path = os.path.dirname(sp[1])
150                 if path and path != "/":
151                     mounts[targetdir]["path"] = path
152             prevdir = targetdir + "/"
153
154         intermediate_collection_info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
155
156         with Perf(metrics, "generatefiles %s" % self.name):
157             if self.generatefiles["listing"]:
158                 vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
159                                                     keep_client=self.arvrunner.keep_client,
160                                                     num_retries=self.arvrunner.num_retries)
161                 generatemapper = NoFollowPathMapper(self.generatefiles["listing"], "", "",
162                                                     separateDirs=False)
163
164                 sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target)
165
166                 logger.debug("generatemapper is %s", sorteditems)
167
168                 with Perf(metrics, "createfiles %s" % self.name):
169                     for f, p in sorteditems:
170                         if not p.target:
171                             continue
172
173                         if p.target.startswith("/"):
174                             dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
175                         else:
176                             dst = p.target
177
178                         if p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
179                             if p.resolved.startswith("_:"):
180                                 vwd.mkdirs(dst)
181                             else:
182                                 source, path = self.arvrunner.fs_access.get_collection(p.resolved)
183                                 vwd.copy(path or ".", dst, source_collection=source)
184                         elif p.type == "CreateFile":
185                             if self.arvrunner.secret_store.has_secret(p.resolved):
186                                 mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
187                                 secret_mounts[mountpoint] = {
188                                     "kind": "text",
189                                     "content": self.arvrunner.secret_store.retrieve(p.resolved)
190                                 }
191                             else:
192                                 with vwd.open(dst, "w") as n:
193                                     n.write(p.resolved)
194
195                 def keepemptydirs(p):
196                     if isinstance(p, arvados.collection.RichCollectionBase):
197                         if len(p) == 0:
198                             p.open(".keep", "w").close()
199                         else:
200                             for c in p:
201                                 keepemptydirs(p[c])
202
203                 keepemptydirs(vwd)
204
205                 if not runtimeContext.current_container:
206                     runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
207                 vwd.save_new(name=intermediate_collection_info["name"],
208                              owner_uuid=runtimeContext.project_uuid,
209                              ensure_unique_name=True,
210                              trash_at=intermediate_collection_info["trash_at"],
211                              properties=intermediate_collection_info["properties"])
212
213                 prev = None
214                 for f, p in sorteditems:
215                     if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
216                         (prev is not None and p.target.startswith(prev))):
217                         continue
218                     if p.target.startswith("/"):
219                         dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
220                     else:
221                         dst = p.target
222                     mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
223                     mounts[mountpoint] = {"kind": "collection",
224                                           "portable_data_hash": vwd.portable_data_hash(),
225                                           "path": dst}
226                     if p.type.startswith("Writable"):
227                         mounts[mountpoint]["writable"] = True
228                     prev = p.target + "/"
229
230         container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
231         if self.environment:
232             container_request["environment"].update(self.environment)
233
234         if self.stdin:
235             sp = self.stdin[6:].split("/", 1)
236             mounts["stdin"] = {"kind": "collection",
237                                 "portable_data_hash": sp[0],
238                                 "path": sp[1]}
239
240         if self.stderr:
241             mounts["stderr"] = {"kind": "file",
242                                 "path": "%s/%s" % (self.outdir, self.stderr)}
243
244         if self.stdout:
245             mounts["stdout"] = {"kind": "file",
246                                 "path": "%s/%s" % (self.outdir, self.stdout)}
247
248         (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
249
250         container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
251                                                                     docker_req,
252                                                                     runtimeContext.pull_image,
253                                                                     runtimeContext.project_uuid,
254                                                                     runtimeContext.force_docker_pull,
255                                                                     runtimeContext.tmp_outdir_prefix,
256                                                                     runtimeContext.match_local_docker,
257                                                                     runtimeContext.copy_deps)
258
259         network_req, _ = self.get_requirement("NetworkAccess")
260         if network_req:
261             runtime_constraints["API"] = network_req["networkAccess"]
262
263         api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
264         if api_req:
265             runtime_constraints["API"] = True
266
267         runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
268         if runtime_req:
269             if "keep_cache" in runtime_req:
270                 runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
271             if "outputDirType" in runtime_req:
272                 if runtime_req["outputDirType"] == "local_output_dir":
273                     # Currently the default behavior.
274                     pass
275                 elif runtime_req["outputDirType"] == "keep_output_dir":
276                     mounts[self.outdir]= {
277                         "kind": "collection",
278                         "writable": True
279                     }
280
281         partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
282         if partition_req:
283             scheduling_parameters["partitions"] = aslist(partition_req["partition"])
284
285         intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
286         if intermediate_output_req:
287             self.output_ttl = intermediate_output_req["outputTTL"]
288         else:
289             self.output_ttl = self.arvrunner.intermediate_output_ttl
290
291         if self.output_ttl < 0:
292             raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % container_request["output_ttl"])
293
294
295         if self.arvrunner.api._rootDesc["revision"] >= "20210628":
296             storage_class_req, _ = self.get_requirement("http://arvados.org/cwl#OutputStorageClass")
297             if storage_class_req and storage_class_req.get("intermediateStorageClass"):
298                 container_request["output_storage_classes"] = aslist(storage_class_req["intermediateStorageClass"])
299             else:
300                 container_request["output_storage_classes"] = runtimeContext.intermediate_storage_classes.strip().split(",")
301
302         cuda_req, _ = self.get_requirement("http://commonwl.org/cwltool#CUDARequirement")
303         if cuda_req:
304             runtime_constraints["cuda"] = {
305                 "device_count": resources.get("cudaDeviceCount", 1),
306                 "driver_version": cuda_req["cudaVersionMin"],
307                 "hardware_capability": aslist(cuda_req["cudaComputeCapability"])[0]
308             }
309
310         if runtimeContext.enable_preemptible is False:
311             scheduling_parameters["preemptible"] = False
312         else:
313             preemptible_req, _ = self.get_requirement("http://arvados.org/cwl#UsePreemptible")
314             if preemptible_req:
315                 scheduling_parameters["preemptible"] = preemptible_req["usePreemptible"]
316             elif runtimeContext.enable_preemptible is True:
317                 scheduling_parameters["preemptible"] = True
318             elif runtimeContext.enable_preemptible is None:
319                 pass
320
321         if self.timelimit is not None and self.timelimit > 0:
322             scheduling_parameters["max_run_time"] = self.timelimit
323
324         extra_submit_params = {}
325         if runtimeContext.submit_runner_cluster:
326             extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
327
328         container_request["output_name"] = cleanup_name_for_collection("Output from step %s" % (self.name))
329         container_request["output_ttl"] = self.output_ttl
330         container_request["mounts"] = mounts
331         container_request["secret_mounts"] = secret_mounts
332         container_request["runtime_constraints"] = runtime_constraints
333         container_request["scheduling_parameters"] = scheduling_parameters
334
335         enable_reuse = runtimeContext.enable_reuse
336         if enable_reuse:
337             reuse_req, _ = self.get_requirement("WorkReuse")
338             if reuse_req:
339                 enable_reuse = reuse_req["enableReuse"]
340             reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
341             if reuse_req:
342                 enable_reuse = reuse_req["enableReuse"]
343         container_request["use_existing"] = enable_reuse
344
345         properties_req, _ = self.get_requirement("http://arvados.org/cwl#ProcessProperties")
346         if properties_req:
347             for pr in properties_req["processProperties"]:
348                 container_request["properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
349
350         output_properties_req, _ = self.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
351         if output_properties_req:
352             if self.arvrunner.api._rootDesc["revision"] >= "20220510":
353                 container_request["output_properties"] = {}
354                 for pr in output_properties_req["outputProperties"]:
355                     container_request["output_properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
356             else:
357                 logger.warning("%s API revision is %s, revision %s is required to support setting properties on output collections.",
358                                self.arvrunner.label(self), self.arvrunner.api._rootDesc["revision"], "20220510")
359
360         if runtimeContext.runnerjob.startswith("arvwf:"):
361             wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
362             wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
363             if container_request["name"] == "main":
364                 container_request["name"] = wfrecord["name"]
365             container_request["properties"]["template_uuid"] = wfuuid
366
367         self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
368
369         try:
370             if runtimeContext.submit_request_uuid:
371                 response = self.arvrunner.api.container_requests().update(
372                     uuid=runtimeContext.submit_request_uuid,
373                     body=container_request,
374                     **extra_submit_params
375                 ).execute(num_retries=self.arvrunner.num_retries)
376             else:
377                 response = self.arvrunner.api.container_requests().create(
378                     body=container_request,
379                     **extra_submit_params
380                 ).execute(num_retries=self.arvrunner.num_retries)
381
382             self.uuid = response["uuid"]
383             self.arvrunner.process_submitted(self)
384
385             if response["state"] == "Final":
386                 logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
387             else:
388                 logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
389         except Exception as e:
390             logger.exception("%s error submitting container\n%s", self.arvrunner.label(self), e)
391             logger.debug("Container request was %s", container_request)
392             self.output_callback({}, "permanentFail")
393
    def done(self, record):
        """Handle a finished container request and report its result.

        Fetches the container named by record["container_uuid"], derives
        a process status from its state and exit code, logs the tail of
        the log collection on permanent failure, collects the outputs,
        and stores them in the request's "cwl_output" property.
        self.output_callback(outputs, processStatus) is invoked from the
        finally clause, i.e. on both the success and the error paths.
        """
        outputs = {}
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            if container["state"] == "Complete":
                rcode = container["exit_code"]
                # Explicitly configured code lists are consulted first,
                # in this order; otherwise 0 means success and anything
                # else is a permanent failure.
                if self.successCodes and rcode in self.successCodes:
                    processStatus = "success"
                elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
                    processStatus = "temporaryFail"
                elif self.permanentFailCodes and rcode in self.permanentFailCodes:
                    processStatus = "permanentFail"
                elif rcode == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"

                if rcode == 137:
                    logger.warning("%s Container may have been killed for using too much RAM.  Try resubmitting with a higher 'ramMin'.",
                                 self.arvrunner.label(self))
            else:
                # Any state other than "Complete" counts as a failure.
                processStatus = "permanentFail"

            if processStatus == "permanentFail" and record["log_uuid"]:
                # Emit the tail of the log collection at error level.
                logc = arvados.collection.CollectionReader(record["log_uuid"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                label = self.arvrunner.label(self)
                done.logtail(
                    logc, logger.error,
                    "%s (%s) error log:" % (label, record["uuid"]), maxlen=40)

            if record["output_uuid"]:
                if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
                    # Compute the trash time to avoid requesting the collection record.
                    trash_at = ciso8601.parse_datetime_as_naive(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
                    aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
                    orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
                    oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
                    logger.info("%s Intermediate output %s (%s) will be trashed%s%s%s." % (
                        self.arvrunner.label(self), record["output_uuid"], container["output"], aftertime, orpart, oncomplete))
                self.arvrunner.add_intermediate_output(record["output_uuid"])

            if container["output"]:
                outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")

            # Save the collected outputs on the request, next to the
            # properties it already has.
            properties = record["properties"].copy()
            properties["cwl_output"] = outputs
            self.arvrunner.api.container_requests().update(
                uuid=self.uuid,
                body={"container_request": {"properties": properties}}
            ).execute(num_retries=self.arvrunner.num_retries)
        except WorkflowException as e:
            # Only include a stack trace if in debug mode.
            # A stack trace may obfuscate more useful output about the workflow.
            logger.error("%s unable to collect output from %s:\n%s",
                         self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
            processStatus = "permanentFail"
        except Exception:
            logger.exception("%s while getting output object:", self.arvrunner.label(self))
            processStatus = "permanentFail"
        finally:
            # NOTE(review): processStatus is only bound by the try body
            # or the two handlers above; an exception that is not an
            # Exception subclass would reach this line with it unset.
            self.output_callback(outputs, processStatus)
460
461
462 class RunnerContainer(Runner):
463     """Submit and manage a container that runs arvados-cwl-runner."""
464
465     def arvados_job_spec(self, runtimeContext, git_info):
466         """Create an Arvados container request for this workflow.
467
468         The returned dict can be used to create a container passed as
469         the +body+ argument to container_requests().create().
470         """
471
472         adjustDirObjs(self.job_order, trim_listing)
473         visit_class(self.job_order, ("File", "Directory"), trim_anonymous_location)
474         visit_class(self.job_order, ("File", "Directory"), remove_redundant_fields)
475
476         secret_mounts = {}
477         for param in sorted(self.job_order.keys()):
478             if self.secret_store.has_secret(self.job_order[param]):
479                 mnt = "/secrets/s%d" % len(secret_mounts)
480                 secret_mounts[mnt] = {
481                     "kind": "text",
482                     "content": self.secret_store.retrieve(self.job_order[param])
483                 }
484                 self.job_order[param] = {"$include": mnt}
485
486         container_req = {
487             "name": self.name,
488             "output_path": "/var/spool/cwl",
489             "cwd": "/var/spool/cwl",
490             "priority": self.priority,
491             "state": "Committed",
492             "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext),
493             "mounts": {
494                 "/var/lib/cwl/cwl.input.json": {
495                     "kind": "json",
496                     "content": self.job_order
497                 },
498                 "stdout": {
499                     "kind": "file",
500                     "path": "/var/spool/cwl/cwl.output.json"
501                 },
502                 "/var/spool/cwl": {
503                     "kind": "collection",
504                     "writable": True
505                 }
506             },
507             "secret_mounts": secret_mounts,
508             "runtime_constraints": {
509                 "vcpus": math.ceil(self.submit_runner_cores),
510                 "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
511                 "API": True
512             },
513             "use_existing": False, # Never reuse the runner container - see #15497.
514             "properties": {}
515         }
516
517         if self.embedded_tool.tool.get("id", "").startswith("keep:"):
518             sp = self.embedded_tool.tool["id"].split('/')
519             workflowcollection = sp[0][5:]
520             workflowname = "/".join(sp[1:])
521             workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
522             container_req["mounts"]["/var/lib/cwl/workflow"] = {
523                 "kind": "collection",
524                 "portable_data_hash": "%s" % workflowcollection
525             }
526         elif self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
527             workflowpath = "/var/lib/cwl/workflow.json#main"
528             record = self.arvrunner.api.workflows().get(uuid=self.embedded_tool.tool["id"][6:33]).execute(num_retries=self.arvrunner.num_retries)
529             packed = yaml.safe_load(record["definition"])
530             container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
531                 "kind": "json",
532                 "content": packed
533             }
534             container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
535         else:
536             packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext, git_info)
537             workflowpath = "/var/lib/cwl/workflow.json#main"
538             container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
539                 "kind": "json",
540                 "content": packed
541             }
542
543         container_req["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in git_info.items()})
544
545         properties_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ProcessProperties")
546         if properties_req:
547             builder = make_builder(self.job_order, self.embedded_tool.hints, self.embedded_tool.requirements, runtimeContext, self.embedded_tool.metadata)
548             for pr in properties_req["processProperties"]:
549                 container_req["properties"][pr["propertyName"]] = builder.do_eval(pr["propertyValue"])
550
551         # --local means execute the workflow instead of submitting a container request
552         # --api=containers means use the containers API
553         # --no-log-timestamps means don't add timestamps (the logging infrastructure does this)
554         # --disable-validate because we already validated so don't need to do it again
555         # --eval-timeout is the timeout for javascript invocation
556         # --parallel-task-count is the number of threads to use for job submission
557         # --enable/disable-reuse sets desired job reuse
558         # --collection-cache-size sets aside memory to store collections
559         command = ["arvados-cwl-runner",
560                    "--local",
561                    "--api=containers",
562                    "--no-log-timestamps",
563                    "--disable-validate",
564                    "--disable-color",
565                    "--eval-timeout=%s" % self.arvrunner.eval_timeout,
566                    "--thread-count=%s" % self.arvrunner.thread_count,
567                    "--enable-reuse" if self.enable_reuse else "--disable-reuse",
568                    "--collection-cache-size=%s" % self.collection_cache_size]
569
570         if self.output_name:
571             command.append("--output-name=" + self.output_name)
572             container_req["output_name"] = self.output_name
573
574         if self.output_tags:
575             command.append("--output-tags=" + self.output_tags)
576
577         if runtimeContext.debug:
578             command.append("--debug")
579
580         if runtimeContext.storage_classes != "default" and runtimeContext.storage_classes:
581             command.append("--storage-classes=" + runtimeContext.storage_classes)
582
583         if runtimeContext.intermediate_storage_classes != "default" and runtimeContext.intermediate_storage_classes:
584             command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes)
585
586         if runtimeContext.on_error:
587             command.append("--on-error=" + self.on_error)
588
589         if runtimeContext.intermediate_output_ttl:
590             command.append("--intermediate-output-ttl=%d" % runtimeContext.intermediate_output_ttl)
591
592         if runtimeContext.trash_intermediate:
593             command.append("--trash-intermediate")
594
595         if runtimeContext.project_uuid:
596             command.append("--project-uuid="+runtimeContext.project_uuid)
597
598         if self.enable_dev:
599             command.append("--enable-dev")
600
601         if runtimeContext.enable_preemptible is True:
602             command.append("--enable-preemptible")
603
604         if runtimeContext.enable_preemptible is False:
605             command.append("--disable-preemptible")
606
607         if runtimeContext.varying_url_params:
608             command.append("--varying-url-params="+runtimeContext.varying_url_params)
609
610         command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
611
612         container_req["command"] = command
613
614         return container_req
615
616
617     def run(self, runtimeContext):
618         runtimeContext.keepprefix = "keep:"
619         job_spec = self.arvados_job_spec(runtimeContext, self.git_info)
620         if runtimeContext.project_uuid:
621             job_spec["owner_uuid"] = runtimeContext.project_uuid
622
623         extra_submit_params = {}
624         if runtimeContext.submit_runner_cluster:
625             extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
626
627         if runtimeContext.submit_request_uuid:
628             if "cluster_id" in extra_submit_params:
629                 # Doesn't make sense for "update" and actually fails
630                 del extra_submit_params["cluster_id"]
631             response = self.arvrunner.api.container_requests().update(
632                 uuid=runtimeContext.submit_request_uuid,
633                 body=job_spec,
634                 **extra_submit_params
635             ).execute(num_retries=self.arvrunner.num_retries)
636         else:
637             response = self.arvrunner.api.container_requests().create(
638                 body=job_spec,
639                 **extra_submit_params
640             ).execute(num_retries=self.arvrunner.num_retries)
641
642         self.uuid = response["uuid"]
643         self.arvrunner.process_submitted(self)
644
645         logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])
646
647         workbench1 = self.arvrunner.api.config()["Services"]["Workbench1"]["ExternalURL"]
648         workbench2 = self.arvrunner.api.config()["Services"]["Workbench2"]["ExternalURL"]
649         url = ""
650         if workbench2:
651             url = "{}processes/{}".format(workbench2, response["uuid"])
652         elif workbench1:
653             url = "{}container_requests/{}".format(workbench1, response["uuid"])
654         if url:
655             logger.info("Monitor workflow progress at %s", url)
656
657
658     def done(self, record):
659         try:
660             container = self.arvrunner.api.containers().get(
661                 uuid=record["container_uuid"]
662             ).execute(num_retries=self.arvrunner.num_retries)
663             container["log"] = record["log_uuid"]
664         except Exception:
665             logger.exception("%s while getting runner container", self.arvrunner.label(self))
666             self.arvrunner.output_callback({}, "permanentFail")
667         else:
668             super(RunnerContainer, self).done(container)