def rel_ref(s, baseuri, urlexpander, merged_map):
uri = urlexpander(s, baseuri)
- if baseuri in merged_map:
- replacements = merged_map[baseuri].resolved
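+ # merged_map is keyed by the file portion of the base URI (fragment
+ # stripped); its .resolved dict maps an expanded reference to its
+ # already-resolved replacement.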
+ print("DDD", baseuri, merged_map)
+ fileuri = urllib.parse.urldefrag(baseuri)[0]
+ if fileuri in merged_map:
+ replacements = merged_map[fileuri].resolved
if uri in replacements:
return replacements[uri]
+ if s.startswith("keep:"):
+ return s
+
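+ # Otherwise fall back to building a path relative to the base document:
+ # p1 is the directory of baseuri, p2/p3 are the directory and basename
+ # of the target.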
p1 = os.path.dirname(uri_file_path(baseuri))
p2 = os.path.dirname(uri_file_path(uri))
p3 = os.path.basename(uri_file_path(uri))
def update_refs(d, baseuri, urlexpander, merged_map, set_block_style):
- if isinstance(d, CommentedSeq):
- if set_block_style:
- d.fa.set_block_style()
+ if set_block_style and (isinstance(d, CommentedSeq) or isinstance(d, CommentedMap)):
+ d.fa.set_block_style()
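+ # (only the ruamel.yaml comment-preserving containers carry the .fa
+ # attribute used to force block style)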
+
+ if isinstance(d, MutableSequence):
for s in d:
update_refs(s, baseuri, urlexpander, merged_map, set_block_style)
- elif isinstance(d, CommentedMap):
- if set_block_style:
- d.fa.set_block_style()
-
+ elif isinstance(d, MutableMapping):
if "id" in d:
baseuri = urlexpander(d["id"], baseuri, scoped_id=True)
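+ # (references nested under this mapping are resolved relative to its scoped id)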
col.save_new(name=toolname, owner_uuid=arvRunner.project_uuid, ensure_unique_name=True)
- return col.manifest_locator()
+ toolfile = tool.tool["id"][n+1:]
+
+ # now construct the wrapper
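+ # the wrapper is a Workflow with a single step that runs the uploaded
+ # tool directly from Keep by its portable data hash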
+
+ step = {
+ "id": "#main/" + toolname,
+ "in": [],
+ "out": [],
+ "run": "keep:%s/%s" % (col.portable_data_hash(), toolfile),
+ "label": name
+ }
+
+ main = tool.tool
+
+ newinputs = []
+ for i in main["inputs"]:
+ inp = {}
+ # Make sure to only copy known fields that are meaningful at
+ # the workflow level. In practice this ensures that if we're
+ # wrapping a CommandLineTool we don't grab inputBinding.
+ # Right now this also excludes extension fields, which is fine;
+ # Arvados doesn't currently look for any extension fields on
+ # input parameters.
+ for f in ("type", "label", "secondaryFiles", "streamable",
+ "doc", "format", "loadContents",
+ "loadListing", "default"):
+ if f in i:
+ inp[f] = i[f]
+ inp["id"] = "#main/%s" % shortname(i["id"])
+ newinputs.append(inp)
+
+ wrapper = {
+ "class": "Workflow",
+ "id": "#main",
+ "inputs": newinputs,
+ "outputs": [],
+ "steps": [step]
+ }
+
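+ # Wire each top-level workflow input through to the wrapped step.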
+ for i in main["inputs"]:
+ step["in"].append({
+ "id": "#main/step/%s" % shortname(i["id"]),
+ "source": "#main/%s" % shortname(i["id"])
+ })
+
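+ # Expose each tool output as a workflow output sourced from the step.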
+ for i in main["outputs"]:
+ step["out"].append({"id": "#main/step/%s" % shortname(i["id"])})
+ wrapper["outputs"].append({"outputSource": "#main/step/%s" % shortname(i["id"]),
+ "type": i["type"],
+ "id": "#main/%s" % shortname(i["id"])})
+
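+ # The wrapper always declares SubworkflowFeatureRequirement so the step
+ # can run a nested workflow; requirements and hints from the wrapped
+ # document are carried over below.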
+ wrapper["requirements"] = [{"class": "SubworkflowFeatureRequirement"}]
+
+ if main.get("requirements"):
+ wrapper["requirements"].extend(main["requirements"])
+ if main.get("hints"):
+ wrapper["hints"] = main["hints"]
+
+ doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}
+
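+ # Copy any collected git_info entries into the top level of the document.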
+ if git_info:
+ for g in git_info:
+ doc[g] = git_info[g]
+
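+ # Rewrite references inside the wrapper so they resolve against the
+ # uploaded copies recorded in merged_map (see rel_ref above).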
+ update_refs(wrapper, main["id"], tool.doc_loader.expand_url, merged_map, False)
+
+ wrappertext = json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))
+
+ body = {
+ "workflow": {
+ "name": name,
+ "description": tool.tool.get("doc", ""),
+ "definition": wrappertext
+ }}
+ if project_uuid:
+ body["workflow"]["owner_uuid"] = project_uuid
+
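+ # Update the existing workflow record if a uuid was supplied, otherwise
+ # create a new one.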
+ if uuid:
+ call = arvRunner.api.workflows().update(uuid=uuid, body=body)
+ else:
+ call = arvRunner.api.workflows().create(body=body)
+ return call.execute(num_retries=arvRunner.num_retries)["uuid"]
def upload_workflow(arvRunner, tool, job_order, project_uuid,
single_collection=True,
optional_deps=optional_deps)
+ print("MMM", mapper._pathmap)
+
keeprefs = set()
def addkeepref(k):
if k.startswith("keep:"):
p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
p[collectionUUID] = uuid
with Perf(metrics, "setloc"):
    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)
if discovered_secondaryfiles is not None:
for d in discovered:
logger.warning("Cannot find collection with portable data hash %s", kr)
continue
col = col["items"][0]
+ col["name"] = arvados.util.trim_name(col["name"])
+ print("CCC name", col["name"])
try:
arvrunner.api.collections().create(body={"collection": {
"owner_uuid": runtimeContext.project_uuid,
"trash_at": col["trash_at"]
}}, ensure_unique_name=True).execute()
except Exception as e:
- logger.warning("Unable copy collection to destination: %s", e)
+ logger.warning("Unable to copy collection to destination: %s", e)
if "$schemas" in workflowobj:
sch = CommentedSeq()
def upload_workflow_deps(arvrunner, tool, runtimeContext):
# Ensure that Docker images needed by this workflow are available
- # testing only
- #with Perf(metrics, "upload_docker"):
- #    upload_docker(arvrunner, tool, runtimeContext)
+ with Perf(metrics, "upload_docker"):
+     upload_docker(arvrunner, tool, runtimeContext)
include_primary=False,
discovered_secondaryfiles=discovered_secondaryfiles,
cache=tool_dep_cache)
+
+ print("PM", pm.items())
document_loader.idx[deptool["id"]] = deptool
toolmap = {}
for k,v in pm.items():