import os
import re
import json
import logging
import urlparse
from functools import partial
from cStringIO import StringIO

from schema_salad.sourceline import SourceLine

import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.utils import aslist
from cwltool.builder import substitute

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper
from ._version import __version__
from . import done

logger = logging.getLogger('arvados.cwl-runner')

# cwltool's default ACCEPTLIST_RE restricts which input file names are
# considered valid; relax it to accept any file name.
cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")

def trim_listing(obj):
    """Remove 'listing' field from Directory objects that are keep references.

    When Directory objects represent Keep references, it is redundant and
    potentially very expensive to pass fully enumerated Directory objects
    between instances of cwl-runner (e.g. when submitting a job, or using the
    RunInSingleContainer feature), so delete the 'listing' field when it is
    safe to do so.
    """
    if obj.get("location", "").startswith("keep:") and "listing" in obj:
        del obj["listing"]
    # Locations starting with "_:" are anonymous identifiers generated at
    # runtime and are meaningless outside this process, so drop them.
    if obj.get("location", "").startswith("_:"):
        del obj["location"]
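
# For example (hypothetical values), applying trim_listing via
# adjustDirObjs(obj, trim_listing) turns
#   {"class": "Directory", "location": "keep:<hash>+<size>/dir", "listing": [...]}
# into
#   {"class": "Directory", "location": "keep:<hash>+<size>/dir"}
# since the listing can be re-enumerated from Keep on the receiving side.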

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))
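
    # loadref_run reflects what is being scanned: True for tool/workflow
    # documents (also follow "run" references into subworkflows and tools),
    # False for job order documents (see upload_instance below).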
83 if "id" in workflowobj:
84 # Need raw file content (before preprocessing) to ensure
85 # that external references in $include and $mixin are captured.
86 scanobj = loadref("", workflowobj["id"])
88 sc = scandeps(uri, scanobj,
90 set(("$include", "$schemas", "location")),
91 loadref, urljoin=document_loader.fetcher.urljoin)

    normalizeFilesDirs(sc)

    if "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name)

    def setloc(p):
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    return mapper
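
# A typical call, with hypothetical argument values (see upload_instance
# below for the real call sites):
#
#   mapper = upload_dependencies(arvrunner, "my-workflow",
#                                tool.doc_loader, tool.tool,
#                                tool.tool["id"], True)
#
# Afterwards, File and Directory locations in tool.tool refer to "keep:..."
# URIs, and mapper can translate the original local paths.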

def upload_docker(arvrunner, tool):
    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory"):
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)
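
# upload_docker() recurses through Workflow steps, so the image of every
# embedded tool is made available before the workflow itself is submitted.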

def upload_instance(arvrunner, name, tool, job_order):
    upload_docker(arvrunner, tool)

    for t in tool.tool["inputs"]:
        def setSecondary(fileobj):
            if isinstance(fileobj, dict) and fileobj.get("class") == "File":
                if "secondaryFiles" not in fileobj:
                    fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]

            if isinstance(fileobj, list):
                for e in fileobj:
                    setSecondary(e)

        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(job_order[shortname(t["id"])])
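
    # For example (hypothetical input): given a tool input declaring
    # "secondaryFiles": [".bai"] and a job order value
    #   {"class": "File", "location": "reads.bam"}
    # setSecondary adds
    #   "secondaryFiles": [{"class": "File", "location": "reads.bam.bai"}]
    # following cwltool's substitute() suffix rules.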

    workflowmapper = upload_dependencies(arvrunner,
                                         name,
                                         tool.doc_loader,
                                         tool.tool,
                                         tool.tool["id"],
                                         True)

    # jobmapper's return value is unused; the call rewrites file references
    # in job_order in place.
    jobmapper = upload_dependencies(arvrunner,
                                    os.path.basename(job_order.get("id", "#")),
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    return workflowmapper

def arvados_jobs_image(arvrunner):
    img = "arvados/jobs:"+__version__
    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
    return img
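
# Callers can use the returned tag as the Docker image for the runner process
# itself; the arv_docker_get_image() call above is what actually verifies the
# image is available to the Arvados cluster.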

class Runner(object):
    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error

        if submit_runner_ram:
            self.submit_runner_ram = submit_runner_ram
        else:
            self.submit_runner_ram = 1024

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

    def update_pipeline_component(self, record):
        pass

    def arvados_job_spec(self, *args, **kwargs):
        if self.name is None:
            self.name = self.tool.tool.get("label") or os.path.basename(self.tool.tool["id"])

        # cwltool adds a "job_order" entry when parameters are provided on
        # the command line; it is not a real input, so filter it out.
        if "job_order" in self.job_order:
            del self.job_order["job_order"]

        workflowmapper = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
        adjustDirObjs(self.job_order, trim_listing)
        return workflowmapper
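
    # done() is the completion callback for the runner job/container: it maps
    # the record's exit state to a cwltool process status, loads
    # cwl.output.json from the output collection if present, and reports the
    # result through self.arvrunner.output_callback.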
    def done(self, record):
        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    # Exit code 33 is this package's convention for a runner
                    # that failed on an unsupported CWL requirement.
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    # Treat an empty cwl.output.json as "no outputs".
                    if f.size() > 0:
                        outputs = json.load(f)
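
            # keepify (below) qualifies bare relative paths against the
            # output collection, e.g. a hypothetical {"location": "out.txt"}
            # becomes {"location": "keep:<output-hash>/out.txt"}.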
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            if record["uuid"] in self.arvrunner.processes:
                del self.arvrunner.processes[record["uuid"]]