X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/138fef8ee97f3cbd335434ad6acd26771fd0b762..9fa635c9dc288317f19471291accecf8690f5718:/sdk/cwl/arvados_cwl/__init__.py

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index c2f43fe368..a7e698b6dc 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -19,6 +19,8 @@ from functools import partial
 import pkg_resources  # part of setuptools
 import Queue
 import time
+import signal
+import thread
 
 from cwltool.errors import WorkflowException
 import cwltool.main
@@ -26,27 +28,31 @@ import cwltool.workflow
 import cwltool.process
 from schema_salad.sourceline import SourceLine
 import schema_salad.validate as validate
+import cwltool.argparser
 
 import arvados
 import arvados.config
 from arvados.keep import KeepClient
 from arvados.errors import ApiError
+import arvados.commands._util as arv_cmd
 
 from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
-from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
+from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
 from .arvtool import ArvadosCommandTool
 from .arvworkflow import ArvadosWorkflow, upload_workflow
 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
 from .perf import Perf
 from .pathmapper import NoFollowPathMapper
 from .task_queue import TaskQueue
+from .context import ArvLoadingContext, ArvRuntimeContext
 from ._version import __version__
 
 from cwltool.pack import pack
 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
 from cwltool.command_line_tool import compute_checksums
+
 from arvados.api import OrderedJsonModel
 
 logger = logging.getLogger('arvados.cwl-runner')
@@ -65,29 +71,39 @@ class ArvCwlRunner(object):
 
     """
 
-    def __init__(self, api_client, work_api=None, keep_client=None,
-                 output_name=None, output_tags=None, num_retries=4,
+    def __init__(self, api_client,
+                 arvargs=None,
+                 keep_client=None,
+                 num_retries=4,
                  thread_count=4):
+
+        if arvargs is None:
+            arvargs = argparse.Namespace()
+            arvargs.work_api = None
+            arvargs.output_name = None
+            arvargs.output_tags = None
+            arvargs.thread_count = 1
+
         self.api = api_client
         self.processes = {}
         self.workflow_eval_lock = threading.Condition(threading.RLock())
         self.final_output = None
         self.final_status = None
-        self.uploaded = {}
         self.num_retries = num_retries
         self.uuid = None
         self.stop_polling = threading.Event()
         self.poll_api = None
         self.pipeline = None
         self.final_output_collection = None
-        self.output_name = output_name
-        self.output_tags = output_tags
+        self.output_name = arvargs.output_name
+        self.output_tags = arvargs.output_tags
         self.project_uuid = None
         self.intermediate_output_ttl = 0
         self.intermediate_output_collections = []
         self.trash_intermediate = False
-        self.thread_count = thread_count
+        self.thread_count = arvargs.thread_count
         self.poll_interval = 12
+        self.loadingContext = None
 
         if keep_client is not None:
             self.keep_client = keep_client
@@ -96,37 +112,42 @@ class ArvCwlRunner(object):
 
         self.collection_cache = CollectionCache(self.api, self.keep_client,
                                                 self.num_retries)
 
+        self.fetcher_constructor = partial(CollectionFetcher,
+                                           api_client=self.api,
+                                           fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
+                                           num_retries=self.num_retries)
+
         self.work_api = None
["jobs", "containers"] for api in expected_api: try: methods = self.api._rootDesc.get('resources')[api]['methods'] if ('httpMethod' in methods['create'] and - (work_api == api or work_api is None)): + (arvargs.work_api == api or arvargs.work_api is None)): self.work_api = api break except KeyError: pass if not self.work_api: - if work_api is None: + if arvargs.work_api is None: raise Exception("No supported APIs") else: raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api)) - def arv_make_tool(self, toolpath_object, **kwargs): - kwargs["work_api"] = self.work_api - kwargs["fetcher_constructor"] = partial(CollectionFetcher, - api_client=self.api, - fs_access=CollectionFsAccess("", collection_cache=self.collection_cache), - num_retries=self.num_retries) - kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries) + self.loadingContext = ArvLoadingContext(vars(arvargs)) + self.loadingContext.fetcher_constructor = self.fetcher_constructor + self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries) + self.loadingContext.construct_tool_object = self.arv_make_tool + + + def arv_make_tool(self, toolpath_object, loadingContext): if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool": - return ArvadosCommandTool(self, toolpath_object, **kwargs) + return ArvadosCommandTool(self, toolpath_object, loadingContext) elif "class" in toolpath_object and toolpath_object["class"] == "Workflow": - return ArvadosWorkflow(self, toolpath_object, **kwargs) + return ArvadosWorkflow(self, toolpath_object, loadingContext) else: - return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs) + return cwltool.workflow.default_make_tool(toolpath_object, loadingContext) def output_callback(self, out, processStatus): with self.workflow_eval_lock: @@ -136,7 +157,7 @@ class ArvCwlRunner(object): self.api.pipeline_instances().update(uuid=self.pipeline["uuid"], body={"state": "Complete"}).execute(num_retries=self.num_retries) else: - logger.warn("Overall process status is %s", processStatus) + logger.error("Overall process status is %s", processStatus) if self.pipeline: self.api.pipeline_instances().update(uuid=self.pipeline["uuid"], body={"state": "Failed"}).execute(num_retries=self.num_retries) @@ -145,17 +166,19 @@ class ArvCwlRunner(object): self.workflow_eval_lock.notifyAll() - def start_run(self, runnable, kwargs): - self.task_queue.add(partial(runnable.run, **kwargs)) + def start_run(self, runnable, runtimeContext): + self.task_queue.add(partial(runnable.run, runtimeContext)) def process_submitted(self, container): with self.workflow_eval_lock: self.processes[container.uuid] = container - def process_done(self, uuid): + def process_done(self, uuid, record): with self.workflow_eval_lock: - if uuid in self.processes: - del self.processes[uuid] + j = self.processes[uuid] + logger.info("%s %s is %s", self.label(j), uuid, record["state"]) + self.task_queue.add(partial(j.done, record)) + del self.processes[uuid] def wrapped_callback(self, cb, obj, st): with self.workflow_eval_lock: @@ -176,10 +199,7 @@ class ArvCwlRunner(object): j.update_pipeline_component(event["properties"]["new_attributes"]) logger.info("%s %s is Running", self.label(j), uuid) elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"): - with self.workflow_eval_lock: - j = self.processes[uuid] - self.task_queue.add(partial(j.done, event["properties"]["new_attributes"])) - 
logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"]) + self.process_done(uuid, event["properties"]["new_attributes"]) def label(self, obj): return "[%s %s]" % (self.work_api[0:-1], obj.name) @@ -234,12 +254,6 @@ class ArvCwlRunner(object): finally: self.stop_polling.set() - def get_uploaded(self): - return self.uploaded.copy() - - def add_uploaded(self, src, pair): - self.uploaded[src] = pair - def add_intermediate_output(self, uuid): if uuid: self.intermediate_output_collections.append(uuid) @@ -251,7 +265,7 @@ class ArvCwlRunner(object): self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries) except: logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False)) - if sys.exc_info()[0] is KeyboardInterrupt: + if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit: break def check_features(self, obj): @@ -275,7 +289,7 @@ class ArvCwlRunner(object): with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)): self.check_features(v) - def make_output_collection(self, name, tagsString, outputObj): + def make_output_collection(self, name, storage_classes, tagsString, outputObj): outputObj = copy.deepcopy(outputObj) files = [] @@ -326,7 +340,7 @@ class ArvCwlRunner(object): with final.open("cwl.output.json", "w") as f: json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': ')) - final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True) + final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True) logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(), final.api_response()["name"], @@ -375,30 +389,31 @@ class ArvCwlRunner(object): 'progress':1.0 }).execute(num_retries=self.num_retries) - def arv_executor(self, tool, job_order, **kwargs): - self.debug = kwargs.get("debug") + def arv_executor(self, tool, job_order, runtimeContext, logger=None): + self.debug = runtimeContext.debug tool.visit(self.check_features) - self.project_uuid = kwargs.get("project_uuid") + self.project_uuid = runtimeContext.project_uuid self.pipeline = None - make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess, - collection_cache=self.collection_cache) - self.fs_access = make_fs_access(kwargs["basedir"]) - self.secret_store = kwargs.get("secret_store") + self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir) + self.secret_store = runtimeContext.secret_store - self.trash_intermediate = kwargs["trash_intermediate"] + self.trash_intermediate = runtimeContext.trash_intermediate if self.trash_intermediate and self.work_api != "containers": raise Exception("--trash-intermediate is only supported with --api=containers.") - self.intermediate_output_ttl = kwargs["intermediate_output_ttl"] + self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl if self.intermediate_output_ttl and self.work_api != "containers": raise Exception("--intermediate-output-ttl is only supported with --api=containers.") if self.intermediate_output_ttl < 0: raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl) - if not kwargs.get("name"): - kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"]) + if runtimeContext.submit_request_uuid and self.work_api != "containers": + raise 
Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api)) + + if not runtimeContext.name: + runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"]) # Upload direct dependencies of workflow steps, get back mapping of files to keep references. # Also uploads docker images. @@ -407,26 +422,28 @@ class ArvCwlRunner(object): # Reload tool object which may have been updated by # upload_workflow_deps # Don't validate this time because it will just print redundant errors. + loadingContext = self.loadingContext.copy() + loadingContext.loader = tool.doc_loader + loadingContext.avsc_names = tool.doc_schema + loadingContext.metadata = tool.metadata + loadingContext.do_validate = False + tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]], - makeTool=self.arv_make_tool, - loader=tool.doc_loader, - avsc_names=tool.doc_schema, - metadata=tool.metadata, - do_validate=False) + loadingContext) # Upload local file references in the job order. - job_order = upload_job_order(self, "%s input" % kwargs["name"], + job_order = upload_job_order(self, "%s input" % runtimeContext.name, tool, job_order) - existing_uuid = kwargs.get("update_workflow") - if existing_uuid or kwargs.get("create_workflow"): + existing_uuid = runtimeContext.update_workflow + if existing_uuid or runtimeContext.create_workflow: # Create a pipeline template or workflow record and exit. if self.work_api == "jobs": tmpl = RunnerTemplate(self, tool, job_order, - kwargs.get("enable_reuse"), + runtimeContext.enable_reuse, uuid=existing_uuid, - submit_runner_ram=kwargs.get("submit_runner_ram"), - name=kwargs["name"], + submit_runner_ram=runtimeContext.submit_runner_ram, + name=runtimeContext.name, merged_map=merged_map) tmpl.save() # cwltool.main will write our return value to stdout. 
@@ -435,79 +452,79 @@ class ArvCwlRunner(object):
                 return (upload_workflow(self, tool, job_order,
                                         self.project_uuid,
                                         uuid=existing_uuid,
-                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
-                                        name=kwargs["name"],
+                                        submit_runner_ram=runtimeContext.submit_runner_ram,
+                                        name=runtimeContext.name,
                                         merged_map=merged_map),
                         "success")
 
-        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
-        self.eval_timeout = kwargs.get("eval_timeout")
+        self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
+        self.eval_timeout = runtimeContext.eval_timeout
 
-        kwargs["make_fs_access"] = make_fs_access
-        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
-        kwargs["use_container"] = True
-        kwargs["tmpdir_prefix"] = "tmp"
-        kwargs["compute_checksum"] = kwargs.get("compute_checksum")
+        runtimeContext = runtimeContext.copy()
+        runtimeContext.use_container = True
+        runtimeContext.tmpdir_prefix = "tmp"
 
         if self.work_api == "containers":
             if self.ignore_docker_for_reuse:
                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
-            kwargs["outdir"] = "/var/spool/cwl"
-            kwargs["docker_outdir"] = "/var/spool/cwl"
-            kwargs["tmpdir"] = "/tmp"
-            kwargs["docker_tmpdir"] = "/tmp"
+            runtimeContext.outdir = "/var/spool/cwl"
+            runtimeContext.docker_outdir = "/var/spool/cwl"
+            runtimeContext.tmpdir = "/tmp"
+            runtimeContext.docker_tmpdir = "/tmp"
         elif self.work_api == "jobs":
-            if kwargs["priority"] != DEFAULT_PRIORITY:
+            if runtimeContext.priority != DEFAULT_PRIORITY:
                 raise Exception("--priority not implemented for jobs API.")
-            kwargs["outdir"] = "$(task.outdir)"
-            kwargs["docker_outdir"] = "$(task.outdir)"
-            kwargs["tmpdir"] = "$(task.tmpdir)"
+            runtimeContext.outdir = "$(task.outdir)"
+            runtimeContext.docker_outdir = "$(task.outdir)"
+            runtimeContext.tmpdir = "$(task.tmpdir)"
 
-        if kwargs["priority"] < 1 or kwargs["priority"] > 1000:
+        if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
             raise Exception("--priority must be in the range 1..1000.")
 
         runnerjob = None
-        if kwargs.get("submit"):
+        if runtimeContext.submit:
             # Submit a runner job to run the workflow for us.
             if self.work_api == "containers":
-                if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
-                    kwargs["runnerjob"] = tool.tool["id"]
+                if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait:
+                    runtimeContext.runnerjob = tool.tool["id"]
                     runnerjob = tool.job(job_order,
                                          self.output_callback,
-                                         **kwargs).next()
+                                         runtimeContext).next()
                 else:
-                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
+                    runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
                                                 self.output_name,
                                                 self.output_tags,
-                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
-                                                name=kwargs.get("name"),
-                                                on_error=kwargs.get("on_error"),
-                                                submit_runner_image=kwargs.get("submit_runner_image"),
-                                                intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
+                                                submit_runner_ram=runtimeContext.submit_runner_ram,
+                                                name=runtimeContext.name,
+                                                on_error=runtimeContext.on_error,
+                                                submit_runner_image=runtimeContext.submit_runner_image,
+                                                intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
                                                 merged_map=merged_map,
-                                                priority=kwargs.get("priority"),
+                                                priority=runtimeContext.priority,
                                                 secret_store=self.secret_store)
             elif self.work_api == "jobs":
-                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
+                runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
                                       self.output_name,
                                       self.output_tags,
-                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
-                                      name=kwargs.get("name"),
-                                      on_error=kwargs.get("on_error"),
-                                      submit_runner_image=kwargs.get("submit_runner_image"),
+                                      submit_runner_ram=runtimeContext.submit_runner_ram,
+                                      name=runtimeContext.name,
+                                      on_error=runtimeContext.on_error,
+                                      submit_runner_image=runtimeContext.submit_runner_image,
                                       merged_map=merged_map)
-        elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
+        elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
             # Create pipeline for local run
             self.pipeline = self.api.pipeline_instances().create(
                 body={
                     "owner_uuid": self.project_uuid,
-                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
+                    "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
                     "components": {},
                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
             logger.info("Pipeline instance %s", self.pipeline["uuid"])
 
-        if runnerjob and not kwargs.get("wait"):
-            runnerjob.run(**kwargs)
+        if runnerjob and not runtimeContext.wait:
+            submitargs = runtimeContext.copy()
+            submitargs.submit = False
+            runnerjob.run(submitargs)
             return (runnerjob.uuid, "success")
 
         self.poll_api = arvados.api('v1')
@@ -519,11 +536,11 @@ class ArvCwlRunner(object):
         if runnerjob:
             jobiter = iter((runnerjob,))
         else:
-            if "cwl_runner_job" in kwargs:
-                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
+            if runtimeContext.cwl_runner_job is not None:
+                self.uuid = runtimeContext.cwl_runner_job.get('uuid')
             jobiter = tool.job(job_order,
                                self.output_callback,
-                               **kwargs)
+                               runtimeContext)
 
         try:
             self.workflow_eval_lock.acquire()
@@ -545,7 +562,7 @@ class ArvCwlRunner(object):
 
                 if runnable:
                     with Perf(metrics, "run"):
-                        self.start_run(runnable, kwargs)
+                        self.start_run(runnable, runtimeContext)
                 else:
                     if (self.task_queue.in_flight + len(self.processes)) > 0:
                         self.workflow_eval_lock.wait(3)
@@ -563,8 +580,8 @@ class ArvCwlRunner(object):
         except UnsupportedRequirement:
             raise
         except:
-            if sys.exc_info()[0] is KeyboardInterrupt:
-                logger.error("Interrupted, marking pipeline as failed")
+            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
+                logger.error("Interrupted, workflow will be cancelled")
             else:
                 logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
             if self.pipeline:
@@ -586,17 +603,19 @@ class ArvCwlRunner(object):
         if self.final_output is None:
             raise WorkflowException("Workflow did not return a result.")
 
-        if kwargs.get("submit") and isinstance(runnerjob, Runner):
+        if runtimeContext.submit and isinstance(runnerjob, Runner):
             logger.info("Final output collection %s", runnerjob.final_output)
         else:
             if self.output_name is None:
                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
             if self.output_tags is None:
                 self.output_tags = ""
-            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
+
+            storage_classes = runtimeContext.storage_classes.strip().split(",")
+            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
             self.set_crunch_output()
 
-        if kwargs.get("compute_checksum"):
+        if runtimeContext.compute_checksum:
             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
 
@@ -700,6 +719,10 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                         default=None)
 
+    parser.add_argument("--submit-request-uuid", type=str,
+                        default=None,
+                        help="Update and commit supplied container request instead of creating a new one (containers API only).")
+
     parser.add_argument("--name", type=str,
                         help="Name to use for workflow execution instance.",
                         default=None)
@@ -711,6 +734,8 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     parser.add_argument("--enable-dev", action="store_true",
                         help="Enable loading and running development versions "
                              "of CWL spec.", default=False)
+    parser.add_argument('--storage-classes', default="default", type=str,
+                        help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")
 
     parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                        help="If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).",
@@ -761,12 +786,28 @@ def add_arv_hints():
         "http://arvados.org/cwl#ReuseRequirement"
     ])
 
-def main(args, stdout, stderr, api_client=None, keep_client=None):
+def exit_signal_handler(sigcode, frame):
+    logger.error("Caught signal {}, exiting.".format(sigcode))
+    sys.exit(-sigcode)
+
+def main(args, stdout, stderr, api_client=None, keep_client=None,
+         install_sig_handlers=True):
     parser = arg_parser()
 
     job_order_object = None
     arvargs = parser.parse_args(args)
 
+    if len(arvargs.storage_classes.strip().split(',')) > 1:
+        logger.error("Multiple storage classes are not supported currently.")
+        return 1
+
+    arvargs.use_container = True
+    arvargs.relax_path_checks = True
+    arvargs.print_supported_versions = False
+
+    if install_sig_handlers:
+        arv_cmd.install_signal_handlers()
+
     if arvargs.update_workflow:
         if arvargs.update_workflow.find('-7fd4e-') == 5:
             want_api = 'containers'
@@ -791,10 +832,7 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
             keep_client = api_client.keep
         if keep_client is None:
             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
-        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
-                              num_retries=4, output_name=arvargs.output_name,
-                              output_tags=arvargs.output_tags,
-                              thread_count=arvargs.thread_count)
+        runner = ArvCwlRunner(api_client, arvargs, keep_client=keep_client, num_retries=4)
     except Exception as e:
         logger.error(e)
         return 1
@@ -819,26 +857,21 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
     else:
         arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
 
-    arvargs.conformance_test = None
-    arvargs.use_container = True
-    arvargs.relax_path_checks = True
-    arvargs.print_supported_versions = False
+    for key, val in cwltool.argparser.get_default_args().items():
+        if not hasattr(arvargs, key):
+            setattr(arvargs, key, val)
 
-    make_fs_access = partial(CollectionFsAccess,
-                             collection_cache=runner.collection_cache)
+    runtimeContext = ArvRuntimeContext(vars(arvargs))
+    runtimeContext.make_fs_access = partial(CollectionFsAccess,
+                                            collection_cache=runner.collection_cache)
 
     return cwltool.main.main(args=arvargs,
                              stdout=stdout,
                              stderr=stderr,
                              executor=runner.arv_executor,
-                             makeTool=runner.arv_make_tool,
                              versionfunc=versionstring,
                              job_order_object=job_order_object,
-                             make_fs_access=make_fs_access,
-                             fetcher_constructor=partial(CollectionFetcher,
-                                                         api_client=api_client,
-                                                         fs_access=make_fs_access(""),
-                                                         num_retries=runner.num_retries),
-                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                              logger_handler=arvados.log_handler,
-                             custom_schema_callback=add_arv_hints)
+                             custom_schema_callback=add_arv_hints,
+                             loadingContext=runner.loadingContext,
+                             runtimeContext=runtimeContext)
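
Notes on this change, with illustrative sketches (the sketches below are not part of the patch):

The core of the diff is the migration from cwltool's **kwargs plumbing to explicit context objects: ArvCwlRunner now consumes the parsed argparse namespace directly and builds an ArvLoadingContext in __init__, while main() backfills cwltool's default arguments onto the namespace and hands an ArvRuntimeContext to cwltool.main.main(). A minimal sketch of driving the new entry points, assuming the module layout shown in the diff; the CLI values are hypothetical:

    import arvados
    import arvados_cwl
    from functools import partial
    from arvados_cwl.context import ArvRuntimeContext
    from arvados_cwl.fsaccess import CollectionFsAccess

    # Parse CLI-style arguments into the namespace ArvCwlRunner now consumes;
    # work_api, output_name, output_tags and thread_count are read from it
    # instead of being passed as separate keyword arguments.
    arvargs = arvados_cwl.arg_parser().parse_args(
        ["--api=containers", "workflow.cwl", "job.yml"])

    runner = arvados_cwl.ArvCwlRunner(arvados.api("v1"), arvargs)

    # Per-run settings live on a RuntimeContext object rather than **kwargs.
    # (main() also copies cwltool.argparser.get_default_args() onto arvargs
    # first, so every attribute the context expects is present.)
    runtimeContext = ArvRuntimeContext(vars(arvargs))
    runtimeContext.make_fs_access = partial(CollectionFsAccess,
                                            collection_cache=runner.collection_cache)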
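The interrupt checks are widened from `is KeyboardInterrupt` to also match SystemExit because signal handlers exit via sys.exit(), which raises SystemExit rather than KeyboardInterrupt; the cleanup paths (marking the pipeline failed, trashing intermediate collections) must run for signal-driven exits too. A sketch of the pattern mirroring exit_signal_handler above, assuming handlers for SIGINT/SIGTERM/SIGQUIT — install_handlers is a hypothetical stand-in here, the real installer being arv_cmd.install_signal_handlers():

    import signal
    import sys

    def exit_signal_handler(sigcode, frame):
        # Turn the signal into a SystemExit so except/finally blocks get a
        # chance to cancel outstanding work before the process exits.
        sys.exit(-sigcode)

    def install_handlers():
        for sig in (signal.SIGINT, signal.SIGTERM, signal.SIGQUIT):
            signal.signal(sig, exit_signal_handler)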
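The new --storage-classes option arrives as a comma-separated string: main() rejects more than one class up front in this version, and arv_executor() splits the string and passes the resulting list through make_output_collection() to Collection.save_new(). A small sketch of the parsing exactly as written in the diff; note there is no per-item strip, so "a, b" keeps the leading space on the second entry:

    def parse_storage_classes(value):
        # Mirrors runtimeContext.storage_classes.strip().split(",") above.
        return value.strip().split(",")

    assert parse_storage_classes("default") == ["default"]
    assert parse_storage_classes("a, b") == ["a", " b"]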