from cwltool.pack import pack
from cwltool.load_tool import fetch_document, resolve_and_validate_document
-from cwltool.process import shortname
+from cwltool.process import shortname, uniquename
from cwltool.workflow import Workflow, WorkflowException, WorkflowStep
from cwltool.utils import adjustFileObjs, adjustDirObjs, visit_class, normalizeFilesDirs
from cwltool.context import LoadingContext
from .runner import (upload_dependencies, packed_workflow, upload_workflow_collection,
trim_anonymous_location, remove_redundant_fields, discover_secondary_files,
- make_builder, arvados_jobs_image)
+ make_builder, arvados_jobs_image, FileUpdates)
from .pathmapper import ArvPathMapper, trim_listing
from .arvtool import ArvadosCommandTool, set_cluster_target
from ._version import __version__
return json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))
+
def rel_ref(s, baseuri, urlexpander, merged_map):
if s.startswith("keep:"):
return s
uri = urlexpander(s, baseuri)
#print("CCC", uri)
+ if uri.startswith("keep:"):
+ return uri
+
fileuri = urllib.parse.urldefrag(baseuri)[0]
+ #print("BBB", s, baseuri)
+
for u in (baseuri, fileuri):
if u in merged_map:
replacements = merged_map[u].resolved
+ #print("RRR", uri, replacements)
+ #print()
#print(uri, replacements)
if uri in replacements:
return replacements[uri]
elif isinstance(d, MutableMapping):
if "id" in d:
baseuri = urlexpander(d["id"], baseuri, scoped_id=True)
+ elif "name" in d:
+ baseuri = urlexpander(d["name"], baseuri, scoped_id=True)
if d.get("class") == "DockerRequirement":
dockerImageId = d.get("dockerImageId") or d.get("dockerPull")
d["http://arvados.org/cwl#dockerCollectionPDH"] = runtimeContext.cached_docker_lookups.get(dockerImageId)
- for s in d:
- for field in ("location", "run", "name"):
- if field in d and isinstance(d[field], str):
- d[field] = rel_ref(d[field], baseuri, urlexpander, merged_map)
+ for field in d:
+ if field in ("location", "run", "name") and isinstance(d[field], str):
+ d[field] = rel_ref(d[field], baseuri, urlexpander, merged_map)
+ continue
- for field in ("$include", "$import"):
- if field in d and isinstance(d[field], str):
- d[field] = rel_ref(d[field], baseuri, urlexpander, {})
+ if field in ("$include", "$import") and isinstance(d[field], str):
+ d[field] = rel_ref(d[field], baseuri, urlexpander, {})
+ continue
- basetypes = ("null", "boolean", "int", "long", "float", "double", "string", "File", "Directory")
+ basetypes = ("null", "boolean", "int", "long", "float", "double", "string", "File", "Directory", "record", "array", "enum")
- if ("type" in d and
+ if (field == "type" and
isinstance(d["type"], str) and
d["type"] not in basetypes):
+ #print("DDD ding", d["type"])
d["type"] = rel_ref(d["type"], baseuri, urlexpander, merged_map)
+ #print("DDD dong", d["type"])
+ continue
- if "inputs" in d and isinstance(d["inputs"], MutableMapping):
+ if field == "inputs" and isinstance(d["inputs"], MutableMapping):
for inp in d["inputs"]:
if isinstance(d["inputs"][inp], str) and d["inputs"][inp] not in basetypes:
+ #print("III", inp)
d["inputs"][inp] = rel_ref(d["inputs"][inp], baseuri, urlexpander, merged_map)
+ continue
- if "$schemas" in d:
+ if field == "$schemas":
for n, s in enumerate(d["$schemas"]):
d["$schemas"][n] = rel_ref(d["$schemas"][n], baseuri, urlexpander, merged_map)
+ continue
+
+ update_refs(d[field], baseuri, urlexpander, merged_map, set_block_style, runtimeContext, prefix, replacePrefix)
+
+
+def fix_schemadef(req, baseuri, urlexpander, merged_map, pdh):
+ req = copy.deepcopy(req)
+ #if "id" in req:
+ # del req["id"]
- update_refs(d[s], baseuri, urlexpander, merged_map, set_block_style, runtimeContext, prefix, replacePrefix)
+ for f in req["types"]:
+ r = f["name"]
+ path, frag = urllib.parse.urldefrag(r)
+ rel = rel_ref(r, baseuri, urlexpander, merged_map)
+ merged_map.setdefault(path, FileUpdates({}, {}))
+ #print("PPP", path, r, frag)
+ rename = "keep:%s/%s" %(pdh, rel)
+ for mm in merged_map:
+ merged_map[mm].resolved[r] = rename
+ return req
def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
runtimeContext,
# now construct the wrapper
+ runfile = "keep:%s/%s" % (col.portable_data_hash(), toolfile)
+
step = {
"id": "#main/" + toolname,
"in": [],
"out": [],
- "run": "keep:%s/%s" % (col.portable_data_hash(), toolfile),
+ "run": runfile,
"label": name
}
if hints:
wrapper["hints"] = hints
+ # 1. check for SchemaDef
+ # 2. do what pack does
+ # 3. fix inputs
+
doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}
if git_info:
#print("MMM", main["id"])
#print(yamlloader.dump(wrapper, stream=sys.stdout))
+ for i, r in enumerate(wrapper["requirements"]):
+ if r["class"] == "SchemaDefRequirement":
+ wrapper["requirements"][i] = fix_schemadef(r, main["id"], tool.doc_loader.expand_url, merged_map, col.portable_data_hash())
+
+ # print()
+ # print("merrrrged maaap", merged_map)
+ # print()
+ print("update_refs", main["id"], runfile)
+
+ #print(yamlloader.dump(wrapper, stream=sys.stdout))
+
update_refs(wrapper, main["id"], tool.doc_loader.expand_url, merged_map, False, runtimeContext, main["id"]+"#", "#main/")
+ #print("HHH")
+
#print(yamlloader.dump(wrapper, stream=sys.stdout))
return doc