# 17004: Initial CWL support for setting output properties
# [arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8
9 import logging
10 import json
11 import os
12 import urllib.request, urllib.parse, urllib.error
13 import time
14 import datetime
15 import ciso8601
16 import uuid
17 import math
18
19 import arvados_cwl.util
20 import ruamel.yaml as yaml
21
22 from cwltool.errors import WorkflowException
23 from cwltool.process import UnsupportedRequirement, shortname
24 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
25 from cwltool.job import JobBase
26
27 import arvados.collection
28
29 from .arvdocker import arv_docker_get_image
30 from . import done
31 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields, make_builder
32 from .fsaccess import CollectionFetcher
33 from .pathmapper import NoFollowPathMapper, trim_listing
34 from .perf import Perf
35 from ._version import __version__
36
37 logger = logging.getLogger('arvados.cwl-runner')
38 metrics = logging.getLogger('arvados.cwl-runner.metrics')
39
class ArvadosContainer(JobBase):
    """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""

    def __init__(self, runner, job_runtime,
                 builder,   # type: Builder
                 joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
                 make_path_mapper,  # type: Callable[..., PathMapper]
                 requirements,      # type: List[Dict[Text, Text]]
                 hints,     # type: List[Dict[Text, Text]]
                 name       # type: Text
    ):
        super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
        self.arvrunner = runner          # executor coordinating this workflow run
        self.job_runtime = job_runtime   # runtime context carrying submit-time options
        self.running = False
        self.uuid = None                 # container request uuid, assigned on submit

    def update_pipeline_component(self, r):
        # The containers API has no pipeline components; kept for interface
        # compatibility with callers of the older jobs-based implementation.
        pass

    def _required_env(self):
        """Return the environment variables every container must define."""
        env = {}
        env["HOME"] = self.outdir
        env["TMPDIR"] = self.tmpdir
        return env

    def run(self, toplevelRuntimeContext):
        """Build and submit the container request for this tool invocation.

        Translates the fields cwltool set on this object (command_line,
        environment, stdin/stdout/stderr, generated files, requirements and
        hints) into a container_requests API body and submits it.  On
        submission failure, reports "permanentFail" through the output
        callback instead of raising.
        """
        # ArvadosCommandTool subclasses from cwltool.CommandLineTool,
        # which calls makeJobRunner() to get a new ArvadosContainer
        # object.  The fields that define execution such as
        # command_line, environment, etc are set on the
        # ArvadosContainer object by CommandLineTool.job() before
        # run() is called.

        runtimeContext = self.job_runtime

        if runtimeContext.submit_request_uuid:
            # Updating an existing (pre-created) request: start from its
            # current state so unspecified fields are preserved.
            container_request = self.arvrunner.api.container_requests().get(
                uuid=runtimeContext.submit_request_uuid
            ).execute(num_retries=self.arvrunner.num_retries)
        else:
            container_request = {}

        container_request["command"] = self.command_line
        container_request["name"] = self.name
        container_request["output_path"] = self.outdir
        container_request["cwd"] = self.outdir
        container_request["priority"] = runtimeContext.priority
        container_request["state"] = "Committed"
        container_request.setdefault("properties", {})

        runtime_constraints = {}

        if runtimeContext.project_uuid:
            container_request["owner_uuid"] = runtimeContext.project_uuid

        if self.arvrunner.secret_store.has_secret(self.command_line):
            raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")

        if self.arvrunner.secret_store.has_secret(self.environment):
            raise WorkflowException("Secret material leaked in environment, only file literals may contain secrets")

        resources = self.builder.resources
        if resources is not None:
            runtime_constraints["vcpus"] = math.ceil(resources.get("cores", 1))
            runtime_constraints["ram"] = math.ceil(resources.get("ram") * 2**20)

        # Guard against resources being None (the block above already allows
        # for it) so the tmp-mount sizing below cannot raise AttributeError.
        res = resources or {}
        mounts = {
            self.outdir: {
                "kind": "tmp",
                "capacity": math.ceil(res.get("outdirSize", 0) * 2**20)
            },
            self.tmpdir: {
                "kind": "tmp",
                "capacity": math.ceil(res.get("tmpdirSize", 0) * 2**20)
            }
        }
        secret_mounts = {}
        scheduling_parameters = {}

        # Mount each referenced keep collection at the directory that covers
        # it.  Sorting by resolved path plus the prevdir check skips targets
        # already covered by an enclosing mount.
        rf = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
        rf.sort(key=lambda k: k.resolved)
        prevdir = None
        for resolved, target, tp, stg in rf:
            if not stg:
                continue
            if prevdir and target.startswith(prevdir):
                continue
            if tp == "Directory":
                targetdir = target
            else:
                targetdir = os.path.dirname(target)
            sp = resolved.split("/", 1)
            pdh = sp[0][5:]   # remove "keep:"
            mounts[targetdir] = {
                "kind": "collection",
                "portable_data_hash": pdh
            }
            if pdh in self.pathmapper.pdh_to_uuid:
                mounts[targetdir]["uuid"] = self.pathmapper.pdh_to_uuid[pdh]
            if len(sp) == 2:
                if tp == "Directory":
                    path = sp[1]
                else:
                    path = os.path.dirname(sp[1])
                if path and path != "/":
                    mounts[targetdir]["path"] = path
            prevdir = targetdir + "/"

        with Perf(metrics, "generatefiles %s" % self.name):
            if self.generatefiles["listing"]:
                # InitialWorkDirRequirement: stage generated files/dirs into a
                # new intermediate collection and mount it into the container.
                vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                    keep_client=self.arvrunner.keep_client,
                                                    num_retries=self.arvrunner.num_retries)
                generatemapper = NoFollowPathMapper(self.generatefiles["listing"], "", "",
                                                    separateDirs=False)

                sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target)

                logger.debug("generatemapper is %s", sorteditems)

                with Perf(metrics, "createfiles %s" % self.name):
                    for f, p in sorteditems:
                        if not p.target:
                            continue

                        if p.target.startswith("/"):
                            dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
                        else:
                            dst = p.target

                        if p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
                            if p.resolved.startswith("_:"):
                                vwd.mkdirs(dst)
                            else:
                                source, path = self.arvrunner.fs_access.get_collection(p.resolved)
                                vwd.copy(path or ".", dst, source_collection=source)
                        elif p.type == "CreateFile":
                            if self.arvrunner.secret_store.has_secret(p.resolved):
                                # Secret literals go through secret_mounts so
                                # they never land in a keep collection.
                                mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
                                secret_mounts[mountpoint] = {
                                    "kind": "text",
                                    "content": self.arvrunner.secret_store.retrieve(p.resolved)
                                }
                            else:
                                with vwd.open(dst, "w") as n:
                                    n.write(p.resolved)

                def keepemptydirs(p):
                    # Keep collections cannot represent empty directories;
                    # drop a ".keep" placeholder file into each empty one.
                    if isinstance(p, arvados.collection.RichCollectionBase):
                        if len(p) == 0:
                            p.open(".keep", "w").close()
                        else:
                            for c in p:
                                keepemptydirs(p[c])

                keepemptydirs(vwd)

                if not runtimeContext.current_container:
                    runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
                info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
                vwd.save_new(name=info["name"],
                             owner_uuid=runtimeContext.project_uuid,
                             ensure_unique_name=True,
                             trash_at=info["trash_at"],
                             properties=info["properties"])

                # Now mount the saved collection; skip secrets (handled above)
                # and anything under an already-mounted parent directory.
                prev = None
                for f, p in sorteditems:
                    if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
                        (prev is not None and p.target.startswith(prev))):
                        continue
                    if p.target.startswith("/"):
                        dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
                    else:
                        dst = p.target
                    mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
                    mounts[mountpoint] = {"kind": "collection",
                                          "portable_data_hash": vwd.portable_data_hash(),
                                          "path": dst}
                    if p.type.startswith("Writable"):
                        mounts[mountpoint]["writable"] = True
                    prev = p.target + "/"

        container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
        if self.environment:
            container_request["environment"].update(self.environment)

        if self.stdin:
            # self.stdin is a "keep:pdh/path" reference; strip the scheme.
            sp = self.stdin[6:].split("/", 1)
            mounts["stdin"] = {"kind": "collection",
                                "portable_data_hash": sp[0],
                                "path": sp[1]}

        if self.stderr:
            mounts["stderr"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stderr)}

        if self.stdout:
            mounts["stdout"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stdout)}

        (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")

        container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
                                                                    docker_req,
                                                                    runtimeContext.pull_image,
                                                                    runtimeContext.project_uuid,
                                                                    runtimeContext.force_docker_pull,
                                                                    runtimeContext.tmp_outdir_prefix,
                                                                    runtimeContext.match_local_docker,
                                                                    runtimeContext.copy_deps)

        network_req, _ = self.get_requirement("NetworkAccess")
        if network_req:
            runtime_constraints["API"] = network_req["networkAccess"]

        api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
        if api_req:
            runtime_constraints["API"] = True

        runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
        if runtime_req:
            if "keep_cache" in runtime_req:
                runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
            if "outputDirType" in runtime_req:
                if runtime_req["outputDirType"] == "local_output_dir":
                    # Currently the default behavior.
                    pass
                elif runtime_req["outputDirType"] == "keep_output_dir":
                    mounts[self.outdir] = {
                        "kind": "collection",
                        "writable": True
                    }

        partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
        if partition_req:
            scheduling_parameters["partitions"] = aslist(partition_req["partition"])

        intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
        if intermediate_output_req:
            self.output_ttl = intermediate_output_req["outputTTL"]
        else:
            self.output_ttl = self.arvrunner.intermediate_output_ttl

        if self.output_ttl < 0:
            # BUGFIX: format with self.output_ttl directly; the previous code
            # read container_request["output_ttl"], which is not assigned
            # until later, so raising this error itself raised KeyError.
            raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % self.output_ttl)


        if self.arvrunner.api._rootDesc["revision"] >= "20210628":
            storage_class_req, _ = self.get_requirement("http://arvados.org/cwl#OutputStorageClass")
            if storage_class_req and storage_class_req.get("intermediateStorageClass"):
                container_request["output_storage_classes"] = aslist(storage_class_req["intermediateStorageClass"])
            else:
                container_request["output_storage_classes"] = runtimeContext.intermediate_storage_classes.strip().split(",")

        cuda_req, _ = self.get_requirement("http://commonwl.org/cwltool#CUDARequirement")
        if cuda_req:
            runtime_constraints["cuda"] = {
                "device_count": res.get("cudaDeviceCount", 1),
                "driver_version": cuda_req["cudaVersionMin"],
                "hardware_capability": aslist(cuda_req["cudaComputeCapability"])[0]
            }

        if runtimeContext.enable_preemptible is False:
            scheduling_parameters["preemptible"] = False
        else:
            preemptible_req, _ = self.get_requirement("http://arvados.org/cwl#UsePreemptible")
            if preemptible_req:
                scheduling_parameters["preemptible"] = preemptible_req["usePreemptible"]
            elif runtimeContext.enable_preemptible is True:
                scheduling_parameters["preemptible"] = True
            elif runtimeContext.enable_preemptible is None:
                # Neither forced on nor off: leave it to the cluster default.
                pass

        if self.timelimit is not None and self.timelimit > 0:
            scheduling_parameters["max_run_time"] = self.timelimit

        extra_submit_params = {}
        if runtimeContext.submit_runner_cluster:
            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster

        container_request["output_name"] = "Output for step %s" % (self.name)
        container_request["output_ttl"] = self.output_ttl
        container_request["mounts"] = mounts
        container_request["secret_mounts"] = secret_mounts
        container_request["runtime_constraints"] = runtime_constraints
        container_request["scheduling_parameters"] = scheduling_parameters

        enable_reuse = runtimeContext.enable_reuse
        if enable_reuse:
            reuse_req, _ = self.get_requirement("WorkReuse")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
            reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        container_request["use_existing"] = enable_reuse

        properties_req, _ = self.get_requirement("http://arvados.org/cwl#ProcessProperties")
        if properties_req:
            for pr in properties_req["processProperties"]:
                container_request["properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])

        output_properties_req, _ = self.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
        if output_properties_req:
            # BUGFIX: "output_properties" was never initialized (unlike
            # "properties", setdefault'ed above), so indexing into it raised
            # KeyError whenever this requirement was present.
            container_request.setdefault("output_properties", {})
            # NOTE(review): this reads "processProperties" from the
            # OutputCollectionProperties requirement; confirm the field name
            # against the extension schema.
            for pr in output_properties_req["processProperties"]:
                container_request["output_properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])

        if runtimeContext.runnerjob.startswith("arvwf:"):
            # Record the registered workflow this step came from, and use its
            # name for the top-level ("main") step.
            wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
            wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
            if container_request["name"] == "main":
                container_request["name"] = wfrecord["name"]
            container_request["properties"]["template_uuid"] = wfuuid

        self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)

        try:
            if runtimeContext.submit_request_uuid:
                response = self.arvrunner.api.container_requests().update(
                    uuid=runtimeContext.submit_request_uuid,
                    body=container_request,
                    **extra_submit_params
                ).execute(num_retries=self.arvrunner.num_retries)
            else:
                response = self.arvrunner.api.container_requests().create(
                    body=container_request,
                    **extra_submit_params
                ).execute(num_retries=self.arvrunner.num_retries)

            self.uuid = response["uuid"]
            self.arvrunner.process_submitted(self)

            if response["state"] == "Final":
                logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
            else:
                logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
        except Exception as e:
            logger.exception("%s error submitting container\n%s", self.arvrunner.label(self), e)
            logger.debug("Container request was %s", container_request)
            self.output_callback({}, "permanentFail")

    def done(self, record):
        """Handle completion of the container request.

        Maps the container exit code through the success/temporaryFail/
        permanentFail code lists, tails the log on failure, schedules
        intermediate output for trashing, and always reports collected
        outputs and a status through the output callback.
        """
        outputs = {}
        # Defensive defaults: if an exception fires before the normal paths
        # assign these, the handlers and finally-block still work.
        container = {}
        processStatus = "permanentFail"
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            if container["state"] == "Complete":
                rcode = container["exit_code"]
                if self.successCodes and rcode in self.successCodes:
                    processStatus = "success"
                elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
                    processStatus = "temporaryFail"
                elif self.permanentFailCodes and rcode in self.permanentFailCodes:
                    processStatus = "permanentFail"
                elif rcode == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"

                if rcode == 137:
                    # 137 = 128 + SIGKILL, the usual signature of the OOM killer.
                    logger.warning("%s Container may have been killed for using too much RAM.  Try resubmitting with a higher 'ramMin'.",
                                 self.arvrunner.label(self))
            else:
                processStatus = "permanentFail"

            if processStatus == "permanentFail" and record["log_uuid"]:
                logc = arvados.collection.CollectionReader(record["log_uuid"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                label = self.arvrunner.label(self)
                done.logtail(
                    logc, logger.error,
                    "%s (%s) error log:" % (label, record["uuid"]), maxlen=40)

            if record["output_uuid"]:
                if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
                    # Compute the trash time to avoid requesting the collection record.
                    trash_at = ciso8601.parse_datetime_as_naive(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
                    aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
                    orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
                    oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
                    logger.info("%s Intermediate output %s (%s) will be trashed%s%s%s." % (
                        self.arvrunner.label(self), record["output_uuid"], container["output"], aftertime, orpart, oncomplete))
                self.arvrunner.add_intermediate_output(record["output_uuid"])

            if container["output"]:
                outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
        except WorkflowException as e:
            # Only include a stack trace if in debug mode.
            # A stack trace may obfuscate more useful output about the workflow.
            # (container.get avoids a NameError/KeyError masking the real
            # failure when the container record was never fetched.)
            logger.error("%s unable to collect output from %s:\n%s",
                         self.arvrunner.label(self), container.get("output"), e, exc_info=(e if self.arvrunner.debug else False))
            processStatus = "permanentFail"
        except Exception:
            logger.exception("%s while getting output object:", self.arvrunner.label(self))
            processStatus = "permanentFail"
        finally:
            self.output_callback(outputs, processStatus)
442
443
class RunnerContainer(Runner):
    """Submit and manage a container that runs arvados-cwl-runner."""

    def arvados_job_spec(self, runtimeContext):
        """Create an Arvados container request for this workflow.

        The returned dict can be used to create a container passed as
        the +body+ argument to container_requests().create().
        """

        adjustDirObjs(self.job_order, trim_listing)
        visit_class(self.job_order, ("File", "Directory"), trim_anonymous_location)
        visit_class(self.job_order, ("File", "Directory"), remove_redundant_fields)

        # Replace secret inputs with $include references to text mounts so
        # secret values never appear in cwl.input.json.
        secret_mounts = {}
        for param in sorted(self.job_order.keys()):
            if self.secret_store.has_secret(self.job_order[param]):
                mnt = "/secrets/s%d" % len(secret_mounts)
                secret_mounts[mnt] = {
                    "kind": "text",
                    "content": self.secret_store.retrieve(self.job_order[param])
                }
                self.job_order[param] = {"$include": mnt}

        container_req = {
            "name": self.name,
            "output_path": "/var/spool/cwl",
            "cwd": "/var/spool/cwl",
            "priority": self.priority,
            "state": "Committed",
            "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext),
            "mounts": {
                "/var/lib/cwl/cwl.input.json": {
                    "kind": "json",
                    "content": self.job_order
                },
                "stdout": {
                    "kind": "file",
                    "path": "/var/spool/cwl/cwl.output.json"
                },
                "/var/spool/cwl": {
                    "kind": "collection",
                    "writable": True
                }
            },
            "secret_mounts": secret_mounts,
            "runtime_constraints": {
                "vcpus": math.ceil(self.submit_runner_cores),
                "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
                "API": True
            },
            "use_existing": False, # Never reuse the runner container - see #15497.
            "properties": {}
        }

        if self.embedded_tool.tool.get("id", "").startswith("keep:"):
            # The workflow definition already lives in keep; mount that
            # collection read-only and point the runner at the file inside it.
            sp = self.embedded_tool.tool["id"].split('/')
            workflowcollection = sp[0][5:]
            workflowname = "/".join(sp[1:])
            workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
            container_req["mounts"]["/var/lib/cwl/workflow"] = {
                "kind": "collection",
                "portable_data_hash": "%s" % workflowcollection
            }
        else:
            # Otherwise pack the workflow and embed it as a JSON mount.
            packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext)
            workflowpath = "/var/lib/cwl/workflow.json#main"
            container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
                "kind": "json",
                "content": packed
            }
            if self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
                # "arvwf:" prefix (6 chars) followed by the 27-char uuid.
                container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]

        properties_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ProcessProperties")
        if properties_req:
            builder = make_builder(self.job_order, self.embedded_tool.hints, self.embedded_tool.requirements, runtimeContext, self.embedded_tool.metadata)
            for pr in properties_req["processProperties"]:
                container_req["properties"][pr["propertyName"]] = builder.do_eval(pr["propertyValue"])

        # --local means execute the workflow instead of submitting a container request
        # --api=containers means use the containers API
        # --no-log-timestamps means don't add timestamps (the logging infrastructure does this)
        # --disable-validate because we already validated so don't need to do it again
        # --eval-timeout is the timeout for javascript invocation
        # --parallel-task-count is the number of threads to use for job submission
        # --enable/disable-reuse sets desired job reuse
        # --collection-cache-size sets aside memory to store collections
        command = ["arvados-cwl-runner",
                   "--local",
                   "--api=containers",
                   "--no-log-timestamps",
                   "--disable-validate",
                   "--disable-color",
                   "--eval-timeout=%s" % self.arvrunner.eval_timeout,
                   "--thread-count=%s" % self.arvrunner.thread_count,
                   "--enable-reuse" if self.enable_reuse else "--disable-reuse",
                   "--collection-cache-size=%s" % self.collection_cache_size]

        if self.output_name:
            command.append("--output-name=" + self.output_name)
            container_req["output_name"] = self.output_name

        if self.output_tags:
            command.append("--output-tags=" + self.output_tags)

        if runtimeContext.debug:
            command.append("--debug")

        if runtimeContext.storage_classes != "default" and runtimeContext.storage_classes:
            command.append("--storage-classes=" + runtimeContext.storage_classes)

        if runtimeContext.intermediate_storage_classes != "default" and runtimeContext.intermediate_storage_classes:
            command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes)

        if runtimeContext.on_error:
            # CONSISTENCY FIX: append the same value the guard tests.  The
            # previous code appended self.on_error here, which would
            # concatenate None if it ever diverged from the runtime context.
            command.append("--on-error=" + runtimeContext.on_error)

        if runtimeContext.intermediate_output_ttl:
            command.append("--intermediate-output-ttl=%d" % runtimeContext.intermediate_output_ttl)

        if runtimeContext.trash_intermediate:
            command.append("--trash-intermediate")

        if runtimeContext.project_uuid:
            command.append("--project-uuid="+runtimeContext.project_uuid)

        if self.enable_dev:
            command.append("--enable-dev")

        if runtimeContext.enable_preemptible is True:
            command.append("--enable-preemptible")

        if runtimeContext.enable_preemptible is False:
            command.append("--disable-preemptible")

        command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])

        container_req["command"] = command

        return container_req


    def run(self, runtimeContext):
        """Submit (create or update) the runner container request and log a
        workbench URL for monitoring its progress."""
        runtimeContext.keepprefix = "keep:"
        job_spec = self.arvados_job_spec(runtimeContext)
        if runtimeContext.project_uuid:
            job_spec["owner_uuid"] = runtimeContext.project_uuid

        extra_submit_params = {}
        if runtimeContext.submit_runner_cluster:
            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster

        if runtimeContext.submit_request_uuid:
            if "cluster_id" in extra_submit_params:
                # Doesn't make sense for "update" and actually fails
                del extra_submit_params["cluster_id"]
            response = self.arvrunner.api.container_requests().update(
                uuid=runtimeContext.submit_request_uuid,
                body=job_spec,
                **extra_submit_params
            ).execute(num_retries=self.arvrunner.num_retries)
        else:
            response = self.arvrunner.api.container_requests().create(
                body=job_spec,
                **extra_submit_params
            ).execute(num_retries=self.arvrunner.num_retries)

        self.uuid = response["uuid"]
        self.arvrunner.process_submitted(self)

        logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])

        # Prefer Workbench2 when the cluster advertises it.
        workbench1 = self.arvrunner.api.config()["Services"]["Workbench1"]["ExternalURL"]
        workbench2 = self.arvrunner.api.config()["Services"]["Workbench2"]["ExternalURL"]
        url = ""
        if workbench2:
            url = "{}processes/{}".format(workbench2, response["uuid"])
        elif workbench1:
            url = "{}container_requests/{}".format(workbench1, response["uuid"])
        if url:
            logger.info("Monitor workflow progress at %s", url)


    def done(self, record):
        """Fetch the finished runner container and delegate to Runner.done(),
        reporting permanentFail if the record cannot be retrieved."""
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            # Runner.done() reads the log from the container request record.
            container["log"] = record["log_uuid"]
        except Exception:
            logger.exception("%s while getting runner container", self.arvrunner.label(self))
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            super(RunnerContainer, self).done(container)