18842: Initial work adding disk cache option to configuration
[arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8
9 import logging
10 import json
11 import os
12 import urllib.request, urllib.parse, urllib.error
13 import time
14 import datetime
15 import ciso8601
16 import uuid
17 import math
18
19 import arvados_cwl.util
20 import ruamel.yaml as yaml
21
22 from cwltool.errors import WorkflowException
23 from cwltool.process import UnsupportedRequirement, shortname
24 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
25 from cwltool.job import JobBase
26
27 import arvados.collection
28
29 from .arvdocker import arv_docker_get_image
30 from . import done
31 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields, make_builder
32 from .fsaccess import CollectionFetcher
33 from .pathmapper import NoFollowPathMapper, trim_listing
34 from .perf import Perf
35 from ._version import __version__
36
37 logger = logging.getLogger('arvados.cwl-runner')
38 metrics = logging.getLogger('arvados.cwl-runner.metrics')
39
def cleanup_name_for_collection(name):
    """Return name with every "/" replaced by a single space."""
    pieces = name.split("/")
    return " ".join(pieces)
42
43 class ArvadosContainer(JobBase):
44     """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""
45
46     def __init__(self, runner, job_runtime,
47                  builder,   # type: Builder
48                  joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
49                  make_path_mapper,  # type: Callable[..., PathMapper]
50                  requirements,      # type: List[Dict[Text, Text]]
51                  hints,     # type: List[Dict[Text, Text]]
52                  name       # type: Text
53     ):
54         super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
55         self.arvrunner = runner
56         self.job_runtime = job_runtime
57         self.running = False
58         self.uuid = None
59
60     def update_pipeline_component(self, r):
61         pass
62
63     def _required_env(self):
64         env = {}
65         env["HOME"] = self.outdir
66         env["TMPDIR"] = self.tmpdir
67         return env
68
69     def run(self, toplevelRuntimeContext):
70         # ArvadosCommandTool subclasses from cwltool.CommandLineTool,
71         # which calls makeJobRunner() to get a new ArvadosContainer
72         # object.  The fields that define execution such as
73         # command_line, environment, etc are set on the
74         # ArvadosContainer object by CommandLineTool.job() before
75         # run() is called.
76
77         runtimeContext = self.job_runtime
78
79         if runtimeContext.submit_request_uuid:
80             container_request = self.arvrunner.api.container_requests().get(
81                 uuid=runtimeContext.submit_request_uuid
82             ).execute(num_retries=self.arvrunner.num_retries)
83         else:
84             container_request = {}
85
86         container_request["command"] = self.command_line
87         container_request["name"] = self.name
88         container_request["output_path"] = self.outdir
89         container_request["cwd"] = self.outdir
90         container_request["priority"] = runtimeContext.priority
91         container_request["state"] = "Committed"
92         container_request.setdefault("properties", {})
93
94         runtime_constraints = {}
95
96         if runtimeContext.project_uuid:
97             container_request["owner_uuid"] = runtimeContext.project_uuid
98
99         if self.arvrunner.secret_store.has_secret(self.command_line):
100             raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")
101
102         if self.arvrunner.secret_store.has_secret(self.environment):
103             raise WorkflowException("Secret material leaked in environment, only file literals may contain secrets")
104
105         resources = self.builder.resources
106         if resources is not None:
107             runtime_constraints["vcpus"] = math.ceil(resources.get("cores", 1))
108             runtime_constraints["ram"] = math.ceil(resources.get("ram") * 2**20)
109
110         mounts = {
111             self.outdir: {
112                 "kind": "tmp",
113                 "capacity": math.ceil(resources.get("outdirSize", 0) * 2**20)
114             },
115             self.tmpdir: {
116                 "kind": "tmp",
117                 "capacity": math.ceil(resources.get("tmpdirSize", 0) * 2**20)
118             }
119         }
120         secret_mounts = {}
121         scheduling_parameters = {}
122
123         rf = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
124         rf.sort(key=lambda k: k.resolved)
125         prevdir = None
126         for resolved, target, tp, stg in rf:
127             if not stg:
128                 continue
129             if prevdir and target.startswith(prevdir):
130                 continue
131             if tp == "Directory":
132                 targetdir = target
133             else:
134                 targetdir = os.path.dirname(target)
135             sp = resolved.split("/", 1)
136             pdh = sp[0][5:]   # remove "keep:"
137             mounts[targetdir] = {
138                 "kind": "collection",
139                 "portable_data_hash": pdh
140             }
141             if pdh in self.pathmapper.pdh_to_uuid:
142                 mounts[targetdir]["uuid"] = self.pathmapper.pdh_to_uuid[pdh]
143             if len(sp) == 2:
144                 if tp == "Directory":
145                     path = sp[1]
146                 else:
147                     path = os.path.dirname(sp[1])
148                 if path and path != "/":
149                     mounts[targetdir]["path"] = path
150             prevdir = targetdir + "/"
151
152         intermediate_collection_info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
153
154         with Perf(metrics, "generatefiles %s" % self.name):
155             if self.generatefiles["listing"]:
156                 vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
157                                                     keep_client=self.arvrunner.keep_client,
158                                                     num_retries=self.arvrunner.num_retries)
159                 generatemapper = NoFollowPathMapper(self.generatefiles["listing"], "", "",
160                                                     separateDirs=False)
161
162                 sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target)
163
164                 logger.debug("generatemapper is %s", sorteditems)
165
166                 with Perf(metrics, "createfiles %s" % self.name):
167                     for f, p in sorteditems:
168                         if not p.target:
169                             continue
170
171                         if p.target.startswith("/"):
172                             dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
173                         else:
174                             dst = p.target
175
176                         if p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
177                             if p.resolved.startswith("_:"):
178                                 vwd.mkdirs(dst)
179                             else:
180                                 source, path = self.arvrunner.fs_access.get_collection(p.resolved)
181                                 vwd.copy(path or ".", dst, source_collection=source)
182                         elif p.type == "CreateFile":
183                             if self.arvrunner.secret_store.has_secret(p.resolved):
184                                 mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
185                                 secret_mounts[mountpoint] = {
186                                     "kind": "text",
187                                     "content": self.arvrunner.secret_store.retrieve(p.resolved)
188                                 }
189                             else:
190                                 with vwd.open(dst, "w") as n:
191                                     n.write(p.resolved)
192
193                 def keepemptydirs(p):
194                     if isinstance(p, arvados.collection.RichCollectionBase):
195                         if len(p) == 0:
196                             p.open(".keep", "w").close()
197                         else:
198                             for c in p:
199                                 keepemptydirs(p[c])
200
201                 keepemptydirs(vwd)
202
203                 if not runtimeContext.current_container:
204                     runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
205                 vwd.save_new(name=intermediate_collection_info["name"],
206                              owner_uuid=runtimeContext.project_uuid,
207                              ensure_unique_name=True,
208                              trash_at=intermediate_collection_info["trash_at"],
209                              properties=intermediate_collection_info["properties"])
210
211                 prev = None
212                 for f, p in sorteditems:
213                     if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
214                         (prev is not None and p.target.startswith(prev))):
215                         continue
216                     if p.target.startswith("/"):
217                         dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
218                     else:
219                         dst = p.target
220                     mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
221                     mounts[mountpoint] = {"kind": "collection",
222                                           "portable_data_hash": vwd.portable_data_hash(),
223                                           "path": dst}
224                     if p.type.startswith("Writable"):
225                         mounts[mountpoint]["writable"] = True
226                     prev = p.target + "/"
227
228         container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
229         if self.environment:
230             container_request["environment"].update(self.environment)
231
232         if self.stdin:
233             sp = self.stdin[6:].split("/", 1)
234             mounts["stdin"] = {"kind": "collection",
235                                 "portable_data_hash": sp[0],
236                                 "path": sp[1]}
237
238         if self.stderr:
239             mounts["stderr"] = {"kind": "file",
240                                 "path": "%s/%s" % (self.outdir, self.stderr)}
241
242         if self.stdout:
243             mounts["stdout"] = {"kind": "file",
244                                 "path": "%s/%s" % (self.outdir, self.stdout)}
245
246         (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
247
248         container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
249                                                                     docker_req,
250                                                                     runtimeContext.pull_image,
251                                                                     runtimeContext.project_uuid,
252                                                                     runtimeContext.force_docker_pull,
253                                                                     runtimeContext.tmp_outdir_prefix,
254                                                                     runtimeContext.match_local_docker,
255                                                                     runtimeContext.copy_deps)
256
257         network_req, _ = self.get_requirement("NetworkAccess")
258         if network_req:
259             runtime_constraints["API"] = network_req["networkAccess"]
260
261         api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
262         if api_req:
263             runtime_constraints["API"] = True
264
265         runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
266         if runtime_req:
267             if "keep_cache" in runtime_req:
268                 if self.arvrunner.api.config()["Containers"].get("DefaultKeepCacheDisk", 0) > 0:
269                     # If DefaultKeepCacheDisk is non-zero it means we should use disk cache.
270                     runtime_constraints["keep_cache_disk"] = math.ceil(runtime_req["keep_cache"] * 2**20)
271                 else:
272                     runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
273             if "outputDirType" in runtime_req:
274                 if runtime_req["outputDirType"] == "local_output_dir":
275                     # Currently the default behavior.
276                     pass
277                 elif runtime_req["outputDirType"] == "keep_output_dir":
278                     mounts[self.outdir]= {
279                         "kind": "collection",
280                         "writable": True
281                     }
282
283         partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
284         if partition_req:
285             scheduling_parameters["partitions"] = aslist(partition_req["partition"])
286
287         intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
288         if intermediate_output_req:
289             self.output_ttl = intermediate_output_req["outputTTL"]
290         else:
291             self.output_ttl = self.arvrunner.intermediate_output_ttl
292
293         if self.output_ttl < 0:
294             raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % container_request["output_ttl"])
295
296
297         if self.arvrunner.api._rootDesc["revision"] >= "20210628":
298             storage_class_req, _ = self.get_requirement("http://arvados.org/cwl#OutputStorageClass")
299             if storage_class_req and storage_class_req.get("intermediateStorageClass"):
300                 container_request["output_storage_classes"] = aslist(storage_class_req["intermediateStorageClass"])
301             else:
302                 container_request["output_storage_classes"] = runtimeContext.intermediate_storage_classes.strip().split(",")
303
304         cuda_req, _ = self.get_requirement("http://commonwl.org/cwltool#CUDARequirement")
305         if cuda_req:
306             runtime_constraints["cuda"] = {
307                 "device_count": resources.get("cudaDeviceCount", 1),
308                 "driver_version": cuda_req["cudaVersionMin"],
309                 "hardware_capability": aslist(cuda_req["cudaComputeCapability"])[0]
310             }
311
312         if runtimeContext.enable_preemptible is False:
313             scheduling_parameters["preemptible"] = False
314         else:
315             preemptible_req, _ = self.get_requirement("http://arvados.org/cwl#UsePreemptible")
316             if preemptible_req:
317                 scheduling_parameters["preemptible"] = preemptible_req["usePreemptible"]
318             elif runtimeContext.enable_preemptible is True:
319                 scheduling_parameters["preemptible"] = True
320             elif runtimeContext.enable_preemptible is None:
321                 pass
322
323         if self.timelimit is not None and self.timelimit > 0:
324             scheduling_parameters["max_run_time"] = self.timelimit
325
326         extra_submit_params = {}
327         if runtimeContext.submit_runner_cluster:
328             extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
329
330         container_request["output_name"] = cleanup_name_for_collection("Output from step %s" % (self.name))
331         container_request["output_ttl"] = self.output_ttl
332         container_request["mounts"] = mounts
333         container_request["secret_mounts"] = secret_mounts
334         container_request["runtime_constraints"] = runtime_constraints
335         container_request["scheduling_parameters"] = scheduling_parameters
336
337         enable_reuse = runtimeContext.enable_reuse
338         if enable_reuse:
339             reuse_req, _ = self.get_requirement("WorkReuse")
340             if reuse_req:
341                 enable_reuse = reuse_req["enableReuse"]
342             reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
343             if reuse_req:
344                 enable_reuse = reuse_req["enableReuse"]
345         container_request["use_existing"] = enable_reuse
346
347         properties_req, _ = self.get_requirement("http://arvados.org/cwl#ProcessProperties")
348         if properties_req:
349             for pr in properties_req["processProperties"]:
350                 container_request["properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
351
352         output_properties_req, _ = self.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
353         if output_properties_req:
354             if self.arvrunner.api._rootDesc["revision"] >= "20220510":
355                 container_request["output_properties"] = {}
356                 for pr in output_properties_req["outputProperties"]:
357                     container_request["output_properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
358             else:
359                 logger.warning("%s API revision is %s, revision %s is required to support setting properties on output collections.",
360                                self.arvrunner.label(self), self.arvrunner.api._rootDesc["revision"], "20220510")
361
362         if runtimeContext.runnerjob.startswith("arvwf:"):
363             wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
364             wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
365             if container_request["name"] == "main":
366                 container_request["name"] = wfrecord["name"]
367             container_request["properties"]["template_uuid"] = wfuuid
368
369         self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
370
371         try:
372             if runtimeContext.submit_request_uuid:
373                 response = self.arvrunner.api.container_requests().update(
374                     uuid=runtimeContext.submit_request_uuid,
375                     body=container_request,
376                     **extra_submit_params
377                 ).execute(num_retries=self.arvrunner.num_retries)
378             else:
379                 response = self.arvrunner.api.container_requests().create(
380                     body=container_request,
381                     **extra_submit_params
382                 ).execute(num_retries=self.arvrunner.num_retries)
383
384             self.uuid = response["uuid"]
385             self.arvrunner.process_submitted(self)
386
387             if response["state"] == "Final":
388                 logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
389             else:
390                 logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
391         except Exception as e:
392             logger.exception("%s error submitting container\n%s", self.arvrunner.label(self), e)
393             logger.debug("Container request was %s", container_request)
394             self.output_callback({}, "permanentFail")
395
396     def done(self, record):
397         outputs = {}
398         try:
399             container = self.arvrunner.api.containers().get(
400                 uuid=record["container_uuid"]
401             ).execute(num_retries=self.arvrunner.num_retries)
402             if container["state"] == "Complete":
403                 rcode = container["exit_code"]
404                 if self.successCodes and rcode in self.successCodes:
405                     processStatus = "success"
406                 elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
407                     processStatus = "temporaryFail"
408                 elif self.permanentFailCodes and rcode in self.permanentFailCodes:
409                     processStatus = "permanentFail"
410                 elif rcode == 0:
411                     processStatus = "success"
412                 else:
413                     processStatus = "permanentFail"
414
415                 if rcode == 137:
416                     logger.warning("%s Container may have been killed for using too much RAM.  Try resubmitting with a higher 'ramMin'.",
417                                  self.arvrunner.label(self))
418             else:
419                 processStatus = "permanentFail"
420
421             if processStatus == "permanentFail" and record["log_uuid"]:
422                 logc = arvados.collection.CollectionReader(record["log_uuid"],
423                                                            api_client=self.arvrunner.api,
424                                                            keep_client=self.arvrunner.keep_client,
425                                                            num_retries=self.arvrunner.num_retries)
426                 label = self.arvrunner.label(self)
427                 done.logtail(
428                     logc, logger.error,
429                     "%s (%s) error log:" % (label, record["uuid"]), maxlen=40)
430
431             if record["output_uuid"]:
432                 if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
433                     # Compute the trash time to avoid requesting the collection record.
434                     trash_at = ciso8601.parse_datetime_as_naive(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
435                     aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
436                     orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
437                     oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
438                     logger.info("%s Intermediate output %s (%s) will be trashed%s%s%s." % (
439                         self.arvrunner.label(self), record["output_uuid"], container["output"], aftertime, orpart, oncomplete))
440                 self.arvrunner.add_intermediate_output(record["output_uuid"])
441
442             if container["output"]:
443                 outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
444         except WorkflowException as e:
445             # Only include a stack trace if in debug mode.
446             # A stack trace may obfuscate more useful output about the workflow.
447             logger.error("%s unable to collect output from %s:\n%s",
448                          self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
449             processStatus = "permanentFail"
450         except Exception:
451             logger.exception("%s while getting output object:", self.arvrunner.label(self))
452             processStatus = "permanentFail"
453         finally:
454             self.output_callback(outputs, processStatus)
455
456
457 class RunnerContainer(Runner):
458     """Submit and manage a container that runs arvados-cwl-runner."""
459
    def arvados_job_spec(self, runtimeContext, git_info):
        """Create an Arvados container request for this workflow.

        The returned dict can be used to create a container passed as
        the +body+ argument to container_requests().create().

        The request runs "arvados-cwl-runner --local" inside the jobs
        image, with the input object mounted at
        /var/lib/cwl/cwl.input.json and the workflow mounted either as a
        collection or as a packed JSON document.  Entries of git_info are
        copied into the request properties.

        Note that self.job_order is modified in place: file and directory
        objects are trimmed, and parameters holding secrets are replaced by
        {"$include": <secret mount path>} references.
        """

        adjustDirObjs(self.job_order, trim_listing)
        visit_class(self.job_order, ("File", "Directory"), trim_anonymous_location)
        visit_class(self.job_order, ("File", "Directory"), remove_redundant_fields)

        # Move secret parameter values into numbered text mounts
        # (/secrets/s0, /secrets/s1, ...).  Parameters are visited in
        # sorted order, so the numbering does not depend on dict order.
        secret_mounts = {}
        for param in sorted(self.job_order.keys()):
            if self.secret_store.has_secret(self.job_order[param]):
                mnt = "/secrets/s%d" % len(secret_mounts)
                secret_mounts[mnt] = {
                    "kind": "text",
                    "content": self.secret_store.retrieve(self.job_order[param])
                }
                self.job_order[param] = {"$include": mnt}

        container_req = {
            "name": self.name,
            "output_path": "/var/spool/cwl",
            "cwd": "/var/spool/cwl",
            "priority": self.priority,
            "state": "Committed",
            "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext),
            "mounts": {
                "/var/lib/cwl/cwl.input.json": {
                    "kind": "json",
                    "content": self.job_order
                },
                "stdout": {
                    "kind": "file",
                    "path": "/var/spool/cwl/cwl.output.json"
                },
                "/var/spool/cwl": {
                    "kind": "collection",
                    "writable": True
                }
            },
            "secret_mounts": secret_mounts,
            "runtime_constraints": {
                "vcpus": math.ceil(self.submit_runner_cores),
                # The requested RAM is the runner RAM plus the collection
                # cache size, multiplied by 1024*1024.
                "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
                "API": True
            },
            "use_existing": False, # Never reuse the runner container - see #15497.
            "properties": {}
        }

        if self.embedded_tool.tool.get("id", "").startswith("keep:"):
            # The tool id has the form "keep:<collection>/<path>": mount
            # the collection and point the runner at the path inside it.
            sp = self.embedded_tool.tool["id"].split('/')
            workflowcollection = sp[0][5:]
            workflowname = "/".join(sp[1:])
            workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
            container_req["mounts"]["/var/lib/cwl/workflow"] = {
                "kind": "collection",
                "portable_data_hash": "%s" % workflowcollection
            }
        else:
            # Otherwise embed the packed workflow as a JSON mount.
            packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext, git_info)
            workflowpath = "/var/lib/cwl/workflow.json#main"
            container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
                "kind": "json",
                "content": packed
            }
            if self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
                # Characters 6..32 of the id follow the "arvwf:" prefix and
                # are recorded as the template uuid.
                container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]

        # Property keys use the short "arv:" prefix in place of the full
        # "http://arvados.org/cwl#" namespace.
        container_req["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in git_info.items()})

        properties_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ProcessProperties")
        if properties_req:
            builder = make_builder(self.job_order, self.embedded_tool.hints, self.embedded_tool.requirements, runtimeContext, self.embedded_tool.metadata)
            for pr in properties_req["processProperties"]:
                container_req["properties"][pr["propertyName"]] = builder.do_eval(pr["propertyValue"])

        # --local means execute the workflow instead of submitting a container request
        # --api=containers means use the containers API
        # --no-log-timestamps means don't add timestamps (the logging infrastructure does this)
        # --disable-validate because we already validated so don't need to do it again
        # --eval-timeout is the timeout for javascript invocation
        # --thread-count is the number of threads to use for job submission
        # --enable/disable-reuse sets desired job reuse
        # --collection-cache-size sets aside memory to store collections
        command = ["arvados-cwl-runner",
                   "--local",
                   "--api=containers",
                   "--no-log-timestamps",
                   "--disable-validate",
                   "--disable-color",
                   "--eval-timeout=%s" % self.arvrunner.eval_timeout,
                   "--thread-count=%s" % self.arvrunner.thread_count,
                   "--enable-reuse" if self.enable_reuse else "--disable-reuse",
                   "--collection-cache-size=%s" % self.collection_cache_size]

        if self.output_name:
            command.append("--output-name=" + self.output_name)
            container_req["output_name"] = self.output_name

        if self.output_tags:
            command.append("--output-tags=" + self.output_tags)

        if runtimeContext.debug:
            command.append("--debug")

        if runtimeContext.storage_classes != "default" and runtimeContext.storage_classes:
            command.append("--storage-classes=" + runtimeContext.storage_classes)

        if runtimeContext.intermediate_storage_classes != "default" and runtimeContext.intermediate_storage_classes:
            command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes)

        # NOTE(review): the condition tests runtimeContext.on_error but the
        # value appended is self.on_error -- confirm the two always agree.
        if runtimeContext.on_error:
            command.append("--on-error=" + self.on_error)

        if runtimeContext.intermediate_output_ttl:
            command.append("--intermediate-output-ttl=%d" % runtimeContext.intermediate_output_ttl)

        if runtimeContext.trash_intermediate:
            command.append("--trash-intermediate")

        if runtimeContext.project_uuid:
            command.append("--project-uuid="+runtimeContext.project_uuid)

        if self.enable_dev:
            command.append("--enable-dev")

        # enable_preemptible is tri-state: neither flag is passed when it
        # is neither True nor False.
        if runtimeContext.enable_preemptible is True:
            command.append("--enable-preemptible")

        if runtimeContext.enable_preemptible is False:
            command.append("--disable-preemptible")

        # The workflow and the input object are the positional arguments.
        command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])

        container_req["command"] = command

        return container_req
600
601
602     def run(self, runtimeContext):
603         runtimeContext.keepprefix = "keep:"
604         job_spec = self.arvados_job_spec(runtimeContext, self.git_info)
605         if runtimeContext.project_uuid:
606             job_spec["owner_uuid"] = runtimeContext.project_uuid
607
608         extra_submit_params = {}
609         if runtimeContext.submit_runner_cluster:
610             extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
611
612         if runtimeContext.submit_request_uuid:
613             if "cluster_id" in extra_submit_params:
614                 # Doesn't make sense for "update" and actually fails
615                 del extra_submit_params["cluster_id"]
616             response = self.arvrunner.api.container_requests().update(
617                 uuid=runtimeContext.submit_request_uuid,
618                 body=job_spec,
619                 **extra_submit_params
620             ).execute(num_retries=self.arvrunner.num_retries)
621         else:
622             response = self.arvrunner.api.container_requests().create(
623                 body=job_spec,
624                 **extra_submit_params
625             ).execute(num_retries=self.arvrunner.num_retries)
626
627         self.uuid = response["uuid"]
628         self.arvrunner.process_submitted(self)
629
630         logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])
631
632         workbench1 = self.arvrunner.api.config()["Services"]["Workbench1"]["ExternalURL"]
633         workbench2 = self.arvrunner.api.config()["Services"]["Workbench2"]["ExternalURL"]
634         url = ""
635         if workbench2:
636             url = "{}processes/{}".format(workbench2, response["uuid"])
637         elif workbench1:
638             url = "{}container_requests/{}".format(workbench1, response["uuid"])
639         if url:
640             logger.info("Monitor workflow progress at %s", url)
641
642
643     def done(self, record):
644         try:
645             container = self.arvrunner.api.containers().get(
646                 uuid=record["container_uuid"]
647             ).execute(num_retries=self.arvrunner.num_retries)
648             container["log"] = record["log_uuid"]
649         except Exception:
650             logger.exception("%s while getting runner container", self.arvrunner.label(self))
651             self.arvrunner.output_callback({}, "permanentFail")
652         else:
653             super(RunnerContainer, self).done(container)