self.thread_count = arvargs.thread_count
self.poll_interval = 12
self.loadingContext = None
+ self.should_estimate_cache_size = True
if keep_client is not None:
self.keep_client = keep_client
if arvargs.collection_cache_size:
collection_cache_size = arvargs.collection_cache_size*1024*1024
+ self.should_estimate_cache_size = False
else:
collection_cache_size = 256*1024*1024
if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
raise Exception("--priority must be in the range 1..1000.")
- visited = set()
- estimated_size = [0]
- def estimate_collection_cache(obj):
- if obj.get("location", "").startswith("keep:"):
- m = pdh_size.match(obj["location"][5:])
- if m and m.group(1) not in visited:
- visited.add(m.group(1))
- estimated_size[0] += int(m.group(2))
- visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
- logger.info("Estimated size %s", estimated_size)
+ if self.should_estimate_cache_size:
+ visited = set()
+ estimated_size = [0]
+ def estimate_collection_cache(obj):
+ if obj.get("location", "").startswith("keep:"):
+ m = pdh_size.match(obj["location"][5:])
+ if m and m.group(1) not in visited:
+ visited.add(m.group(1))
+ estimated_size[0] += int(m.group(2))
+ visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
+ runtimeContext.collection_cache_size = max(((estimated_size[0]*192) / (1024*1024))+1, 256)
+ self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)
+
+ logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)
runnerjob = None
if runtimeContext.submit: