# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from builtins import str

import logging
import json
import os
import urllib.request, urllib.parse, urllib.error
import time
import datetime
import ciso8601
import uuid
import math

import arvados_cwl.util
import ruamel.yaml as yaml

from cwltool.errors import WorkflowException
from cwltool.process import UnsupportedRequirement, shortname
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.job import JobBase

import arvados.collection

from .arvdocker import arv_docker_get_image
from . import done
from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields, make_builder
from .fsaccess import CollectionFetcher
from .pathmapper import NoFollowPathMapper, trim_listing
from .perf import Perf
from ._version import __version__

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

class ArvadosContainer(JobBase):
    """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""

    def __init__(self, runner, job_runtime,
                 builder,   # type: Builder
                 joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
                 make_path_mapper,  # type: Callable[..., PathMapper]
                 requirements,      # type: List[Dict[Text, Text]]
                 hints,     # type: List[Dict[Text, Text]]
                 name       # type: Text
    ):
        super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
        self.arvrunner = runner
        self.job_runtime = job_runtime
        self.running = False
        self.uuid = None

    def update_pipeline_component(self, r):
        pass

    def _required_env(self):
        env = {}
        env["HOME"] = self.outdir
        env["TMPDIR"] = self.tmpdir
        return env

    def run(self, runtimeContext):
        # ArvadosCommandTool subclasses cwltool.CommandLineTool, which
        # calls makeJobRunner() to get a new ArvadosContainer object.
        # The fields that define execution, such as command_line and
        # environment, are set on the ArvadosContainer object by
        # CommandLineTool.job() before run() is called.
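        #
        # For orientation, a rough sketch of what the base class provides
        # (the attribute names are real, the values are made up for
        # illustration):
        #
        #   self.command_line  = ["bwa", "mem", "ref.fa", "reads.fq"]
        #   self.environment   = {"SAMPLE": "xyzzy"}
        #   self.outdir        = "/var/spool/cwl"
        #   self.tmpdir        = "/tmp"
        #   self.generatefiles = {"listing": [...]}  # InitialWorkDirRequirement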

        runtimeContext = self.job_runtime

        if runtimeContext.submit_request_uuid:
            container_request = self.arvrunner.api.container_requests().get(
                uuid=runtimeContext.submit_request_uuid
            ).execute(num_retries=self.arvrunner.num_retries)
        else:
            container_request = {}

        container_request["command"] = self.command_line
        container_request["name"] = self.name
        container_request["output_path"] = self.outdir
        container_request["cwd"] = self.outdir
        container_request["priority"] = runtimeContext.priority
        container_request["state"] = "Committed"
        container_request.setdefault("properties", {})

        runtime_constraints = {}

        if runtimeContext.project_uuid:
            container_request["owner_uuid"] = runtimeContext.project_uuid

        if self.arvrunner.secret_store.has_secret(self.command_line):
            raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")

        if self.arvrunner.secret_store.has_secret(self.environment):
            raise WorkflowException("Secret material leaked in environment, only file literals may contain secrets")

        resources = self.builder.resources
        if resources is not None:
            # CWL expresses cores as a (possibly fractional) count and
            # memory sizes in MiB; the Arvados API wants an integer vcpu
            # count and sizes in bytes.
            runtime_constraints["vcpus"] = math.ceil(resources.get("cores", 1))
            runtime_constraints["ram"] = math.ceil(resources.get("ram") * 2**20)

        mounts = {
            self.outdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("outdirSize", 0) * 2**20)
            },
            self.tmpdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("tmpdirSize", 0) * 2**20)
            }
        }
        secret_mounts = {}
        scheduling_parameters = {}

        rf = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
        rf.sort(key=lambda k: k.resolved)
        prevdir = None
        for resolved, target, tp, stg in rf:
            if not stg:
                continue
            if prevdir and target.startswith(prevdir):
                continue
            if tp == "Directory":
                targetdir = target
            else:
                targetdir = os.path.dirname(target)
            sp = resolved.split("/", 1)
            pdh = sp[0][5:]   # remove "keep:"
            mounts[targetdir] = {
                "kind": "collection",
                "portable_data_hash": pdh
            }
            if pdh in self.pathmapper.pdh_to_uuid:
                mounts[targetdir]["uuid"] = self.pathmapper.pdh_to_uuid[pdh]
            if len(sp) == 2:
                if tp == "Directory":
                    path = sp[1]
                else:
                    path = os.path.dirname(sp[1])
                if path and path != "/":
                    mounts[targetdir]["path"] = path
            prevdir = targetdir + "/"
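        # Illustrative example (hypothetical hash): a file reference such as
        #
        #   resolved = "keep:99999999999999999999999999999999+99/dir/a.txt"
        #   target   = "/var/lib/cwl/job/dir/a.txt"
        #
        # becomes a read-only collection mount:
        #
        #   mounts["/var/lib/cwl/job/dir"] = {
        #       "kind": "collection",
        #       "portable_data_hash": "99999999999999999999999999999999+99",
        #       "path": "dir"
        #   }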

        with Perf(metrics, "generatefiles %s" % self.name):
            if self.generatefiles["listing"]:
                vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                    keep_client=self.arvrunner.keep_client,
                                                    num_retries=self.arvrunner.num_retries)
                generatemapper = NoFollowPathMapper(self.generatefiles["listing"], "", "",
                                                    separateDirs=False)

                sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target)

                logger.debug("generatemapper is %s", sorteditems)

                with Perf(metrics, "createfiles %s" % self.name):
                    for f, p in sorteditems:
                        if not p.target:
                            continue

                        if p.target.startswith("/"):
                            dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
                        else:
                            dst = p.target

                        if p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
                            if p.resolved.startswith("_:"):
                                vwd.mkdirs(dst)
                            else:
                                source, path = self.arvrunner.fs_access.get_collection(p.resolved)
                                vwd.copy(path or ".", dst, source_collection=source)
                        elif p.type == "CreateFile":
                            if self.arvrunner.secret_store.has_secret(p.resolved):
                                mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
                                secret_mounts[mountpoint] = {
                                    "kind": "text",
                                    "content": self.arvrunner.secret_store.retrieve(p.resolved)
                                }
                            else:
                                with vwd.open(dst, "w") as n:
                                    n.write(p.resolved)
                def keepemptydirs(p):
                    # A Keep manifest cannot represent an empty directory
                    # directly, so add an empty ".keep" placeholder file to
                    # any otherwise-empty directory in the collection.
                    if isinstance(p, arvados.collection.RichCollectionBase):
                        if len(p) == 0:
                            p.open(".keep", "w").close()
                        else:
                            for c in p:
                                keepemptydirs(p[c])

                keepemptydirs(vwd)

                if not runtimeContext.current_container:
                    runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
                info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
                vwd.save_new(name=info["name"],
                             owner_uuid=runtimeContext.project_uuid,
                             ensure_unique_name=True,
                             trash_at=info["trash_at"],
                             properties=info["properties"])
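                # For illustration, get_intermediate_collection_info()
                # returns a dict with exactly the keys used here, along
                # the lines of (hypothetical values):
                #
                #   {"name": "Intermediate collection for step <name>",
                #    "trash_at": <datetime.datetime or None>,
                #    "properties": {"type": "intermediate", ...}}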

                prev = None
                for f, p in sorteditems:
                    if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
                        (prev is not None and p.target.startswith(prev))):
                        continue
                    if p.target.startswith("/"):
                        dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
                    else:
                        dst = p.target
                    mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
                    mounts[mountpoint] = {"kind": "collection",
                                          "portable_data_hash": vwd.portable_data_hash(),
                                          "path": dst}
                    if p.type.startswith("Writable"):
                        mounts[mountpoint]["writable"] = True
                    prev = p.target + "/"

        container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
        if self.environment:
            container_request["environment"].update(self.environment)

        if self.stdin:
            # self.stdin is expected to be a path of the form
            # "/keep/<pdh>/<path>"; strip the leading "/keep/" (6
            # characters) and split into the portable data hash and the
            # path within the collection.
            sp = self.stdin[6:].split("/", 1)
            mounts["stdin"] = {"kind": "collection",
                               "portable_data_hash": sp[0],
                               "path": sp[1]}

        if self.stderr:
            mounts["stderr"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stderr)}

        if self.stdout:
            mounts["stdout"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stdout)}

        (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")

        container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
                                                                    docker_req,
                                                                    runtimeContext.pull_image,
                                                                    runtimeContext.project_uuid,
                                                                    runtimeContext.force_docker_pull,
                                                                    runtimeContext.tmp_outdir_prefix)

        network_req, _ = self.get_requirement("NetworkAccess")
        if network_req:
            runtime_constraints["API"] = network_req["networkAccess"]

        api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
        if api_req:
            runtime_constraints["API"] = True

        runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
        if runtime_req:
            if "keep_cache" in runtime_req:
                runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
            if "outputDirType" in runtime_req:
                if runtime_req["outputDirType"] == "local_output_dir":
                    # Currently the default behavior.
                    pass
                elif runtime_req["outputDirType"] == "keep_output_dir":
                    mounts[self.outdir] = {
                        "kind": "collection",
                        "writable": True
                    }

        partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
        if partition_req:
            scheduling_parameters["partitions"] = aslist(partition_req["partition"])

        intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
        if intermediate_output_req:
            self.output_ttl = intermediate_output_req["outputTTL"]
        else:
            self.output_ttl = self.arvrunner.intermediate_output_ttl

        if self.output_ttl < 0:
            raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % self.output_ttl)

        if self.arvrunner.api._rootDesc["revision"] >= "20210628":
            storage_class_req, _ = self.get_requirement("http://arvados.org/cwl#OutputStorageClass")
            if storage_class_req and storage_class_req.get("intermediateStorageClass"):
                container_request["output_storage_classes"] = aslist(storage_class_req["intermediateStorageClass"])
            else:
                container_request["output_storage_classes"] = runtimeContext.intermediate_storage_classes.strip().split(",")

        cuda_req, _ = self.get_requirement("http://arvados.org/cwl#CUDARequirement")
        if cuda_req:
            runtime_constraints["cuda"] = {
                "device_count": cuda_req.get("minDeviceCount", 1),
                "driver_version": cuda_req["minCUDADriverVersion"],
                "hardware_capability": cuda_req["minCUDAHardwareCapability"]
            }
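        # For reference, a tool can request GPUs with a hint along these
        # lines (values illustrative; "arv" is the usual shorthand for the
        # http://arvados.org/cwl# namespace):
        #
        #   hints:
        #     arv:CUDARequirement:
        #       minDeviceCount: 1
        #       minCUDADriverVersion: "11.0"
        #       minCUDAHardwareCapability: "7.5"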

        if self.timelimit is not None and self.timelimit > 0:
            scheduling_parameters["max_run_time"] = self.timelimit

        extra_submit_params = {}
        if runtimeContext.submit_runner_cluster:
            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster

        container_request["output_name"] = "Output for step %s" % (self.name)
        container_request["output_ttl"] = self.output_ttl
        container_request["mounts"] = mounts
        container_request["secret_mounts"] = secret_mounts
        container_request["runtime_constraints"] = runtime_constraints
        container_request["scheduling_parameters"] = scheduling_parameters

        enable_reuse = runtimeContext.enable_reuse
        if enable_reuse:
            reuse_req, _ = self.get_requirement("WorkReuse")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
            reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        container_request["use_existing"] = enable_reuse

        properties_req, _ = self.get_requirement("http://arvados.org/cwl#ProcessProperties")
        if properties_req:
            for pr in properties_req["processProperties"]:
                container_request["properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
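        # For reference, the corresponding hint looks roughly like this
        # (field names are the ones read above, values illustrative):
        #
        #   hints:
        #     arv:ProcessProperties:
        #       processProperties:
        #         - propertyName: foo
        #           propertyValue: bar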

        if runtimeContext.runnerjob.startswith("arvwf:"):
            wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
            wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
            if container_request["name"] == "main":
                container_request["name"] = wfrecord["name"]
            container_request["properties"]["template_uuid"] = wfuuid

        self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)

        try:
            if runtimeContext.submit_request_uuid:
                response = self.arvrunner.api.container_requests().update(
                    uuid=runtimeContext.submit_request_uuid,
                    body=container_request,
                    **extra_submit_params
                ).execute(num_retries=self.arvrunner.num_retries)
            else:
                response = self.arvrunner.api.container_requests().create(
                    body=container_request,
                    **extra_submit_params
                ).execute(num_retries=self.arvrunner.num_retries)

            self.uuid = response["uuid"]
            self.arvrunner.process_submitted(self)

            if response["state"] == "Final":
                logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
            else:
                logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
        except Exception as e:
            logger.exception("%s error submitting container\n%s", self.arvrunner.label(self), e)
            logger.debug("Container request was %s", container_request)
            self.output_callback({}, "permanentFail")

    def done(self, record):
        outputs = {}
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            if container["state"] == "Complete":
                rcode = container["exit_code"]
                if self.successCodes and rcode in self.successCodes:
                    processStatus = "success"
                elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
                    processStatus = "temporaryFail"
                elif self.permanentFailCodes and rcode in self.permanentFailCodes:
                    processStatus = "permanentFail"
                elif rcode == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"
            else:
                processStatus = "permanentFail"
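            # Note the precedence above: exit codes listed in the tool's
            # successCodes, temporaryFailCodes, or permanentFailCodes take
            # priority; otherwise exit code 0 means success and any other
            # exit code means permanentFail.  A container that did not
            # reach the "Complete" state is always permanentFail.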

            if processStatus == "permanentFail" and record["log_uuid"]:
                logc = arvados.collection.CollectionReader(record["log_uuid"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                label = self.arvrunner.label(self)
                done.logtail(
                    logc, logger.error,
                    "%s (%s) error log:" % (label, record["uuid"]), maxlen=40)

            if record["output_uuid"]:
                if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
                    # Compute the trash time to avoid requesting the collection record.
                    trash_at = ciso8601.parse_datetime_as_naive(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
                    aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
                    orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
                    oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
                    logger.info("%s Intermediate output %s (%s) will be trashed%s%s%s." % (
                        self.arvrunner.label(self), record["output_uuid"], container["output"], aftertime, orpart, oncomplete))
                self.arvrunner.add_intermediate_output(record["output_uuid"])

            if container["output"]:
                outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
        except WorkflowException as e:
            # Only include a stack trace if in debug mode.
            # A stack trace may obfuscate more useful output about the workflow.
            logger.error("%s unable to collect output from %s:\n%s",
                         self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
            processStatus = "permanentFail"
        except Exception:
            logger.exception("%s while getting output object:", self.arvrunner.label(self))
            processStatus = "permanentFail"
        finally:
            self.output_callback(outputs, processStatus)


class RunnerContainer(Runner):
    """Submit and manage a container that runs arvados-cwl-runner."""

    def arvados_job_spec(self, runtimeContext):
        """Create an Arvados container request for this workflow.

        The returned dict can be passed as the +body+ argument to
        container_requests().create().
        """

        adjustDirObjs(self.job_order, trim_listing)
        visit_class(self.job_order, ("File", "Directory"), trim_anonymous_location)
        visit_class(self.job_order, ("File", "Directory"), remove_redundant_fields)

        secret_mounts = {}
        for param in sorted(self.job_order.keys()):
            if self.secret_store.has_secret(self.job_order[param]):
                mnt = "/secrets/s%d" % len(secret_mounts)
                secret_mounts[mnt] = {
                    "kind": "text",
                    "content": self.secret_store.retrieve(self.job_order[param])
                }
                self.job_order[param] = {"$include": mnt}

        container_req = {
            "name": self.name,
            "output_path": "/var/spool/cwl",
            "cwd": "/var/spool/cwl",
            "priority": self.priority,
            "state": "Committed",
            "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image),
            "mounts": {
                "/var/lib/cwl/cwl.input.json": {
                    "kind": "json",
                    "content": self.job_order
                },
                "stdout": {
                    "kind": "file",
                    "path": "/var/spool/cwl/cwl.output.json"
                },
                "/var/spool/cwl": {
                    "kind": "collection",
                    "writable": True
                }
            },
            "secret_mounts": secret_mounts,
            "runtime_constraints": {
                "vcpus": math.ceil(self.submit_runner_cores),
                "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
                "API": True
            },
            "use_existing": False, # Never reuse the runner container - see #15497.
            "properties": {}
        }
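        # Worked example (illustrative numbers): with submit_runner_ram of
        # 1024 MiB and collection_cache_size of 256 MiB, the "ram"
        # constraint above is 1024*1024 * (1024 + 256) = 1342177280 bytes.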

        if self.embedded_tool.tool.get("id", "").startswith("keep:"):
            sp = self.embedded_tool.tool["id"].split('/')
            workflowcollection = sp[0][5:]   # remove "keep:"
            workflowname = "/".join(sp[1:])
            workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
            container_req["mounts"]["/var/lib/cwl/workflow"] = {
                "kind": "collection",
                "portable_data_hash": "%s" % workflowcollection
            }
        else:
            packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map)
            workflowpath = "/var/lib/cwl/workflow.json#main"
            container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
                "kind": "json",
                "content": packed
            }
            if self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
                # Skip the 6-character "arvwf:" prefix and take the
                # 27-character Arvados UUID that follows.
                container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]

        properties_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ProcessProperties")
        if properties_req:
            builder = make_builder(self.job_order, self.embedded_tool.hints, self.embedded_tool.requirements, runtimeContext, self.embedded_tool.metadata)
            for pr in properties_req["processProperties"]:
                container_req["properties"][pr["propertyName"]] = builder.do_eval(pr["propertyValue"])

        # --local means execute the workflow instead of submitting a container request
        # --api=containers means use the containers API
        # --no-log-timestamps means don't add timestamps (the logging infrastructure adds them already)
        # --disable-validate because we already validated, so there's no need to do it again
        # --disable-color turns off ANSI color codes in log output
        # --eval-timeout is the timeout for javascript invocation
        # --thread-count is the number of threads to use for job submission
        # --enable/disable-reuse sets desired job reuse
        # --collection-cache-size sets aside memory to store collections
        command = ["arvados-cwl-runner",
                   "--local",
                   "--api=containers",
                   "--no-log-timestamps",
                   "--disable-validate",
                   "--disable-color",
                   "--eval-timeout=%s" % self.arvrunner.eval_timeout,
                   "--thread-count=%s" % self.arvrunner.thread_count,
                   "--enable-reuse" if self.enable_reuse else "--disable-reuse",
                   "--collection-cache-size=%s" % self.collection_cache_size]
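        # The assembled command might look like this (flag values are
        # illustrative):
        #
        #   arvados-cwl-runner --local --api=containers --no-log-timestamps
        #     --disable-validate --disable-color --eval-timeout=20
        #     --thread-count=4 --enable-reuse --collection-cache-size=256
        #     /var/lib/cwl/workflow.json#main /var/lib/cwl/cwl.input.json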

        if self.output_name:
            command.append("--output-name=" + self.output_name)
            container_req["output_name"] = self.output_name

        if self.output_tags:
            command.append("--output-tags=" + self.output_tags)

        if runtimeContext.debug:
            command.append("--debug")

        if runtimeContext.storage_classes != "default" and runtimeContext.storage_classes:
            command.append("--storage-classes=" + runtimeContext.storage_classes)

        if runtimeContext.intermediate_storage_classes != "default" and runtimeContext.intermediate_storage_classes:
            command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes)

        if self.on_error:
            command.append("--on-error=" + self.on_error)

        if self.intermediate_output_ttl:
            command.append("--intermediate-output-ttl=%d" % self.intermediate_output_ttl)

        if self.arvrunner.trash_intermediate:
            command.append("--trash-intermediate")

        if self.arvrunner.project_uuid:
            command.append("--project-uuid="+self.arvrunner.project_uuid)

        if self.enable_dev:
            command.append("--enable-dev")

        command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])

        container_req["command"] = command

        return container_req


    def run(self, runtimeContext):
        runtimeContext.keepprefix = "keep:"
        job_spec = self.arvados_job_spec(runtimeContext)
        if self.arvrunner.project_uuid:
            job_spec["owner_uuid"] = self.arvrunner.project_uuid

        extra_submit_params = {}
        if runtimeContext.submit_runner_cluster:
            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster

        if runtimeContext.submit_request_uuid:
            if "cluster_id" in extra_submit_params:
                # "cluster_id" doesn't make sense for an update request,
                # and including it actually makes the request fail.
                del extra_submit_params["cluster_id"]
            response = self.arvrunner.api.container_requests().update(
                uuid=runtimeContext.submit_request_uuid,
                body=job_spec,
                **extra_submit_params
            ).execute(num_retries=self.arvrunner.num_retries)
        else:
            response = self.arvrunner.api.container_requests().create(
                body=job_spec,
                **extra_submit_params
            ).execute(num_retries=self.arvrunner.num_retries)

        self.uuid = response["uuid"]
        self.arvrunner.process_submitted(self)

        logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])

        workbench1 = self.arvrunner.api.config()["Services"]["Workbench1"]["ExternalURL"]
        workbench2 = self.arvrunner.api.config()["Services"]["Workbench2"]["ExternalURL"]
        url = ""
        if workbench2:
            url = "{}processes/{}".format(workbench2, response["uuid"])
        elif workbench1:
            url = "{}container_requests/{}".format(workbench1, response["uuid"])
        if url:
            logger.info("Monitor workflow progress at %s", url)
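        # The resulting link looks like, e.g.,
        # https://workbench2.example.com/processes/zzzzz-xvhdp-0123456789abcde
        # (hostname and container request UUID are illustrative).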


    def done(self, record):
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            container["log"] = record["log_uuid"]
        except Exception:
            logger.exception("%s while getting runner container", self.arvrunner.label(self))
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            super(RunnerContainer, self).done(container)