10088: Bump cwltool dependency for another bugfix to scandeps.
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 import os
2 import urlparse
3 from functools import partial
4 import logging
5 import json
6 import re
7 from cStringIO import StringIO
8
9 import cwltool.draft2tool
10 from cwltool.draft2tool import CommandLineTool
11 import cwltool.workflow
12 from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs
13 from cwltool.load_tool import fetch_document
14 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
15
16 import arvados.collection
17 import ruamel.yaml as yaml
18
19 from .arvdocker import arv_docker_get_image
20 from .pathmapper import ArvPathMapper
21
logger = logging.getLogger("arvados.cwl-runner")

# Override cwltool.draft2tool.ACCEPTLIST_RE with a pattern matching only
# strings made of ASCII letters, digits, '.', '_', '+' and '-'.  This rebinds
# an attribute of the imported cwltool module, so the new pattern applies to
# every user of cwltool.draft2tool in this process.  (Presumably this is the
# pattern cwltool uses to validate file names -- confirm against cwltool.)
cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
25
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.

    :param arvrunner: runner object handed to ArvPathMapper.
    :param name: name handed to ArvPathMapper (``name=`` keyword).
    :param document_loader: loader whose ``fetch_text(url)`` returns the raw
        text (bytes or text) of a referenced document.
    :param workflowobj: document (dict) to scan.  The "location" field of
        every File and Directory object in it is rewritten in place to the
        mapper's target.
    :param uri: base URI passed to scandeps for resolving references.
    :param loadref_run: if true, "run" references are followed in addition
        to "$import" references.
    :returns: the ArvPathMapper built from the discovered files.
    """

    # Fragment-free URLs that have already been fetched; a document that is
    # referenced several times is only loaded (and scanned) once.
    loaded = set()

    def loadref(b, u):
        joined = urlparse.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg in loaded:
            return {}
        loaded.add(defrg)
        # Use fetch_text to get raw file (before preprocessing).
        text = document_loader.fetch_text(defrg)
        if isinstance(text, bytes):
            text = text.decode('utf-8')
        # Parse the string directly.  Wrapping a unicode string in
        # cStringIO.StringIO fails on Python 2 when it contains non-ASCII
        # characters, which broke loading of non-ASCII documents.
        return yaml.safe_load(text)

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref)

    # Gather every File and Directory object reported by scandeps.
    files = []
    adjustFileObjs(sc, files.append)
    adjustDirObjs(sc, files.append)

    normalizeFilesDirs(files)

    # The document itself must be uploaded too.
    if "id" in workflowobj:
        files.append({"class": "File", "location": workflowobj["id"]})

    mapper = ArvPathMapper(arvrunner, files, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name)

    def setloc(p):
        # Point the object at its uploaded (mapped) location.
        p["location"] = mapper.mapper(p["location"]).target

    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    return mapper
95
96
def upload_docker(arvrunner, tool):
    """Ensure the Docker images needed by ``tool`` are available in Arvados.

    For a CommandLineTool, its DockerRequirement (if present) is passed to
    arv_docker_get_image together with the runner's API client and project
    UUID.  For a Workflow, each step's embedded tool is processed
    recursively.  Any other tool type is left alone.
    """
    if isinstance(tool, CommandLineTool):
        docker_req, _ = get_feature(tool, "DockerRequirement")
        if not docker_req:
            return
        arv_docker_get_image(arvrunner.api, docker_req, True,
                             arvrunner.project_uuid)
        return

    if not isinstance(tool, cwltool.workflow.Workflow):
        return

    for step in tool.steps:
        upload_docker(arvrunner, step.embedded_tool)
105
106
class Runner(object):
    """Base class for running a CWL tool/workflow as an Arvados process.

    Holds the runner, the tool, its input object (job order) and the reuse
    flag.  It provides the dependency-upload step (``arvados_job_spec``) and
    the completion handler (``done``).
    """

    def __init__(self, runner, tool, job_order, enable_reuse):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse
        self.uuid = None

    def update_pipeline_component(self, record):
        """Hook called with a process record; does nothing in the base class."""
        pass

    def arvados_job_spec(self, *args, **kwargs):
        """Upload Docker images and file dependencies for the tool and inputs.

        Sets ``self.name`` to the basename of the tool's "id".  File and
        Directory references in ``self.tool.tool`` and ``self.job_order`` are
        rewritten in place to their Keep locations, and any "id" key is
        removed from ``self.job_order``.

        :returns: the pathmapper for the tool document's dependencies.
        """
        upload_docker(self.arvrunner, self.tool)

        self.name = os.path.basename(self.tool.tool["id"])

        workflowmapper = upload_dependencies(self.arvrunner,
                                             self.name,
                                             self.tool.doc_loader,
                                             self.tool.tool,
                                             self.tool.tool["id"],
                                             True)

        # Called for its side effects only: it uploads the input files and
        # rewrites their locations inside self.job_order in place.  The
        # returned mapper is not needed here.
        job_order_id = self.job_order.get("id", "#")
        upload_dependencies(self.arvrunner,
                            os.path.basename(job_order_id),
                            self.tool.doc_loader,
                            self.job_order,
                            job_order_id,
                            False)

        # "id" was only needed for dependency scanning above.  Presumably it
        # must not reach the submitted inputs -- confirm with callers.
        self.job_order.pop("id", None)

        return workflowmapper

    def done(self, record):
        """Handle completion of the submitted process described by ``record``.

        Derives a CWL process status from the record's "state" and
        "exit_code".  It then tries to read ``cwl.output.json`` from the
        output collection, converting relative locations into
        ``keep:<output>/<path>`` references.  Finally it reports the outputs
        and status through ``self.arvrunner.output_callback``.  Failure to
        read the outputs is logged and reported with ``outputs=None``.  The
        record's uuid is always removed from ``self.arvrunner.processes``.
        """
        if record["state"] == "Complete":
            if record.get("exit_code") is not None:
                if record["exit_code"] == 33:
                    processStatus = "UnsupportedRequirement"
                elif record["exit_code"] == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"
            else:
                processStatus = "success"
        else:
            processStatus = "permanentFail"

        outputs = None
        try:
            try:
                outc = arvados.collection.Collection(record["output"])
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)

                def keepify(fileobj):
                    # Locations not already in Keep are treated as relative
                    # to the output collection.
                    path = fileobj["location"]
                    if not path.startswith("keep:"):
                        fileobj["location"] = "keep:%s/%s" % (record["output"], path)

                adjustFileObjs(outputs, keepify)
                adjustDirObjs(outputs, keepify)
            except Exception as e:
                logger.error("While getting final output object: %s", e)
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.processes[record["uuid"]]