Add changes to arvworkflow.py for taking the max requirement
[arvados.git] / sdk / cwl / arvados_cwl / arvworkflow.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import os
6 import json
7 import copy
8 import logging
9
10 from schema_salad.sourceline import SourceLine, cmap
11
12 from cwltool.pack import pack
13 from cwltool.load_tool import fetch_document
14 from cwltool.process import shortname
15 from cwltool.workflow import Workflow, WorkflowException
16 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
17 from cwltool.builder import Builder
18
19 import ruamel.yaml as yaml
20
21 from .runner import (upload_dependencies, packed_workflow, upload_workflow_collection,
22                      trim_anonymous_location, remove_redundant_fields, discover_secondary_files)
23 from .pathmapper import ArvPathMapper, trim_listing
24 from .arvtool import ArvadosCommandTool
25 from .perf import Perf
26
27 logger = logging.getLogger('arvados.cwl-runner')
28 metrics = logging.getLogger('arvados.cwl-runner.metrics')
29
def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
                    submit_runner_ram=0, name=None, merged_map=None):
    """Pack a workflow, upload its dependencies, and register it in Arvados.

    Creates a new workflow record (or updates the one identified by
    *uuid*) and returns the record's uuid.
    """

    packed = packed_workflow(arvRunner, tool, merged_map)

    # Strip listings and anonymous locations from the job order before
    # embedding its values as input defaults.
    adjustDirObjs(job_order, trim_listing)
    adjustFileObjs(job_order, trim_anonymous_location)
    adjustDirObjs(job_order, trim_anonymous_location)

    # Copy values supplied in the job order into the packed workflow's
    # top-level input defaults.
    main = [step for step in packed["$graph"] if step["id"] == "#main"][0]
    for inp in main["inputs"]:
        key = shortname(inp["id"])
        if key in job_order:
            inp["default"] = job_order[key]

    if not name:
        name = tool.tool.get("label", os.path.basename(tool.tool["id"]))

    upload_dependencies(arvRunner, name, tool.doc_loader,
                        packed, tool.tool["id"], False)

    # TODO nowhere for submit_runner_ram to go.

    body = {
        "workflow": {
            "name": name,
            "description": tool.tool.get("doc", ""),
            "definition": yaml.round_trip_dump(packed)
        }}
    if project_uuid:
        body["workflow"]["owner_uuid"] = project_uuid

    if uuid:
        call = arvRunner.api.workflows().update(uuid=uuid, body=body)
    else:
        call = arvRunner.api.workflows().create(body=body)
    return call.execute(num_retries=arvRunner.num_retries)["uuid"]
67
def dedup_reqs(reqs):
    """Collapse a requirement list so each CWL class appears at most once.

    Scans in reverse so the last entry for a given class wins, drops
    Arvados extension classes, and returns the survivors sorted by
    class name.
    """
    by_class = {}
    for entry in reversed(reqs):
        cls = entry["class"]
        if cls.startswith("http://arvados.org/cwl#"):
            continue
        by_class.setdefault(cls, entry)
    return [by_class[cls] for cls in sorted(by_class)]
74
def get_max_res_req(res_reqs):
    """Take the max of a list of ResourceRequirement."""

    resource_attrs = ("coresMin", "coresMax", "ramMin", "ramMax",
                      "tmpdirMin", "tmpdirMax", "outdirMin", "outdirMax")
    collected = {a: [] for a in resource_attrs}
    errors = []
    # Gather every concrete (integer) value per attribute; any
    # non-integer value is an unevaluated expression, which is not
    # allowed below the top level when running in a single container.
    for attr in resource_attrs:
        for res_req in res_reqs:
            if attr not in res_req:
                continue
            value = res_req[attr]
            if isinstance(value, int):
                collected[attr].append(value)
            else:
                errors.append(SourceLine(res_req).makeError(
                    "Non-top-level ResourceRequirement in single container cannot have expressions"))
    if errors:
        raise WorkflowException("\n".join(errors))
    # Keep the maximum of each attribute that appeared at least once.
    merged = {attr: max(values)
              for attr, values in collected.items() if values}
    if merged:
        merged["class"] = "ResourceRequirement"
    return cmap(merged)
