19385: new_upload_workflow work in progress
author     Peter Amstutz <peter.amstutz@curii.com>
           Mon, 9 Jan 2023 20:39:54 +0000 (15:39 -0500)
committer  Peter Amstutz <peter.amstutz@curii.com>
           Mon, 9 Jan 2023 20:39:54 +0000 (15:39 -0500)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

sdk/cwl/arvados_cwl/arvworkflow.py
sdk/cwl/arvados_cwl/runner.py
sdk/cwl/setup.py
sdk/python/arvados/util.py

diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index f66e50dca935870a1f1c88ccc4e4be023c60f9c5..784e1bdb366d49f1a4d68c340987e5679f760e45 100644
@@ -129,11 +129,16 @@ def make_wrapper_workflow(arvRunner, main, packed, project_uuid, name, git_info,
 
 def rel_ref(s, baseuri, urlexpander, merged_map):
     uri = urlexpander(s, baseuri)
-    if baseuri in merged_map:
-        replacements = merged_map[baseuri].resolved
+    print("DDD", baseuri, merged_map)
+    fileuri = urllib.parse.urldefrag(baseuri)[0]
+    if fileuri in merged_map:
+        replacements = merged_map[fileuri].resolved
         if uri in replacements:
             return replacements[uri]
 
+    if s.startswith("keep:"):
+        return s
+
     p1 = os.path.dirname(uri_file_path(baseuri))
     p2 = os.path.dirname(uri_file_path(uri))
     p3 = os.path.basename(uri_file_path(uri))
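
A minimal sketch (not part of the patch) of the lookup this hunk corrects: merged_map is keyed by file URIs with no fragment, while baseuri may carry a fragment naming a tool packed inside that file, so the fragment has to be stripped before the lookup; keep: references are already absolute and are returned unchanged. The URI below is invented for illustration:

    import urllib.parse

    # merged_map keys carry no fragment, so strip it from the base URI
    # before consulting the map.
    baseuri = "file:///work/pipeline.cwl#step-tool"
    fileuri = urllib.parse.urldefrag(baseuri)[0]
    assert fileuri == "file:///work/pipeline.cwl"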
@@ -146,15 +151,13 @@ def rel_ref(s, baseuri, urlexpander, merged_map):
 
 
 def update_refs(d, baseuri, urlexpander, merged_map, set_block_style):
-    if isinstance(d, CommentedSeq):
-        if set_block_style:
-            d.fa.set_block_style()
+    if set_block_style and (isinstance(d, CommentedSeq) or isinstance(d, CommentedMap)):
+        d.fa.set_block_style()
+
+    if isinstance(d, MutableSequence):
         for s in d:
             update_refs(s, baseuri, urlexpander, merged_map, set_block_style)
-    elif isinstance(d, CommentedMap):
-        if set_block_style:
-            d.fa.set_block_style()
-
+    elif isinstance(d, MutableMapping):
         if "id" in d:
             baseuri = urlexpander(d["id"], baseuri, scoped_id=True)
 
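The rewrite above relies on ruamel.yaml's comment-preserving containers satisfying the standard abstract base classes, so one isinstance check per shape handles the recursion while block style is applied only to the commented types. A quick sketch of that assumption:

    from collections.abc import MutableMapping, MutableSequence
    from ruamel.yaml.comments import CommentedMap, CommentedSeq

    # CommentedMap subclasses dict and CommentedSeq subclasses list,
    # so both satisfy the collections.abc interfaces checked above.
    assert isinstance(CommentedMap(), MutableMapping)
    assert isinstance(CommentedSeq(), MutableSequence)
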
@@ -251,7 +254,88 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
 
     col.save_new(name=toolname, owner_uuid=arvRunner.project_uuid, ensure_unique_name=True)
 
-    return col.manifest_locator()
+    toolfile = tool.tool["id"][n+1:]
+
+    # now construct the wrapper
+
+    step = {
+        "id": "#main/" + toolname,
+        "in": [],
+        "out": [],
+        "run": "keep:%s/%s" % (col.portable_data_hash(), toolfile),
+        "label": name
+    }
+
+    main = tool.tool
+
+    newinputs = []
+    for i in main["inputs"]:
+        inp = {}
+        # Make sure to only copy known fields that are meaningful at
+        # the workflow level. In practice this ensures that if we're
+        # wrapping a CommandLineTool we don't grab inputBinding.
+        # Right now this also excludes extension fields, which is
+        # fine; Arvados doesn't currently look for any extension
+        # fields on input parameters.
+        for f in ("type", "label", "secondaryFiles", "streamable",
+                  "doc", "format", "loadContents",
+                  "loadListing", "default"):
+            if f in i:
+                inp[f] = i[f]
+        inp["id"] = "#main/%s" % shortname(i["id"])
+        newinputs.append(inp)
+
+    wrapper = {
+        "class": "Workflow",
+        "id": "#main",
+        "inputs": newinputs,
+        "outputs": [],
+        "steps": [step]
+    }
+
+    for i in main["inputs"]:
+        step["in"].append({
+            "id": "#main/step/%s" % shortname(i["id"]),
+            "source": "#main/%s" % shortname(i["id"])
+        })
+
+    for i in main["outputs"]:
+        step["out"].append({"id": "#main/step/%s" % shortname(i["id"])})
+        wrapper["outputs"].append({"outputSource": "#main/step/%s" % shortname(i["id"]),
+                                   "type": i["type"],
+                                   "id": "#main/%s" % shortname(i["id"])})
+
+    wrapper["requirements"] = [{"class": "SubworkflowFeatureRequirement"}]
+
+    if main.get("requirements"):
+        wrapper["requirements"].extend(main["requirements"])
+    if main.get("hints"):
+        wrapper["hints"] = main["hints"]
+
+    doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}
+
+    if git_info:
+        for g in git_info:
+            doc[g] = git_info[g]
+
+    update_refs(wrapper, main["id"], tool.doc_loader.expand_url, merged_map, False)
+
+    wrappertext = json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))
+
+    body = {
+        "workflow": {
+            "name": name,
+            "description": tool.tool.get("doc", ""),
+            "definition": wrappertext
+        }}
+    if project_uuid:
+        body["workflow"]["owner_uuid"] = project_uuid
+
+    if uuid:
+        call = arvRunner.api.workflows().update(uuid=uuid, body=body)
+    else:
+        call = arvRunner.api.workflows().create(body=body)
+    return call.execute(num_retries=arvRunner.num_retries)["uuid"]
 
 
 def upload_workflow(arvRunner, tool, job_order, project_uuid,
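
For reference, this is roughly the wrapper document the code above emits for a hypothetical single-input, single-output command line tool (the names and the keep: portable data hash are invented; git_info metadata, which is merged into the top level of the document, is omitted):

    {
        "$graph": [
            {
                "class": "Workflow",
                "id": "#main",
                "inputs": [
                    {"id": "#main/message", "type": "string"}
                ],
                "outputs": [
                    {
                        "id": "#main/out",
                        "outputSource": "#main/step/out",
                        "type": "File"
                    }
                ],
                "requirements": [
                    {"class": "SubworkflowFeatureRequirement"}
                ],
                "steps": [
                    {
                        "id": "#main/echo-tool",
                        "in": [
                            {"id": "#main/step/message", "source": "#main/message"}
                        ],
                        "label": "echo-tool",
                        "out": [
                            {"id": "#main/step/out"}
                        ],
                        "run": "keep:ffffffffffffffffffffffffffffffff+99/echo-tool.cwl"
                    }
                ]
            }
        ],
        "cwlVersion": "v1.2"
    }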
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 7364df3345561ed24cf4afb2bdb085f8fbc62388..dc6d0df3f1f6a4b1ac025d0003ba70595571c769 100644
@@ -447,6 +447,8 @@ def upload_dependencies(arvrunner, name, document_loader,
                                single_collection=True,
                                optional_deps=optional_deps)
 
+    print("MMM", mapper._pathmap)
+
     keeprefs = set()
     def addkeepref(k):
         if k.startswith("keep:"):
@@ -489,9 +491,9 @@ def upload_dependencies(arvrunner, name, document_loader,
         p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
         p[collectionUUID] = uuid
 
-    with Perf(metrics, "setloc"):
-        visit_class(workflowobj, ("File", "Directory"), setloc)
-        visit_class(discovered, ("File", "Directory"), setloc)
+    #with Perf(metrics, "setloc"):
+    #    visit_class(workflowobj, ("File", "Directory"), setloc)
+    #    visit_class(discovered, ("File", "Directory"), setloc)
 
     if discovered_secondaryfiles is not None:
         for d in discovered:
@@ -515,6 +517,8 @@ def upload_dependencies(arvrunner, name, document_loader,
                 logger.warning("Cannot find collection with portable data hash %s", kr)
                 continue
             col = col["items"][0]
+            col["name"] = arvados.util.trim_name(col["name"])
+            print("CCC name", col["name"])
             try:
                 arvrunner.api.collections().create(body={"collection": {
                     "owner_uuid": runtimeContext.project_uuid,
@@ -527,7 +531,7 @@ def upload_dependencies(arvrunner, name, document_loader,
                     "trash_at": col["trash_at"]
                 }}, ensure_unique_name=True).execute()
             except Exception as e:
-                logger.warning("Unable copy collection to destination: %s", e)
+                logger.warning("Unable to copy collection to destination: %s", e)
 
     if "$schemas" in workflowobj:
         sch = CommentedSeq()
@@ -683,7 +687,7 @@ FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
 def upload_workflow_deps(arvrunner, tool, runtimeContext):
     # Ensure that Docker images needed by this workflow are available
 
-    # testing only
+    # commented out for testing only; uncomment before merging
     #with Perf(metrics, "upload_docker"):
     #    upload_docker(arvrunner, tool, runtimeContext)
 
@@ -715,6 +719,8 @@ def upload_workflow_deps(arvrunner, tool, runtimeContext):
                                      include_primary=False,
                                      discovered_secondaryfiles=discovered_secondaryfiles,
                                      cache=tool_dep_cache)
+
+        print("PM", pm.items())
         document_loader.idx[deptool["id"]] = deptool
         toolmap = {}
         for k,v in pm.items():
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index 96d12f758b71c22eac86deca65a098bf1273c0ad..ae9f6d0d41c0ec3bf0accb4b9fe79c991a9b9162 100644
@@ -36,8 +36,8 @@ setup(name='arvados-cwl-runner',
       # file to determine what version of cwltool and schema-salad to
       # build.
       install_requires=[
-          'cwltool==3.1.20220913185150',
-          'schema-salad==8.3.20220913105718',
+          'cwltool==3.1.20221224142944',
+          'schema-salad>8.3.20220913105718',
           'arvados-python-client{}'.format(pysdk_dep),
           'ciso8601 >= 2.0.0',
           'networkx < 2.6',
diff --git a/sdk/python/arvados/util.py b/sdk/python/arvados/util.py
index c383d529e8087da579fcf4ae6814f76a57044e29..a4b7e64a05e2c74b8c330bb5b66f76d9974aeab7 100644
@@ -500,3 +500,20 @@ def get_vocabulary_once(svc):
     if not hasattr(svc, '_cached_vocabulary'):
         svc._cached_vocabulary = svc.vocabularies().get().execute()
     return svc._cached_vocabulary
+
+def trim_name(collectionname):
+    """
+    trim_name takes a record name (collection name, project name, etc)
+    and trims it to fit the 255 character name limit, with additional
+    space for the timestamp added by ensure_unique_name, by removing
+    excess characters from the middle and inserting an ellipse
+    """
+
+    max_name_len = 254 - 28
+
+    if len(collectionname) > max_name_len:
+        over = len(collectionname) - max_name_len
+        split = int(max_name_len/2)
+        collectionname = collectionname[0:split] + "…" + collectionname[split+over:]
+
+    return collectionname
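
A quick illustration of the trimming behavior (lengths invented for the example). The 28 reserved characters leave room for the timestamp suffix that ensure_unique_name appends on a name collision; note the inserted ellipsis itself adds one character on top of the 226-character budget:

    from arvados.util import trim_name

    trimmed = trim_name("x" * 300)
    assert len(trimmed) == (254 - 28) + 1            # 226-character budget plus the ellipsis
    assert trimmed[113] == "…"                       # split point is int(226/2)
    assert trim_name("short name") == "short name"   # short names pass through unchanged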