import json
import logging
import os
import re
import urlparse
from cStringIO import StringIO
from functools import partial

import ruamel.yaml as yaml
from schema_salad.sourceline import SourceLine

import arvados.collection
import cwltool.draft2tool
import cwltool.workflow
from cwltool.builder import substitute
from cwltool.draft2tool import CommandLineTool
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.utils import aslist

from . import done
from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper
from ._version import __version__
# Module-level logger shared by everything in this file.
logger = logging.getLogger('arvados.cwl-runner')

# Replace cwltool's filename accept-list with a match-anything pattern,
# presumably so arbitrary Keep filenames are not rejected — TODO confirm
# against cwltool's ACCEPTLIST_RE usage.
cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")
def trim_listing(obj):
    """Remove 'listing' field from Directory objects that are keep references.

    When Directory objects represent Keep references, it is redundant and
    potentially very expensive to pass fully enumerated Directory objects
    between instances of cwl-runner (e.g. a submitting a job, or using the
    RunInSingleContainer feature), so delete the 'listing' field when it is
    safe to do so.
    """

    # The collection contents can always be re-enumerated from the keep:
    # reference, so the listing carries no information.
    if obj.get("location", "").startswith("keep:") and "listing" in obj:
        del obj["listing"]
    # Literal (anonymous "_:") directories are carried entirely by their
    # "listing"; the synthetic location is meaningless downstream.
    if obj.get("location", "").startswith("_:"):
        del obj["location"]
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """
    # NOTE(review): several dropped lines (loadref header, else branches,
    # scandeps reffields argument, pathmapper templates, return) were
    # reconstructed — verify against upstream arvados-cwl-runner.

    loaded = set()
    def loadref(b, u):
        # Fetch and parse each referenced document exactly once; repeat
        # references resolve to an empty dict so scandeps does not recurse
        # into them again.
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    # Only follow "run" references when scanning a workflow document.
    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    normalizeFilesDirs(sc)

    # The workflow document itself is also a dependency.
    if "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name)

    def setloc(p):
        # Rewrite local references in place to their uploaded Keep
        # locations; leave literals ("_:") and existing keep: refs alone.
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    return mapper
def upload_docker(arvrunner, tool):
    """Ensure the Docker image(s) required by a tool are available in Keep.

    Recurses into workflow steps; a step's embedded tool is processed the
    same way as a top-level tool.
    """
    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory"):
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)
def upload_instance(arvrunner, name, tool, job_order):
    """Upload Docker images, workflow dependencies and job order inputs.

    Returns the pathmapper for the workflow document; job_order is updated
    in place with Keep references by the second upload_dependencies call.
    """
    # NOTE(review): dropped argument lines of both upload_dependencies calls
    # were reconstructed — verify against upstream arvados-cwl-runner.
    upload_docker(arvrunner, tool)

    for t in tool.tool["inputs"]:
        def setSecondary(fileobj):
            # Fill in the schema's default secondaryFiles patterns when the
            # job order did not supply them explicitly.
            if isinstance(fileobj, dict) and fileobj.get("class") == "File":
                if "secondaryFiles" not in fileobj:
                    fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]

            if isinstance(fileobj, list):
                for entry in fileobj:
                    setSecondary(entry)

        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(job_order[shortname(t["id"])])

    workflowmapper = upload_dependencies(arvrunner,
                                         name,
                                         tool.doc_loader,
                                         tool.tool,
                                         tool.tool["id"],
                                         True)
    # Called for its side effects: uploads input files and rewrites
    # job_order references in place.
    jobmapper = upload_dependencies(arvrunner,
                                    os.path.basename(job_order.get("id", "#")),
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    # The document id is not a real workflow input; drop it once its file
    # dependencies have been uploaded.
    if "id" in job_order:
        del job_order["id"]

    return workflowmapper
def arvados_jobs_image(arvrunner):
    """Make sure the matching arvados/jobs image is available; return its name.

    Raises Exception with a descriptive message if the image cannot be
    pulled/uploaded.
    """
    img = "arvados/jobs:"+__version__
    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
    return img
class Runner(object):
    """Base class for a submitted cwl-runner process that manages a workflow run.

    Subclasses (jobs/containers API) provide the actual submission; this class
    holds the shared state, uploads the workflow, and collects final outputs.
    """

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name

        # Default RAM for the runner process is 1 GiB unless overridden.
        if submit_runner_ram:
            self.submit_runner_ram = submit_runner_ram
        else:
            self.submit_runner_ram = 1024

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

    def update_pipeline_component(self, record):
        # No-op here; subclasses may reflect state into a pipeline instance.
        pass

    def arvados_job_spec(self, *args, **kwargs):
        """Upload the workflow and its dependencies; return the workflow pathmapper."""
        if self.name is None:
            self.name = self.tool.tool.get("label") or os.path.basename(self.tool.tool["id"])

        # Need to filter this out, gets added by cwltool when providing
        # parameters on the command line.
        if "job_order" in self.job_order:
            del self.job_order["job_order"]

        workflowmapper = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
        adjustDirObjs(self.job_order, trim_listing)
        return workflowmapper

    def done(self, record):
        """Finish-callback for the runner process: derive status, collect the
        output object from Keep, and report through the output callback."""
        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        # 33 is the runner's conventional exit code for
                        # UnsupportedRequirement.
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    # NOTE(review): empty-file guard reconstructed from a
                    # dropped line — confirm against upstream.
                    if f.size() > 0:
                        outputs = json.load(f)
            def keepify(fileobj):
                # Make output paths absolute keep: references.
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            if record["uuid"] in self.arvrunner.processes:
                del self.arvrunner.processes[record["uuid"]]