fix "submitted container [uuid]" log from a-c-r
[arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import logging
6 import json
7 import os
8 import urllib
9 import time
10 import datetime
11 import ciso8601
12 import uuid
13
14 from arvados_cwl.util import get_current_container, get_intermediate_collection_info
15 import ruamel.yaml as yaml
16
17 from cwltool.errors import WorkflowException
18 from cwltool.process import UnsupportedRequirement, shortname
19 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
20 from cwltool.utils import aslist
21 from cwltool.job import JobBase
22
23 import arvados.collection
24
25 from .arvdocker import arv_docker_get_image
26 from . import done
27 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields
28 from .fsaccess import CollectionFetcher
29 from .pathmapper import NoFollowPathMapper, trim_listing
30 from .perf import Perf
31
# Module-wide logger used for the runner's status and error messages.
logger = logging.getLogger('arvados.cwl-runner')
# Child logger ("...cwl-runner.metrics") that is handed to the Perf(...)
# context managers used in this module.
metrics = logging.getLogger('arvados.cwl-runner.metrics')
34
class ArvadosContainer(JobBase):
    """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""

    def __init__(self, runner,
                 builder,   # type: Builder
                 joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
                 make_path_mapper,  # type: Callable[..., PathMapper]
                 requirements,      # type: List[Dict[Text, Text]]
                 hints,     # type: List[Dict[Text, Text]]
                 name       # type: Text
    ):
        """Initialize the job through JobBase and remember the Arvados runner.

        runner is kept as self.arvrunner; it supplies the API client, the
        secret store and the submission bookkeeping used by run() and
        done().  All other arguments are forwarded unchanged to
        JobBase.__init__.
        """
        super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
        self.arvrunner = runner
        self.running = False
        # Container request uuid; assigned by run() once the request is submitted.
        self.uuid = None

    def update_pipeline_component(self, r):
        """Do nothing; this implementation ignores the record it is given."""
        pass

    def run(self, runtimeContext):
        """Build the container request for this tool invocation and submit it.

        The request is assembled from the fields set on this object
        (command_line, environment, outdir, tmpdir, stdin/stdout/stderr,
        generatefiles, ...) plus the relevant CWL requirements, then
        created -- or, when runtimeContext.submit_request_uuid is set,
        applied as an update to that existing request.

        Raises WorkflowException if secret material appears on the command
        line or in the environment, or if the output TTL is negative.
        Errors raised while submitting the request are logged and reported
        through the output callback as "permanentFail" instead of being
        propagated.
        """
        # ArvadosCommandTool subclasses from cwltool.CommandLineTool,
        # which calls makeJobRunner() to get a new ArvadosContainer
        # object.  The fields that define execution such as
        # command_line, environment, etc are set on the
        # ArvadosContainer object by CommandLineTool.job() before
        # run() is called.

        container_request = {
            "command": self.command_line,
            "name": self.name,
            "output_path": self.outdir,
            "cwd": self.outdir,
            "priority": runtimeContext.priority,
            "state": "Committed",
            "properties": {},
        }
        runtime_constraints = {}

        if self.arvrunner.project_uuid:
            container_request["owner_uuid"] = self.arvrunner.project_uuid

        if self.arvrunner.secret_store.has_secret(self.command_line):
            raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")

        if self.arvrunner.secret_store.has_secret(self.environment):
            raise WorkflowException("Secret material leaked in environment, only file literals may contain secrets")

        resources = self.builder.resources
        if resources is not None:
            runtime_constraints["vcpus"] = resources.get("cores", 1)
            # Values are scaled by 2**20 before being sent to the API.
            runtime_constraints["ram"] = resources.get("ram") * 2**20
        else:
            # The tmp mounts below still look up sizes; use an empty mapping
            # so they fall back to their default of 0 instead of failing on
            # None.
            resources = {}

        mounts = {
            self.outdir: {
                "kind": "tmp",
                "capacity": resources.get("outdirSize", 0) * 2**20
            },
            self.tmpdir: {
                "kind": "tmp",
                "capacity": resources.get("tmpdirSize", 0) * 2**20
            }
        }
        secret_mounts = {}
        scheduling_parameters = {}

        # Mount every staged input.  Entries are sorted by their resolved
        # location and a target that falls under the previously mounted
        # directory is skipped, since that mount already covers it.
        rf = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
        rf.sort(key=lambda k: k.resolved)
        prevdir = None
        for resolved, target, tp, stg in rf:
            if not stg:
                continue
            if prevdir and target.startswith(prevdir):
                continue
            if tp == "Directory":
                targetdir = target
            else:
                # Files are made available by mounting their parent directory.
                targetdir = os.path.dirname(target)
            sp = resolved.split("/", 1)
            pdh = sp[0][5:]   # remove "keep:"
            mounts[targetdir] = {
                "kind": "collection",
                "portable_data_hash": pdh
            }
            if len(sp) == 2:
                if tp == "Directory":
                    path = sp[1]
                else:
                    path = os.path.dirname(sp[1])
                if path and path != "/":
                    mounts[targetdir]["path"] = path
            prevdir = targetdir + "/"

        with Perf(metrics, "generatefiles %s" % self.name):
            if self.generatefiles["listing"]:
                # Build a new collection ("vwd") holding the generated files,
                # then mount its entries under the output directory.
                vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                    keep_client=self.arvrunner.keep_client,
                                                    num_retries=self.arvrunner.num_retries)
                generatemapper = NoFollowPathMapper([self.generatefiles], "", "",
                                                    separateDirs=False)

                # Pass the sort key by keyword only: the former positional
                # None filled the Python 2 "cmp" slot and is rejected by
                # Python 3's sorted().
                sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target)

                logger.debug("generatemapper is %s", sorteditems)

                with Perf(metrics, "createfiles %s" % self.name):
                    for f, p in sorteditems:
                        if not p.target:
                            pass
                        elif p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
                            if p.resolved.startswith("_:"):
                                # "_:" locations have no source to copy from;
                                # only the target directory is created.
                                vwd.mkdirs(p.target)
                            else:
                                source, path = self.arvrunner.fs_access.get_collection(p.resolved)
                                vwd.copy(path, p.target, source_collection=source)
                        elif p.type == "CreateFile":
                            if self.arvrunner.secret_store.has_secret(p.resolved):
                                # Secret content goes into secret_mounts
                                # rather than into the saved collection.
                                secret_mounts["%s/%s" % (self.outdir, p.target)] = {
                                    "kind": "text",
                                    "content": self.arvrunner.secret_store.retrieve(p.resolved)
                                }
                            else:
                                with vwd.open(p.target, "w") as n:
                                    n.write(p.resolved.encode("utf-8"))

                def keepemptydirs(p):
                    # Recursively add an empty ".keep" file to every empty
                    # (sub)collection.
                    if isinstance(p, arvados.collection.RichCollectionBase):
                        if len(p) == 0:
                            p.open(".keep", "w").close()
                        else:
                            for c in p:
                                keepemptydirs(p[c])

                keepemptydirs(vwd)

                if not runtimeContext.current_container:
                    runtimeContext.current_container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
                info = get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
                vwd.save_new(name=info["name"],
                             owner_uuid=self.arvrunner.project_uuid,
                             ensure_unique_name=True,
                             trash_at=info["trash_at"],
                             properties=info["properties"])

                # Mount each generated entry, skipping secrets (handled via
                # secret_mounts above) and anything under the previously
                # mounted entry.
                prev = None
                for f, p in sorteditems:
                    if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
                        (prev is not None and p.target.startswith(prev))):
                        continue
                    mountpoint = "%s/%s" % (self.outdir, p.target)
                    mounts[mountpoint] = {"kind": "collection",
                                          "portable_data_hash": vwd.portable_data_hash(),
                                          "path": p.target}
                    if p.type.startswith("Writable"):
                        mounts[mountpoint]["writable"] = True
                    prev = p.target + "/"

        container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
        if self.environment:
            container_request["environment"].update(self.environment)

        if self.stdin:
            # Drops the first six characters of the stdin path before
            # splitting it into collection hash and file path.
            # NOTE(review): presumably that prefix is "/keep/" -- confirm.
            sp = self.stdin[6:].split("/", 1)
            mounts["stdin"] = {"kind": "collection",
                                "portable_data_hash": sp[0],
                                "path": sp[1]}

        if self.stderr:
            mounts["stderr"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stderr)}

        if self.stdout:
            mounts["stdout"] = {"kind": "file",
                                "path": "%s/%s" % (self.outdir, self.stdout)}

        (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
        if not docker_req:
            # No DockerRequirement given: fall back to the arvados/jobs image.
            docker_req = {"dockerImageId": "arvados/jobs"}

        container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
                                                                     docker_req,
                                                                     runtimeContext.pull_image,
                                                                     self.arvrunner.project_uuid)

        api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
        if api_req:
            runtime_constraints["API"] = True

        runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
        if runtime_req:
            if "keep_cache" in runtime_req:
                runtime_constraints["keep_cache_ram"] = runtime_req["keep_cache"] * 2**20
            if "outputDirType" in runtime_req:
                if runtime_req["outputDirType"] == "local_output_dir":
                    # Currently the default behavior.
                    pass
                elif runtime_req["outputDirType"] == "keep_output_dir":
                    # Replace the tmp mount for outdir with a writable
                    # collection mount.
                    mounts[self.outdir]= {
                        "kind": "collection",
                        "writable": True
                    }

        partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
        if partition_req:
            scheduling_parameters["partitions"] = aslist(partition_req["partition"])

        intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
        if intermediate_output_req:
            self.output_ttl = intermediate_output_req["outputTTL"]
        else:
            self.output_ttl = self.arvrunner.intermediate_output_ttl

        if self.output_ttl < 0:
            # Report self.output_ttl: container_request["output_ttl"] is not
            # assigned until further down, so reading it here would raise
            # KeyError instead of this error.
            raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % self.output_ttl)

        if self.timelimit is not None:
            scheduling_parameters["max_run_time"] = self.timelimit

        container_request["output_name"] = "Output for step %s" % (self.name)
        container_request["output_ttl"] = self.output_ttl
        container_request["mounts"] = mounts
        container_request["secret_mounts"] = secret_mounts
        container_request["runtime_constraints"] = runtime_constraints
        container_request["scheduling_parameters"] = scheduling_parameters

        # A ReuseRequirement is only consulted when reuse is enabled in the
        # runtime context; it cannot turn reuse on by itself.
        enable_reuse = runtimeContext.enable_reuse
        if enable_reuse:
            reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        container_request["use_existing"] = enable_reuse

        if runtimeContext.runnerjob.startswith("arvwf:"):
            # runnerjob has the form "arvwf:<workflow uuid>#..."; look up the
            # workflow record so its name can replace the generic "main".
            wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
            wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
            if container_request["name"] == "main":
                container_request["name"] = wfrecord["name"]
            container_request["properties"]["template_uuid"] = wfuuid

        self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)

        try:
            if runtimeContext.submit_request_uuid:
                response = self.arvrunner.api.container_requests().update(
                    uuid=runtimeContext.submit_request_uuid,
                    body=container_request
                ).execute(num_retries=self.arvrunner.num_retries)
            else:
                response = self.arvrunner.api.container_requests().create(
                    body=container_request
                ).execute(num_retries=self.arvrunner.num_retries)

            self.uuid = response["uuid"]
            self.arvrunner.process_submitted(self)

            if response["state"] == "Final":
                logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
            else:
                logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
        except Exception as e:
            # Submission problems are reported as a failed step rather than
            # propagated to the caller.
            logger.error("%s got error %s", self.arvrunner.label(self), str(e))
            self.output_callback({}, "permanentFail")

    def done(self, record):
        """Collect the result of a finished container and report it.

        record is a mapping from which "container_uuid", "output_uuid",
        "modified_at" and "uuid" are read (presumably the container
        request record -- confirm against the caller).  The container is
        fetched, its exit code is mapped to a CWL process status, the log
        tail is emitted on permanent failure, and the outputs are gathered.

        self.output_callback(outputs, processStatus) is always invoked, even
        when collecting the outputs fails.
        """
        outputs = {}
        # Default so the finally clause always has a status to report, even
        # if an error that is not an Exception subclass escapes the try body.
        processStatus = "permanentFail"
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
            if container["state"] == "Complete":
                rcode = container["exit_code"]
                # Explicitly configured exit-code lists take precedence over
                # the plain "0 means success" rule.
                if self.successCodes and rcode in self.successCodes:
                    processStatus = "success"
                elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
                    processStatus = "temporaryFail"
                elif self.permanentFailCodes and rcode in self.permanentFailCodes:
                    processStatus = "permanentFail"
                elif rcode == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"
            else:
                processStatus = "permanentFail"

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(container["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            if record["output_uuid"]:
                if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
                    # Compute the trash time to avoid requesting the collection record.
                    trash_at = ciso8601.parse_datetime_unaware(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
                    aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
                    orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
                    oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
                    logger.info("%s Intermediate output %s (%s) will be trashed%s%s%s.",
                                self.arvrunner.label(self), record["output_uuid"], container["output"], aftertime, orpart, oncomplete)
                self.arvrunner.add_intermediate_output(record["output_uuid"])

            if container["output"]:
                outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
        except WorkflowException as e:
            logger.error("%s unable to collect output from %s:\n%s",
                         self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
            processStatus = "permanentFail"
        except Exception as e:
            logger.exception("%s while getting output object: %s", self.arvrunner.label(self), e)
            processStatus = "permanentFail"
        finally:
            self.output_callback(outputs, processStatus)
347
348
class RunnerContainer(Runner):
    """Submit and manage a container that runs arvados-cwl-runner."""

    def arvados_job_spec(self, runtimeContext):
        """Assemble the container request body that runs this workflow.

        The resulting dict is meant to be handed as the +body+ argument
        to container_requests().create() (or update()).
        """

        file_or_dir = ("File", "Directory")
        adjustDirObjs(self.job_order, trim_listing)
        visit_class(self.job_order, file_or_dir, trim_anonymous_location)
        visit_class(self.job_order, file_or_dir, remove_redundant_fields)

        # Move each secret input into its own text mount and leave an
        # {"$include": <mount path>} reference in the job order instead.
        secret_mounts = {}
        for key in sorted(self.job_order.keys()):
            value = self.job_order[key]
            if not self.secret_store.has_secret(value):
                continue
            mountpoint = "/secrets/s%d" % len(secret_mounts)
            secret_mounts[mountpoint] = {
                "kind": "text",
                "content": self.secret_store.retrieve(value)
            }
            self.job_order[key] = {"$include": mountpoint}

        mounts = {
            "/var/lib/cwl/cwl.input.json": {
                "kind": "json",
                "content": self.job_order
            },
            "stdout": {
                "kind": "file",
                "path": "/var/spool/cwl/cwl.output.json"
            },
            "/var/spool/cwl": {
                "kind": "collection",
                "writable": True
            }
        }

        container_req = {
            "name": self.name,
            "output_path": "/var/spool/cwl",
            "cwd": "/var/spool/cwl",
            "priority": self.priority,
            "state": "Committed",
            "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image),
            "mounts": mounts,
            "secret_mounts": secret_mounts,
            "runtime_constraints": {
                "vcpus": self.submit_runner_cores,
                "ram": 1024*1024 * self.submit_runner_ram,
                "API": True
            },
            "use_existing": self.enable_reuse,
            "properties": {}
        }

        tool_id = self.tool.tool.get("id", "")
        if tool_id.startswith("keep:"):
            # Workflow already lives in a collection: mount that collection
            # and point the runner at the file inside it.
            location, _, workflowname = tool_id.partition('/')
            workflowcollection = location[5:]
            workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
            mounts["/var/lib/cwl/workflow"] = {
                "kind": "collection",
                "portable_data_hash": "%s" % workflowcollection
            }
        else:
            # Otherwise pack the workflow and supply it as a json mount.
            packed = packed_workflow(self.arvrunner, self.tool, self.merged_map)
            workflowpath = "/var/lib/cwl/workflow.json#main"
            mounts["/var/lib/cwl/workflow.json"] = {
                "kind": "json",
                "content": packed
            }
            if self.tool.tool.get("id", "").startswith("arvwf:"):
                container_req["properties"]["template_uuid"] = self.tool.tool["id"][6:33]

        # Fixed part of the command line:
        #   --local               execute the workflow instead of submitting a container request
        #   --api=containers      use the containers API
        #   --no-log-timestamps   don't add timestamps (the logging infrastructure does this)
        #   --disable-validate    we already validated so don't need to do it again
        #   --eval-timeout        timeout for javascript invocation
        #   --thread-count        number of threads to use for job submission
        #   --enable/disable-reuse  desired job reuse
        reuse_flag = "--enable-reuse" if self.enable_reuse else "--disable-reuse"
        command = [
            "arvados-cwl-runner",
            "--local",
            "--api=containers",
            "--no-log-timestamps",
            "--disable-validate",
            "--eval-timeout=%s" % self.arvrunner.eval_timeout,
            "--thread-count=%s" % self.arvrunner.thread_count,
            reuse_flag,
        ]

        # Optional flags, appended in a fixed order.
        if self.output_name:
            command += ["--output-name=" + self.output_name]
            container_req["output_name"] = self.output_name

        if self.output_tags:
            command += ["--output-tags=" + self.output_tags]

        if runtimeContext.debug:
            command += ["--debug"]

        if runtimeContext.storage_classes != "default":
            command += ["--storage-classes=" + runtimeContext.storage_classes]

        if self.on_error:
            command += ["--on-error=" + self.on_error]

        if self.intermediate_output_ttl:
            command += ["--intermediate-output-ttl=%d" % self.intermediate_output_ttl]

        if self.arvrunner.trash_intermediate:
            command += ["--trash-intermediate"]

        if self.arvrunner.project_uuid:
            command += ["--project-uuid="+self.arvrunner.project_uuid]

        # Positional arguments come last: the workflow, then its input object.
        command += [workflowpath, "/var/lib/cwl/cwl.input.json"]

        container_req["command"] = command

        return container_req


    def run(self, runtimeContext):
        """Submit (or update) the container request that runs the workflow.

        Stores the request uuid on self.uuid and registers this object
        with the runner through process_submitted().
        """
        runtimeContext.keepprefix = "keep:"
        job_spec = self.arvados_job_spec(runtimeContext)
        project = self.arvrunner.project_uuid
        if project:
            job_spec["owner_uuid"] = project

        # An explicit request uuid means "update that request" rather than
        # creating a new one.
        requests_api = self.arvrunner.api.container_requests()
        if runtimeContext.submit_request_uuid:
            call = requests_api.update(
                uuid=runtimeContext.submit_request_uuid,
                body=job_spec
            )
        else:
            call = requests_api.create(body=job_spec)
        response = call.execute(num_retries=self.arvrunner.num_retries)

        self.uuid = response["uuid"]
        self.arvrunner.process_submitted(self)

        logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])

    def done(self, record):
        """Fetch the runner's container and pass it to Runner.done().

        If the container cannot be fetched the error is logged and the
        runner's output callback is invoked with "permanentFail".
        """
        try:
            container = self.arvrunner.api.containers().get(
                uuid=record["container_uuid"]
            ).execute(num_retries=self.arvrunner.num_retries)
        except Exception as e:
            logger.exception("%s while getting runner container: %s", self.arvrunner.label(self), e)
            self.arvrunner.output_callback({}, "permanentFail")
            return

        super(RunnerContainer, self).done(container)