From b420ec626f8cb5cd7a8b4252dfc2be76ba3ba844 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Mon, 9 Jan 2023 15:39:54 -0500 Subject: [PATCH] 19385: new_upload_workflow work in progress Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- sdk/cwl/arvados_cwl/arvworkflow.py | 104 ++++++++++++++++++++++++++--- sdk/cwl/arvados_cwl/runner.py | 16 +++-- sdk/cwl/setup.py | 4 +- sdk/python/arvados/util.py | 17 +++++ 4 files changed, 124 insertions(+), 17 deletions(-) diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py index f66e50dca9..784e1bdb36 100644 --- a/sdk/cwl/arvados_cwl/arvworkflow.py +++ b/sdk/cwl/arvados_cwl/arvworkflow.py @@ -129,11 +129,16 @@ def make_wrapper_workflow(arvRunner, main, packed, project_uuid, name, git_info, def rel_ref(s, baseuri, urlexpander, merged_map): uri = urlexpander(s, baseuri) - if baseuri in merged_map: - replacements = merged_map[baseuri].resolved + print("DDD", baseuri, merged_map) + fileuri = urllib.parse.urldefrag(baseuri)[0] + if fileuri in merged_map: + replacements = merged_map[fileuri].resolved if uri in replacements: return replacements[uri] + if s.startswith("keep:"): + return s + p1 = os.path.dirname(uri_file_path(baseuri)) p2 = os.path.dirname(uri_file_path(uri)) p3 = os.path.basename(uri_file_path(uri)) @@ -146,15 +151,13 @@ def rel_ref(s, baseuri, urlexpander, merged_map): def update_refs(d, baseuri, urlexpander, merged_map, set_block_style): - if isinstance(d, CommentedSeq): - if set_block_style: - d.fa.set_block_style() + if set_block_style and (isinstance(d, CommentedSeq) or isinstance(d, CommentedMap)): + d.fa.set_block_style() + + if isinstance(d, MutableSequence): for s in d: update_refs(s, baseuri, urlexpander, merged_map, set_block_style) - elif isinstance(d, CommentedMap): - if set_block_style: - d.fa.set_block_style() - + elif isinstance(d, MutableMapping): if "id" in d: baseuri = urlexpander(d["id"], baseuri, scoped_id=True) @@ -251,7 +254,88 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid, col.save_new(name=toolname, owner_uuid=arvRunner.project_uuid, ensure_unique_name=True) - return col.manifest_locator() + toolfile = tool.tool["id"][n+1:] + + # now construct the wrapper + + step = { + "id": "#main/" + toolname, + "in": [], + "out": [], + "run": "keep:%s/%s" % (col.portable_data_hash(), toolfile), + "label": name + } + + main = tool.tool + + newinputs = [] + for i in main["inputs"]: + inp = {} + # Make sure to only copy known fields that are meaningful at + # the workflow level. In practice this ensures that if we're + # wrapping a CommandLineTool we don't grab inputBinding. + # Right now also excludes extension fields, which is fine, + # Arvados doesn't currently look for any extension fields on + # input parameters. + for f in ("type", "label", "secondaryFiles", "streamable", + "doc", "format", "loadContents", + "loadListing", "default"): + if f in i: + inp[f] = i[f] + inp["id"] = "#main/%s" % shortname(i["id"]) + newinputs.append(inp) + + wrapper = { + "class": "Workflow", + "id": "#main", + "inputs": newinputs, + "outputs": [], + "steps": [step] + } + + for i in main["inputs"]: + step["in"].append({ + "id": "#main/step/%s" % shortname(i["id"]), + "source": "#main/%s" % shortname(i["id"]) + }) + + for i in main["outputs"]: + step["out"].append({"id": "#main/step/%s" % shortname(i["id"])}) + wrapper["outputs"].append({"outputSource": "#main/step/%s" % shortname(i["id"]), + "type": i["type"], + "id": "#main/%s" % shortname(i["id"])}) + + wrapper["requirements"] = [{"class": "SubworkflowFeatureRequirement"}] + + if main.get("requirements"): + wrapper["requirements"].extend(main["requirements"]) + if main.get("hints"): + wrapper["hints"] = main["hints"] + + doc = {"cwlVersion": "v1.2", "$graph": [wrapper]} + + if git_info: + for g in git_info: + doc[g] = git_info[g] + + update_refs(wrapper, main["id"], tool.doc_loader.expand_url, merged_map, False) + + wrappertext = json.dumps(doc, sort_keys=True, indent=4, separators=(',',': ')) + + body = { + "workflow": { + "name": name, + "description": tool.tool.get("doc", ""), + "definition": wrappertext + }} + if project_uuid: + body["workflow"]["owner_uuid"] = project_uuid + + if uuid: + call = arvRunner.api.workflows().update(uuid=uuid, body=body) + else: + call = arvRunner.api.workflows().create(body=body) + return call.execute(num_retries=arvRunner.num_retries)["uuid"] def upload_workflow(arvRunner, tool, job_order, project_uuid, diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py index 7364df3345..dc6d0df3f1 100644 --- a/sdk/cwl/arvados_cwl/runner.py +++ b/sdk/cwl/arvados_cwl/runner.py @@ -447,6 +447,8 @@ def upload_dependencies(arvrunner, name, document_loader, single_collection=True, optional_deps=optional_deps) + print("MMM", mapper._pathmap) + keeprefs = set() def addkeepref(k): if k.startswith("keep:"): @@ -489,9 +491,9 @@ def upload_dependencies(arvrunner, name, document_loader, p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "") p[collectionUUID] = uuid - with Perf(metrics, "setloc"): - visit_class(workflowobj, ("File", "Directory"), setloc) - visit_class(discovered, ("File", "Directory"), setloc) + #with Perf(metrics, "setloc"): + # visit_class(workflowobj, ("File", "Directory"), setloc) + # visit_class(discovered, ("File", "Directory"), setloc) if discovered_secondaryfiles is not None: for d in discovered: @@ -515,6 +517,8 @@ def upload_dependencies(arvrunner, name, document_loader, logger.warning("Cannot find collection with portable data hash %s", kr) continue col = col["items"][0] + col["name"] = arvados.util.trim_name(col["name"]) + print("CCC name", col["name"]) try: arvrunner.api.collections().create(body={"collection": { "owner_uuid": runtimeContext.project_uuid, @@ -527,7 +531,7 @@ def upload_dependencies(arvrunner, name, document_loader, "trash_at": col["trash_at"] }}, ensure_unique_name=True).execute() except Exception as e: - logger.warning("Unable copy collection to destination: %s", e) + logger.warning("Unable to copy collection to destination: %s", e) if "$schemas" in workflowobj: sch = CommentedSeq() @@ -683,7 +687,7 @@ FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"]) def upload_workflow_deps(arvrunner, tool, runtimeContext): # Ensure that Docker images needed by this workflow are available - # testing only + # commented out for testing only, uncomment me #with Perf(metrics, "upload_docker"): # upload_docker(arvrunner, tool, runtimeContext) @@ -715,6 +719,8 @@ def upload_workflow_deps(arvrunner, tool, runtimeContext): include_primary=False, discovered_secondaryfiles=discovered_secondaryfiles, cache=tool_dep_cache) + + print("PM", pm.items()) document_loader.idx[deptool["id"]] = deptool toolmap = {} for k,v in pm.items(): diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py index 96d12f758b..ae9f6d0d41 100644 --- a/sdk/cwl/setup.py +++ b/sdk/cwl/setup.py @@ -36,8 +36,8 @@ setup(name='arvados-cwl-runner', # file to determine what version of cwltool and schema-salad to # build. install_requires=[ - 'cwltool==3.1.20220913185150', - 'schema-salad==8.3.20220913105718', + 'cwltool==3.1.20221224142944', + 'schema-salad>8.3.20220913105718', 'arvados-python-client{}'.format(pysdk_dep), 'ciso8601 >= 2.0.0', 'networkx < 2.6', diff --git a/sdk/python/arvados/util.py b/sdk/python/arvados/util.py index c383d529e8..a4b7e64a05 100644 --- a/sdk/python/arvados/util.py +++ b/sdk/python/arvados/util.py @@ -500,3 +500,20 @@ def get_vocabulary_once(svc): if not hasattr(svc, '_cached_vocabulary'): svc._cached_vocabulary = svc.vocabularies().get().execute() return svc._cached_vocabulary + +def trim_name(collectionname): + """ + trim_name takes a record name (collection name, project name, etc) + and trims it to fit the 255 character name limit, with additional + space for the timestamp added by ensure_unique_name, by removing + excess characters from the middle and inserting an ellipse + """ + + max_name_len = 254 - 28 + + if len(collectionname) > max_name_len: + over = len(collectionname) - max_name_len + split = int(max_name_len/2) + collectionname = collectionname[0:split] + "…" + collectionname[split+over:] + + return collectionname -- 2.30.2