X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/9359b63514047e816e69fb049fbd792c06bede24..ebed37b04288c946739d4266e8867b08908edc36:/sdk/cwl/arvados_cwl/executor.py diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py index bf81853be4..994594023a 100644 --- a/sdk/cwl/arvados_cwl/executor.py +++ b/sdk/cwl/arvados_cwl/executor.py @@ -23,6 +23,7 @@ import arvados.config from arvados.keep import KeepClient from arvados.errors import ApiError +import arvados_cwl.util from .arvcontainer import RunnerContainer from .arvjob import RunnerJob, RunnerTemplate from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps @@ -33,7 +34,6 @@ from .perf import Perf from .pathmapper import NoFollowPathMapper from .task_queue import TaskQueue from .context import ArvLoadingContext, ArvRuntimeContext -from .util import get_current_container from ._version import __version__ from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema @@ -43,6 +43,8 @@ from cwltool.command_line_tool import compute_checksums logger = logging.getLogger('arvados.cwl-runner') metrics = logging.getLogger('arvados.cwl-runner.metrics') +DEFAULT_PRIORITY = 500 + class RuntimeStatusLoggingHandler(logging.Handler): """ Intercepts logging calls and report them as runtime statuses on runner @@ -167,7 +169,7 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods # Add a custom logging handler to the root logger for runtime status reporting # if running inside a container - if get_current_container(self.api, self.num_retries, logger): + if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger): root_logger = logging.getLogger('') handler = RuntimeStatusLoggingHandler(self.runtime_status_update) root_logger.addHandler(handler) @@ -222,7 +224,7 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods activity statuses, for example in the RuntimeStatusLoggingHandler. """ with self.workflow_eval_lock: - current = get_current_container(self.api, self.num_retries, logger) + current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger) if current is None: return runtime_status = current.get('runtime_status', {}) @@ -326,21 +328,26 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods elif self.work_api == "jobs": table = self.poll_api.jobs() - try: - proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries) - except Exception as e: - logger.warn("Error checking states on API server: %s", e) - remain_wait = self.poll_interval - continue + pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000) - for p in proc_states["items"]: - self.on_message({ - "object_uuid": p["uuid"], - "event_type": "update", - "properties": { - "new_attributes": p - } - }) + while keys: + page = keys[:pageSize] + keys = keys[pageSize:] + try: + proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries) + except Exception as e: + logger.warn("Error checking states on API server: %s", e) + remain_wait = self.poll_interval + continue + + for p in proc_states["items"]: + self.on_message({ + "object_uuid": p["uuid"], + "event_type": "update", + "properties": { + "new_attributes": p + } + }) finish_poll = time.time() remain_wait = self.poll_interval - (finish_poll - begin_poll) except: @@ -460,7 +467,7 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods def set_crunch_output(self): if self.work_api == "containers": - current = get_current_container(self.api, self.num_retries, logger) + current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger) if current is None: return try: @@ -579,7 +586,7 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods if runtimeContext.submit: # Submit a runner job to run the workflow for us. if self.work_api == "containers": - if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait: + if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner): runtimeContext.runnerjob = tool.tool["id"] runnerjob = tool.job(job_order, self.output_callback, @@ -621,7 +628,7 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods runnerjob.run(submitargs) return (runnerjob.uuid, "success") - current_container = get_current_container(self.api, self.num_retries, logger) + current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger) if current_container: logger.info("Running inside container %s", current_container.get("uuid")) @@ -631,17 +638,17 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count) - if runnerjob: - jobiter = iter((runnerjob,)) - else: - if runtimeContext.cwl_runner_job is not None: - self.uuid = runtimeContext.cwl_runner_job.get('uuid') - jobiter = tool.job(job_order, - self.output_callback, - runtimeContext) - try: self.workflow_eval_lock.acquire() + if runnerjob: + jobiter = iter((runnerjob,)) + else: + if runtimeContext.cwl_runner_job is not None: + self.uuid = runtimeContext.cwl_runner_job.get('uuid') + jobiter = tool.job(job_order, + self.output_callback, + runtimeContext) + # Holds the lock while this code runs and releases it when # it is safe to do so in self.workflow_eval_lock.wait(), # at which point on_message can update job state and @@ -681,7 +688,7 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit: logger.error("Interrupted, workflow will be cancelled") else: - logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False)) + logger.error("Execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False)) if self.pipeline: self.api.pipeline_instances().update(uuid=self.pipeline["uuid"], body={"state": "Failed"}).execute(num_retries=self.num_retries)