# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
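
"""Shared machinery for arvados-cwl-runner.

Helpers that upload workflow dependencies, Docker images, and job input
objects to Keep, plus the Runner base class used by processes that submit
an arvados-cwl-runner instance and wait for the final result."""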

import os
import urlparse
from functools import partial
import logging
import json
import subprocess
from collections import namedtuple

from StringIO import StringIO

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done

logger = logging.getLogger('arvados.cwl-runner')


def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.
    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]


def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]
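

# Recursively walk a parsed CWL document and call "op" on every dict that
# carries a "default" field.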
def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in d.itervalues():
                find_defaults(i, op)
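

# For each workflow input with a secondaryFiles pattern, fill in matching
# File objects on the corresponding input value.  Illustrative example,
# following the CWL secondaryFiles substitution rules: a pattern of ".bai"
# on an input "x.bam" adds "x.bam.bai", while "^.bai" strips one extension
# and adds "x.bai".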
def discover_secondary_files(inputs, job_order, discovered=None):
    for t in inputs:
        def setSecondary(fileobj):
            if isinstance(fileobj, dict) and fileobj.get("class") == "File":
                if "secondaryFiles" not in fileobj:
                    fileobj["secondaryFiles"] = cmap([{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]])
                    if discovered is not None:
                        discovered[fileobj["location"]] = fileobj["secondaryFiles"]

            if isinstance(fileobj, list):
                for e in fileobj:
                    setSecondary(e)

        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(job_order[shortname(t["id"])])
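

# Rough usage sketch (names are illustrative): given a loaded tool object,
#
#     mapper = upload_dependencies(arvrunner, "my workflow dependencies",
#                                  tool.doc_loader, tool.tool,
#                                  tool.tool["id"], False)
#
# uploads everything the document references to Keep and rewrites those
# references in place to "keep:" URIs.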
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        discover_secondary_files(obj["inputs"],
                                 {shortname(t["id"]): t["default"] for t in obj["inputs"] if "default" in t},
                                 discovered)

    visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in discovered:
        sc.extend(discovered[d])

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)
176 if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
177 p["location"] = mapper.mapper(p["location"]).resolved
179 visit_class(workflowobj, ("File", "Directory"), setloc)
180 visit_class(discovered, ("File", "Directory"), setloc)
182 if discovered_secondaryfiles is not None:
184 discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
186 if "$schemas" in workflowobj:
188 for s in workflowobj["$schemas"]:
189 sch.append(mapper.mapper(s).resolved)
190 workflowobj["$schemas"] = sch


def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            # No DockerRequirement: fall back to the default arvados/jobs image.
            arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)
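

# merged_map maps tool document ids to FileUpdates tuples (built by
# upload_workflow_deps below): "resolved" maps original file locations to
# uploaded "keep:" locations, and "secondaryFiles" records the secondary
# files discovered for them.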
def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)

    rewrite_to_orig = {}
    for k,v in rewrites.items():
        rewrite_to_orig[v] = k

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)

    visit(packed, None)
    return packed


def tag_git_version(packed, tool):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash


def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    discover_secondary_files(tool.tool["inputs"], job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order
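

# Records how a tool document's file references were rewritten: "resolved"
# maps original locations to "keep:" locations, "secondaryFiles" maps
# resolved locations to their discovered secondary files.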
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])


def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map


def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
    return img
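

# Collections in Keep are content-addressed: if a collection with the same
# portable_data_hash (and a matching name) already exists in the target
# project, reuse it rather than saving a duplicate.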
def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()


class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None, priority=None,
                 secret_store=None):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = get_feature(self.tool, "http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store

        if submit_runner_ram:
            self.submit_runner_ram = submit_runner_ram
        else:
            self.submit_runner_ram = 3000

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

        self.merged_map = merged_map or {}

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    # arvados-cwl-runner exits 33 to signal an
                    # UnsupportedRequirement error.
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    if f.size() > 0:
                        outputs = json.load(f)

            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)