19385: Fix test_copy_deps
[arvados.git] / sdk / cwl / arvados_cwl / arvworkflow.py
index 7b10c15af752ba895454fe9cbb85283f32294ba7..cc3a51d8011cbb46122996e11b675494c3ce0d32 100644 (file)
@@ -133,21 +133,16 @@ def rel_ref(s, baseuri, urlexpander, merged_map, jobmapper):
     if s.startswith("keep:"):
         return s
 
-    #print("BBB", s, baseuri)
     uri = urlexpander(s, baseuri)
-    #print("CCC", uri)
 
     if uri.startswith("keep:"):
         return uri
 
     fileuri = urllib.parse.urldefrag(baseuri)[0]
 
-    #print("BBB", s, baseuri, uri)
-
     for u in (baseuri, fileuri):
         if u in merged_map:
             replacements = merged_map[u].resolved
-            #print("RRR", u, uri, replacements)
             if uri in replacements:
                 return replacements[uri]
 
@@ -158,14 +153,10 @@ def rel_ref(s, baseuri, urlexpander, merged_map, jobmapper):
     p2 = os.path.dirname(uri_file_path(uri))
     p3 = os.path.basename(uri_file_path(uri))
 
-    #print("PPP", p1, p2, p3)
-
     r = os.path.relpath(p2, p1)
     if r == ".":
         r = ""
 
-    #print("RRR", r)
-
     return os.path.join(r, p3)
 
 def is_basetype(tp):
@@ -242,9 +233,7 @@ def fix_schemadef(req, baseuri, urlexpander, merged_map, jobmapper, pdh):
         path, frag = urllib.parse.urldefrag(r)
         rel = rel_ref(r, baseuri, urlexpander, merged_map, jobmapper)
         merged_map.setdefault(path, FileUpdates({}, {}))
-        #print("PPP", path, r, frag)
         rename = "keep:%s/%s" %(pdh, rel)
-        #rename = "#%s" % frag
         for mm in merged_map:
             merged_map[mm].resolved[r] = rename
     return req
@@ -261,7 +250,7 @@ def drop_ids(d):
             drop_ids(d[field])
 
 
-def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
+def upload_workflow(arvRunner, tool, job_order, project_uuid,
                         runtimeContext,
                         uuid=None,
                         submit_runner_ram=0, name=None, merged_map=None,
@@ -306,8 +295,6 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
 
     col = arvados.collection.Collection(api_client=arvRunner.api)
 
-    #print(merged_map)
-
     for w in workflow_files | import_files:
         # 1. load YAML
 
@@ -329,7 +316,7 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
         update_refs(result, w, tool.doc_loader.expand_url, merged_map, jobmapper, set_block_style, runtimeContext, "", "")
 
         with col.open(w[n+1:], "wt") as f:
-            #print(yamlloader.dump(result, stream=sys.stdout))
+            # yamlloader.dump(result, stream=sys.stdout)
             yamlloader.dump(result, stream=f)
 
         with col.open(os.path.join("original", w[n+1:]), "wt") as f:
@@ -363,9 +350,13 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
             p = g.split("#", 1)[1]
             properties["arv:"+p] = git_info[g]
 
-    col.save_new(name=toolname, owner_uuid=arvRunner.project_uuid, ensure_unique_name=True, properties=properties)
-
-    logger.info("Workflow uploaded to %s", col.manifest_locator())
+    existing = arvRunner.api.collections().list(filters=[["portable_data_hash", "=", col.portable_data_hash()],
+                                                         ["owner_uuid", "=", arvRunner.project_uuid]]).execute(num_retries=arvRunner.num_retries)
+    if len(existing["items"]) == 0:
+        col.save_new(name=toolname, owner_uuid=arvRunner.project_uuid, ensure_unique_name=True, properties=properties)
+        logger.info("Workflow uploaded to %s", col.manifest_locator())
+    else:
+        logger.info("Workflow uploaded to %s", existing["items"][0]["uuid"])
 
     adjustDirObjs(job_order, trim_listing)
     adjustFileObjs(job_order, trim_anonymous_location)
@@ -465,29 +456,15 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
         for g in git_info:
             doc[g] = git_info[g]
 
-    #print("MMM", main["id"])
-    #print(yamlloader.dump(wrapper, stream=sys.stdout))
-
     for i, r in enumerate(wrapper["requirements"]):
         if r["class"] == "SchemaDefRequirement":
             wrapper["requirements"][i] = fix_schemadef(r, main["id"], tool.doc_loader.expand_url, merged_map, jobmapper, col.portable_data_hash())
 
-    # print()
-    # print("merrrrged maaap", merged_map)
-    # print()
-    #print("update_refs", main["id"], runfile)
-
-    #print(yamlloader.dump(wrapper, stream=sys.stdout))
-
     update_refs(wrapper, main["id"], tool.doc_loader.expand_url, merged_map, jobmapper, False, runtimeContext, main["id"]+"#", "#main/")
 
     # Remove any lingering file references.
     drop_ids(wrapper)
 
-    #print("HHH")
-
-    #print(yamlloader.dump(wrapper, stream=sys.stdout))
-
     return doc
 
 
@@ -511,70 +488,6 @@ def make_workflow_record(arvRunner, doc, name, tool, project_uuid, update_uuid):
     return call.execute(num_retries=arvRunner.num_retries)["uuid"]
 
 
-def upload_workflow(arvRunner, tool, job_order, project_uuid,
-                    runtimeContext, uuid=None,
-                    submit_runner_ram=0, name=None, merged_map=None,
-                    submit_runner_image=None,
-                    git_info=None):
-
-    packed = packed_workflow(arvRunner, tool, merged_map, runtimeContext, git_info)
-
-    adjustDirObjs(job_order, trim_listing)
-    adjustFileObjs(job_order, trim_anonymous_location)
-    adjustDirObjs(job_order, trim_anonymous_location)
-
-    main = [p for p in packed["$graph"] if p["id"] == "#main"][0]
-    for inp in main["inputs"]:
-        sn = shortname(inp["id"])
-        if sn in job_order:
-            inp["default"] = job_order[sn]
-
-    if not name:
-        name = tool.tool.get("label", os.path.basename(tool.tool["id"]))
-
-    upload_dependencies(arvRunner, name, tool.doc_loader,
-                        packed, tool.tool["id"],
-                        runtimeContext)
-
-    wf_runner_resources = None
-
-    hints = main.get("hints", [])
-    found = False
-    for h in hints:
-        if h["class"] == "http://arvados.org/cwl#WorkflowRunnerResources":
-            wf_runner_resources = h
-            found = True
-            break
-    if not found:
-        wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
-        hints.append(wf_runner_resources)
-
-    wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
-                                                                  submit_runner_image or "arvados/jobs:"+__version__,
-                                                                  runtimeContext)
-
-    if submit_runner_ram:
-        wf_runner_resources["ramMin"] = submit_runner_ram
-
-    main["hints"] = hints
-
-    wrapper = make_wrapper_workflow(arvRunner, main, packed, project_uuid, name, git_info, tool)
-
-    body = {
-        "workflow": {
-            "name": name,
-            "description": tool.tool.get("doc", ""),
-            "definition": wrapper
-        }}
-    if project_uuid:
-        body["workflow"]["owner_uuid"] = project_uuid
-
-    if uuid:
-        call = arvRunner.api.workflows().update(uuid=uuid, body=body)
-    else:
-        call = arvRunner.api.workflows().create(body=body)
-    return call.execute(num_retries=arvRunner.num_retries)["uuid"]
-
 def dedup_reqs(reqs):
     dedup = {}
     for r in reversed(reqs):