Merge branch '14018-acr-set-container-properties' into main
[arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8
9 import logging
10 import json
11 import os
12 import urllib.request, urllib.parse, urllib.error
13 import time
14 import datetime
15 import ciso8601
16 import uuid
17 import math
18
19 import arvados_cwl.util
20 import ruamel.yaml as yaml
21
22 from cwltool.errors import WorkflowException
23 from cwltool.process import UnsupportedRequirement, shortname
24 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
25 from cwltool.job import JobBase
26
27 import arvados.collection
28
29 from .arvdocker import arv_docker_get_image
30 from . import done
31 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields, make_builder
32 from .fsaccess import CollectionFetcher
33 from .pathmapper import NoFollowPathMapper, trim_listing
34 from .perf import Perf
35 from ._version import __version__
36
37 logger = logging.getLogger('arvados.cwl-runner')
38 metrics = logging.getLogger('arvados.cwl-runner.metrics')
39
class ArvadosContainer(JobBase):
    """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""

    def __init__(self, runner, job_runtime,
                 builder,   # type: Builder
                 joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
                 make_path_mapper,  # type: Callable[..., PathMapper]
                 requirements,      # type: List[Dict[Text, Text]]
                 hints,     # type: List[Dict[Text, Text]]
                 name       # type: Text
    ):
        super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
        self.arvrunner = runner          # executor that owns the API client, keep client and callbacks
        self.job_runtime = job_runtime   # per-job runtime context; run() uses this, not its argument
        self.running = False
        self.uuid = None                 # container request UUID, assigned after successful submission

    def update_pipeline_component(self, r):
        """No-op: pipeline components are a jobs-API concept, unused with the containers API."""
        pass

    def run(self, runtimeContext):
        """Build and submit a container request for this command line tool.

        ArvadosCommandTool subclasses from cwltool.CommandLineTool,
        which calls makeJobRunner() to get a new ArvadosContainer
        object.  The fields that define execution such as
        command_line, environment, etc are set on the
        ArvadosContainer object by CommandLineTool.job() before
        run() is called.

        On submission failure this reports "permanentFail" through the
        output callback rather than raising.
        """

        # Deliberately replace the passed-in context with the per-job
        # context captured at construction time.
        runtimeContext = self.job_runtime

        # When updating an existing (pre-created) request, start from its
        # current record so unrelated fields are preserved.
        if runtimeContext.submit_request_uuid:
            container_request = self.arvrunner.api.container_requests().get(
                uuid=runtimeContext.submit_request_uuid
            ).execute(num_retries=self.arvrunner.num_retries)
        else:
            container_request = {}

        container_request["command"] = self.command_line
        container_request["name"] = self.name
        container_request["output_path"] = self.outdir
        container_request["cwd"] = self.outdir
        container_request["priority"] = runtimeContext.priority
        # "Committed" means the request is ready to be scheduled immediately.
        container_request["state"] = "Committed"
        container_request.setdefault("properties", {})

        runtime_constraints = {}

        if runtimeContext.project_uuid:
            container_request["owner_uuid"] = runtimeContext.project_uuid

        # Secrets may only be delivered through secret_mounts (file
        # literals); refuse to submit if one leaked into the command
        # line or environment, where it would be stored in the clear.
        if self.arvrunner.secret_store.has_secret(self.command_line):
            raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")

        if self.arvrunner.secret_store.has_secret(self.environment):
            raise WorkflowException("Secret material leaked in environment, only file literals may contain secrets")

        resources = self.builder.resources
        if resources is not None:
            runtime_constraints["vcpus"] = math.ceil(resources.get("cores", 1))
            # CWL expresses RAM in MiB; Arvados wants bytes.
            runtime_constraints["ram"] = math.ceil(resources.get("ram") * 2**20)

        # NOTE(review): resources is also used below even when the guard
        # above was false; assumes builder.resources is always set -- confirm.
        mounts = {
            self.outdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("outdirSize", 0) * 2**20)
            },
            self.tmpdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("tmpdirSize", 0) * 2**20)
            }
        }
        secret_mounts = {}
        scheduling_parameters = {}

        # Mount each referenced input collection.  Sorting and the
        # prevdir check skip entries already covered by a parent
        # directory mount.
        rf = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
        rf.sort(key=lambda k: k.resolved)
        prevdir = None
        for resolved, target, tp, stg in rf:
            if not stg:
                continue
            if prevdir and target.startswith(prevdir):
                continue
            if tp == "Directory":
                targetdir = target
            else:
                targetdir = os.path.dirname(target)
            sp = resolved.split("/", 1)
            pdh = sp[0][5:]   # remove "keep:"
            mounts[targetdir] = {
                "kind": "collection",
                "portable_data_hash": pdh
            }
            if pdh in self.pathmapper.pdh_to_uuid:
                mounts[targetdir]["uuid"] = self.pathmapper.pdh_to_uuid[pdh]
            if len(sp) == 2:
                if tp == "Directory":
                    path = sp[1]
                else:
                    path = os.path.dirname(sp[1])
                if path and path != "/":
                    mounts[targetdir]["path"] = path
            prevdir = targetdir + "/"

        # Materialize InitialWorkDirRequirement contents ("generatefiles")
        # into a new intermediate collection, then mount it.
        with Perf(metrics, "generatefiles %s" % self.name):
            if self.generatefiles["listing"]:
                vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                    keep_client=self.arvrunner.keep_client,
                                                    num_retries=self.arvrunner.num_retries)
                generatemapper = NoFollowPathMapper(self.generatefiles["listing"], "", "",
                                                    separateDirs=False)

                sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target)

                logger.debug("generatemapper is %s", sorteditems)

                with Perf(metrics, "createfiles %s" % self.name):
                    for f, p in sorteditems:
                        if not p.target:
                            continue

                        # Normalize the target to a path relative to the
                        # output directory within the staging collection.
                        if p.target.startswith("/"):
                            dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
                        else:
                            dst = p.target

                        if p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
                            if p.resolved.startswith("_:"):
                                # Anonymous directory literal: just create it.
                                vwd.mkdirs(dst)
                            else:
                                source, path = self.arvrunner.fs_access.get_collection(p.resolved)
                                vwd.copy(path or ".", dst, source_collection=source)
                        elif p.type == "CreateFile":
                            if self.arvrunner.secret_store.has_secret(p.resolved):
                                # Secret file literals go in secret_mounts,
                                # never into a stored collection.
                                mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
                                secret_mounts[mountpoint] = {
                                    "kind": "text",
                                    "content": self.arvrunner.secret_store.retrieve(p.resolved)
                                }
                            else:
                                with vwd.open(dst, "w") as n:
                                    n.write(p.resolved)

                def keepemptydirs(p):
                    # Keep (the storage system) drops empty directories, so
                    # drop a ".keep" placeholder file into each one.
                    if isinstance(p, arvados.collection.RichCollectionBase):
                        if len(p) == 0:
                            p.open(".keep", "w").close()
                        else:
                            for c in p:
                                keepemptydirs(p[c])

                keepemptydirs(vwd)

                if not runtimeContext.current_container:
                    runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
                info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
                vwd.save_new(name=info["name"],
                             owner_uuid=runtimeContext.project_uuid,
                             ensure_unique_name=True,
                             trash_at=info["trash_at"],
                             properties=info["properties"])

                # Mount the saved collection at each distinct target,
                # skipping secrets (already handled) and targets nested
                # under an earlier mount.
                prev = None
                for f, p in sorteditems:
                    if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
                        (prev is not None and p.target.startswith(prev))):
                        continue
                    if p.target.startswith("/"):
                        dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
                    else:
                        dst = p.target
                    mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
                    mounts[mountpoint] = {"kind": "collection",
                                          "portable_data_hash": vwd.portable_data_hash(),
                                          "path": dst}
                    if p.type.startswith("Writable"):
                        mounts[mountpoint]["writable"] = True
                    prev = p.target + "/"

        container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
        if self.environment:
            container_request["environment"].update(self.environment)

        if self.stdin:
            # self.stdin is "/keep/<pdh>/<path>"; strip the "/keep/" prefix.
            sp = self.stdin[6:].split("/", 1)
            mounts["stdin"] = {"kind": "collection",
                                "portable_data_hash": sp[0],
                                "path": sp[1]}

        if self.stderr:
            mounts["stderr"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stderr)}

        if self.stdout:
            mounts["stdout"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stdout)}

        # Fall back to the arvados/jobs image matching this SDK version
        # when the tool declares no DockerRequirement.
        (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
        if not docker_req:
            docker_req = {"dockerImageId": "arvados/jobs:"+__version__}

        container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
                                                                    docker_req,
                                                                    runtimeContext.pull_image,
                                                                    runtimeContext.project_uuid,
                                                                    runtimeContext.force_docker_pull,
                                                                    runtimeContext.tmp_outdir_prefix)

        # Both NetworkAccess and APIRequirement map onto the "API"
        # runtime constraint (API access implies network access).
        network_req, _ = self.get_requirement("NetworkAccess")
        if network_req:
            runtime_constraints["API"] = network_req["networkAccess"]

        api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
        if api_req:
            runtime_constraints["API"] = True

        runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
        if runtime_req:
            if "keep_cache" in runtime_req:
                runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
            if "outputDirType" in runtime_req:
                if runtime_req["outputDirType"] == "local_output_dir":
                    # Currently the default behavior.
                    pass
                elif runtime_req["outputDirType"] == "keep_output_dir":
                    mounts[self.outdir]= {
                        "kind": "collection",
                        "writable": True
                    }

        partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
        if partition_req:
            scheduling_parameters["partitions"] = aslist(partition_req["partition"])

        intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
        if intermediate_output_req:
            self.output_ttl = intermediate_output_req["outputTTL"]
        else:
            self.output_ttl = self.arvrunner.intermediate_output_ttl

        if self.output_ttl < 0:
            # Bug fix: previously interpolated container_request["output_ttl"],
            # which is not set until later in this method, so raising here
            # crashed with KeyError instead of this message.
            raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % self.output_ttl)

        storage_class_req, _ = self.get_requirement("http://arvados.org/cwl#OutputStorageClass")
        if storage_class_req and storage_class_req.get("intermediateStorageClass"):
            container_request["output_storage_classes"] = aslist(storage_class_req["intermediateStorageClass"])
        else:
            container_request["output_storage_classes"] = runtimeContext.intermediate_storage_classes.strip().split(",")

        if self.timelimit is not None and self.timelimit > 0:
            scheduling_parameters["max_run_time"] = self.timelimit

        extra_submit_params = {}
        if runtimeContext.submit_runner_cluster:
            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster

        container_request["output_name"] = "Output for step %s" % (self.name)
        container_request["output_ttl"] = self.output_ttl
        container_request["mounts"] = mounts
        container_request["secret_mounts"] = secret_mounts
        container_request["runtime_constraints"] = runtime_constraints
        container_request["scheduling_parameters"] = scheduling_parameters

        # Container reuse: only honored when globally enabled; a
        # WorkReuse (or legacy ReuseRequirement) hint may disable it
        # for this step.
        enable_reuse = runtimeContext.enable_reuse
        if enable_reuse:
            reuse_req, _ = self.get_requirement("WorkReuse")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
            reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        container_request["use_existing"] = enable_reuse

        properties_req, _ = self.get_requirement("http://arvados.org/cwl#ProcessProperties")
        if properties_req:
            for pr in properties_req["processProperties"]:
                container_request["properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])

        if runtimeContext.runnerjob.startswith("arvwf:"):
            # Running from a stored workflow ("arvwf:<uuid>#main"): record
            # the template uuid and use the workflow's name for the top
            # level request.
            wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
            wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
            if container_request["name"] == "main":
                container_request["name"] = wfrecord["name"]
            container_request["properties"]["template_uuid"] = wfuuid

        self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)

        try:
            if runtimeContext.submit_request_uuid:
                response = self.arvrunner.api.container_requests().update(
                    uuid=runtimeContext.submit_request_uuid,
                    body=container_request,
                    **extra_submit_params
                ).execute(num_retries=self.arvrunner.num_retries)
            else:
                response = self.arvrunner.api.container_requests().create(
                    body=container_request,
                    **extra_submit_params
                ).execute(num_retries=self.arvrunner.num_retries)

            self.uuid = response["uuid"]
            self.arvrunner.process_submitted(self)

            if response["state"] == "Final":
                # "Final" on a fresh request means a matching finished
                # container was reused.
                logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
            else:
                logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
        except Exception as e:
            logger.exception("%s error submitting container\n%s", self.arvrunner.label(self), e)
            logger.debug("Container request was %s", container_request)
            self.output_callback({}, "permanentFail")

    def done(self, record):
        """Handle completion of the container request.

        Maps the container exit code to a cwltool process status (honoring
        successCodes/temporaryFailCodes/permanentFailCodes), logs a tail of
        the container log on permanent failure, registers intermediate
        output for trashing, collects outputs, and always invokes the
        output callback.
        """
        outputs = {}
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            if container["state"] == "Complete":
                rcode = container["exit_code"]
                if self.successCodes and rcode in self.successCodes:
                    processStatus = "success"
                elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
                    processStatus = "temporaryFail"
                elif self.permanentFailCodes and rcode in self.permanentFailCodes:
                    processStatus = "permanentFail"
                elif rcode == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"
            else:
                # Cancelled or otherwise not Complete.
                processStatus = "permanentFail"

            if processStatus == "permanentFail" and record["log_uuid"]:
                logc = arvados.collection.CollectionReader(record["log_uuid"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                label = self.arvrunner.label(self)
                done.logtail(
                    logc, logger.error,
                    "%s (%s) error log:" % (label, record["uuid"]), maxlen=40)

            if record["output_uuid"]:
                if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
                    # Compute the trash time to avoid requesting the collection record.
                    trash_at = ciso8601.parse_datetime_as_naive(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
                    aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
                    orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
                    oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
                    logger.info("%s Intermediate output %s (%s) will be trashed%s%s%s." % (
                        self.arvrunner.label(self), record["output_uuid"], container["output"], aftertime, orpart, oncomplete))
                self.arvrunner.add_intermediate_output(record["output_uuid"])

            if container["output"]:
                outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
        except WorkflowException as e:
            # Only include a stack trace if in debug mode.
            # A stack trace may obfuscate more useful output about the workflow.
            # NOTE(review): if the containers().get() call itself raised,
            # "container" is unbound here and this log line would NameError.
            logger.error("%s unable to collect output from %s:\n%s",
                         self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
            processStatus = "permanentFail"
        except Exception:
            logger.exception("%s while getting output object:", self.arvrunner.label(self))
            processStatus = "permanentFail"
        finally:
            self.output_callback(outputs, processStatus)
406
407
class RunnerContainer(Runner):
    """Submit and manage a container that runs arvados-cwl-runner."""

    def arvados_job_spec(self, runtimeContext):
        """Create an Arvados container request for this workflow.

        The returned dict can be used to create a container passed as
        the +body+ argument to container_requests().create().
        """

        # Strip listings and location/field noise from the job order
        # before embedding it in the request.
        adjustDirObjs(self.job_order, trim_listing)
        visit_class(self.job_order, ("File", "Directory"), trim_anonymous_location)
        visit_class(self.job_order, ("File", "Directory"), remove_redundant_fields)

        # Move secret parameter values out of the job order and into
        # secret_mounts, replacing each with a $(include) reference so
        # the stored job order never contains secret material.
        secret_mounts = {}
        for param in sorted(self.job_order.keys()):
            if self.secret_store.has_secret(self.job_order[param]):
                mnt = "/secrets/s%d" % len(secret_mounts)
                secret_mounts[mnt] = {
                    "kind": "text",
                    "content": self.secret_store.retrieve(self.job_order[param])
                }
                self.job_order[param] = {"$include": mnt}

        container_req = {
            "name": self.name,
            "output_path": "/var/spool/cwl",
            "cwd": "/var/spool/cwl",
            "priority": self.priority,
            "state": "Committed",
            "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image),
            "mounts": {
                # Job order is mounted as JSON for the runner to read.
                "/var/lib/cwl/cwl.input.json": {
                    "kind": "json",
                    "content": self.job_order
                },
                # The runner writes the final output object to stdout's file.
                "stdout": {
                    "kind": "file",
                    "path": "/var/spool/cwl/cwl.output.json"
                },
                "/var/spool/cwl": {
                    "kind": "collection",
                    "writable": True
                }
            },
            "secret_mounts": secret_mounts,
            "runtime_constraints": {
                "vcpus": math.ceil(self.submit_runner_cores),
                # RAM request covers the runner itself plus its collection cache (MiB -> bytes).
                "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
                "API": True
            },
            "use_existing": False, # Never reuse the runner container - see #15497.
            "properties": {}
        }

        if self.embedded_tool.tool.get("id", "").startswith("keep:"):
            # Workflow stored in a Keep collection: mount the collection
            # read-only and point the runner at the file inside it.
            sp = self.embedded_tool.tool["id"].split('/')
            workflowcollection = sp[0][5:]
            workflowname = "/".join(sp[1:])
            workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
            container_req["mounts"]["/var/lib/cwl/workflow"] = {
                "kind": "collection",
                "portable_data_hash": "%s" % workflowcollection
            }
        else:
            # Otherwise embed the packed workflow as a JSON mount.
            packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map)
            workflowpath = "/var/lib/cwl/workflow.json#main"
            container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
                "kind": "json",
                "content": packed
            }
            if self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
                # "arvwf:<27-char uuid>#..." -- record the stored workflow uuid.
                container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]

        # Evaluate ProcessProperties expressions against the job order and
        # attach them as request properties.
        properties_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ProcessProperties")
        if properties_req:
            builder = make_builder(self.job_order, self.embedded_tool.hints, self.embedded_tool.requirements, runtimeContext, self.embedded_tool.metadata)
            for pr in properties_req["processProperties"]:
                container_req["properties"][pr["propertyName"]] = builder.do_eval(pr["propertyValue"])

        # --local means execute the workflow instead of submitting a container request
        # --api=containers means use the containers API
        # --no-log-timestamps means don't add timestamps (the logging infrastructure does this)
        # --disable-validate because we already validated so don't need to do it again
        # --eval-timeout is the timeout for javascript invocation
        # --parallel-task-count is the number of threads to use for job submission
        # --enable/disable-reuse sets desired job reuse
        # --collection-cache-size sets aside memory to store collections
        command = ["arvados-cwl-runner",
                   "--local",
                   "--api=containers",
                   "--no-log-timestamps",
                   "--disable-validate",
                   "--disable-color",
                   "--eval-timeout=%s" % self.arvrunner.eval_timeout,
                   "--thread-count=%s" % self.arvrunner.thread_count,
                   "--enable-reuse" if self.enable_reuse else "--disable-reuse",
                   "--collection-cache-size=%s" % self.collection_cache_size]

        if self.output_name:
            command.append("--output-name=" + self.output_name)
            container_req["output_name"] = self.output_name

        if self.output_tags:
            command.append("--output-tags=" + self.output_tags)

        if runtimeContext.debug:
            command.append("--debug")

        if runtimeContext.storage_classes != "default":
            command.append("--storage-classes=" + runtimeContext.storage_classes)

        if runtimeContext.intermediate_storage_classes != "default":
            command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes)

        if self.on_error:
            command.append("--on-error=" + self.on_error)

        if self.intermediate_output_ttl:
            command.append("--intermediate-output-ttl=%d" % self.intermediate_output_ttl)

        if self.arvrunner.trash_intermediate:
            command.append("--trash-intermediate")

        if self.arvrunner.project_uuid:
            command.append("--project-uuid="+self.arvrunner.project_uuid)

        if self.enable_dev:
            command.append("--enable-dev")

        # Positional arguments: the workflow document and the job order.
        command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])

        container_req["command"] = command

        return container_req


    def run(self, runtimeContext):
        """Submit (create or update) the runner container request and log a monitoring URL."""
        runtimeContext.keepprefix = "keep:"
        job_spec = self.arvados_job_spec(runtimeContext)
        if self.arvrunner.project_uuid:
            job_spec["owner_uuid"] = self.arvrunner.project_uuid

        extra_submit_params = {}
        if runtimeContext.submit_runner_cluster:
            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster

        if runtimeContext.submit_request_uuid:
            if "cluster_id" in extra_submit_params:
                # Doesn't make sense for "update" and actually fails
                del extra_submit_params["cluster_id"]
            response = self.arvrunner.api.container_requests().update(
                uuid=runtimeContext.submit_request_uuid,
                body=job_spec,
                **extra_submit_params
            ).execute(num_retries=self.arvrunner.num_retries)
        else:
            response = self.arvrunner.api.container_requests().create(
                body=job_spec,
                **extra_submit_params
            ).execute(num_retries=self.arvrunner.num_retries)

        self.uuid = response["uuid"]
        self.arvrunner.process_submitted(self)

        logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])

        # Prefer a Workbench2 process URL; fall back to Workbench1 if
        # only that is configured.
        workbench1 = self.arvrunner.api.config()["Services"]["Workbench1"]["ExternalURL"]
        workbench2 = self.arvrunner.api.config()["Services"]["Workbench2"]["ExternalURL"]
        url = ""
        if workbench2:
            url = "{}processes/{}".format(workbench2, response["uuid"])
        elif workbench1:
            url = "{}container_requests/{}".format(workbench1, response["uuid"])
        if url:
            logger.info("Monitor workflow progress at %s", url)


    def done(self, record):
        """Handle runner container completion.

        Fetches the container record (attaching the request's log
        collection as "log") and delegates to Runner.done(); on fetch
        failure reports "permanentFail" through the executor's callback.
        """
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            container["log"] = record["log_uuid"]
        except Exception:
            logger.exception("%s while getting runner container", self.arvrunner.label(self))
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            super(RunnerContainer, self).done(container)