X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/2b8d56f5ecb5b6b620b439c3d2aae5c8066b4cac..00bb1461d14cfc02e6ec2c74d622b7b6b716e775:/sdk/cwl/arvados_cwl/executor.py diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py index 994594023a..ff8ff6ff89 100644 --- a/sdk/cwl/arvados_cwl/executor.py +++ b/sdk/cwl/arvados_cwl/executor.py @@ -27,7 +27,7 @@ import arvados_cwl.util from .arvcontainer import RunnerContainer from .arvjob import RunnerJob, RunnerTemplate from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps -from .arvtool import ArvadosCommandTool +from .arvtool import ArvadosCommandTool, validate_cluster_target from .arvworkflow import ArvadosWorkflow, upload_workflow from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache from .perf import Perf @@ -122,7 +122,8 @@ class ArvCwlExecutor(object): else: self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries) - self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries) + self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries, + cap=arvargs.collection_cache) self.fetcher_constructor = partial(CollectionFetcher, api_client=self.api, @@ -178,6 +179,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods self.runtimeContext.make_fs_access = partial(CollectionFsAccess, collection_cache=self.collection_cache) + validate_cluster_target(self, self.runtimeContext) + def arv_make_tool(self, toolpath_object, loadingContext): if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool": @@ -204,7 +207,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods def start_run(self, runnable, runtimeContext): - self.task_queue.add(partial(runnable.run, runtimeContext)) + self.task_queue.add(partial(runnable.run, runtimeContext), + self.workflow_eval_lock, self.stop_polling) def process_submitted(self, container): with self.workflow_eval_lock: @@ -214,7 +218,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods with self.workflow_eval_lock: j = self.processes[uuid] logger.info("%s %s is %s", self.label(j), uuid, record["state"]) - self.task_queue.add(partial(j.done, record)) + self.task_queue.add(partial(j.done, record), + self.workflow_eval_lock, self.stop_polling) del self.processes[uuid] def runtime_status_update(self, kind, message, detail=None): @@ -674,6 +679,10 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods else: logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.") break + + if self.stop_polling.is_set(): + break + loopperf.__enter__() loopperf.__exit__()