19070: Fix --update-workflow
author Peter Amstutz <peter.amstutz@curii.com>
Thu, 12 May 2022 18:31:57 +0000 (14:31 -0400)
committer Peter Amstutz <peter.amstutz@curii.com>
Thu, 12 May 2022 18:31:57 +0000 (14:31 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

sdk/cwl/arvados_cwl/arvworkflow.py
sdk/cwl/arvados_cwl/executor.py
sdk/cwl/arvados_cwl/runner.py
sdk/cwl/tests/scripts/download_all_data.sh
sdk/cwl/tests/test_copy_deps.py

index 8eb12913adb854894cc19d4a3dd8cca862ae9dc1..51e7cd8b9e52a2d8ffcc0f890e60f30f6810c359 100644 (file)
@@ -307,7 +307,7 @@ class ArvadosWorkflow(Workflow):
             if self.wf_pdh is None:
                 adjustFileObjs(packed, keepmount)
                 adjustDirObjs(packed, keepmount)
-                self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed)
+                self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed, runtimeContext)
 
         self.loadingContext = self.loadingContext.copy()
         self.loadingContext.metadata = self.loadingContext.metadata.copy()
index 70bc0c4572bbcf18c50006a61a85dea6b2d6957f..1759e4ac2829a4840895d47e465fdfcad6a2bf1d 100644 (file)
@@ -517,7 +517,6 @@ The 'jobs' API is no longer supported.
 
         updated_tool.visit(self.check_features)
 
-        self.project_uuid = runtimeContext.project_uuid
         self.pipeline = None
         self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
         self.secret_store = runtimeContext.secret_store
@@ -558,7 +557,9 @@ The 'jobs' API is no longer supported.
             # gets uploaded goes into the same parent project, unless
             # an alternate --project-uuid was provided.
             existing_wf = self.api.workflows().get(uuid=runtimeContext.update_workflow).execute()
