19385: Work in progress checkpoint, submitting now uses wrapper workflows
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index 784e1bdb366d49f1a4d68c340987e5679f760e45..299ad5c0d55b2bfadce2b0f6524a6abbbe51521f 100644
--- a/sdk/cwl/arvados_cwl/arvworkflow.py
+++ b/sdk/cwl/arvados_cwl/arvworkflow.py
@@ -129,7 +129,6 @@ def make_wrapper_workflow(arvRunner, main, packed, project_uuid, name, git_info,
 
 def rel_ref(s, baseuri, urlexpander, merged_map):
     uri = urlexpander(s, baseuri)
-    print("DDD", baseuri, merged_map)
     fileuri = urllib.parse.urldefrag(baseuri)[0]
     if fileuri in merged_map:
         replacements = merged_map[fileuri].resolved
@@ -145,22 +144,24 @@ def rel_ref(s, baseuri, urlexpander, merged_map):
     r = os.path.relpath(p2, p1)
     if r == ".":
         r = ""
-    print("AAA", uri, s)
-    print("BBBB", p1, p2, p3, r)
     return os.path.join(r, p3)
 
 
-def update_refs(d, baseuri, urlexpander, merged_map, set_block_style):
+def update_refs(d, baseuri, urlexpander, merged_map, set_block_style, runtimeContext):
     if set_block_style and (isinstance(d, CommentedSeq) or isinstance(d, CommentedMap)):
         d.fa.set_block_style()
 
     if isinstance(d, MutableSequence):
         for s in d:
-            update_refs(s, baseuri, urlexpander, merged_map, set_block_style)
+            update_refs(s, baseuri, urlexpander, merged_map, set_block_style, runtimeContext)
     elif isinstance(d, MutableMapping):
         if "id" in d:
             baseuri = urlexpander(d["id"], baseuri, scoped_id=True)
 
+        if d.get("class") == "DockerRequirement":
+            dockerImageId = d.get("dockerImageId") or d.get("dockerPull")
+            d["http://arvados.org/cwl#dockerCollectionPDH"] = runtimeContext.cached_docker_lookups.get(dockerImageId)
+
         for s in d:
             for field in ("$include", "$import", "location", "run"):
                 if field in d and isinstance(d[field], str):
@@ -170,13 +171,15 @@ def update_refs(d, baseuri, urlexpander, merged_map, set_block_style):
                 for n, s in enumerate(d["$schemas"]):
                     d["$schemas"][n] = rel_ref(d["$schemas"][n], baseuri, urlexpander, merged_map)
 
-            update_refs(d[s], baseuri, urlexpander, merged_map, set_block_style)
+            update_refs(d[s], baseuri, urlexpander, merged_map, set_block_style, runtimeContext)
 
 def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
-                    runtimeContext, uuid=None,
-                    submit_runner_ram=0, name=None, merged_map=None,
-                    submit_runner_image=None,
-                    git_info=None):
+                        runtimeContext,
+                        uuid=None,
+                        submit_runner_ram=0, name=None, merged_map=None,
+                        submit_runner_image=None,
+                        git_info=None,
+                        set_defaults=False):
 
     firstfile = None
     workflow_files = set()
@@ -215,8 +218,6 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
 
     col = arvados.collection.Collection()
 
-    #print(merged_map.keys())
-
     for w in workflow_files | import_files:
         # 1. load YAML
 
@@ -235,7 +236,7 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
 
         # 2. find $import, $include, $schema, run, location
         # 3. update field value
-        update_refs(result, w, tool.doc_loader.expand_url, merged_map, set_block_style)
+        update_refs(result, w, tool.doc_loader.expand_url, merged_map, set_block_style, runtimeContext)
 
         with col.open(w[n+1:], "wt") as f:
             yamlloader.dump(result, stream=f)
@@ -252,10 +253,19 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
     if git_info and git_info.get("http://arvados.org/cwl#gitDescribe"):
         toolname = "%s (%s)" % (toolname, git_info.get("http://arvados.org/cwl#gitDescribe"))
 
-    col.save_new(name=toolname, owner_uuid=arvRunner.project_uuid, ensure_unique_name=True)
-
     toolfile = tool.tool["id"][n+1:]
 
+    properties = {
+        "type": "workflow",
+        "arv:workflowMain": toolfile,
+    }
+
+    col.save_new(name=toolname, owner_uuid=arvRunner.project_uuid, ensure_unique_name=True, properties=properties)
+
+    adjustDirObjs(job_order, trim_listing)
+    adjustFileObjs(job_order, trim_anonymous_location)
+    adjustDirObjs(job_order, trim_anonymous_location)
+
     # now construct the wrapper
 
     step = {
@@ -268,6 +278,27 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
 
     main = tool.tool
 
+    wf_runner_resources = None
+
+    hints = main.get("hints", [])
+    found = False
+    for h in hints:
+        if h["class"] == "http://arvados.org/cwl#WorkflowRunnerResources":
+            wf_runner_resources = h
+            found = True
+            break
+    if not found:
+        wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
+        hints.append(wf_runner_resources)
+
+    # Record the arvados/jobs container image the workflow runner will use.
+    wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
+                                                                  submit_runner_image or "arvados/jobs:"+__version__,
+                                                                  runtimeContext)
+
+    if submit_runner_ram:
+        wf_runner_resources["ramMin"] = submit_runner_ram
+
     newinputs = []
     for i in main["inputs"]:
         inp = {}
@@ -282,6 +313,12 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
                   "loadListing", "default"):
             if f in i:
                 inp[f] = i[f]
+
+        if set_defaults:
+            sn = shortname(i["id"])
+            if sn in job_order:
+                inp["default"] = job_order[sn]
+
         inp["id"] = "#main/%s" % shortname(i["id"])
         newinputs.append(inp)
 
@@ -309,7 +346,7 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
 
     if main.get("requirements"):
         wrapper["requirements"].extend(main["requirements"])
-    if main.get("hints"):
-        wrapper["hints"] = main["hints"]
+    if hints:
+        wrapper["hints"] = hints
 
     doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}
@@ -318,7 +355,12 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
         for g in git_info:
             doc[g] = git_info[g]
 
-    update_refs(wrapper, main["id"], tool.doc_loader.expand_url, merged_map, False)
+    update_refs(wrapper, main["id"], tool.doc_loader.expand_url, merged_map, False, runtimeContext)
+
+    return doc
+
+
+def make_workflow_record(arvRunner, doc, name, tool, project_uuid, update_uuid):
 
     wrappertext = json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))
 
@@ -331,8 +373,8 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
     if project_uuid:
         body["workflow"]["owner_uuid"] = project_uuid
 
-    if uuid:
-        call = arvRunner.api.workflows().update(uuid=uuid, body=body)
+    if update_uuid:
+        call = arvRunner.api.workflows().update(uuid=update_uuid, body=body)
     else:
         call = arvRunner.api.workflows().create(body=body)
     return call.execute(num_retries=arvRunner.num_retries)["uuid"]
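
For orientation, a minimal sketch (not part of this commit) of how the two pieces produced by this split are expected to compose: new_upload_workflow builds and returns the wrapper workflow document, and make_workflow_record persists it as an Arvados workflow record. The helper name upload_and_register is hypothetical.

# Hypothetical caller sketch, assuming the signatures introduced in this diff.
from arvados_cwl.arvworkflow import new_upload_workflow, make_workflow_record

def upload_and_register(arvRunner, tool, job_order, runtimeContext,
                        project_uuid=None, uuid=None, name=None,
                        merged_map=None, git_info=None):
    # Build the wrapper workflow document; as a side effect this saves the
    # packed workflow files into a new collection tagged with the
    # "type": "workflow" / "arv:workflowMain" properties.
    doc = new_upload_workflow(arvRunner, tool, job_order, project_uuid,
                              runtimeContext,
                              uuid=uuid,
                              name=name,
                              merged_map=merged_map,
                              git_info=git_info,
                              set_defaults=True)
    # Create (or update, when a UUID is given) the Arvados workflow record
    # holding the wrapper, and return its UUID.
    return make_workflow_record(arvRunner, doc, name, tool, project_uuid, uuid)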