X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/bf59702692c89a809f102d8bd4b9caf531f4c9be..28225aeb0336a4872bbaa3aae5d331172f1e4068:/sdk/cwl/arvados_cwl/__init__.py

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index af6ab33f1f..b2b93bf9e7 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -17,32 +17,39 @@ import json
 import re
 from functools import partial
 import pkg_resources  # part of setuptools
+import Queue
+import time
+import signal
+import thread
 
 from cwltool.errors import WorkflowException
 import cwltool.main
 import cwltool.workflow
 import cwltool.process
 from schema_salad.sourceline import SourceLine
+import schema_salad.validate as validate
 
 import arvados
 import arvados.config
 from arvados.keep import KeepClient
 from arvados.errors import ApiError
+import arvados.commands._util as arv_cmd
 
 from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
-from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
+from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
 from .arvtool import ArvadosCommandTool
 from .arvworkflow import ArvadosWorkflow, upload_workflow
 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
 from .perf import Perf
 from .pathmapper import NoFollowPathMapper
+from .task_queue import TaskQueue
 from ._version import __version__
 
 from cwltool.pack import pack
 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
-from cwltool.draft2tool import compute_checksums
+from cwltool.command_line_tool import compute_checksums
 from arvados.api import OrderedJsonModel
 
 logger = logging.getLogger('arvados.cwl-runner')
@@ -53,20 +60,22 @@ arvados.log_handler.setFormatter(logging.Formatter(
         '%(asctime)s %(name)s %(levelname)s: %(message)s',
         '%Y-%m-%d %H:%M:%S'))
 
+DEFAULT_PRIORITY = 500
+
 class ArvCwlRunner(object):
     """Execute a CWL tool or workflow, submit work (using either jobs or
     containers API), wait for them to complete, and report output.
""" - def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4): + def __init__(self, api_client, work_api=None, keep_client=None, + output_name=None, output_tags=None, default_storage_classes="default", + num_retries=4, thread_count=4): self.api = api_client self.processes = {} - self.lock = threading.Lock() - self.cond = threading.Condition(self.lock) + self.workflow_eval_lock = threading.Condition(threading.RLock()) self.final_output = None self.final_status = None - self.uploaded = {} self.num_retries = num_retries self.uuid = None self.stop_polling = threading.Event() @@ -79,6 +88,9 @@ class ArvCwlRunner(object): self.intermediate_output_ttl = 0 self.intermediate_output_collections = [] self.trash_intermediate = False + self.thread_count = thread_count + self.poll_interval = 12 + self.default_storage_classes = default_storage_classes if keep_client is not None: self.keep_client = keep_client @@ -87,6 +99,11 @@ class ArvCwlRunner(object): self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries) + self.fetcher_constructor = partial(CollectionFetcher, + api_client=self.api, + fs_access=CollectionFsAccess("", collection_cache=self.collection_cache), + num_retries=self.num_retries) + self.work_api = None expected_api = ["jobs", "containers"] for api in expected_api: @@ -107,10 +124,7 @@ class ArvCwlRunner(object): def arv_make_tool(self, toolpath_object, **kwargs): kwargs["work_api"] = self.work_api - kwargs["fetcher_constructor"] = partial(CollectionFetcher, - api_client=self.api, - fs_access=CollectionFsAccess("", collection_cache=self.collection_cache), - num_retries=self.num_retries) + kwargs["fetcher_constructor"] = self.fetcher_constructor kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries) if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool": return ArvadosCommandTool(self, toolpath_object, **kwargs) @@ -120,40 +134,56 @@ class ArvCwlRunner(object): return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs) def output_callback(self, out, processStatus): - if processStatus == "success": - logger.info("Overall process status is %s", processStatus) - if self.pipeline: - self.api.pipeline_instances().update(uuid=self.pipeline["uuid"], - body={"state": "Complete"}).execute(num_retries=self.num_retries) - else: - logger.warn("Overall process status is %s", processStatus) - if self.pipeline: - self.api.pipeline_instances().update(uuid=self.pipeline["uuid"], - body={"state": "Failed"}).execute(num_retries=self.num_retries) - self.final_status = processStatus - self.final_output = out + with self.workflow_eval_lock: + if processStatus == "success": + logger.info("Overall process status is %s", processStatus) + if self.pipeline: + self.api.pipeline_instances().update(uuid=self.pipeline["uuid"], + body={"state": "Complete"}).execute(num_retries=self.num_retries) + else: + logger.warn("Overall process status is %s", processStatus) + if self.pipeline: + self.api.pipeline_instances().update(uuid=self.pipeline["uuid"], + body={"state": "Failed"}).execute(num_retries=self.num_retries) + self.final_status = processStatus + self.final_output = out + self.workflow_eval_lock.notifyAll() + + + def start_run(self, runnable, kwargs): + self.task_queue.add(partial(runnable.run, **kwargs)) + + def process_submitted(self, container): + with self.workflow_eval_lock: + self.processes[container.uuid] = container + + def process_done(self, uuid, record): + 
+        with self.workflow_eval_lock:
+            j = self.processes[uuid]
+            logger.info("%s %s is %s", self.label(j), uuid, record["state"])
+            self.task_queue.add(partial(j.done, record))
+            del self.processes[uuid]
+
+    def wrapped_callback(self, cb, obj, st):
+        with self.workflow_eval_lock:
+            cb(obj, st)
+            self.workflow_eval_lock.notifyAll()
+
+    def get_wrapped_callback(self, cb):
+        return partial(self.wrapped_callback, cb)
 
     def on_message(self, event):
-        if "object_uuid" in event:
-            if event["object_uuid"] in self.processes and event["event_type"] == "update":
-                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
-                    uuid = event["object_uuid"]
-                    with self.lock:
-                        j = self.processes[uuid]
-                        logger.info("%s %s is Running", self.label(j), uuid)
+        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
+            uuid = event["object_uuid"]
+            if event["properties"]["new_attributes"]["state"] == "Running":
+                with self.workflow_eval_lock:
+                    j = self.processes[uuid]
+                    if j.running is False:
                         j.running = True
                         j.update_pipeline_component(event["properties"]["new_attributes"])
-                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
-                    uuid = event["object_uuid"]
-                    try:
-                        self.cond.acquire()
-                        j = self.processes[uuid]
-                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
-                        with Perf(metrics, "done %s" % j.name):
-                            j.done(event["properties"]["new_attributes"])
-                        self.cond.notify()
-                    finally:
-                        self.cond.release()
+                        logger.info("%s %s is Running", self.label(j), uuid)
+            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
+                self.process_done(uuid, event["properties"]["new_attributes"])
 
     def label(self, obj):
         return "[%s %s]" % (self.work_api[0:-1], obj.name)
@@ -165,15 +195,19 @@
         """
 
         try:
+            remain_wait = self.poll_interval
             while True:
-                self.stop_polling.wait(15)
+                if remain_wait > 0:
+                    self.stop_polling.wait(remain_wait)
                 if self.stop_polling.is_set():
                     break
-                with self.lock:
-                    keys = self.processes.keys()
+                with self.workflow_eval_lock:
+                    keys = list(self.processes.keys())
                 if not keys:
+                    remain_wait = self.poll_interval
                     continue
 
+                begin_poll = time.time()
                 if self.work_api == "containers":
                     table = self.poll_api.container_requests()
                 elif self.work_api == "jobs":
@@ -183,6 +217,7 @@
                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                 except Exception as e:
                     logger.warn("Error checking states on API server: %s", e)
+                    remain_wait = self.poll_interval
                     continue
 
                 for p in proc_states["items"]:
@@ -193,21 +228,16 @@
                             "new_attributes": p
                         }
                     })
+                finish_poll = time.time()
+                remain_wait = self.poll_interval - (finish_poll - begin_poll)
         except:
-            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
-            self.cond.acquire()
-            self.processes.clear()
-            self.cond.notify()
-            self.cond.release()
+            logger.exception("Fatal error in state polling thread.")
+            with self.workflow_eval_lock:
+                self.processes.clear()
+                self.workflow_eval_lock.notifyAll()
         finally:
            self.stop_polling.set()
 
-    def get_uploaded(self):
-        return self.uploaded.copy()
-
-    def add_uploaded(self, src, pair):
-        self.uploaded[src] = pair
-
    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)
@@ -219,7 +249,7 @@
                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
             except:
                 logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
-            if sys.exc_info()[0] is KeyboardInterrupt:
+            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                 break
 
     def check_features(self, obj):
@@ -227,9 +257,15 @@ class ArvCwlRunner(object):
             if obj.get("writable") and self.work_api != "containers":
                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
             if obj.get("class") == "DockerRequirement":
-                if obj.get("dockerOutputDirectory") and self.work_api != "containers":
-                    raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
-                        "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
+                if obj.get("dockerOutputDirectory"):
+                    if self.work_api != "containers":
+                        raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
+                            "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
+                    if not obj.get("dockerOutputDirectory").startswith('/'):
+                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
+                            "Option 'dockerOutputDirectory' must be an absolute path.")
+            if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
+                raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
             for v in obj.itervalues():
                 self.check_features(v)
         elif isinstance(obj, list):
@@ -237,7 +273,7 @@
                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                     self.check_features(v)
 
-    def make_output_collection(self, name, tagsString, outputObj):
+    def make_output_collection(self, name, storage_classes, tagsString, outputObj):
         outputObj = copy.deepcopy(outputObj)
 
         files = []
@@ -278,7 +314,7 @@
 
         def rewrite(fileobj):
             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
-            for k in ("basename", "listing", "contents", "nameext", "nameroot", "dirname"):
+            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                 if k in fileobj:
                     del fileobj[k]
 
@@ -288,7 +324,7 @@
         with final.open("cwl.output.json", "w") as f:
             json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
 
-        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
+        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
         logger.info("Final output collection %s \"%s\" (%s)",
                     final.portable_data_hash(),
                     final.api_response()["name"],
@@ -347,7 +383,7 @@
         make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                  collection_cache=self.collection_cache)
         self.fs_access = make_fs_access(kwargs["basedir"])
-
+        self.secret_store = kwargs.get("secret_store")
 
         self.trash_intermediate = kwargs["trash_intermediate"]
         if self.trash_intermediate and self.work_api != "containers":
@@ -359,6 +395,9 @@
         if self.intermediate_output_ttl < 0:
             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
 
+        if kwargs.get("submit_request_uuid") and self.work_api != "containers":
'{}' api".format(self.work_api)) + if not kwargs.get("name"): kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"]) @@ -368,11 +407,13 @@ class ArvCwlRunner(object): # Reload tool object which may have been updated by # upload_workflow_deps + # Don't validate this time because it will just print redundant errors. tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]], makeTool=self.arv_make_tool, loader=tool.doc_loader, avsc_names=tool.doc_schema, - metadata=tool.metadata) + metadata=tool.metadata, + do_validate=False) # Upload local file references in the job order. job_order = upload_job_order(self, "%s input" % kwargs["name"], @@ -401,6 +442,7 @@ class ArvCwlRunner(object): "success") self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse") + self.eval_timeout = kwargs.get("eval_timeout") kwargs["make_fs_access"] = make_fs_access kwargs["enable_reuse"] = kwargs.get("enable_reuse") @@ -409,15 +451,22 @@ class ArvCwlRunner(object): kwargs["compute_checksum"] = kwargs.get("compute_checksum") if self.work_api == "containers": + if self.ignore_docker_for_reuse: + raise Exception("--ignore-docker-for-reuse not supported with containers API.") kwargs["outdir"] = "/var/spool/cwl" kwargs["docker_outdir"] = "/var/spool/cwl" kwargs["tmpdir"] = "/tmp" kwargs["docker_tmpdir"] = "/tmp" elif self.work_api == "jobs": + if kwargs["priority"] != DEFAULT_PRIORITY: + raise Exception("--priority not implemented for jobs API.") kwargs["outdir"] = "$(task.outdir)" kwargs["docker_outdir"] = "$(task.outdir)" kwargs["tmpdir"] = "$(task.tmpdir)" + if kwargs["priority"] < 1 or kwargs["priority"] > 1000: + raise Exception("--priority must be in the range 1..1000.") + runnerjob = None if kwargs.get("submit"): # Submit a runner job to run the workflow for us. @@ -436,7 +485,10 @@ class ArvCwlRunner(object): on_error=kwargs.get("on_error"), submit_runner_image=kwargs.get("submit_runner_image"), intermediate_output_ttl=kwargs.get("intermediate_output_ttl"), - merged_map=merged_map) + merged_map=merged_map, + default_storage_classes=self.default_storage_classes, + priority=kwargs.get("priority"), + secret_store=self.secret_store) elif self.work_api == "jobs": runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name, @@ -457,13 +509,17 @@ class ArvCwlRunner(object): logger.info("Pipeline instance %s", self.pipeline["uuid"]) if runnerjob and not kwargs.get("wait"): - runnerjob.run(wait=kwargs.get("wait")) + submitargs = kwargs.copy() + submitargs['submit'] = False + runnerjob.run(**submitargs) return (runnerjob.uuid, "success") self.poll_api = arvados.api('v1') self.polling_thread = threading.Thread(target=self.poll_states) self.polling_thread.start() + self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count) + if runnerjob: jobiter = iter((runnerjob,)) else: @@ -474,10 +530,11 @@ class ArvCwlRunner(object): **kwargs) try: - self.cond.acquire() - # Will continue to hold the lock for the duration of this code - # except when in cond.wait(), at which point on_message can update - # job state and process output callbacks. + self.workflow_eval_lock.acquire() + # Holds the lock while this code runs and releases it when + # it is safe to do so in self.workflow_eval_lock.wait(), + # at which point on_message can update job state and + # process output callbacks. 
 
             loopperf = Perf(metrics, "jobiter")
             loopperf.__enter__()
@@ -487,26 +544,31 @@
                 if self.stop_polling.is_set():
                     break
 
+                if self.task_queue.error is not None:
+                    raise self.task_queue.error
+
                 if runnable:
                     with Perf(metrics, "run"):
-                        runnable.run(**kwargs)
+                        self.start_run(runnable, kwargs)
                 else:
-                    if self.processes:
-                        self.cond.wait(1)
+                    if (self.task_queue.in_flight + len(self.processes)) > 0:
+                        self.workflow_eval_lock.wait(3)
                     else:
-                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
+                        logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                         break
                 loopperf.__enter__()
             loopperf.__exit__()
 
-            while self.processes:
-                self.cond.wait(1)
+            while (self.task_queue.in_flight + len(self.processes)) > 0:
+                if self.task_queue.error is not None:
+                    raise self.task_queue.error
+                self.workflow_eval_lock.wait(3)
 
         except UnsupportedRequirement:
             raise
         except:
-            if sys.exc_info()[0] is KeyboardInterrupt:
-                logger.error("Interrupted, marking pipeline as failed")
+            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
+                logger.error("Interrupted, workflow will be cancelled")
             else:
                 logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
             if self.pipeline:
@@ -516,9 +578,11 @@
                 self.api.container_requests().update(uuid=runnerjob.uuid,
                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
         finally:
-            self.cond.release()
+            self.workflow_eval_lock.release()
+            self.task_queue.drain()
             self.stop_polling.set()
             self.polling_thread.join()
+            self.task_queue.join()
 
         if self.final_status == "UnsupportedRequirement":
             raise UnsupportedRequirement("Check log for details.")
@@ -526,6 +590,7 @@
         if self.final_output is None:
             raise WorkflowException("Workflow did not return a result.")
 
+
         if kwargs.get("submit") and isinstance(runnerjob, Runner):
             logger.info("Final output collection %s", runnerjob.final_output)
         else:
@@ -533,7 +598,9 @@
                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
             if self.output_tags is None:
                 self.output_tags = ""
-            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
+
+            storage_classes = kwargs.get("storage_classes").strip().split(",")
+            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
             self.set_crunch_output()
 
         if kwargs.get("compute_checksum"):
@@ -574,7 +641,7 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--print-dot", action="store_true",
                          help="Print workflow visualization in graphviz format and exit")
-    exgroup.add_argument("--version", action="store_true", help="Print version and exit")
+    exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
     exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
 
     exgroup = parser.add_mutually_exclusive_group()
@@ -640,6 +707,10 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                         default=None)
 
+    parser.add_argument("--submit-request-uuid", type=str,
+                        default=None,
+                        help="Update and commit supplied container request instead of creating a new one (containers API only).")
+
     parser.add_argument("--name", type=str,
                         help="Name to use for workflow execution instance.",
                         default=None)
@@ -651,11 +722,28 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     parser.add_argument("--enable-dev", action="store_true",
                         help="Enable loading and running development versions "
                              "of CWL spec.", default=False)
+    parser.add_argument('--storage-classes', default="default", type=str,
+                        help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")
 
     parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                         help="If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).",
                         default=0)
 
+    parser.add_argument("--priority", type=int,
+                        help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
+                        default=DEFAULT_PRIORITY)
+
+    parser.add_argument("--disable-validate", dest="do_validate",
+                        action="store_false", default=True,
+                        help=argparse.SUPPRESS)
+
+    parser.add_argument("--disable-js-validation",
+                        action="store_true", default=False,
+                        help=argparse.SUPPRESS)
+
+    parser.add_argument("--thread-count", type=int,
+                        default=4, help="Number of threads to use for job submit and output collection.")
+
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--trash-intermediate", action="store_true",
                          default=False, dest="trash_intermediate",
@@ -664,14 +752,14 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                          default=False, dest="trash_intermediate",
                          help="Do not trash intermediate outputs (default).")
 
-    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
+    parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
 
     return parser
 
 def add_arv_hints():
-    cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
-    cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE
+    cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
+    cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
     use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
     res.close()
@@ -686,15 +774,23 @@ def add_arv_hints():
         "http://arvados.org/cwl#ReuseRequirement"
     ])
 
-def main(args, stdout, stderr, api_client=None, keep_client=None):
+def exit_signal_handler(sigcode, frame):
+    logger.error("Caught signal {}, exiting.".format(sigcode))
+    sys.exit(-sigcode)
+
+def main(args, stdout, stderr, api_client=None, keep_client=None,
+         install_sig_handlers=True):
     parser = arg_parser()
 
     job_order_object = None
     arvargs = parser.parse_args(args)
 
-    if arvargs.version:
-        print versionstring()
-        return
+    if len(arvargs.storage_classes.strip().split(',')) > 1:
+        logger.error("Multiple storage classes are not supported currently.")
+        return 1
+
+    if install_sig_handlers:
+        arv_cmd.install_signal_handlers()
 
     if arvargs.update_workflow:
         if arvargs.update_workflow.find('-7fd4e-') == 5:
@@ -716,12 +812,14 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
 
     try:
         if api_client is None:
-            api_client=arvados.api('v1', model=OrderedJsonModel())
+            api_client = arvados.safeapi.ThreadSafeApiCache(api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4})
+            keep_client = api_client.keep
         if keep_client is None:
             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
         runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                               num_retries=4, output_name=arvargs.output_name,
-                              output_tags=arvargs.output_tags)
+                              output_tags=arvargs.output_tags, default_storage_classes=parser.get_default("storage_classes"),
+                              thread_count=arvargs.thread_count)
     except Exception as e:
         logger.error(e)
         return 1
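
Note on the concurrency pattern above: the patch replaces the old lock/cond pair with a single shared Condition (workflow_eval_lock, built on an RLock) and routes submission and completion work through a TaskQueue drained by worker threads. Below is a minimal, self-contained sketch of that pattern for orientation only; SimpleTaskQueue, process_done, and the "proc-1" entry are hypothetical stand-ins and do not reflect the actual interface of arvados_cwl.task_queue.TaskQueue.

import threading

try:
    import queue             # Python 3
except ImportError:
    import Queue as queue    # Python 2, as used by the patched module

class SimpleTaskQueue(object):
    """Illustrative stand-in for the patch's TaskQueue (not the real implementation)."""
    def __init__(self, lock, thread_count):
        self.lock = lock                 # threading.Condition shared with the caller
        self.in_flight = 0
        self.error = None
        self.task_queue = queue.Queue()
        self.task_queue_threads = [threading.Thread(target=self._worker)
                                   for _ in range(thread_count)]
        for t in self.task_queue_threads:
            t.daemon = True
            t.start()

    def _worker(self):
        while True:
            task = self.task_queue.get()
            if task is None:             # sentinel pushed by drain()
                return
            try:
                task()
            except Exception as e:
                with self.lock:
                    self.error = e       # surfaced by the evaluation loop
            with self.lock:
                self.in_flight -= 1
                self.lock.notifyAll()    # wake anyone blocked in lock.wait()

    def add(self, task):
        with self.lock:
            self.in_flight += 1
        self.task_queue.put(task)

    def drain(self):
        for _ in self.task_queue_threads:
            self.task_queue.put(None)

    def join(self):
        for t in self.task_queue_threads:
            t.join()

if __name__ == "__main__":
    workflow_eval_lock = threading.Condition(threading.RLock())
    tasks = SimpleTaskQueue(workflow_eval_lock, thread_count=4)
    processes = {"proc-1": object()}     # stands in for self.processes

    def process_done(uuid):
        # Runs on a worker thread, like the partial(j.done, record) callbacks.
        with workflow_eval_lock:
            processes.pop(uuid, None)
            workflow_eval_lock.notifyAll()

    tasks.add(lambda: process_done("proc-1"))

    # Evaluation loop: hold the condition, wait until nothing is pending.
    with workflow_eval_lock:
        while (tasks.in_flight + len(processes)) > 0:
            if tasks.error is not None:
                raise tasks.error
            workflow_eval_lock.wait(3)

    tasks.drain()
    tasks.join()

The point of sharing one Condition is that completion callbacks, the polling thread, and the evaluation loop all block on the same object: whichever thread changes the process table or the in-flight count calls notifyAll(), and the loop simply re-checks (in_flight + len(processes)) after each timed wait, mirroring the loop introduced in the hunks above.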