X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/e4c5f98f696c354638bbba22ee4a1db20a52837c..28225aeb0336a4872bbaa3aae5d331172f1e4068:/sdk/cwl/arvados_cwl/__init__.py diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py index 2fa6da7034..b2b93bf9e7 100644 --- a/sdk/cwl/arvados_cwl/__init__.py +++ b/sdk/cwl/arvados_cwl/__init__.py @@ -33,10 +33,11 @@ import arvados import arvados.config from arvados.keep import KeepClient from arvados.errors import ApiError +import arvados.commands._util as arv_cmd from .arvcontainer import ArvadosContainer, RunnerContainer from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate -from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies +from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps from .arvtool import ArvadosCommandTool from .arvworkflow import ArvadosWorkflow, upload_workflow from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache @@ -68,14 +69,13 @@ class ArvCwlRunner(object): """ def __init__(self, api_client, work_api=None, keep_client=None, - output_name=None, output_tags=None, num_retries=4, - thread_count=4): + output_name=None, output_tags=None, default_storage_classes="default", + num_retries=4, thread_count=4): self.api = api_client self.processes = {} self.workflow_eval_lock = threading.Condition(threading.RLock()) self.final_output = None self.final_status = None - self.uploaded = {} self.num_retries = num_retries self.uuid = None self.stop_polling = threading.Event() @@ -90,6 +90,7 @@ class ArvCwlRunner(object): self.trash_intermediate = False self.thread_count = thread_count self.poll_interval = 12 + self.default_storage_classes = default_storage_classes if keep_client is not None: self.keep_client = keep_client @@ -98,6 +99,11 @@ class ArvCwlRunner(object): self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries) + self.fetcher_constructor = partial(CollectionFetcher, + api_client=self.api, + fs_access=CollectionFsAccess("", collection_cache=self.collection_cache), + num_retries=self.num_retries) + self.work_api = None expected_api = ["jobs", "containers"] for api in expected_api: @@ -118,10 +124,7 @@ class ArvCwlRunner(object): def arv_make_tool(self, toolpath_object, **kwargs): kwargs["work_api"] = self.work_api - kwargs["fetcher_constructor"] = partial(CollectionFetcher, - api_client=self.api, - fs_access=CollectionFsAccess("", collection_cache=self.collection_cache), - num_retries=self.num_retries) + kwargs["fetcher_constructor"] = self.fetcher_constructor kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries) if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool": return ArvadosCommandTool(self, toolpath_object, **kwargs) @@ -154,10 +157,12 @@ class ArvCwlRunner(object): with self.workflow_eval_lock: self.processes[container.uuid] = container - def process_done(self, uuid): + def process_done(self, uuid, record): with self.workflow_eval_lock: - if uuid in self.processes: - del self.processes[uuid] + j = self.processes[uuid] + logger.info("%s %s is %s", self.label(j), uuid, record["state"]) + self.task_queue.add(partial(j.done, record)) + del self.processes[uuid] def wrapped_callback(self, cb, obj, st): with self.workflow_eval_lock: @@ -178,10 +183,7 @@ class ArvCwlRunner(object): j.update_pipeline_component(event["properties"]["new_attributes"]) logger.info("%s %s is Running", self.label(j), uuid) elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"): - with self.workflow_eval_lock: - j = self.processes[uuid] - self.task_queue.add(partial(j.done, event["properties"]["new_attributes"])) - logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"]) + self.process_done(uuid, event["properties"]["new_attributes"]) def label(self, obj): return "[%s %s]" % (self.work_api[0:-1], obj.name) @@ -236,12 +238,6 @@ class ArvCwlRunner(object): finally: self.stop_polling.set() - def get_uploaded(self): - return self.uploaded.copy() - - def add_uploaded(self, src, pair): - self.uploaded[src] = pair - def add_intermediate_output(self, uuid): if uuid: self.intermediate_output_collections.append(uuid) @@ -253,7 +249,7 @@ class ArvCwlRunner(object): self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries) except: logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False)) - if sys.exc_info()[0] is KeyboardInterrupt: + if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit: break def check_features(self, obj): @@ -277,7 +273,7 @@ class ArvCwlRunner(object): with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)): self.check_features(v) - def make_output_collection(self, name, tagsString, outputObj): + def make_output_collection(self, name, storage_classes, tagsString, outputObj): outputObj = copy.deepcopy(outputObj) files = [] @@ -328,7 +324,7 @@ class ArvCwlRunner(object): with final.open("cwl.output.json", "w") as f: json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': ')) - final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True) + final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True) logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(), final.api_response()["name"], @@ -399,6 +395,9 @@ class ArvCwlRunner(object): if self.intermediate_output_ttl < 0: raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl) + if kwargs.get("submit_request_uuid") and self.work_api != "containers": + raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api)) + if not kwargs.get("name"): kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"]) @@ -487,6 +486,7 @@ class ArvCwlRunner(object): submit_runner_image=kwargs.get("submit_runner_image"), intermediate_output_ttl=kwargs.get("intermediate_output_ttl"), merged_map=merged_map, + default_storage_classes=self.default_storage_classes, priority=kwargs.get("priority"), secret_store=self.secret_store) elif self.work_api == "jobs": @@ -509,7 +509,9 @@ class ArvCwlRunner(object): logger.info("Pipeline instance %s", self.pipeline["uuid"]) if runnerjob and not kwargs.get("wait"): - runnerjob.run(**kwargs) + submitargs = kwargs.copy() + submitargs['submit'] = False + runnerjob.run(**submitargs) return (runnerjob.uuid, "success") self.poll_api = arvados.api('v1') @@ -565,7 +567,7 @@ class ArvCwlRunner(object): except UnsupportedRequirement: raise except: - if sys.exc_info()[0] is KeyboardInterrupt: + if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit: logger.error("Interrupted, workflow will be cancelled") else: logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False)) @@ -588,6 +590,7 @@ class ArvCwlRunner(object): if self.final_output is None: raise WorkflowException("Workflow did not return a result.") + if kwargs.get("submit") and isinstance(runnerjob, Runner): logger.info("Final output collection %s", runnerjob.final_output) else: @@ -595,7 +598,9 @@ class ArvCwlRunner(object): self.output_name = "Output of %s" % (shortname(tool.tool["id"])) if self.output_tags is None: self.output_tags = "" - self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output) + + storage_classes = kwargs.get("storage_classes").strip().split(",") + self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output) self.set_crunch_output() if kwargs.get("compute_checksum"): @@ -702,6 +707,10 @@ def arg_parser(): # type: () -> argparse.ArgumentParser help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__, default=None) + parser.add_argument("--submit-request-uuid", type=str, + default=None, + help="Update and commit supplied container request instead of creating a new one (containers API only).") + parser.add_argument("--name", type=str, help="Name to use for workflow execution instance.", default=None) @@ -713,6 +722,8 @@ def arg_parser(): # type: () -> argparse.ArgumentParser parser.add_argument("--enable-dev", action="store_true", help="Enable loading and running development versions " "of CWL spec.", default=False) + parser.add_argument('--storage-classes', default="default", type=str, + help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.") parser.add_argument("--intermediate-output-ttl", type=int, metavar="N", help="If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).", @@ -763,6 +774,10 @@ def add_arv_hints(): "http://arvados.org/cwl#ReuseRequirement" ]) +def exit_signal_handler(sigcode, frame): + logger.error("Caught signal {}, exiting.".format(sigcode)) + sys.exit(-sigcode) + def main(args, stdout, stderr, api_client=None, keep_client=None, install_sig_handlers=True): parser = arg_parser() @@ -770,8 +785,12 @@ def main(args, stdout, stderr, api_client=None, keep_client=None, job_order_object = None arvargs = parser.parse_args(args) + if len(arvargs.storage_classes.strip().split(',')) > 1: + logger.error("Multiple storage classes are not supported currently.") + return 1 + if install_sig_handlers: - signal.signal(signal.SIGTERM, lambda x, y: thread.interrupt_main()) + arv_cmd.install_signal_handlers() if arvargs.update_workflow: if arvargs.update_workflow.find('-7fd4e-') == 5: @@ -799,7 +818,7 @@ def main(args, stdout, stderr, api_client=None, keep_client=None, keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4) runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client, num_retries=4, output_name=arvargs.output_name, - output_tags=arvargs.output_tags, + output_tags=arvargs.output_tags, default_storage_classes=parser.get_default("storage_classes"), thread_count=arvargs.thread_count) except Exception as e: logger.error(e)