# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import os
import json
import logging
import urlparse
from functools import partial
import subprocess32 as subprocess
from collections import namedtuple

from StringIO import StringIO

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack

import arvados.collection
import ruamel.yaml as yaml

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.
    """
    if obj.get("location", "").startswith("_:"):
        del obj["location"]
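
# Illustrative example (file name and id made up): a literal such as
#     {"class": "File", "location": "_:9a2c", "basename": "msg.txt", "contents": "hello"}
# is written back out as {"class": "File", "basename": "msg.txt", "contents": "hello"}.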

def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]

def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in d.itervalues():
                find_defaults(i, op)

def setSecondary(t, fileobj, discovered):
    if isinstance(fileobj, dict) and fileobj.get("class") == "File":
        if "secondaryFiles" not in fileobj:
            fileobj["secondaryFiles"] = cmap([{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]])
            if discovered is not None:
                discovered[fileobj["location"]] = fileobj["secondaryFiles"]
    elif isinstance(fileobj, list):
        for e in fileobj:
            setSecondary(t, e, discovered)

def discover_secondary_files(inputs, job_order, discovered=None):
    for t in inputs:
        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(t, job_order[shortname(t["id"])], discovered)
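
# Illustrative example (assumed file names): for an input parameter declaring
# secondaryFiles: [".tbi"], a job order entry
#     {"class": "File", "location": "file:///data/calls.vcf.gz"}
# picks up secondaryFiles
#     [{"class": "File", "location": "file:///data/calls.vcf.gz.tbi"}].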

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """
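    # Examples of references picked up below (paths are illustrative):
    # a step's "run: bwa-mem.cwl", {"$include": "script.sh"}, $schemas
    # entries, and File/Directory objects whose location is a local
    # file:// or remote http(s):// URI.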
    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "$schemas", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin)

    sc = []
    def only_real(obj):
        # Only interested in local files that need to be uploaded,
        # don't include file literals, keep references, etc.
        sp = obj.get("location", "").split(":")
        if len(sp) > 1 and sp[0] in ("file", "http", "https"):
            sc.append(obj)

    visit_class(sc_result, ("File", "Directory"), only_real)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})
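    # At this point sc holds the local (file:/http:/https:) Files and
    # Directories referenced by the document, plus the primary workflow file
    # itself (when include_primary is set) and any $schemas entries.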

    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        discover_secondary_files(obj["inputs"],
                                 {shortname(t["id"]): t["default"] for t in obj["inputs"] if "default" in t},
                                 discovered)

    visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered.keys()):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)
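    # ArvPathMapper uploads the collected files to Keep; each local path now
    # resolves to a reference of the form "keep:<portable data hash>/<file name>"
    # (hash shown is illustrative), e.g. "keep:99999999999999999999999999999999+118/input.fastq".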

    def setloc(p):
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper

def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)
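
# Illustrative DockerRequirement handled above (image name made up):
#     {"class": "DockerRequirement", "dockerPull": "quay.io/example/bwa:0.7.17"}
# arv_docker_get_image pulls the image if it is not already stored in the
# target project, so the cluster can run containers from it later.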

def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)

    rewrite_to_orig = {v: k for k,v in rewrites.items()}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                if "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)
    return packed
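
# A packed document normally carries a "$graph" list with a "#main" entry for
# the top-level workflow plus one entry per embedded tool (structure as
# produced by cwltool's pack; details may vary by cwltool version).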

def tag_git_version(packed):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash

def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    discover_secondary_files(tool.tool["inputs"], job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order
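
# Illustrative job order rewrite (paths made up): an input like
#     {"reads": {"class": "File", "location": "file:///home/me/reads.fastq"}}
# comes back with its location updated to a Keep reference such as
#     {"reads": {"class": "File", "location": "keep:<portable data hash>/reads.fastq"}}.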

FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
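# As used in upload_workflow_deps below: "resolved" maps each original file
# location in a tool document to its Keep reference, and "secondaryFiles" maps
# resolved locations to the secondaryFiles discovered for them.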

def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
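
# Typical call (version string illustrative):
#     arvados_jobs_image(arvrunner, "arvados/jobs:" + __version__)
# matching the default runner image constructed in Runner.__init__ below.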

def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    # Reuse an existing collection with the same content and name prefix
    # instead of saving a duplicate.
    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()
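
# The returned portable data hash lets callers reference the stored document
# as, e.g., "keep:<returned hash>/workflow.cwl" (illustrative usage; the exact
# reference form is up to the caller).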

class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB

        runner_resource_req, _ = self.tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
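        # Illustrative CWL hint that would set these values, assuming the
        # conventional "arv" prefix for the http://arvados.org/cwl# namespace:
        #     hints:
        #       arv:WorkflowRunnerResources:
        #         ramMin: 2048
        #         coresMin: 2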

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        # Exit code 33 is the arvados-cwl-runner convention for
                        # "workflow contained an unsupported requirement".
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    if f.size() > 0:
                        outputs = json.load(f)
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)