11907: Make collections containing dependencies have predictable PDHs
authorPeter Amstutz <pamstutz@veritasgenetics.com>
Thu, 7 Jun 2018 13:54:47 +0000 (09:54 -0400)
committerPeter Amstutz <pamstutz@veritasgenetics.com>
Thu, 7 Jun 2018 14:45:30 +0000 (10:45 -0400)
Removes optimization that tries to avoid redundant file
uploads (decided complexity tradeoff isn't worth it.)  Collections
created from local file inputs no longer use block packing.

Likely to invalidate job reuse for jobs submitted by past versions of
a-c-r, but will have more stable job reuse going forward.

Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/pathmapper.py
sdk/cwl/arvados_cwl/runner.py
sdk/cwl/setup.py

index 5c60f7d2a019dee14b7fc5aa0a6965e4ce9ac085..5b29ae517e8b15c33781cd70247c36d2ae831b94 100644 (file)
@@ -76,7 +76,6 @@ class ArvCwlRunner(object):
         self.workflow_eval_lock = threading.Condition(threading.RLock())
         self.final_output = None
         self.final_status = None
-        self.uploaded = {}
         self.num_retries = num_retries
         self.uuid = None
         self.stop_polling = threading.Event()
@@ -238,12 +237,6 @@ class ArvCwlRunner(object):
         finally:
             self.stop_polling.set()
 
-    def get_uploaded(self):
-        return self.uploaded.copy()
-
-    def add_uploaded(self, src, pair):
-        self.uploaded[src] = pair
-
     def add_intermediate_output(self, uuid):
         if uuid:
             self.intermediate_output_collections.append(uuid)
index bd4b5283fbe4ff3de751110e9a15fd5250697008..27e48f1f4408e33630985f2060ba738af720111f 100644 (file)
@@ -127,19 +127,6 @@ class ArvPathMapper(PathMapper):
                                                        keep_client=self.arvrunner.keep_client,
                                                        num_retries=self.arvrunner.num_retries)
 
-        already_uploaded = self.arvrunner.get_uploaded()
-        copied_files = set()
-        for k in referenced_files:
-            loc = k["location"]
-            if loc in already_uploaded:
-                v = already_uploaded[loc]
-                self._pathmap[loc] = MapperEnt(v.resolved, self.collection_pattern % urllib.unquote(v.resolved[5:]), v.type, True)
-                if self.single_collection:
-                    basename = k["basename"]
-                    if basename not in collection:
-                        self.addentry({"location": loc, "class": v.type, "basename": basename}, collection, ".", [])
-                        copied_files.add((loc, basename, v.type))
-
         for srcobj in referenced_files:
             self.visit(srcobj, uploadfiles)
 
@@ -150,16 +137,12 @@ class ArvPathMapper(PathMapper):
                                          fnPattern="keep:%s/%s",
                                          name=self.name,
                                          project=self.arvrunner.project_uuid,
-                                         collection=collection)
+                                         collection=collection,
+                                         packed=False)
 
         for src, ab, st in uploadfiles:
             self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                            "Directory" if os.path.isdir(ab) else "File", True)
-            self.arvrunner.add_uploaded(src, self._pathmap[src])
-
-        for loc, basename, cls in copied_files:
-            fn = "keep:%s/%s" % (collection.portable_data_hash(), basename)
-            self._pathmap[loc] = MapperEnt(urllib.quote(fn, "/:+@"), self.collection_pattern % fn[5:], cls, True)
 
         for srcobj in referenced_files:
             remap = []
index 3ce08f6cc7971973f7e6925bbc351d65b3492592..cf91f69f818cd51e721c658cd05d5a81e9df6e05 100644 (file)
@@ -122,11 +122,18 @@ def upload_dependencies(arvrunner, name, document_loader,
         # that external references in $include and $mixin are captured.
         scanobj = loadref("", workflowobj["id"])
 
-    sc = scandeps(uri, scanobj,
+    sc_result = scandeps(uri, scanobj,
                   loadref_fields,
                   set(("$include", "$schemas", "location")),
                   loadref, urljoin=document_loader.fetcher.urljoin)
 
+    sc = []
+    def only_real(obj):
+        if obj.get("location", "").startswith("file:"):
+            sc.append(obj)
+
+    visit_class(sc_result, ("File", "Directory"), only_real)
+
     normalizeFilesDirs(sc)
 
     if include_primary and "id" in workflowobj:
index 8efcb6e7dd8c88590305f012b2c62c3cdfef36bb..4c31d3b4450eac66ed5f839de2d34842913067f0 100644 (file)
@@ -37,7 +37,7 @@ setup(name='arvados-cwl-runner',
           'schema-salad==2.7.20180501211602',
           'typing >= 3.5.3',
           'ruamel.yaml >=0.13.11, <0.15',
-          'arvados-python-client>=1.1.4.20180507184611',
+          'arvados-python-client>=1.1.4.20180607143841',
           'setuptools',
           'ciso8601 >=1.0.6, <2.0.0'
       ],