3 from functools import partial
7 from cStringIO import StringIO
9 from schema_salad.sourceline import SourceLine
11 import cwltool.draft2tool
12 from cwltool.draft2tool import CommandLineTool
13 import cwltool.workflow
14 from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
15 from cwltool.load_tool import fetch_document
16 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
17 from cwltool.utils import aslist
18 from cwltool.builder import substitute
20 import arvados.collection
21 import ruamel.yaml as yaml
23 from .arvdocker import arv_docker_get_image
24 from .pathmapper import ArvPathMapper
25 from ._version import __version__
27 logger = logging.getLogger('arvados.cwl-runner')
29 cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")
def trim_listing(obj):
    """Remove 'listing' field from Directory objects that are keep references.

    When Directory objects represent Keep references, it is redundant and
    potentially very expensive to pass fully enumerated Directory objects
    between instances of cwl-runner (e.g. when submitting a job, or using the
    RunInSingleContainer feature), so delete the 'listing' field when it is
    safe to do so.
    """
    if obj.get("location", "").startswith("keep:") and "listing" in obj:
        # Keep can re-enumerate the directory on demand; drop the listing.
        del obj["listing"]
    if obj.get("location", "").startswith("_:"):
        # "_:" locations are anonymous/literal directories; the location
        # itself carries no information once contents are materialized.
        del obj["location"]
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            # Already visited; return an empty document so scandeps
            # does not recurse into it again.
            return {}

    if loadref_run:
        # Follow "run" references (workflow steps) as well as $import.
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    normalizeFilesDirs(sc)

    if "id" in workflowobj:
        # The top-level document itself is also a dependency.
        sc.append({"class": "File", "location": workflowobj["id"]})

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name)

    def setloc(p):
        # Rewrite local references in place to their Keep equivalents;
        # leave literals ("_:") and existing keep: references alone.
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    return mapper
def upload_docker(arvrunner, tool):
    """Ensure Docker images referenced by the tool are available in Arvados.

    Recurses into workflow steps; raises UnsupportedRequirement for
    DockerRequirement options the jobs API cannot honor.
    """
    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory"):
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        # Recurse into each step's embedded tool.
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)
def upload_instance(arvrunner, name, tool, job_order):
    """Upload the tool's Docker images, dependencies, and job order inputs.

    Fills in default secondaryFiles on job order inputs, uploads the
    workflow document and the job order document to Keep, and returns
    the pathmapper for the workflow document.
    """
    upload_docker(arvrunner, tool)

    for t in tool.tool["inputs"]:
        def setSecondary(fileobj):
            if isinstance(fileobj, dict) and fileobj.get("class") == "File":
                if "secondaryFiles" not in fileobj:
                    # Expand the secondaryFiles patterns declared on the
                    # input parameter into concrete File references.
                    fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]

            if isinstance(fileobj, list):
                for entry in fileobj:
                    setSecondary(entry)

        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(job_order[shortname(t["id"])])

    workflowmapper = upload_dependencies(arvrunner,
                                         name,
                                         tool.doc_loader,
                                         tool.tool,
                                         tool.tool["id"],
                                         True)
    jobmapper = upload_dependencies(arvrunner,
                                    os.path.basename(job_order.get("id", "#")),
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        # The uploaded job order no longer corresponds to the local file.
        del job_order["id"]

    return workflowmapper
def arvados_jobs_image(arvrunner):
    """Determine if the arvados/jobs image is available, and create it if missing.

    Returns the image name:tag that was checked/created.  Raises if the
    image cannot be made available.
    """
    img = "arvados/jobs:"+__version__
    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
    return img
class Runner(object):
    """Base class for submitting a cwl-runner instance to run a workflow.

    Tracks the submitted process, uploads its dependencies, and translates
    the completion record into a CWL output object for the output callback.
    """

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name

        if submit_runner_ram:
            self.submit_runner_ram = submit_runner_ram
        else:
            # Default RAM request (MiB) for the runner process.
            self.submit_runner_ram = 1024

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

    def update_pipeline_component(self, record):
        # No-op in the base class; subclasses may update pipeline state.
        pass

    def arvados_job_spec(self, *args, **kwargs):
        """Upload dependencies and return the pathmapper for the workflow.

        Subclasses extend this to build the actual job/container request.
        """
        if self.name is None:
            self.name = self.tool.tool.get("label") or os.path.basename(self.tool.tool["id"])

        # Need to filter this out, gets added by cwltool when providing
        # parameters on the command line.
        if "job_order" in self.job_order:
            del self.job_order["job_order"]

        workflowmapper = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
        adjustDirObjs(self.job_order, trim_listing)
        return workflowmapper

    def done(self, record):
        """Invoked when the submitted runner process finishes.

        Maps the process exit status to a CWL processStatus, fetches
        cwl.output.json from the output collection, rewrites relative
        output locations to keep: references, and fires the output
        callback.  Always removes the process from the in-flight table.
        """
        if record["state"] == "Complete":
            if record.get("exit_code") is not None:
                if record["exit_code"] == 33:
                    # Exit code 33 is the cwl-runner convention for
                    # UnsupportedRequirement.
                    processStatus = "UnsupportedRequirement"
                elif record["exit_code"] == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"
            else:
                processStatus = "success"
        else:
            processStatus = "permanentFail"

        outputs = None
        try:
            try:
                self.final_output = record["output"]
                outc = arvados.collection.CollectionReader(self.final_output,
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                if "cwl.output.json" in outc:
                    with outc.open("cwl.output.json") as f:
                        outputs = json.load(f)
                def keepify(fileobj):
                    # Relative paths in the output object are relative to
                    # the output collection; make them absolute keep: refs.
                    path = fileobj["location"]
                    if not path.startswith("keep:"):
                        fileobj["location"] = "keep:%s/%s" % (record["output"], path)
                adjustFileObjs(outputs, keepify)
                adjustDirObjs(outputs, keepify)
            except Exception as e:
                # Best effort: report the failure but still deliver the
                # (possibly empty) outputs with the process status.
                logger.exception("While getting final output object: %s", e)
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            if record["uuid"] in self.arvrunner.processes:
                del self.arvrunner.processes[record["uuid"]]