# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import os
import urlparse
from functools import partial
import logging
import json
import subprocess32 as subprocess
from collections import namedtuple

from StringIO import StringIO

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, filter it out.
    """
    if obj.get("location", "").startswith("_:"):
        del obj["location"]


def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]

def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in d.itervalues():
                find_defaults(i, op)

def setSecondary(t, fileobj, discovered):
    if isinstance(fileobj, dict) and fileobj.get("class") == "File":
        if "secondaryFiles" not in fileobj:
            fileobj["secondaryFiles"] = cmap([{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]])
            if discovered is not None:
                discovered[fileobj["location"]] = fileobj["secondaryFiles"]
    elif isinstance(fileobj, list):
        for e in fileobj:
            setSecondary(t, e, discovered)


def discover_secondary_files(inputs, job_order, discovered=None):
    for t in inputs:
        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(t, job_order[shortname(t["id"])], discovered)
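
# Illustrative example (values are hypothetical): for a tool input declaring
# secondaryFiles: [".bai"], a job order entry
#   {"class": "File", "location": "keep:xyz/sample.bam"}
# is expanded by setSecondary() to
#   {"class": "File", "location": "keep:xyz/sample.bam",
#    "secondaryFiles": [{"class": "File", "location": "keep:xyz/sample.bam.bai"}]}
# because cwltool's substitute() appends a plain suffix pattern to the primary
# file's location (patterns starting with "^" strip an extension first).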

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "$schemas", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin)

    sc = []
    def only_real(obj):
        # Only interested in local files that need to be uploaded,
        # don't include file literals, keep references, etc.
        sp = obj.get("location", "").split(":")
        if len(sp) > 1 and sp[0] in ("file", "http", "https"):
            sc.append(obj)

    visit_class(sc_result, ("File", "Directory"), only_real)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        discover_secondary_files(obj["inputs"],
                                 {shortname(t["id"]): t["default"] for t in obj["inputs"] if "default" in t},
                                 discovered)

    visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered.keys()):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)
189 if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
190 p["location"] = mapper.mapper(p["location"]).resolved
192 visit_class(workflowobj, ("File", "Directory"), setloc)
193 visit_class(discovered, ("File", "Directory"), setloc)
195 if discovered_secondaryfiles is not None:
197 discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
199 if "$schemas" in workflowobj:
201 for s in workflowobj["$schemas"]:
202 sch.append(mapper.mapper(s).resolved)
203 workflowobj["$schemas"] = sch
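
# Sketch of a typical call (argument values are illustrative, following the
# signature above and its use in upload_workflow_deps()/upload_job_order()):
#   mapper = upload_dependencies(arvrunner, "workflow dependencies",
#                                tool.doc_loader, tool.tool, tool.tool["id"], False)
#   keep_uri = mapper.mapper(local_uri).resolved   # e.g. "keep:<pdh>/input.txt"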

def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            # No DockerRequirement, so make sure the default arvados/jobs image is available.
            arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)

    rewrite_to_orig = {v: k for k,v in rewrites.items()}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                if "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            for d in v:
                visit(v[d], cur_id)
        if isinstance(v, list):
            for d in v:
                visit(d, cur_id)
    visit(packed, None)

    def tag_git_version(packed):
        if tool.tool["id"].startswith("file://"):
            path = os.path.dirname(tool.tool["id"][7:])
            try:
                githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
            except (OSError, subprocess.CalledProcessError):
                pass
            else:
                packed["http://schema.org/version"] = githash
    tag_git_version(packed)

    return packed

def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return the updated
    input object with 'location' pointing to the proper keep references.
    """

    discover_secondary_files(tool.tool["inputs"], job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order
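
# FileUpdates records, per tool document, how its file references were
# rewritten: 'resolved' maps each original location to its "keep:..." URI,
# and 'secondaryFiles' maps a primary file's resolved location to the
# secondaryFiles that were discovered for it.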
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))

    return img

def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()

class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order

        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse

        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB

        runner_resource_req, _ = self.tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""
        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        # Exit code 33 is used by arvados-cwl-runner to signal
                        # that the workflow hit an UnsupportedRequirement.
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)

            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)