19385: WIP for complex schemadef + $import case
author Peter Amstutz <peter.amstutz@curii.com>
Tue, 24 Jan 2023 21:03:56 +0000 (16:03 -0500)
committer Peter Amstutz <peter.amstutz@curii.com>
Tue, 24 Jan 2023 21:03:56 +0000 (16:03 -0500)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

sdk/cwl/arvados_cwl/arvworkflow.py
sdk/cwl/arvados_cwl/executor.py

index 35cbf94353bdd6f0235d0bc3d24df65a746d25ca..03e6bd302abeda5ed0b3701e912140ac5d5ad5a6 100644 (file)
@@ -25,7 +25,7 @@ import arvados.collection
 
 from cwltool.pack import pack
 from cwltool.load_tool import fetch_document, resolve_and_validate_document
-from cwltool.process import shortname
+from cwltool.process import shortname, uniquename
 from cwltool.workflow import Workflow, WorkflowException, WorkflowStep
 from cwltool.utils import adjustFileObjs, adjustDirObjs, visit_class, normalizeFilesDirs
 from cwltool.context import LoadingContext
@@ -36,7 +36,7 @@ import ruamel.yaml as yaml
 
 from .runner import (upload_dependencies, packed_workflow, upload_workflow_collection,
                      trim_anonymous_location, remove_redundant_fields, discover_secondary_files,
-                     make_builder, arvados_jobs_image)
+                     make_builder, arvados_jobs_image, FileUpdates)
 from .pathmapper import ArvPathMapper, trim_listing
 from .arvtool import ArvadosCommandTool, set_cluster_target
 from ._version import __version__
@@ -127,6 +127,7 @@ def make_wrapper_workflow(arvRunner, main, packed, project_uuid, name, git_info,
 
     return json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))
 
+
 def rel_ref(s, baseuri, urlexpander, merged_map):
     if s.startswith("keep:"):
         return s
@@ -135,11 +136,18 @@ def rel_ref(s, baseuri, urlexpander, merged_map):
     uri = urlexpander(s, baseuri)
     #print("CCC", uri)
 
+    if uri.startswith("keep:"):
+        return uri
+
     fileuri = urllib.parse.urldefrag(baseuri)[0]
 
+    #print("BBB", s, baseuri)
+
     for u in (baseuri, fileuri):
         if u in merged_map:
             replacements = merged_map[u].resolved
+            #print("RRR", uri, replacements)
+            #print()
             #print(uri, replacements)
             if uri in replacements:
                 return replacements[uri]
