Fix typo.
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 import os
2 import urlparse
3 from functools import partial
4 import logging
5 import json
6 import re
7 from cStringIO import StringIO
8
9 import cwltool.draft2tool
10 from cwltool.draft2tool import CommandLineTool
11 import cwltool.workflow
12 from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs
13 from cwltool.load_tool import fetch_document
14 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
15
16 import arvados.collection
17 import ruamel.yaml as yaml
18
19 from .arvdocker import arv_docker_get_image
20 from .pathmapper import ArvPathMapper
21
# Logger shared by the arvados-cwl runner helpers in this module.
logger = logging.getLogger('arvados.cwl-runner')

# Override cwltool's ACCEPTLIST_RE with a pattern that accepts names made only
# of ASCII letters, digits, '.', '_', '+' and '-'.
# NOTE(review): presumably widened so that '+' (which appears in Keep
# locators such as "<hash>+<size>") is accepted; confirm against the default
# in cwltool.draft2tool.
# This rebinds an attribute on the cwltool.draft2tool module itself, so the
# change is process-wide and takes effect as soon as this module is imported.
cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
25
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.

    :param arvrunner: runner object passed through to ArvPathMapper.
    :param name: name passed to ArvPathMapper (``name=``).
    :param document_loader: loader whose ``fetch_text(url)`` returns the raw
        (unpreprocessed) document text, as bytes or text.
    :param workflowobj: document to scan; File/Directory "location" fields in
        it are rewritten in place to their keep: targets.
    :param uri: base URI handed to scandeps for resolving references.
    :param loadref_run: if true, "run" fields are followed in addition to
        "$import" when loading referenced documents.
    :returns: the ArvPathMapper built from the discovered dependencies.
    """

    # Documents (URL without fragment) that have already been fetched, so each
    # one is parsed at most once.
    loaded = set()

    def loadref(b, u):
        joined = urlparse.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg in loaded:
            # Already loaded: return an empty document instead of re-parsing.
            return {}
        loaded.add(defrg)
        # Use fetch_text to get raw file (before preprocessing).
        text = document_loader.fetch_text(defrg)
        if isinstance(text, bytes):
            text = text.decode('utf-8')
        # Parse the text directly.  Wrapping decoded unicode in
        # cStringIO.StringIO fails with UnicodeEncodeError as soon as the
        # document contains any non-ASCII character.
        return yaml.safe_load(text)

    if loadref_run:
        loadref_fields = {"$import", "run"}
    else:
        loadref_fields = {"$import"}

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  {"$include", "$schemas", "location"},
                  loadref)

    normalizeFilesDirs(sc)

    if "id" in workflowobj:
        # Upload the document itself along with its dependencies.
        sc.append({"class": "File", "location": workflowobj["id"]})

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name)

    def setloc(p):
        # Locations already starting with "_:" or "keep:" are left unchanged;
        # everything else is replaced by its mapped keep: target.
        if not p["location"].startswith(("_:", "keep:")):
            p["location"] = mapper.mapper(p["location"]).target

    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    return mapper
89
90
def upload_docker(arvrunner, tool):
    """Make sure the Docker images a tool depends on are available in Arvados.

    A CommandLineTool that declares a DockerRequirement has that requirement
    passed to arv_docker_get_image() together with arvrunner.api and
    arvrunner.project_uuid.  A Workflow is handled by recursing into the
    embedded tool of each of its steps, in step order.  Any other kind of
    tool is ignored.
    """
    if isinstance(tool, CommandLineTool):
        requirement, _is_required = get_feature(tool, "DockerRequirement")
        if not requirement:
            return
        arv_docker_get_image(arvrunner.api, requirement, True,
                             arvrunner.project_uuid)
        return

    if not isinstance(tool, cwltool.workflow.Workflow):
        return

    for embedded in (step.embedded_tool for step in tool.steps):
        upload_docker(arvrunner, embedded)
99
100
class Runner(object):
    """Runs a CWL ``tool`` with a ``job_order`` on Arvados.

    arvados_job_spec() uploads the Docker images and file dependencies the
    run needs, and done() turns the finished process record into a CWL
    output object and status.
    NOTE(review): looks like a base class meant to be extended by concrete
    submitters (update_pipeline_component is a no-op here) — confirm.
    """

    def __init__(self, runner, tool, job_order, enable_reuse):
        """Store the run parameters; nothing is uploaded or submitted here.

        :param runner: executor object, kept as ``self.arvrunner``; its
            ``api``, ``project_uuid``, ``output_callback`` and ``processes``
            attributes are used by the methods below.
        :param tool: cwltool tool/workflow object (its ``tool`` document and
            ``doc_loader`` are used).
        :param job_order: input object (dict) for the run; modified in place
            by arvados_job_spec().
        :param enable_reuse: reuse flag, stored unchanged.
        """
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse
        self.uuid = None

    def update_pipeline_component(self, record):
        """Do nothing; presumably a hook for subclasses to report *record*."""
        pass

    def arvados_job_spec(self, *args, **kwargs):
        """Upload everything the run needs and return the tool's path mapper.

        Side effects:
          * upload_docker() is called for self.tool.
          * self.name is set to the basename of the tool document's "id".
          * The tool document and self.job_order are passed to
            upload_dependencies(), which rewrites their File/Directory
            locations in place.
          * The "id" key, if any, is removed from self.job_order.

        Extra positional and keyword arguments are accepted and ignored.

        :returns: the mapper returned by upload_dependencies() for the tool
            document.
        """
        upload_docker(self.arvrunner, self.tool)

        self.name = os.path.basename(self.tool.tool["id"])

        workflowmapper = upload_dependencies(self.arvrunner,
                                             self.name,
                                             self.tool.doc_loader,
                                             self.tool.tool,
                                             self.tool.tool["id"],
                                             True)

        # The job order is uploaded only for the in-place rewrite of its
        # locations; the mapper returned for it is not needed.
        job_order_id = self.job_order.get("id", "#")
        upload_dependencies(self.arvrunner,
                            os.path.basename(job_order_id),
                            self.tool.doc_loader,
                            self.job_order,
                            job_order_id,
                            False)

        # The "id" has served its purpose above; drop it from the job order.
        self.job_order.pop("id", None)

        return workflowmapper

    def done(self, record):
        """Report the outcome of a finished process *record*.

        The CWL status is derived from record["state"] and
        record["exit_code"]:
          * A "Complete" state with exit code 0 or no exit code gives
            "success".
          * Exit code 33 gives "UnsupportedRequirement".
          * Any other exit code, or any other state, gives "permanentFail".

        The output object is read from "cwl.output.json" in the record's output
        collection.  Every File/Directory location not already starting with
        "keep:" is prefixed with "keep:<record output>/".  The result is passed
        to arvrunner.output_callback(outputs, status).

        If the output object cannot be read or rewritten, the error is logged
        with its traceback and outputs is None.  A "success" status is
        downgraded to "permanentFail" in that case, because the run's result
        is unavailable.

        record["uuid"] is removed from arvrunner.processes even if reading the
        output or the callback raises.
        """
        if record["state"] == "Complete":
            if record.get("exit_code") is not None:
                if record["exit_code"] == 33:
                    processStatus = "UnsupportedRequirement"
                elif record["exit_code"] == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"
            else:
                processStatus = "success"
        else:
            processStatus = "permanentFail"

        outputs = None
        try:
            try:
                outc = arvados.collection.Collection(record["output"])
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)

                def keepify(fileobj):
                    # Non-keep: locations are paths inside the output collection.
                    path = fileobj["location"]
                    if not path.startswith("keep:"):
                        fileobj["location"] = "keep:%s/%s" % (record["output"], path)

                adjustFileObjs(outputs, keepify)
                adjustDirObjs(outputs, keepify)
            except Exception as e:
                # logger.exception keeps the traceback, which str(e) alone
                # (e.g. a bare KeyError key) usually does not convey.
                logger.exception("While getting final output object: %s", e)
                # Do not forward a partially rewritten object, and do not
                # report success for a run whose outputs are unavailable.
                outputs = None
                if processStatus == "success":
                    processStatus = "permanentFail"
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.processes[record["uuid"]]