X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/e8e78685ed7d893433e1ebe799a66084e39a0345..92c876d90253a0db23d40aa125cba335f7bf7e27:/sdk/cwl/arvados_cwl/__init__.py

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 885497171e..339b91ce59 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -11,6 +11,7 @@ import cwltool.draft2tool
 import cwltool.workflow
 import cwltool.main
 from cwltool.process import shortname
+from cwltool.errors import WorkflowException
 import threading
 import cwltool.docker
 import fnmatch
@@ -25,16 +26,12 @@ from arvados.api import OrderedJsonModel
 logger = logging.getLogger('arvados.cwl-runner')
 logger.setLevel(logging.INFO)

-crunchrunner_pdh = "83db29f08544e1c319572a6bd971088a+140"
-crunchrunner_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/crunchrunner"
-certs_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/ca-certificates.crt"
-
 tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
 outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
 keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")

-def arv_docker_get_image(api_client, dockerRequirement, pull_image):
+def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
     if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
         dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]

@@ -48,10 +45,10 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image):

     if not images:
         imageId = cwltool.docker.get_image(dockerRequirement, pull_image)
-        args = [image_name]
+        args = ["--project-uuid="+project_uuid, image_name]
         if image_tag:
             args.append(image_tag)
-        logger.info("Uploading Docker image %s", ":".join(args))
+        logger.info("Uploading Docker image %s", ":".join(args[1:]))
         arvados.commands.keepdocker.main(args)

     return dockerRequirement["dockerImageId"]
@@ -150,16 +147,36 @@ class ArvadosJob(object):

         (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
         if docker_req and kwargs.get("use_container") is not False:
-            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image)
+            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
+        else:
+            runtime_constraints["docker_image"] = "arvados/jobs"
+
+        resources = self.builder.resources
+        if resources is not None:
+            runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
+            runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
+            runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)
+
+        filters = [["repository", "=", "arvados"],
+                   ["script", "=", "crunchrunner"],
+                   ["script_version", "in git", "9e5b98e8f5f4727856b53447191f9c06e3da2ba6"]]
+        if not self.arvrunner.ignore_docker_for_reuse:
+            filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]])

         try:
-            response = self.arvrunner.api.jobs().create(body={
-                "script": "crunchrunner",
-                "repository": "arvados",
-                "script_version": "8488-cwl-crunchrunner-collection",
-                "script_parameters": {"tasks": [script_parameters], "crunchrunner": crunchrunner_pdh+"/crunchrunner"},
-                "runtime_constraints": runtime_constraints
-            }, find_or_create=kwargs.get("enable_reuse", True)).execute(num_retries=self.arvrunner.num_retries)
+            response = self.arvrunner.api.jobs().create(
+                body={
+                    "owner_uuid": self.arvrunner.project_uuid,
+                    "script": "crunchrunner",
+                    "repository": "arvados",
+                    "script_version": "master",
+                    "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
+                    "script_parameters": {"tasks": [script_parameters]},
+                    "runtime_constraints": runtime_constraints
+                },
+                filters=filters,
+                find_or_create=kwargs.get("enable_reuse", True)
+            ).execute(num_retries=self.arvrunner.num_retries)

             self.arvrunner.jobs[response["uuid"]] = self
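
Note on the hunk above: with `find_or_create=True`, the API server will only return an existing job if it matches both the request body and the `filters` list, so the filters are what make reuse precise. A rough standalone sketch of the same query, assuming a configured `arvados` client (the `api` variable and the printed fields are illustrative, not part of this change):

    import arvados

    api = arvados.api('v1')
    filters = [["repository", "=", "arvados"],
               ["script", "=", "crunchrunner"],
               ["script_version", "in git", "9e5b98e8f5f4727856b53447191f9c06e3da2ba6"]]
    # List past jobs that would be candidates for reuse under these filters.
    for job in api.jobs().list(filters=filters).execute()["items"]:
        print(job["uuid"], job["state"], job["output"])
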
crunchrunner_pdh+"/crunchrunner"}, - "runtime_constraints": runtime_constraints - }, find_or_create=kwargs.get("enable_reuse", True)).execute(num_retries=self.arvrunner.num_retries) + response = self.arvrunner.api.jobs().create( + body={ + "owner_uuid": self.arvrunner.project_uuid, + "script": "crunchrunner", + "repository": "arvados", + "script_version": "master", + "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6", + "script_parameters": {"tasks": [script_parameters]}, + "runtime_constraints": runtime_constraints + }, + filters=filters, + find_or_create=kwargs.get("enable_reuse", True) + ).execute(num_retries=self.arvrunner.num_retries) self.arvrunner.jobs[response["uuid"]] = self @@ -204,7 +221,15 @@ class ArvadosJob(object): tmpdir = None outdir = None keepdir = None - for l in log.readlines(): + for l in log: + # Determine the tmpdir, outdir and keepdir paths from + # the job run. Unfortunately, we can't take the first + # values we find (which are expected to be near the + # top) and stop scanning because if the node fails and + # the job restarts on a different node these values + # will different runs, and we need to know about the + # final run that actually produced output. + g = tmpdirre.match(l) if g: tmpdir = g.group(1) @@ -214,14 +239,49 @@ class ArvadosJob(object): g = keepre.match(l) if g: keepdir = g.group(1) - if tmpdir and outdir and keepdir: - break + + colname = "Output %s of %s" % (record["output"][0:7], self.name) + + # check if collection already exists with same owner, name and content + collection_exists = self.arvrunner.api.collections().list( + filters=[["owner_uuid", "=", self.arvrunner.project_uuid], + ['portable_data_hash', '=', record["output"]], + ["name", "=", colname]] + ).execute(num_retries=self.arvrunner.num_retries) + + if not collection_exists["items"]: + # Create a collection located in the same project as the + # pipeline with the contents of the output. + # First, get output record. + collections = self.arvrunner.api.collections().list( + limit=1, + filters=[['portable_data_hash', '=', record["output"]]], + select=["manifest_text"] + ).execute(num_retries=self.arvrunner.num_retries) + + if not collections["items"]: + raise WorkflowException( + "Job output '%s' cannot be found on API server" % ( + record["output"])) + + # Create new collection in the parent project + # with the output contents. 
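
The `tmpdirre`/`outdirre`/`keepre` patterns defined at the top of the file are what this loop matches against each crunchrunner log line. A quick illustration of the expected line shape (the log line below is fabricated to fit the pattern; real field values will differ):

    import re

    tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")

    # Hypothetical log line with the right field layout.
    line = ("2016-03-01_00:00:00 qr1hi-8i9sb-0123456789abcde 1234 0 "
            "stderr node1 4567 crunchrunner: $(task.tmpdir)=/tmp/crunch-job/task/work")
    m = tmpdirre.match(line)
    if m:
        print(m.group(1))  # /tmp/crunch-job/task/work
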
@@ -256,7 +316,8 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
                                              arvrunner.api,
                                              dry_run=kwargs.get("dry_run"),
                                              num_retries=3,
-                                             fnPattern="$(task.keep)/%s/%s")
+                                             fnPattern="$(task.keep)/%s/%s",
+                                             project=arvrunner.project_uuid)

         for src, ab, st in uploadfiles:
             arvrunner.add_uploaded(src, (ab, st.fn))
@@ -266,9 +327,9 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):

     def reversemap(self, target):
         if target.startswith("keep:"):
-            return target
+            return (target, target)
         elif self.keepdir and target.startswith(self.keepdir):
-            return "keep:" + target[len(self.keepdir)+1:]
+            return (target, "keep:" + target[len(self.keepdir)+1:])
         else:
             return super(ArvPathMapper, self).reversemap(target)
@@ -316,24 +377,24 @@ class ArvCwlRunner(object):

     def on_message(self, event):
         if "object_uuid" in event:
-           if event["object_uuid"] in self.jobs and event["event_type"] == "update":
-               if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
-                   uuid = event["object_uuid"]
-                   with self.lock:
-                       j = self.jobs[uuid]
-                       logger.info("Job %s (%s) is Running", j.name, uuid)
-                       j.running = True
-                       j.update_pipeline_component(event["properties"]["new_attributes"])
-               elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
-                   uuid = event["object_uuid"]
-                   try:
-                       self.cond.acquire()
-                       j = self.jobs[uuid]
-                       logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
-                       j.done(event["properties"]["new_attributes"])
-                       self.cond.notify()
-                   finally:
-                       self.cond.release()
+            if event["object_uuid"] in self.jobs and event["event_type"] == "update":
+                if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
+                    uuid = event["object_uuid"]
+                    with self.lock:
+                        j = self.jobs[uuid]
+                        logger.info("Job %s (%s) is Running", j.name, uuid)
+                        j.running = True
+                        j.update_pipeline_component(event["properties"]["new_attributes"])
+                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
+                    uuid = event["object_uuid"]
+                    try:
+                        self.cond.acquire()
+                        j = self.jobs[uuid]
+                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
+                        j.done(event["properties"]["new_attributes"])
+                        self.cond.notify()
+                    finally:
+                        self.cond.release()

     def get_uploaded(self):
         return self.uploaded.copy()
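
Note the new `reversemap` contract above: it now returns a `(local path, keep reference)` pair instead of a bare string, which appears to line up with the `(k, v)` tuple that the base class `cwltool.pathmapper.PathMapper.reversemap` returns. A minimal sketch of the mapping behavior (`/keep` stands in for a real `keepdir` value recovered from the job log):

    keepdir = "/keep"  # hypothetical keep mount point

    def reversemap(target):
        if target.startswith("keep:"):
            return (target, target)
        elif keepdir and target.startswith(keepdir):
            return (target, "keep:" + target[len(keepdir)+1:])
        return None

    print(reversemap("/keep/99999+42/out.txt"))
    # ('/keep/99999+42/out.txt', 'keep:99999+42/out.txt')
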
@@ -344,21 +405,8 @@ class ArvCwlRunner(object):
     def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
         events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

-        try:
-            self.api.collections().get(uuid=crunchrunner_pdh).execute()
-        except arvados.errors.ApiError as e:
-            import httplib2
-            h = httplib2.Http(ca_certs=arvados.util.ca_certs_path())
-            resp, content = h.request(crunchrunner_download, "GET")
-            resp2, content2 = h.request(certs_download, "GET")
-            with arvados.collection.Collection() as col:
-                with col.open("crunchrunner", "w") as f:
-                    f.write(content)
-                with col.open("ca-certificates.crt", "w") as f:
-                    f.write(content2)
-
-                col.save_new("crunchrunner binary", ensure_unique_name=True)
-
+        self.debug = args.debug
+        self.ignore_docker_for_reuse = args.ignore_docker_for_reuse
         self.fs_access = CollectionFsAccess(input_basedir)

         kwargs["fs_access"] = self.fs_access
@@ -367,12 +415,20 @@ class ArvCwlRunner(object):
         kwargs["outdir"] = "$(task.outdir)"
         kwargs["tmpdir"] = "$(task.tmpdir)"

+        useruuid = self.api.users().current().execute()["uuid"]
+        self.project_uuid = args.project_uuid if args.project_uuid else useruuid
+
         if kwargs.get("conformance_test"):
             return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)
         else:
-            self.pipeline = self.api.pipeline_instances().create(body={"name": shortname(tool.tool["id"]),
-                                                                       "components": {},
-                                                                       "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
+            self.pipeline = self.api.pipeline_instances().create(
+                body={
+                    "owner_uuid": self.project_uuid,
+                    "name": shortname(tool.tool["id"]),
+                    "components": {},
+                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
+
+            logger.info("Pipeline instance %s", self.pipeline["uuid"])

             jobiter = tool.job(job_order,
                                input_basedir,
@@ -381,42 +437,39 @@ class ArvCwlRunner(object):
                                **kwargs)

             try:
+                self.cond.acquire()
+                # Hold the lock for the duration of this code, except while
+                # in cond.wait(), at which point on_message can update job
+                # state and process output callbacks.
+
                 for runnable in jobiter:
                     if runnable:
-                        with self.lock:
-                            runnable.run(**kwargs)
+                        runnable.run(**kwargs)
                     else:
                         if self.jobs:
-                            try:
-                                self.cond.acquire()
-                                self.cond.wait(1)
-                            except RuntimeError:
-                                pass
-                            finally:
-                                self.cond.release()
+                            self.cond.wait(1)
                         else:
-                            logger.error("Workflow cannot make any more progress.")
+                            logger.error("Workflow is deadlocked: no runnable jobs and not waiting on any pending jobs.")
                             break

                 while self.jobs:
-                    try:
-                        self.cond.acquire()
-                        self.cond.wait(1)
-                    except RuntimeError:
-                        pass
-                    finally:
-                        self.cond.release()
+                    self.cond.wait(1)

                 events.close()

                 if self.final_output is None:
                     raise cwltool.workflow.WorkflowException("Workflow did not return a result.")

+                # Create final output collection.
             except:
-                if sys.exc_info()[0] is not KeyboardInterrupt:
-                    logger.exception("Caught unhandled exception, marking pipeline as failed")
+                if sys.exc_info()[0] is KeyboardInterrupt:
+                    logger.error("Interrupted, marking pipeline as failed")
+                else:
+                    logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
+            finally:
+                self.cond.release()

             return self.final_output
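
The rewritten scheduling loop above is the standard condition-variable pattern: the executor thread holds `self.cond` while it schedules, and `cond.wait(1)` releases the lock while blocked so the websocket thread's `on_message` can take it, update job state, and `notify()`. A self-contained sketch of that pattern (not Arvados-specific; all names are illustrative):

    import threading

    cond = threading.Condition()
    jobs = {"job-1": "Running"}  # hypothetical pending work

    def on_message():
        # Simulates the websocket callback thread reporting completion.
        with cond:
            jobs.clear()
            cond.notify()

    threading.Timer(0.1, on_message).start()

    with cond:
        while jobs:
            cond.wait(1)  # the lock is released while waiting
    print("all jobs done")
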
Error was: %s", sys.exc_info()[0], exc_info=(sys.exc_info()[1] if self.debug else False)) self.api.pipeline_instances().update(uuid=self.pipeline["uuid"], body={"state": "Failed"}).execute(num_retries=self.num_retries) + finally: + self.cond.release() return self.final_output @@ -426,11 +479,15 @@ def main(args, stdout, stderr, api_client=None): parser = cwltool.main.arg_parser() exgroup = parser.add_mutually_exclusive_group() exgroup.add_argument("--enable-reuse", action="store_true", - default=False, dest="enable_reuse", + default=True, dest="enable_reuse", help="") exgroup.add_argument("--disable-reuse", action="store_false", - default=False, dest="enable_reuse", + default=True, dest="enable_reuse", help="") + parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs") + parser.add_argument("--ignore-docker-for-reuse", action="store_true", + help="Ignore Docker image version when deciding whether to reuse past jobs.", + default=False) try: runner = ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()))
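
Taken together, these options mean jobs, uploaded Docker images, and output collections all land in one project, with job reuse now on by default. A typical invocation might look like this (assuming the package's usual `arvados-cwl-runner` entry point; the project UUID is a placeholder):

    arvados-cwl-runner --project-uuid=qr1hi-j7d0g-xxxxxxxxxxxxxxx \
        --ignore-docker-for-reuse workflow.cwl job-inputs.yml
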