# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems

import os
import sys
import re
import urllib.parse
import logging
import json

from functools import partial
from collections import namedtuple
from io import StringIO

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname, Process
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack
import schema_salad.validate as validate

import arvados.collection
from .util import collectionUUID
import ruamel.yaml as yaml

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]

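# For illustration (hypothetical values): a literal such as
# {"class": "File", "location": "_:0f8e...", "basename": "a.txt", "contents": "hello"}
# is written back out as {"class": "File", "basename": "a.txt", "contents": "hello"}.
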
def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]

def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)

def setSecondary(t, fileobj, discovered):
    if isinstance(fileobj, dict) and fileobj.get("class") == "File":
        if "secondaryFiles" not in fileobj:
            fileobj["secondaryFiles"] = cmap([{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]])
            if discovered is not None:
                discovered[fileobj["location"]] = fileobj["secondaryFiles"]
    elif isinstance(fileobj, list):
        for e in fileobj:
            setSecondary(t, e, discovered)

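# For illustration (hypothetical values): with a secondaryFiles pattern of
# ".bai", a File at "keep:.../reads.bam" picks up a secondary File at
# "keep:.../reads.bam.bai"; a "^.bai" pattern would replace the extension
# instead, yielding "keep:.../reads.bai".
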
def discover_secondary_files(inputs, job_order, discovered=None):
    for t in inputs:
        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(t, job_order[shortname(t["id"])], discovered)

collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
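# Examples of references these patterns match (hypothetical identifiers):
#   keep:zzzzz-4zz18-zzzzzzzzzzzzzzz/foo.txt          -> collection_uuid_pattern
#   keep:99999999999999999999999999999999+99/foo.txt  -> collection_pdh_pattern
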
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "$schemas", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, the API server may limit the
        # response size, so keep fetching until it has nothing more to
        # give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        discover_secondary_files(obj["inputs"],
                                 {shortname(t["id"]): t["default"] for t in obj["inputs"] if "default" in t},
                                 discovered)

    visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned
                # by the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            return
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

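    # Worked example (hypothetical identifiers): a File at
    # "keep:zzzzz-4zz18-zzzzzzzzzzzzzzz/input.txt" whose uuid resolves to
    # portable data hash "99999999999999999999999999999999+99" is rewritten
    # in place to "keep:99999999999999999999999999999999+99/input.txt", with
    # the original uuid preserved under the collectionUUID extension field.
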
    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper

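# A minimal usage sketch (mirroring the call in upload_job_order below):
#
#     mapper = upload_dependencies(arvrunner, "workflow dependencies",
#                                  tool.doc_loader, tool.tool,
#                                  tool.tool["id"], False)
#
# Afterwards, file:// references in tool.tool have been rewritten to keep:
# references and mapper maps the original paths to the resolved locations.
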
def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)

    rewrite_to_orig = {v: k for k, v in viewitems(rewrites)}

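    # pack() reports id rewrites such as (hypothetical example)
    # {"file:///home/user/tool.cwl": "#tool.cwl"}; rewrite_to_orig inverts
    # that mapping so packed ids can be looked up in merged_map, which is
    # keyed by the original ids.
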
    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                if "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)

    visit(packed, None)
    return packed

def tag_git_version(packed, tool):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash

def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    discover_secondary_files(tool.tool["inputs"], job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
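# Each FileUpdates records, for one tool document: "resolved", a dict mapping
# original file references to their keep: locations, and "secondaryFiles", a
# dict mapping resolved locations to the secondaryFiles discovered for them.
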
def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k, v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))

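# Typical use (hypothetical tag): arvados_jobs_image(arvrunner, "arvados/jobs:1.4.0")
# returns the resolved image reference, or raises if the image can be neither
# found nor pulled and uploaded.
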
def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',', ': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()

class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        super(Runner, self).__init__(tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

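    # For reference, the workflow-level hint read above looks like this in
    # CWL source (hypothetical values):
    #
    #   hints:
    #     arv:WorkflowRunnerResources:
    #       coresMin: 2
    #       ramMin: 2048
    #       keep_cache: 512
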
    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"
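            # Exit code 33 is the arvados-cwl-runner convention for
            # "workflow contains an unsupported requirement"; any other
            # nonzero exit code is treated as a permanent failure.
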
            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())

            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)

            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)