from .perf import Perf
from .pathmapper import NoFollowPathMapper
from .task_queue import TaskQueue
+from .context import ArvLoadingContext, ArvRuntimeContext
from ._version import __version__
from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.command_line_tool import compute_checksums
+
from arvados.api import OrderedJsonModel
logger = logging.getLogger('arvados.cwl-runner')
"""
- def __init__(self, api_client, work_api=None, keep_client=None,
- output_name=None, output_tags=None, default_storage_classes="default",
- num_retries=4, thread_count=4):
+ def __init__(self, api_client,
+ arvargs=None,
+ keep_client=None,
+ num_retries=4,
+ thread_count=4):
+
+ if arvargs is None:
+ arvargs = argparse.Namespace()
+ arvargs.work_api = None
+ arvargs.output_name = None
+ arvargs.output_tags = None
+ arvargs.thread_count = 1
+
self.api = api_client
self.processes = {}
self.workflow_eval_lock = threading.Condition(threading.RLock())
self.poll_api = None
self.pipeline = None
self.final_output_collection = None
- self.output_name = output_name
- self.output_tags = output_tags
+ self.output_name = arvargs.output_name
+ self.output_tags = arvargs.output_tags
self.project_uuid = None
self.intermediate_output_ttl = 0
self.intermediate_output_collections = []
self.trash_intermediate = False
- self.thread_count = thread_count
+ self.thread_count = arvargs.thread_count
self.poll_interval = 12
- self.default_storage_classes = default_storage_classes
+ self.loadingContext = None
if keep_client is not None:
self.keep_client = keep_client
try:
methods = self.api._rootDesc.get('resources')[api]['methods']
if ('httpMethod' in methods['create'] and
- (work_api == api or work_api is None)):
+ (arvargs.work_api == api or arvargs.work_api is None)):
self.work_api = api
break
except KeyError:
pass
if not self.work_api:
- if work_api is None:
+ if arvargs.work_api is None:
raise Exception("No supported APIs")
else:
raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
+        if self.work_api == "jobs":
+            logger.warning("""
+*******************************
+Using the deprecated 'jobs' API.
+
+To get rid of this warning:
+
+Users: read about migrating at
+http://doc.arvados.org/user/cwl/cwl-style.html#migrate
+and use the option --api=containers
+
+Admins: configure the cluster to disable the 'jobs' API as described at:
+http://doc.arvados.org/install/install-api-server.html#disable_api_methods
+*******************************""")
+
+        self.loadingContext = ArvLoadingContext(vars(arvargs))
+        # NOTE(review): assumes self.fetcher_constructor and self.num_retries were
+        # assigned earlier in __init__ (not visible in this hunk) -- confirm ordering.
+        self.loadingContext.fetcher_constructor = self.fetcher_constructor
+        self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
+        self.loadingContext.construct_tool_object = self.arv_make_tool
+
+
def arv_make_tool(self, toolpath_object, loadingContext):
if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
return ArvadosCommandTool(self, toolpath_object, loadingContext)
elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
return ArvadosWorkflow(self, toolpath_object, loadingContext)
else:
- return cwltool.workflow.defaultMakeTool(toolpath_object, loadingContext)
+ return cwltool.workflow.default_make_tool(toolpath_object, loadingContext)
def output_callback(self, out, processStatus):
with self.workflow_eval_lock:
self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
body={"state": "Complete"}).execute(num_retries=self.num_retries)
else:
- logger.warn("Overall process status is %s", processStatus)
+ logger.error("Overall process status is %s", processStatus)
if self.pipeline:
self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
body={"state": "Failed"}).execute(num_retries=self.num_retries)
'progress':1.0
}).execute(num_retries=self.num_retries)
- def arv_executor(self, tool, job_order, runtimeContext):
+ def arv_executor(self, tool, job_order, runtimeContext, logger=None):
self.debug = runtimeContext.debug
tool.visit(self.check_features)
# Reload tool object which may have been updated by
# upload_workflow_deps
# Don't validate this time because it will just print redundant errors.
+ loadingContext = self.loadingContext.copy()
+ loadingContext.loader = tool.doc_loader
+ loadingContext.avsc_names = tool.doc_schema
+ loadingContext.metadata = tool.metadata
+ loadingContext.do_validate = False
+
tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
- construct_tool_object=self.arv_make_tool,
- loader=tool.doc_loader,
- avsc_names=tool.doc_schema,
- metadata=tool.metadata,
- do_validate=False)
+ loadingContext)
# Upload local file references in the job order.
job_order = upload_job_order(self, "%s input" % runtimeContext.name,
runtimeContext = runtimeContext.copy()
runtimeContext.use_container = True
runtimeContext.tmpdir_prefix = "tmp"
+ runtimeContext.work_api = self.work_api
if self.work_api == "containers":
if self.ignore_docker_for_reuse:
submit_runner_image=runtimeContext.submit_runner_image,
intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
merged_map=merged_map,
- default_storage_classes=self.default_storage_classes,
priority=runtimeContext.priority,
secret_store=self.secret_store)
elif self.work_api == "jobs":
if self.output_tags is None:
self.output_tags = ""
- storage_classes = kwargs.get("storage_classes").strip().split(",")
+ storage_classes = runtimeContext.storage_classes.strip().split(",")
self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
self.set_crunch_output()
    parser.add_argument("--submit-runner-ram", type=int,
        help="RAM (in MiB) required for the workflow runner job (default 1024)",
-        default=1024)
+        # NOTE(review): default is now None -- confirm 1024 is still applied
+        # downstream, otherwise the help text above is stale.
+        default=None)
parser.add_argument("--submit-runner-image", type=str,
help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
logger.error("Multiple storage classes are not supported currently.")
return 1
+ arvargs.use_container = True
+ arvargs.relax_path_checks = True
+ arvargs.print_supported_versions = False
+
if install_sig_handlers:
arv_cmd.install_signal_handlers()
if api_client is None:
api_client = arvados.safeapi.ThreadSafeApiCache(api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4})
keep_client = api_client.keep
+    # Make an API call now so connection/credential errors are reported early.
+    api_client.users().current().execute()
if keep_client is None:
keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
- runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
- num_retries=4, output_name=arvargs.output_name,
- output_tags=arvargs.output_tags, default_storage_classes=parser.get_default("storage_classes"),
- thread_count=arvargs.thread_count)
+ runner = ArvCwlRunner(api_client, arvargs, keep_client=keep_client, num_retries=4)
except Exception as e:
logger.error(e)
return 1
else:
arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
- for key, val in six.iteritems(cwltool.argparser.get_default_args()):
+ for key, val in cwltool.argparser.get_default_args().items():
if not hasattr(arvargs, key):
setattr(arvargs, key, val)
- arvargs.use_container = True
- arvargs.relax_path_checks = True
- arvargs.print_supported_versions = False
-
- loadingContext = LoadingContext(vars(arvargs))
- loadingContext.fetcher_constructor = runner.fetcher_constructor
- loadingContext.resolver = partial(collectionResolver, api_client, num_retries=runner.num_retries)
- loadingContext.construct_tool_object = runner.arv_make_tool
-
- runtimeContext = RuntimeContext(vars(arvargs))
+ runtimeContext = ArvRuntimeContext(vars(arvargs))
runtimeContext.make_fs_access = partial(CollectionFsAccess,
collection_cache=runner.collection_cache)
versionfunc=versionstring,
job_order_object=job_order_object,
logger_handler=arvados.log_handler,
- custom_schema_callback=add_arv_hints)
+ custom_schema_callback=add_arv_hints,
+ loadingContext=runner.loadingContext,
+ runtimeContext=runtimeContext)