import cwltool.process
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate
+import cwltool.argparser
import arvados
import arvados.config
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from .task_queue import TaskQueue
+from .context import ArvLoadingContext, ArvRuntimeContext
from ._version import __version__
from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.command_line_tool import compute_checksums
+
from arvados.api import OrderedJsonModel
logger = logging.getLogger('arvados.cwl-runner')
"""
- def __init__(self, api_client, work_api=None, keep_client=None,
- output_name=None, output_tags=None, num_retries=4,
+ def __init__(self, api_client,
+ arvargs=None,
+ keep_client=None,
+ num_retries=4,
thread_count=4):
+
+ if arvargs is None:
+ arvargs = argparse.Namespace()
+ arvargs.work_api = None
+ arvargs.output_name = None
+ arvargs.output_tags = None
+ arvargs.thread_count = 1
+
self.api = api_client
self.processes = {}
self.workflow_eval_lock = threading.Condition(threading.RLock())
self.poll_api = None
self.pipeline = None
self.final_output_collection = None
- self.output_name = output_name
- self.output_tags = output_tags
+ self.output_name = arvargs.output_name
+ self.output_tags = arvargs.output_tags
self.project_uuid = None
self.intermediate_output_ttl = 0
self.intermediate_output_collections = []
self.trash_intermediate = False
- self.thread_count = thread_count
+ self.thread_count = arvargs.thread_count
self.poll_interval = 12
+ self.loadingContext = None
if keep_client is not None:
self.keep_client = keep_client
try:
methods = self.api._rootDesc.get('resources')[api]['methods']
if ('httpMethod' in methods['create'] and
- (work_api == api or work_api is None)):
+ (arvargs.work_api == api or arvargs.work_api is None)):
self.work_api = api
break
except KeyError:
pass
if not self.work_api:
- if work_api is None:
+ if arvargs.work_api is None:
raise Exception("No supported APIs")
else:
-                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
+                raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
- def arv_make_tool(self, toolpath_object, **kwargs):
- kwargs["work_api"] = self.work_api
- kwargs["fetcher_constructor"] = self.fetcher_constructor
- kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
+ if self.work_api == "jobs":
+ logger.warn("""
+*******************************
+Using the deprecated 'jobs' API.
+
+To get rid of this warning:
+
+Users: read about migrating at
+http://doc.arvados.org/user/cwl/cwl-style.html#migrate
+and use the option --api=containers
+
+Admins: configure the cluster to disable the 'jobs' API as described at:
+http://doc.arvados.org/install/install-api-server.html#disable_api_methods
+*******************************""")
+
+ self.loadingContext = ArvLoadingContext(vars(arvargs))
+ self.loadingContext.fetcher_constructor = self.fetcher_constructor
+ self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
+ self.loadingContext.construct_tool_object = self.arv_make_tool
+
+
+ def arv_make_tool(self, toolpath_object, loadingContext):
if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
- return ArvadosCommandTool(self, toolpath_object, **kwargs)
+ return ArvadosCommandTool(self, toolpath_object, loadingContext)
elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
- return ArvadosWorkflow(self, toolpath_object, **kwargs)
+ return ArvadosWorkflow(self, toolpath_object, loadingContext)
else:
- return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
+ return cwltool.workflow.default_make_tool(toolpath_object, loadingContext)
def output_callback(self, out, processStatus):
with self.workflow_eval_lock:
self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
body={"state": "Complete"}).execute(num_retries=self.num_retries)
else:
- logger.warn("Overall process status is %s", processStatus)
+ logger.error("Overall process status is %s", processStatus)
if self.pipeline:
self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
body={"state": "Failed"}).execute(num_retries=self.num_retries)
self.workflow_eval_lock.notifyAll()
- def start_run(self, runnable, kwargs):
- self.task_queue.add(partial(runnable.run, **kwargs))
+ def start_run(self, runnable, runtimeContext):
+ self.task_queue.add(partial(runnable.run, runtimeContext))
def process_submitted(self, container):
with self.workflow_eval_lock:
with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
self.check_features(v)
- def make_output_collection(self, name, tagsString, outputObj):
+ def make_output_collection(self, name, storage_classes, tagsString, outputObj):
outputObj = copy.deepcopy(outputObj)
files = []
with final.open("cwl.output.json", "w") as f:
json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
- final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
+ final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
final.api_response()["name"],
'progress':1.0
}).execute(num_retries=self.num_retries)
- def arv_executor(self, tool, job_order, **kwargs):
- self.debug = kwargs.get("debug")
+ def arv_executor(self, tool, job_order, runtimeContext, logger=None):
+ self.debug = runtimeContext.debug
tool.visit(self.check_features)
- self.project_uuid = kwargs.get("project_uuid")
+ self.project_uuid = runtimeContext.project_uuid
self.pipeline = None
- make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
- collection_cache=self.collection_cache)
- self.fs_access = make_fs_access(kwargs["basedir"])
- self.secret_store = kwargs.get("secret_store")
+ self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
+ self.secret_store = runtimeContext.secret_store
- self.trash_intermediate = kwargs["trash_intermediate"]
+ self.trash_intermediate = runtimeContext.trash_intermediate
if self.trash_intermediate and self.work_api != "containers":
raise Exception("--trash-intermediate is only supported with --api=containers.")
- self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
+ self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
if self.intermediate_output_ttl and self.work_api != "containers":
raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
if self.intermediate_output_ttl < 0:
raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
- if kwargs.get("submit_request_uuid") and self.work_api != "containers":
+ if runtimeContext.submit_request_uuid and self.work_api != "containers":
raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
- if not kwargs.get("name"):
- kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
+ if not runtimeContext.name:
+ runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
# Upload direct dependencies of workflow steps, get back mapping of files to keep references.
# Also uploads docker images.
# Reload tool object which may have been updated by
# upload_workflow_deps
# Don't validate this time because it will just print redundant errors.
+ loadingContext = self.loadingContext.copy()
+ loadingContext.loader = tool.doc_loader
+ loadingContext.avsc_names = tool.doc_schema
+ loadingContext.metadata = tool.metadata
+ loadingContext.do_validate = False
+
tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
- makeTool=self.arv_make_tool,
- loader=tool.doc_loader,
- avsc_names=tool.doc_schema,
- metadata=tool.metadata,
- do_validate=False)
+ loadingContext)
# Upload local file references in the job order.
- job_order = upload_job_order(self, "%s input" % kwargs["name"],
+ job_order = upload_job_order(self, "%s input" % runtimeContext.name,
tool, job_order)
- existing_uuid = kwargs.get("update_workflow")
- if existing_uuid or kwargs.get("create_workflow"):
+ existing_uuid = runtimeContext.update_workflow
+ if existing_uuid or runtimeContext.create_workflow:
# Create a pipeline template or workflow record and exit.
if self.work_api == "jobs":
tmpl = RunnerTemplate(self, tool, job_order,
- kwargs.get("enable_reuse"),
+ runtimeContext.enable_reuse,
uuid=existing_uuid,
- submit_runner_ram=kwargs.get("submit_runner_ram"),
- name=kwargs["name"],
+ submit_runner_ram=runtimeContext.submit_runner_ram,
+ name=runtimeContext.name,
merged_map=merged_map)
tmpl.save()
# cwltool.main will write our return value to stdout.
return (upload_workflow(self, tool, job_order,
self.project_uuid,
uuid=existing_uuid,
- submit_runner_ram=kwargs.get("submit_runner_ram"),
- name=kwargs["name"],
+ submit_runner_ram=runtimeContext.submit_runner_ram,
+ name=runtimeContext.name,
merged_map=merged_map),
"success")
- self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
- self.eval_timeout = kwargs.get("eval_timeout")
+ self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
+ self.eval_timeout = runtimeContext.eval_timeout
- kwargs["make_fs_access"] = make_fs_access
- kwargs["enable_reuse"] = kwargs.get("enable_reuse")
- kwargs["use_container"] = True
- kwargs["tmpdir_prefix"] = "tmp"
- kwargs["compute_checksum"] = kwargs.get("compute_checksum")
+ runtimeContext = runtimeContext.copy()
+ runtimeContext.use_container = True
+ runtimeContext.tmpdir_prefix = "tmp"
+ runtimeContext.work_api = self.work_api
if self.work_api == "containers":
if self.ignore_docker_for_reuse:
raise Exception("--ignore-docker-for-reuse not supported with containers API.")
- kwargs["outdir"] = "/var/spool/cwl"
- kwargs["docker_outdir"] = "/var/spool/cwl"
- kwargs["tmpdir"] = "/tmp"
- kwargs["docker_tmpdir"] = "/tmp"
+ runtimeContext.outdir = "/var/spool/cwl"
+ runtimeContext.docker_outdir = "/var/spool/cwl"
+ runtimeContext.tmpdir = "/tmp"
+ runtimeContext.docker_tmpdir = "/tmp"
elif self.work_api == "jobs":
- if kwargs["priority"] != DEFAULT_PRIORITY:
+ if runtimeContext.priority != DEFAULT_PRIORITY:
raise Exception("--priority not implemented for jobs API.")
- kwargs["outdir"] = "$(task.outdir)"
- kwargs["docker_outdir"] = "$(task.outdir)"
- kwargs["tmpdir"] = "$(task.tmpdir)"
+ runtimeContext.outdir = "$(task.outdir)"
+ runtimeContext.docker_outdir = "$(task.outdir)"
+ runtimeContext.tmpdir = "$(task.tmpdir)"
- if kwargs["priority"] < 1 or kwargs["priority"] > 1000:
+ if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
raise Exception("--priority must be in the range 1..1000.")
runnerjob = None
- if kwargs.get("submit"):
+ if runtimeContext.submit:
# Submit a runner job to run the workflow for us.
if self.work_api == "containers":
- if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
- kwargs["runnerjob"] = tool.tool["id"]
+ if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait:
+ runtimeContext.runnerjob = tool.tool["id"]
runnerjob = tool.job(job_order,
self.output_callback,
- **kwargs).next()
+ runtimeContext).next()
else:
- runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
+ runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
self.output_name,
self.output_tags,
- submit_runner_ram=kwargs.get("submit_runner_ram"),
- name=kwargs.get("name"),
- on_error=kwargs.get("on_error"),
- submit_runner_image=kwargs.get("submit_runner_image"),
- intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
+ submit_runner_ram=runtimeContext.submit_runner_ram,
+ name=runtimeContext.name,
+ on_error=runtimeContext.on_error,
+ submit_runner_image=runtimeContext.submit_runner_image,
+ intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
merged_map=merged_map,
- priority=kwargs.get("priority"),
+ priority=runtimeContext.priority,
secret_store=self.secret_store)
elif self.work_api == "jobs":
- runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
+ runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
self.output_name,
self.output_tags,
- submit_runner_ram=kwargs.get("submit_runner_ram"),
- name=kwargs.get("name"),
- on_error=kwargs.get("on_error"),
- submit_runner_image=kwargs.get("submit_runner_image"),
+ submit_runner_ram=runtimeContext.submit_runner_ram,
+ name=runtimeContext.name,
+ on_error=runtimeContext.on_error,
+ submit_runner_image=runtimeContext.submit_runner_image,
merged_map=merged_map)
- elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
+ elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
# Create pipeline for local run
self.pipeline = self.api.pipeline_instances().create(
body={
"owner_uuid": self.project_uuid,
- "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
+ "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
"components": {},
"state": "RunningOnClient"}).execute(num_retries=self.num_retries)
logger.info("Pipeline instance %s", self.pipeline["uuid"])
- if runnerjob and not kwargs.get("wait"):
- submitargs = kwargs.copy()
- submitargs['submit'] = False
- runnerjob.run(**submitargs)
+ if runnerjob and not runtimeContext.wait:
+ submitargs = runtimeContext.copy()
+ submitargs.submit = False
+ runnerjob.run(submitargs)
return (runnerjob.uuid, "success")
-        self.poll_api = arvados.api('v1', timeout=kwargs["http_timeout"])
+        self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
if runnerjob:
jobiter = iter((runnerjob,))
else:
- if "cwl_runner_job" in kwargs:
- self.uuid = kwargs.get("cwl_runner_job").get('uuid')
+ if runtimeContext.cwl_runner_job is not None:
+ self.uuid = runtimeContext.cwl_runner_job.get('uuid')
jobiter = tool.job(job_order,
self.output_callback,
- **kwargs)
+ runtimeContext)
try:
self.workflow_eval_lock.acquire()
if runnable:
with Perf(metrics, "run"):
- self.start_run(runnable, kwargs)
+ self.start_run(runnable, runtimeContext)
else:
if (self.task_queue.in_flight + len(self.processes)) > 0:
self.workflow_eval_lock.wait(3)
if self.final_output is None:
raise WorkflowException("Workflow did not return a result.")
- if kwargs.get("submit") and isinstance(runnerjob, Runner):
+ if runtimeContext.submit and isinstance(runnerjob, Runner):
logger.info("Final output collection %s", runnerjob.final_output)
else:
if self.output_name is None:
self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
if self.output_tags is None:
self.output_tags = ""
- self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
+
+ storage_classes = runtimeContext.storage_classes.strip().split(",")
+ self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
self.set_crunch_output()
- if kwargs.get("compute_checksum"):
+ if runtimeContext.compute_checksum:
adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
parser.add_argument("--submit-runner-ram", type=int,
help="RAM (in MiB) required for the workflow runner job (default 1024)",
- default=1024)
+ default=None)
parser.add_argument("--submit-runner-image", type=str,
help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
parser.add_argument("--enable-dev", action="store_true",
help="Enable loading and running development versions "
"of CWL spec.", default=False)
+ parser.add_argument('--storage-classes', default="default", type=str,
+ help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")
parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
help="If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).",
job_order_object = None
arvargs = parser.parse_args(args)
+ if len(arvargs.storage_classes.strip().split(',')) > 1:
+ logger.error("Multiple storage classes are not supported currently.")
+ return 1
+
+ arvargs.use_container = True
+ arvargs.relax_path_checks = True
+ arvargs.print_supported_versions = False
+
if install_sig_handlers:
arv_cmd.install_signal_handlers()
api_params={"model": OrderedJsonModel(), "timeout": arvargs.http_timeout},
keep_params={"num_retries": 4})
keep_client = api_client.keep
+ # Make an API object now so errors are reported early.
+ api_client.users().current().execute()
if keep_client is None:
keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
- runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
- num_retries=4, output_name=arvargs.output_name,
- output_tags=arvargs.output_tags,
- thread_count=arvargs.thread_count)
+ runner = ArvCwlRunner(api_client, arvargs, keep_client=keep_client, num_retries=4)
except Exception as e:
logger.error(e)
return 1
else:
arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
- arvargs.conformance_test = None
- arvargs.use_container = True
- arvargs.relax_path_checks = True
- arvargs.print_supported_versions = False
+ for key, val in cwltool.argparser.get_default_args().items():
+ if not hasattr(arvargs, key):
+ setattr(arvargs, key, val)
- make_fs_access = partial(CollectionFsAccess,
- collection_cache=runner.collection_cache)
+ runtimeContext = ArvRuntimeContext(vars(arvargs))
+ runtimeContext.make_fs_access = partial(CollectionFsAccess,
+ collection_cache=runner.collection_cache)
return cwltool.main.main(args=arvargs,
stdout=stdout,
stderr=stderr,
executor=runner.arv_executor,
- makeTool=runner.arv_make_tool,
versionfunc=versionstring,
job_order_object=job_order_object,
- make_fs_access=make_fs_access,
- fetcher_constructor=partial(CollectionFetcher,
- api_client=api_client,
- fs_access=make_fs_access(""),
- num_retries=runner.num_retries),
- resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
logger_handler=arvados.log_handler,
- custom_schema_callback=add_arv_hints)
+ custom_schema_callback=add_arv_hints,
+ loadingContext=runner.loadingContext,
+ runtimeContext=runtimeContext)