1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
7 from functools import partial
10 import subprocess32 as subprocess
11 from collections import namedtuple
13 from StringIO import StringIO
15 from schema_salad.sourceline import SourceLine, cmap
17 from cwltool.command_line_tool import CommandLineTool
18 import cwltool.workflow
19 from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
20 from cwltool.load_tool import fetch_document
21 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
22 from cwltool.utils import aslist
23 from cwltool.builder import substitute
24 from cwltool.pack import pack
26 import arvados.collection
27 import ruamel.yaml as yaml
29 from .arvdocker import arv_docker_get_image
30 from .pathmapper import ArvPathMapper, trim_listing
31 from ._version import __version__
34 logger = logging.getLogger('arvados.cwl-runner')
36 def trim_anonymous_location(obj):
37 """Remove 'location' field from File and Directory literals.
39 To make internal handling easier, literals are assigned a random id for
40 'location'. However, when writing the record back out, this can break
41 reproducibility. Since it is valid for literals not to have a 'location'
46 if obj.get("location", "").startswith("_:"):
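# Illustrative example (assumed values, not from the source): a literal like
# {"class": "File", "location": "_:d7a5e0...", "basename": "hello.txt", "contents": "hello"}
# has its anonymous "_:" location dropped here so the record round-trips reproducibly.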
50 def remove_redundant_fields(obj):
51 for field in ("path", "nameext", "nameroot", "dirname"):
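# These fields are derived or runtime-specific ("nameroot"/"nameext" come from
# "basename", "dirname" from "path"), so they are redundant when the record is
# written back out and can be recomputed later if needed.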
56 def find_defaults(d, op):
57 if isinstance(d, list):
60 elif isinstance(d, dict):
64 for i in d.itervalues():
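# find_defaults() walks the document recursively and calls op() on every mapping
# that carries a "default" key; see visit_default() below for the op used while
# uploading dependencies.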
67 def setSecondary(t, fileobj, discovered):
68 if isinstance(fileobj, dict) and fileobj.get("class") == "File":
69 if "secondaryFiles" not in fileobj:
70 fileobj["secondaryFiles"] = cmap([{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]])
71 if discovered is not None:
72 discovered[fileobj["location"]] = fileobj["secondaryFiles"]
73 elif isinstance(fileobj, list):
75 setSecondary(t, e, discovered)
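# Illustrative sketch (assumed values): a secondaryFiles pattern of ".bai" applied to
# a primary file at "keep:abc+123/reads.bam" adds
#   {"location": "keep:abc+123/reads.bam.bai", "class": "File"}
# while a leading "^" in the pattern replaces the primary file's extension instead;
# cwltool's substitute() implements this pattern expansion.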
77 def discover_secondary_files(inputs, job_order, discovered=None):
79 if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
80 setSecondary(t, job_order[shortname(t["id"])], discovered)
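# When a 'discovered' dict is supplied it accumulates primary-file location ->
# secondaryFiles entries, which upload_dependencies later adds to the set of
# files to upload.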
83 def upload_dependencies(arvrunner, name, document_loader,
84 workflowobj, uri, loadref_run,
85 include_primary=True, discovered_secondaryfiles=None):
86 """Upload the dependencies of the workflowobj document to Keep.
88 Returns a pathmapper object mapping local paths to keep references. Also
89 does an in-place update of references in "workflowobj".
91 Use scandeps to find $import, $include, $schemas, run, File and Directory
92 fields that represent external references.
94 If workflowobj has an "id" field, this will reload the document to ensure
95 it is scanning the raw document prior to preprocessing.
100 joined = document_loader.fetcher.urljoin(b, u)
101 defrg, _ = urlparse.urldefrag(joined)
102 if defrg not in loaded:
104 # Use fetch_text to get raw file (before preprocessing).
105 text = document_loader.fetch_text(defrg)
106 if isinstance(text, bytes):
107 textIO = StringIO(text.decode('utf-8'))
109 textIO = StringIO(text)
110 return yaml.safe_load(textIO)
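# loadref() fetches each referenced document at most once (the 'loaded' set guards
# against refetching) and parses the raw text, so $include/$mixin references are
# still visible to scandeps below.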
115 loadref_fields = set(("$import", "run"))
117 loadref_fields = set(("$import",))
119 scanobj = workflowobj
120 if "id" in workflowobj:
121 # Need raw file content (before preprocessing) to ensure
122 # that external references in $include and $mixin are captured.
123 scanobj = loadref("", workflowobj["id"])
125 sc_result = scandeps(uri, scanobj,
127 set(("$include", "$schemas", "location")),
128 loadref, urljoin=document_loader.fetcher.urljoin)
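# sc_result holds every File/Directory reference scandeps found; only_real() below
# keeps only entries with a local "file:" location, since "keep:" references and
# anonymous "_:" literals have nothing on disk to upload.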
132 if obj.get("location", "").startswith("file:"):
135 visit_class(sc_result, ("File", "Directory"), only_real)
137 normalizeFilesDirs(sc)
139 if include_primary and "id" in workflowobj:
140 sc.append({"class": "File", "location": workflowobj["id"]})
142 if "$schemas" in workflowobj:
143 for s in workflowobj["$schemas"]:
144 sc.append({"class": "File", "location": s})
146 def visit_default(obj):
148 def ensure_default_location(f):
149 if "location" not in f and "path" in f:
150 f["location"] = f["path"]
152 if "location" in f and not arvrunner.fs_access.exists(f["location"]):
153 # Doesn't exist, remove from list of dependencies to upload
154 sc[:] = [x for x in sc if x["location"] != f["location"]]
155 # Delete "default" from workflowobj
157 visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
161 find_defaults(workflowobj, visit_default)
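# Net effect of visit_default(): File/Directory defaults that point at paths which
# no longer exist are dropped, both from the upload list and from the "default"
# field itself, instead of failing the whole upload.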
164 def discover_default_secondary_files(obj):
165 discover_secondary_files(obj["inputs"],
166 {shortname(t["id"]): t["default"] for t in obj["inputs"] if "default" in t},
169 visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
172 sc.extend(discovered[d])
174 mapper = ArvPathMapper(arvrunner, sc, "",
178 single_collection=True)
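# ArvPathMapper uploads the gathered files to Keep (packed into a single new
# collection because single_collection=True) and records, for each original
# location, the "keep:..." reference that setloc() writes back below.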
181 if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
182 p["location"] = mapper.mapper(p["location"]).resolved
184 visit_class(workflowobj, ("File", "Directory"), setloc)
185 visit_class(discovered, ("File", "Directory"), setloc)
187 if discovered_secondaryfiles is not None:
189 discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
191 if "$schemas" in workflowobj:
193 for s in workflowobj["$schemas"]:
194 sch.append(mapper.mapper(s).resolved)
195 workflowobj["$schemas"] = sch
200 def upload_docker(arvrunner, tool):
201 """Uploads Docker images used in CommandLineTool objects."""
203 if isinstance(tool, CommandLineTool):
204 (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
206 if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
207 # TODO: can be supported by containers API, but not jobs API.
208 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
209 "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
210 arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
212 arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
213 elif isinstance(tool, cwltool.workflow.Workflow):
215 upload_docker(arvrunner, s.embedded_tool)
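# For a Workflow, recurse into each step's embedded tool so every CommandLineTool
# in the graph has its Docker image uploaded/registered.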
218 def packed_workflow(arvrunner, tool, merged_map):
219 """Create a packed workflow.
221 A "packed" workflow is one where all the components have been combined into a single document."""
224 packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
225 tool.tool["id"], tool.metadata, rewrite_out=rewrites)
227 rewrite_to_orig = {v: k for k,v in rewrites.items()}
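# pack() may rewrite the ids of embedded tools; rewrite_to_orig inverts that map so
# each packed process object can be matched back to its original id, which is how
# merged_map (built by upload_workflow_deps) is keyed.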
229 def visit(v, cur_id):
230 if isinstance(v, dict):
231 if v.get("class") in ("CommandLineTool", "Workflow"):
233 raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
234 cur_id = rewrite_to_orig.get(v["id"], v["id"])
235 if "location" in v and not v["location"].startswith("keep:"):
236 v["location"] = merged_map[cur_id].resolved[v["location"]]
237 if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
238 v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
241 if isinstance(v, list):
248 def tag_git_version(packed):
249 if tool.tool["id"].startswith("file://"):
250 path = os.path.dirname(tool.tool["id"][7:])
252 githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
253 except (OSError, subprocess.CalledProcessError):
256 packed["http://schema.org/version"] = githash
259 def upload_job_order(arvrunner, name, tool, job_order):
260 """Upload local files referenced in the input object and return updated input
261 object with 'location' updated to the proper keep references.
264 discover_secondary_files(tool.tool["inputs"], job_order)
266 jobmapper = upload_dependencies(arvrunner,
270 job_order.get("id", "#"),
273 if "id" in job_order:
276 # Need to filter this out; cwltool adds it when parameters are
277 # provided on the command line.
278 if "job_order" in job_order:
279 del job_order["job_order"]
283 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
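# Illustrative contents (assumed paths, for illustration only) of one tool's entry:
#   resolved       = {"file:///home/me/script.py": "keep:99a.../script.py"}
#   secondaryFiles = {"keep:99a.../reads.bam": [{"class": "File",
#                                                "location": "keep:99a.../reads.bam.bai"}]}
# i.e. a map of original -> uploaded locations plus any discovered secondary files.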
285 def upload_workflow_deps(arvrunner, tool):
286 # Ensure that Docker images needed by this workflow are available
288 upload_docker(arvrunner, tool)
290 document_loader = tool.doc_loader
294 def upload_tool_deps(deptool):
296 discovered_secondaryfiles = {}
297 pm = upload_dependencies(arvrunner,
298 "%s dependencies" % (shortname(deptool["id"])),
303 include_primary=False,
304 discovered_secondaryfiles=discovered_secondaryfiles)
305 document_loader.idx[deptool["id"]] = deptool
307 for k,v in pm.items():
308 toolmap[k] = v.resolved
309 merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
311 tool.visit(upload_tool_deps)
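# tool.visit() applies upload_tool_deps to the top-level document and to every
# embedded tool, so merged_map ends up with a FileUpdates entry per document id.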
315 def arvados_jobs_image(arvrunner, img):
316 """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""
319 arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
320 except Exception as e:
321 raise Exception("Docker image %s is not available\n%s" % (img, e) )
324 def upload_workflow_collection(arvrunner, name, packed):
325 collection = arvados.collection.Collection(api_client=arvrunner.api,
326 keep_client=arvrunner.keep_client,
327 num_retries=arvrunner.num_retries)
328 with collection.open("workflow.cwl", "w") as f:
329 f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
331 filters = [["portable_data_hash", "=", collection.portable_data_hash()],
332 ["name", "like", name+"%"]]
333 if arvrunner.project_uuid:
334 filters.append(["owner_uuid", "=", arvrunner.project_uuid])
335 exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
338 logger.info("Using collection %s", exists["items"][0]["uuid"])
340 collection.save_new(name=name,
341 owner_uuid=arvrunner.project_uuid,
342 ensure_unique_name=True,
343 num_retries=arvrunner.num_retries)
344 logger.info("Uploaded to %s", collection.manifest_locator())
346 return collection.portable_data_hash()
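# A collection with the same portable data hash and a matching name in the target
# project is reused rather than re-uploaded; otherwise a new collection is saved
# with ensure_unique_name=True.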
349 class Runner(object):
350 """Base class for runner processes, which submit an instance of
351 arvados-cwl-runner and wait for the final result."""
353 def __init__(self, runner, tool, job_order, enable_reuse,
354 output_name, output_tags, submit_runner_ram=0,
355 name=None, on_error=None, submit_runner_image=None,
356 intermediate_output_ttl=0, merged_map=None,
357 priority=None, secret_store=None):
358 self.arvrunner = runner
360 self.job_order = job_order
363 # If reuse is permitted by command line arguments but
364 # disabled by the workflow itself, disable it.
365 reuse_req, _ = self.tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
367 enable_reuse = reuse_req["enableReuse"]
368 self.enable_reuse = enable_reuse
370 self.final_output = None
371 self.output_name = output_name
372 self.output_tags = output_tags
374 self.on_error = on_error
375 self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
376 self.intermediate_output_ttl = intermediate_output_ttl
377 self.priority = priority
378 self.secret_store = secret_store
380 self.submit_runner_cores = 1
381 self.submit_runner_ram = 1024 # default 1 GiB
383 runner_resource_req, _ = self.tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
384 if runner_resource_req:
385 if runner_resource_req.get("coresMin"):
386 self.submit_runner_cores = runner_resource_req["coresMin"]
387 if runner_resource_req.get("ramMin"):
388 self.submit_runner_ram = runner_resource_req["ramMin"]
390 if submit_runner_ram:
391 # Command line / initializer overrides default and/or spec from workflow
392 self.submit_runner_ram = submit_runner_ram
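# Effective precedence for the runner's resources: explicit command line /
# constructor argument, then the workflow's WorkflowRunnerResources hint,
# then the defaults above (1 core, 1024 MiB).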
394 if self.submit_runner_ram <= 0:
395 raise Exception("Value of submit-runner-ram must be greater than zero")
397 if self.submit_runner_cores <= 0:
398 raise Exception("Value of submit-runner-cores must be greater than zero")
400 self.merged_map = merged_map or {}
402 def update_pipeline_component(self, record):
405 def done(self, record):
406 """Base method for handling a completed runner."""
409 if record["state"] == "Complete":
410 if record.get("exit_code") is not None:
411 if record["exit_code"] == 33:
412 processStatus = "UnsupportedRequirement"
413 elif record["exit_code"] == 0:
414 processStatus = "success"
416 processStatus = "permanentFail"
418 processStatus = "success"
420 processStatus = "permanentFail"
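# Exit code 33 is the arvados-cwl-runner convention for "workflow used an
# unsupported requirement"; any other nonzero exit code is a permanent failure.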
424 if processStatus == "permanentFail":
425 logc = arvados.collection.CollectionReader(record["log"],
426 api_client=self.arvrunner.api,
427 keep_client=self.arvrunner.keep_client,
428 num_retries=self.arvrunner.num_retries)
429 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
431 self.final_output = record["output"]
432 outc = arvados.collection.CollectionReader(self.final_output,
433 api_client=self.arvrunner.api,
434 keep_client=self.arvrunner.keep_client,
435 num_retries=self.arvrunner.num_retries)
436 if "cwl.output.json" in outc:
437 with outc.open("cwl.output.json") as f:
439 outputs = json.load(f)
440 def keepify(fileobj):
441 path = fileobj["location"]
442 if not path.startswith("keep:"):
443 fileobj["location"] = "keep:%s/%s" % (record["output"], path)
444 adjustFileObjs(outputs, keepify)
445 adjustDirObjs(outputs, keepify)
446 except Exception as e:
447 logger.exception("[%s] While getting final output object: %s", self.name, e)
448 self.arvrunner.output_callback({}, "permanentFail")
450 self.arvrunner.output_callback(outputs, processStatus)