X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/9f57bcbab5b244365ca4d0ece8490d7540c72b86..ac0dc6c57a2b4f736b6faf62421de56a3355db04:/sdk/cwl/arvados_cwl/runner.py

diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index c0d165aa9e..3235f4763c 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -8,6 +8,7 @@ from future.utils import viewvalues, viewitems
 
 import os
 import sys
+import re
 import urllib.parse
 from functools import partial
 import logging
@@ -30,8 +31,11 @@ from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
 from cwltool.utils import aslist
 from cwltool.builder import substitute
 from cwltool.pack import pack
+from cwltool.update import INTERNAL_VERSION
+import schema_salad.validate as validate
 
 import arvados.collection
+from .util import collectionUUID
 import ruamel.yaml as yaml
 
 import arvados_cwl.arvdocker
@@ -87,6 +91,8 @@ def discover_secondary_files(inputs, job_order, discovered=None):
         if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
             setSecondary(t, job_order[shortname(t["id"])], discovered)
 
+collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
+collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
 
 def upload_dependencies(arvrunner, name, document_loader,
                         workflowobj, uri, loadref_run,
@@ -136,14 +142,55 @@ def upload_dependencies(arvrunner, name, document_loader,
                         loadref, urljoin=document_loader.fetcher.urljoin)
 
     sc = []
-    def only_real(obj):
-        # Only interested in local files than need to be uploaded,
-        # don't include file literals, keep references, etc.
-        sp = obj.get("location", "").split(":")
-        if len(sp) > 1 and sp[0] in ("file", "http", "https"):
+    uuids = {}
+
+    def collect_uuids(obj):
+        loc = obj.get("location", "")
+        sp = loc.split(":")
+        if sp[0] == "keep":
+            # Collect collection uuids that need to be resolved to
+            # portable data hashes
+            gp = collection_uuid_pattern.match(loc)
+            if gp:
+                uuids[gp.groups()[0]] = obj
+            if collectionUUID in obj:
+                uuids[obj[collectionUUID]] = obj
+
+    def collect_uploads(obj):
+        loc = obj.get("location", "")
+        sp = loc.split(":")
+        if len(sp) < 1:
+            return
+        if sp[0] in ("file", "http", "https"):
+            # Record local files that need to be uploaded,
+            # don't include file literals, keep references, etc.
             sc.append(obj)
+        collect_uuids(obj)
 
-    visit_class(sc_result, ("File", "Directory"), only_real)
+    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
+    visit_class(sc_result, ("File", "Directory"), collect_uploads)
+
+    # Resolve any collection uuids we found to portable data hashes
+    # and assign them to uuid_map
+    uuid_map = {}
+    fetch_uuids = list(uuids.keys())
+    while fetch_uuids:
+        # For a large number of fetch_uuids, the API server may limit
+        # the response size, so keep fetching until the API server has
+        # nothing more to give us.
+        lookups = arvrunner.api.collections().list(
+            filters=[["uuid", "in", fetch_uuids]],
+            count="none",
+            select=["uuid", "portable_data_hash"]).execute(
+                num_retries=arvrunner.num_retries)
+
+        if not lookups["items"]:
+            break
+
+        for l in lookups["items"]:
+            uuid_map[l["uuid"]] = l["portable_data_hash"]
+
+        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
 
     normalizeFilesDirs(sc)
 
@@ -194,8 +241,37 @@ def upload_dependencies(arvrunner, name, document_loader,
                            single_collection=True)
 
     def setloc(p):
-        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
+        loc = p.get("location")
+        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
             p["location"] = mapper.mapper(p["location"]).resolved
+            return
+
+        if not loc:
+            return
+
+        if collectionUUID in p:
+            uuid = p[collectionUUID]
+            if uuid not in uuid_map:
+                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
+                    "Collection uuid %s not found" % uuid)
+            gp = collection_pdh_pattern.match(loc)
+            if gp and uuid_map[uuid] != gp.groups()[0]:
+                # This file entry has both collectionUUID and a PDH
+                # location. If the PDH doesn't match the one returned
+                # by the API server, raise an error.
+                raise SourceLine(p, "location", validate.ValidationException).makeError(
+                    "Expected collection uuid %s to have portable data hash %s but API server reported %s" % (
+                        uuid, gp.groups()[0], uuid_map[uuid]))
+
+        gp = collection_uuid_pattern.match(loc)
+        if not gp:
+            return
+        uuid = gp.groups()[0]
+        if uuid not in uuid_map:
+            raise SourceLine(p, "location", validate.ValidationException).makeError(
+                "Collection uuid %s not found" % uuid)
+        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
+        p[collectionUUID] = uuid
 
     visit_class(workflowobj, ("File", "Directory"), setloc)
     visit_class(discovered, ("File", "Directory"), setloc)
@@ -376,6 +452,10 @@ class Runner(Process):
                  collection_cache_size=256,
                  collection_cache_is_default=True):
 
+        loadingContext = loadingContext.copy()
+        loadingContext.metadata = loadingContext.metadata.copy()
+        loadingContext.metadata["cwlVersion"] = INTERNAL_VERSION
+
         super(Runner, self).__init__(tool.tool, loadingContext)
 
         self.arvrunner = runner
@@ -478,8 +558,8 @@ class Runner(Process):
                             fileobj["location"] = "keep:%s/%s" % (record["output"], path)
                 adjustFileObjs(outputs, keepify)
                 adjustDirObjs(outputs, keepify)
-        except Exception as e:
-            logger.exception("[%s] While getting final output object: %s", self.name, e)
+        except Exception:
+            logger.exception("[%s] While getting final output object", self.name)
             self.arvrunner.output_callback({}, "permanentFail")
         else:
             self.arvrunner.output_callback(outputs, processStatus)
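
A note on the resolution loop added to upload_dependencies(): rather than looking up each collection uuid with its own API call, it queries the server for the whole batch, and because a large filter list may yield a size-limited response, it re-requests only the still-unresolved uuids until a request returns no items. Below is a minimal runnable sketch of that retry-by-elimination pattern; FakeCollectionsAPI and its PAGE_LIMIT are made-up stand-ins for arvrunner.api.collections(), not part of the Arvados SDK.

# Sketch of the batched uuid -> portable data hash resolution above.
# FakeCollectionsAPI is hypothetical; the real client talks to the
# Arvados API server.
class FakeCollectionsAPI:
    PAGE_LIMIT = 2  # assumed response-size cap, to exercise the loop

    def __init__(self, known):
        self.known = known  # dict: uuid -> portable_data_hash

    def list(self, uuids):
        # Return at most PAGE_LIMIT matches, like a size-limited response.
        items = [{"uuid": u, "portable_data_hash": self.known[u]}
                 for u in uuids if u in self.known]
        return {"items": items[:self.PAGE_LIMIT]}

def resolve_uuids(api, uuids):
    uuid_map = {}
    fetch_uuids = list(uuids)
    while fetch_uuids:
        lookups = api.list(fetch_uuids)
        if not lookups["items"]:
            break  # server has nothing more to give us
        for item in lookups["items"]:
            uuid_map[item["uuid"]] = item["portable_data_hash"]
        # Re-request only the uuids that are still unresolved.
        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
    return uuid_map

api = FakeCollectionsAPI({
    "zzzzz-4zz18-aaaaaaaaaaaaaaa": "99999999999999999999999999999999+99",
    "zzzzz-4zz18-bbbbbbbbbbbbbbb": "88888888888888888888888888888888+88",
    "zzzzz-4zz18-ccccccccccccccc": "77777777777777777777777777777777+77",
})
print(resolve_uuids(api, list(api.known) + ["zzzzz-4zz18-nosuchcollectio"]))

The unresolvable uuid never appears in uuid_map; the loop terminates when a request yields no new items, which is also why the real code breaks on an empty "items" list instead of looping forever.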
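The setloc() changes lean on the two module-level regexes to tell uuid-style keep: locations apart from PDH-style ones, rewriting the former once uuid_map is populated. Here is a small self-contained illustration of that rewrite, with a made-up uuid_map entry standing in for the API lookup results:

import re

collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')

# Made-up resolution table; in runner.py this is built by the batched
# API lookups shown above.
uuid_map = {"zzzzz-4zz18-aaaaaaaaaaaaaaa": "99999999999999999999999999999999+99"}

def rewrite(loc):
    # Locations that already carry a portable data hash are left alone.
    if collection_pdh_pattern.match(loc):
        return loc
    gp = collection_uuid_pattern.match(loc)
    if not gp:
        return loc
    uuid, tail = gp.groups()
    # Swap the uuid for its portable data hash, keeping any path suffix.
    return "keep:%s%s" % (uuid_map[uuid], tail or "")

print(rewrite("keep:zzzzz-4zz18-aaaaaaaaaaaaaaa/dir/file.txt"))
# -> keep:99999999999999999999999999999999+99/dir/file.txt
print(rewrite("keep:99999999999999999999999999999999+99/other.txt"))
# -> unchanged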
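Finally, the Runner constructor change copies loadingContext, and then its metadata dict, before pinning cwlVersion to cwltool's INTERNAL_VERSION, so the caller's context object is left untouched. A toy demonstration of why both copies matter; LoadingContext here is a minimal stand-in for cwltool's class and the version strings are illustrative:

import copy

# Minimal stand-in for cwltool's LoadingContext; real contexts carry
# many more fields, but the aliasing hazard is the same.
class LoadingContext:
    def __init__(self, metadata):
        self.metadata = metadata

    def copy(self):
        # Shallow copy: the new context still shares self.metadata.
        return copy.copy(self)

caller_ctx = LoadingContext({"cwlVersion": "v1.0"})

ctx = caller_ctx.copy()
ctx.metadata = ctx.metadata.copy()   # second copy breaks the shared dict
ctx.metadata["cwlVersion"] = "v1.1"  # stand-in for INTERNAL_VERSION

# The caller's context is unchanged; skipping the metadata copy above
# would have printed "v1.1" here.
print(caller_ctx.metadata["cwlVersion"])  # -> v1.0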