19070: Add --copy-deps/--no-copy-deps
author     Peter Amstutz <peter.amstutz@curii.com>
           Tue, 3 May 2022 21:01:26 +0000 (17:01 -0400)
committer  Peter Amstutz <peter.amstutz@curii.com>
           Wed, 11 May 2022 21:27:13 +0000 (17:27 -0400)
Copies dependencies into the destination project by default when using
--create-workflow and --update-workflow.

Keeps the old behavior by default when running from the command line
(the new behavior can be enabled with --copy-deps).

Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/arvdocker.py
sdk/cwl/arvados_cwl/context.py
sdk/cwl/arvados_cwl/executor.py
sdk/cwl/arvados_cwl/runner.py
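
For orientation before the diffs: the new flags rely on argparse's tri-state default, where None means "not given on the command line", so the executor can later pick a default based on --create-workflow/--update-workflow. A minimal sketch of that pattern with a stripped-down, assumed parser (not the actual arvados-cwl-runner entry point):

    # Sketch only: a reduced parser showing how the None default lets the
    # executor distinguish "flag not given" from an explicit user choice.
    import argparse

    parser = argparse.ArgumentParser()
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--copy-deps", dest="copy_deps", default=None, action="store_true")
    exgroup.add_argument("--no-copy-deps", dest="copy_deps", default=None, action="store_false")
    parser.add_argument("--create-workflow", action="store_true")
    parser.add_argument("--update-workflow", metavar="UUID", default=None)

    args = parser.parse_args(["--create-workflow"])
    if args.copy_deps is None and (args.create_workflow or args.update_workflow):
        # Neither --copy-deps nor --no-copy-deps given: copy dependencies when
        # registering or updating a workflow, keep the old behavior otherwise.
        args.copy_deps = True
    print(args.copy_deps)  # True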

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index c73b358eccfb19211ce5a077d56ac995d30a40c0..3faa510a0a854cc2b1d713345a745a0eac21d677 100644
@@ -217,6 +217,10 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     exgroup.add_argument("--enable-preemptible", dest="enable_preemptible", default=None, action="store_true", help="Use preemptible instances. Control individual steps with arv:UsePreemptible hint.")
     exgroup.add_argument("--disable-preemptible", dest="enable_preemptible", default=None, action="store_false", help="Don't use preemptible instances.")
 
+    exgroup = parser.add_mutually_exclusive_group()
+    exgroup.add_argument("--copy-deps", dest="copy_deps", default=None, action="store_true", help="Copy dependencies into the destination project.")
+    exgroup.add_argument("--no-copy-deps", dest="copy_deps", default=None, action="store_false", help="Leave dependencies where they are.")
+
     parser.add_argument(
         "--skip-schemas",
         action="store_true",
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index f75bde81e6cebd655a8378fbd382f18b3bf18d2f..33b4c90c617711f6c2505bda1be35e93ed74137f 100644
@@ -247,7 +247,8 @@ class ArvadosContainer(JobBase):
                                                                     runtimeContext.project_uuid,
                                                                     runtimeContext.force_docker_pull,
                                                                     runtimeContext.tmp_outdir_prefix,
-                                                                    runtimeContext.match_local_docker)
+                                                                    runtimeContext.match_local_docker,
+                                                                    runtimeContext.copy_deps)
 
         network_req, _ = self.get_requirement("NetworkAccess")
         if network_req:
diff --git a/sdk/cwl/arvados_cwl/arvdocker.py b/sdk/cwl/arvados_cwl/arvdocker.py
index d5295afc23a4d8511569091ef96e94a8c7a6268c..cf0b3b9daff3deb02a84fe6e41428e04cdefca4e 100644
@@ -57,7 +57,7 @@ def determine_image_id(dockerImageId):
 
 
 def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid,
-                         force_pull, tmp_outdir_prefix, match_local_docker):
+                         force_pull, tmp_outdir_prefix, match_local_docker, copy_deps):
     """Check if a Docker image is available in Keep, if not, upload it using arv-keepdocker."""
 
     if "http://arvados.org/cwl#dockerCollectionPDH" in dockerRequirement:
@@ -85,10 +85,14 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
                                                                 image_tag=image_tag,
                                                                 project_uuid=None)
 
-        images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
-                                                                image_name=image_name,
-                                                                image_tag=image_tag,
-                                                                project_uuid=project_uuid)
+        if copy_deps:
+            # Only images that are available in the destination project
+            images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
+                                                                    image_name=image_name,
+                                                                    image_tag=image_tag,
+                                                                    project_uuid=project_uuid)
+        else:
+            images = out_of_project_images
 
         if match_local_docker:
             local_image_id = determine_image_id(dockerRequirement["dockerImageId"])
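
Restating the effect of the new copy_deps argument above as a hedged, standalone sketch; list_images_in_arv is the real helper from arvados.commands.keepdocker, but the wrapper function and its name are assumed for illustration:

    # Sketch of the image selection above; the literal 3 matches the retry
    # count passed in the diff.
    import arvados.commands.keepdocker

    def candidate_images(api_client, image_name, image_tag, project_uuid, copy_deps):
        # Unrestricted query: any matching image the user can read, in any project.
        out_of_project_images = arvados.commands.keepdocker.list_images_in_arv(
            api_client, 3, image_name=image_name, image_tag=image_tag,
            project_uuid=None)
        if copy_deps:
            # Only images already owned by the destination project count as
            # available; if this comes back empty, the surrounding code uploads
            # the image (per the function's docstring), presumably into that project.
            return arvados.commands.keepdocker.list_images_in_arv(
                api_client, 3, image_name=image_name, image_tag=image_tag,
                project_uuid=project_uuid)
        # Otherwise reuse an image found anywhere the user can read.
        return out_of_project_images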
diff --git a/sdk/cwl/arvados_cwl/context.py b/sdk/cwl/arvados_cwl/context.py
index 316250106b09cdd248d0ddf7b292cbfc1881a700..64f85e20763590fd173e57046f471f6e41602ac2 100644
@@ -38,6 +38,7 @@ class ArvRuntimeContext(RuntimeContext):
         self.collection_cache_size = 256
         self.match_local_docker = False
         self.enable_preemptible = None
+        self.copy_deps = None
 
         super(ArvRuntimeContext, self).__init__(kwargs)
 
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index ef371b43dfd9068fa5d7c94ad412e26a6833eae4..ed57c6dae0709c544bff404e10cfd7821cb10856 100644
@@ -544,6 +544,16 @@ The 'jobs' API is no longer supported.
         if not runtimeContext.name:
             runtimeContext.name = self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])
 
+        if runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow):
+            runtimeContext.copy_deps = True
+
+        if runtimeContext.update_workflow and self.project_uuid is None:
+            # If we are updating a workflow, make sure anything that
+            # gets uploaded goes into the same parent project, unless
+            # an alternate --project-uuid was provided.
+            existing_wf = self.api.workflows().get(uuid=runtimeContext.update_workflow).execute()
+            self.project_uuid = existing_wf["owner_uuid"]
+
         # Upload local file references in the job order.
         job_order = upload_job_order(self, "%s input" % runtimeContext.name,
                                      updated_tool, job_order)
@@ -571,10 +581,6 @@ The 'jobs' API is no longer supported.
         else:
             tool = updated_tool
 
-        if runtimeContext.update_workflow and self.project_uuid is None:
-            existing_wf = self.api.workflows().get(uuid=runtimeContext.update_workflow).execute()
-            self.project_uuid = existing_wf["owner_uuid"]
-
         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
         # Also uploads docker images.
         merged_map = upload_workflow_deps(self, tool)
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index dae0541bb4098bbc0bdc43432e673c8028d6d5ea..6b670c73df4e6d58872193897223429337b4402f 100644
@@ -39,6 +39,7 @@ from cwltool.builder import Builder
 import schema_salad.validate as validate
 
 import arvados.collection
+import arvados.util
 from .util import collectionUUID
 from ruamel.yaml import YAML
 from ruamel.yaml.comments import CommentedMap, CommentedSeq
@@ -399,10 +400,15 @@ def upload_dependencies(arvrunner, name, document_loader,
                            single_collection=True,
                            optional_deps=optional_deps)
 
+    keeprefs = set()
+    def addkeepref(k):
+        keeprefs.add(collection_pdh_pattern.match(k).group(1))
+
     def setloc(p):
         loc = p.get("location")
         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
             p["location"] = mapper.mapper(p["location"]).resolved
+            addkeepref(p["location"])
             return
 
         if not loc:
@@ -424,7 +430,10 @@ def upload_dependencies(arvrunner, name, document_loader,
 
         gp = collection_uuid_pattern.match(loc)
         if not gp:
+            # Not a uuid pattern (must be a pdh pattern)
+            addkeepref(p["location"])
             return
+
         uuid = gp.groups()[0]
         if uuid not in uuid_map:
             raise SourceLine(p, "location", validate.ValidationException).makeError(
@@ -439,6 +448,38 @@ def upload_dependencies(arvrunner, name, document_loader,
         for d in discovered:
             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
 
+    if arvrunner.runtimeContext.copy_deps:
+        # Find referenced collections and copy them into the
+        # destination project, for easy sharing.
+        already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
+                                     filters=[["portable_data_hash", "in", list(keeprefs)],
+                                              ["owner_uuid", "=", arvrunner.project_uuid]],
+                                     select=["uuid", "portable_data_hash", "created_at"]))
+
+        keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
+        for kr in keeprefs:
+            col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
+                                                   order="created_at desc",
+                                                   select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
+                                                   limit=1).execute()
+            if len(col["items"]) == 0:
+                logger.warning("Cannot find collection with portable data hash %s", kr)
+                continue
+            col = col["items"][0]
+            try:
+                arvrunner.api.collections().create(body={"collection": {
+                    "owner_uuid": arvrunner.project_uuid,
+                    "name": col["name"],
+                    "description": col["description"],
+                    "properties": col["properties"],
+                    "portable_data_hash": col["portable_data_hash"],
+                    "manifest_text": col["manifest_text"],
+                    "storage_classes_desired": col["storage_classes_desired"],
+                    "trash_at": col["trash_at"]
+                }}, ensure_unique_name=True).execute()
+            except Exception as e:
+                logger.warning("Unable to copy collection to destination: %s", e)
+
     if "$schemas" in workflowobj:
         sch = CommentedSeq()
         for s in workflowobj["$schemas"]:
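
The copy loop above uses a pattern worth seeing in isolation: a collection is "copied" by creating a new collection record with the same manifest_text in the destination project, which references the existing Keep blocks rather than duplicating the data. A hedged, self-contained sketch (the helper name is assumed; the API calls mirror the diff):

    # Sketch only: copy the newest collection matching a portable data hash
    # into a destination project.
    import arvados

    def copy_collection_by_pdh(api, pdh, dest_project_uuid):
        src = api.collections().list(
            filters=[["portable_data_hash", "=", pdh]],
            order="created_at desc",
            select=["name", "description", "properties", "portable_data_hash",
                    "manifest_text", "storage_classes_desired", "trash_at"],
            limit=1).execute()
        if not src["items"]:
            raise LookupError("no collection with portable data hash %s" % pdh)
        col = src["items"][0]
        # A new collection record built from the same manifest_text adds another
        # reference to the existing Keep blocks; the data itself is not copied.
        return api.collections().create(body={"collection": {
            "owner_uuid": dest_project_uuid,
            "name": col["name"],
            "description": col["description"],
            "properties": col["properties"],
            "portable_data_hash": col["portable_data_hash"],
            "manifest_text": col["manifest_text"],
            "storage_classes_desired": col["storage_classes_desired"],
            "trash_at": col["trash_at"],
        }}, ensure_unique_name=True).execute()

    # usage: copy_collection_by_pdh(arvados.api("v1"), "<portable data hash>", "<destination project uuid>")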
@@ -462,13 +503,15 @@ def upload_docker(arvrunner, tool):
             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
                                                        arvrunner.runtimeContext.force_docker_pull,
                                                        arvrunner.runtimeContext.tmp_outdir_prefix,
-                                                       arvrunner.runtimeContext.match_local_docker)
+                                                       arvrunner.runtimeContext.match_local_docker,
+                                                       arvrunner.runtimeContext.copy_deps)
         else:
             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
                                                        True, arvrunner.project_uuid,
                                                        arvrunner.runtimeContext.force_docker_pull,
                                                        arvrunner.runtimeContext.tmp_outdir_prefix,
-                                                       arvrunner.runtimeContext.match_local_docker)
+                                                       arvrunner.runtimeContext.match_local_docker,
+                                                       arvrunner.runtimeContext.copy_deps)
     elif isinstance(tool, cwltool.workflow.Workflow):
         for s in tool.steps:
             upload_docker(arvrunner, s.embedded_tool)
@@ -506,7 +549,8 @@ def packed_workflow(arvrunner, tool, merged_map):
                                                                                                              arvrunner.project_uuid,
                                                                                                              arvrunner.runtimeContext.force_docker_pull,
                                                                                                              arvrunner.runtimeContext.tmp_outdir_prefix,
-                                                                                                             arvrunner.runtimeContext.match_local_docker)
+                                                                                                             arvrunner.runtimeContext.match_local_docker,
+                                                                                                             arvrunner.runtimeContext.copy_deps)
             for l in v:
                 visit(v[l], cur_id)
         if isinstance(v, list):
@@ -614,7 +658,8 @@ def arvados_jobs_image(arvrunner, img):
         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
                                                           arvrunner.runtimeContext.force_docker_pull,
                                                           arvrunner.runtimeContext.tmp_outdir_prefix,
-                                                          arvrunner.runtimeContext.match_local_docker)
+                                                          arvrunner.runtimeContext.match_local_docker,
+                                                          arvrunner.runtimeContext.copy_deps)
     except Exception as e:
         raise Exception("Docker image %s is not available\n%s" % (img, e) )