[arvados.git] sdk/cwl/arvados_cwl/runner.py @ commit 6f157db3c6836c0769e622764fe93b04fe14d398
import os
import urlparse
from functools import partial
import logging
import json
import re
from cStringIO import StringIO

import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper
from ._version import __version__

logger = logging.getLogger('arvados.cwl-runner')

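# Accept any file name (overrides cwltool's default, more restrictive pattern
# for allowed file names).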
cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")

def trim_listing(obj):
    """Remove 'listing' field from Directory objects that are Keep references.

    When Directory objects represent Keep references, it is redundant and
    potentially very expensive to pass fully enumerated Directory objects
    between instances of cwl-runner (e.g. when submitting a job, or using the
    RunInSingleContainer feature), so delete the 'listing' field when it is
    safe to do so.  Directory objects with anonymous '_:' locations have the
    'location' field removed as well.
    """

    if obj.get("location", "").startswith("keep:") and "listing" in obj:
        del obj["listing"]
    if obj.get("location", "").startswith("_:"):
        del obj["location"]

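# A sketch of typical use: trim_listing operates on a single Directory object,
# so it is applied across a whole document with cwltool's adjustDirObjs, e.g.
#
#     adjustDirObjs(job_order, trim_listing)
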
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to Keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = urlparse.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref)

    normalizeFilesDirs(sc)

    if "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name)

    def setloc(p):
        # Rewrite local references to the Keep locations chosen by the mapper.
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    return mapper


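# Typical use of upload_dependencies (a sketch mirroring upload_instance
# below): map the workflow document's external references into Keep, then
# translate individual locations with the returned mapper, e.g.
#
#     mapper = upload_dependencies(arvrunner, name, tool.doc_loader,
#                                  tool.tool, tool.tool["id"], True)
#     resolved = mapper.mapper(some_local_path).resolved
#
# where "some_local_path" is a hypothetical placeholder.
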
def upload_docker(arvrunner, tool):
    """Make sure the Docker images used by the tool are available to Arvados.

    Recurses into Workflow steps; for each CommandLineTool with a
    DockerRequirement, the image is fetched/uploaded via arv_docker_get_image.
    """
    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

def upload_instance(arvrunner, name, tool, job_order):
    """Upload the Docker images, workflow document, and job order dependencies.

    Returns the pathmapper for the workflow document itself.
    """
    upload_docker(arvrunner, tool)

    workflowmapper = upload_dependencies(arvrunner,
                                         name,
                                         tool.doc_loader,
                                         tool.tool,
                                         tool.tool["id"],
                                         True)
    # jobmapper is needed only for its side effect of rewriting the
    # references in job_order in place.
    jobmapper = upload_dependencies(arvrunner,
                                    os.path.basename(job_order.get("id", "#")),
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    return workflowmapper

def arvados_jobs_image(arvrunner):
    """Determine which arvados/jobs image to use, and make sure it is available."""
    img = "arvados/jobs:"+__version__
    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
    return img

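# Note on arvados_jobs_image: the tag tracks the installed arvados-cwl-runner
# version ("arvados/jobs:" + __version__); the submitted runner process is
# expected to use this image.
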
class Runner(object):
    """Base class for submitting a workflow-runner process and collecting its output."""

    def __init__(self, runner, tool, job_order, enable_reuse, output_name):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name

    def update_pipeline_component(self, record):
        pass

    def arvados_job_spec(self, *args, **kwargs):
        """Upload the workflow and job order to Keep, returning the workflow pathmapper."""
        self.name = os.path.basename(self.tool.tool["id"])
        workflowmapper = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
        adjustDirObjs(self.job_order, trim_listing)
        return workflowmapper

    def done(self, record):
        """Collect the final output from the finished runner process and report it."""
        if record["state"] == "Complete":
            if record.get("exit_code") is not None:
                if record["exit_code"] == 33:
                    # Exit code 33 signals UnsupportedRequirement.
                    processStatus = "UnsupportedRequirement"
                elif record["exit_code"] == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"
            else:
                processStatus = "success"
        else:
            processStatus = "permanentFail"

        outputs = None
        try:
            try:
                self.final_output = record["output"]
                outc = arvados.collection.CollectionReader(self.final_output,
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)
                def keepify(fileobj):
                    # Make relative output locations absolute within the output collection.
                    path = fileobj["location"]
                    if not path.startswith("keep:"):
                        fileobj["location"] = "keep:%s/%s" % (record["output"], path)
                adjustFileObjs(outputs, keepify)
                adjustDirObjs(outputs, keepify)
            except Exception as e:
                logger.error("While getting final output object: %s", e)
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.processes[record["uuid"]]