from arvados.keep import KeepClient
from arvados.errors import ApiError
+import arvados_cwl.util
from .arvcontainer import RunnerContainer
from .arvjob import RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
-from .arvtool import ArvadosCommandTool
+from .arvtool import ArvadosCommandTool, validate_cluster_target
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from .task_queue import TaskQueue
from .context import ArvLoadingContext, ArvRuntimeContext
-from .util import get_current_container
from ._version import __version__
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
+DEFAULT_PRIORITY = 500
+
class RuntimeStatusLoggingHandler(logging.Handler):
"""
Intercepts logging calls and reports them as runtime statuses on runner
else:
self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
- self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
+ self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
+ cap=arvargs.collection_cache)
self.fetcher_constructor = partial(CollectionFetcher,
api_client=self.api,
# Add a custom logging handler to the root logger for runtime status reporting
# if running inside a container
- if get_current_container(self.api, self.num_retries, logger):
+ if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
root_logger = logging.getLogger('')
handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
root_logger.addHandler(handler)
self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
collection_cache=self.collection_cache)
+ validate_cluster_target(self, self.runtimeContext)
+
def arv_make_tool(self, toolpath_object, loadingContext):
if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
def start_run(self, runnable, runtimeContext):
- self.task_queue.add(partial(runnable.run, runtimeContext))
+ self.task_queue.add(partial(runnable.run, runtimeContext),
+ self.workflow_eval_lock, self.stop_polling)
def process_submitted(self, container):
with self.workflow_eval_lock:
with self.workflow_eval_lock:
j = self.processes[uuid]
logger.info("%s %s is %s", self.label(j), uuid, record["state"])
- self.task_queue.add(partial(j.done, record))
+ self.task_queue.add(partial(j.done, record),
+ self.workflow_eval_lock, self.stop_polling)
del self.processes[uuid]
def runtime_status_update(self, kind, message, detail=None):
activity statuses, for example in the RuntimeStatusLoggingHandler.
"""
with self.workflow_eval_lock:
- current = get_current_container(self.api, self.num_retries, logger)
+ current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
if current is None:
return
runtime_status = current.get('runtime_status', {})
def set_crunch_output(self):
if self.work_api == "containers":
- current = get_current_container(self.api, self.num_retries, logger)
+ current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
if current is None:
return
try:
if runtimeContext.submit:
# Submit a runner job to run the workflow for us.
if self.work_api == "containers":
- if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait:
+ if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner):
runtimeContext.runnerjob = tool.tool["id"]
runnerjob = tool.job(job_order,
self.output_callback,
runnerjob.run(submitargs)
return (runnerjob.uuid, "success")
- current_container = get_current_container(self.api, self.num_retries, logger)
+ current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
if current_container:
logger.info("Running inside container %s", current_container.get("uuid"))
else:
logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
break
+
+ if self.stop_polling.is_set():
+ break
+
loopperf.__enter__()
loopperf.__exit__()