@@ -173,37 +181,62 @@ def update_refs(d, baseuri, urlexpander, merged_map, set_block_style, runtimeCon
     elif isinstance(d, MutableMapping):
         if "id" in d:
             baseuri = urlexpander(d["id"], baseuri, scoped_id=True)
+        elif "name" in d:
+            baseuri = urlexpander(d["name"], baseuri, scoped_id=True)
 
         if d.get("class") == "DockerRequirement":
             dockerImageId = d.get("dockerImageId") or d.get("dockerPull")
             d["http://arvados.org/cwl#dockerCollectionPDH"] = runtimeContext.cached_docker_lookups.get(dockerImageId)
 
-        for s in d:
-            for field in ("location", "run", "name"):
-                if field in d and isinstance(d[field], str):
-                    d[field] = rel_ref(d[field], baseuri, urlexpander, merged_map)
+        for field in d:
+            if field in ("location", "run", "name") and isinstance(d[field], str):
+                d[field] = rel_ref(d[field], baseuri, urlexpander, merged_map)
+                continue
 
-            for field in ("$include", "$import"):
-                if field in d and isinstance(d[field], str):
-                    d[field] = rel_ref(d[field], baseuri, urlexpander, {})
+            if field in ("$include", "$import") and isinstance(d[field], str):
+                d[field] = rel_ref(d[field], baseuri, urlexpander, {})
+                continue
 
-            basetypes = ("null", "boolean", "int", "long", "float", "double", "string", "File", "Directory")
+            basetypes = ("null", "boolean", "int", "long", "float", "double", "string", "File", "Directory", "record", "array", "enum")
 
-            if ("type" in d and
+            if (field == "type" and
                 isinstance(d["type"], str) and
                 d["type"] not in basetypes):
+                #print("DDD ding", d["type"])
                 d["type"] = rel_ref(d["type"], baseuri, urlexpander, merged_map)
+                #print("DDD dong", d["type"])
+                continue
 
-            if "inputs" in d and isinstance(d["inputs"], MutableMapping):
+            if field == "inputs" and isinstance(d["inputs"], MutableMapping):
                 for inp in d["inputs"]:
                     if isinstance(d["inputs"][inp], str) and d["inputs"][inp] not in basetypes:
+                        #print("III", inp)
                         d["inputs"][inp] = rel_ref(d["inputs"][inp], baseuri, urlexpander, merged_map)
+                continue
 
-            if "$schemas" in d:
+            if field == "$schemas":
                 for n, s in enumerate(d["$schemas"]):
                     d["$schemas"][n] = rel_ref(d["$schemas"][n], baseuri, urlexpander, merged_map)
+                continue
+
+            update_refs(d[field], baseuri, urlexpander, merged_map, set_block_style, runtimeContext, prefix, replacePrefix)
+
+
+def fix_schemadef(req, baseuri, urlexpander, merged_map, pdh):
+    req = copy.deepcopy(req)
+    #if "id" in req:
+    #    del req["id"]
 
-            update_refs(d[s], baseuri, urlexpander, merged_map, set_block_style, runtimeContext, prefix, replacePrefix)
+    for f in req["types"]:
+        r = f["name"]
+        path, frag = urllib.parse.urldefrag(r)
+        rel = rel_ref(r, baseuri, urlexpander, merged_map)
+        merged_map.setdefault(path, FileUpdates({}, {}))
+        #print("PPP", path, r, frag)
+        rename = "keep:%s/%s" %(pdh, rel)
+        for mm in merged_map:
+            merged_map[mm].resolved[r] = rename
+    return req
 
 def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
                         runtimeContext,
@@ -307,11 +340,13 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
 
     # now construct the wrapper
 
+    runfile = "keep:%s/%s" % (col.portable_data_hash(), toolfile)
+
     step = {
         "id": "#main/" + toolname,
         "in": [],
         "out": [],
-        "run": "keep:%s/%s" % (col.portable_data_hash(), toolfile),
+        "run": runfile,
         "label": name
     }
 
@@ -387,6 +422,10 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
     if hints:
         wrapper["hints"] = hints
 
+    # 1. check for SchemaDef
+    # 2. do what pack does
+    # 3. fix inputs
+
     doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}
 
     if git_info:
@@ -396,8 +435,21 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
     #print("MMM", main["id"])
     #print(yamlloader.dump(wrapper, stream=sys.stdout))
 
+    for i, r in enumerate(wrapper["requirements"]):
+        if r["class"] == "SchemaDefRequirement":
+            wrapper["requirements"][i] = fix_schemadef(r, main["id"], tool.doc_loader.expand_url, merged_map, col.portable_data_hash())
+
+    # print()
+    # print("merrrrged maaap", merged_map)
+    # print()
+    print("update_refs", main["id"], runfile)
+
+    #print(yamlloader.dump(wrapper, stream=sys.stdout))
+
     update_refs(wrapper, main["id"], tool.doc_loader.expand_url, merged_map, False, runtimeContext, main["id"]+"#", "#main/")
 
+    #print("HHH")
+
     #print(yamlloader.dump(wrapper, stream=sys.stdout))
 
     return doc
index ee8917b881f2f24d7171bd523d818b1024e59fb1..a447f46b933a6d8689ea8b95e529653541a23378 100644 (file)
@@ -726,12 +726,18 @@ The 'jobs' API is no longer supported.
                 self.stdout.write(uuid + "\n")
                 return (None, "success")
 
+            loadingContext.loader.idx.clear()
             loadingContext.loader.idx["_:main"] = workflow_wrapper
+            workflow_wrapper["id"] = "_:main"
 
             # Reload just the wrapper workflow.
             self.fast_submit = True
-            workflow_wrapper, _ = loadingContext.loader.resolve_all(cmap(workflow_wrapper), tool.tool["id"])
-            tool = load_tool(workflow_wrapper[0], loadingContext)
+            #print("bah bah", loadingContext.requirements)
+            #workflow_wrapper, _ = loadingContext.loader.resolve_all(cmap(workflow_wrapper), "_:main", checklinks=True)
+
+            #tool = load_tool(workflow_wrapper[0], loadingContext)
+            #print(json.dumps(workflow_wrapper, indent=2))
+            tool = load_tool(workflow_wrapper, loadingContext)
 
 
         self.apply_reqs(job_order, tool)