15028: Update cwltool/schema-salad deps, fix tests
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
index b65e2c58e180a00f6bb8bca6688498880c3d5e00..3235f4763c36506184d90a74a60136474d30b1e4 100644
@@ -7,15 +7,20 @@ standard_library.install_aliases()
 from future.utils import  viewvalues, viewitems
 
 import os
+import sys
+import re
 import urllib.parse
 from functools import partial
 import logging
 import json
-import subprocess32 as subprocess
 from collections import namedtuple
-
 from io import StringIO
 
+if os.name == "posix" and sys.version_info[0] < 3:
+    import subprocess32 as subprocess
+else:
+    import subprocess
+
 from schema_salad.sourceline import SourceLine, cmap
 
 from cwltool.command_line_tool import CommandLineTool
@@ -26,8 +31,11 @@ from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
 from cwltool.utils import aslist
 from cwltool.builder import substitute
 from cwltool.pack import pack
+from cwltool.update import INTERNAL_VERSION
+import schema_salad.validate as validate
 
 import arvados.collection
+from .util import collectionUUID
 import ruamel.yaml as yaml
 
 import arvados_cwl.arvdocker
@@ -83,6 +91,8 @@ def discover_secondary_files(inputs, job_order, discovered=None):
         if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
             setSecondary(t, job_order[shortname(t["id"])], discovered)
 
+collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
+collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
 
 def upload_dependencies(arvrunner, name, document_loader,
                         workflowobj, uri, loadref_run,
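As a rough standalone sketch (not part of the patch), the two new patterns distinguish keep: references by collection uuid from references by portable data hash; the identifiers below are made up:

    import re

    collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
    collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')

    # Reference by collection uuid, with an optional path inside the collection.
    m = collection_uuid_pattern.match("keep:zzzzz-4zz18-zzzzzzzzzzzzzzz/dir/file.txt")
    assert m.groups() == ("zzzzz-4zz18-zzzzzzzzzzzzzzz", "/dir/file.txt")

    # Reference by portable data hash, path optional.
    m = collection_pdh_pattern.match("keep:99999999999999999999999999999999+2160/file.txt")
    assert m.groups() == ("99999999999999999999999999999999+2160", "/file.txt")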
@@ -132,14 +142,55 @@ def upload_dependencies(arvrunner, name, document_loader,
                   loadref, urljoin=document_loader.fetcher.urljoin)
 
     sc = []
-    def only_real(obj):
-        # Only interested in local files than need to be uploaded,
-        # don't include file literals, keep references, etc.
-        sp = obj.get("location", "").split(":")
-        if len(sp) > 1 and sp[0] in ("file", "http", "https"):
+    uuids = {}
+
+    def collect_uuids(obj):
+        loc = obj.get("location", "")
+        sp = loc.split(":")
+        if sp[0] == "keep":
+            # Collect collection uuids that need to be resolved to
+            # portable data hashes
+            gp = collection_uuid_pattern.match(loc)
+            if gp:
+                uuids[gp.groups()[0]] = obj
+            if collectionUUID in obj:
+                uuids[obj[collectionUUID]] = obj
+
+    def collect_uploads(obj):
+        loc = obj.get("location", "")
+        sp = loc.split(":")
+        if len(sp) < 1:
+            return
+        if sp[0] in ("file", "http", "https"):
+            # Record local files that need to be uploaded;
+            # don't include file literals, keep references, etc.
             sc.append(obj)
+        collect_uuids(obj)
+
+    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
+    visit_class(sc_result, ("File", "Directory"), collect_uploads)
+
+    # Resolve any collection uuids we found to portable data hashes
+    # and assign them to uuid_map
+    uuid_map = {}
+    fetch_uuids = list(uuids.keys())
+    while fetch_uuids:
+        # For a large number of fetch_uuids, the API server may limit
+        # the response size, so keep fetching until the API server has
+        # nothing more to give us.
+        lookups = arvrunner.api.collections().list(
+            filters=[["uuid", "in", fetch_uuids]],
+            count="none",
+            select=["uuid", "portable_data_hash"]).execute(
+                num_retries=arvrunner.num_retries)
 
-    visit_class(sc_result, ("File", "Directory"), only_real)
+        if not lookups["items"]:
+            break
+
+        for l in lookups["items"]:
+            uuid_map[l["uuid"]] = l["portable_data_hash"]
+
+        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
 
     normalizeFilesDirs(sc)
 
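The resolution loop pages through the API until it stops returning new items, since a single list call may be truncated for a large uuid set. A minimal standalone sketch of the same pattern, with a stubbed list_page callable standing in for arvrunner.api.collections().list(...).execute():

    def resolve_collection_uuids(list_page, uuids):
        # Map collection uuids to portable data hashes, re-requesting any
        # uuids that a (possibly size-limited) response left out.
        uuid_map = {}
        fetch_uuids = list(uuids)
        while fetch_uuids:
            items = list_page(fetch_uuids)   # stub for collections().list(...).execute()["items"]
            if not items:
                break
            for item in items:
                uuid_map[item["uuid"]] = item["portable_data_hash"]
            fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
        return uuid_map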
@@ -190,8 +241,37 @@ def upload_dependencies(arvrunner, name, document_loader,
                            single_collection=True)
 
     def setloc(p):
-        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
+        loc = p.get("location")
+        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
             p["location"] = mapper.mapper(p["location"]).resolved
+            return
+
+        if not loc:
+            return
+
+        if collectionUUID in p:
+            uuid = p[collectionUUID]
+            if uuid not in uuid_map:
+                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
+                    "Collection uuid %s not found" % uuid)
+            gp = collection_pdh_pattern.match(loc)
+            if gp and uuid_map[uuid] != gp.groups()[0]:
+                # This file entry has both a collectionUUID and a PDH
+                # location. If the PDH doesn't match the one returned by
+                # the API server, raise an error.
+                raise SourceLine(p, "location", validate.ValidationException).makeError(
+                    "Expected collection uuid %s to be %s but API server reported %s" % (
+                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
+
+        gp = collection_uuid_pattern.match(loc)
+        if not gp:
+            return
+        uuid = gp.groups()[0]
+        if uuid not in uuid_map:
+            raise SourceLine(p, "location", validate.ValidationException).makeError(
+                "Collection uuid %s not found" % uuid)
+        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
+        p[collectionUUID] = uuid
 
     visit_class(workflowobj, ("File", "Directory"), setloc)
     visit_class(discovered, ("File", "Directory"), setloc)
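In effect, once uuid_map is known, setloc rewrites a uuid-style keep: location to the equivalent portable-data-hash location and records the uuid under the collectionUUID extension key. A toy rerun of that rewrite with made-up values (collectionUUID below is only a stand-in for the key imported from .util):

    import re

    collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
    collectionUUID = "collectionUUID"   # stand-in for the extension key from .util

    uuid_map = {"zzzzz-4zz18-zzzzzzzzzzzzzzz": "99999999999999999999999999999999+2160"}

    p = {"class": "File", "location": "keep:zzzzz-4zz18-zzzzzzzzzzzzzzz/dir/file.txt"}
    gp = collection_uuid_pattern.match(p["location"])
    uuid = gp.groups()[0]
    p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] or "")
    p[collectionUUID] = uuid
    # p["location"] is now "keep:99999999999999999999999999999999+2160/dir/file.txt"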
@@ -372,6 +452,10 @@ class Runner(Process):
                  collection_cache_size=256,
                  collection_cache_is_default=True):
 
+        loadingContext = loadingContext.copy()
+        loadingContext.metadata = loadingContext.metadata.copy()
+        loadingContext.metadata["cwlVersion"] = INTERNAL_VERSION
+
         super(Runner, self).__init__(tool.tool, loadingContext)
 
         self.arvrunner = runner
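The two copies matter because the context copy alone is shallow (which is why metadata is copied explicitly before cwlVersion is overwritten); without them the caller's loadingContext would be mutated. A toy illustration of the hazard being avoided:

    import copy

    class Ctx(object):
        def __init__(self, metadata):
            self.metadata = metadata
        def copy(self):
            return copy.copy(self)   # shallow: self.metadata is shared with the copy

    original = Ctx({"cwlVersion": "v1.0"})

    c = original.copy()
    c.metadata = c.metadata.copy()           # what the patch does before overwriting
    c.metadata["cwlVersion"] = "internal"
    assert original.metadata["cwlVersion"] == "v1.0"   # caller's context untouched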
@@ -467,15 +551,15 @@ class Runner(Process):
             if "cwl.output.json" in outc:
                 with outc.open("cwl.output.json", "rb") as f:
                     if f.size() > 0:
-                        outputs = json.load(f)
+                        outputs = json.loads(f.read().decode())
             def keepify(fileobj):
                 path = fileobj["location"]
                 if not path.startswith("keep:"):
                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
             adjustFileObjs(outputs, keepify)
             adjustDirObjs(outputs, keepify)
-        except Exception as e:
-            logger.exception("[%s] While getting final output object: %s", self.name, e)
+        except Exception:
+            logger.exception("[%s] While getting final output object", self.name)
             self.arvrunner.output_callback({}, "permanentFail")
         else:
             self.arvrunner.output_callback(outputs, processStatus)
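For completeness, keepify (unchanged context above) leaves keep: locations alone and anchors any other path under the run's output collection; a tiny standalone version with the output collection passed in explicitly and a made-up PDH:

    def keepify(fileobj, output_pdh):
        path = fileobj["location"]
        if not path.startswith("keep:"):
            fileobj["location"] = "keep:%s/%s" % (output_pdh, path)

    f = {"class": "File", "location": "out/result.txt"}
    keepify(f, "99999999999999999999999999999999+2160")
    # f["location"] == "keep:99999999999999999999999999999999+2160/out/result.txt"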