From 62e7af59cbad5577423b844213be7b2f59709602 Mon Sep 17 00:00:00 2001
From: Peter Amstutz
Date: Fri, 6 Jan 2023 17:21:07 -0500
Subject: [PATCH] 19385: Messy work in progress for uploading workflows to collections

Arvados-DCO-1.1-Signed-off-by: Peter Amstutz
---
 sdk/cwl/arvados_cwl/arvdocker.py   |   3 +-
 sdk/cwl/arvados_cwl/arvworkflow.py | 141 ++++++++++++++++++++++++++++-
 sdk/cwl/arvados_cwl/executor.py    |  28 +++---
 sdk/cwl/arvados_cwl/runner.py      |  64 +++----------
 4 files changed, 167 insertions(+), 69 deletions(-)

diff --git a/sdk/cwl/arvados_cwl/arvdocker.py b/sdk/cwl/arvados_cwl/arvdocker.py
index cf0b3b9daf..ef8398df34 100644
--- a/sdk/cwl/arvados_cwl/arvdocker.py
+++ b/sdk/cwl/arvados_cwl/arvdocker.py
@@ -121,7 +121,8 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
     if not out_of_project_images:
         # Fetch Docker image if necessary.
         try:
-            result = cwltool.docker.DockerCommandLineJob.get_image(dockerRequirement, pull_image,
+            dockerjob = cwltool.docker.DockerCommandLineJob(None, None, None, None, None, None)
+            result = dockerjob.get_image(dockerRequirement, pull_image,
                                                                    force_pull, tmp_outdir_prefix)
             if not result:
                 raise WorkflowException("Docker image '%s' not available" % dockerRequirement["dockerImageId"])
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index 56226388d7..f66e50dca9 100644
--- a/sdk/cwl/arvados_cwl/arvworkflow.py
+++ b/sdk/cwl/arvados_cwl/arvworkflow.py
@@ -9,6 +9,14 @@ import os
 import json
 import copy
 import logging
+import urllib
+from io import StringIO
+import sys
+
+from typing import (MutableSequence, MutableMapping)
+
+from ruamel.yaml import YAML
+from ruamel.yaml.comments import CommentedMap, CommentedSeq
 
 from schema_salad.sourceline import SourceLine, cmap
 import schema_salad.ref_resolver
@@ -22,6 +30,8 @@ from cwltool.workflow import Workflow, WorkflowException, WorkflowStep
 from cwltool.utils import adjustFileObjs, adjustDirObjs, visit_class, normalizeFilesDirs
 from cwltool.context import LoadingContext
 
+from schema_salad.ref_resolver import file_uri, uri_file_path
+
 import ruamel.yaml as yaml
 
 from .runner import (upload_dependencies, packed_workflow, upload_workflow_collection,
@@ -117,6 +127,133 @@ def make_wrapper_workflow(arvRunner, main, packed, project_uuid, name, git_info,
 
     return json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))
 
+
+def rel_ref(s, baseuri, urlexpander, merged_map):
+    uri = urlexpander(s, baseuri)
+    if baseuri in merged_map:
+        replacements = merged_map[baseuri].resolved
+        if uri in replacements:
+            return replacements[uri]
+
+    p1 = os.path.dirname(uri_file_path(baseuri))
+    p2 = os.path.dirname(uri_file_path(uri))
+    p3 = os.path.basename(uri_file_path(uri))
+    r = os.path.relpath(p2, p1)
+    if r == ".":
+        r = ""
+    print("AAA", uri, s)
+    print("BBBB", p1, p2, p3, r)
+    return os.path.join(r, p3)
+
+
+def update_refs(d, baseuri, urlexpander, merged_map, set_block_style):
+    if isinstance(d, CommentedSeq):
+        if set_block_style:
+            d.fa.set_block_style()
+        for s in d:
+            update_refs(s, baseuri, urlexpander, merged_map, set_block_style)
+    elif isinstance(d, CommentedMap):
+        if set_block_style:
+            d.fa.set_block_style()
+
+        if "id" in d:
+            baseuri = urlexpander(d["id"], baseuri, scoped_id=True)
+
+        for s in d:
+            for field in ("$include", "$import", "location", "run"):
+                if field in d and isinstance(d[field], str):
+                    d[field] = rel_ref(d[field], baseuri, urlexpander, merged_map)
+
+            if "$schemas" in d:
+                for n, s in enumerate(d["$schemas"]):
+                    d["$schemas"][n] = rel_ref(d["$schemas"][n], baseuri, urlexpander, merged_map)
+
+            update_refs(d[s], baseuri, urlexpander, merged_map, set_block_style)
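
The core of rel_ref above is turning an absolute file URI back into a reference relative to the document that uses it. A minimal standalone sketch of that computation, with hypothetical paths and a uri_to_path helper standing in for schema_salad's uri_file_path:

    import os
    from urllib.parse import urlparse
    from urllib.request import url2pathname

    def uri_to_path(uri):
        # rough stand-in for schema_salad.ref_resolver.uri_file_path
        return url2pathname(urlparse(uri).path)

    baseuri = "file:///home/user/wf/main.cwl"        # referencing document
    target = "file:///home/user/wf/tools/sort.cwl"   # referenced document

    r = os.path.relpath(os.path.dirname(uri_to_path(target)),
                        os.path.dirname(uri_to_path(baseuri)))
    if r == ".":
        r = ""
    print(os.path.join(r, os.path.basename(uri_to_path(target))))  # tools/sort.cwl

The merged_map check short-circuits this: if the packing step already recorded a resolved replacement for the URI, that replacement wins.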
+
+def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
+                        runtimeContext, uuid=None,
+                        submit_runner_ram=0, name=None, merged_map=None,
+                        submit_runner_image=None,
+                        git_info=None):
+
+    firstfile = None
+    workflow_files = set()
+    import_files = set()
+    include_files = set()
+
+    for w in tool.doc_loader.idx:
+        if w.startswith("file://"):
+            workflow_files.add(urllib.parse.urldefrag(w)[0])
+            if firstfile is None:
+                firstfile = urllib.parse.urldefrag(w)[0]
+        if w.startswith("import:file://"):
+            import_files.add(urllib.parse.urldefrag(w[7:])[0])
+        if w.startswith("include:file://"):
+            include_files.add(urllib.parse.urldefrag(w[8:])[0])
+
+    all_files = workflow_files | import_files | include_files
+
+    n = 7
+    allmatch = True
+    while allmatch:
+        n += 1
+        for f in all_files:
+            if len(f)-1 < n:
+                n -= 1
+                allmatch = False
+                break
+            if f[n] != firstfile[n]:
+                allmatch = False
+                break
+
+    while firstfile[n] != "/":
+        n -= 1
+
+    prefix = firstfile[:n+1]
+
+    col = arvados.collection.Collection()
+
+    #print(merged_map.keys())
+
+    for w in workflow_files | import_files:
+        # 1. load YAML
+
+        text = tool.doc_loader.fetch_text(w)
+        if isinstance(text, bytes):
+            textIO = StringIO(text.decode('utf-8'))
+        else:
+            textIO = StringIO(text)
+
+        yamlloader = schema_salad.utils.yaml_no_ts()
+        result = yamlloader.load(textIO)
+
+        set_block_style = False
+        if result.fa.flow_style():
+            set_block_style = True
+
+        # 2. find $import, $include, $schema, run, location
+        # 3. update field value
+        update_refs(result, w, tool.doc_loader.expand_url, merged_map, set_block_style)
+
+        with col.open(w[n+1:], "wt") as f:
+            yamlloader.dump(result, stream=f)
+
+    for w in include_files:
+        with col.open(w[n+1:], "wb") as f1:
+            with open(uri_file_path(w), "rb") as f2:
+                dat = f2.read(65536)
+                while dat:
+                    f1.write(dat)
+                    dat = f2.read(65536)
+
+    toolname = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
+    if git_info and git_info.get("http://arvados.org/cwl#gitDescribe"):
+        toolname = "%s (%s)" % (toolname, git_info.get("http://arvados.org/cwl#gitDescribe"))
+
+    col.save_new(name=toolname, owner_uuid=arvRunner.project_uuid, ensure_unique_name=True)
+
+    return col.manifest_locator()
+
+
 def upload_workflow(arvRunner, tool, job_order, project_uuid,
                     runtimeContext, uuid=None,
                     submit_runner_ram=0, name=None, merged_map=None,
@@ -139,7 +276,7 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid,
         name = tool.tool.get("label", os.path.basename(tool.tool["id"]))
 
     upload_dependencies(arvRunner, name, tool.doc_loader,
-                        packed, tool.tool["id"], False,
+                        packed, tool.tool["id"],
                         runtimeContext)
 
     wf_runner_resources = None
@@ -286,7 +423,6 @@ class ArvadosWorkflow(Workflow):
                                 self.doc_loader,
                                 joborder,
                                 joborder.get("id", "#"),
-                                False,
                                 runtimeContext)
 
         if self.wf_pdh is None:
@@ -330,7 +466,6 @@ class ArvadosWorkflow(Workflow):
                                    self.doc_loader,
                                    packed,
                                    self.tool["id"],
-                                   False,
                                    runtimeContext)
 
             # Discover files/directories referenced by the
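
The n = 7 scan in new_upload_workflow starts just past "file://" and advances n while every known file shares the same character, then backs up to the last "/"; everything after that common prefix becomes the file's path inside the collection. The effect is close to this sketch (hypothetical URIs; the real code walks characters instead of using os.path.commonprefix):

    import os

    files = [
        "file:///home/user/wf/main.cwl",
        "file:///home/user/wf/tools/sort.cwl",
        "file:///home/user/wf/inputs.yml",
    ]

    common = os.path.commonprefix(files)
    common = common[:common.rindex("/") + 1]   # back up to the last "/"

    for f in files:
        print(f[len(common):])
    # main.cwl
    # tools/sort.cwl
    # inputs.yml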
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index 382c1643e4..1e6344f5e9 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -36,7 +36,7 @@ import arvados_cwl.util
 from .arvcontainer import RunnerContainer, cleanup_name_for_collection
 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, make_builder
 from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
-from .arvworkflow import ArvadosWorkflow, upload_workflow
+from .arvworkflow import ArvadosWorkflow, upload_workflow, new_upload_workflow
 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
 from .perf import Perf
 from .pathmapper import NoFollowPathMapper
@@ -674,16 +674,16 @@ The 'jobs' API is no longer supported.
         loadingContext = self.loadingContext.copy()
         loadingContext.do_validate = False
         loadingContext.disable_js_validation = True
-        if submitting and not self.fast_submit:
-            loadingContext.do_update = False
-            # Document may have been auto-updated. Reload the original
-            # document with updating disabled because we want to
-            # submit the document with its original CWL version, not
-            # the auto-updated one.
-            with Perf(metrics, "load_tool original"):
-                tool = load_tool(updated_tool.tool["id"], loadingContext)
-        else:
-            tool = updated_tool
+        # if submitting and not self.fast_submit:
+        #     loadingContext.do_update = False
+        #     # Document may have been auto-updated. Reload the original
+        #     # document with updating disabled because we want to
+        #     # submit the document with its original CWL version, not
+        #     # the auto-updated one.
+        #     with Perf(metrics, "load_tool original"):
+        #         tool = load_tool(updated_tool.tool["id"], loadingContext)
+        # else:
+        tool = updated_tool
 
         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
         # Also uploads docker images.
@@ -702,13 +702,13 @@ The 'jobs' API is no longer supported.
         loadingContext.avsc_names = tool.doc_schema
         loadingContext.metadata = tool.metadata
         loadingContext.skip_resolve_all = True
-        with Perf(metrics, "load_tool"):
-            tool = load_tool(tool.tool, loadingContext)
+        #with Perf(metrics, "load_tool"):
+        #    tool = load_tool(tool.tool, loadingContext)
 
         if runtimeContext.update_workflow or runtimeContext.create_workflow:
             # Create a pipeline template or workflow record and exit.
             if self.work_api == "containers":
-                uuid = upload_workflow(self, tool, job_order,
+                uuid = new_upload_workflow(self, tool, job_order,
                                        runtimeContext.project_uuid,
                                        runtimeContext,
                                        uuid=runtimeContext.update_workflow,
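
Note the behavioral shift here: upload_workflow returned the UUID of a workflow record, while new_upload_workflow returns col.manifest_locator() for the saved collection, and the reload-without-update and repack load_tool passes are commented out rather than removed, consistent with the work-in-progress commit message. A quick way to eyeball the resulting collection from the Python SDK (placeholder locator, assuming a configured Arvados client):

    import arvados

    locator = "zzzzz-4zz18-xxxxxxxxxxxxxxx"   # stand-in for the returned locator
    reader = arvados.collection.CollectionReader(locator)
    print(reader.manifest_text())   # workflow files laid out under the common prefix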
""" - loaded = set() - def loadref(b, u): - joined = document_loader.fetcher.urljoin(b, u) - defrg, _ = urllib.parse.urldefrag(joined) - if defrg not in loaded: - loaded.add(defrg) - if cache is not None and defrg in cache: - return cache[defrg] - # Use fetch_text to get raw file (before preprocessing). - text = document_loader.fetch_text(defrg) - if isinstance(text, bytes): - textIO = StringIO(text.decode('utf-8')) - else: - textIO = StringIO(text) - yamlloader = YAML(typ='safe', pure=True) - result = yamlloader.load(textIO) - if cache is not None: - cache[defrg] = result - return result - else: - return {} - - if loadref_run: - loadref_fields = set(("$import", "run")) - else: - loadref_fields = set(("$import",)) - scanobj = workflowobj - if "id" in workflowobj and not workflowobj["id"].startswith("_:"): - defrg, _ = urllib.parse.urldefrag(workflowobj["id"]) - if cache is not None and defrg not in cache: - # if we haven't seen this file before, want raw file - # content (before preprocessing) to ensure that external - # references like $include haven't already been inlined. - scanobj = loadref("", workflowobj["id"]) - metadata = scanobj - with Perf(metrics, "scandeps include, location"): + with Perf(metrics, "scandeps"): sc_result = scandeps(uri, scanobj, - loadref_fields, - set(("$include", "location")), - loadref, urljoin=document_loader.fetcher.urljoin, + set(), + set(("location",)), + None, urljoin=document_loader.fetcher.urljoin, nestdirs=False) - - with Perf(metrics, "scandeps $schemas"): optional_deps = scandeps(uri, scanobj, - loadref_fields, - set(("$schemas",)), - loadref, urljoin=document_loader.fetcher.urljoin, - nestdirs=False) + set(), + set(("$schemas",)), + None, urljoin=document_loader.fetcher.urljoin, + nestdirs=False) if sc_result is None: sc_result = [] @@ -703,7 +666,6 @@ def upload_job_order(arvrunner, name, tool, job_order, runtimeContext): jobloader, job_order, job_order.get("id", "#"), - False, runtimeContext) if "id" in job_order: @@ -721,8 +683,9 @@ FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"]) def upload_workflow_deps(arvrunner, tool, runtimeContext): # Ensure that Docker images needed by this workflow are available - with Perf(metrics, "upload_docker"): - upload_docker(arvrunner, tool, runtimeContext) + # testing only + #with Perf(metrics, "upload_docker"): + # upload_docker(arvrunner, tool, runtimeContext) document_loader = tool.doc_loader @@ -748,7 +711,6 @@ def upload_workflow_deps(arvrunner, tool, runtimeContext): document_loader, deptool, deptool["id"], - False, runtimeContext, include_primary=False, discovered_secondaryfiles=discovered_secondaryfiles, -- 2.30.2