from .arvcontainer import RunnerContainer
from .arvjob import RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
-from .arvtool import ArvadosCommandTool, validate_cluster_target
+from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
-from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
+from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from .task_queue import TaskQueue
from ._version import __version__
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
+from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing, visit_class
from cwltool.command_line_tool import compute_checksums
logger = logging.getLogger('arvados.cwl-runner')
arvargs.output_name = None
arvargs.output_tags = None
arvargs.thread_count = 1
+ arvargs.collection_cache_size = None
self.api = api_client
self.processes = {}
self.thread_count = arvargs.thread_count
self.poll_interval = 12
self.loadingContext = None
+ self.should_estimate_cache_size = True
if keep_client is not None:
self.keep_client = keep_client
else:
self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
- self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
+ if arvargs.collection_cache_size:
+ collection_cache_size = arvargs.collection_cache_size*1024*1024
+ self.should_estimate_cache_size = False
+ else:
+ collection_cache_size = 256*1024*1024
+
+ self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
+ cap=collection_cache_size)
self.fetcher_constructor = partial(CollectionFetcher,
api_client=self.api,
return ArvadosCommandTool(self, toolpath_object, loadingContext)
elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
return ArvadosWorkflow(self, toolpath_object, loadingContext)
+ elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
+ return ArvadosExpressionTool(self, toolpath_object, loadingContext)
else:
- return cwltool.workflow.default_make_tool(toolpath_object, loadingContext)
+ raise Exception("Unknown tool %s" % toolpath_object.get("class"))
def output_callback(self, out, processStatus):
with self.workflow_eval_lock:
def start_run(self, runnable, runtimeContext):
- self.task_queue.add(partial(runnable.run, runtimeContext))
+ self.task_queue.add(partial(runnable.run, runtimeContext),
+ self.workflow_eval_lock, self.stop_polling)
def process_submitted(self, container):
with self.workflow_eval_lock:
with self.workflow_eval_lock:
j = self.processes[uuid]
logger.info("%s %s is %s", self.label(j), uuid, record["state"])
- self.task_queue.add(partial(j.done, record))
+ self.task_queue.add(partial(j.done, record),
+ self.workflow_eval_lock, self.stop_polling)
del self.processes[uuid]
def runtime_status_update(self, kind, message, detail=None):
uuid=existing_uuid,
submit_runner_ram=runtimeContext.submit_runner_ram,
name=runtimeContext.name,
- merged_map=merged_map)
+ merged_map=merged_map,
+ loadingContext=loadingContext)
tmpl.save()
# cwltool.main will write our return value to stdout.
return (tmpl.uuid, "success")
if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
raise Exception("--priority must be in the range 1..1000.")
+ if self.should_estimate_cache_size:
+     # No explicit --collection-cache-size style override was given, so
+     # derive a cap from the keep: references found in the job order.
+     visited = set()
+     # One-element list: a mutable cell the nested function can update
+     # without needing `nonlocal`.
+     estimated_size = [0]
+     def estimate_collection_cache(obj):
+         """Add the size parsed from obj's keep: location to the running total.
+
+         Each distinct group(1) match is counted only once
+         (presumably group(1) is the portable data hash and group(2)
+         the size component -- confirm against fsaccess.pdh_size).
+         """
+         if obj.get("location", "").startswith("keep:"):
+             # Match against the text after the "keep:" prefix.
+             m = pdh_size.match(obj["location"][5:])
+             if m and m.group(1) not in visited:
+                 visited.add(m.group(1))
+                 estimated_size[0] += int(m.group(2))
+     visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
+     # Floor division keeps the MiB figure an integer whether or not true
+     # division is in effect (Python 3 / `from __future__ import division`);
+     # for Python 2 ints the result is unchanged.  The x192 scale factor is
+     # a heuristic whose rationale is not visible here; 256 MiB is the floor.
+     runtimeContext.collection_cache_size = max(((estimated_size[0]*192) // (1024*1024))+1, 256)
+     self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)
+
+ logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)
+
runnerjob = None
if runtimeContext.submit:
# Submit a runner job to run the workflow for us.
if self.work_api == "containers":
if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner):
runtimeContext.runnerjob = tool.tool["id"]
- runnerjob = tool.job(job_order,
- self.output_callback,
- runtimeContext).next()
else:
- runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
+ tool = RunnerContainer(self, tool, loadingContext, runtimeContext.enable_reuse,
self.output_name,
self.output_tags,
submit_runner_ram=runtimeContext.submit_runner_ram,
intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
merged_map=merged_map,
priority=runtimeContext.priority,
- secret_store=self.secret_store)
+ secret_store=self.secret_store,
+ collection_cache_size=runtimeContext.collection_cache_size,
+ collection_cache_is_default=self.should_estimate_cache_size)
elif self.work_api == "jobs":
- runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
+ tool = RunnerJob(self, tool, loadingContext, runtimeContext.enable_reuse,
self.output_name,
self.output_tags,
submit_runner_ram=runtimeContext.submit_runner_ram,
"state": "RunningOnClient"}).execute(num_retries=self.num_retries)
logger.info("Pipeline instance %s", self.pipeline["uuid"])
- if runnerjob and not runtimeContext.wait:
- submitargs = runtimeContext.copy()
- submitargs.submit = False
- runnerjob.run(submitargs)
+ if runtimeContext.cwl_runner_job is not None:
+ self.uuid = runtimeContext.cwl_runner_job.get('uuid')
+
+ jobiter = tool.job(job_order,
+ self.output_callback,
+ runtimeContext)
+
+ if runtimeContext.submit and not runtimeContext.wait:
+ runnerjob = jobiter.next()
+ runnerjob.run(runtimeContext)
return (runnerjob.uuid, "success")
current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
try:
self.workflow_eval_lock.acquire()
- if runnerjob:
- jobiter = iter((runnerjob,))
- else:
- if runtimeContext.cwl_runner_job is not None:
- self.uuid = runtimeContext.cwl_runner_job.get('uuid')
- jobiter = tool.job(job_order,
- self.output_callback,
- runtimeContext)
# Holds the lock while this code runs and releases it when
# it is safe to do so in self.workflow_eval_lock.wait(),
else:
logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
break
+
+ if self.stop_polling.is_set():
+ break
+
loopperf.__enter__()
loopperf.__exit__()
if self.pipeline:
self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
body={"state": "Failed"}).execute(num_retries=self.num_retries)
- if runnerjob and runnerjob.uuid and self.work_api == "containers":
- self.api.container_requests().update(uuid=runnerjob.uuid,
+ if runtimeContext.submit and isinstance(tool, Runner):
+ runnerjob = tool
+ if runnerjob.uuid and self.work_api == "containers":
+ self.api.container_requests().update(uuid=runnerjob.uuid,
body={"priority": "0"}).execute(num_retries=self.num_retries)
finally:
self.workflow_eval_lock.release()
if self.final_output is None:
raise WorkflowException("Workflow did not return a result.")
- if runtimeContext.submit and isinstance(runnerjob, Runner):
- logger.info("Final output collection %s", runnerjob.final_output)
+ if runtimeContext.submit and isinstance(tool, Runner):
+ logger.info("Final output collection %s", tool.final_output)
else:
if self.output_name is None:
self.output_name = "Output of %s" % (shortname(tool.tool["id"]))