import os
import re
import urlparse
import logging
import json
import subprocess

from functools import partial
from StringIO import StringIO

from schema_salad.sourceline import SourceLine

import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper
from . import done
from ._version import __version__

logger = logging.getLogger('arvados.cwl-runner')
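
# Accept any input file name: cwltool's default ACCEPTLIST_RE is more
# restrictive than what Keep allows, so relax it to match everything.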
cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")

def trim_listing(obj):
    """Remove 'listing' field from Directory objects that are keep references.

    When Directory objects represent Keep references, it is redundant and
    potentially very expensive to pass fully enumerated Directory objects
    between instances of cwl-runner (e.g. when submitting a job, or using the
    RunInSingleContainer feature), so delete the 'listing' field when it is
    safe to do so.
    """

    if obj.get("location", "").startswith("keep:") and "listing" in obj:
        del obj["listing"]
    if obj.get("location", "").startswith("_:"):
        del obj["location"]
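
# Illustrative usage: trim_listing is meant to be applied across a whole CWL
# object graph with cwltool's visitor helpers, e.g.
#
#     adjustDirObjs(job_order, trim_listing)
#
# which strips the enumerated 'listing' from every keep: Directory in job_order.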

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run, include_primary=True):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name)

    def setloc(p):
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper
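
# Illustrative result (values invented for the example): after upload_dependencies
# runs, a local reference such as "file:///home/user/blast.cwl" in workflowobj is
# rewritten to a Keep reference like "keep:<portable_data_hash>/blast.cwl", and the
# returned pathmapper can resolve the original path via mapper.mapper(path).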

def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory"):
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)
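
# For Workflow objects this recurses into each step's embedded tool, so nested
# workflows also get the Docker images of their CommandLineTools uploaded.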

def packed_workflow(arvrunner, tool):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    return pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                tool.tool["id"], tool.metadata)
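
# For example, a workflow with two steps packs into a single document whose
# "$graph" list holds the workflow plus every tool it runs; this is the form
# that upload_workflow_collection below writes out as "workflow.cwl".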

def tag_git_version(packed, tool):
    # 'tool' is the loaded cwltool object; its id locates the workflow source file.
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash
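
# Illustrative effect (hash value invented): if the workflow file lives in a git
# checkout, the packed document gains an entry like
#     packed["http://schema.org/version"] = "1f7e3a9c..."
# recording the commit the workflow was taken from.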

def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    for t in tool.tool["inputs"]:
        def setSecondary(fileobj):
            if isinstance(fileobj, dict) and fileobj.get("class") == "File":
                if "secondaryFiles" not in fileobj:
                    fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]

            if isinstance(fileobj, list):
                for entry in fileobj:
                    setSecondary(entry)

        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(job_order[shortname(t["id"])])

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order
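
# Illustrative transformation (paths and hash invented): an input entry such as
#     {"reads": {"class": "File", "location": "file:///data/sample.fastq"}}
# is returned with its location rewritten to a Keep reference, e.g.
#     {"reads": {"class": "File", "location": "keep:<pdh>/sample.fastq"}}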

def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    def upload_tool_deps(deptool):
        if "id" in deptool:
            upload_dependencies(arvrunner,
                                "%s dependencies" % (shortname(deptool["id"])),
                                document_loader,
                                deptool,
                                deptool["id"],
                                False,
                                include_primary=False)
            document_loader.idx[deptool["id"]] = deptool

    tool.visit(upload_tool_deps)
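
# tool.visit applies upload_tool_deps to every process object in the workflow
# (the top-level document and each step's tool), so the file and schema
# references of every component are uploaded to Keep.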

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
    return img
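
# The image name is normally "arvados/jobs:" plus the arvados-cwl-runner version
# (see Runner.__init__ below), but callers may pass an alternate image.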

def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',', ': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()
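
# Because the lookup filters on both portable_data_hash and a name prefix, an
# identical packed workflow already saved in the target project is reused
# rather than duplicated as a new collection.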

class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.enable_reuse = enable_reuse
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__

        if submit_runner_ram:
            self.submit_runner_ram = submit_runner_ram
        else:
            self.submit_runner_ram = 1024

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
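                    # Exit code 33 is the convention arvados-cwl-runner uses to
                    # signal that the workflow hit an UnsupportedRequirement.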
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            if record["uuid"] in self.arvrunner.processes:
                del self.arvrunner.processes[record["uuid"]]