100
class ArvadosWorkflow(Workflow):
    """Wrap cwltool Workflow to override selected methods."""

    def __init__(self, arvrunner, toolpath_object, **kwargs):
        super(ArvadosWorkflow, self).__init__(toolpath_object, **kwargs)
        self.arvrunner = arvrunner
        self.work_api = kwargs["work_api"]
        # Portable data hash of the uploaded packed workflow collection;
        # computed lazily on the first job() call and reused afterwards.
        self.wf_pdh = None
        # Requirements/hints for the generated runner tool, with the
        # subworkflow's ResourceRequirements merged into a single max.
        # Stored on self because it is computed only when wf_pdh is
        # first populated but is needed on every job() call (a plain
        # local would be undefined on the second call).
        self.new_spec = None

    def job(self, joborder, output_callback, **kwargs):
        """Create a job for this workflow.

        When the http://arvados.org/cwl#RunInSingleContainer requirement
        is present, pack the workflow and wrap it in a generated
        CommandLineTool that runs the whole thing with cwltool inside one
        container; otherwise defer to the standard cwltool behavior.
        """
        kwargs["work_api"] = self.work_api
        req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")
        if req:
            with SourceLine(self.tool, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
                if "id" not in self.tool:
                    raise WorkflowException("%s object must have 'id'" % (self.tool["class"]))
            document_loader, workflowobj, uri = (self.doc_loader, self.doc_loader.fetch(self.tool["id"]), self.tool["id"])

            discover_secondary_files(self.tool["inputs"], joborder)

            with Perf(metrics, "subworkflow upload_deps"):
                upload_dependencies(self.arvrunner,
                                    os.path.basename(joborder.get("id", "#")),
                                    document_loader,
                                    joborder,
                                    joborder.get("id", "#"),
                                    False)

                if self.wf_pdh is None:
                    workflowobj["requirements"] = dedup_reqs(self.requirements)
                    workflowobj["hints"] = dedup_reqs(self.hints)

                    packed = pack(document_loader, workflowobj, uri, self.metadata)

                    # Builder used only to evaluate expressions in the
                    # top-level requirements/hints against this job order.
                    builder = Builder()
                    builder.job = joborder
                    builder.requirements = self.requirements
                    builder.hints = self.hints
                    builder.resources = {}

                    # Collect every ResourceRequirement in the packed graph.
                    # Top-level (#main) entries may contain expressions, so
                    # evaluate them now; nested entries must already be
                    # concrete integers (get_max_res_req enforces this).
                    res_reqs = {"requirements": [], "hints": []}
                    for t in ("requirements", "hints"):
                        for item in packed["$graph"]:
                            if t in item:
                                if item["id"] == "#main": # evaluate potential expressions in the top-level requirements/hints
                                    for req in item[t]:
                                        if req["class"] == "ResourceRequirement":
                                            eval_req = {"class": "ResourceRequirement"}
                                            for a in ("coresMin", "coresMax", "ramMin", "ramMax", "tmpdirMin", "tmpdirMax", "outdirMin", "outdirMax"):
                                                if a in req:
                                                    eval_req[a] = builder.do_eval(req[a])
                                            res_reqs[t].append(eval_req)
                                else:
                                    for req in item[t]:
                                        if req["class"] == "ResourceRequirement":
                                            res_reqs[t].append(req)
                    max_res_req = {"requirements": get_max_res_req(res_reqs["requirements"]),
                                   "hints": get_max_res_req(res_reqs["hints"])}

                    # Build the runner tool's spec: drop the individual
                    # ResourceRequirements and substitute the merged max.
                    # Filter with comprehensions over copies rather than
                    # list.remove() while iterating, which skips adjacent
                    # matches and mutates self.requirements/self.hints.
                    self.new_spec = {
                        "requirements": [r for r in self.requirements
                                         if r["class"] != "ResourceRequirement"],
                        "hints": [h for h in self.hints
                                  if h["class"] != "ResourceRequirement"]}
                    for t in ("requirements", "hints"):
                        if max_res_req[t]:
                            self.new_spec[t].append(max_res_req[t])

                    upload_dependencies(self.arvrunner,
                                        kwargs.get("name", ""),
                                        document_loader,
                                        packed,
                                        uri,
                                        False)

            with Perf(metrics, "subworkflow adjust"):
                joborder_resolved = copy.deepcopy(joborder)
                joborder_keepmount = copy.deepcopy(joborder)

                reffiles = []
                visit_class(joborder_keepmount, ("File", "Directory"), lambda x: reffiles.append(x))

                mapper = ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
                                 "/keep/%s",
                                 "/keep/%s/%s",
                                 **kwargs)

                # Rewrite keep: references to in-container /keep mount paths.
                def keepmount(obj):
                    remove_redundant_fields(obj)
                    with SourceLine(obj, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
                        if "location" not in obj:
                            raise WorkflowException("%s object is missing required 'location' field: %s" % (obj["class"], obj))
                    with SourceLine(obj, "location", WorkflowException, logger.isEnabledFor(logging.DEBUG)):
                        if obj["location"].startswith("keep:"):
                            obj["location"] = mapper.mapper(obj["location"]).target
                            if "listing" in obj:
                                del obj["listing"]
                        elif obj["location"].startswith("_:"):
                            # Literal (anonymous) object; drop the placeholder location.
                            del obj["location"]
                        else:
                            raise WorkflowException("Location is not a keep reference or a literal: '%s'" % obj["location"])

                visit_class(joborder_keepmount, ("File", "Directory"), keepmount)

                def resolved(obj):
                    if obj["location"].startswith("keep:"):
                        obj["location"] = mapper.mapper(obj["location"]).resolved

                visit_class(joborder_resolved, ("File", "Directory"), resolved)

                if self.wf_pdh is None:
                    adjustFileObjs(packed, keepmount)
                    adjustDirObjs(packed, keepmount)
                    self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed)

            # Generated tool that runs the packed workflow with cwltool
            # inside a single container, reading inputs from cwl.input.yml.
            wf_runner = cmap({
                "class": "CommandLineTool",
                "baseCommand": "cwltool",
                "inputs": self.tool["inputs"],
                "outputs": self.tool["outputs"],
                "stdout": "cwl.output.json",
                "requirements": self.new_spec["requirements"]+[
                    {
                    "class": "InitialWorkDirRequirement",
                    "listing": [{
                            "entryname": "workflow.cwl",
                            "entry": {
                                "class": "File",
                                "location": "keep:%s/workflow.cwl" % self.wf_pdh
                            }
                        }, {
                            "entryname": "cwl.input.yml",
                            "entry": json.dumps(joborder_keepmount, indent=2, sort_keys=True, separators=(',',': ')).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${')
                        }]
                }],
                "hints": self.new_spec["hints"],
                "arguments": ["--no-container", "--move-outputs", "--preserve-entire-environment", "workflow.cwl#main", "cwl.input.yml"],
                "id": "#"
            })
            kwargs["loader"] = self.doc_loader
            kwargs["avsc_names"] = self.doc_schema
            return ArvadosCommandTool(self.arvrunner, wf_runner, **kwargs).job(joborder_resolved, output_callback, **kwargs)
        else:
            return super(ArvadosWorkflow, self).job(joborder, output_callback, **kwargs)