19385: Add comments and minor reorganization 19385-cwl-fast-pack
author Peter Amstutz <peter.amstutz@curii.com>
Tue, 7 Feb 2023 21:28:27 +0000 (16:28 -0500)
committer Peter Amstutz <peter.amstutz@curii.com>
Tue, 7 Feb 2023 21:28:27 +0000 (16:28 -0500)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

sdk/cwl/arvados_cwl/arvworkflow.py
sdk/cwl/arvados_cwl/executor.py

index 86eee4051f1de03e06d0a145f1e4fa1bd9cf3bbf..895676565d53f6b817ac0ad555330aa4b12781e4 100644 (file)
@@ -261,6 +261,10 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid,
     import_files = set()
     include_files = set()
 
+    # The document loader index will have entries for all the files
+    # that were loaded in the process of parsing the entire workflow
+    # (including subworkflows, tools, imports, etc).  We use this to
+    # compose a list of the workflow file dependencies.
     for w in tool.doc_loader.idx:
         if w.startswith("file://"):
             workflow_files.add(urllib.parse.urldefrag(w)[0])
@@ -273,6 +277,9 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid,
 
     all_files = workflow_files | import_files | include_files
 
+    # Find the longest common prefix among all the file names.  We'll
+    # use this to recreate the directory structure in a keep
+    # collection with correct relative references.
     n = 7
     allmatch = True
     if firstfile:
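The prefix computation that starts here (the hunk cuts off the loop) amounts to finding the longest directory prefix shared by every file:// URI, so that each file can be placed in the collection at its path relative to that prefix. A rough sketch of the idea, not the code from this commit, assuming all URIs are absolute file:// paths and at least one file was found:

    import os
    import urllib.parse

    def common_dir_prefix(uris):
        # Longest leading directory shared by all of the file:// URIs.
        paths = [urllib.parse.urlsplit(u).path for u in uris]
        if len(paths) == 1:
            prefix = os.path.dirname(paths[0])
        else:
            prefix = os.path.commonpath(paths)
        return "file://" + prefix

    # Each file then lands in the collection at uri[len(prefix)+1:],
    # which is why the code above starts with n = 7 (len("file://")).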
@@ -292,8 +299,18 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid,
 
     col = arvados.collection.Collection(api_client=arvRunner.api)
 
+    # Now go through all the files and update references to other
+    # files.  We previously scanned for file dependencies; these are
+    # passed in as merged_map.
+    #
+    # Note about merged_map: we upload the dependencies of each
+    # process object (CommandLineTool/Workflow) to a separate
+    # collection.  That way, when the user edits something, collection
+    # PDH changes are limited to just that tool, which minimizes
+    # situations where small changes break container reuse for the
+    # whole workflow.
+    #
     for w in workflow_files | import_files:
-        # 1. load YAML
+        # 1. load the YAML file
 
         text = tool.doc_loader.fetch_text(w)
         if isinstance(text, bytes):
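The merged_map mentioned in the comment above is produced earlier by upload_workflow_deps() and consulted by update_refs() when rewriting references. Conceptually it maps each process object's document to the keep: locations its file dependencies were uploaded to. An invented, illustrative shape, not the literal data structure:

    # Invented paths and hashes, for illustration only.
    merged_map = {
        "file:///home/user/wf/workflow.cwl": {
            "file:///home/user/wf/script.py": "keep:<pdh-1>/script.py",
        },
        "file:///home/user/wf/tool.cwl": {
            "file:///home/user/wf/settings.conf": "keep:<pdh-2>/settings.conf",
        },
    }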
@@ -304,23 +321,30 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid,
         yamlloader = schema_salad.utils.yaml_no_ts()
         result = yamlloader.load(textIO)
 
+        # If the whole document is in "flow style" it is probably JSON
+        # formatted.  We'll re-export it as JSON, because the
+        # ruamel.yaml round-trip mode only actually preserves "block
+        # style" formatting, not "flow style" formatting.
         export_as_json = result.fa.flow_style()
 
         # 2. find $import, $include, $schema, run, location
         # 3. update field value
         update_refs(result, w, tool.doc_loader.expand_url, merged_map, jobmapper, runtimeContext, "", "")
 
+        # Write the updated file to the collection.
         with col.open(w[n+1:], "wt") as f:
-            # yamlloader.dump(result, stream=sys.stdout)
             if export_as_json:
                 json.dump(result, f, indent=4, separators=(',',': '))
             else:
                 yamlloader.dump(result, stream=f)
 
+        # Also store a verbatim copy of the original file
         with col.open(os.path.join("original", w[n+1:]), "wt") as f:
             f.write(text)
 
 
+    # Upload files referenced by $include directives; these are used
+    # unchanged and don't need to be updated.
     for w in include_files:
         with col.open(w[n+1:], "wb") as f1:
             with col.open(os.path.join("original", w[n+1:]), "wb") as f3:
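The flow-style check in this hunk leans on ruamel.yaml's round-trip representation: a document that parses entirely in "flow style" was almost certainly JSON, so it is re-serialized with json.dump instead. A minimal standalone sketch of that pattern, assuming plain ruamel.yaml (the commit itself obtains its loader from schema_salad.utils.yaml_no_ts()):

    import json
    import ruamel.yaml

    yaml = ruamel.yaml.YAML()   # round-trip mode is the default
    doc = yaml.load('{"class": "CommandLineTool", "inputs": [], "outputs": []}')

    with open("out.cwl", "w") as f:
        if doc.fa.flow_style():
            # The whole top-level node was flow style: emit JSON.
            json.dump(doc, f, indent=4, separators=(',', ': '))
        else:
            # Block style: let ruamel.yaml round-trip it.
            yaml.dump(doc, f)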
@@ -331,6 +355,7 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid,
                         f3.write(dat)
                         dat = f2.read(65536)
 
+    # Now collect metadata: the collection name and git properties.
 
     toolname = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
     if git_info and git_info.get("http://arvados.org/cwl#gitDescribe"):
@@ -348,6 +373,7 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid,
             p = g.split("#", 1)[1]
             properties["arv:"+p] = git_info[g]
 
+    # Check if a collection with the same content already exists in
+    # the target project.  If so, just use that one.
     existing = arvRunner.api.collections().list(filters=[["portable_data_hash", "=", col.portable_data_hash()],
                                                          ["owner_uuid", "=", arvRunner.project_uuid]]).execute(num_retries=arvRunner.num_retries)
 
@@ -358,11 +384,10 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid,
     else:
         logger.info("Workflow uploaded to %s", existing["items"][0]["uuid"])
 
-    adjustDirObjs(job_order, trim_listing)
-    adjustFileObjs(job_order, trim_anonymous_location)
-    adjustDirObjs(job_order, trim_anonymous_location)
-
-    # now construct the wrapper
+    # Now that we've updated the workflow and saved it to a
+    # collection, we're going to construct a minimal "wrapper"
+    # workflow which consists only of input and output parameters
+    # connected to a single step that runs the real workflow.
 
     runfile = "keep:%s/%s" % (col.portable_data_hash(), toolfile)
 
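For orientation, the wrapper workflow being assembled from here on ends up with roughly the following shape; the field values below are invented for illustration, and the real code also attaches hints, requirements, and runner resources:

    # Invented example of the minimal wrapper: inputs and outputs pass
    # straight through a single step whose "run" points at the uploaded
    # workflow in Keep.
    wrapper = {
        "class": "Workflow",
        "id": "#main",
        "inputs": [
            {"id": "#main/fastq", "type": "File"},
        ],
        "outputs": [
            {"id": "#main/report", "type": "File",
             "outputSource": "#main/step/report"},
        ],
        "steps": [
            {"id": "#main/step",
             "in": [{"id": "#main/step/fastq", "source": "#main/fastq"}],
             "out": [{"id": "#main/step/report"}],
             "run": "keep:<portable_data_hash>/workflow.cwl"},
        ],
    }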
@@ -396,6 +421,14 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid,
     if submit_runner_ram:
         wf_runner_resources["ramMin"] = submit_runner_ram
 
+    # Remove a few redundant fields from the "job order" (aka input
+    # object or input parameters).  In the situation where we're
+    # creating or updating a workflow record, any values in the job
+    # order get copied over as default values for input parameters.
+    adjustDirObjs(job_order, trim_listing)
+    adjustFileObjs(job_order, trim_anonymous_location)
+    adjustDirObjs(job_order, trim_anonymous_location)
+
     newinputs = []
     for i in main["inputs"]:
         inp = {}
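trim_listing, trim_anonymous_location, adjustDirObjs and adjustFileObjs come from the existing runner/cwltool helpers. Roughly, and only as a sketch of their intent rather than their actual implementations: adjustDirObjs/adjustFileObjs walk the job order applying a callback to every Directory/File object, and the trim callbacks drop fields that can be recomputed or that carry no information:

    def trim_listing(obj):
        # A Directory stored in Keep doesn't need its expanded listing;
        # it can be enumerated from the collection later.
        if obj.get("location", "").startswith("keep:") and "listing" in obj:
            del obj["listing"]

    def trim_anonymous_location(obj):
        # Auto-assigned "_:..." locations are placeholders with no
        # meaning outside this run, so drop them.
        if obj.get("location", "").startswith("_:"):
            del obj["location"]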
@@ -446,15 +479,8 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid,
     if hints:
         wrapper["hints"] = hints
 
-    # 1. check for SchemaDef
-    # 2. do what pack does
-    # 3. fix inputs
-
-    doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}
-
-    if git_info:
-        for g in git_info:
-            doc[g] = git_info[g]
+    # Schema definitions (these let you define things like record
+    # types) require special handling.
 
     for i, r in enumerate(wrapper["requirements"]):
         if r["class"] == "SchemaDefRequirement":
@@ -462,6 +488,12 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid,
 
     update_refs(wrapper, main["id"], tool.doc_loader.expand_url, merged_map, jobmapper, runtimeContext, main["id"]+"#", "#main/")
 
+    doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}
+
+    if git_info:
+        for g in git_info:
+            doc[g] = git_info[g]
+
     # Remove any lingering file references.
     drop_ids(wrapper)
 
index 316e8d264a37fd0e02972ccb7782f2b2f305d645..ef84dd4983c870d2779a3cf993bcaeff8aa13f6f 100644 (file)
@@ -660,6 +660,8 @@ The 'jobs' API is no longer supported.
             job_order, jobmapper = upload_job_order(self, "%s input" % runtimeContext.name,
                                          updated_tool, job_order, runtimeContext)
 
+        # Determine whether we are submitting or directly executing
+        # the workflow.
+        #
         # the last clause means: if it is a command line tool, and we
         # are going to wait for the result, and always_submit_runner
         # is false, then we don't submit a runner process.
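Reading the new comment together with the existing one below it, the decision being described is roughly the following; the variable names are taken from the comment and from elsewhere in executor.py, so treat this as a paraphrase rather than the exact expression in the source:

    # Submit a runner process unless this is a CommandLineTool that we
    # are going to wait for and always_submit_runner is false.
    submitting = (runtimeContext.submit and
                  not (updated_tool.tool["class"] == "CommandLineTool" and
                       runtimeContext.wait and
                       not runtimeContext.always_submit_runner))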
@@ -674,15 +676,6 @@ The 'jobs' API is no longer supported.
         loadingContext = self.loadingContext.copy()
         loadingContext.do_validate = False
         loadingContext.disable_js_validation = True
-        # if submitting and not self.fast_submit:
-        #     loadingContext.do_update = False
-        #     # Document may have been auto-updated. Reload the original
-        #     # document with updating disabled because we want to
-        #     # submit the document with its original CWL version, not
-        #     # the auto-updated one.
-        #     with Perf(metrics, "load_tool original"):
-        #         tool = load_tool(updated_tool.tool["id"], loadingContext)
-        # else:
         tool = updated_tool
 
         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
@@ -692,18 +685,16 @@ The 'jobs' API is no longer supported.
             with Perf(metrics, "upload_workflow_deps"):
                 merged_map = upload_workflow_deps(self, tool, runtimeContext)
         else:
+            # In the fast submit case, we are running a workflow that
+            # has already been uploaded to Arvados, so we assume all
+            # the dependencies have been pinned to keep references and
+            # there is nothing to do.
             merged_map = {}
 
-        # Recreate process object (ArvadosWorkflow or
-        # ArvadosCommandTool) because tool document may have been
-        # updated by upload_workflow_deps in ways that modify
-        # hints or requirements.
         loadingContext.loader = tool.doc_loader
         loadingContext.avsc_names = tool.doc_schema
         loadingContext.metadata = tool.metadata
         loadingContext.skip_resolve_all = True
-        #with Perf(metrics, "load_tool"):
-        #    tool = load_tool(tool.tool, loadingContext)
 
         workflow_wrapper = None
         if submitting and not self.fast_submit:
@@ -722,22 +713,30 @@ The 'jobs' API is no longer supported.
                                                jobmapper=jobmapper)
 
             if runtimeContext.update_workflow or runtimeContext.create_workflow:
-                # Now create a workflow record and exit.
+                # We're registering the workflow, so create or update
+                # the workflow record and then exit.
                 uuid = make_workflow_record(self, workflow_wrapper, runtimeContext.name, tool,
                                             runtimeContext.project_uuid, runtimeContext.update_workflow)
                 self.stdout.write(uuid + "\n")
                 return (None, "success")
 
+            # We did not register a workflow, so we're going to
+            # submit it instead.
             loadingContext.loader.idx.clear()
             loadingContext.loader.idx["_:main"] = workflow_wrapper
             workflow_wrapper["id"] = "_:main"
 
-            # Reload just the wrapper workflow.
+            # Reload the minimal wrapper workflow.
             self.fast_submit = True
             tool = load_tool(workflow_wrapper, loadingContext)
             loadingContext.loader.idx["_:main"] = workflow_wrapper
 
         if not submitting:
+            # If we are going to run the workflow now (rather than
+            # submit it), we need to update the workflow document,
+            # replacing file references with keep references.  If we
+            # are just going to construct a run submission, we don't
+            # need to do this.
             update_from_merged_map(tool, merged_map)
 
         self.apply_reqs(job_order, tool)
@@ -782,7 +781,10 @@ The 'jobs' API is no longer supported.
 
         runnerjob = None
         if runtimeContext.submit:
-            # Submit a runner job to run the workflow for us.
+            # We are submitting instead of running immediately.
+            #
+            # Create a "Runner job" that, when run() is invoked,
+            # creates the container request to run the workflow.
             if self.work_api == "containers":
                 if submitting:
                     loadingContext.metadata = updated_tool.metadata.copy()
@@ -811,11 +813,16 @@ The 'jobs' API is no longer supported.
                            runtimeContext)
 
         if runtimeContext.submit and not runtimeContext.wait:
+            # User provided --no-wait, so submit the container request,
+            # get the container request uuid, print it out, and exit.
             runnerjob = next(jobiter)
             runnerjob.run(runtimeContext)
             self.stdout.write(runnerjob.uuid+"\n")
             return (None, "success")
 
+        # We are either running the workflow directly or submitting
+        # it; either way, we wait for the final result.
+
         current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
         if current_container:
             logger.info("Running inside container %s", current_container.get("uuid"))