X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/00bb1461d14cfc02e6ec2c74d622b7b6b716e775..d7d074d790366338a01736552e916c5e4b5cef69:/sdk/cwl/arvados_cwl/executor.py diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py index ff8ff6ff89..27774b2f7c 100644 --- a/sdk/cwl/arvados_cwl/executor.py +++ b/sdk/cwl/arvados_cwl/executor.py @@ -27,9 +27,9 @@ import arvados_cwl.util from .arvcontainer import RunnerContainer from .arvjob import RunnerJob, RunnerTemplate from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps -from .arvtool import ArvadosCommandTool, validate_cluster_target +from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool from .arvworkflow import ArvadosWorkflow, upload_workflow -from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache +from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size from .perf import Perf from .pathmapper import NoFollowPathMapper from .task_queue import TaskQueue @@ -37,7 +37,7 @@ from .context import ArvLoadingContext, ArvRuntimeContext from ._version import __version__ from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema -from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing +from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing, visit_class from cwltool.command_line_tool import compute_checksums logger = logging.getLogger('arvados.cwl-runner') @@ -95,6 +95,7 @@ class ArvCwlExecutor(object): arvargs.output_name = None arvargs.output_tags = None arvargs.thread_count = 1 + arvargs.collection_cache_size = None self.api = api_client self.processes = {} @@ -116,14 +117,21 @@ class ArvCwlExecutor(object): self.thread_count = arvargs.thread_count self.poll_interval = 12 self.loadingContext = None + self.should_estimate_cache_size = True if keep_client is not None: self.keep_client = keep_client else: self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries) + if arvargs.collection_cache_size: + collection_cache_size = arvargs.collection_cache_size*1024*1024 + self.should_estimate_cache_size = False + else: + collection_cache_size = 256*1024*1024 + self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries, - cap=arvargs.collection_cache) + cap=collection_cache_size) self.fetcher_constructor = partial(CollectionFetcher, api_client=self.api, @@ -187,8 +195,10 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods return ArvadosCommandTool(self, toolpath_object, loadingContext) elif "class" in toolpath_object and toolpath_object["class"] == "Workflow": return ArvadosWorkflow(self, toolpath_object, loadingContext) + elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool": + return ArvadosExpressionTool(self, toolpath_object, loadingContext) else: - return cwltool.workflow.default_make_tool(toolpath_object, loadingContext) + raise Exception("Unknown tool %s" % toolpath_object.get("class")) def output_callback(self, out, processStatus): with self.workflow_eval_lock: @@ -549,7 +559,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods uuid=existing_uuid, submit_runner_ram=runtimeContext.submit_runner_ram, name=runtimeContext.name, - merged_map=merged_map) + merged_map=merged_map, + loadingContext=loadingContext) tmpl.save() # cwltool.main will write our return value to stdout. return (tmpl.uuid, "success") @@ -587,17 +598,29 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods if runtimeContext.priority < 1 or runtimeContext.priority > 1000: raise Exception("--priority must be in the range 1..1000.") + if self.should_estimate_cache_size: + visited = set() + estimated_size = [0] + def estimate_collection_cache(obj): + if obj.get("location", "").startswith("keep:"): + m = pdh_size.match(obj["location"][5:]) + if m and m.group(1) not in visited: + visited.add(m.group(1)) + estimated_size[0] += int(m.group(2)) + visit_class(job_order, ("File", "Directory"), estimate_collection_cache) + runtimeContext.collection_cache_size = max(((estimated_size[0]*192) / (1024*1024))+1, 256) + self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024) + + logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size) + runnerjob = None if runtimeContext.submit: # Submit a runner job to run the workflow for us. if self.work_api == "containers": if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner): runtimeContext.runnerjob = tool.tool["id"] - runnerjob = tool.job(job_order, - self.output_callback, - runtimeContext).next() else: - runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse, + tool = RunnerContainer(self, tool, loadingContext, runtimeContext.enable_reuse, self.output_name, self.output_tags, submit_runner_ram=runtimeContext.submit_runner_ram, @@ -607,9 +630,11 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods intermediate_output_ttl=runtimeContext.intermediate_output_ttl, merged_map=merged_map, priority=runtimeContext.priority, - secret_store=self.secret_store) + secret_store=self.secret_store, + collection_cache_size=runtimeContext.collection_cache_size, + collection_cache_is_default=self.should_estimate_cache_size) elif self.work_api == "jobs": - runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse, + tool = RunnerJob(self, tool, loadingContext, runtimeContext.enable_reuse, self.output_name, self.output_tags, submit_runner_ram=runtimeContext.submit_runner_ram, @@ -627,10 +652,16 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods "state": "RunningOnClient"}).execute(num_retries=self.num_retries) logger.info("Pipeline instance %s", self.pipeline["uuid"]) - if runnerjob and not runtimeContext.wait: - submitargs = runtimeContext.copy() - submitargs.submit = False - runnerjob.run(submitargs) + if runtimeContext.cwl_runner_job is not None: + self.uuid = runtimeContext.cwl_runner_job.get('uuid') + + jobiter = tool.job(job_order, + self.output_callback, + runtimeContext) + + if runtimeContext.submit and not runtimeContext.wait: + runnerjob = jobiter.next() + runnerjob.run(runtimeContext) return (runnerjob.uuid, "success") current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger) @@ -645,14 +676,6 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods try: self.workflow_eval_lock.acquire() - if runnerjob: - jobiter = iter((runnerjob,)) - else: - if runtimeContext.cwl_runner_job is not None: - self.uuid = runtimeContext.cwl_runner_job.get('uuid') - jobiter = tool.job(job_order, - self.output_callback, - runtimeContext) # Holds the lock while this code runs and releases it when # it is safe to do so in self.workflow_eval_lock.wait(), @@ -701,8 +724,10 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods if self.pipeline: self.api.pipeline_instances().update(uuid=self.pipeline["uuid"], body={"state": "Failed"}).execute(num_retries=self.num_retries) - if runnerjob and runnerjob.uuid and self.work_api == "containers": - self.api.container_requests().update(uuid=runnerjob.uuid, + if runtimeContext.submit and isinstance(tool, Runner): + runnerjob = tool + if runnerjob.uuid and self.work_api == "containers": + self.api.container_requests().update(uuid=runnerjob.uuid, body={"priority": "0"}).execute(num_retries=self.num_retries) finally: self.workflow_eval_lock.release() @@ -717,8 +742,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods if self.final_output is None: raise WorkflowException("Workflow did not return a result.") - if runtimeContext.submit and isinstance(runnerjob, Runner): - logger.info("Final output collection %s", runnerjob.final_output) + if runtimeContext.submit and isinstance(tool, Runner): + logger.info("Final output collection %s", tool.final_output) else: if self.output_name is None: self.output_name = "Output of %s" % (shortname(tool.tool["id"]))