10383: --update-collection is no longer mutually exclusive with --resume and --no-resume.
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
import os
import urlparse
from functools import partial
import logging
import json
import re
from cStringIO import StringIO

import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper
from ._version import __version__

logger = logging.getLogger('arvados.cwl-runner')

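# Accept any tool file name (cwltool's default pattern rejects some
# otherwise-valid names).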
cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")

def trim_listing(obj):
    """Remove 'listing' field from Directory objects that are keep references.

    When a Directory object represents a Keep reference, it is redundant and
    potentially very expensive to pass a fully enumerated Directory object
    between instances of cwl-runner (e.g. when submitting a job, or when
    using the RunInSingleContainer feature), so delete the 'listing' field
    when it is safe to do so.
    """

    if obj.get("location", "").startswith("keep:") and "listing" in obj:
        del obj["listing"]
    # Anonymous ("_:") locations have no retrievable content of their own;
    # drop them so the inline listing is used instead.
    if obj.get("location", "").startswith("_:"):
        del obj["location"]

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = urlparse.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

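    # When scanning the workflow itself, also follow "run" references so
    # that tools and subworkflows used by workflow steps are uploaded too.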
    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref)

    normalizeFilesDirs(sc)

    if "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

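    # Upload everything scandeps found.  The patterns produce "keep:<pdh>"
    # references for whole collections and "keep:<pdh>/<path>" for files
    # inside a collection.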
    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name)

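    # Rewrite references in the original document to their uploaded
    # locations, leaving anonymous ("_:") and existing keep: URIs alone.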
    def setloc(p):
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    return mapper


def upload_docker(arvrunner, tool):
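    """Ensure the Docker image named by any DockerRequirement is available in Keep."""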
    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

def upload_instance(arvrunner, name, tool, job_order):
    upload_docker(arvrunner, tool)

    workflowmapper = upload_dependencies(arvrunner,
                                         name,
                                         tool.doc_loader,
                                         tool.tool,
                                         tool.tool["id"],
                                         True)

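    # Also upload any local files referenced by the job order document
    # (the workflow inputs).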
    jobmapper = upload_dependencies(arvrunner,
                                    os.path.basename(job_order.get("id", "#")),
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    return workflowmapper

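# Return the name of the arvados/jobs Docker image matching this SDK
# version, after confirming it is available to the cluster.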
def arvados_jobs_image(arvrunner):
    img = "arvados/jobs:"+__version__
    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
    return img

class Runner(object):
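    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for it to complete."""
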
    def __init__(self, runner, tool, job_order, enable_reuse, output_name):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name

    def update_pipeline_component(self, record):
        pass

    def arvados_job_spec(self, *args, **kwargs):
        self.name = os.path.basename(self.tool.tool["id"])
        workflowmapper = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
        adjustDirObjs(self.job_order, trim_listing)
        return workflowmapper

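    # Called when the submitted runner process reaches a final state;
    # collect its output and report the result back to the main loop.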
    def done(self, record):
        if record["state"] == "Complete":
            if record.get("exit_code") is not None:
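                # By convention the runner exits 33 when the workflow
                # uses a requirement it does not support.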
                if record["exit_code"] == 33:
                    processStatus = "UnsupportedRequirement"
                elif record["exit_code"] == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"
            else:
                processStatus = "success"
        else:
            processStatus = "permanentFail"

        outputs = None
        try:
            try:
                self.final_output = record["output"]
                outc = arvados.collection.CollectionReader(self.final_output,
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)
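                # Locations in cwl.output.json are relative to the output
                # collection; rewrite them as absolute keep: references.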
                def keepify(fileobj):
                    path = fileobj["location"]
                    if not path.startswith("keep:"):
                        fileobj["location"] = "keep:%s/%s" % (record["output"], path)
                adjustFileObjs(outputs, keepify)
                adjustDirObjs(outputs, keepify)
            except Exception as e:
                logger.error("While getting final output object: %s", e)
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.processes[record["uuid"]]