f3e122e603f8cd9cf9b768158132f28d7190b685
[arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8
9 import logging
10 import json
11 import os
12 import urllib.request, urllib.parse, urllib.error
13 import time
14 import datetime
15 import ciso8601
16 import uuid
17 import math
18
19 import arvados_cwl.util
20 import ruamel.yaml as yaml
21
22 from cwltool.errors import WorkflowException
23 from cwltool.process import UnsupportedRequirement, shortname
24 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
25 from cwltool.job import JobBase
26
27 import arvados.collection
28
29 from .arvdocker import arv_docker_get_image
30 from . import done
31 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields, make_builder
32 from .fsaccess import CollectionFetcher
33 from .pathmapper import NoFollowPathMapper, trim_listing
34 from .perf import Perf
35 from ._version import __version__
36
# General-purpose logger for arvados-cwl-runner messages.
logger = logging.getLogger('arvados.cwl-runner')
# Child logger ("arvados.cwl-runner.metrics") that receives the Perf timings.
metrics = logger.getChild('metrics')
39
class ArvadosContainer(JobBase):
    """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""

    def __init__(self, runner, job_runtime,
                 builder,   # type: Builder
                 joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
                 make_path_mapper,  # type: Callable[..., PathMapper]
                 requirements,      # type: List[Dict[Text, Text]]
                 hints,     # type: List[Dict[Text, Text]]
                 name       # type: Text
    ):
        """Create a job runner for one CommandLineTool invocation.

        Args:
            runner: the arvados-cwl-runner executor; provides the API client,
                Keep client, retry count, secret store and callbacks used below.
            job_runtime: runtime context that run() uses to build and submit
                the container request.
            builder, joborder, make_path_mapper, requirements, hints, name:
                passed through unchanged to cwltool's JobBase.
        """
        super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
        self.arvrunner = runner
        self.job_runtime = job_runtime
        self.running = False
        # Container request UUID; set by run() once the request is submitted.
        self.uuid = None

    def update_pipeline_component(self, r):
        """No-op hook; nothing is updated here for container requests."""
        pass

    def _required_env(self):
        """Return the environment variables every step container gets.

        HOME points at the output directory and TMPDIR at the temporary
        directory (both are mounted as tmp storage by run()).
        """
        env = {}
        env["HOME"] = self.outdir
        env["TMPDIR"] = self.tmpdir
        return env

    def _vwd_relative_path(self, target):
        """Map a generated-file target to its path inside the working-dir collection.

        Absolute targets under the output directory become relative to it,
        other absolute targets just lose their leading "/", and relative
        targets are returned unchanged.
        """
        if not target.startswith("/"):
            return target
        if target.startswith(self.outdir + "/"):
            return target[len(self.outdir) + 1:]
        return target[1:]

    def _mountpoint(self, target):
        """Return the absolute container path for target (relative targets resolve against the output directory)."""
        return target if target.startswith("/") else os.path.join(self.outdir, target)

    def run(self, toplevelRuntimeContext):
        """Build and submit the container request for this step.

        ArvadosCommandTool subclasses from cwltool.CommandLineTool,
        which calls makeJobRunner() to get a new ArvadosContainer
        object.  The fields that define execution such as
        command_line, environment, etc are set on the
        ArvadosContainer object by CommandLineTool.job() before
        run() is called.

        Args:
            toplevelRuntimeContext: not used; the runtime context captured at
                construction (``self.job_runtime``) is used instead.

        Raises:
            WorkflowException: if secret material would leak through the
                command line or environment, or if the intermediate output
                TTL is negative.

        Errors while submitting the request are not raised.  They are logged
        and reported to ``self.output_callback`` as "permanentFail".
        """
        runtimeContext = self.job_runtime

        if runtimeContext.submit_request_uuid:
            # Start from the existing request record.  The fields below are
            # overwritten, and the record is sent back with update().
            container_request = self.arvrunner.api.container_requests().get(
                uuid=runtimeContext.submit_request_uuid
            ).execute(num_retries=self.arvrunner.num_retries)
        else:
            container_request = {}

        container_request["command"] = self.command_line
        container_request["name"] = self.name
        container_request["output_path"] = self.outdir
        container_request["cwd"] = self.outdir
        container_request["priority"] = runtimeContext.priority
        container_request["state"] = "Committed"
        container_request.setdefault("properties", {})

        runtime_constraints = {}

        if runtimeContext.project_uuid:
            container_request["owner_uuid"] = runtimeContext.project_uuid

        # Secrets may only reach the container through file literals, which
        # are delivered via secret_mounts below.
        if self.arvrunner.secret_store.has_secret(self.command_line):
            raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")

        if self.arvrunner.secret_store.has_secret(self.environment):
            raise WorkflowException("Secret material leaked in environment, only file literals may contain secrets")

        # Resource amounts are scaled by 2**20 (MiB -> bytes) for Arvados.
        resources = self.builder.resources
        if resources is not None:
            runtime_constraints["vcpus"] = math.ceil(resources.get("cores", 1))
            runtime_constraints["ram"] = math.ceil(resources.get("ram") * 2**20)
        else:
            # Use an empty mapping so that the tmp mount capacities and the
            # CUDA device count below fall back to their defaults instead of
            # failing on None.
            resources = {}

        mounts = {
            self.outdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("outdirSize", 0) * 2**20)
            },
            self.tmpdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("tmpdirSize", 0) * 2**20)
            }
        }
        secret_mounts = {}
        scheduling_parameters = {}

        # Mount the Keep collections that hold the step's staged input files.
        rf = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
        rf.sort(key=lambda k: k.resolved)
        prevdir = None
        for resolved, target, tp, stg in rf:
            if not stg:
                continue
            # Skip entries that fall inside the directory mounted last.
            if prevdir and target.startswith(prevdir):
                continue
            if tp == "Directory":
                targetdir = target
            else:
                targetdir = os.path.dirname(target)
            sp = resolved.split("/", 1)
            pdh = sp[0][5:]   # remove "keep:"
            mounts[targetdir] = {
                "kind": "collection",
                "portable_data_hash": pdh
            }
            if pdh in self.pathmapper.pdh_to_uuid:
                mounts[targetdir]["uuid"] = self.pathmapper.pdh_to_uuid[pdh]
            if len(sp) == 2:
                if tp == "Directory":
                    path = sp[1]
                else:
                    path = os.path.dirname(sp[1])
                if path and path != "/":
                    mounts[targetdir]["path"] = path
            prevdir = targetdir + "/"

        with Perf(metrics, "generatefiles %s" % self.name):
            if self.generatefiles["listing"]:
                # Build a new collection ("vwd") holding the generated
                # working-directory entries; it is mounted into the container below.
                vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                    keep_client=self.arvrunner.keep_client,
                                                    num_retries=self.arvrunner.num_retries)
                generatemapper = NoFollowPathMapper(self.generatefiles["listing"], "", "",
                                                    separateDirs=False)

                sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target)

                logger.debug("generatemapper is %s", sorteditems)

                with Perf(metrics, "createfiles %s" % self.name):
                    for _, p in sorteditems:
                        if not p.target:
                            continue

                        dst = self._vwd_relative_path(p.target)

                        if p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
                            if p.resolved.startswith("_:"):
                                vwd.mkdirs(dst)
                            else:
                                source, path = self.arvrunner.fs_access.get_collection(p.resolved)
                                vwd.copy(path or ".", dst, source_collection=source)
                        elif p.type == "CreateFile":
                            if self.arvrunner.secret_store.has_secret(p.resolved):
                                # Secret contents never go into the collection;
                                # they are delivered as a secret text mount.
                                secret_mounts[self._mountpoint(p.target)] = {
                                    "kind": "text",
                                    "content": self.arvrunner.secret_store.retrieve(p.resolved)
                                }
                            else:
                                with vwd.open(dst, "w") as n:
                                    n.write(p.resolved)

                def keepemptydirs(p):
                    # Put a ".keep" placeholder in every empty subcollection,
                    # presumably so that empty directories survive saving.
                    if isinstance(p, arvados.collection.RichCollectionBase):
                        if len(p) == 0:
                            p.open(".keep", "w").close()
                        else:
                            for c in p:
                                keepemptydirs(p[c])

                keepemptydirs(vwd)

                if not runtimeContext.current_container:
                    runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
                # Computed after the current-container lookup above, so the
                # collection's name/trash_at/properties see that container.
                intermediate_collection_info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
                vwd.save_new(name=intermediate_collection_info["name"],
                             owner_uuid=runtimeContext.project_uuid,
                             ensure_unique_name=True,
                             trash_at=intermediate_collection_info["trash_at"],
                             properties=intermediate_collection_info["properties"])

                # Mount each generated entry from the saved collection.
                # Secret entries are skipped (they are secret mounts), as are
                # entries nested under the previously mounted target.
                prev = None
                for _, p in sorteditems:
                    if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
                        (prev is not None and p.target.startswith(prev))):
                        continue
                    mountpoint = self._mountpoint(p.target)
                    mounts[mountpoint] = {"kind": "collection",
                                          "portable_data_hash": vwd.portable_data_hash(),
                                          "path": self._vwd_relative_path(p.target)}
                    if p.type.startswith("Writable"):
                        mounts[mountpoint]["writable"] = True
                    prev = p.target + "/"

        container_request["environment"] = self._required_env()
        if self.environment:
            container_request["environment"].update(self.environment)

        if self.stdin:
            # The first 6 characters are stripped (presumably a "/keep/"
            # prefix); the rest is "<portable data hash>/<path>".
            sp = self.stdin[6:].split("/", 1)
            mounts["stdin"] = {"kind": "collection",
                                "portable_data_hash": sp[0],
                                "path": sp[1]}

        if self.stderr:
            mounts["stderr"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stderr)}

        if self.stdout:
            mounts["stdout"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stdout)}

        (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")

        container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
                                                                    docker_req,
                                                                    runtimeContext.pull_image,
                                                                    runtimeContext.project_uuid,
                                                                    runtimeContext.force_docker_pull,
                                                                    runtimeContext.tmp_outdir_prefix,
                                                                    runtimeContext.match_local_docker,
                                                                    runtimeContext.copy_deps)

        network_req, _ = self.get_requirement("NetworkAccess")
        if network_req:
            runtime_constraints["API"] = network_req["networkAccess"]

        # APIRequirement is checked after NetworkAccess, so it always wins.
        api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
        if api_req:
            runtime_constraints["API"] = True

        runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
        if runtime_req:
            if "keep_cache" in runtime_req:
                runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
            if "outputDirType" in runtime_req:
                if runtime_req["outputDirType"] == "local_output_dir":
                    # Currently the default behavior.
                    pass
                elif runtime_req["outputDirType"] == "keep_output_dir":
                    mounts[self.outdir]= {
                        "kind": "collection",
                        "writable": True
                    }

        partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
        if partition_req:
            scheduling_parameters["partitions"] = aslist(partition_req["partition"])

        intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
        if intermediate_output_req:
            self.output_ttl = intermediate_output_req["outputTTL"]
        else:
            self.output_ttl = self.arvrunner.intermediate_output_ttl

        if self.output_ttl < 0:
            # Report the offending value itself; container_request["output_ttl"]
            # has not been assigned at this point.
            raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % self.output_ttl)

        # API servers from revision 20210628 on accept output_storage_classes.
        if self.arvrunner.api._rootDesc["revision"] >= "20210628":
            storage_class_req, _ = self.get_requirement("http://arvados.org/cwl#OutputStorageClass")
            if storage_class_req and storage_class_req.get("intermediateStorageClass"):
                container_request["output_storage_classes"] = aslist(storage_class_req["intermediateStorageClass"])
            else:
                container_request["output_storage_classes"] = runtimeContext.intermediate_storage_classes.strip().split(",")

        cuda_req, _ = self.get_requirement("http://commonwl.org/cwltool#CUDARequirement")
        if cuda_req:
            runtime_constraints["cuda"] = {
                "device_count": resources.get("cudaDeviceCount", 1),
                "driver_version": cuda_req["cudaVersionMin"],
                "hardware_capability": aslist(cuda_req["cudaComputeCapability"])[0]
            }

        # An explicit --disable-preemptible overrides any UsePreemptible hint.
        if runtimeContext.enable_preemptible is False:
            scheduling_parameters["preemptible"] = False
        else:
            preemptible_req, _ = self.get_requirement("http://arvados.org/cwl#UsePreemptible")
            if preemptible_req:
                scheduling_parameters["preemptible"] = preemptible_req["usePreemptible"]
            elif runtimeContext.enable_preemptible is True:
                scheduling_parameters["preemptible"] = True
            elif runtimeContext.enable_preemptible is None:
                pass

        if self.timelimit is not None and self.timelimit > 0:
            scheduling_parameters["max_run_time"] = self.timelimit

        extra_submit_params = {}
        if runtimeContext.submit_runner_cluster:
            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster

        container_request["output_name"] = "Output from step %s" % (self.name)
        container_request["output_ttl"] = self.output_ttl
        container_request["mounts"] = mounts
        container_request["secret_mounts"] = secret_mounts
        container_request["runtime_constraints"] = runtime_constraints
        container_request["scheduling_parameters"] = scheduling_parameters

        # Reuse stays disabled if turned off globally.  Otherwise the
        # arvados ReuseRequirement (checked last) overrides WorkReuse.
        enable_reuse = runtimeContext.enable_reuse
        if enable_reuse:
            reuse_req, _ = self.get_requirement("WorkReuse")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
            reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        container_request["use_existing"] = enable_reuse

        properties_req, _ = self.get_requirement("http://arvados.org/cwl#ProcessProperties")
        if properties_req:
            for pr in properties_req["processProperties"]:
                container_request["properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])

        output_properties_req, _ = self.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
        if output_properties_req:
            if self.arvrunner.api._rootDesc["revision"] >= "20220510":
                container_request["output_properties"] = {}
                for pr in output_properties_req["outputProperties"]:
                    container_request["output_properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
            else:
                logger.warning("%s API revision is %s, revision %s is required to support setting properties on output collections.",
                               self.arvrunner.label(self), self.arvrunner.api._rootDesc["revision"], "20220510")

        if runtimeContext.runnerjob.startswith("arvwf:"):
            # "arvwf:<workflow uuid>#..." -- link the request to its
            # registered workflow record.
            wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
            wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
            if container_request["name"] == "main":
                container_request["name"] = wfrecord["name"]
            container_request["properties"]["template_uuid"] = wfuuid

        self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)

        try:
            if runtimeContext.submit_request_uuid:
                # cluster_id only applies when creating a request; the update
                # call does not accept it, so leave it out here.
                update_params = {k: v for k, v in extra_submit_params.items() if k != "cluster_id"}
                response = self.arvrunner.api.container_requests().update(
                    uuid=runtimeContext.submit_request_uuid,
                    body=container_request,
                    **update_params
                ).execute(num_retries=self.arvrunner.num_retries)
            else:
                response = self.arvrunner.api.container_requests().create(
                    body=container_request,
                    **extra_submit_params
                ).execute(num_retries=self.arvrunner.num_retries)

            self.uuid = response["uuid"]
            self.arvrunner.process_submitted(self)

            if response["state"] == "Final":
                logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
            else:
                logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
        except Exception as e:
            logger.exception("%s error submitting container\n%s", self.arvrunner.label(self), e)
            logger.debug("Container request was %s", container_request)
            self.output_callback({}, "permanentFail")

    def done(self, record):
        """Collect the results of a finished container request and report them.

        Args:
            record: the container request record.  Reads ``container_uuid``,
                ``log_uuid``, ``output_uuid``, ``modified_at`` and ``uuid``.

        The process status comes from the container state and exit code,
        honoring successCodes / temporaryFailCodes / permanentFailCodes.  On
        permanent failure the tail of the log collection is logged.  The
        output collection is registered as an intermediate output.
        ``self.output_callback(outputs, processStatus)`` is always invoked.
        """
        outputs = {}
        # Default status so the finally clause always has a value to report,
        # even if something escapes the except clauses below.
        processStatus = "permanentFail"
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            if container["state"] == "Complete":
                rcode = container["exit_code"]
                if self.successCodes and rcode in self.successCodes:
                    processStatus = "success"
                elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
                    processStatus = "temporaryFail"
                elif self.permanentFailCodes and rcode in self.permanentFailCodes:
                    processStatus = "permanentFail"
                elif rcode == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"

                # 137 = 128 + 9, i.e. the process was killed by SIGKILL.
                if rcode == 137:
                    logger.warning("%s Container may have been killed for using too much RAM.  Try resubmitting with a higher 'ramMin'.",
                                   self.arvrunner.label(self))
            else:
                processStatus = "permanentFail"

            if processStatus == "permanentFail" and record["log_uuid"]:
                logc = arvados.collection.CollectionReader(record["log_uuid"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                label = self.arvrunner.label(self)
                done.logtail(
                    logc, logger.error,
                    "%s (%s) error log:" % (label, record["uuid"]), maxlen=40)

            if record["output_uuid"]:
                if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
                    # Compute the trash time to avoid requesting the collection record.
                    trash_at = ciso8601.parse_datetime_as_naive(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
                    aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
                    orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
                    oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
                    logger.info("%s Intermediate output %s (%s) will be trashed%s%s%s." % (
                        self.arvrunner.label(self), record["output_uuid"], container["output"], aftertime, orpart, oncomplete))
                self.arvrunner.add_intermediate_output(record["output_uuid"])

            if container["output"]:
                outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
        except WorkflowException as e:
            # Only include a stack trace if in debug mode.
            # A stack trace may obfuscate more useful output about the workflow.
            logger.error("%s unable to collect output from %s:\n%s",
                         self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
            processStatus = "permanentFail"
        except Exception:
            logger.exception("%s while getting output object:", self.arvrunner.label(self))
            processStatus = "permanentFail"
        finally:
            self.output_callback(outputs, processStatus)
448
449
class RunnerContainer(Runner):
    """Submit and manage a container that runs arvados-cwl-runner."""

    def arvados_job_spec(self, runtimeContext):
        """Create an Arvados container request for this workflow.

        The returned dict can be passed as the ``body`` argument to
        ``container_requests().create()`` (or ``update()``).

        Side effects on ``self.job_order``: directory listings and redundant
        fields are trimmed in place.  Every parameter holding secret material
        is replaced by ``{"$include": "/secrets/sN"}``, and the secret content
        is delivered through ``secret_mounts``.
        """

        adjustDirObjs(self.job_order, trim_listing)
        visit_class(self.job_order, ("File", "Directory"), trim_anonymous_location)
        visit_class(self.job_order, ("File", "Directory"), remove_redundant_fields)

        # Move secret inputs out of the JSON input document into secret mounts.
        secret_mounts = {}
        for param in sorted(self.job_order.keys()):
            if self.secret_store.has_secret(self.job_order[param]):
                mnt = "/secrets/s%d" % len(secret_mounts)
                secret_mounts[mnt] = {
                    "kind": "text",
                    "content": self.secret_store.retrieve(self.job_order[param])
                }
                self.job_order[param] = {"$include": mnt}

        container_req = {
            "name": self.name,
            "output_path": "/var/spool/cwl",
            "cwd": "/var/spool/cwl",
            "priority": self.priority,
            "state": "Committed",
            "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext),
            "mounts": {
                "/var/lib/cwl/cwl.input.json": {
                    "kind": "json",
                    "content": self.job_order
                },
                "stdout": {
                    "kind": "file",
                    "path": "/var/spool/cwl/cwl.output.json"
                },
                "/var/spool/cwl": {
                    "kind": "collection",
                    "writable": True
                }
            },
            "secret_mounts": secret_mounts,
            "runtime_constraints": {
                "vcpus": math.ceil(self.submit_runner_cores),
                # RAM for the runner itself plus its collection cache, in MiB -> bytes.
                "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
                "API": True
            },
            "use_existing": False, # Never reuse the runner container - see #15497.
            "properties": {}
        }

        if self.embedded_tool.tool.get("id", "").startswith("keep:"):
            # The workflow already lives in a Keep collection; mount it directly.
            sp = self.embedded_tool.tool["id"].split('/')
            workflowcollection = sp[0][5:]
            workflowname = "/".join(sp[1:])
            workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
            container_req["mounts"]["/var/lib/cwl/workflow"] = {
                "kind": "collection",
                "portable_data_hash": "%s" % workflowcollection
            }
        else:
            # Otherwise pack the workflow and pass it inline as a JSON mount.
            packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext)
            workflowpath = "/var/lib/cwl/workflow.json#main"
            container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
                "kind": "json",
                "content": packed
            }
            if self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
                # Takes the 27 characters after the "arvwf:" prefix
                # (presumably the registered workflow's UUID).
                container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]

        properties_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ProcessProperties")
        if properties_req:
            builder = make_builder(self.job_order, self.embedded_tool.hints, self.embedded_tool.requirements, runtimeContext, self.embedded_tool.metadata)
            for pr in properties_req["processProperties"]:
                container_req["properties"][pr["propertyName"]] = builder.do_eval(pr["propertyValue"])

        # --local means execute the workflow instead of submitting a container request
        # --api=containers means use the containers API
        # --no-log-timestamps means don't add timestamps (the logging infrastructure does this)
        # --disable-validate because we already validated so don't need to do it again
        # --disable-color presumably turns off colored log output (the log is not a terminal)
        # --eval-timeout is the timeout for javascript invocation
        # --thread-count is the number of threads to use for job submission
        # --enable/disable-reuse sets desired job reuse
        # --collection-cache-size sets aside memory to store collections
        command = ["arvados-cwl-runner",
                   "--local",
                   "--api=containers",
                   "--no-log-timestamps",
                   "--disable-validate",
                   "--disable-color",
                   "--eval-timeout=%s" % self.arvrunner.eval_timeout,
                   "--thread-count=%s" % self.arvrunner.thread_count,
                   "--enable-reuse" if self.enable_reuse else "--disable-reuse",
                   "--collection-cache-size=%s" % self.collection_cache_size]

        if self.output_name:
            command.append("--output-name=" + self.output_name)
            container_req["output_name"] = self.output_name

        if self.output_tags:
            command.append("--output-tags=" + self.output_tags)

        if runtimeContext.debug:
            command.append("--debug")

        if runtimeContext.storage_classes != "default" and runtimeContext.storage_classes:
            command.append("--storage-classes=" + runtimeContext.storage_classes)

        if runtimeContext.intermediate_storage_classes != "default" and runtimeContext.intermediate_storage_classes:
            command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes)

        # NOTE(review): the condition tests runtimeContext.on_error but the
        # flag value comes from self.on_error; presumably both hold the same
        # value -- confirm against Runner.__init__.
        if runtimeContext.on_error:
            command.append("--on-error=" + self.on_error)

        if runtimeContext.intermediate_output_ttl:
            command.append("--intermediate-output-ttl=%d" % runtimeContext.intermediate_output_ttl)

        if runtimeContext.trash_intermediate:
            command.append("--trash-intermediate")

        if runtimeContext.project_uuid:
            command.append("--project-uuid="+runtimeContext.project_uuid)

        if self.enable_dev:
            command.append("--enable-dev")

        if runtimeContext.enable_preemptible is True:
            command.append("--enable-preemptible")

        if runtimeContext.enable_preemptible is False:
            command.append("--disable-preemptible")

        command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])

        container_req["command"] = command

        return container_req


    def run(self, runtimeContext):
        """Submit the runner container request and log where to monitor it.

        Creates a new request, or updates ``runtimeContext.submit_request_uuid``
        when that is set.  Then records the request UUID, notifies the
        executor via ``process_submitted``, and logs a Workbench URL when one
        is configured (Workbench 2 preferred over Workbench 1).
        """
        runtimeContext.keepprefix = "keep:"
        job_spec = self.arvados_job_spec(runtimeContext)
        if runtimeContext.project_uuid:
            job_spec["owner_uuid"] = runtimeContext.project_uuid

        extra_submit_params = {}
        if runtimeContext.submit_runner_cluster:
            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster

        if runtimeContext.submit_request_uuid:
            if "cluster_id" in extra_submit_params:
                # Doesn't make sense for "update" and actually fails
                del extra_submit_params["cluster_id"]
            response = self.arvrunner.api.container_requests().update(
                uuid=runtimeContext.submit_request_uuid,
                body=job_spec,
                **extra_submit_params
            ).execute(num_retries=self.arvrunner.num_retries)
        else:
            response = self.arvrunner.api.container_requests().create(
                body=job_spec,
                **extra_submit_params
            ).execute(num_retries=self.arvrunner.num_retries)

        self.uuid = response["uuid"]
        self.arvrunner.process_submitted(self)

        logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])

        # Fetch the cluster config once and read both Workbench URLs from it.
        services = self.arvrunner.api.config()["Services"]
        workbench1 = services["Workbench1"]["ExternalURL"]
        workbench2 = services["Workbench2"]["ExternalURL"]
        url = ""
        if workbench2:
            url = "{}processes/{}".format(workbench2, response["uuid"])
        elif workbench1:
            url = "{}container_requests/{}".format(workbench1, response["uuid"])
        if url:
            logger.info("Monitor workflow progress at %s", url)


    def done(self, record):
        """Fetch the runner's container record and hand it to Runner.done().

        The request's ``log_uuid`` is copied onto the container record as
        ``log``.  If the container cannot be fetched, "permanentFail" is
        reported to the executor's output callback instead.
        """
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            container["log"] = record["log_uuid"]
        except Exception:
            logger.exception("%s while getting runner container", self.arvrunner.label(self))
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            super(RunnerContainer, self).done(container)