19385: Fix test_copy_deps
[arvados.git] / sdk / cwl / arvados_cwl / arvworkflow.py
index bef05fed32a67d0bc570fcf2f40b6dc8fe30cc7f..cc3a51d8011cbb46122996e11b675494c3ce0d32 100644 (file)
@@ -133,21 +133,16 @@ def rel_ref(s, baseuri, urlexpander, merged_map, jobmapper):
     if s.startswith("keep:"):
         return s
 
-    #print("BBB", s, baseuri)
     uri = urlexpander(s, baseuri)
-    #print("CCC", uri)
 
     if uri.startswith("keep:"):
         return uri
 
     fileuri = urllib.parse.urldefrag(baseuri)[0]
 
-    #print("BBB", s, baseuri, uri)
-
     for u in (baseuri, fileuri):
         if u in merged_map:
             replacements = merged_map[u].resolved
-            #print("RRR", u, uri, replacements)
             if uri in replacements:
                 return replacements[uri]
 
@@ -158,14 +153,10 @@ def rel_ref(s, baseuri, urlexpander, merged_map, jobmapper):
     p2 = os.path.dirname(uri_file_path(uri))
     p3 = os.path.basename(uri_file_path(uri))
 
-    #print("PPP", p1, p2, p3)
-
     r = os.path.relpath(p2, p1)
     if r == ".":
         r = ""
 
-    #print("RRR", r)
-
     return os.path.join(r, p3)
 
 def is_basetype(tp):
