14510: Estimate collection cache size wip
[arvados.git] / sdk / cwl / arvados_cwl / executor.py
index caf954fcb038108961c1cf296010ff0468a24fbc..29f1582aa647d2b171547639f5eb370cd1492a5c 100644 (file)
@@ -29,7 +29,7 @@ from .arvjob import RunnerJob, RunnerTemplate
 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
 from .arvtool import ArvadosCommandTool, validate_cluster_target
 from .arvworkflow import ArvadosWorkflow, upload_workflow
-from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
+from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
 from .perf import Perf
 from .pathmapper import NoFollowPathMapper
 from .task_queue import TaskQueue
@@ -37,7 +37,7 @@ from .context import ArvLoadingContext, ArvRuntimeContext
 from ._version import __version__
 
 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
+from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing, visit_class
 from cwltool.command_line_tool import compute_checksums
 
 logger = logging.getLogger('arvados.cwl-runner')
@@ -593,6 +593,17 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
         if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
             raise Exception("--priority must be in the range 1..1000.")
 
+        visited = set()  # group(1) values already counted, so each distinct match is summed only once
+        estimated_size = [0]  # one-element list: a mutable cell the nested function updates in place
+        def estimate_collection_cache(obj):  # visitor: add obj's size to the total the first time its key is seen
+            if obj.get("location", "").startswith("keep:"):
+                m = pdh_size.match(obj["location"][5:])  # [5:] strips the "keep:" prefix before matching
+                if m and m.group(1) not in visited:  # group(1) is the dedup key (presumably the portable data hash -- confirm against pdh_size)
+                    visited.add(m.group(1))
+                    estimated_size[0] += int(m.group(2))  # group(2) is parsed as an integer size and accumulated
+        visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
+        logger.info("Estimated size %s", estimated_size[0])  # log the integer total, not the wrapping list
+
         runnerjob = None
         if runtimeContext.submit:
             # Submit a runner job to run the workflow for us.