Merge branch '19954-permission-dedup-doc'
[arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8
9 import logging
10 import json
11 import os
12 import urllib.request, urllib.parse, urllib.error
13 import time
14 import datetime
15 import ciso8601
16 import uuid
17 import math
18
19 import arvados_cwl.util
20 import ruamel.yaml as yaml
21
22 from cwltool.errors import WorkflowException
23 from cwltool.process import UnsupportedRequirement, shortname
24 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
25 from cwltool.job import JobBase
26
27 import arvados.collection
28
29 from .arvdocker import arv_docker_get_image
30 from . import done
31 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields, make_builder
32 from .fsaccess import CollectionFetcher
33 from .pathmapper import NoFollowPathMapper, trim_listing
34 from .perf import Perf
35 from ._version import __version__
36
37 logger = logging.getLogger('arvados.cwl-runner')
38 metrics = logging.getLogger('arvados.cwl-runner.metrics')
39
def cleanup_name_for_collection(name):
    """Return ``name`` with every "/" replaced by a single space."""
    # Splitting on "/" and re-joining with " " swaps each separator in place,
    # keeping empty segments (leading, trailing or doubled slashes) intact.
    return " ".join(name.split("/"))
42
43 class ArvadosContainer(JobBase):
44     """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""
45
46     def __init__(self, runner, job_runtime,
47                  builder,   # type: Builder
48                  joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
49                  make_path_mapper,  # type: Callable[..., PathMapper]
50                  requirements,      # type: List[Dict[Text, Text]]
51                  hints,     # type: List[Dict[Text, Text]]
52                  name       # type: Text
53     ):
54         super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
55         self.arvrunner = runner
56         self.job_runtime = job_runtime
57         self.running = False
58         self.uuid = None
59
60     def update_pipeline_component(self, r):
61         pass
62
63     def _required_env(self):
64         env = {}
65         env["HOME"] = self.outdir
66         env["TMPDIR"] = self.tmpdir
67         return env
68
69     def run(self, toplevelRuntimeContext):
70         # ArvadosCommandTool subclasses from cwltool.CommandLineTool,
71         # which calls makeJobRunner() to get a new ArvadosContainer
72         # object.  The fields that define execution such as
73         # command_line, environment, etc are set on the
74         # ArvadosContainer object by CommandLineTool.job() before
75         # run() is called.
76
77         runtimeContext = self.job_runtime
78
79         if runtimeContext.submit_request_uuid:
80             container_request = self.arvrunner.api.container_requests().get(
81                 uuid=runtimeContext.submit_request_uuid
82             ).execute(num_retries=self.arvrunner.num_retries)
83         else:
84             container_request = {}
85
86         container_request["command"] = self.command_line
87         container_request["name"] = self.name
88         container_request["output_path"] = self.outdir
89         container_request["cwd"] = self.outdir
90         container_request["priority"] = runtimeContext.priority
91         container_request["state"] = "Committed"
92         container_request.setdefault("properties", {})
93
94         container_request["properties"]["cwl_input"] = self.joborder
95
96         runtime_constraints = {}
97
98         if runtimeContext.project_uuid:
99             container_request["owner_uuid"] = runtimeContext.project_uuid
100
101         if self.arvrunner.secret_store.has_secret(self.command_line):
102             raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")
103
104         if self.arvrunner.secret_store.has_secret(self.environment):
105             raise WorkflowException("Secret material leaked in environment, only file literals may contain secrets")
106
107         resources = self.builder.resources
108         if resources is not None:
109             runtime_constraints["vcpus"] = math.ceil(resources.get("cores", 1))
110             runtime_constraints["ram"] = math.ceil(resources.get("ram") * 2**20)
111
112         mounts = {
113             self.outdir: {
114                 "kind": "tmp",
115                 "capacity": math.ceil(resources.get("outdirSize", 0) * 2**20)
116             },
117             self.tmpdir: {
118                 "kind": "tmp",
119                 "capacity": math.ceil(resources.get("tmpdirSize", 0) * 2**20)
120             }
121         }
122         secret_mounts = {}
123         scheduling_parameters = {}
124
125         rf = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
126         rf.sort(key=lambda k: k.resolved)
127         prevdir = None
128         for resolved, target, tp, stg in rf:
129             if not stg:
130                 continue
131             if prevdir and target.startswith(prevdir):
132                 continue
133             if tp == "Directory":
134                 targetdir = target
135             else:
136                 targetdir = os.path.dirname(target)
137             sp = resolved.split("/", 1)
138             pdh = sp[0][5:]   # remove "keep:"
139             mounts[targetdir] = {
140                 "kind": "collection",
141                 "portable_data_hash": pdh
142             }
143             if pdh in self.pathmapper.pdh_to_uuid:
144                 mounts[targetdir]["uuid"] = self.pathmapper.pdh_to_uuid[pdh]
145             if len(sp) == 2:
146                 if tp == "Directory":
147                     path = sp[1]
148                 else:
149                     path = os.path.dirname(sp[1])
150                 if path and path != "/":
151                     mounts[targetdir]["path"] = path
152             prevdir = targetdir + "/"
153
154         intermediate_collection_info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
155
156         with Perf(metrics, "generatefiles %s" % self.name):
157             if self.generatefiles["listing"]:
158                 vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
159                                                     keep_client=self.arvrunner.keep_client,
160                                                     num_retries=self.arvrunner.num_retries)
161                 generatemapper = NoFollowPathMapper(self.generatefiles["listing"], "", "",
162                                                     separateDirs=False)
163
164                 sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target)
165
166                 logger.debug("generatemapper is %s", sorteditems)
167
168                 with Perf(metrics, "createfiles %s" % self.name):
169                     for f, p in sorteditems:
170                         if not p.target:
171                             continue
172
173                         if p.target.startswith("/"):
174                             dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
175                         else:
176                             dst = p.target
177
178                         if p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
179                             if p.resolved.startswith("_:"):
180                                 vwd.mkdirs(dst)
181                             else:
182                                 source, path = self.arvrunner.fs_access.get_collection(p.resolved)
183                                 vwd.copy(path or ".", dst, source_collection=source)
184                         elif p.type == "CreateFile":
185                             if self.arvrunner.secret_store.has_secret(p.resolved):
186                                 mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
187                                 secret_mounts[mountpoint] = {
188                                     "kind": "text",
189                                     "content": self.arvrunner.secret_store.retrieve(p.resolved)
190                                 }
191                             else:
192                                 with vwd.open(dst, "w") as n:
193                                     n.write(p.resolved)
194
195                 def keepemptydirs(p):
196                     if isinstance(p, arvados.collection.RichCollectionBase):
197                         if len(p) == 0:
198                             p.open(".keep", "w").close()
199                         else:
200                             for c in p:
201                                 keepemptydirs(p[c])
202
203                 keepemptydirs(vwd)
204
205                 if not runtimeContext.current_container:
206                     runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
207                 vwd.save_new(name=intermediate_collection_info["name"],
208                              owner_uuid=runtimeContext.project_uuid,
209                              ensure_unique_name=True,
210                              trash_at=intermediate_collection_info["trash_at"],
211                              properties=intermediate_collection_info["properties"])
212
213                 prev = None
214                 for f, p in sorteditems:
215                     if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
216                         (prev is not None and p.target.startswith(prev))):
217                         continue
218                     if p.target.startswith("/"):
219                         dst = p.target[len(self.outdir)+1:] if p.target.startswith(self.outdir+"/") else p.target[1:]
220                     else:
221                         dst = p.target
222                     mountpoint = p.target if p.target.startswith("/") else os.path.join(self.outdir, p.target)
223                     mounts[mountpoint] = {"kind": "collection",
224                                           "portable_data_hash": vwd.portable_data_hash(),
225                                           "path": dst}
226                     if p.type.startswith("Writable"):
227                         mounts[mountpoint]["writable"] = True
228                     prev = p.target + "/"
229
230         container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
231         if self.environment:
232             container_request["environment"].update(self.environment)
233
234         if self.stdin:
235             sp = self.stdin[6:].split("/", 1)
236             mounts["stdin"] = {"kind": "collection",
237                                 "portable_data_hash": sp[0],
238                                 "path": sp[1]}
239
240         if self.stderr:
241             mounts["stderr"] = {"kind": "file",
242                                 "path": "%s/%s" % (self.outdir, self.stderr)}
243
244         if self.stdout:
245             mounts["stdout"] = {"kind": "file",
246                                 "path": "%s/%s" % (self.outdir, self.stdout)}
247
248         (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
249
250         container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
251                                                                     docker_req,
252                                                                     runtimeContext.pull_image,
253                                                                     runtimeContext.project_uuid,
254                                                                     runtimeContext.force_docker_pull,
255                                                                     runtimeContext.tmp_outdir_prefix,
256                                                                     runtimeContext.match_local_docker,
257                                                                     runtimeContext.copy_deps)
258
259         network_req, _ = self.get_requirement("NetworkAccess")
260         if network_req:
261             runtime_constraints["API"] = network_req["networkAccess"]
262
263         api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
264         if api_req:
265             runtime_constraints["API"] = True
266
267         use_disk_cache = (self.arvrunner.api.config()["Containers"].get("DefaultKeepCacheRAM", 0) == 0)
268
269         keep_cache_type_req, _ = self.get_requirement("http://arvados.org/cwl#KeepCacheTypeRequirement")
270         if keep_cache_type_req:
271             if "keepCacheType" in keep_cache_type_req:
272                 if keep_cache_type_req["keepCacheType"] == "ram_cache":
273                     use_disk_cache = False
274
275         runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
276         if runtime_req:
277             if "keep_cache" in runtime_req:
278                 if use_disk_cache:
279                     # If DefaultKeepCacheRAM is zero it means we should use disk cache.
280                     runtime_constraints["keep_cache_disk"] = math.ceil(runtime_req["keep_cache"] * 2**20)
281                 else:
282                     runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
283             if "outputDirType" in runtime_req:
284                 if runtime_req["outputDirType"] == "local_output_dir":
285                     # Currently the default behavior.
286                     pass
287                 elif runtime_req["outputDirType"] == "keep_output_dir":
288                     mounts[self.outdir]= {
289                         "kind": "collection",
290                         "writable": True
291                     }
292
293         partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
294         if partition_req:
295             scheduling_parameters["partitions"] = aslist(partition_req["partition"])
296
297         intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
298         if intermediate_output_req:
299             self.output_ttl = intermediate_output_req["outputTTL"]
300         else:
301             self.output_ttl = self.arvrunner.intermediate_output_ttl
302
303         if self.output_ttl < 0:
304             raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % container_request["output_ttl"])
305
306
307         if self.arvrunner.api._rootDesc["revision"] >= "20210628":
308             storage_class_req, _ = self.get_requirement("http://arvados.org/cwl#OutputStorageClass")
309             if storage_class_req and storage_class_req.get("intermediateStorageClass"):
310                 container_request["output_storage_classes"] = aslist(storage_class_req["intermediateStorageClass"])
311             else:
312                 container_request["output_storage_classes"] = runtimeContext.intermediate_storage_classes.strip().split(",")
313
314         cuda_req, _ = self.get_requirement("http://commonwl.org/cwltool#CUDARequirement")
315         if cuda_req:
316             runtime_constraints["cuda"] = {
317                 "device_count": resources.get("cudaDeviceCount", 1),
318                 "driver_version": cuda_req["cudaVersionMin"],
319                 "hardware_capability": aslist(cuda_req["cudaComputeCapability"])[0]
320             }
321
322         if runtimeContext.enable_preemptible is False:
323             scheduling_parameters["preemptible"] = False
324         else:
325             preemptible_req, _ = self.get_requirement("http://arvados.org/cwl#UsePreemptible")
326             if preemptible_req:
327                 scheduling_parameters["preemptible"] = preemptible_req["usePreemptible"]
328             elif runtimeContext.enable_preemptible is True:
329                 scheduling_parameters["preemptible"] = True
330             elif runtimeContext.enable_preemptible is None:
331                 pass
332
333         if self.timelimit is not None and self.timelimit > 0:
334             scheduling_parameters["max_run_time"] = self.timelimit
335
336         extra_submit_params = {}
337         if runtimeContext.submit_runner_cluster:
338             extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
339
340         container_request["output_name"] = cleanup_name_for_collection("Output from step %s" % (self.name))
341         container_request["output_ttl"] = self.output_ttl
342         container_request["mounts"] = mounts
343         container_request["secret_mounts"] = secret_mounts
344         container_request["runtime_constraints"] = runtime_constraints
345         container_request["scheduling_parameters"] = scheduling_parameters
346
347         enable_reuse = runtimeContext.enable_reuse
348         if enable_reuse:
349             reuse_req, _ = self.get_requirement("WorkReuse")
350             if reuse_req:
351                 enable_reuse = reuse_req["enableReuse"]
352             reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
353             if reuse_req:
354                 enable_reuse = reuse_req["enableReuse"]
355         container_request["use_existing"] = enable_reuse
356
357         properties_req, _ = self.get_requirement("http://arvados.org/cwl#ProcessProperties")
358         if properties_req:
359             for pr in properties_req["processProperties"]:
360                 container_request["properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
361
362         output_properties_req, _ = self.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
363         if output_properties_req:
364             if self.arvrunner.api._rootDesc["revision"] >= "20220510":
365                 container_request["output_properties"] = {}
366                 for pr in output_properties_req["outputProperties"]:
367                     container_request["output_properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
368             else:
369                 logger.warning("%s API revision is %s, revision %s is required to support setting properties on output collections.",
370                                self.arvrunner.label(self), self.arvrunner.api._rootDesc["revision"], "20220510")
371
372         if runtimeContext.runnerjob.startswith("arvwf:"):
373             wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
374             wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
375             if container_request["name"] == "main":
376                 container_request["name"] = wfrecord["name"]
377             container_request["properties"]["template_uuid"] = wfuuid
378
379         self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
380
381         try:
382             if runtimeContext.submit_request_uuid:
383                 response = self.arvrunner.api.container_requests().update(
384                     uuid=runtimeContext.submit_request_uuid,
385                     body=container_request,
386                     **extra_submit_params
387                 ).execute(num_retries=self.arvrunner.num_retries)
388             else:
389                 response = self.arvrunner.api.container_requests().create(
390                     body=container_request,
391                     **extra_submit_params
392                 ).execute(num_retries=self.arvrunner.num_retries)
393
394             self.uuid = response["uuid"]
395             self.arvrunner.process_submitted(self)
396
397             if response["state"] == "Final":
398                 logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
399             else:
400                 logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
401         except Exception as e:
402             logger.exception("%s error submitting container\n%s", self.arvrunner.label(self), e)
403             logger.debug("Container request was %s", container_request)
404             self.output_callback({}, "permanentFail")
405
406     def done(self, record):
407         outputs = {}
408         try:
409             container = self.arvrunner.api.containers().get(
410                 uuid=record["container_uuid"]
411             ).execute(num_retries=self.arvrunner.num_retries)
412             if container["state"] == "Complete":
413                 rcode = container["exit_code"]
414                 if self.successCodes and rcode in self.successCodes:
415                     processStatus = "success"
416                 elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
417                     processStatus = "temporaryFail"
418                 elif self.permanentFailCodes and rcode in self.permanentFailCodes:
419                     processStatus = "permanentFail"
420                 elif rcode == 0:
421                     processStatus = "success"
422                 else:
423                     processStatus = "permanentFail"
424
425                 if rcode == 137:
426                     logger.warning("%s Container may have been killed for using too much RAM.  Try resubmitting with a higher 'ramMin'.",
427                                  self.arvrunner.label(self))
428             else:
429                 processStatus = "permanentFail"
430
431             if processStatus == "permanentFail" and record["log_uuid"]:
432                 logc = arvados.collection.CollectionReader(record["log_uuid"],
433                                                            api_client=self.arvrunner.api,
434                                                            keep_client=self.arvrunner.keep_client,
435                                                            num_retries=self.arvrunner.num_retries)
436                 label = self.arvrunner.label(self)
437                 done.logtail(
438                     logc, logger.error,
439                     "%s (%s) error log:" % (label, record["uuid"]), maxlen=40)
440
441             if record["output_uuid"]:
442                 if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
443                     # Compute the trash time to avoid requesting the collection record.
444                     trash_at = ciso8601.parse_datetime_as_naive(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
445                     aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
446                     orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
447                     oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
448                     logger.info("%s Intermediate output %s (%s) will be trashed%s%s%s." % (
449                         self.arvrunner.label(self), record["output_uuid"], container["output"], aftertime, orpart, oncomplete))
450                 self.arvrunner.add_intermediate_output(record["output_uuid"])
451
452             if container["output"]:
453                 outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
454
455             properties = record["properties"].copy()
456             properties["cwl_output"] = outputs
457             self.arvrunner.api.container_requests().update(
458                 uuid=self.uuid,
459                 body={"container_request": {"properties": properties}}
460             ).execute(num_retries=self.arvrunner.num_retries)
461         except WorkflowException as e:
462             # Only include a stack trace if in debug mode.
463             # A stack trace may obfuscate more useful output about the workflow.
464             logger.error("%s unable to collect output from %s:\n%s",
465                          self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
466             processStatus = "permanentFail"
467         except Exception:
468             logger.exception("%s while getting output object:", self.arvrunner.label(self))
469             processStatus = "permanentFail"
470         finally:
471             self.output_callback(outputs, processStatus)
472
473
474 class RunnerContainer(Runner):
475     """Submit and manage a container that runs arvados-cwl-runner."""
476
477     def arvados_job_spec(self, runtimeContext, git_info):
478         """Create an Arvados container request for this workflow.
479
480         The returned dict can be used to create a container passed as
481         the +body+ argument to container_requests().create().
482         """
483
484         adjustDirObjs(self.job_order, trim_listing)
485         visit_class(self.job_order, ("File", "Directory"), trim_anonymous_location)
486         visit_class(self.job_order, ("File", "Directory"), remove_redundant_fields)
487
488         secret_mounts = {}
489         for param in sorted(self.job_order.keys()):
490             if self.secret_store.has_secret(self.job_order[param]):
491                 mnt = "/secrets/s%d" % len(secret_mounts)
492                 secret_mounts[mnt] = {
493                     "kind": "text",
494                     "content": self.secret_store.retrieve(self.job_order[param])
495                 }
496                 self.job_order[param] = {"$include": mnt}
497
498         container_req = {
499             "name": self.name,
500             "output_path": "/var/spool/cwl",
501             "cwd": "/var/spool/cwl",
502             "priority": self.priority,
503             "state": "Committed",
504             "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext),
505             "mounts": {
506                 "/var/lib/cwl/cwl.input.json": {
507                     "kind": "json",
508                     "content": self.job_order
509                 },
510                 "stdout": {
511                     "kind": "file",
512                     "path": "/var/spool/cwl/cwl.output.json"
513                 },
514                 "/var/spool/cwl": {
515                     "kind": "collection",
516                     "writable": True
517                 }
518             },
519             "secret_mounts": secret_mounts,
520             "runtime_constraints": {
521                 "vcpus": math.ceil(self.submit_runner_cores),
522                 "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
523                 "API": True
524             },
525             "use_existing": False, # Never reuse the runner container - see #15497.
526             "properties": {}
527         }
528
529         if self.embedded_tool.tool.get("id", "").startswith("keep:"):
530             sp = self.embedded_tool.tool["id"].split('/')
531             workflowcollection = sp[0][5:]
532             workflowname = "/".join(sp[1:])
533             workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
534             container_req["mounts"]["/var/lib/cwl/workflow"] = {
535                 "kind": "collection",
536                 "portable_data_hash": "%s" % workflowcollection
537             }
538         elif self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
539             workflowpath = "/var/lib/cwl/workflow.json#main"
540             record = self.arvrunner.api.workflows().get(uuid=self.embedded_tool.tool["id"][6:33]).execute(num_retries=self.arvrunner.num_retries)
541             packed = yaml.safe_load(record["definition"])
542             container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
543                 "kind": "json",
544                 "content": packed
545             }
546             container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
547         else:
548             packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext, git_info)
549             workflowpath = "/var/lib/cwl/workflow.json#main"
550             container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
551                 "kind": "json",
552                 "content": packed
553             }
554
555         container_req["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in git_info.items()})
556
557         properties_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ProcessProperties")
558         if properties_req:
559             builder = make_builder(self.job_order, self.embedded_tool.hints, self.embedded_tool.requirements, runtimeContext, self.embedded_tool.metadata)
560             for pr in properties_req["processProperties"]:
561                 container_req["properties"][pr["propertyName"]] = builder.do_eval(pr["propertyValue"])
562
563         # --local means execute the workflow instead of submitting a container request
564         # --api=containers means use the containers API
565         # --no-log-timestamps means don't add timestamps (the logging infrastructure does this)
566         # --disable-validate because we already validated so don't need to do it again
567         # --eval-timeout is the timeout for javascript invocation
568         # --parallel-task-count is the number of threads to use for job submission
569         # --enable/disable-reuse sets desired job reuse
570         # --collection-cache-size sets aside memory to store collections
571         command = ["arvados-cwl-runner",
572                    "--local",
573                    "--api=containers",
574                    "--no-log-timestamps",
575                    "--disable-validate",
576                    "--disable-color",
577                    "--eval-timeout=%s" % self.arvrunner.eval_timeout,
578                    "--thread-count=%s" % self.arvrunner.thread_count,
579                    "--enable-reuse" if self.enable_reuse else "--disable-reuse",
580                    "--collection-cache-size=%s" % self.collection_cache_size]
581
582         if self.output_name:
583             command.append("--output-name=" + self.output_name)
584             container_req["output_name"] = self.output_name
585
586         if self.output_tags:
587             command.append("--output-tags=" + self.output_tags)
588
589         if runtimeContext.debug:
590             command.append("--debug")
591
592         if runtimeContext.storage_classes != "default" and runtimeContext.storage_classes:
593             command.append("--storage-classes=" + runtimeContext.storage_classes)
594
595         if runtimeContext.intermediate_storage_classes != "default" and runtimeContext.intermediate_storage_classes:
596             command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes)
597
598         if runtimeContext.on_error:
599             command.append("--on-error=" + self.on_error)
600
601         if runtimeContext.intermediate_output_ttl:
602             command.append("--intermediate-output-ttl=%d" % runtimeContext.intermediate_output_ttl)
603
604         if runtimeContext.trash_intermediate:
605             command.append("--trash-intermediate")
606
607         if runtimeContext.project_uuid:
608             command.append("--project-uuid="+runtimeContext.project_uuid)
609
610         if self.enable_dev:
611             command.append("--enable-dev")
612
613         if runtimeContext.enable_preemptible is True:
614             command.append("--enable-preemptible")
615
616         if runtimeContext.enable_preemptible is False:
617             command.append("--disable-preemptible")
618
619         if runtimeContext.varying_url_params:
620             command.append("--varying-url-params="+runtimeContext.varying_url_params)
621
622         if runtimeContext.prefer_cached_downloads:
623             command.append("--prefer-cached-downloads")
624
625         command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
626
627         container_req["command"] = command
628
629         return container_req
630
631
632     def run(self, runtimeContext):
633         runtimeContext.keepprefix = "keep:"
634         job_spec = self.arvados_job_spec(runtimeContext, self.git_info)
635         if runtimeContext.project_uuid:
636             job_spec["owner_uuid"] = runtimeContext.project_uuid
637
638         extra_submit_params = {}
639         if runtimeContext.submit_runner_cluster:
640             extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
641
642         if runtimeContext.submit_request_uuid:
643             if "cluster_id" in extra_submit_params:
644                 # Doesn't make sense for "update" and actually fails
645                 del extra_submit_params["cluster_id"]
646             response = self.arvrunner.api.container_requests().update(
647                 uuid=runtimeContext.submit_request_uuid,
648                 body=job_spec,
649                 **extra_submit_params
650             ).execute(num_retries=self.arvrunner.num_retries)
651         else:
652             response = self.arvrunner.api.container_requests().create(
653                 body=job_spec,
654                 **extra_submit_params
655             ).execute(num_retries=self.arvrunner.num_retries)
656
657         self.uuid = response["uuid"]
658         self.arvrunner.process_submitted(self)
659
660         logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])
661
662         workbench1 = self.arvrunner.api.config()["Services"]["Workbench1"]["ExternalURL"]
663         workbench2 = self.arvrunner.api.config()["Services"]["Workbench2"]["ExternalURL"]
664         url = ""
665         if workbench2:
666             url = "{}processes/{}".format(workbench2, response["uuid"])
667         elif workbench1:
668             url = "{}container_requests/{}".format(workbench1, response["uuid"])
669         if url:
670             logger.info("Monitor workflow progress at %s", url)
671
672
673     def done(self, record):
674         try:
675             container = self.arvrunner.api.containers().get(
676                 uuid=record["container_uuid"]
677             ).execute(num_retries=self.arvrunner.num_retries)
678             container["log"] = record["log_uuid"]
679         except Exception:
680             logger.exception("%s while getting runner container", self.arvrunner.label(self))
681             self.arvrunner.output_callback({}, "permanentFail")
682         else:
683             super(RunnerContainer, self).done(container)