def rel_ref(s, baseuri, urlexpander, merged_map):
uri = urlexpander(s, baseuri)
- print("DDD", baseuri, merged_map)
fileuri = urllib.parse.urldefrag(baseuri)[0]
if fileuri in merged_map:
replacements = merged_map[fileuri].resolved
r = os.path.relpath(p2, p1)
if r == ".":
r = ""
- print("AAA", uri, s)
- print("BBBB", p1, p2, p3, r)
return os.path.join(r, p3)
-def update_refs(d, baseuri, urlexpander, merged_map, set_block_style):
+def update_refs(d, baseuri, urlexpander, merged_map, set_block_style, runtimeContext):
if set_block_style and (isinstance(d, CommentedSeq) or isinstance(d, CommentedMap)):
d.fa.set_block_style()
if isinstance(d, MutableSequence):
for s in d:
- update_refs(s, baseuri, urlexpander, merged_map, set_block_style)
+ update_refs(s, baseuri, urlexpander, merged_map, set_block_style, runtimeContext)
elif isinstance(d, MutableMapping):
if "id" in d:
baseuri = urlexpander(d["id"], baseuri, scoped_id=True)
+ if d.get("class") == "DockerRequirement":
+ dockerImageId = d.get("dockerImageId") or d.get("dockerPull")
+ d["http://arvados.org/cwl#dockerCollectionPDH"] = runtimeContext.cached_docker_lookups.get(dockerImageId)
+
for s in d:
for field in ("$include", "$import", "location", "run"):
if field in d and isinstance(d[field], str):
for n, s in enumerate(d["$schemas"]):
d["$schemas"][n] = rel_ref(d["$schemas"][n], baseuri, urlexpander, merged_map)
- update_refs(d[s], baseuri, urlexpander, merged_map, set_block_style)
+ update_refs(d[s], baseuri, urlexpander, merged_map, set_block_style, runtimeContext)
def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
- runtimeContext, uuid=None,
- submit_runner_ram=0, name=None, merged_map=None,
- submit_runner_image=None,
- git_info=None):
+ runtimeContext,
+ uuid=None,
+ submit_runner_ram=0, name=None, merged_map=None,
+ submit_runner_image=None,
+ git_info=None,
+ set_defaults=False):
firstfile = None
workflow_files = set()
col = arvados.collection.Collection()
- #print(merged_map.keys())
-
for w in workflow_files | import_files:
# 1. load YAML
# 2. find $import, $include, $schema, run, location
# 3. update field value
- update_refs(result, w, tool.doc_loader.expand_url, merged_map, set_block_style)
+ update_refs(result, w, tool.doc_loader.expand_url, merged_map, set_block_style, runtimeContext)
with col.open(w[n+1:], "wt") as f:
yamlloader.dump(result, stream=f)
if git_info and git_info.get("http://arvados.org/cwl#gitDescribe"):
toolname = "%s (%s)" % (toolname, git_info.get("http://arvados.org/cwl#gitDescribe"))
- col.save_new(name=toolname, owner_uuid=arvRunner.project_uuid, ensure_unique_name=True)
-
toolfile = tool.tool["id"][n+1:]
+ properties = {
+ "type": "workflow",
+ "arv:workflowMain": toolfile,
+ }
+
+ col.save_new(name=toolname, owner_uuid=arvRunner.project_uuid, ensure_unique_name=True, properties=properties)
+
+ adjustDirObjs(job_order, trim_listing)
+ adjustFileObjs(job_order, trim_anonymous_location)
+ adjustDirObjs(job_order, trim_anonymous_location)
+
# now construct the wrapper
step = {
main = tool.tool
+ wf_runner_resources = None
+
+ hints = main.get("hints", [])
+ found = False
+ for h in hints:
+ if h["class"] == "http://arvados.org/cwl#WorkflowRunnerResources":
+ wf_runner_resources = h
+ found = True
+ break
+ if not found:
+ wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
+ hints.append(wf_runner_resources)
+
+ # uncomment me
+ # wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
+ # submit_runner_image or "arvados/jobs:"+__version__,
+ # runtimeContext)
+
+ if submit_runner_ram:
+ wf_runner_resources["ramMin"] = submit_runner_ram
+
newinputs = []
for i in main["inputs"]:
inp = {}
"loadListing", "default"):
if f in i:
inp[f] = i[f]
+
+ if set_defaults:
+ sn = shortname(i["id"])
+ if sn in job_order:
+ inp["default"] = job_order[sn]
+
inp["id"] = "#main/%s" % shortname(i["id"])
newinputs.append(inp)
if main.get("requirements"):
wrapper["requirements"].extend(main["requirements"])
- if main.get("hints"):
+ if hints:
wrapper["hints"] = main["hints"]
doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}
for g in git_info:
doc[g] = git_info[g]
- update_refs(wrapper, main["id"], tool.doc_loader.expand_url, merged_map, False)
+ update_refs(wrapper, main["id"], tool.doc_loader.expand_url, merged_map, False, runtimeContext)
+
+ return doc
+
+
+def make_workflow_record(arvRunner, doc, name, tool, project_uuid, update_uuid):
wrappertext = json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))
if project_uuid:
body["workflow"]["owner_uuid"] = project_uuid
- if uuid:
- call = arvRunner.api.workflows().update(uuid=uuid, body=body)
+ if update_uuid:
+ call = arvRunner.api.workflows().update(uuid=update_uuid, body=body)
else:
call = arvRunner.api.workflows().create(body=body)
return call.execute(num_retries=arvRunner.num_retries)["uuid"]