X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/d458444b6de03c10dcddc80882887dab1d67b201..dfc93aac9c256d6ebb868aeb6c2107821e9fd041:/sdk/cwl/arvados_cwl/__init__.py diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py index 533d24a698..e3fd1fccd3 100644 --- a/sdk/cwl/arvados_cwl/__init__.py +++ b/sdk/cwl/arvados_cwl/__init__.py @@ -11,6 +11,7 @@ import cwltool.draft2tool import cwltool.workflow import cwltool.main from cwltool.process import shortname +from cwltool.errors import WorkflowException import threading import cwltool.docker import fnmatch @@ -25,10 +26,6 @@ from arvados.api import OrderedJsonModel logger = logging.getLogger('arvados.cwl-runner') logger.setLevel(logging.INFO) -crunchrunner_pdh = "83db29f08544e1c319572a6bd971088a+140" -crunchrunner_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/crunchrunner" -certs_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/ca-certificates.crt" - tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)") outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)") keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)") @@ -151,6 +148,8 @@ class ArvadosJob(object): (docker_req, docker_is_req) = get_feature(self, "DockerRequirement") if docker_req and kwargs.get("use_container") is not False: runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid) + else: + runtime_constraints["docker_image"] = "arvados/jobs" resources = self.builder.resources if resources is not None: @@ -165,7 +164,7 @@ class ArvadosJob(object): "repository": "arvados", "script_version": "master", "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6", - "script_parameters": {"tasks": [script_parameters], "crunchrunner": crunchrunner_pdh+"/crunchrunner"}, + "script_parameters": {"tasks": [script_parameters]}, "runtime_constraints": runtime_constraints }, find_or_create=kwargs.get("enable_reuse", True)).execute(num_retries=self.arvrunner.num_retries) @@ -212,7 +211,15 @@ class ArvadosJob(object): tmpdir = None outdir = None keepdir = None - for l in log.readlines(): + for l in log: + # Determine the tmpdir, outdir and keepdir paths from + # the job run. Unfortunately, we can't take the first + # values we find (which are expected to be near the + # top) and stop scanning because if the node fails and + # the job restarts on a different node these values + # will different runs, and we need to know about the + # final run that actually produced output. + g = tmpdirre.match(l) if g: tmpdir = g.group(1) @@ -222,14 +229,49 @@ class ArvadosJob(object): g = keepre.match(l) if g: keepdir = g.group(1) - if tmpdir and outdir and keepdir: - break + + colname = "Output %s of %s" % (record["output"][0:7], self.name) + + # check if collection already exists with same owner, name and content + collection_exists = self.arvrunner.api.collections().list( + filters=[["owner_uuid", "=", self.arvrunner.project_uuid], + ['portable_data_hash', '=', record["output"]], + ["name", "=", colname]] + ).execute(num_retries=self.arvrunner.num_retries) + + if not collection_exists["items"]: + # Create a collection located in the same project as the + # pipeline with the contents of the output. + # First, get output record. + collections = self.arvrunner.api.collections().list( + limit=1, + filters=[['portable_data_hash', '=', record["output"]]], + select=["manifest_text"] + ).execute(num_retries=self.arvrunner.num_retries) + + if not collections["items"]: + raise WorkflowException( + "Job output '%s' cannot be found on API server" % ( + record["output"])) + + # Create new collection in the parent project + # with the output contents. + self.arvrunner.api.collections().create(body={ + "owner_uuid": self.arvrunner.project_uuid, + "name": colname, + "portable_data_hash": record["output"], + "manifest_text": collections["items"][0]["manifest_text"] + }, ensure_unique_name=True).execute( + num_retries=self.arvrunner.num_retries) self.builder.outdir = outdir self.builder.pathmapper.keepdir = keepdir outputs = self.collect_outputs("keep:" + record["output"]) + except WorkflowException as e: + logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False)) + processStatus = "permanentFail" except Exception as e: - logger.exception("Got exception while collecting job outputs:") + logger.exception("Got unknown exception while collecting job outputs:") processStatus = "permanentFail" self.output_callback(outputs, processStatus) @@ -353,21 +395,7 @@ class ArvCwlRunner(object): def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs): events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message) - try: - self.api.collections().get(uuid=crunchrunner_pdh).execute() - except arvados.errors.ApiError as e: - import httplib2 - h = httplib2.Http(ca_certs=arvados.util.ca_certs_path()) - resp, content = h.request(crunchrunner_download, "GET") - resp2, content2 = h.request(certs_download, "GET") - with arvados.collection.Collection() as col: - with col.open("crunchrunner", "w") as f: - f.write(content) - with col.open("ca-certificates.crt", "w") as f: - f.write(content2) - - col.save_new("crunchrunner binary", ensure_unique_name=True) - + self.debug = args.debug self.fs_access = CollectionFsAccess(input_basedir) kwargs["fs_access"] = self.fs_access @@ -426,7 +454,7 @@ class ArvCwlRunner(object): if sys.exc_info()[0] is KeyboardInterrupt: logger.error("Interrupted, marking pipeline as failed") else: - logger.exception("Caught unhandled exception, marking pipeline as failed") + logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[0], exc_info=(sys.exc_info()[1] if self.debug else False)) self.api.pipeline_instances().update(uuid=self.pipeline["uuid"], body={"state": "Failed"}).execute(num_retries=self.num_retries) finally: