14510: Estimate collection cache size wip
author Peter Amstutz <pamstutz@veritasgenetics.com>
Tue, 27 Nov 2018 23:13:30 +0000 (18:13 -0500)
committer Peter Amstutz <pamstutz@veritasgenetics.com>
Tue, 27 Nov 2018 23:13:30 +0000 (18:13 -0500)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

sdk/cwl/arvados_cwl/executor.py
sdk/cwl/arvados_cwl/fsaccess.py

index caf954fcb038108961c1cf296010ff0468a24fbc..29f1582aa647d2b171547639f5eb370cd1492a5c 100644 (file)
@@ -29,7 +29,7 @@ from .arvjob import RunnerJob, RunnerTemplate
 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
 from .arvtool import ArvadosCommandTool, validate_cluster_target
 from .arvworkflow import ArvadosWorkflow, upload_workflow
-from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
+from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
 from .perf import Perf
 from .pathmapper import NoFollowPathMapper
 from .task_queue import TaskQueue
@@ -37,7 +37,7 @@ from .context import ArvLoadingContext, ArvRuntimeContext
 from ._version import __version__
 
 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
+from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing, visit_class
 from cwltool.command_line_tool import compute_checksums
 
 logger = logging.getLogger('arvados.cwl-runner')
@@ -593,6 +593,17 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
         if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
             raise Exception("--priority must be in the range 1..1000.")
 
+        visited = set()  # hashes already counted, so each collection contributes only once
+        estimated_size = [0]  # one-element list so the nested function can update the running total
+        def estimate_collection_cache(obj):
+            if obj.get("location", "").startswith("keep:"):
+                m = pdh_size.match(obj["location"][5:])  # match against the text after "keep:"
+                if m and m.group(1) not in visited:
+                    visited.add(m.group(1))
+                    estimated_size[0] += int(m.group(2))  # size field embedded in the locator
+        visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
+        logger.info("Estimated size %s", estimated_size[0])  # log the number, not the wrapper list
+
         runnerjob = None
         if runtimeContext.submit:
             # Submit a runner job to run the workflow for us.
index 5981268128486496f22d28b4928d0b90b5775e7e..c3713b21ece80c620d1a16b064ff04ec7122532e 100644 (file)
@@ -28,6 +28,8 @@ from schema_salad.ref_resolver import DefaultFetcher
 
 logger = logging.getLogger('arvados.cwl-runner')
 
+pdh_size = re.compile(r'([0-9a-f]{32})\+(\d+)(\+\S+)*')  # group 1: 32 hex digits; group 2: decimal size; then optional "+..." suffixes
+
 class CollectionCache(object):
     def __init__(self, api_client, keep_client, num_retries,
                  cap=256*1024*1024,
@@ -41,20 +43,26 @@ class CollectionCache(object):
         self.cap = cap
         self.min_entries = min_entries
 
-    def cap_cache(self):
-        if self.total > self.cap:
-            # ordered list iterates from oldest to newest
-            for pdh, v in self.collections.items():
-                if self.total < self.cap or len(self.collections) < self.min_entries:
-                    break
-                # cut it loose
-                logger.debug("Evicting collection reader %s from cache", pdh)
-                del self.collections[pdh]
-                self.total -= v[1]
+    def set_cap(self, cap):
+        self.cap = cap  # replaces the limit that cap_cache compares against self.total
+
+    def cap_cache(self, required):
+        """Evict readers, oldest first, until `required` fits under the cap or few entries remain."""
+        for pdh, v in list(self.collections.items()):  # snapshot: entries are deleted inside the loop
+            available = self.cap - self.total
+            if available >= required or len(self.collections) < self.min_entries:
+                return
+            # cut it loose
+            logger.debug("Evicting collection reader %s from cache", pdh)
+            del self.collections[pdh]
+            self.total -= v[1]  # v is (reader, size); give back this entry's size
 
     def get(self, pdh):
         with self.lock:
             if pdh not in self.collections:
+                m = pdh_size.match(pdh)
+                if m:
+                    self.cap_cache(int(m.group(2)) * 128)
                 logger.debug("Creating collection reader for %s", pdh)
                 cr = arvados.collection.CollectionReader(pdh, api_client=self.api_client,
                                                          keep_client=self.keep_client,
@@ -62,7 +70,6 @@ class CollectionCache(object):
                 sz = len(cr.manifest_text()) * 128
                 self.collections[pdh] = (cr, sz)
                 self.total += sz
-                self.cap_cache()
             else:
                 cr, sz = self.collections[pdh]
                 # bump it to the back