17301: Shortened OOM message. Limit added detail to 40 lines.
[arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8
9 import logging
10 import json
11 import os
12 import urllib.request, urllib.parse, urllib.error
13 import time
14 import datetime
15 import ciso8601
16 import uuid
17 import math
18
19 import arvados_cwl.util
20 import ruamel.yaml as yaml
21
22 from cwltool.errors import WorkflowException
23 from cwltool.process import UnsupportedRequirement, shortname
24 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
25 from cwltool.job import JobBase
26
27 import arvados.collection
28
29 from .arvdocker import arv_docker_get_image
30 from . import done
31 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields, make_builder
32 from .fsaccess import CollectionFetcher
33 from .pathmapper import NoFollowPathMapper, trim_listing
34 from .perf import Perf
35 from ._version import __version__
36
# Module-wide loggers: "logger" carries the user-facing runner messages and
# "metrics" (its child, named 'arvados.cwl-runner.metrics') receives the
# Perf timing records emitted by this module.
logger = logging.getLogger("arvados.cwl-runner")
metrics = logger.getChild("metrics")
39
class ArvadosContainer(JobBase):
    """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""

    def __init__(self, runner, job_runtime,
                 builder,   # type: Builder
                 joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
                 make_path_mapper,  # type: Callable[..., PathMapper]
                 requirements,      # type: List[Dict[Text, Text]]
                 hints,     # type: List[Dict[Text, Text]]
                 name       # type: Text
    ):
        """Set up the job and remember the runner and runtime context.

        runner is stored as self.arvrunner and provides the API client,
        retry count, secret store and callbacks used by run() and done().
        job_runtime is stored as self.job_runtime and is the runtime
        context that run() actually uses.  The remaining arguments are
        passed unchanged to JobBase.__init__().
        """
        super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
        self.arvrunner = runner
        self.job_runtime = job_runtime
        self.running = False
        # UUID of the submitted container request; set by run().
        self.uuid = None

    def update_pipeline_component(self, r):
        """Do nothing; the argument is ignored."""
        pass

    def _required_env(self):
        """Return a dict with HOME set to self.outdir and TMPDIR set to self.tmpdir."""
        env = {}
        env["HOME"] = self.outdir
        env["TMPDIR"] = self.tmpdir
        return env

    def run(self, toplevelRuntimeContext):
        """Build a container request for this step and submit it.

        The request is assembled from the fields set on this object
        (command_line, environment, stdin/stdout/stderr, pathmapper,
        generatefiles, ...) and from the CWL requirements and hints
        attached to the step.  toplevelRuntimeContext is not used; the
        runtime context stored at construction time (self.job_runtime)
        is used instead.

        Raises WorkflowException if secret material appears in the
        command line or environment, or if the output TTL is negative.
        Errors raised while submitting the request are logged and
        reported through self.output_callback({}, "permanentFail")
        rather than propagated.
        """
        # ArvadosCommandTool subclasses from cwltool.CommandLineTool,
        # which calls makeJobRunner() to get a new ArvadosContainer
        # object.  The fields that define execution such as
        # command_line, environment, etc are set on the
        # ArvadosContainer object by CommandLineTool.job() before
        # run() is called.

        runtimeContext = self.job_runtime

        if runtimeContext.submit_request_uuid:
            # Start from the existing record; it is updated in place below.
            container_request = self.arvrunner.api.container_requests().get(
                uuid=runtimeContext.submit_request_uuid
            ).execute(num_retries=self.arvrunner.num_retries)
        else:
            container_request = {}

        container_request["command"] = self.command_line
        container_request["name"] = self.name
        container_request["output_path"] = self.outdir
        container_request["cwd"] = self.outdir
        container_request["priority"] = runtimeContext.priority
        container_request["state"] = "Committed"
        container_request.setdefault("properties", {})

        runtime_constraints = {}

        if runtimeContext.project_uuid:
            container_request["owner_uuid"] = runtimeContext.project_uuid

        if self.arvrunner.secret_store.has_secret(self.command_line):
            raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")

        if self.arvrunner.secret_store.has_secret(self.environment):
            raise WorkflowException("Secret material leaked in environment, only file literals may contain secrets")

        resources = self.builder.resources
        if resources is not None:
            # Resource sizes are multiplied by 2**20 (MiB to bytes).
            runtime_constraints["vcpus"] = math.ceil(resources.get("cores", 1))
            runtime_constraints["ram"] = math.ceil(resources.get("ram") * 2**20)

        # NOTE(review): resources is dereferenced here (and for the CUDA
        # device count further down) without the None check used above --
        # confirm that builder.resources can never be None.
        mounts = {
            self.outdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("outdirSize", 0) * 2**20)
            },
            self.tmpdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("tmpdirSize", 0) * 2**20)
            }
        }
        secret_mounts = {}
        scheduling_parameters = {}

        # Mount each referenced input as a collection.  Entries are sorted
        # by resolved location, and a target that lies under the directory
        # mounted just before it is skipped.
        rf = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
        rf.sort(key=lambda k: k.resolved)
        prevdir = None
        for resolved, target, tp, stg in rf:
            if not stg:
                continue
            if prevdir and target.startswith(prevdir):
                continue
            if tp == "Directory":
                targetdir = target
            else:
                targetdir = os.path.dirname(target)
            sp = resolved.split("/", 1)
            pdh = sp[0][5:]   # remove "keep:"
            mounts[targetdir] = {
                "kind": "collection",
                "portable_data_hash": pdh
            }
            if pdh in self.pathmapper.pdh_to_uuid:
                mounts[targetdir]["uuid"] = self.pathmapper.pdh_to_uuid[pdh]
            if len(sp) == 2:
                if tp == "Directory":
                    path = sp[1]
                else:
                    path = os.path.dirname(sp[1])
                if path and path != "/":
                    mounts[targetdir]["path"] = path
            prevdir = targetdir + "/"

        with Perf(metrics, "generatefiles %s" % self.name):
            if self.generatefiles["listing"]:
                # Stage the generated files into a new collection ("vwd"),
                # then mount each staged item at its target path.
                vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                    keep_client=self.arvrunner.keep_client,
                                                    num_retries=self.arvrunner.num_retries)
                generatemapper = NoFollowPathMapper(self.generatefiles["listing"], "", "",
                                                    separateDirs=False)

                sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target)

                logger.debug("generatemapper is %s", sorteditems)

                with Perf(metrics, "createfiles %s" % self.name):
                    for f, p in sorteditems:
                        if not p.target:
                            continue

                        # dst is the path inside the staging collection:
                        # relative to the output directory when the target
                        # is under it, otherwise the target minus its
                        # leading slash.
                        if p.target.startswith("/"):
                            dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
                        else:
                            dst = p.target

                        if p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
                            if p.resolved.startswith("_:"):
                                vwd.mkdirs(dst)
                            else:
                                source, path = self.arvrunner.fs_access.get_collection(p.resolved)
                                vwd.copy(path or ".", dst, source_collection=source)
                        elif p.type == "CreateFile":
                            if self.arvrunner.secret_store.has_secret(p.resolved):
                                # Secret content goes into secret_mounts and
                                # is not written to the staging collection.
                                mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
                                secret_mounts[mountpoint] = {
                                    "kind": "text",
                                    "content": self.arvrunner.secret_store.retrieve(p.resolved)
                                }
                            else:
                                with vwd.open(dst, "w") as n:
                                    n.write(p.resolved)

                def keepemptydirs(p):
                    """Recursively create an empty ".keep" file in every empty subcollection."""
                    if isinstance(p, arvados.collection.RichCollectionBase):
                        if len(p) == 0:
                            p.open(".keep", "w").close()
                        else:
                            for c in p:
                                keepemptydirs(p[c])

                keepemptydirs(vwd)

                if not runtimeContext.current_container:
                    runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
                info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
                vwd.save_new(name=info["name"],
                             owner_uuid=runtimeContext.project_uuid,
                             ensure_unique_name=True,
                             trash_at=info["trash_at"],
                             properties=info["properties"])

                prev = None
                for f, p in sorteditems:
                    # Skip items with no target, secrets (mounted above) and
                    # anything nested under the previously mounted target.
                    if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
                        (prev is not None and p.target.startswith(prev))):
                        continue
                    if p.target.startswith("/"):
                        dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
                    else:
                        dst = p.target
                    mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
                    mounts[mountpoint] = {"kind": "collection",
                                          "portable_data_hash": vwd.portable_data_hash(),
                                          "path": dst}
                    if p.type.startswith("Writable"):
                        mounts[mountpoint]["writable"] = True
                    prev = p.target + "/"

        container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
        if self.environment:
            container_request["environment"].update(self.environment)

        if self.stdin:
            # Drop the first six characters of the stdin path, then split
            # the remainder into portable data hash and path.
            sp = self.stdin[6:].split("/", 1)
            mounts["stdin"] = {"kind": "collection",
                                "portable_data_hash": sp[0],
                                "path": sp[1]}

        if self.stderr:
            mounts["stderr"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stderr)}

        if self.stdout:
            mounts["stdout"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stdout)}

        (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")

        container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
                                                                    docker_req,
                                                                    runtimeContext.pull_image,
                                                                    runtimeContext.project_uuid,
                                                                    runtimeContext.force_docker_pull,
                                                                    runtimeContext.tmp_outdir_prefix,
                                                                    runtimeContext.match_local_docker)

        network_req, _ = self.get_requirement("NetworkAccess")
        if network_req:
            runtime_constraints["API"] = network_req["networkAccess"]

        api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
        if api_req:
            runtime_constraints["API"] = True

        runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
        if runtime_req:
            if "keep_cache" in runtime_req:
                runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
            if "outputDirType" in runtime_req:
                if runtime_req["outputDirType"] == "local_output_dir":
                    # Currently the default behavior.
                    pass
                elif runtime_req["outputDirType"] == "keep_output_dir":
                    mounts[self.outdir]= {
                        "kind": "collection",
                        "writable": True
                    }

        partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
        if partition_req:
            scheduling_parameters["partitions"] = aslist(partition_req["partition"])

        intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
        if intermediate_output_req:
            self.output_ttl = intermediate_output_req["outputTTL"]
        else:
            self.output_ttl = self.arvrunner.intermediate_output_ttl

        if self.output_ttl < 0:
            # Report self.output_ttl: container_request["output_ttl"] is not
            # assigned until further down, so formatting it here would raise
            # KeyError (or show a stale value from a fetched request).
            raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % self.output_ttl)


        if self.arvrunner.api._rootDesc["revision"] >= "20210628":
            storage_class_req, _ = self.get_requirement("http://arvados.org/cwl#OutputStorageClass")
            if storage_class_req and storage_class_req.get("intermediateStorageClass"):
                container_request["output_storage_classes"] = aslist(storage_class_req["intermediateStorageClass"])
            else:
                container_request["output_storage_classes"] = runtimeContext.intermediate_storage_classes.strip().split(",")

        cuda_req, _ = self.get_requirement("http://commonwl.org/cwltool#CUDARequirement")
        if cuda_req:
            runtime_constraints["cuda"] = {
                "device_count": resources.get("cudaDeviceCount", 1),
                "driver_version": cuda_req["cudaVersionMin"],
                "hardware_capability": aslist(cuda_req["cudaComputeCapability"])[0]
            }

        # An explicit False in the runtime context always wins; otherwise a
        # UsePreemptible requirement takes precedence over an explicit True.
        if runtimeContext.enable_preemptible is False:
            scheduling_parameters["preemptible"] = False
        else:
            preemptible_req, _ = self.get_requirement("http://arvados.org/cwl#UsePreemptible")
            if preemptible_req:
                scheduling_parameters["preemptible"] = preemptible_req["usePreemptible"]
            elif runtimeContext.enable_preemptible is True:
                scheduling_parameters["preemptible"] = True
            elif runtimeContext.enable_preemptible is None:
                pass

        if self.timelimit is not None and self.timelimit > 0:
            scheduling_parameters["max_run_time"] = self.timelimit

        extra_submit_params = {}
        if runtimeContext.submit_runner_cluster:
            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster

        container_request["output_name"] = "Output for step %s" % (self.name)
        container_request["output_ttl"] = self.output_ttl
        container_request["mounts"] = mounts
        container_request["secret_mounts"] = secret_mounts
        container_request["runtime_constraints"] = runtime_constraints
        container_request["scheduling_parameters"] = scheduling_parameters

        # Requirements can only turn reuse off or back on when the runtime
        # context enables it in the first place.
        enable_reuse = runtimeContext.enable_reuse
        if enable_reuse:
            reuse_req, _ = self.get_requirement("WorkReuse")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
            reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        container_request["use_existing"] = enable_reuse

        properties_req, _ = self.get_requirement("http://arvados.org/cwl#ProcessProperties")
        if properties_req:
            for pr in properties_req["processProperties"]:
                container_request["properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])

        if runtimeContext.runnerjob.startswith("arvwf:"):
            # The workflow uuid sits between the "arvwf:" prefix and the "#".
            wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
            wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
            if container_request["name"] == "main":
                container_request["name"] = wfrecord["name"]
            container_request["properties"]["template_uuid"] = wfuuid

        self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)

        try:
            if runtimeContext.submit_request_uuid:
                if "cluster_id" in extra_submit_params:
                    # Doesn't make sense for "update" and actually fails
                    # (same handling as RunnerContainer.run).
                    del extra_submit_params["cluster_id"]
                response = self.arvrunner.api.container_requests().update(
                    uuid=runtimeContext.submit_request_uuid,
                    body=container_request,
                    **extra_submit_params
                ).execute(num_retries=self.arvrunner.num_retries)
            else:
                response = self.arvrunner.api.container_requests().create(
                    body=container_request,
                    **extra_submit_params
                ).execute(num_retries=self.arvrunner.num_retries)

            self.uuid = response["uuid"]
            self.arvrunner.process_submitted(self)

            if response["state"] == "Final":
                logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
            else:
                logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
        except Exception as e:
            logger.exception("%s error submitting container\n%s", self.arvrunner.label(self), e)
            logger.debug("Container request was %s", container_request)
            self.output_callback({}, "permanentFail")

    def done(self, record):
        """Collect the result of a finished container request and report it.

        record is the container request record; this method reads its
        "container_uuid", "log_uuid", "output_uuid", "uuid" and
        "modified_at" fields.  The process status is derived from the
        container's state and exit code, the tail of the log is emitted on
        permanent failure, and self.output_callback(outputs, status) is
        always called, including when collecting outputs raises.
        """
        outputs = {}
        # Default so the "finally" clause always has a status to report.
        processStatus = "permanentFail"
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            if container["state"] == "Complete":
                rcode = container["exit_code"]
                # Explicit code lists take precedence; otherwise zero is
                # success and anything else is a permanent failure.
                if self.successCodes and rcode in self.successCodes:
                    processStatus = "success"
                elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
                    processStatus = "temporaryFail"
                elif self.permanentFailCodes and rcode in self.permanentFailCodes:
                    processStatus = "permanentFail"
                elif rcode == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"

                if rcode == 137:
                    logger.warning("%s Container may have been killed for using too much RAM.  Try resubmitting with a higher 'ramMin'.",
                                 self.arvrunner.label(self))
            else:
                processStatus = "permanentFail"

            if processStatus == "permanentFail" and record["log_uuid"]:
                logc = arvados.collection.CollectionReader(record["log_uuid"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                label = self.arvrunner.label(self)
                done.logtail(
                    logc, logger.error,
                    "%s (%s) error log:" % (label, record["uuid"]), maxlen=40)

            if record["output_uuid"]:
                if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
                    # Compute the trash time to avoid requesting the collection record.
                    trash_at = ciso8601.parse_datetime_as_naive(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
                    aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
                    orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
                    oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
                    logger.info("%s Intermediate output %s (%s) will be trashed%s%s%s.",
                                self.arvrunner.label(self), record["output_uuid"], container["output"],
                                aftertime, orpart, oncomplete)
                self.arvrunner.add_intermediate_output(record["output_uuid"])

            if container["output"]:
                outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
        except WorkflowException as e:
            # Only include a stack trace if in debug mode.
            # A stack trace may obfuscate more useful output about the workflow.
            logger.error("%s unable to collect output from %s:\n%s",
                         self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
            processStatus = "permanentFail"
        except Exception:
            logger.exception("%s while getting output object:", self.arvrunner.label(self))
            processStatus = "permanentFail"
        finally:
            self.output_callback(outputs, processStatus)
436
437
class RunnerContainer(Runner):
    """Submit and manage a container that runs arvados-cwl-runner."""

    def arvados_job_spec(self, runtimeContext):
        """Create an Arvados container request for this workflow.

        The returned dict can be used to create a container passed as
        the +body+ argument to container_requests().create().
        """

        # Normalize File/Directory objects in the job order.
        adjustDirObjs(self.job_order, trim_listing)
        for normalize in (trim_anonymous_location, remove_redundant_fields):
            visit_class(self.job_order, ("File", "Directory"), normalize)

        # Move every secret input into its own text mount and replace the
        # input value with an "$include" reference to that mount.
        hidden_mounts = {}
        for key in sorted(self.job_order.keys()):
            value = self.job_order[key]
            if not self.secret_store.has_secret(value):
                continue
            mount_path = "/secrets/s%d" % len(hidden_mounts)
            hidden_mounts[mount_path] = {
                "kind": "text",
                "content": self.secret_store.retrieve(value)
            }
            self.job_order[key] = {"$include": mount_path}

        runner_image = arvados_jobs_image(self.arvrunner, self.jobs_image)
        runner_vcpus = math.ceil(self.submit_runner_cores)
        runner_ram_mib = math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)

        runner_mounts = {
            "/var/lib/cwl/cwl.input.json": {
                "kind": "json",
                "content": self.job_order
            },
            "stdout": {
                "kind": "file",
                "path": "/var/spool/cwl/cwl.output.json"
            },
            "/var/spool/cwl": {
                "kind": "collection",
                "writable": True
            }
        }
        runner_properties = {}

        container_req = {
            "name": self.name,
            "output_path": "/var/spool/cwl",
            "cwd": "/var/spool/cwl",
            "priority": self.priority,
            "state": "Committed",
            "container_image": runner_image,
            "mounts": runner_mounts,
            "secret_mounts": hidden_mounts,
            "runtime_constraints": {
                "vcpus": runner_vcpus,
                "ram": 1024*1024 * runner_ram_mib,
                "API": True
            },
            "use_existing": False, # Never reuse the runner container - see #15497.
            "properties": runner_properties
        }

        if self.embedded_tool.tool.get("id", "").startswith("keep:"):
            # The workflow already lives in a collection: mount that
            # collection and run the file inside it.
            id_parts = self.embedded_tool.tool["id"].split('/')
            collection_pdh = id_parts[0][5:]
            workflowpath = "/var/lib/cwl/workflow/%s" % "/".join(id_parts[1:])
            runner_mounts["/var/lib/cwl/workflow"] = {
                "kind": "collection",
                "portable_data_hash": "%s" % collection_pdh
            }
        else:
            # Otherwise embed the packed workflow as a JSON mount.
            packed_doc = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map)
            workflowpath = "/var/lib/cwl/workflow.json#main"
            runner_mounts["/var/lib/cwl/workflow.json"] = {
                "kind": "json",
                "content": packed_doc
            }
            if self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
                runner_properties["template_uuid"] = self.embedded_tool.tool["id"][6:33]

        properties_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ProcessProperties")
        if properties_req:
            evaluator = make_builder(self.job_order, self.embedded_tool.hints, self.embedded_tool.requirements, runtimeContext, self.embedded_tool.metadata)
            for entry in properties_req["processProperties"]:
                runner_properties[entry["propertyName"]] = evaluator.do_eval(entry["propertyValue"])

        # Fixed part of the runner command line:
        # --local executes the workflow instead of submitting a container request
        # --api=containers selects the containers API
        # --no-log-timestamps leaves timestamps to the logging infrastructure
        # --disable-validate skips validation, which has already been done
        # --eval-timeout is the timeout for javascript invocation
        # --thread-count is the number of threads to use for job submission
        # --enable/disable-reuse sets desired job reuse
        # --collection-cache-size sets aside memory to store collections
        reuse_flag = "--enable-reuse" if self.enable_reuse else "--disable-reuse"
        command = [
            "arvados-cwl-runner",
            "--local",
            "--api=containers",
            "--no-log-timestamps",
            "--disable-validate",
            "--disable-color",
            "--eval-timeout=%s" % self.arvrunner.eval_timeout,
            "--thread-count=%s" % self.arvrunner.thread_count,
            reuse_flag,
            "--collection-cache-size=%s" % self.collection_cache_size,
        ]

        # Optional flags, appended in a fixed order.
        if self.output_name:
            command += ["--output-name=" + self.output_name]
            container_req["output_name"] = self.output_name

        if self.output_tags:
            command += ["--output-tags=" + self.output_tags]

        if runtimeContext.debug:
            command += ["--debug"]

        final_classes = runtimeContext.storage_classes
        if final_classes and final_classes != "default":
            command += ["--storage-classes=" + final_classes]

        intermediate_classes = runtimeContext.intermediate_storage_classes
        if intermediate_classes and intermediate_classes != "default":
            command += ["--intermediate-storage-classes=" + intermediate_classes]

        if self.on_error:
            command += ["--on-error=" + self.on_error]

        if self.intermediate_output_ttl:
            command += ["--intermediate-output-ttl=%d" % self.intermediate_output_ttl]

        if self.arvrunner.trash_intermediate:
            command += ["--trash-intermediate"]

        if self.arvrunner.project_uuid:
            command += ["--project-uuid="+self.arvrunner.project_uuid]

        if self.enable_dev:
            command += ["--enable-dev"]

        # Only an explicit True or False produces a flag; None adds nothing.
        if runtimeContext.enable_preemptible is True:
            command += ["--enable-preemptible"]
        elif runtimeContext.enable_preemptible is False:
            command += ["--disable-preemptible"]

        command += [workflowpath, "/var/lib/cwl/cwl.input.json"]

        container_req["command"] = command

        return container_req


    def run(self, runtimeContext):
        """Submit (or update) the runner container request and log where to monitor it."""
        runtimeContext.keepprefix = "keep:"
        job_spec = self.arvados_job_spec(runtimeContext)
        if self.arvrunner.project_uuid:
            job_spec["owner_uuid"] = self.arvrunner.project_uuid

        if runtimeContext.submit_request_uuid:
            # cluster_id doesn't make sense for "update" and actually fails,
            # so it is never passed on this path.
            submission = self.arvrunner.api.container_requests().update(
                uuid=runtimeContext.submit_request_uuid,
                body=job_spec
            )
        else:
            create_params = {}
            if runtimeContext.submit_runner_cluster:
                create_params["cluster_id"] = runtimeContext.submit_runner_cluster
            submission = self.arvrunner.api.container_requests().create(
                body=job_spec,
                **create_params
            )
        response = submission.execute(num_retries=self.arvrunner.num_retries)

        self.uuid = response["uuid"]
        self.arvrunner.process_submitted(self)

        logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])

        wb1_url = self.arvrunner.api.config()["Services"]["Workbench1"]["ExternalURL"]
        wb2_url = self.arvrunner.api.config()["Services"]["Workbench2"]["ExternalURL"]
        # Prefer Workbench2 when it is configured, fall back to Workbench1,
        # and log nothing when neither has an external URL.
        if wb2_url:
            monitor_url = "{}processes/{}".format(wb2_url, response["uuid"])
        elif wb1_url:
            monitor_url = "{}container_requests/{}".format(wb1_url, response["uuid"])
        else:
            return
        logger.info("Monitor workflow progress at %s", monitor_url)


    def done(self, record):
        """Fetch the runner's container record and pass it to Runner.done().

        The container's "log" field is set to the request's "log_uuid"
        before it is handed on.  If the container cannot be retrieved the
        error is logged and the workflow is reported as "permanentFail".
        """
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            container["log"] = record["log_uuid"]
        except Exception:
            logger.exception("%s while getting runner container", self.arvrunner.label(self))
            self.arvrunner.output_callback({}, "permanentFail")
            return
        super(RunnerContainer, self).done(container)