20531: Add an extra newline to make the error section easier to see
[arvados.git] / sdk / cwl / arvados_cwl / arvworkflow.py
index cc3a51d8011cbb46122996e11b675494c3ce0d32..43d23e9b92bd7ff79e5b874d96754de3d8bb5c0e 100644 (file)
@@ -41,6 +41,7 @@ from .runner import (upload_dependencies, packed_workflow, upload_workflow_colle
 from .pathmapper import ArvPathMapper, trim_listing
 from .arvtool import ArvadosCommandTool, set_cluster_target
 from ._version import __version__
+from .util import common_prefix
 
 from .perf import Perf
 
@@ -167,17 +168,14 @@ def is_basetype(tp):
     return False
 
 
-def update_refs(d, baseuri, urlexpander, merged_map, jobmapper, set_block_style, runtimeContext, prefix, replacePrefix):
-    if set_block_style and (isinstance(d, CommentedSeq) or isinstance(d, CommentedMap)):
-        d.fa.set_block_style()
-
+def update_refs(d, baseuri, urlexpander, merged_map, jobmapper, runtimeContext, prefix, replacePrefix):
     if isinstance(d, MutableSequence):
         for i, s in enumerate(d):
             if prefix and isinstance(s, str):
                 if s.startswith(prefix):
                     d[i] = replacePrefix+s[len(prefix):]
             else:
-                update_refs(s, baseuri, urlexpander, merged_map, jobmapper, set_block_style, runtimeContext, prefix, replacePrefix)
+                update_refs(s, baseuri, urlexpander, merged_map, jobmapper, runtimeContext, prefix, replacePrefix)
     elif isinstance(d, MutableMapping):
         for field in ("id", "name"):
             if isinstance(d.get(field), str) and d[field].startswith("_:"):
@@ -214,7 +212,7 @@ def update_refs(d, baseuri, urlexpander, merged_map, jobmapper, set_block_style,
                     if isinstance(d["inputs"][inp], str) and not is_basetype(d["inputs"][inp]):
                         d["inputs"][inp] = rel_ref(d["inputs"][inp], baseuri, urlexpander, merged_map, jobmapper)
                     if isinstance(d["inputs"][inp], MutableMapping):
-                        update_refs(d["inputs"][inp], baseuri, urlexpander, merged_map, jobmapper, set_block_style, runtimeContext, prefix, replacePrefix)
+                        update_refs(d["inputs"][inp], baseuri, urlexpander, merged_map, jobmapper, runtimeContext, prefix, replacePrefix)
                 continue
 
             if field == "$schemas":
@@ -222,7 +220,7 @@ def update_refs(d, baseuri, urlexpander, merged_map, jobmapper, set_block_style,
                     d["$schemas"][n] = rel_ref(d["$schemas"][n], baseuri, urlexpander, merged_map, jobmapper)
                 continue
 
-            update_refs(d[field], baseuri, urlexpander, merged_map, jobmapper, set_block_style, runtimeContext, prefix, replacePrefix)
+            update_refs(d[field], baseuri, urlexpander, merged_map, jobmapper, runtimeContext, prefix, replacePrefix)
 
 
 def fix_schemadef(req, baseuri, urlexpander, merged_map, jobmapper, pdh):
@@ -238,6 +236,7 @@ def fix_schemadef(req, baseuri, urlexpander, merged_map, jobmapper, pdh):
             merged_map[mm].resolved[r] = rename
     return req
 
+
 def drop_ids(d):
     if isinstance(d, MutableSequence):
         for i, s in enumerate(d):
@@ -264,6 +263,10 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid,
     import_files = set()
     include_files = set()
 
+    # The document loader index will have entries for all the files
+    # that were loaded in the process of parsing the entire workflow
+    # (including subworkflows, tools, imports, etc).  We use this to
+    # compose a list of the workflow file dependencies.
     for w in tool.doc_loader.idx:
         if w.startswith("file://"):
             workflow_files.add(urllib.parse.urldefrag(w)[0])
@@ -276,27 +279,25 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid,
 
     all_files = workflow_files | import_files | include_files
 
-    n = 7
-    allmatch = True
-    if firstfile:
-        while allmatch:
-            n += 1
-            for f in all_files:
-                if len(f)-1 < n:
-                    n -= 1
-                    allmatch = False
-                    break
-                if f[n] != firstfile[n]:
-                    allmatch = False
-                    break
-
-        while firstfile[n] != "/":
-            n -= 1
+    # Find the longest common prefix among all the file names.  We'll
+    # use this to recreate the directory structure in a keep
+    # collection with correct relative references.
+    prefix = common_prefix(firstfile, all_files)
 
     col = arvados.collection.Collection(api_client=arvRunner.api)
 
+    # Now go through all the files and update references to other
+    # files.  We previously scanned for file dependencies, these are
+    # passed in as merged_map.
+    #
+    # note about merged_map: we upload dependencies of each process
+    # object (CommandLineTool/Workflow) to a separate collection.
+    # That way, when the user edits something, this limits collection
+    # PDH changes to just that tool, and minimizes situations where
+    # small changes break container reuse for the whole workflow.
+    #
     for w in workflow_files | import_files:
-        # 1. load YAML
+        # 1. load the YAML file
 
         text = tool.doc_loader.fetch_text(w)
         if isinstance(text, bytes):
@@ -307,25 +308,33 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid,
         yamlloader = schema_salad.utils.yaml_no_ts()
         result = yamlloader.load(textIO)
 
-        set_block_style = False
-        if result.fa.flow_style():
-            set_block_style = True
+        # If the whole document is in "flow style" it is probably JSON
+        # formatted.  We'll re-export it as JSON because the
+        # ruamel.yaml round-trip mode is a lie and only preserves
+        # "block style" formatting and not "flow style" formatting.
+        export_as_json = result.fa.flow_style()
 
         # 2. find $import, $include, $schema, run, location
         # 3. update field value
-        update_refs(result, w, tool.doc_loader.expand_url, merged_map, jobmapper, set_block_style, runtimeContext, "", "")
+        update_refs(result, w, tool.doc_loader.expand_url, merged_map, jobmapper, runtimeContext, "", "")
 
-        with col.open(w[n+1:], "wt") as f:
-            # yamlloader.dump(result, stream=sys.stdout)
-            yamlloader.dump(result, stream=f)
+        # Write the updated file to the collection.
+        with col.open(w[len(prefix):], "wt") as f:
+            if export_as_json:
+                json.dump(result, f, indent=4, separators=(',',': '))
+            else:
+                yamlloader.dump(result, stream=f)
 
-        with col.open(os.path.join("original", w[n+1:]), "wt") as f:
+        # Also store a verbatim copy of the original files
+        with col.open(os.path.join("original", w[len(prefix):]), "wt") as f:
             f.write(text)
 
 
+    # Upload files referenced by $include directives, these are used
+    # unchanged and don't need to be updated.
     for w in include_files:
-        with col.open(w[n+1:], "wb") as f1:
-            with col.open(os.path.join("original", w[n+1:]), "wb") as f3:
+        with col.open(w[len(prefix):], "wb") as f1:
+            with col.open(os.path.join("original", w[len(prefix):]), "wb") as f3:
                 with open(uri_file_path(w), "rb") as f2:
                     dat = f2.read(65536)
                     while dat:
@@ -333,12 +342,13 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid,
                         f3.write(dat)
                         dat = f2.read(65536)
 
+    # Now collect metadata: the collection name and git properties.
 
     toolname = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
     if git_info and git_info.get("http://arvados.org/cwl#gitDescribe"):
         toolname = "%s (%s)" % (toolname, git_info.get("http://arvados.org/cwl#gitDescribe"))
 
-    toolfile = tool.tool["id"][n+1:]
+    toolfile = tool.tool["id"][len(prefix):]
 
     properties = {
         "type": "workflow",
@@ -350,19 +360,21 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid,
             p = g.split("#", 1)[1]
             properties["arv:"+p] = git_info[g]
 
+    # Check if a collection with the same content already exists in the target project.  If so, just use that one.
     existing = arvRunner.api.collections().list(filters=[["portable_data_hash", "=", col.portable_data_hash()],
                                                          ["owner_uuid", "=", arvRunner.project_uuid]]).execute(num_retries=arvRunner.num_retries)
+
     if len(existing["items"]) == 0:
+        toolname = toolname.replace("/", " ")
         col.save_new(name=toolname, owner_uuid=arvRunner.project_uuid, ensure_unique_name=True, properties=properties)
         logger.info("Workflow uploaded to %s", col.manifest_locator())
     else:
         logger.info("Workflow uploaded to %s", existing["items"][0]["uuid"])
 
-    adjustDirObjs(job_order, trim_listing)
-    adjustFileObjs(job_order, trim_anonymous_location)
-    adjustDirObjs(job_order, trim_anonymous_location)
-
-    # now construct the wrapper
+    # Now that we've updated the workflow and saved it to a
+    # collection, we're going to construct a minimal "wrapper"
+    # workflow which consists only of input and output parameters
+    # connected to a single step that runs the real workflow.
 
     runfile = "keep:%s/%s" % (col.portable_data_hash(), toolfile)
 
@@ -396,6 +408,14 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid,
     if submit_runner_ram:
         wf_runner_resources["ramMin"] = submit_runner_ram
 
+    # Remove a few redundant fields from the "job order" (aka input
+    # object or input parameters).  In the situation where we're
+    # creating or updating a workflow record, any values in the job
+    # order get copied over as default values for input parameters.
+    adjustDirObjs(job_order, trim_listing)
+    adjustFileObjs(job_order, trim_anonymous_location)
+    adjustDirObjs(job_order, trim_anonymous_location)
+
     newinputs = []
     for i in main["inputs"]:
         inp = {}
@@ -446,9 +466,14 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid,
     if hints:
         wrapper["hints"] = hints
 
-    # 1. check for SchemaDef
-    # 2. do what pack does
-    # 3. fix inputs
+    # Schema definitions (this lets you define things like record
+    # types) require special handling.
+
+    for i, r in enumerate(wrapper["requirements"]):
+        if r["class"] == "SchemaDefRequirement":
+            wrapper["requirements"][i] = fix_schemadef(r, main["id"], tool.doc_loader.expand_url, merged_map, jobmapper, col.portable_data_hash())
+
+    update_refs(wrapper, main["id"], tool.doc_loader.expand_url, merged_map, jobmapper, runtimeContext, main["id"]+"#", "#main/")
 
     doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}
 
@@ -456,12 +481,6 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid,
         for g in git_info:
             doc[g] = git_info[g]
 
-    for i, r in enumerate(wrapper["requirements"]):
-        if r["class"] == "SchemaDefRequirement":
-            wrapper["requirements"][i] = fix_schemadef(r, main["id"], tool.doc_loader.expand_url, merged_map, jobmapper, col.portable_data_hash())
-
-    update_refs(wrapper, main["id"], tool.doc_loader.expand_url, merged_map, jobmapper, False, runtimeContext, main["id"]+"#", "#main/")
-
     # Remove any lingering file references.
     drop_ids(wrapper)