from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
from .arvtool import ArvadosCommandTool, validate_cluster_target
from .arvworkflow import ArvadosWorkflow, upload_workflow
-from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
+from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from .task_queue import TaskQueue
from ._version import __version__
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
+from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing, visit_class
from cwltool.command_line_tool import compute_checksums
logger = logging.getLogger('arvados.cwl-runner')
if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
raise Exception("--priority must be in the range 1..1000.")
    # Pre-scan the job order to estimate how much collection data the
    # workflow references, by summing the "+<size>" hint of each distinct
    # keep: locator found on File/Directory objects.
    visited = set()
    # One-element list so the nested closure can mutate the running total
    # (Python 2 compatible alternative to `nonlocal`).
    estimated_size = [0]
    def estimate_collection_cache(obj):
        # Count each portable data hash only once, even if many Files
        # point into the same collection.
        if obj.get("location", "").startswith("keep:"):
            m = pdh_size.match(obj["location"][5:])
            if m and m.group(1) not in visited:
                visited.add(m.group(1))
                estimated_size[0] += int(m.group(2))
    visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
    logger.info("Estimated size %s", estimated_size)

runnerjob = None
if runtimeContext.submit:
# Submit a runner job to run the workflow for us.
logger = logging.getLogger('arvados.cwl-runner')
# Matches a Keep locator / portable data hash: 32 hex digits, a "+<size>"
# segment, and optional further "+hint" segments.  Group 1 is the hash,
# group 2 the recorded collection size.
pdh_size = re.compile(r'([0-9a-f]{32})\+(\d+)(\+\S+)*')
+
class CollectionCache(object):
def __init__(self, api_client, keep_client, num_retries,
cap=256*1024*1024,
self.cap = cap
self.min_entries = min_entries
- def cap_cache(self):
- if self.total > self.cap:
- # ordered list iterates from oldest to newest
- for pdh, v in self.collections.items():
- if self.total < self.cap or len(self.collections) < self.min_entries:
- break
- # cut it loose
- logger.debug("Evicting collection reader %s from cache", pdh)
- del self.collections[pdh]
- self.total -= v[1]
    def set_cap(self, cap):
        """Set the cache size limit, in bytes.

        Only updates the limit; takes effect the next time cap_cache()
        runs (no immediate eviction).
        """
        self.cap = cap

+ def cap_cache(self, required):
+ # ordered dict iterates from oldest to newest
+ for pdh, v in self.collections.items():
+ available = self.cap - self.total
+ if available >= required or len(self.collections) < self.min_entries:
+ return
+ # cut it loose
+ logger.debug("Evicting collection reader %s from cache", pdh)
+ del self.collections[pdh]
+ self.total -= v[1]
def get(self, pdh):
with self.lock:
if pdh not in self.collections:
+ m = pdh_size.match(pdh)
+ if m:
+ self.cap_cache(int(m.group(2)) * 128)
logger.debug("Creating collection reader for %s", pdh)
cr = arvados.collection.CollectionReader(pdh, api_client=self.api_client,
keep_client=self.keep_client,
sz = len(cr.manifest_text()) * 128
self.collections[pdh] = (cr, sz)
self.total += sz
- self.cap_cache()
else:
cr, sz = self.collections[pdh]
# bump it to the back