#!/usr/bin/env python

# Crunch script integration for running arvados-cwl-runner (importing
# arvados_cwl module) inside a crunch job.
#
# This gets the job record, transforms the script parameters into a valid CWL
# input object, then executes the CWL runner to run the underlying workflow or
# tool.  When the workflow completes, record the output object in an output
# collection for this runner job.

import arvados
import arvados_cwl
import arvados.collection
import arvados.util
from cwltool.process import shortname
import cwltool.main
import logging
import os
import json
import argparse
import re

from arvados.api import OrderedJsonModel
from cwltool.process import adjustFileObjs, adjustDirObjs
from cwltool.load_tool import load_tool

# Print package versions
# NOTE(review): this goes to the root logger at INFO level; confirm that
# logging is configured elsewhere so the message is actually emitted.
logging.info(cwltool.main.versionstring())

api = arvados.api("v1")

try:
    # The job's script parameters double as the CWL input object.
    job_order_object = arvados.current_job()['script_parameters']

    # 32 hex digits, "+", a decimal number, and an optional "/path" suffix.
    pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+(/.+)?$')

    def keeppath(v):
        """Return v prefixed with "keep:" when it matches pdh_path, else v."""
        return ("keep:%s" % v) if pdh_path.match(v) else v

    def keeppathObj(v):
        """Rewrite the "location" entry of v in place through keeppath()."""
        v["location"] = keeppath(v["location"])

    # Point the tool reference at the file under the task's keep mount.
    job_order_object["cwl:tool"] = "file://%s/%s" % (
        os.environ['TASK_KEEPMOUNT'],
        job_order_object["cwl:tool"],
    )

    # Top-level string parameters that match the SDK's keep locator pattern
    # are promoted to CWL File objects.  Iterate over a snapshot because the
    # mapping is assigned to inside the loop.
    for param_name, param_value in list(job_order_object.items()):
        if not isinstance(param_value, basestring):
            continue
        if arvados.util.keep_locator_pattern.match(param_value):
            job_order_object[param_name] = {
                "class": "File",
                "location": "keep:%s" % param_value
            }

    # Apply the "keep:" prefix to File and Directory objects in the input.
    adjustFileObjs(job_order_object, keeppathObj)
    adjustDirObjs(job_order_object, keeppathObj)

    runner = arvados_cwl.ArvCwlRunner(
        api_client=arvados.api('v1', model=OrderedJsonModel()))

    t = load_tool(job_order_object, runner.arvMakeTool)

    # Options handed to the executor as keyword arguments via vars(args).
    args = argparse.Namespace(
        project_uuid=arvados.current_job()["owner_uuid"],
        enable_reuse=True,
        submit=False,
        debug=True,
        quiet=False,
        ignore_docker_for_reuse=False,
        basedir=os.getcwd(),
        cwl_runner_job={"uuid": arvados.current_job()["uuid"],
                        "state": arvados.current_job()["state"]},
    )
    outputObj = runner.arvExecutor(t, job_order_object, **vars(args))

    # Maps the leading component of each output location (minus its first
    # five characters) to the set of paths referenced beneath it.
    files = {}

    def capture(fileobj):
        """Record the location of fileobj in files."""
        path = fileobj["location"]
        head, _, remainder = path.partition("/")
        # Drops the first five characters of the leading component --
        # presumably the "keep:" prefix; confirm all outputs carry it.
        col = head[5:]
        files.setdefault(col, set()).add(remainder)
        # Writes the same value back, exactly as the original script did.
        fileobj["location"] = path

    adjustFileObjs(outputObj, capture)

    final = arvados.collection.Collection()

    # Copy every top-level entry of each referenced collection into the
    # output collection under the same name.  The captured path sets are not
    # consulted here; only the collection identifiers are.
    for source_id in files:
        with arvados.collection.Collection(source_id) as source:
            for entry in source:
                final.copy(entry, entry, source, True)

    def makeRelative(fileobj):
        """Strip everything up to and including the first "/" of location."""
        fileobj["location"] = fileobj["location"].partition("/")[2]

    adjustFileObjs(outputObj, makeRelative)

    # Store the CWL output object alongside the copied files.
    with final.open("cwl.output.json", "w") as f:
        json.dump(outputObj, f, indent=4)

    # Report success on the current task, with the saved collection as output.
    task_uuid = arvados.current_task()['uuid']
    task_output = final.save_new(create_collection_record=False)
    api.job_tasks().update(
        uuid=task_uuid,
        body={
            'output': task_output,
            'success': True,
            'progress': 1.0
        }).execute()
except Exception:
    # Any failure above is logged with its traceback and the current task is
    # marked as unsuccessful with no output.
    logging.exception("Unhandled exception")
    api.job_tasks().update(
        uuid=arvados.current_task()['uuid'],
        body={
            'output': None,
            'success': False,
            'progress': 1.0
        }).execute()