@@ -188,6 +179,11 @@ def update_refs(d, baseuri, urlexpander, merged_map, jobmapper, set_block_style,
             else:
                 update_refs(s, baseuri, urlexpander, merged_map, jobmapper, set_block_style, runtimeContext, prefix, replacePrefix)
     elif isinstance(d, MutableMapping):
+        for field in ("id", "name"):
+            if isinstance(d.get(field), str) and d[field].startswith("_:"):
+                # blank node reference, added automatically; safe to remove.
+                del d[field]
+
         if "id" in d:
             baseuri = urlexpander(d["id"], baseuri, scoped_id=True)
         elif "name" in d and isinstance(d["name"], str):
@@ -203,14 +199,15 @@ def update_refs(d, baseuri, urlexpander, merged_map, jobmapper, set_block_style,
                 continue
 
             if field in ("$include", "$import") and isinstance(d[field], str):
-                d[field] = rel_ref(d[field], baseuri, urlexpander, {}, None)
+                d[field] = rel_ref(d[field], baseuri, urlexpander, {}, jobmapper)
                 continue
 
-            if (field == "type" and
-                isinstance(d["type"], str) and
-                not is_basetype(d["type"])):
-                d["type"] = rel_ref(d["type"], baseuri, urlexpander, merged_map, jobmapper)
-                continue
+            for t in ("type", "items"):
+                if (field == t and
+                    isinstance(d[t], str) and
+                    not is_basetype(d[t])):
+                    d[t] = rel_ref(d[t], baseuri, urlexpander, merged_map, jobmapper)
+                    continue
 
             if field == "inputs" and isinstance(d["inputs"], MutableMapping):
                 for inp in d["inputs"]:
@@ -228,17 +225,15 @@ def update_refs(d, baseuri, urlexpander, merged_map, jobmapper, set_block_style,
             update_refs(d[field], baseuri, urlexpander, merged_map, jobmapper, set_block_style, runtimeContext, prefix, replacePrefix)
 
 
-def fix_schemadef(req, baseuri, urlexpander, merged_map, pdh):
+def fix_schemadef(req, baseuri, urlexpander, merged_map, jobmapper, pdh):
     req = copy.deepcopy(req)
 
     for f in req["types"]:
         r = f["name"]
         path, frag = urllib.parse.urldefrag(r)
-        rel = rel_ref(r, baseuri, urlexpander, merged_map)
+        rel = rel_ref(r, baseuri, urlexpander, merged_map, jobmapper)
         merged_map.setdefault(path, FileUpdates({}, {}))
-        #print("PPP", path, r, frag)
         rename = "keep:%s/%s" %(pdh, rel)
-        #rename = "#%s" % frag
         for mm in merged_map:
             merged_map[mm].resolved[r] = rename
     return req
@@ -255,7 +250,7 @@ def drop_ids(d):
             drop_ids(d[field])
 
 
-def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
+def upload_workflow(arvRunner, tool, job_order, project_uuid,
                         runtimeContext,
                         uuid=None,
                         submit_runner_ram=0, name=None, merged_map=None,
@@ -300,8 +295,6 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
 
     col = arvados.collection.Collection(api_client=arvRunner.api)
 
-    #print(merged_map)
-
     for w in workflow_files | import_files:
         # 1. load YAML
 
@@ -323,16 +316,23 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
         update_refs(result, w, tool.doc_loader.expand_url, merged_map, jobmapper, set_block_style, runtimeContext, "", "")
 
         with col.open(w[n+1:], "wt") as f:
-            #print(yamlloader.dump(result, stream=sys.stdout))
+            # Debug aid: yamlloader.dump(result, stream=sys.stdout) prints the updated doc.
             yamlloader.dump(result, stream=f)
 
+        with col.open(os.path.join("original", w[n+1:]), "wt") as f:
+            f.write(text)
+
+
     for w in include_files:
         with col.open(w[n+1:], "wb") as f1:
-            with open(uri_file_path(w), "rb") as f2:
-                dat = f2.read(65536)
-                while dat:
-                    f1.write(dat)
+            with col.open(os.path.join("original", w[n+1:]), "wb") as f3:
+                with open(uri_file_path(w), "rb") as f2:
                     dat = f2.read(65536)
+                    while dat:
+                        f1.write(dat)
+                        f3.write(dat)
+                        dat = f2.read(65536)
+
 
     toolname = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
     if git_info and git_info.get("http://arvados.org/cwl#gitDescribe"):
@@ -350,7 +350,13 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
             p = g.split("#", 1)[1]
             properties["arv:"+p] = git_info[g]
 
-    col.save_new(name=toolname, owner_uuid=arvRunner.project_uuid, ensure_unique_name=True, properties=properties)
+    existing = arvRunner.api.collections().list(filters=[["portable_data_hash", "=", col.portable_data_hash()],
+                                                         ["owner_uuid", "=", arvRunner.project_uuid]]).execute(num_retries=arvRunner.num_retries)
+    if len(existing["items"]) == 0:
+        col.save_new(name=toolname, owner_uuid=arvRunner.project_uuid, ensure_unique_name=True, properties=properties)
+        logger.info("Workflow uploaded to %s", col.manifest_locator())
+    else:
+        logger.info("Workflow uploaded to %s", existing["items"][0]["uuid"])
 
     adjustDirObjs(job_order, trim_listing)
     adjustFileObjs(job_order, trim_anonymous_location)
@@ -450,29 +456,15 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
         for g in git_info:
             doc[g] = git_info[g]
 
-    #print("MMM", main["id"])
-    #print(yamlloader.dump(wrapper, stream=sys.stdout))
-
     for i, r in enumerate(wrapper["requirements"]):
         if r["class"] == "SchemaDefRequirement":
-            wrapper["requirements"][i] = fix_schemadef(r, main["id"], tool.doc_loader.expand_url, merged_map, col.portable_data_hash())
-
-    # print()
-    # print("merrrrged maaap", merged_map)
-    # print()
-    #print("update_refs", main["id"], runfile)
-
-    #print(yamlloader.dump(wrapper, stream=sys.stdout))
+            wrapper["requirements"][i] = fix_schemadef(r, main["id"], tool.doc_loader.expand_url, merged_map, jobmapper, col.portable_data_hash())
 
     update_refs(wrapper, main["id"], tool.doc_loader.expand_url, merged_map, jobmapper, False, runtimeContext, main["id"]+"#", "#main/")
 
     # Remove any lingering file references.
     drop_ids(wrapper)
 
-    #print("HHH")
-
-    #print(yamlloader.dump(wrapper, stream=sys.stdout))
-
     return doc
 
 
@@ -496,70 +488,6 @@ def make_workflow_record(arvRunner, doc, name, tool, project_uuid, update_uuid):
     return call.execute(num_retries=arvRunner.num_retries)["uuid"]
 
 
-def upload_workflow(arvRunner, tool, job_order, project_uuid,
-                    runtimeContext, uuid=None,
-                    submit_runner_ram=0, name=None, merged_map=None,
-                    submit_runner_image=None,
-                    git_info=None):
-
-    packed = packed_workflow(arvRunner, tool, merged_map, runtimeContext, git_info)
-
-    adjustDirObjs(job_order, trim_listing)
-    adjustFileObjs(job_order, trim_anonymous_location)
-    adjustDirObjs(job_order, trim_anonymous_location)
-
-    main = [p for p in packed["$graph"] if p["id"] == "#main"][0]
-    for inp in main["inputs"]:
-        sn = shortname(inp["id"])
-        if sn in job_order:
-            inp["default"] = job_order[sn]
-
-    if not name:
-        name = tool.tool.get("label", os.path.basename(tool.tool["id"]))
-
-    upload_dependencies(arvRunner, name, tool.doc_loader,
-                        packed, tool.tool["id"],
-                        runtimeContext)
-
-    wf_runner_resources = None
-
-    hints = main.get("hints", [])
-    found = False
-    for h in hints:
-        if h["class"] == "http://arvados.org/cwl#WorkflowRunnerResources":
-            wf_runner_resources = h
-            found = True
-            break
-    if not found:
-        wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
-        hints.append(wf_runner_resources)
-
-    wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
-                                                                  submit_runner_image or "arvados/jobs:"+__version__,
-                                                                  runtimeContext)
-
-    if submit_runner_ram:
-        wf_runner_resources["ramMin"] = submit_runner_ram
-
-    main["hints"] = hints
-
-    wrapper = make_wrapper_workflow(arvRunner, main, packed, project_uuid, name, git_info, tool)
-
-    body = {
-        "workflow": {
-            "name": name,
-            "description": tool.tool.get("doc", ""),
-            "definition": wrapper
-        }}
-    if project_uuid:
-        body["workflow"]["owner_uuid"] = project_uuid
-
-    if uuid:
-        call = arvRunner.api.workflows().update(uuid=uuid, body=body)
-    else:
-        call = arvRunner.api.workflows().create(body=body)
-    return call.execute(num_retries=arvRunner.num_retries)["uuid"]
-
 def dedup_reqs(reqs):
     dedup = {}
     for r in reversed(reqs):