X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/0939c5719af9601ae6b95f6d230cbc4cace122cc..9c0ea426167e01ea69ff022a811803aa95a302d4:/sdk/cwl/arvados_cwl/executor.py diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py index 7d398af37f..6cac709260 100644 --- a/sdk/cwl/arvados_cwl/executor.py +++ b/sdk/cwl/arvados_cwl/executor.py @@ -23,17 +23,17 @@ import arvados.config from arvados.keep import KeepClient from arvados.errors import ApiError +import arvados_cwl.util from .arvcontainer import RunnerContainer from .arvjob import RunnerJob, RunnerTemplate from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps -from .arvtool import ArvadosCommandTool +from .arvtool import ArvadosCommandTool, validate_cluster_target from .arvworkflow import ArvadosWorkflow, upload_workflow from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache from .perf import Perf from .pathmapper import NoFollowPathMapper from .task_queue import TaskQueue from .context import ArvLoadingContext, ArvRuntimeContext -from .util import get_current_container from ._version import __version__ from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema @@ -43,6 +43,8 @@ from cwltool.command_line_tool import compute_checksums logger = logging.getLogger('arvados.cwl-runner') metrics = logging.getLogger('arvados.cwl-runner.metrics') +DEFAULT_PRIORITY = 500 + class RuntimeStatusLoggingHandler(logging.Handler): """ Intercepts logging calls and report them as runtime statuses on runner @@ -167,7 +169,7 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods # Add a custom logging handler to the root logger for runtime status reporting # if running inside a container - if get_current_container(self.api, self.num_retries, logger): + if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger): root_logger = logging.getLogger('') handler = RuntimeStatusLoggingHandler(self.runtime_status_update) root_logger.addHandler(handler) @@ -176,6 +178,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods self.runtimeContext.make_fs_access = partial(CollectionFsAccess, collection_cache=self.collection_cache) + validate_cluster_target(self, self.runtimeContext) + def arv_make_tool(self, toolpath_object, loadingContext): if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool": @@ -222,7 +226,7 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods activity statuses, for example in the RuntimeStatusLoggingHandler. """ with self.workflow_eval_lock: - current = get_current_container(self.api, self.num_retries, logger) + current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger) if current is None: return runtime_status = current.get('runtime_status', {}) @@ -326,21 +330,26 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods elif self.work_api == "jobs": table = self.poll_api.jobs() - try: - proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries) - except Exception as e: - logger.warn("Error checking states on API server: %s", e) - remain_wait = self.poll_interval - continue + pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000) - for p in proc_states["items"]: - self.on_message({ - "object_uuid": p["uuid"], - "event_type": "update", - "properties": { - "new_attributes": p - } - }) + while keys: + page = keys[:pageSize] + keys = keys[pageSize:] + try: + proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries) + except Exception as e: + logger.warn("Error checking states on API server: %s", e) + remain_wait = self.poll_interval + continue + + for p in proc_states["items"]: + self.on_message({ + "object_uuid": p["uuid"], + "event_type": "update", + "properties": { + "new_attributes": p + } + }) finish_poll = time.time() remain_wait = self.poll_interval - (finish_poll - begin_poll) except: @@ -460,7 +469,7 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods def set_crunch_output(self): if self.work_api == "containers": - current = get_current_container(self.api, self.num_retries, logger) + current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger) if current is None: return try: @@ -579,7 +588,7 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods if runtimeContext.submit: # Submit a runner job to run the workflow for us. if self.work_api == "containers": - if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait: + if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner): runtimeContext.runnerjob = tool.tool["id"] runnerjob = tool.job(job_order, self.output_callback, @@ -621,23 +630,27 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods runnerjob.run(submitargs) return (runnerjob.uuid, "success") + current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger) + if current_container: + logger.info("Running inside container %s", current_container.get("uuid")) + self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout) self.polling_thread = threading.Thread(target=self.poll_states) self.polling_thread.start() self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count) - if runnerjob: - jobiter = iter((runnerjob,)) - else: - if runtimeContext.cwl_runner_job is not None: - self.uuid = runtimeContext.cwl_runner_job.get('uuid') - jobiter = tool.job(job_order, - self.output_callback, - runtimeContext) - try: self.workflow_eval_lock.acquire() + if runnerjob: + jobiter = iter((runnerjob,)) + else: + if runtimeContext.cwl_runner_job is not None: + self.uuid = runtimeContext.cwl_runner_job.get('uuid') + jobiter = tool.job(job_order, + self.output_callback, + runtimeContext) + # Holds the lock while this code runs and releases it when # it is safe to do so in self.workflow_eval_lock.wait(), # at which point on_message can update job state and @@ -677,7 +690,7 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit: logger.error("Interrupted, workflow will be cancelled") else: - logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False)) + logger.error("Execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False)) if self.pipeline: self.api.pipeline_instances().update(uuid=self.pipeline["uuid"], body={"state": "Failed"}).execute(num_retries=self.num_retries)