# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import os
import urlparse
from functools import partial
import logging
import json
import subprocess32 as subprocess
from collections import namedtuple

from StringIO import StringIO

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.
    """
    if obj.get("location", "").startswith("_:"):
        del obj["location"]
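
# Illustration (hypothetical literal, not from this module): a File literal such as
#     {"class": "File", "location": "_:b0d6...", "basename": "blub.txt", "contents": "blub"}
# is written back out as
#     {"class": "File", "basename": "blub.txt", "contents": "blub"}
# while File/Directory objects with a real 'location' are left untouched.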

def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]

def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in d.itervalues():
                find_defaults(i, op)

def setSecondary(t, fileobj, discovered):
    if isinstance(fileobj, dict) and fileobj.get("class") == "File":
        if "secondaryFiles" not in fileobj:
            fileobj["secondaryFiles"] = cmap([{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]])
            if discovered is not None:
                discovered[fileobj["location"]] = fileobj["secondaryFiles"]
    elif isinstance(fileobj, list):
        for e in fileobj:
            setSecondary(t, e, discovered)

def discover_secondary_files(inputs, job_order, discovered=None):
    for t in inputs:
        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(t, job_order[shortname(t["id"])], discovered)

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "$schemas", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin)

    sc = []
    def only_real(obj):
        # Only interested in local files that need to be uploaded,
        # don't include file literals, keep references, etc.
        if obj.get("location", "").startswith("file:"):
            sc.append(obj)

    visit_class(sc_result, ("File", "Directory"), only_real)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        remember = [True]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remember[0] = False
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if not remember[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        discover_secondary_files(obj["inputs"],
                                 {shortname(t["id"]): t["default"] for t in obj["inputs"] if "default" in t},
                                 discovered)

    visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered.keys()):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)
188 if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
189 p["location"] = mapper.mapper(p["location"]).resolved
191 visit_class(workflowobj, ("File", "Directory"), setloc)
192 visit_class(discovered, ("File", "Directory"), setloc)
194 if discovered_secondaryfiles is not None:
196 discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
198 if "$schemas" in workflowobj:
200 for s in workflowobj["$schemas"]:
201 sch.append(mapper.mapper(s).resolved)
202 workflowobj["$schemas"] = sch

def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)

    rewrite_to_orig = {v: k for k,v in rewrites.items()}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                if "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)

    visit(packed, None)

    def tag_git_version(packed):
        if tool.tool["id"].startswith("file://"):
            path = os.path.dirname(tool.tool["id"][7:])
            try:
                githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
            except (OSError, subprocess.CalledProcessError):
                pass
            else:
                packed["http://schema.org/version"] = githash

    tag_git_version(packed)

    return packed

def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """
    discover_secondary_files(tool.tool["inputs"], job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order
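
# Illustrative example (hypothetical paths): given
#     job_order = {"inp": {"class": "File", "location": "file:///tmp/blub.txt"}}
# a call like upload_job_order(arvrunner, "example input object", tool, job_order)
# uploads /tmp/blub.txt to Keep and returns the job order with 'location'
# rewritten to a "keep:<portable data hash>/blub.txt" reference.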

FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map
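
# Sketch of how these helpers are expected to fit together (illustrative only):
# the merged_map returned here feeds packed_workflow(), which substitutes the
# rewritten keep: locations into the single packed document, e.g.
#
#     merged_map = upload_workflow_deps(arvrunner, tool)
#     packed = packed_workflow(arvrunner, tool, merged_map)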

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))

    return img

def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()
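
# Illustrative use (hypothetical variables): a packed workflow document can be
# stored as a single-file collection and referenced by its portable data hash:
#
#     pdh = upload_workflow_collection(arvrunner, "my-workflow", packed)
#     # "keep:%s/workflow.cwl" % pdh is then usable as a workflow reference.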

class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order

        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB

        runner_resource_req, _ = self.tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)

            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)