Merge branch 'main' into 18842-arv-mount-disk-config
[arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8
9 import logging
10 import json
11 import os
12 import urllib.request, urllib.parse, urllib.error
13 import time
14 import datetime
15 import ciso8601
16 import uuid
17 import math
18
19 import arvados_cwl.util
20 import ruamel.yaml as yaml
21
22 from cwltool.errors import WorkflowException
23 from cwltool.process import UnsupportedRequirement, shortname
24 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
25 from cwltool.job import JobBase
26
27 import arvados.collection
28
29 from .arvdocker import arv_docker_get_image
30 from . import done
31 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields, make_builder
32 from .fsaccess import CollectionFetcher
33 from .pathmapper import NoFollowPathMapper, trim_listing
34 from .perf import Perf
35 from ._version import __version__
36
# Logger for the runner's user-facing messages.
logger = logging.getLogger("arvados.cwl-runner")
# Child logger that receives the timing output of Perf() blocks.
metrics = logging.getLogger("arvados.cwl-runner.metrics")
39
def cleanup_name_for_collection(name):
    """Return *name* with every "/" turned into a space.

    Used to derive collection names (e.g. output_name) from step names,
    which may contain slashes.
    """
    pieces = name.split("/")
    return " ".join(pieces)
42
class ArvadosContainer(JobBase):
    """Submit and manage a Crunch container request for executing a CWL CommandLineTool.

    One instance corresponds to one tool invocation.  run() translates the
    job prepared by cwltool (command line, environment, staged inputs,
    generated files, requirements) into an Arvados container request and
    submits it.  done() is later called with the container request record
    to collect outputs and report the process status via output_callback.
    """

    def __init__(self, runner, job_runtime,
                 builder,   # type: Builder
                 joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
                 make_path_mapper,  # type: Callable[..., PathMapper]
                 requirements,      # type: List[Dict[Text, Text]]
                 hints,     # type: List[Dict[Text, Text]]
                 name       # type: Text
    ):
        """Set up a container job for one tool invocation.

        :runner: the arvados-cwl-runner executor object (stored as
          self.arvrunner); supplies the API/Keep clients, secret store and
          submission bookkeeping.
        :job_runtime: runtime context used by run() for this job.
        The remaining arguments are passed unchanged to cwltool's JobBase.
        """
        super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
        self.arvrunner = runner
        self.job_runtime = job_runtime
        self.running = False
        # UUID of the submitted container request; None until run() succeeds.
        self.uuid = None

    def update_pipeline_component(self, r):
        """Intentionally a no-op for container-based jobs."""
        pass

    def _required_env(self):
        """Return the base environment: HOME is the output dir, TMPDIR the tmp dir."""
        env = {}
        env["HOME"] = self.outdir
        env["TMPDIR"] = self.tmpdir
        return env

    def _outdir_relative(self, target):
        """Return the path inside the generated-files collection for *target*.

        Absolute targets under self.outdir become relative to it, other
        absolute targets just lose their leading "/", and relative targets
        are returned unchanged.
        """
        if not target.startswith("/"):
            return target
        if target.startswith(self.outdir + "/"):
            return target[len(self.outdir)+1:]
        return target[1:]

    def _mount_point(self, target):
        """Return *target* as an absolute container path; relative paths are under self.outdir."""
        return target if target.startswith("/") else os.path.join(self.outdir, target)

    def _add_input_mounts(self, mounts):
        """Add a "collection" mount to *mounts* for each staged referenced file/directory.

        Entries are processed sorted by their resolved "keep:" location.
        Once a directory has been mounted, later targets beneath it are
        skipped because that mount already provides them.
        """
        rf = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
        rf.sort(key=lambda k: k.resolved)
        prevdir = None
        for resolved, target, tp, stg in rf:
            if not stg:
                continue
            if prevdir and target.startswith(prevdir):
                continue
            # Files are provided by mounting their containing directory.
            if tp == "Directory":
                targetdir = target
            else:
                targetdir = os.path.dirname(target)
            sp = resolved.split("/", 1)
            pdh = sp[0][5:]   # remove "keep:"
            mounts[targetdir] = {
                "kind": "collection",
                "portable_data_hash": pdh
            }
            if pdh in self.pathmapper.pdh_to_uuid:
                mounts[targetdir]["uuid"] = self.pathmapper.pdh_to_uuid[pdh]
            if len(sp) == 2:
                if tp == "Directory":
                    path = sp[1]
                else:
                    path = os.path.dirname(sp[1])
                if path and path != "/":
                    mounts[targetdir]["path"] = path
            prevdir = targetdir + "/"

    def _add_generated_file_mounts(self, runtimeContext, mounts, secret_mounts):
        """Materialize self.generatefiles["listing"] and mount it into the container.

        Non-secret entries are copied or written into a new intermediate
        collection, which is saved and then mounted at each top-level
        target.  CreateFile entries whose content contains secrets are not
        written into the collection; they are added to *secret_mounts* as
        "text" mounts instead.  Does nothing when the listing is empty.
        """
        with Perf(metrics, "generatefiles %s" % self.name):
            if not self.generatefiles["listing"]:
                return

            # Fetch the current container *before* computing the collection
            # info, since get_intermediate_collection_info() takes it as an
            # argument.
            if not runtimeContext.current_container:
                runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
            intermediate_collection_info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)

            vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                keep_client=self.arvrunner.keep_client,
                                                num_retries=self.arvrunner.num_retries)
            generatemapper = NoFollowPathMapper(self.generatefiles["listing"], "", "",
                                                separateDirs=False)

            sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target)

            logger.debug("generatemapper is %s", sorteditems)

            with Perf(metrics, "createfiles %s" % self.name):
                for _, p in sorteditems:
                    if not p.target:
                        continue

                    dst = self._outdir_relative(p.target)

                    if p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
                        if p.resolved.startswith("_:"):
                            # "_:" locations have no source collection to
                            # copy from; only the path is created.
                            vwd.mkdirs(dst)
                        else:
                            source, path = self.arvrunner.fs_access.get_collection(p.resolved)
                            vwd.copy(path or ".", dst, source_collection=source)
                    elif p.type == "CreateFile":
                        if self.arvrunner.secret_store.has_secret(p.resolved):
                            secret_mounts[self._mount_point(p.target)] = {
                                "kind": "text",
                                "content": self.arvrunner.secret_store.retrieve(p.resolved)
                            }
                        else:
                            with vwd.open(dst, "w") as n:
                                n.write(p.resolved)

            def keepemptydirs(p):
                # Put a ".keep" placeholder in every empty subcollection,
                # presumably so empty directories survive saving.
                if isinstance(p, arvados.collection.RichCollectionBase):
                    if len(p) == 0:
                        p.open(".keep", "w").close()
                    else:
                        for c in p:
                            keepemptydirs(p[c])

            keepemptydirs(vwd)

            vwd.save_new(name=intermediate_collection_info["name"],
                         owner_uuid=runtimeContext.project_uuid,
                         ensure_unique_name=True,
                         trash_at=intermediate_collection_info["trash_at"],
                         properties=intermediate_collection_info["properties"])

            # The collection is not modified below, so its hash is fixed.
            vwd_pdh = vwd.portable_data_hash()
            prev = None
            for _, p in sorteditems:
                # Skip secrets (handled via secret_mounts) and anything
                # nested under a target that was already mounted.
                if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
                    (prev is not None and p.target.startswith(prev))):
                    continue
                mountpoint = self._mount_point(p.target)
                mounts[mountpoint] = {"kind": "collection",
                                      "portable_data_hash": vwd_pdh,
                                      "path": self._outdir_relative(p.target)}
                if p.type.startswith("Writable"):
                    mounts[mountpoint]["writable"] = True
                prev = p.target + "/"

    def run(self, toplevelRuntimeContext):
        """Build and submit the container request for this job.

        ArvadosCommandTool subclasses from cwltool.CommandLineTool, which
        calls makeJobRunner() to get a new ArvadosContainer object.  The
        fields that define execution such as command_line, environment,
        etc are set on the ArvadosContainer object by CommandLineTool.job()
        before run() is called.

        :toplevelRuntimeContext: not used; self.job_runtime is used instead.

        Raises WorkflowException if secret material would leak into the
        command line or environment, or if the output TTL is negative.
        Errors from the submission API call itself are logged and reported
        via output_callback as "permanentFail" rather than raised.
        """
        runtimeContext = self.job_runtime

        if runtimeContext.submit_request_uuid:
            # Start from the existing container request record.
            container_request = self.arvrunner.api.container_requests().get(
                uuid=runtimeContext.submit_request_uuid
            ).execute(num_retries=self.arvrunner.num_retries)
        else:
            container_request = {}

        container_request["command"] = self.command_line
        container_request["name"] = self.name
        container_request["output_path"] = self.outdir
        container_request["cwd"] = self.outdir
        container_request["priority"] = runtimeContext.priority
        container_request["state"] = "Committed"
        container_request.setdefault("properties", {})

        container_request["properties"]["cwl_input"] = self.joborder

        runtime_constraints = {}

        if runtimeContext.project_uuid:
            container_request["owner_uuid"] = runtimeContext.project_uuid

        if self.arvrunner.secret_store.has_secret(self.command_line):
            raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")

        if self.arvrunner.secret_store.has_secret(self.environment):
            raise WorkflowException("Secret material leaked in environment, only file literals may contain secrets")

        # Sizes are scaled by 2**20 (MiB -> bytes) for the Arvados API.
        resources = self.builder.resources
        if resources is not None:
            runtime_constraints["vcpus"] = math.ceil(resources.get("cores", 1))
            runtime_constraints["ram"] = math.ceil(resources.get("ram") * 2**20)
        else:
            # Use the .get() defaults below instead of failing on None.
            resources = {}

        mounts = {
            self.outdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("outdirSize", 0) * 2**20)
            },
            self.tmpdir: {
                "kind": "tmp",
                "capacity": math.ceil(resources.get("tmpdirSize", 0) * 2**20)
            }
        }
        secret_mounts = {}
        scheduling_parameters = {}

        self._add_input_mounts(mounts)
        self._add_generated_file_mounts(runtimeContext, mounts, secret_mounts)

        container_request["environment"] = self._required_env()
        if self.environment:
            container_request["environment"].update(self.environment)

        if self.stdin:
            # Drops a 6-character prefix (presumably "/keep/" -- TODO
            # confirm), leaving "<portable data hash>/<path>".
            sp = self.stdin[6:].split("/", 1)
            mounts["stdin"] = {"kind": "collection",
                                "portable_data_hash": sp[0],
                                "path": sp[1]}

        if self.stderr:
            mounts["stderr"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stderr)}

        if self.stdout:
            mounts["stdout"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stdout)}

        docker_req, _ = self.get_requirement("DockerRequirement")

        container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
                                                                    docker_req,
                                                                    runtimeContext.pull_image,
                                                                    runtimeContext.project_uuid,
                                                                    runtimeContext.force_docker_pull,
                                                                    runtimeContext.tmp_outdir_prefix,
                                                                    runtimeContext.match_local_docker,
                                                                    runtimeContext.copy_deps)

        network_req, _ = self.get_requirement("NetworkAccess")
        if network_req:
            runtime_constraints["API"] = network_req["networkAccess"]

        api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
        if api_req:
            runtime_constraints["API"] = True

        runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
        if runtime_req:
            if "keep_cache" in runtime_req:
                keep_cache_bytes = math.ceil(runtime_req["keep_cache"] * 2**20)
                if self.arvrunner.api.config()["Containers"].get("DefaultKeepCacheDisk", 0) > 0:
                    # If DefaultKeepCacheDisk is non-zero it means we should use disk cache.
                    runtime_constraints["keep_cache_disk"] = keep_cache_bytes
                else:
                    runtime_constraints["keep_cache_ram"] = keep_cache_bytes
            # "local_output_dir" is the default behavior (the tmp mount above);
            # "keep_output_dir" replaces it with a writable collection mount.
            if runtime_req.get("outputDirType") == "keep_output_dir":
                mounts[self.outdir] = {
                    "kind": "collection",
                    "writable": True
                }

        partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
        if partition_req:
            scheduling_parameters["partitions"] = aslist(partition_req["partition"])

        intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
        if intermediate_output_req:
            self.output_ttl = intermediate_output_req["outputTTL"]
        else:
            self.output_ttl = self.arvrunner.intermediate_output_ttl

        if self.output_ttl < 0:
            # container_request["output_ttl"] is not set yet here, so report
            # the value actually being validated.
            raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % self.output_ttl)

        if self.arvrunner.api._rootDesc["revision"] >= "20210628":
            storage_class_req, _ = self.get_requirement("http://arvados.org/cwl#OutputStorageClass")
            if storage_class_req and storage_class_req.get("intermediateStorageClass"):
                container_request["output_storage_classes"] = aslist(storage_class_req["intermediateStorageClass"])
            else:
                container_request["output_storage_classes"] = runtimeContext.intermediate_storage_classes.strip().split(",")

        cuda_req, _ = self.get_requirement("http://commonwl.org/cwltool#CUDARequirement")
        if cuda_req:
            runtime_constraints["cuda"] = {
                "device_count": resources.get("cudaDeviceCount", 1),
                "driver_version": cuda_req["cudaVersionMin"],
                "hardware_capability": aslist(cuda_req["cudaComputeCapability"])[0]
            }

        if runtimeContext.enable_preemptible is False:
            scheduling_parameters["preemptible"] = False
        else:
            preemptible_req, _ = self.get_requirement("http://arvados.org/cwl#UsePreemptible")
            if preemptible_req:
                scheduling_parameters["preemptible"] = preemptible_req["usePreemptible"]
            elif runtimeContext.enable_preemptible is True:
                scheduling_parameters["preemptible"] = True
            # Otherwise (e.g. None) leave "preemptible" unset.

        if self.timelimit is not None and self.timelimit > 0:
            scheduling_parameters["max_run_time"] = self.timelimit

        extra_submit_params = {}
        if runtimeContext.submit_runner_cluster:
            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster

        container_request["output_name"] = cleanup_name_for_collection("Output from step %s" % (self.name))
        container_request["output_ttl"] = self.output_ttl
        container_request["mounts"] = mounts
        container_request["secret_mounts"] = secret_mounts
        container_request["runtime_constraints"] = runtime_constraints
        container_request["scheduling_parameters"] = scheduling_parameters

        # Arvados' ReuseRequirement, when present, overrides WorkReuse; both
        # only apply if reuse is enabled globally.
        enable_reuse = runtimeContext.enable_reuse
        if enable_reuse:
            reuse_req, _ = self.get_requirement("WorkReuse")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
            reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        container_request["use_existing"] = enable_reuse

        properties_req, _ = self.get_requirement("http://arvados.org/cwl#ProcessProperties")
        if properties_req:
            for pr in properties_req["processProperties"]:
                container_request["properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])

        output_properties_req, _ = self.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
        if output_properties_req:
            if self.arvrunner.api._rootDesc["revision"] >= "20220510":
                container_request["output_properties"] = {}
                for pr in output_properties_req["outputProperties"]:
                    container_request["output_properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
            else:
                logger.warning("%s API revision is %s, revision %s is required to support setting properties on output collections.",
                               self.arvrunner.label(self), self.arvrunner.api._rootDesc["revision"], "20220510")

        if runtimeContext.runnerjob.startswith("arvwf:"):
            wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
            wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
            if container_request["name"] == "main":
                container_request["name"] = wfrecord["name"]
            container_request["properties"]["template_uuid"] = wfuuid

        self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)

        try:
            if runtimeContext.submit_request_uuid:
                # cluster_id doesn't make sense for "update" and actually
                # fails, so extra_submit_params is only passed to "create".
                response = self.arvrunner.api.container_requests().update(
                    uuid=runtimeContext.submit_request_uuid,
                    body=container_request
                ).execute(num_retries=self.arvrunner.num_retries)
            else:
                response = self.arvrunner.api.container_requests().create(
                    body=container_request,
                    **extra_submit_params
                ).execute(num_retries=self.arvrunner.num_retries)

            self.uuid = response["uuid"]
            self.arvrunner.process_submitted(self)

            if response["state"] == "Final":
                logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
            else:
                logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
        except Exception as e:
            logger.exception("%s error submitting container\n%s", self.arvrunner.label(self), e)
            logger.debug("Container request was %s", container_request)
            self.output_callback({}, "permanentFail")

    def done(self, record):
        """Collect results after this job's container request has finished.

        :record: the container request record (uses container_uuid,
          log_uuid, output_uuid, uuid, modified_at and properties).

        Maps the container exit code to a CWL process status via
        successCodes / temporaryFailCodes / permanentFailCodes (otherwise
        0 is success and anything else, or a container not in state
        "Complete", is permanentFail).  On permanent failure, the tail of
        the log collection is logged.  Outputs are stored in the request's
        "cwl_output" property, and output_callback is always invoked.
        """
        outputs = {}
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            if container["state"] == "Complete":
                rcode = container["exit_code"]
                if self.successCodes and rcode in self.successCodes:
                    processStatus = "success"
                elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
                    processStatus = "temporaryFail"
                elif self.permanentFailCodes and rcode in self.permanentFailCodes:
                    processStatus = "permanentFail"
                elif rcode == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"

                if rcode == 137:
                    logger.warning("%s Container may have been killed for using too much RAM.  Try resubmitting with a higher 'ramMin'.",
                                   self.arvrunner.label(self))
            else:
                processStatus = "permanentFail"

            if processStatus == "permanentFail" and record["log_uuid"]:
                logc = arvados.collection.CollectionReader(record["log_uuid"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                label = self.arvrunner.label(self)
                done.logtail(
                    logc, logger.error,
                    "%s (%s) error log:" % (label, record["uuid"]), maxlen=40)

            if record["output_uuid"]:
                if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
                    # Compute the trash time to avoid requesting the collection record.
                    trash_at = ciso8601.parse_datetime_as_naive(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
                    aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
                    orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
                    oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
                    logger.info("%s Intermediate output %s (%s) will be trashed%s%s%s." % (
                        self.arvrunner.label(self), record["output_uuid"], container["output"], aftertime, orpart, oncomplete))
                self.arvrunner.add_intermediate_output(record["output_uuid"])

            if container["output"]:
                outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")

            properties = record["properties"].copy()
            properties["cwl_output"] = outputs
            self.arvrunner.api.container_requests().update(
                uuid=self.uuid,
                body={"container_request": {"properties": properties}}
            ).execute(num_retries=self.arvrunner.num_retries)
        except WorkflowException as e:
            # Only include a stack trace if in debug mode.
            # A stack trace may obfuscate more useful output about the workflow.
            logger.error("%s unable to collect output from %s:\n%s",
                         self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
            processStatus = "permanentFail"
        except Exception:
            logger.exception("%s while getting output object:", self.arvrunner.label(self))
            processStatus = "permanentFail"
        finally:
            self.output_callback(outputs, processStatus)
464
465
class RunnerContainer(Runner):
    """Submit and manage a container that runs arvados-cwl-runner."""

    def arvados_job_spec(self, runtimeContext, git_info):
        """Create an Arvados container request for this workflow.

        The returned dict can be used to create a container passed as
        the +body+ argument to container_requests().create().

        :runtimeContext: options that are forwarded to the inner
          arvados-cwl-runner command line (debug, storage classes, TTL,
          preemptible, ...).
        :git_info: mapping of git metadata recorded as request properties,
          with the "http://arvados.org/cwl#" key prefix shortened to
          "arv:".  May be None or empty, in which case nothing is added.

        Note: inputs holding secrets are replaced in self.job_order by
        {"$include": "/secrets/sN"} references to secret mounts.
        """

        adjustDirObjs(self.job_order, trim_listing)
        visit_class(self.job_order, ("File", "Directory"), trim_anonymous_location)
        visit_class(self.job_order, ("File", "Directory"), remove_redundant_fields)

        secret_mounts = {}
        for param in sorted(self.job_order.keys()):
            if self.secret_store.has_secret(self.job_order[param]):
                mnt = "/secrets/s%d" % len(secret_mounts)
                secret_mounts[mnt] = {
                    "kind": "text",
                    "content": self.secret_store.retrieve(self.job_order[param])
                }
                self.job_order[param] = {"$include": mnt}

        container_req = {
            "name": self.name,
            "output_path": "/var/spool/cwl",
            "cwd": "/var/spool/cwl",
            "priority": self.priority,
            "state": "Committed",
            "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext),
            "mounts": {
                "/var/lib/cwl/cwl.input.json": {
                    "kind": "json",
                    "content": self.job_order
                },
                "stdout": {
                    "kind": "file",
                    "path": "/var/spool/cwl/cwl.output.json"
                },
                "/var/spool/cwl": {
                    "kind": "collection",
                    "writable": True
                }
            },
            "secret_mounts": secret_mounts,
            "runtime_constraints": {
                "vcpus": math.ceil(self.submit_runner_cores),
                # Runner RAM plus the collection cache, both given in MiB.
                "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
                "API": True
            },
            "use_existing": False, # Never reuse the runner container - see #15497.
            "properties": {}
        }

        tool_id = self.embedded_tool.tool.get("id", "")
        if tool_id.startswith("keep:"):
            # "keep:<pdh>/<path>": mount the collection and run <path> from it.
            sp = tool_id.split('/')
            workflowcollection = sp[0][5:]
            workflowname = "/".join(sp[1:])
            workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
            container_req["mounts"]["/var/lib/cwl/workflow"] = {
                "kind": "collection",
                "portable_data_hash": "%s" % workflowcollection
            }
        elif tool_id.startswith("arvwf:"):
            # Characters 6..33 (27 characters) hold the workflow record UUID.
            workflow_uuid = tool_id[6:33]
            workflowpath = "/var/lib/cwl/workflow.json#main"
            record = self.arvrunner.api.workflows().get(uuid=workflow_uuid).execute(num_retries=self.arvrunner.num_retries)
            packed = yaml.safe_load(record["definition"])
            container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
                "kind": "json",
                "content": packed
            }
            container_req["properties"]["template_uuid"] = workflow_uuid
        else:
            packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext, git_info)
            workflowpath = "/var/lib/cwl/workflow.json#main"
            container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
                "kind": "json",
                "content": packed
            }

        if git_info:
            container_req["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in git_info.items()})

        properties_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ProcessProperties")
        if properties_req:
            builder = make_builder(self.job_order, self.embedded_tool.hints, self.embedded_tool.requirements, runtimeContext, self.embedded_tool.metadata)
            for pr in properties_req["processProperties"]:
                container_req["properties"][pr["propertyName"]] = builder.do_eval(pr["propertyValue"])

        # --local means execute the workflow instead of submitting a container request
        # --api=containers means use the containers API
        # --no-log-timestamps means don't add timestamps (the logging infrastructure does this)
        # --disable-validate because we already validated so don't need to do it again
        # --eval-timeout is the timeout for javascript invocation
        # --parallel-task-count is the number of threads to use for job submission
        # --enable/disable-reuse sets desired job reuse
        # --collection-cache-size sets aside memory to store collections
        command = ["arvados-cwl-runner",
                   "--local",
                   "--api=containers",
                   "--no-log-timestamps",
                   "--disable-validate",
                   "--disable-color",
                   "--eval-timeout=%s" % self.arvrunner.eval_timeout,
                   "--thread-count=%s" % self.arvrunner.thread_count,
                   "--enable-reuse" if self.enable_reuse else "--disable-reuse",
                   "--collection-cache-size=%s" % self.collection_cache_size]

        if self.output_name:
            command.append("--output-name=" + self.output_name)
            container_req["output_name"] = self.output_name

        if self.output_tags:
            command.append("--output-tags=" + self.output_tags)

        if runtimeContext.debug:
            command.append("--debug")

        if runtimeContext.storage_classes != "default" and runtimeContext.storage_classes:
            command.append("--storage-classes=" + runtimeContext.storage_classes)

        if runtimeContext.intermediate_storage_classes != "default" and runtimeContext.intermediate_storage_classes:
            command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes)

        if runtimeContext.on_error:
            command.append("--on-error=" + self.on_error)

        if runtimeContext.intermediate_output_ttl:
            command.append("--intermediate-output-ttl=%d" % runtimeContext.intermediate_output_ttl)

        if runtimeContext.trash_intermediate:
            command.append("--trash-intermediate")

        if runtimeContext.project_uuid:
            command.append("--project-uuid="+runtimeContext.project_uuid)

        if self.enable_dev:
            command.append("--enable-dev")

        if runtimeContext.enable_preemptible is True:
            command.append("--enable-preemptible")

        if runtimeContext.enable_preemptible is False:
            command.append("--disable-preemptible")

        if runtimeContext.varying_url_params:
            command.append("--varying-url-params="+runtimeContext.varying_url_params)

        if runtimeContext.prefer_cached_downloads:
            command.append("--prefer-cached-downloads")

        command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])

        container_req["command"] = command

        return container_req


    def run(self, runtimeContext):
        """Create (or update, if submit_request_uuid is set) the runner container request.

        Records the resulting request UUID, registers it with the executor
        via process_submitted(), and logs a Workbench URL for monitoring
        when one is configured (Workbench2 preferred over Workbench1).
        """
        runtimeContext.keepprefix = "keep:"
        job_spec = self.arvados_job_spec(runtimeContext, self.git_info)
        if runtimeContext.project_uuid:
            job_spec["owner_uuid"] = runtimeContext.project_uuid

        if runtimeContext.submit_request_uuid:
            # cluster_id doesn't make sense for "update" and actually fails,
            # so it is only ever passed to "create".
            response = self.arvrunner.api.container_requests().update(
                uuid=runtimeContext.submit_request_uuid,
                body=job_spec
            ).execute(num_retries=self.arvrunner.num_retries)
        else:
            extra_submit_params = {}
            if runtimeContext.submit_runner_cluster:
                extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
            response = self.arvrunner.api.container_requests().create(
                body=job_spec,
                **extra_submit_params
            ).execute(num_retries=self.arvrunner.num_retries)

        self.uuid = response["uuid"]
        self.arvrunner.process_submitted(self)

        logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])

        services = self.arvrunner.api.config()["Services"]
        workbench1 = services["Workbench1"]["ExternalURL"]
        workbench2 = services["Workbench2"]["ExternalURL"]
        url = ""
        if workbench2:
            url = "{}processes/{}".format(workbench2, response["uuid"])
        elif workbench1:
            url = "{}container_requests/{}".format(workbench1, response["uuid"])
        if url:
            logger.info("Monitor workflow progress at %s", url)


    def done(self, record):
        """Fetch the runner's container and hand it to Runner.done().

        The container record is annotated with the request's log_uuid as
        "log".  If the container cannot be fetched, the failure is logged
        and reported to the executor as "permanentFail".
        """
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            container["log"] = record["log_uuid"]
        except Exception:
            logger.exception("%s while getting runner container", self.arvrunner.label(self))
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            super(RunnerContainer, self).done(container)