# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems

import os
import sys
import re
import urllib.parse
from functools import partial
import logging
import json
from collections import namedtuple
from io import StringIO

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess
from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname, Process
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
import schema_salad.validate as validate

import arvados.collection
from .util import collectionUUID
import ruamel.yaml as yaml

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'. However, when writing the record back out, this can break
    reproducibility. Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]


def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]

def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)

def setSecondary(t, fileobj, discovered):
    if isinstance(fileobj, dict) and fileobj.get("class") == "File":
        if "secondaryFiles" not in fileobj:
            fileobj["secondaryFiles"] = cmap([{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]])
            if discovered is not None:
                discovered[fileobj["location"]] = fileobj["secondaryFiles"]
    elif isinstance(fileobj, list):
        for e in fileobj:
            setSecondary(t, e, discovered)

def discover_secondary_files(inputs, job_order, discovered=None):
    for t in inputs:
        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(t, job_order[shortname(t["id"])], discovered)
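
# Illustrative note (not part of the original module): given a tool input with
#   "secondaryFiles": [".bai", "^.vcf"]
# and a job_order entry {"class": "File", "location": "keep:.../reads.bam"},
# setSecondary() uses cwltool's substitute() to derive
#   reads.bam + ".bai"  -> reads.bam.bai   (a plain suffix is appended)
#   reads.bam + "^.vcf" -> reads.vcf       ("^" strips one extension first)
# and records the expansion in "discovered" when a dict is supplied.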

collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
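
# Illustrative matches for the patterns above (hypothetical identifiers):
#   collection_uuid_pattern: "keep:zzzzz-4zz18-zzzzzzzzzzzzzzz/dir/file.txt"
#       group 1 = collection uuid, group 2 = optional path inside it
#   collection_pdh_pattern:  "keep:d41d8cd98f00b204e9800998ecf8427e+0/file.txt"
#       group 1 = portable data hash (md5+size), group 2 = optional path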

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references. Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "$schemas", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # but don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, the API server may limit
        # response size, so keep fetching until the API server has
        # nothing more to give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
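
    # At this point uuid_map looks like, e.g. (hypothetical values):
    #   {"zzzzz-4zz18-zzzzzzzzzzzzzzz": "d41d8cd98f00b204e9800998ecf8427e+0"}
    # mapping every collection uuid seen above to its current portable
    # data hash, which setloc() below uses to rewrite locations.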

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        discover_secondary_files(obj["inputs"],
                                 {shortname(t["id"]): t["default"] for t in obj["inputs"] if "default" in t},
                                 discovered)

    visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned by
                # the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            return
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper
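
# Minimal usage sketch (assumes an initialized arvados-cwl-runner executor
# "arvrunner" and a loaded "tool"; the variable names are illustrative only):
#
#   mapper = upload_dependencies(arvrunner, "my-workflow dependencies",
#                                tool.doc_loader, tool.tool, tool.tool["id"],
#                                False, include_primary=False)
#   for src, entry in mapper.items():
#       print(src, "->", entry.resolved)  # e.g. file:///... -> keep:<pdh>/...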

def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)
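
# Illustrative CWL fragment handled by upload_docker() (hypothetical image):
#
#   requirements:
#     DockerRequirement:
#       dockerPull: quay.io/example/mytool:1.0
#
# The image named by dockerPull is pulled and uploaded to Keep if it is not
# already available to the cluster.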

def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined
    into a single document."""

    rewrites = {}
    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)

    rewrite_to_orig = {v: k for k, v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                if "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(
                    arvrunner.api, v, True, arvrunner.project_uuid)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)

    visit(packed, None)
    return packed

def tag_git_version(packed, tool):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(
                ['git', 'log', '--first-parent', '--max-count=1', '--format=%H'],
                stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash

def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    discover_secondary_files(tool.tool["inputs"], job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order


FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
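
# The merged_map produced by upload_workflow_deps() below maps each tool or
# workflow document id to a FileUpdates tuple, e.g. (hypothetical values):
#   {"file:///home/me/wf.cwl": FileUpdates(
#       resolved={"file:///home/me/script.py": "keep:<pdh>/script.py"},
#       secondaryFiles={})}
# packed_workflow() consults it to rewrite locations in the packed document.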

def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k, v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))

def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',', ': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()
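
# Design note: collections are content-addressed, so the lookup above by
# portable_data_hash makes this upload effectively idempotent. Re-submitting
# an unchanged workflow reuses the existing collection rather than creating
# a duplicate copy.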

class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = loadingContext.metadata.copy()
        loadingContext.metadata["cwlVersion"] = INTERNAL_VERSION

        super(Runner, self).__init__(tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides the default and/or the
            # spec from the workflow.
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""
        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())

            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
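
        # Illustrative example (hypothetical values): if cwl.output.json holds
        #   {"report": {"class": "File", "location": "out/report.html"}}
        # and record["output"] is "d41d8cd98f00b204e9800998ecf8427e+0", then
        # keepify() rewrites the location to
        #   "keep:d41d8cd98f00b204e9800998ecf8427e+0/out/report.html"
        # before the final output callback fires.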