10460: Add implied secondaryFiles based on input parameter spec.
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 import os
2 import urlparse
3 from functools import partial
4 import logging
5 import json
6 import re
7 from cStringIO import StringIO
8
9 import cwltool.draft2tool
10 from cwltool.draft2tool import CommandLineTool
11 import cwltool.workflow
12 from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
13 from cwltool.load_tool import fetch_document
14 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
15 from cwltool.utils import aslist
16 from cwltool.builder import substitute
17
18 import arvados.collection
19 import ruamel.yaml as yaml
20
21 from .arvdocker import arv_docker_get_image
22 from .pathmapper import ArvPathMapper
23 from ._version import __version__
24
25 logger = logging.getLogger('arvados.cwl-runner')
26
27 cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")
28
def trim_listing(obj):
    """Strip redundant fields from a Directory object, in place.

    When a Directory is backed by a Keep reference, its contents can be
    enumerated on demand, so shipping a fully materialized 'listing'
    between cwl-runner instances (e.g. when submitting a job, or using
    the RunInSingleContainer feature) is redundant and potentially very
    expensive; drop it.  Anonymous ("_:") locations carry no resolvable
    reference, so the 'location' field itself is dropped for those.
    """
    location = obj.get("location", "")
    if location.startswith("keep:") and "listing" in obj:
        del obj["listing"]
    if location.startswith("_:"):
        del obj["location"]
43
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        # Resolve the reference against its base and strip any fragment so
        # that each distinct document is fetched and scanned at most once.
        joined = urlparse.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            # Already visited: return an empty document so scandeps does not
            # traverse the same reference twice.
            return {}

    if loadref_run:
        # Also follow "run" references (embedded workflow steps).
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref)

    # Put discovered File/Directory objects into canonical form before mapping.
    normalizeFilesDirs(sc)

    if "id" in workflowobj:
        # The workflow document itself is a dependency too.
        sc.append({"class": "File", "location": workflowobj["id"]})

    # Upload everything found to Keep; mapper records local path -> keep: URI.
    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name)

    def setloc(p):
        # Rewrite references in workflowobj in place, leaving literal ("_:")
        # and already-uploaded ("keep:") locations untouched.
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    return mapper
107
108
def upload_docker(arvrunner, tool):
    """Make the Docker image(s) used by `tool` available in Arvados.

    Workflows are handled by recursing into each step's embedded tool;
    command line tools upload the image named by their DockerRequirement,
    if any.  Tools of any other kind are ignored.
    """
    if isinstance(tool, cwltool.workflow.Workflow):
        for step in tool.steps:
            upload_docker(arvrunner, step.embedded_tool)
        return
    if not isinstance(tool, CommandLineTool):
        return
    (docker_req, _) = get_feature(tool, "DockerRequirement")
    if docker_req:
        arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
117
def upload_instance(arvrunner, name, tool, job_order):
        """Upload the tool's and job order's dependencies to Keep.

        Returns the pathmapper for the workflow document.  Side effects:
        uploads required Docker images, rewrites references in tool.tool and
        job_order in place, and expands implied secondaryFiles on File inputs
        based on the tool's input parameter spec.
        """
        upload_docker(arvrunner, tool)

        for t in tool.tool["inputs"]:
            def setSecondary(fileobj):
                # Secondary files created below are tagged "__norecurse" so
                # that when adjustFileObjs visits them we strip the tag and
                # stop, instead of expanding secondaryFiles of secondaryFiles.
                if "__norecurse" in fileobj:
                    del fileobj["__norecurse"]
                    return
                if "secondaryFiles" not in fileobj:
                    # Derive the implied secondary files by applying each of
                    # the parameter's secondaryFiles suffix patterns (e.g.
                    # ".bai", "^.fai") to this file's location.
                    # NOTE(review): assumes fileobj always has "location" —
                    # confirm inputs are normalized before this point.
                    fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File", "__norecurse": True} for sf in t["secondaryFiles"]]

            if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
                # setSecondary closes over the loop variable "t"; this is safe
                # only because it is invoked here, within the same iteration.
                adjustFileObjs(job_order, setSecondary)

        workflowmapper = upload_dependencies(arvrunner,
                                             name,
                                             tool.doc_loader,
                                             tool.tool,
                                             tool.tool["id"],
                                             True)
        # jobmapper is not returned; the call matters for its side effect of
        # uploading the job order's file references and rewriting job_order.
        jobmapper = upload_dependencies(arvrunner,
                                        os.path.basename(job_order.get("id", "#")),
                                        tool.doc_loader,
                                        job_order,
                                        job_order.get("id", "#"),
                                        False)

        if "id" in job_order:
            # Drop "id" so it is not mistaken for a workflow input downstream.
            del job_order["id"]

        return workflowmapper
149
def arvados_jobs_image(arvrunner):
    """Return the arvados/jobs image name for this SDK version.

    Verifies the image is available (uploading it if necessary) before
    returning; raises Exception if it cannot be obtained.
    """
    img = "arvados/jobs:" + __version__
    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as err:
        # Surface the underlying failure in the message; callers only see Exception.
        raise Exception("Docker image %s is not available\n%s" % (img, err))
    return img
157
class Runner(object):
    """Base class for a submitted cwl-runner process that manages a workflow run.

    Subclasses are expected to provide the submission mechanism; this class
    holds shared state, prepares the job spec inputs, and translates the
    finished job/container record back into CWL outputs.
    """

    def __init__(self, runner, tool, job_order, enable_reuse, output_name):
        # runner: presumably the ArvCwlRunner coordinating this run (provides
        # .api, .keep_client, .num_retries, .output_callback, .processes) —
        # verify against callers.
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name

    def update_pipeline_component(self, record):
        # No-op here; subclasses may override to reflect state changes.
        pass

    def arvados_job_spec(self, *args, **kwargs):
        """Upload the tool and job order to Keep; return the workflow pathmapper."""
        self.name = os.path.basename(self.tool.tool["id"])
        workflowmapper = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
        # Drop redundant 'listing' fields from keep-backed Directory inputs.
        adjustDirObjs(self.job_order, trim_listing)
        return workflowmapper

    def done(self, record):
        """Completion callback: map the finished record to a CWL process status
        and output object, then report via the runner's output_callback."""
        if record["state"] == "Complete":
            if record.get("exit_code") is not None:
                if record["exit_code"] == 33:
                    # Exit code 33 is this runner's convention for
                    # UnsupportedRequirement failures.
                    processStatus = "UnsupportedRequirement"
                elif record["exit_code"] == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"
            else:
                # No exit code recorded; trust the Complete state.
                processStatus = "success"
        else:
            processStatus = "permanentFail"

        outputs = None
        try:
            try:
                # The output collection is expected to contain cwl.output.json
                # with the CWL output object.
                self.final_output = record["output"]
                outc = arvados.collection.CollectionReader(self.final_output,
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)
                def keepify(fileobj):
                    # Rewrite collection-relative paths into absolute keep: URIs.
                    path = fileobj["location"]
                    if not path.startswith("keep:"):
                        fileobj["location"] = "keep:%s/%s" % (record["output"], path)
                adjustFileObjs(outputs, keepify)
                adjustDirObjs(outputs, keepify)
            except Exception as e:
                # Best-effort: report the failure but still invoke the
                # callback (with outputs=None) so the run can finish.
                logger.error("While getting final output object: %s", e)
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            # Always deregister this process, even if output handling failed.
            del self.arvrunner.processes[record["uuid"]]