19385: Messy work in progress for uploading workflows to collections
author    Peter Amstutz <peter.amstutz@curii.com>
          Fri, 6 Jan 2023 22:21:07 +0000 (17:21 -0500)
committer Peter Amstutz <peter.amstutz@curii.com>
          Fri, 6 Jan 2023 22:21:07 +0000 (17:21 -0500)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

sdk/cwl/arvados_cwl/arvdocker.py
sdk/cwl/arvados_cwl/arvworkflow.py
sdk/cwl/arvados_cwl/executor.py
sdk/cwl/arvados_cwl/runner.py

diff --git a/sdk/cwl/arvados_cwl/arvdocker.py b/sdk/cwl/arvados_cwl/arvdocker.py
index cf0b3b9daff3deb02a84fe6e41428e04cdefca4e..ef8398df341e88861ff80d6152762b52cb00ae0a 100644
--- a/sdk/cwl/arvados_cwl/arvdocker.py
+++ b/sdk/cwl/arvados_cwl/arvdocker.py
@@ -121,7 +121,8 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
             if not out_of_project_images:
                 # Fetch Docker image if necessary.
                 try:
-                    result = cwltool.docker.DockerCommandLineJob.get_image(dockerRequirement, pull_image,
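+                    # get_image is called on an instance here; a throwaway DockerCommandLineJob is
+                    # constructed with placeholder arguments just to make the call.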
+                    dockerjob = cwltool.docker.DockerCommandLineJob(None, None, None, None, None, None)
+                    result = dockerjob.get_image(dockerRequirement, pull_image,
                                                                   force_pull, tmp_outdir_prefix)
                     if not result:
                         raise WorkflowException("Docker image '%s' not available" % dockerRequirement["dockerImageId"])
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index 56226388d7ab36618d1ff354c70d6bf512c18cea..f66e50dca935870a1f1c88ccc4e4be023c60f9c5 100644
--- a/sdk/cwl/arvados_cwl/arvworkflow.py
+++ b/sdk/cwl/arvados_cwl/arvworkflow.py
@@ -9,6 +9,14 @@ import os
 import json
 import copy
 import logging
+import urllib
+from io import StringIO
+import sys
+
+from typing import (MutableSequence, MutableMapping)
+
+from ruamel.yaml import YAML
+from ruamel.yaml.comments import CommentedMap, CommentedSeq
 
 from schema_salad.sourceline import SourceLine, cmap
 import schema_salad.ref_resolver
@@ -22,6 +30,8 @@ from cwltool.workflow import Workflow, WorkflowException, WorkflowStep
 from cwltool.utils import adjustFileObjs, adjustDirObjs, visit_class, normalizeFilesDirs
 from cwltool.context import LoadingContext
 
+from schema_salad.ref_resolver import file_uri, uri_file_path
+import schema_salad.utils
+
 import ruamel.yaml as yaml
 
 from .runner import (upload_dependencies, packed_workflow, upload_workflow_collection,
@@ -117,6 +127,133 @@ def make_wrapper_workflow(arvRunner, main, packed, project_uuid, name, git_info,
 
     return json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))
 
+def rel_ref(s, baseuri, urlexpander, merged_map):
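+    # Return s as a reference relative to baseuri's directory, preferring any
+    # replacement recorded in merged_map.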
+    uri = urlexpander(s, baseuri)
+    if baseuri in merged_map:
+        replacements = merged_map[baseuri].resolved
+        if uri in replacements:
+            return replacements[uri]
+
+    p1 = os.path.dirname(uri_file_path(baseuri))
+    p2 = os.path.dirname(uri_file_path(uri))
+    p3 = os.path.basename(uri_file_path(uri))
+    r = os.path.relpath(p2, p1)
+    if r == ".":
+        r = ""
+    print("AAA", uri, s)
+    print("BBBB", p1, p2, p3, r)
+    return os.path.join(r, p3)
+
+
+def update_refs(d, baseuri, urlexpander, merged_map, set_block_style):
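+    # Recursively rewrite $include, $import, location and run references (and
+    # $schemas entries) to paths relative to the containing document.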
+    if isinstance(d, CommentedSeq):
+        if set_block_style:
+            d.fa.set_block_style()
+        for s in d:
+            update_refs(s, baseuri, urlexpander, merged_map, set_block_style)
+    elif isinstance(d, CommentedMap):
+        if set_block_style:
+            d.fa.set_block_style()
+
+        if "id" in d:
+            baseuri = urlexpander(d["id"], baseuri, scoped_id=True)
+
+        for field in ("$include", "$import", "location", "run"):
+            if field in d and isinstance(d[field], str):
+                d[field] = rel_ref(d[field], baseuri, urlexpander, merged_map)
+
+        if "$schemas" in d:
+            for n, s in enumerate(d["$schemas"]):
+                d["$schemas"][n] = rel_ref(s, baseuri, urlexpander, merged_map)
+
+        for s in d:
+            update_refs(d[s], baseuri, urlexpander, merged_map, set_block_style)
+
+def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
+                    runtimeContext, uuid=None,
+                    submit_runner_ram=0, name=None, merged_map=None,
+                    submit_runner_image=None,
+                    git_info=None):
+
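+    # Gather every file referenced by the workflow, rewrite cross-file references
+    # to relative paths, write the files into a new collection, and return its locator.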
+    firstfile = None
+    workflow_files = set()
+    import_files = set()
+    include_files = set()
+
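+    # The loader index contains file URIs for the workflow documents, plus entries
+    # prefixed with "import:" and "include:" for $import and $include targets.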
+    for w in tool.doc_loader.idx:
+        if w.startswith("file://"):
+            workflow_files.add(urllib.parse.urldefrag(w)[0])
+            if firstfile is None:
+                firstfile = urllib.parse.urldefrag(w)[0]
+        if w.startswith("import:file://"):
+            import_files.add(urllib.parse.urldefrag(w[7:])[0])
+        if w.startswith("include:file://"):
+            include_files.add(urllib.parse.urldefrag(w[8:])[0])
+
+    all_files = workflow_files | import_files | include_files
+
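+    # Find the longest common path prefix (starting after "file://", hence n = 7)
+    # shared by all referenced files; collection paths are taken relative to it.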
+    n = 7
+    allmatch = True
+    while allmatch:
+        n += 1
+        for f in all_files:
+            if len(f)-1 < n:
+                n -= 1
+                allmatch = False
+                break
+            if f[n] != firstfile[n]:
+                allmatch = False
+                break
+
+    while firstfile[n] != "/":
+        n -= 1
+
+    prefix = firstfile[:n+1]
+
+    col = arvados.collection.Collection()
+
+    #print(merged_map.keys())
+
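+    # Load each workflow and $import document, rewrite its references, and write it
+    # into the collection at its path relative to the common prefix.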
+    for w in workflow_files | import_files:
+        # 1. load YAML
+
+        text = tool.doc_loader.fetch_text(w)
+        if isinstance(text, bytes):
+            textIO = StringIO(text.decode('utf-8'))
+        else:
+            textIO = StringIO(text)
+
+        yamlloader = schema_salad.utils.yaml_no_ts()
+        result = yamlloader.load(textIO)
+
+        set_block_style = False
+        if result.fa.flow_style():
+            set_block_style = True
+
+        # 2. find $import, $include, $schema, run, location
+        # 3. update field value
+        update_refs(result, w, tool.doc_loader.expand_url, merged_map, set_block_style)
+
+        with col.open(w[n+1:], "wt") as f:
+            yamlloader.dump(result, stream=f)
+
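+    # Copy $include files into the collection verbatim, in 64 KiB chunks.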
+    for w in include_files:
+        with col.open(w[n+1:], "wb") as f1:
+            with open(uri_file_path(w), "rb") as f2:
+                dat = f2.read(65536)
+                while dat:
+                    f1.write(dat)
+                    dat = f2.read(65536)
+
+    toolname = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
+    if git_info and git_info.get("http://arvados.org/cwl#gitDescribe"):
+        toolname = "%s (%s)" % (toolname, git_info.get("http://arvados.org/cwl#gitDescribe"))
+
+    col.save_new(name=toolname, owner_uuid=arvRunner.project_uuid, ensure_unique_name=True)
+
+    return col.manifest_locator()
+
+
 def upload_workflow(arvRunner, tool, job_order, project_uuid,
                     runtimeContext, uuid=None,
                     submit_runner_ram=0, name=None, merged_map=None,
@@ -139,7 +276,7 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid,
         name = tool.tool.get("label", os.path.basename(tool.tool["id"]))
 
     upload_dependencies(arvRunner, name, tool.doc_loader,
-                        packed, tool.tool["id"], False,
+                        packed, tool.tool["id"],
                         runtimeContext)
 
     wf_runner_resources = None
@@ -286,7 +423,6 @@ class ArvadosWorkflow(Workflow):
                                 self.doc_loader,
                                 joborder,
                                 joborder.get("id", "#"),
-                                False,
                                 runtimeContext)
 
             if self.wf_pdh is None:
@@ -330,7 +466,6 @@ class ArvadosWorkflow(Workflow):
                                     self.doc_loader,
                                     packed,
                                     self.tool["id"],
-                                    False,
                                     runtimeContext)
 
                 # Discover files/directories referenced by the
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index 382c1643e4b570ed20f7405867f000c4403dfd86..1e6344f5e9e3b6866a8e2bc8ffeb899fd82f1135 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -36,7 +36,7 @@ import arvados_cwl.util
 from .arvcontainer import RunnerContainer, cleanup_name_for_collection
 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, make_builder
 from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
-from .arvworkflow import ArvadosWorkflow, upload_workflow
+from .arvworkflow import ArvadosWorkflow, upload_workflow, new_upload_workflow
 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
 from .perf import Perf
 from .pathmapper import NoFollowPathMapper
@@ -674,16 +674,16 @@ The 'jobs' API is no longer supported.
         loadingContext = self.loadingContext.copy()
         loadingContext.do_validate = False
         loadingContext.disable_js_validation = True
-        if submitting and not self.fast_submit:
-            loadingContext.do_update = False
-            # Document may have been auto-updated. Reload the original
-            # document with updating disabled because we want to
-            # submit the document with its original CWL version, not
-            # the auto-updated one.
-            with Perf(metrics, "load_tool original"):
-                tool = load_tool(updated_tool.tool["id"], loadingContext)
-        else:
-            tool = updated_tool
+        if submitting and not self.fast_submit:
+            loadingContext.do_update = False
+            # Document may have been auto-updated. Reload the original
+            # document with updating disabled because we want to
+            # submit the document with its original CWL version, not
+            # the auto-updated one.
+            with Perf(metrics, "load_tool original"):
+                tool = load_tool(updated_tool.tool["id"], loadingContext)
+        else:
+            tool = updated_tool
 
         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
         # Also uploads docker images.
@@ -702,13 +702,13 @@ The 'jobs' API is no longer supported.
         loadingContext.avsc_names = tool.doc_schema
         loadingContext.metadata = tool.metadata
         loadingContext.skip_resolve_all = True
-        with Perf(metrics, "load_tool"):
-            tool = load_tool(tool.tool, loadingContext)
+        #with Perf(metrics, "load_tool"):
+        #    tool = load_tool(tool.tool, loadingContext)
 
         if runtimeContext.update_workflow or runtimeContext.create_workflow:
             # Create a pipeline template or workflow record and exit.
             if self.work_api == "containers":
-                uuid = upload_workflow(self, tool, job_order,
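+                # new_upload_workflow stores the workflow's source files in a
+                # collection rather than registering a packed document.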
+                uuid = new_upload_workflow(self, tool, job_order,
                                        runtimeContext.project_uuid,
                                        runtimeContext,
                                        uuid=runtimeContext.update_workflow,
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 25c7eaf6c700400f9104a65f2de3d44ce804c37f..7364df3345561ed24cf4afb2bdb085f8fbc62388 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -295,7 +295,7 @@ def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=No
             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
 
 def upload_dependencies(arvrunner, name, document_loader,
-                        workflowobj, uri, loadref_run, runtimeContext,
+                        workflowobj, uri, runtimeContext,
                         include_primary=True, discovered_secondaryfiles=None,
                         cache=None):
     """Upload the dependencies of the workflowobj document to Keep.
@@ -303,64 +303,27 @@ def upload_dependencies(arvrunner, name, document_loader,
     Returns a pathmapper object mapping local paths to keep references.  Also
     does an in-place update of references in "workflowobj".
 
-    Use scandeps to find $import, $include, $schemas, run, File and Directory
+    Use scandeps to find $schemas, File and Directory
     fields that represent external references.
 
     If workflowobj has an "id" field, this will reload the document to ensure
     it is scanning the raw document prior to preprocessing.
     """
 
-    loaded = set()
-    def loadref(b, u):
-        joined = document_loader.fetcher.urljoin(b, u)
-        defrg, _ = urllib.parse.urldefrag(joined)
-        if defrg not in loaded:
-            loaded.add(defrg)
-            if cache is not None and defrg in cache:
-                return cache[defrg]
-            # Use fetch_text to get raw file (before preprocessing).
-            text = document_loader.fetch_text(defrg)
-            if isinstance(text, bytes):
-                textIO = StringIO(text.decode('utf-8'))
-            else:
-                textIO = StringIO(text)
-            yamlloader = YAML(typ='safe', pure=True)
-            result = yamlloader.load(textIO)
-            if cache is not None:
-                cache[defrg] = result
-            return result
-        else:
-            return {}
-
-    if loadref_run:
-        loadref_fields = set(("$import", "run"))
-    else:
-        loadref_fields = set(("$import",))
-
     scanobj = workflowobj
-    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
-        defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
-        if cache is not None and defrg not in cache:
-            # if we haven't seen this file before, want raw file
-            # content (before preprocessing) to ensure that external
-            # references like $include haven't already been inlined.
-            scanobj = loadref("", workflowobj["id"])
-
     metadata = scanobj
 
-    with Perf(metrics, "scandeps include, location"):
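+    # $import/run references are no longer followed into subdocuments here; scan
+    # only for "location" (File/Directory) and "$schemas" references.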
+    with Perf(metrics, "scandeps"):
         sc_result = scandeps(uri, scanobj,
-                             loadref_fields,
-                             set(("$include", "location")),
-                             loadref, urljoin=document_loader.fetcher.urljoin,
+                             set(),
+                             set(("location",)),
+                             None, urljoin=document_loader.fetcher.urljoin,
                              nestdirs=False)
-
-    with Perf(metrics, "scandeps $schemas"):
         optional_deps = scandeps(uri, scanobj,
-                                      loadref_fields,
-                                      set(("$schemas",)),
-                                      loadref, urljoin=document_loader.fetcher.urljoin,
-                                      nestdirs=False)
+                             set(),
+                             set(("$schemas",)),
+                             None, urljoin=document_loader.fetcher.urljoin,
+                             nestdirs=False)
 
     if sc_result is None:
         sc_result = []
@@ -703,7 +666,6 @@ def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
                                     jobloader,
                                     job_order,
                                     job_order.get("id", "#"),
-                                    False,
                                     runtimeContext)
 
     if "id" in job_order:
@@ -721,8 +683,9 @@ FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
 def upload_workflow_deps(arvrunner, tool, runtimeContext):
     # Ensure that Docker images needed by this workflow are available
 
-    with Perf(metrics, "upload_docker"):
-        upload_docker(arvrunner, tool, runtimeContext)
+    # testing only
+    #with Perf(metrics, "upload_docker"):
+    #    upload_docker(arvrunner, tool, runtimeContext)
 
     document_loader = tool.doc_loader
 
@@ -748,7 +711,6 @@ def upload_workflow_deps(arvrunner, tool, runtimeContext):
                                      document_loader,
                                      deptool,
                                      deptool["id"],
-                                     False,
                                      runtimeContext,
                                      include_primary=False,
                                      discovered_secondaryfiles=discovered_secondaryfiles,