X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/aa998f9f62321b44780923d60692beb6805b0ff5..e6d8eb6d2415f15439ab6b7ab715ca962e7e7763:/sdk/cwl/arvados_cwl/__init__.py diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py index 6cfdd0b2ab..ca807951bd 100644 --- a/sdk/cwl/arvados_cwl/__init__.py +++ b/sdk/cwl/arvados_cwl/__init__.py @@ -20,14 +20,20 @@ import os import sys from cwltool.process import get_feature +from arvados.api import OrderedJsonModel logger = logging.getLogger('arvados.cwl-runner') logger.setLevel(logging.INFO) -crunchrunner_pdh = "721abe848fd8e6e6d1c99b920e6b7a2c+140" +crunchrunner_pdh = "83db29f08544e1c319572a6bd971088a+140" crunchrunner_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/crunchrunner" certs_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/ca-certificates.crt" +tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)") +outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)") +keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)") + + def arv_docker_get_image(api_client, dockerRequirement, pull_image): if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement: dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"] @@ -146,6 +152,19 @@ class ArvadosJob(object): if docker_req and kwargs.get("use_container") is not False: runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image) + resources = self.builder.resources + if resources is not None: + if "coresMin" in resources.keys(): + try: + runtime_constraints["min_cores_per_node"] = int(resources["coresMin"]) + except: + runtime_constraints["min_cores_per_node"] = None + if "ramMin" in resources.keys(): + try: + runtime_constraints["min_ram_mb_per_node"] = int(resources["ramMin"]) + except: + runtime_constraints["min_ram_mb_per_node"] = None + try: response = self.arvrunner.api.jobs().create(body={ "script": "crunchrunner", @@ -193,8 +212,27 @@ class ArvadosJob(object): try: outputs = {} if record["output"]: - self.builder.outdir = "keep:" + record["output"] - outputs = self.collect_outputs(self.builder.outdir) + logc = arvados.collection.Collection(record["log"]) + log = logc.open(logc.keys()[0]) + tmpdir = None + outdir = None + keepdir = None + for l in log.readlines(): + g = tmpdirre.match(l) + if g: + tmpdir = g.group(1) + g = outdirre.match(l) + if g: + outdir = g.group(1) + g = keepre.match(l) + if g: + keepdir = g.group(1) + if tmpdir and outdir and keepdir: + break + + self.builder.outdir = outdir + self.builder.pathmapper.keepdir = keepdir + outputs = self.collect_outputs("keep:" + record["output"]) except Exception as e: logger.exception("Got exception while collecting job outputs:") processStatus = "permanentFail" @@ -237,6 +275,15 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper): arvrunner.add_uploaded(src, (ab, st.fn)) self._pathmap[src] = (ab, st.fn) + self.keepdir = None + + def reversemap(self, target): + if target.startswith("keep:"): + return target + elif self.keepdir and target.startswith(self.keepdir): + return "keep:" + target[len(self.keepdir)+1:] + else: + return super(ArvPathMapper, self).reversemap(target) class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool): @@ -343,6 +390,7 @@ class ArvCwlRunner(object): jobiter = tool.job(job_order, input_basedir, self.output_callback, + docker_outdir="$(task.outdir)", **kwargs) try: @@ -398,7 +446,7 @@ def main(args, stdout, stderr, api_client=None): help="") try: - runner = ArvCwlRunner(api_client=arvados.api('v1')) + runner = ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel())) except Exception as e: logger.error(e) return 1