# sdk/cwl/arvados_cwl/arvcontainer.py
import logging

import arvados.collection
from cwltool.errors import WorkflowException
from cwltool.process import UnsupportedRequirement, get_feature

from .arvdocker import arv_docker_get_image
from . import done

logger = logging.getLogger('arvados.cwl-runner')

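# ArvadosContainer drives one CWL CommandLineTool step through the Arvados
# containers API: run() submits a container request and done() handles the
# finished container record.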
class ArvadosContainer(object):
    """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""

    def __init__(self, runner):
        self.arvrunner = runner
        self.running = False

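    # Pipeline instance components are a jobs-API concept; there is nothing
    # equivalent to update for a container request, so this is a no-op.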
    def update_pipeline_component(self, r):
        pass

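    # Build a container request for this command line (working directory,
    # mounts, environment, Docker image, runtime constraints) and submit it
    # in the "Committed" state so it becomes eligible for scheduling.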
    def run(self, dry_run=False, pull_image=True, **kwargs):
        container_request = {
            "command": self.command_line,
            "owner_uuid": self.arvrunner.project_uuid,
            "name": self.name,
            "output_path": "/var/spool/cwl",
            "cwd": "/var/spool/cwl",
            "priority": 1,
            "state": "Committed"
        }
        runtime_constraints = {}
        mounts = {
            "/var/spool/cwl": {
                "kind": "tmp"
            }
        }

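        # Expose each input file to the container as a "collection" mount at
        # the path the command line expects; the portable data hash is
        # recovered from the mapped Keep path.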
        for f in self.pathmapper.files():
            _, p = self.pathmapper.mapper(f)
            mounts[p] = {
                "kind": "collection",
                "portable_data_hash": p[6:]
            }

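        # Generated files (InitialWorkDirRequirement) are not wired up for
        # the containers API yet, so they are rejected here; the staging code
        # below the raise is kept as a work in progress and is never reached.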
        if self.generatefiles:
            raise UnsupportedRequirement("Generating files in the output directory is not currently supported")

            vwd = arvados.collection.Collection()
            container_request["task.vwd"] = {}
            for t in self.generatefiles:
                if isinstance(self.generatefiles[t], dict):
                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
                    vwd.copy(rest, t, source_collection=src)
                else:
                    with vwd.open(t, "w") as f:
                        f.write(self.generatefiles[t])
            vwd.save_new()
            # TODO
            # for t in self.generatefiles:
            #     container_request["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)

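        # Start from a minimal environment and layer the tool's requested
        # environment variables (EnvVarRequirement) on top of it.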
        container_request["environment"] = {"TMPDIR": "/tmp"}
        if self.environment:
            container_request["environment"].update(self.environment)

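        # stdin redirection is not supported yet and is rejected up front;
        # stdout is captured by mounting a file inside the output directory.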
        if self.stdin:
            raise UnsupportedRequirement("Stdin redirection currently not supported")

        if self.stdout:
            mounts["stdout"] = {"kind": "file",
                                "path": "/var/spool/cwl/%s" % (self.stdout)}

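        # Resolve the Docker image for the container, falling back to the
        # stock arvados/jobs image when the tool declares no DockerRequirement;
        # arv_docker_get_image is expected to make the image available in
        # Arvados (pulling and uploading it if necessary).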
        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
        if not docker_req:
            docker_req = {"dockerImageId": "arvados/jobs"}

        container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
                                                                     docker_req,
                                                                     pull_image,
                                                                     self.arvrunner.project_uuid)

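        # Translate CWL resource hints into Crunch runtime constraints:
        # "cores" maps to vcpus, and "ram" (MiB in CWL) is converted to bytes.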
        resources = self.builder.resources
        if resources is not None:
            runtime_constraints["vcpus"] = resources.get("cores", 1)
            runtime_constraints["ram"] = resources.get("ram") * 2**20
            #runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)

        container_request["mounts"] = mounts
        container_request["runtime_constraints"] = runtime_constraints

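        # Submit the request.  Registering under the container UUID lets the
        # runner route later state changes for that container back to this
        # object's done() callback.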
        try:
            response = self.arvrunner.api.container_requests().create(
                body=container_request
            ).execute(num_retries=self.arvrunner.num_retries)

            self.arvrunner.jobs[response["container_uuid"]] = self

            logger.info("Container %s (%s) request state is %s", self.name, response["container_uuid"], response["state"])

            if response["state"] == "Final":
                self.done(response)
        except Exception as e:
            logger.error("Got error %s", str(e))
            self.output_callback({}, "permanentFail")

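    # Handle a finished container: map its final state onto a CWL process
    # status, collect output files via done.done(), and report the result
    # through the output callback.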
    def done(self, record):
        try:
            if record["state"] == "Complete":
                processStatus = "success"
            else:
                processStatus = "permanentFail"

            try:
                outputs = {}
                if record["output"]:
                    outputs = done.done(self, record, "/tmp", "/var/spool/cwl", "/keep")
            except WorkflowException as e:
                logger.error("Error while collecting container outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
                processStatus = "permanentFail"
            except Exception as e:
                logger.exception("Got unknown exception while collecting container outputs:")
                processStatus = "permanentFail"

            self.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.jobs[record["uuid"]]