from arvados.keep import KeepClient
from arvados.errors import ApiError
+import arvados_cwl.util
from .arvcontainer import RunnerContainer
from .arvjob import RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
-from .arvtool import ArvadosCommandTool
+from .arvtool import ArvadosCommandTool, validate_cluster_target
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from .task_queue import TaskQueue
from .context import ArvLoadingContext, ArvRuntimeContext
-from .util import get_current_container
from ._version import __version__
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
+DEFAULT_PRIORITY = 500
+
class RuntimeStatusLoggingHandler(logging.Handler):
"""
Intercepts logging calls and report them as runtime statuses on runner
# Add a custom logging handler to the root logger for runtime status reporting
# if running inside a container
- if get_current_container(self.api, self.num_retries, logger):
+ if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
root_logger = logging.getLogger('')
handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
root_logger.addHandler(handler)
self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
collection_cache=self.collection_cache)
+ validate_cluster_target(self, self.runtimeContext)
+
def arv_make_tool(self, toolpath_object, loadingContext):
if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
activity statuses, for example in the RuntimeStatusLoggingHandler.
"""
with self.workflow_eval_lock:
- current = get_current_container(self.api, self.num_retries, logger)
+ current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
if current is None:
return
runtime_status = current.get('runtime_status', {})
def set_crunch_output(self):
if self.work_api == "containers":
- current = get_current_container(self.api, self.num_retries, logger)
+ current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
if current is None:
return
try:
if runtimeContext.submit:
# Submit a runner job to run the workflow for us.
if self.work_api == "containers":
- if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait:
+ if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner):
runtimeContext.runnerjob = tool.tool["id"]
runnerjob = tool.job(job_order,
self.output_callback,
runnerjob.run(submitargs)
return (runnerjob.uuid, "success")
- current_container = get_current_container(self.api, self.num_retries, logger)
+ current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
if current_container:
logger.info("Running inside container %s", current_container.get("uuid"))