-            self.project_uuid = existing_wf["owner_uuid"]
+            runtimeContext.project_uuid = existing_wf["owner_uuid"]
+
+        self.project_uuid = runtimeContext.project_uuid
 
         # Upload local file references in the job order.
         job_order = upload_job_order(self, "%s input" % runtimeContext.name,
@@ -604,7 +605,7 @@ The 'jobs' API is no longer supported.
             # Create a pipeline template or workflow record and exit.
             if self.work_api == "containers":
                 uuid = upload_workflow(self, tool, job_order,
-                                       self.project_uuid,
+                                       runtimeContext.project_uuid,
                                        runtimeContext,
                                        uuid=runtimeContext.update_workflow,
                                        submit_runner_ram=runtimeContext.submit_runner_ram,
index f39c98d8829ddad4c5e4002ce9d9e8e9693c1e62..50b3bb94d8934c1234220c25a3aa6bfb1a3075a0 100644 (file)
@@ -671,7 +671,7 @@ def arvados_jobs_image(arvrunner, img, runtimeContext):
         raise Exception("Docker image %s is not available\n%s" % (img, e) )
 
 
-def upload_workflow_collection(arvrunner, name, packed):
+def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
     collection = arvados.collection.Collection(api_client=arvrunner.api,
                                                keep_client=arvrunner.keep_client,
                                                num_retries=arvrunner.num_retries)
@@ -680,15 +680,15 @@ def upload_workflow_collection(arvrunner, name, packed):
 
     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
                ["name", "like", name+"%"]]
-    if arvrunner.project_uuid:
-        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
+    if runtimeContext.project_uuid:
+        filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
 
     if exists["items"]:
         logger.info("Using collection %s", exists["items"][0]["uuid"])
     else:
         collection.save_new(name=name,
-                            owner_uuid=arvrunner.project_uuid,
+                            owner_uuid=runtimeContext.project_uuid,
                             ensure_unique_name=True,
                             num_retries=arvrunner.num_retries)
         logger.info("Uploaded to %s", collection.manifest_locator())
index d3a9d78762a60612f0974ca1325e3b528d2c51a8..7c769b5848688f6ca0efd39c703211a849425ce7 100755 (executable)
@@ -1,7 +1,7 @@
+#!/bin/sh
+
 # Copyright (C) The Arvados Authors. All rights reserved.
 #
 # SPDX-License-Identifier: Apache-2.0
 
-#!/bin/bash
-
 echo bubble
index 2b78db8b0b5d7cdb8ddea9453681eaa3f63e6a3d..853a7d360953df486df7d13cde5db04b978f1f07 100644 (file)
@@ -10,14 +10,14 @@ api = arvados.api()
 def check_contents(group, wf_uuid):
     contents = api.groups().contents(uuid=group["uuid"]).execute()
     if len(contents["items"]) != 3:
-        raise Exception("Expected 3 items")
+        raise Exception("Expected 3 items in "+group["uuid"]+" was "+len(contents["items"]))
 
     found = False
     for c in contents["items"]:
         if c["kind"] == "arvados#workflow" and c["uuid"] == wf_uuid:
             found = True
     if not found:
-        raise Exception("Couldn't find workflow")
+        raise Exception("Couldn't find workflow in "+group["uuid"])
 
     found = False
     for c in contents["items"]:
@@ -42,7 +42,9 @@ def test_create():
             raise Exception("Expected 0 items")
 
         # Create workflow, by default should also copy dependencies
-        wf_uuid = subprocess.check_output(["arvados-cwl-runner", "--create-workflow", "--project-uuid", group["uuid"], "19070-copy-deps.cwl"])
+        cmd = ["arvados-cwl-runner", "--create-workflow", "--project-uuid", group["uuid"], "19070-copy-deps.cwl"]
+        print(" ".join(cmd))
+        wf_uuid = subprocess.check_output(cmd)
         wf_uuid = wf_uuid.decode("utf-8").strip()
         check_contents(group, wf_uuid)
     finally:
@@ -57,7 +59,9 @@ def test_update():
             raise Exception("Expected 0 items")
 
         # Create workflow, but with --no-copy-deps it shouldn't copy anything
-        wf_uuid = subprocess.check_output(["arvados-cwl-runner", "--no-copy-deps", "--create-workflow", "--project-uuid", group["uuid"], "19070-copy-deps.cwl"])
+        cmd = ["arvados-cwl-runner", "--no-copy-deps", "--create-workflow", "--project-uuid", group["uuid"], "19070-copy-deps.cwl"]
+        print(" ".join(cmd))
+        wf_uuid = subprocess.check_output(cmd)
         wf_uuid = wf_uuid.decode("utf-8").strip()
 
         contents = api.groups().contents(uuid=group["uuid"]).execute()
@@ -72,7 +76,9 @@ def test_update():
             raise Exception("Couldn't find workflow")
 
         # Updating by default will copy missing items
-        wf_uuid = subprocess.check_output(["arvados-cwl-runner", "--update-workflow", wf_uuid, "19070-copy-deps.cwl"])
+        cmd = ["arvados-cwl-runner", "--update-workflow", wf_uuid, "19070-copy-deps.cwl"]
+        print(" ".join(cmd))
+        wf_uuid = subprocess.check_output(cmd)
         wf_uuid = wf_uuid.decode("utf-8").strip()
         check_contents(group, wf_uuid)
 
@@ -88,7 +94,9 @@ def test_execute():
             raise Exception("Expected 0 items")
 
         # Execute workflow, shouldn't copy anything.
-        wf_uuid = subprocess.check_output(["arvados-cwl-runner", "--project-uuid", group["uuid"], "19070-copy-deps.cwl"])
+        cmd = ["arvados-cwl-runner", "--project-uuid", group["uuid"], "19070-copy-deps.cwl"]
+        print(" ".join(cmd))
+        wf_uuid = subprocess.check_output(cmd)
         wf_uuid = wf_uuid.decode("utf-8").strip()
 
         contents = api.groups().contents(uuid=group["uuid"]).execute()
@@ -115,7 +123,9 @@ def test_execute():
             raise Exception("Didn't expect to find jobs image dependency")
 
         # Execute workflow with --copy-deps
-        wf_uuid = subprocess.check_output(["arvados-cwl-runner", "--project-uuid", group["uuid"], "--copy-deps", "19070-copy-deps.cwl"])
+        cmd = ["arvados-cwl-runner", "--project-uuid", group["uuid"], "--copy-deps", "19070-copy-deps.cwl"]
+        print(" ".join(cmd))
+        wf_uuid = subprocess.check_output(cmd)
         wf_uuid = wf_uuid.decode("utf-8").strip()
 
         contents = api.groups().contents(uuid=group["uuid"]).execute()