from arvados.keep import KeepClient
from arvados.errors import ApiError
+import arvados_cwl.util
from .arvcontainer import RunnerContainer
from .arvjob import RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
-from .arvtool import ArvadosCommandTool
+from .arvtool import ArvadosCommandTool, validate_cluster_target
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from .task_queue import TaskQueue
from .context import ArvLoadingContext, ArvRuntimeContext
-from .util import get_current_container
from ._version import __version__
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
+DEFAULT_PRIORITY = 500
+
class RuntimeStatusLoggingHandler(logging.Handler):
"""
Intercepts logging calls and report them as runtime statuses on runner
else:
self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
- self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
+ self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
+ cap=arvargs.collection_cache)
self.fetcher_constructor = partial(CollectionFetcher,
api_client=self.api,
# Add a custom logging handler to the root logger for runtime status reporting
# if running inside a container
- if get_current_container(self.api, self.num_retries, logger):
+ if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
root_logger = logging.getLogger('')
handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
root_logger.addHandler(handler)
self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
collection_cache=self.collection_cache)
+ validate_cluster_target(self, self.runtimeContext)
+
def arv_make_tool(self, toolpath_object, loadingContext):
if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
def start_run(self, runnable, runtimeContext):
- self.task_queue.add(partial(runnable.run, runtimeContext))
+ self.task_queue.add(partial(runnable.run, runtimeContext),
+ self.workflow_eval_lock, self.stop_polling)
def process_submitted(self, container):
with self.workflow_eval_lock:
with self.workflow_eval_lock:
j = self.processes[uuid]
logger.info("%s %s is %s", self.label(j), uuid, record["state"])
- self.task_queue.add(partial(j.done, record))
+ self.task_queue.add(partial(j.done, record),
+ self.workflow_eval_lock, self.stop_polling)
del self.processes[uuid]
def runtime_status_update(self, kind, message, detail=None):
activity statuses, for example in the RuntimeStatusLoggingHandler.
"""
with self.workflow_eval_lock:
- current = get_current_container(self.api, self.num_retries, logger)
+ current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
if current is None:
return
runtime_status = current.get('runtime_status', {})
elif self.work_api == "jobs":
table = self.poll_api.jobs()
- try:
- proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
- except Exception as e:
- logger.warn("Error checking states on API server: %s", e)
- remain_wait = self.poll_interval
- continue
+ pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)
- for p in proc_states["items"]:
- self.on_message({
- "object_uuid": p["uuid"],
- "event_type": "update",
- "properties": {
- "new_attributes": p
- }
- })
+ while keys:
+ page = keys[:pageSize]
+ keys = keys[pageSize:]
+ try:
+ proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
+ except Exception as e:
+ logger.warn("Error checking states on API server: %s", e)
+ remain_wait = self.poll_interval
+ continue
+
+ for p in proc_states["items"]:
+ self.on_message({
+ "object_uuid": p["uuid"],
+ "event_type": "update",
+ "properties": {
+ "new_attributes": p
+ }
+ })
finish_poll = time.time()
remain_wait = self.poll_interval - (finish_poll - begin_poll)
except:
def set_crunch_output(self):
if self.work_api == "containers":
- current = get_current_container(self.api, self.num_retries, logger)
+ current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
if current is None:
return
try:
if runtimeContext.submit:
# Submit a runner job to run the workflow for us.
if self.work_api == "containers":
- if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait:
+ if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner):
runtimeContext.runnerjob = tool.tool["id"]
runnerjob = tool.job(job_order,
self.output_callback,
runnerjob.run(submitargs)
return (runnerjob.uuid, "success")
- current_container = get_current_container(self.api, self.num_retries, logger)
+ current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
if current_container:
logger.info("Running inside container %s", current_container.get("uuid"))
self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
- if runnerjob:
- jobiter = iter((runnerjob,))
- else:
- if runtimeContext.cwl_runner_job is not None:
- self.uuid = runtimeContext.cwl_runner_job.get('uuid')
- jobiter = tool.job(job_order,
- self.output_callback,
- runtimeContext)
-
try:
self.workflow_eval_lock.acquire()
+ if runnerjob:
+ jobiter = iter((runnerjob,))
+ else:
+ if runtimeContext.cwl_runner_job is not None:
+ self.uuid = runtimeContext.cwl_runner_job.get('uuid')
+ jobiter = tool.job(job_order,
+ self.output_callback,
+ runtimeContext)
+
# Holds the lock while this code runs and releases it when
# it is safe to do so in self.workflow_eval_lock.wait(),
# at which point on_message can update job state and
else:
logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
break
+
+ if self.stop_polling.is_set():
+ break
+
loopperf.__enter__()
loopperf.__exit__()
if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
logger.error("Interrupted, workflow will be cancelled")
else:
- logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+ logger.error("Execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
if self.pipeline:
self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
body={"state": "Failed"}).execute(num_retries=self.num_retries)