# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
import os
import urlparse
import logging
import json
import subprocess

from functools import partial
from collections import namedtuple
from StringIO import StringIO

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done  # provides logtail(), used in Runner.done below

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]
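
# Illustrative example (not part of the original module): a File literal such as
#   {"class": "File", "location": "_:d7f9a2...", "basename": "blub.txt", "contents": "blub"}
# has its synthetic "_:" location removed by trim_anonymous_location(), while a
# real location such as "keep:99999999999999999999999999999998+99/blub.txt" is kept.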

def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]

def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in d.itervalues():
                find_defaults(i, op)

def setSecondary(t, fileobj, discovered):
    if isinstance(fileobj, dict) and fileobj.get("class") == "File":
        if "secondaryFiles" not in fileobj:
            fileobj["secondaryFiles"] = cmap([{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]])
            if discovered is not None:
                discovered[fileobj["location"]] = fileobj["secondaryFiles"]
    elif isinstance(fileobj, list):
        for e in fileobj:
            setSecondary(t, e, discovered)

def discover_secondary_files(inputs, job_order, discovered=None):
    for t in inputs:
        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(t, job_order[shortname(t["id"])], discovered)
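
# Illustrative example (assumption, for clarity only): for an input parameter
# declaring "secondaryFiles": [".idx"] and a job order entry
#   {"class": "File", "location": "keep:.../reads.bam"}
# discover_secondary_files() fills in
#   "secondaryFiles": [{"class": "File", "location": "keep:.../reads.bam.idx"}]
# using cwltool's substitute() suffix rules, and records the mapping in
# 'discovered' when a dict is supplied for it.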

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        discover_secondary_files(obj["inputs"],
                                 {shortname(t["id"]): t["default"] for t in obj["inputs"] if "default" in t},
                                 discovered)

    visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in discovered:
        sc.extend(discovered[d])

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)
174 if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
175 p["location"] = mapper.mapper(p["location"]).resolved
177 visit_class(workflowobj, ("File", "Directory"), setloc)
178 visit_class(discovered, ("File", "Directory"), setloc)
180 if discovered_secondaryfiles is not None:
182 discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
184 if "$schemas" in workflowobj:
186 for s in workflowobj["$schemas"]:
187 sch.append(mapper.mapper(s).resolved)
188 workflowobj["$schemas"] = sch
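
# Typical call pattern (hedged sketch, not from the original file):
#   mapper = upload_dependencies(arvrunner, "workflow dependencies",
#                                tool.doc_loader, tool.tool, tool.tool["id"], False)
#   mapper.mapper("file:///home/user/input.txt").resolved  # e.g. "keep:<pdh>/input.txt"
# On return, File/Directory locations inside the document have already been
# rewritten in place to their keep: equivalents.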

def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)

    rewrite_to_orig = {v: k for k,v in rewrites.items()}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                if "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)
    return packed

def tag_git_version(tool, packed):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash

def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    discover_secondary_files(tool.tool["inputs"], job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order
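
# Illustrative before/after (assumption, not part of the original module):
#   {"reads": {"class": "File", "location": "file:///tmp/reads.bam"}}
# comes back from upload_job_order() as
#   {"reads": {"class": "File", "location": "keep:<portable data hash>/reads.bam"}}
# with the bookkeeping "id" and "job_order" keys stripped out.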

FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
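
# Shape of the merged_map entries built below (illustrative, hedged):
#   merged_map["file:///path/tool.cwl"] = FileUpdates(
#       resolved={"file:///path/script.py": "keep:<pdh>/script.py"},
#       secondaryFiles={"keep:<pdh>/reads.bam": [{"class": "File", ...}]})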

def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
    return img

def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
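
    # Reuse note (explanatory comment, not in the original source): the lookup
    # below filters on portable_data_hash plus a "name like ..." prefix, so an
    # identical packed workflow already present in the target project is reused
    # instead of saving a duplicate collection.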

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()

class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None, priority=None,
                 secret_store=None):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order

        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = get_feature(self.tool, "http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse

        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store

        if submit_runner_ram:
            self.submit_runner_ram = submit_runner_ram
        else:
            self.submit_runner_ram = 3000

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

        self.merged_map = merged_map or {}

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
417 if "cwl.output.json" in outc:
418 with outc.open("cwl.output.json") as f:
420 outputs = json.load(f)
421 def keepify(fileobj):
422 path = fileobj["location"]
423 if not path.startswith("keep:"):
424 fileobj["location"] = "keep:%s/%s" % (record["output"], path)
425 adjustFileObjs(outputs, keepify)
426 adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)