# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems

import os
import sys
import re
import urllib.parse
from functools import partial
import logging
import json
from collections import namedtuple
from io import StringIO

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname, Process
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack
import schema_salad.validate as validate

import arvados.collection
from .util import collectionUUID
import ruamel.yaml as yaml

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]
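
# Illustrative sketch (hypothetical values, not from the source): a File
# literal may arrive as
#     {"class": "File", "location": "_:a7b2...", "contents": "hello\n"}
# and trim_anonymous_location() drops the anonymous "_:..." location so the
# record written back out stays reproducible.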


def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]


def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)


def setSecondary(t, fileobj, discovered):
    if isinstance(fileobj, dict) and fileobj.get("class") == "File":
        if "secondaryFiles" not in fileobj:
            fileobj["secondaryFiles"] = cmap([{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]])
            if discovered is not None:
                discovered[fileobj["location"]] = fileobj["secondaryFiles"]
    elif isinstance(fileobj, list):
        for e in fileobj:
            setSecondary(t, e, discovered)
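
# Sketch of the pattern expansion above (hypothetical paths): with a
# secondaryFiles pattern of ".bai", a File at "file:///data/reads.bam" gains
# a secondary File at "file:///data/reads.bam.bai"; a leading "^" in the
# pattern (e.g. "^.bai") replaces the extension instead, per cwltool's
# substitute().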


def discover_secondary_files(inputs, job_order, discovered=None):
    for t in inputs:
        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(t, job_order[shortname(t["id"])], discovered)


collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
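
# For example (hypothetical identifiers), collection_uuid_pattern matches
# "keep:zzzzz-4zz18-zzzzzzzzzzzzzzz/dir/file.txt" (a collection UUID plus an
# optional path), while collection_pdh_pattern matches
# "keep:99999999999999999999999999999999+99/dir/file.txt" (a portable data
# hash plus an optional path).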


def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "$schemas", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, the API server may limit the
        # response size, so keep fetching until the API server has nothing
        # more to give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
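
    # After the loop, uuid_map looks like (hypothetical values):
    #     {"zzzzz-4zz18-zzzzzzzzzzzzzzz": "99999999999999999999999999999999+99"}
    # so setloc() below can rewrite uuid-based keep: locations to their
    # content-addressed portable data hash equivalents.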

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        discover_secondary_files(obj["inputs"],
                                 {shortname(t["id"]): t["default"] for t in obj["inputs"] if "default" in t},
                                 discovered)

    visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)
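
    # mapper.mapper(<original location>) returns an entry whose .resolved
    # attribute is the uploaded "keep:..." reference; a sketch with
    # hypothetical paths: mapper.mapper("file:///home/me/script.py").resolved
    # -> "keep:99999999999999999999999999999999+99/script.py".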

    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location.  If the PDH doesn't match the one returned
                # by the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            return
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid
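
    # Example of the uuid-to-PDH rewrite performed by setloc (hypothetical
    # values): {"location": "keep:zzzzz-4zz18-zzzzzzzzzzzzzzz/a.txt"} becomes
    # {"location": "keep:99999999999999999999999999999999+99/a.txt"} with the
    # original uuid preserved under the collectionUUID extension field.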

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
282 if "$schemas" in workflowobj:
284 for s in workflowobj["$schemas"]:
285 sch.append(mapper.mapper(s).resolved)
286 workflowobj["$schemas"] = sch


def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)


def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)

    rewrite_to_orig = {v: k for k, v in viewitems(rewrites)}
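
    # pack() fills "rewrites" with a mapping from each component's original id
    # to its id inside the packed document, e.g. (hypothetical):
    #     {"file:///home/me/step.cwl": "#step.cwl"}
    # rewrite_to_orig inverts it so visit() below can look up entries in
    # merged_map, which is keyed by original ids.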

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                if "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)

    visit(packed, None)
    return packed


def tag_git_version(packed, tool):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash


def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    discover_secondary_files(tool.tool["inputs"], job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order


FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
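
# A sketch of the merged_map built below (hypothetical values): each tool
# document id maps to a FileUpdates whose .resolved maps original file
# locations to keep references and whose .secondaryFiles records discovered
# secondary files, e.g.
#     {"file:///home/me/wf.cwl": FileUpdates(
#         resolved={"file:///home/me/script.py": "keep:9999...+99/script.py"},
#         secondaryFiles={})}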


def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k, v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map


def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))


def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',', ': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()


class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        super(Runner, self).__init__(tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)