10088: upload_dependencies reads raw files to find actual dependencies
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 import os
2 import urlparse
3 from functools import partial
4 import logging
5 import json
6 import re
7 from cStringIO import StringIO
8
9 import cwltool.draft2tool
10 from cwltool.draft2tool import CommandLineTool
11 import cwltool.workflow
12 from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs
13 from cwltool.load_tool import fetch_document
14 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
15
16 import arvados.collection
17 import ruamel.yaml as yaml
18
19 from .arvdocker import arv_docker_get_image
20 from .pathmapper import ArvPathMapper
21
# Module logger, registered under the name 'arvados.cwl-runner'.
logger = logging.getLogger('arvados.cwl-runner')

# Override cwltool's ACCEPTLIST_RE with a pattern that only matches strings
# made up entirely of ASCII letters, digits, '.', '_', '+' and '-'.
# NOTE(review): presumably cwltool uses this to decide which strings it
# accepts unquoted -- confirm against cwltool.draft2tool.
cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
25
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run):
    """Collect the files a CWL object depends on and remap their locations.

    The object is scanned with cwltool's scandeps(); every File and
    Directory object found is handed to an ArvPathMapper, and then the
    "location" of each File/Directory inside `workflowobj` is replaced
    (in place) by the target the mapper reports for it.

    Arguments:
      arvrunner -- runner object, passed through to ArvPathMapper.
      name -- passed to ArvPathMapper as its `name` keyword.
      document_loader -- object providing fetch_text(url), used to read the
        raw text of referenced documents.
      workflowobj -- mapping to scan; modified in place.  When it has an
        "id", the raw document at that id is re-read and scanned instead of
        the (already preprocessed) object, and the id itself is added to
        the list of files as a File dependency.
      uri -- base URI given to scandeps().
      loadref_run -- when true, "run" fields are followed in addition to
        "$import" fields.

    Returns the ArvPathMapper built over the discovered files.
    """
    loaded = set()

    def loadref(b, u):
        """Fetch and parse the document `u` (relative to `b`) at most once.

        Returns the parsed YAML document the first time a given
        (fragment-less) URL is requested, and an empty dict afterwards.
        """
        joined = urlparse.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg in loaded:
            return {}
        loaded.add(defrg)
        # Use fetch_text to get raw file (before preprocessing).
        text = document_loader.fetch_text(defrg)
        # Parse the text directly: the YAML loader accepts both byte
        # strings and unicode strings.  Wrapping decoded unicode in
        # cStringIO.StringIO would fail on any non-ASCII character.
        return yaml.safe_load(text)

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas")),
                  loadref)

    # Flatten every File and Directory object in the scan result into one
    # list.
    files = []

    def visitFiles(path):
        files.append(path)

    adjustFileObjs(sc, visitFiles)
    adjustDirObjs(sc, visitFiles)

    normalizeFilesDirs(files)

    if "id" in workflowobj:
        # The document itself is a dependency too.
        files.append({"class": "File", "location": workflowobj["id"]})

    mapper = ArvPathMapper(arvrunner, files, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name)

    def setloc(p):
        # Point the object at the target the mapper chose for it.
        p["location"] = mapper.mapper(p["location"]).target

    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    return mapper
83
84
def upload_docker(arvrunner, tool):
    """Fetch the Docker image required by `tool`, recursing into workflows.

    For a CommandLineTool that has a DockerRequirement feature,
    arv_docker_get_image() is called with the runner's API client and
    project UUID.  For a Workflow, the embedded tool of each step is
    processed the same way.  Any other kind of tool is ignored.
    """
    if isinstance(tool, CommandLineTool):
        requirement, _ = get_feature(tool, "DockerRequirement")
        if requirement:
            arv_docker_get_image(arvrunner.api, requirement, True,
                                 arvrunner.project_uuid)
        return

    if isinstance(tool, cwltool.workflow.Workflow):
        for step in tool.steps:
            upload_docker(arvrunner, step.embedded_tool)
93
94
class Runner(object):
    """Holds a tool and its job order on behalf of an Arvados CWL runner.

    Instances keep a reference to the runner, the tool and the job order,
    upload the tool's and job order's dependencies on request, and report
    the final result back through the runner's output_callback().
    """

    def __init__(self, runner, tool, job_order, enable_reuse):
        """Store the collaborators; nothing is uploaded or started here."""
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.enable_reuse = enable_reuse
        self.running = False
        self.uuid = None

    def update_pipeline_component(self, record):
        """Do nothing; placeholder that ignores `record`."""
        pass

    def arvados_job_spec(self, *args, **kwargs):
        """Upload Docker images and dependencies for the tool and job order.

        Sets self.name to the basename of the tool's "id", removes any
        "id" key from the job order, and returns the path mapper produced
        for the tool document.  Extra arguments are accepted and ignored.
        """
        upload_docker(self.arvrunner, self.tool)

        tool_id = self.tool.tool["id"]
        self.name = os.path.basename(tool_id)

        workflowmapper = upload_dependencies(
            self.arvrunner, self.name, self.tool.doc_loader,
            self.tool.tool, tool_id, True)

        # Called for its effect on self.job_order (file locations are
        # rewritten in place); the mapper it returns is not needed.
        job_id = self.job_order.get("id", "#")
        upload_dependencies(
            self.arvrunner, os.path.basename(job_id), self.tool.doc_loader,
            self.job_order, job_id, False)

        self.job_order.pop("id", None)

        return workflowmapper

    def done(self, record):
        """Translate a finished record into a status and report the outputs.

        The status is "success" when the state is "Complete" and the exit
        code is absent or 0, "UnsupportedRequirement" when the state is
        "Complete" and the exit code is 33, and "permanentFail" otherwise.
        Outputs are read from cwl.output.json in the record's output
        collection; if that fails the error is logged and None is reported.
        The record's entry in the runner's process table is always removed.
        """
        status = "permanentFail"
        if record["state"] == "Complete":
            exit_code = record.get("exit_code")
            if exit_code is None:
                status = "success"
            elif exit_code == 33:
                status = "UnsupportedRequirement"
            elif exit_code == 0:
                status = "success"

        def keepify(fileobj):
            # Qualify locations that are relative to the output collection.
            location = fileobj["location"]
            if location.startswith("keep:"):
                return
            fileobj["location"] = "keep:%s/%s" % (record["output"], location)

        outputs = None
        try:
            try:
                collection = arvados.collection.Collection(record["output"])
                with collection.open("cwl.output.json") as stream:
                    outputs = json.load(stream)
                adjustFileObjs(outputs, keepify)
                adjustDirObjs(outputs, keepify)
            except Exception as e:
                logger.error("While getting final output object: %s", e)
            self.arvrunner.output_callback(outputs, status)
        finally:
            del self.arvrunner.processes[record["uuid"]]