X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/c2989afb1ed2397676212804272cb128236758cf..9c0ea426167e01ea69ff022a811803aa95a302d4:/sdk/cwl/arvados_cwl/executor.py diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py index 8c2023e187..6cac709260 100644 --- a/sdk/cwl/arvados_cwl/executor.py +++ b/sdk/cwl/arvados_cwl/executor.py @@ -23,17 +23,17 @@ import arvados.config from arvados.keep import KeepClient from arvados.errors import ApiError +import arvados_cwl.util from .arvcontainer import RunnerContainer from .arvjob import RunnerJob, RunnerTemplate from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps -from .arvtool import ArvadosCommandTool +from .arvtool import ArvadosCommandTool, validate_cluster_target from .arvworkflow import ArvadosWorkflow, upload_workflow from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache from .perf import Perf from .pathmapper import NoFollowPathMapper from .task_queue import TaskQueue from .context import ArvLoadingContext, ArvRuntimeContext -from .util import get_current_container from ._version import __version__ from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema @@ -43,6 +43,8 @@ from cwltool.command_line_tool import compute_checksums logger = logging.getLogger('arvados.cwl-runner') metrics = logging.getLogger('arvados.cwl-runner.metrics') +DEFAULT_PRIORITY = 500 + class RuntimeStatusLoggingHandler(logging.Handler): """ Intercepts logging calls and report them as runtime statuses on runner @@ -167,7 +169,7 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods # Add a custom logging handler to the root logger for runtime status reporting # if running inside a container - if get_current_container(self.api, self.num_retries, logger): + if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger): root_logger = logging.getLogger('') handler = RuntimeStatusLoggingHandler(self.runtime_status_update) root_logger.addHandler(handler) @@ -176,6 +178,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods self.runtimeContext.make_fs_access = partial(CollectionFsAccess, collection_cache=self.collection_cache) + validate_cluster_target(self, self.runtimeContext) + def arv_make_tool(self, toolpath_object, loadingContext): if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool": @@ -222,7 +226,7 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods activity statuses, for example in the RuntimeStatusLoggingHandler. """ with self.workflow_eval_lock: - current = get_current_container(self.api, self.num_retries, logger) + current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger) if current is None: return runtime_status = current.get('runtime_status', {}) @@ -465,7 +469,7 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods def set_crunch_output(self): if self.work_api == "containers": - current = get_current_container(self.api, self.num_retries, logger) + current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger) if current is None: return try: @@ -584,7 +588,7 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods if runtimeContext.submit: # Submit a runner job to run the workflow for us. if self.work_api == "containers": - if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait: + if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner): runtimeContext.runnerjob = tool.tool["id"] runnerjob = tool.job(job_order, self.output_callback, @@ -626,7 +630,7 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods runnerjob.run(submitargs) return (runnerjob.uuid, "success") - current_container = get_current_container(self.api, self.num_retries, logger) + current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger) if current_container: logger.info("Running inside container %s", current_container.get("uuid"))