Merge branch '15370-loopback-dispatchcloud'
[arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8
9 import logging
10 import json
11 import os
12 import urllib.request, urllib.parse, urllib.error
13 import time
14 import datetime
15 import ciso8601
16 import uuid
17 import math
18
19 import arvados_cwl.util
20 import ruamel.yaml as yaml
21
22 from cwltool.errors import WorkflowException
23 from cwltool.process import UnsupportedRequirement, shortname
24 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
25 from cwltool.job import JobBase
26
27 import arvados.collection
28
29 from .arvdocker import arv_docker_get_image
30 from . import done
31 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields, make_builder
32 from .fsaccess import CollectionFetcher
33 from .pathmapper import NoFollowPathMapper, trim_listing
34 from .perf import Perf
35 from ._version import __version__
36
# Module-level loggers: `logger` is used for general messages in this module,
# `metrics` is handed to Perf() when timing sections of the submission code.
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
39
def cleanup_name_for_collection(name):
    """Return *name* with every "/" replaced by a single space."""
    return " ".join(name.split("/"))
42
43 class ArvadosContainer(JobBase):
44     """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""
45
46     def __init__(self, runner, job_runtime,
47                  builder,   # type: Builder
48                  joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
49                  make_path_mapper,  # type: Callable[..., PathMapper]
50                  requirements,      # type: List[Dict[Text, Text]]
51                  hints,     # type: List[Dict[Text, Text]]
52                  name       # type: Text
53     ):
54         super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
55         self.arvrunner = runner
56         self.job_runtime = job_runtime
57         self.running = False
58         self.uuid = None
59
60     def update_pipeline_component(self, r):
61         pass
62
63     def _required_env(self):
64         env = {}
65         env["HOME"] = self.outdir
66         env["TMPDIR"] = self.tmpdir
67         return env
68
69     def run(self, toplevelRuntimeContext):
70         # ArvadosCommandTool subclasses from cwltool.CommandLineTool,
71         # which calls makeJobRunner() to get a new ArvadosContainer
72         # object.  The fields that define execution such as
73         # command_line, environment, etc are set on the
74         # ArvadosContainer object by CommandLineTool.job() before
75         # run() is called.
76
77         runtimeContext = self.job_runtime
78
79         if runtimeContext.submit_request_uuid:
80             container_request = self.arvrunner.api.container_requests().get(
81                 uuid=runtimeContext.submit_request_uuid
82             ).execute(num_retries=self.arvrunner.num_retries)
83         else:
84             container_request = {}
85
86         container_request["command"] = self.command_line
87         container_request["name"] = self.name
88         container_request["output_path"] = self.outdir
89         container_request["cwd"] = self.outdir
90         container_request["priority"] = runtimeContext.priority
91         container_request["state"] = "Committed"
92         container_request.setdefault("properties", {})
93
94         runtime_constraints = {}
95
96         if runtimeContext.project_uuid:
97             container_request["owner_uuid"] = runtimeContext.project_uuid
98
99         if self.arvrunner.secret_store.has_secret(self.command_line):
100             raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")
101
102         if self.arvrunner.secret_store.has_secret(self.environment):
103             raise WorkflowException("Secret material leaked in environment, only file literals may contain secrets")
104
105         resources = self.builder.resources
106         if resources is not None:
107             runtime_constraints["vcpus"] = math.ceil(resources.get("cores", 1))
108             runtime_constraints["ram"] = math.ceil(resources.get("ram") * 2**20)
109
110         mounts = {
111             self.outdir: {
112                 "kind": "tmp",
113                 "capacity": math.ceil(resources.get("outdirSize", 0) * 2**20)
114             },
115             self.tmpdir: {
116                 "kind": "tmp",
117                 "capacity": math.ceil(resources.get("tmpdirSize", 0) * 2**20)
118             }
119         }
120         secret_mounts = {}
121         scheduling_parameters = {}
122
123         rf = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
124         rf.sort(key=lambda k: k.resolved)
125         prevdir = None
126         for resolved, target, tp, stg in rf:
127             if not stg:
128                 continue
129             if prevdir and target.startswith(prevdir):
130                 continue
131             if tp == "Directory":
132                 targetdir = target
133             else:
134                 targetdir = os.path.dirname(target)
135             sp = resolved.split("/", 1)
136             pdh = sp[0][5:]   # remove "keep:"
137             mounts[targetdir] = {
138                 "kind": "collection",
139                 "portable_data_hash": pdh
140             }
141             if pdh in self.pathmapper.pdh_to_uuid:
142                 mounts[targetdir]["uuid"] = self.pathmapper.pdh_to_uuid[pdh]
143             if len(sp) == 2:
144                 if tp == "Directory":
145                     path = sp[1]
146                 else:
147                     path = os.path.dirname(sp[1])
148                 if path and path != "/":
149                     mounts[targetdir]["path"] = path
150             prevdir = targetdir + "/"
151
152         intermediate_collection_info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
153
154         with Perf(metrics, "generatefiles %s" % self.name):
155             if self.generatefiles["listing"]:
156                 vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
157                                                     keep_client=self.arvrunner.keep_client,
158                                                     num_retries=self.arvrunner.num_retries)
159                 generatemapper = NoFollowPathMapper(self.generatefiles["listing"], "", "",
160                                                     separateDirs=False)
161
162                 sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target)
163
164                 logger.debug("generatemapper is %s", sorteditems)
165
166                 with Perf(metrics, "createfiles %s" % self.name):
167                     for f, p in sorteditems:
168                         if not p.target:
169                             continue
170
171                         if p.target.startswith("/"):
172                             dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
173                         else:
174                             dst = p.target
175
176                         if p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
177                             if p.resolved.startswith("_:"):
178                                 vwd.mkdirs(dst)
179                             else:
180                                 source, path = self.arvrunner.fs_access.get_collection(p.resolved)
181                                 vwd.copy(path or ".", dst, source_collection=source)
182                         elif p.type == "CreateFile":
183                             if self.arvrunner.secret_store.has_secret(p.resolved):
184                                 mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
185                                 secret_mounts[mountpoint] = {
186                                     "kind": "text",
187                                     "content": self.arvrunner.secret_store.retrieve(p.resolved)
188                                 }
189                             else:
190                                 with vwd.open(dst, "w") as n:
191                                     n.write(p.resolved)
192
193                 def keepemptydirs(p):
194                     if isinstance(p, arvados.collection.RichCollectionBase):
195                         if len(p) == 0:
196                             p.open(".keep", "w").close()
197                         else:
198                             for c in p:
199                                 keepemptydirs(p[c])
200
201                 keepemptydirs(vwd)
202
203                 if not runtimeContext.current_container:
204                     runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
205                 vwd.save_new(name=intermediate_collection_info["name"],
206                              owner_uuid=runtimeContext.project_uuid,
207                              ensure_unique_name=True,
208                              trash_at=intermediate_collection_info["trash_at"],
209                              properties=intermediate_collection_info["properties"])
210
211                 prev = None
212                 for f, p in sorteditems:
213                     if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
214                         (prev is not None and p.target.startswith(prev))):
215                         continue
216                     if p.target.startswith("/"):
217                         dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
218                     else:
219                         dst = p.target
220                     mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
221                     mounts[mountpoint] = {"kind": "collection",
222                                           "portable_data_hash": vwd.portable_data_hash(),
223                                           "path": dst}
224                     if p.type.startswith("Writable"):
225                         mounts[mountpoint]["writable"] = True
226                     prev = p.target + "/"
227
228         container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
229         if self.environment:
230             container_request["environment"].update(self.environment)
231
232         if self.stdin:
233             sp = self.stdin[6:].split("/", 1)
234             mounts["stdin"] = {"kind": "collection",
235                                 "portable_data_hash": sp[0],
236                                 "path": sp[1]}
237
238         if self.stderr:
239             mounts["stderr"] = {"kind": "file",
240                                 "path": "%s/%s" % (self.outdir, self.stderr)}
241
242         if self.stdout:
243             mounts["stdout"] = {"kind": "file",
244                                 "path": "%s/%s" % (self.outdir, self.stdout)}
245
246         (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
247
248         container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
249                                                                     docker_req,
250                                                                     runtimeContext.pull_image,
251                                                                     runtimeContext.project_uuid,
252                                                                     runtimeContext.force_docker_pull,
253                                                                     runtimeContext.tmp_outdir_prefix,
254                                                                     runtimeContext.match_local_docker,
255                                                                     runtimeContext.copy_deps)
256
257         network_req, _ = self.get_requirement("NetworkAccess")
258         if network_req:
259             runtime_constraints["API"] = network_req["networkAccess"]
260
261         api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
262         if api_req:
263             runtime_constraints["API"] = True
264
265         runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
266         if runtime_req:
267             if "keep_cache" in runtime_req:
268                 runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
269             if "outputDirType" in runtime_req:
270                 if runtime_req["outputDirType"] == "local_output_dir":
271                     # Currently the default behavior.
272                     pass
273                 elif runtime_req["outputDirType"] == "keep_output_dir":
274                     mounts[self.outdir]= {
275                         "kind": "collection",
276                         "writable": True
277                     }
278
279         partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
280         if partition_req:
281             scheduling_parameters["partitions"] = aslist(partition_req["partition"])
282
283         intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
284         if intermediate_output_req:
285             self.output_ttl = intermediate_output_req["outputTTL"]
286         else:
287             self.output_ttl = self.arvrunner.intermediate_output_ttl
288
289         if self.output_ttl < 0:
290             raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % container_request["output_ttl"])
291
292
293         if self.arvrunner.api._rootDesc["revision"] >= "20210628":
294             storage_class_req, _ = self.get_requirement("http://arvados.org/cwl#OutputStorageClass")
295             if storage_class_req and storage_class_req.get("intermediateStorageClass"):
296                 container_request["output_storage_classes"] = aslist(storage_class_req["intermediateStorageClass"])
297             else:
298                 container_request["output_storage_classes"] = runtimeContext.intermediate_storage_classes.strip().split(",")
299
300         cuda_req, _ = self.get_requirement("http://commonwl.org/cwltool#CUDARequirement")
301         if cuda_req:
302             runtime_constraints["cuda"] = {
303                 "device_count": resources.get("cudaDeviceCount", 1),
304                 "driver_version": cuda_req["cudaVersionMin"],
305                 "hardware_capability": aslist(cuda_req["cudaComputeCapability"])[0]
306             }
307
308         if runtimeContext.enable_preemptible is False:
309             scheduling_parameters["preemptible"] = False
310         else:
311             preemptible_req, _ = self.get_requirement("http://arvados.org/cwl#UsePreemptible")
312             if preemptible_req:
313                 scheduling_parameters["preemptible"] = preemptible_req["usePreemptible"]
314             elif runtimeContext.enable_preemptible is True:
315                 scheduling_parameters["preemptible"] = True
316             elif runtimeContext.enable_preemptible is None:
317                 pass
318
319         if self.timelimit is not None and self.timelimit > 0:
320             scheduling_parameters["max_run_time"] = self.timelimit
321
322         extra_submit_params = {}
323         if runtimeContext.submit_runner_cluster:
324             extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
325
326         container_request["output_name"] = cleanup_name_for_collection("Output from step %s" % (self.name))
327         container_request["output_ttl"] = self.output_ttl
328         container_request["mounts"] = mounts
329         container_request["secret_mounts"] = secret_mounts
330         container_request["runtime_constraints"] = runtime_constraints
331         container_request["scheduling_parameters"] = scheduling_parameters
332
333         enable_reuse = runtimeContext.enable_reuse
334         if enable_reuse:
335             reuse_req, _ = self.get_requirement("WorkReuse")
336             if reuse_req:
337                 enable_reuse = reuse_req["enableReuse"]
338             reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
339             if reuse_req:
340                 enable_reuse = reuse_req["enableReuse"]
341         container_request["use_existing"] = enable_reuse
342
343         properties_req, _ = self.get_requirement("http://arvados.org/cwl#ProcessProperties")
344         if properties_req:
345             for pr in properties_req["processProperties"]:
346                 container_request["properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
347
348         output_properties_req, _ = self.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
349         if output_properties_req:
350             if self.arvrunner.api._rootDesc["revision"] >= "20220510":
351                 container_request["output_properties"] = {}
352                 for pr in output_properties_req["outputProperties"]:
353                     container_request["output_properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
354             else:
355                 logger.warning("%s API revision is %s, revision %s is required to support setting properties on output collections.",
356                                self.arvrunner.label(self), self.arvrunner.api._rootDesc["revision"], "20220510")
357
358         if runtimeContext.runnerjob.startswith("arvwf:"):
359             wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
360             wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
361             if container_request["name"] == "main":
362                 container_request["name"] = wfrecord["name"]
363             container_request["properties"]["template_uuid"] = wfuuid
364
365         self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
366
367         try:
368             if runtimeContext.submit_request_uuid:
369                 response = self.arvrunner.api.container_requests().update(
370                     uuid=runtimeContext.submit_request_uuid,
371                     body=container_request,
372                     **extra_submit_params
373                 ).execute(num_retries=self.arvrunner.num_retries)
374             else:
375                 response = self.arvrunner.api.container_requests().create(
376                     body=container_request,
377                     **extra_submit_params
378                 ).execute(num_retries=self.arvrunner.num_retries)
379
380             self.uuid = response["uuid"]
381             self.arvrunner.process_submitted(self)
382
383             if response["state"] == "Final":
384                 logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
385             else:
386                 logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
387         except Exception as e:
388             logger.exception("%s error submitting container\n%s", self.arvrunner.label(self), e)
389             logger.debug("Container request was %s", container_request)
390             self.output_callback({}, "permanentFail")
391
392     def done(self, record):
393         outputs = {}
394         try:
395             container = self.arvrunner.api.containers().get(
396                 uuid=record["container_uuid"]
397             ).execute(num_retries=self.arvrunner.num_retries)
398             if container["state"] == "Complete":
399                 rcode = container["exit_code"]
400                 if self.successCodes and rcode in self.successCodes:
401                     processStatus = "success"
402                 elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
403                     processStatus = "temporaryFail"
404                 elif self.permanentFailCodes and rcode in self.permanentFailCodes:
405                     processStatus = "permanentFail"
406                 elif rcode == 0:
407                     processStatus = "success"
408                 else:
409                     processStatus = "permanentFail"
410
411                 if rcode == 137:
412                     logger.warning("%s Container may have been killed for using too much RAM.  Try resubmitting with a higher 'ramMin'.",
413                                  self.arvrunner.label(self))
414             else:
415                 processStatus = "permanentFail"
416
417             if processStatus == "permanentFail" and record["log_uuid"]:
418                 logc = arvados.collection.CollectionReader(record["log_uuid"],
419                                                            api_client=self.arvrunner.api,
420                                                            keep_client=self.arvrunner.keep_client,
421                                                            num_retries=self.arvrunner.num_retries)
422                 label = self.arvrunner.label(self)
423                 done.logtail(
424                     logc, logger.error,
425                     "%s (%s) error log:" % (label, record["uuid"]), maxlen=40)
426
427             if record["output_uuid"]:
428                 if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
429                     # Compute the trash time to avoid requesting the collection record.
430                     trash_at = ciso8601.parse_datetime_as_naive(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
431                     aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
432                     orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
433                     oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
434                     logger.info("%s Intermediate output %s (%s) will be trashed%s%s%s." % (
435                         self.arvrunner.label(self), record["output_uuid"], container["output"], aftertime, orpart, oncomplete))
436                 self.arvrunner.add_intermediate_output(record["output_uuid"])
437
438             if container["output"]:
439                 outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
440         except WorkflowException as e:
441             # Only include a stack trace if in debug mode.
442             # A stack trace may obfuscate more useful output about the workflow.
443             logger.error("%s unable to collect output from %s:\n%s",
444                          self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
445             processStatus = "permanentFail"
446         except Exception:
447             logger.exception("%s while getting output object:", self.arvrunner.label(self))
448             processStatus = "permanentFail"
449         finally:
450             self.output_callback(outputs, processStatus)
451
452
453 class RunnerContainer(Runner):
454     """Submit and manage a container that runs arvados-cwl-runner."""
455
456     def arvados_job_spec(self, runtimeContext):
457         """Create an Arvados container request for this workflow.
458
459         The returned dict can be used to create a container passed as
460         the +body+ argument to container_requests().create().
461         """
462
463         adjustDirObjs(self.job_order, trim_listing)
464         visit_class(self.job_order, ("File", "Directory"), trim_anonymous_location)
465         visit_class(self.job_order, ("File", "Directory"), remove_redundant_fields)
466
467         secret_mounts = {}
468         for param in sorted(self.job_order.keys()):
469             if self.secret_store.has_secret(self.job_order[param]):
470                 mnt = "/secrets/s%d" % len(secret_mounts)
471                 secret_mounts[mnt] = {
472                     "kind": "text",
473                     "content": self.secret_store.retrieve(self.job_order[param])
474                 }
475                 self.job_order[param] = {"$include": mnt}
476
477         container_req = {
478             "name": self.name,
479             "output_path": "/var/spool/cwl",
480             "cwd": "/var/spool/cwl",
481             "priority": self.priority,
482             "state": "Committed",
483             "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext),
484             "mounts": {
485                 "/var/lib/cwl/cwl.input.json": {
486                     "kind": "json",
487                     "content": self.job_order
488                 },
489                 "stdout": {
490                     "kind": "file",
491                     "path": "/var/spool/cwl/cwl.output.json"
492                 },
493                 "/var/spool/cwl": {
494                     "kind": "collection",
495                     "writable": True
496                 }
497             },
498             "secret_mounts": secret_mounts,
499             "runtime_constraints": {
500                 "vcpus": math.ceil(self.submit_runner_cores),
501                 "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
502                 "API": True
503             },
504             "use_existing": False, # Never reuse the runner container - see #15497.
505             "properties": {}
506         }
507
508         if self.embedded_tool.tool.get("id", "").startswith("keep:"):
509             sp = self.embedded_tool.tool["id"].split('/')
510             workflowcollection = sp[0][5:]
511             workflowname = "/".join(sp[1:])
512             workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
513             container_req["mounts"]["/var/lib/cwl/workflow"] = {
514                 "kind": "collection",
515                 "portable_data_hash": "%s" % workflowcollection
516             }
517         else:
518             packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext)
519             workflowpath = "/var/lib/cwl/workflow.json#main"
520             container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
521                 "kind": "json",
522                 "content": packed
523             }
524             if self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
525                 container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
526
527         properties_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ProcessProperties")
528         if properties_req:
529             builder = make_builder(self.job_order, self.embedded_tool.hints, self.embedded_tool.requirements, runtimeContext, self.embedded_tool.metadata)
530             for pr in properties_req["processProperties"]:
531                 container_req["properties"][pr["propertyName"]] = builder.do_eval(pr["propertyValue"])
532
533         # --local means execute the workflow instead of submitting a container request
534         # --api=containers means use the containers API
535         # --no-log-timestamps means don't add timestamps (the logging infrastructure does this)
536         # --disable-validate because we already validated so don't need to do it again
537         # --eval-timeout is the timeout for javascript invocation
538         # --parallel-task-count is the number of threads to use for job submission
539         # --enable/disable-reuse sets desired job reuse
540         # --collection-cache-size sets aside memory to store collections
541         command = ["arvados-cwl-runner",
542                    "--local",
543                    "--api=containers",
544                    "--no-log-timestamps",
545                    "--disable-validate",
546                    "--disable-color",
547                    "--eval-timeout=%s" % self.arvrunner.eval_timeout,
548                    "--thread-count=%s" % self.arvrunner.thread_count,
549                    "--enable-reuse" if self.enable_reuse else "--disable-reuse",
550                    "--collection-cache-size=%s" % self.collection_cache_size]
551
552         if self.output_name:
553             command.append("--output-name=" + self.output_name)
554             container_req["output_name"] = self.output_name
555
556         if self.output_tags:
557             command.append("--output-tags=" + self.output_tags)
558
559         if runtimeContext.debug:
560             command.append("--debug")
561
562         if runtimeContext.storage_classes != "default" and runtimeContext.storage_classes:
563             command.append("--storage-classes=" + runtimeContext.storage_classes)
564
565         if runtimeContext.intermediate_storage_classes != "default" and runtimeContext.intermediate_storage_classes:
566             command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes)
567
568         if runtimeContext.on_error:
569             command.append("--on-error=" + self.on_error)
570
571         if runtimeContext.intermediate_output_ttl:
572             command.append("--intermediate-output-ttl=%d" % runtimeContext.intermediate_output_ttl)
573
574         if runtimeContext.trash_intermediate:
575             command.append("--trash-intermediate")
576
577         if runtimeContext.project_uuid:
578             command.append("--project-uuid="+runtimeContext.project_uuid)
579
580         if self.enable_dev:
581             command.append("--enable-dev")
582
583         if runtimeContext.enable_preemptible is True:
584             command.append("--enable-preemptible")
585
586         if runtimeContext.enable_preemptible is False:
587             command.append("--disable-preemptible")
588
589         command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
590
591         container_req["command"] = command
592
593         return container_req
594
595
596     def run(self, runtimeContext):
597         runtimeContext.keepprefix = "keep:"
598         job_spec = self.arvados_job_spec(runtimeContext)
599         if runtimeContext.project_uuid:
600             job_spec["owner_uuid"] = runtimeContext.project_uuid
601
602         extra_submit_params = {}
603         if runtimeContext.submit_runner_cluster:
604             extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
605
606         if runtimeContext.submit_request_uuid:
607             if "cluster_id" in extra_submit_params:
608                 # Doesn't make sense for "update" and actually fails
609                 del extra_submit_params["cluster_id"]
610             response = self.arvrunner.api.container_requests().update(
611                 uuid=runtimeContext.submit_request_uuid,
612                 body=job_spec,
613                 **extra_submit_params
614             ).execute(num_retries=self.arvrunner.num_retries)
615         else:
616             response = self.arvrunner.api.container_requests().create(
617                 body=job_spec,
618                 **extra_submit_params
619             ).execute(num_retries=self.arvrunner.num_retries)
620
621         self.uuid = response["uuid"]
622         self.arvrunner.process_submitted(self)
623
624         logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])
625
626         workbench1 = self.arvrunner.api.config()["Services"]["Workbench1"]["ExternalURL"]
627         workbench2 = self.arvrunner.api.config()["Services"]["Workbench2"]["ExternalURL"]
628         url = ""
629         if workbench2:
630             url = "{}processes/{}".format(workbench2, response["uuid"])
631         elif workbench1:
632             url = "{}container_requests/{}".format(workbench1, response["uuid"])
633         if url:
634             logger.info("Monitor workflow progress at %s", url)
635
636
637     def done(self, record):
638         try:
639             container = self.arvrunner.api.containers().get(
640                 uuid=record["container_uuid"]
641             ).execute(num_retries=self.arvrunner.num_retries)
642             container["log"] = record["log_uuid"]
643         except Exception:
644             logger.exception("%s while getting runner container", self.arvrunner.label(self))
645             self.arvrunner.output_callback({}, "permanentFail")
646         else:
647             super(RunnerContainer, self).done(container)