--- /dev/null
+#!/usr/bin/env python
+
+# Crunch script entry point: runs a CWL workflow inside an Arvados job,
+# gathers every file referenced by the workflow output into one new
+# collection, and records that collection (plus cwl.output.json) as the
+# task's output.  On any failure the task is marked finished-unsuccessful.
+
+import arvados
+import arvados_cwl
+import arvados.collection
+import arvados.util
+from cwltool.process import shortname
+import cwltool.main
+import logging
+import os
+import json
+import argparse
+from arvados.api import OrderedJsonModel
+from cwltool.process import adjustFiles
+
+api = arvados.api("v1")
+
+try:
+    # The CWL job order (workflow inputs) arrives as this job's
+    # script_parameters.
+    job_order_object = arvados.current_job()['script_parameters']
+
+    # Python 2 print statement; output goes to the job log for debugging.
+    print job_order_object
+
+    # Rewrite bare Keep locators into file:// paths under the task's Keep
+    # mount so cwltool can open them like ordinary local files.
+    # NOTE(review): assumes every value v is a string; a non-string value
+    # would make keep_locator_pattern.match() raise TypeError — confirm
+    # the script_parameters schema only carries strings here.
+    for k,v in job_order_object.items():
+        if arvados.util.keep_locator_pattern.match(v):
+            job_order_object[k] = "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], v)
+
+    runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()))
+
+    # Load the tool document named by the job order; arvMakeTool builds the
+    # Arvados-specific tool objects.
+    t = cwltool.main.load_tool(job_order_object, False, True, runner.arvMakeTool, True)
+
+    # Minimal argparse.Namespace standing in for the command-line options
+    # arvExecutor normally receives.
+    np = argparse.Namespace()
+    np.project_uuid = arvados.current_job()["owner_uuid"]
+    np.enable_reuse = True
+    outputObj = runner.arvExecutor(t, job_order_object, "", np)
+
+    # Map of collection locator -> set of file paths inside that collection,
+    # built from every file reference found in the workflow output object.
+    files = {}
+    def capture(path):
+        # Paths are expected to look like "keep:<locator>/<subpath...>";
+        # sp[0][5:] strips the 5-character "keep:" prefix from the first
+        # segment to recover the bare collection locator.
+        sp = path.split("/")
+        col = sp[0][5:]
+        if col not in files:
+            files[col] = set()
+        files[col].add("/".join(sp[1:]))
+        return path
+
+    adjustFiles(outputObj, capture)
+
+    # Merge all referenced files into a single new output collection.
+    final = arvados.collection.Collection()
+
+    for k,v in files.iteritems():
+        with arvados.collection.Collection(k) as c:
+            for f in c:
+                final.copy(f, f, c, True)
+
+    # Rewrite output file references to be relative to the output
+    # collection root (drop the leading "keep:<locator>" segment).
+    def makeRelative(path):
+        return "/".join(path.split("/")[1:])
+
+    adjustFiles(outputObj, makeRelative)
+
+    # Store the CWL output object alongside the output files themselves.
+    with final.open("cwl.output.json", "w") as f:
+        json.dump(outputObj, f, indent=4)
+
+    # save_new(create_collection_record=False) saves the blocks and returns
+    # the manifest text, which becomes the task output.
+    api.job_tasks().update(uuid=arvados.current_task()['uuid'],
+                                         body={
+                                             'output': final.save_new(create_collection_record=False),
+                                             'success': True,
+                                             'progress':1.0
+                                         }).execute()
+except Exception as e:
+    # Any unhandled failure is logged and the task is closed out as
+    # unsuccessful so the job does not hang waiting on it.
+    logging.exception("Unhandled exception")
+    api.job_tasks().update(uuid=arvados.current_task()['uuid'],
+                                         body={
+                                             'output': None,
+                                             'success': False,
+                                             'progress':1.0
+                                         }).execute()
class ArvPathMapper(cwltool.pathmapper.PathMapper):
- def __init__(self, arvrunner, referenced_files, basedir, **kwargs):
+ def __init__(self, arvrunner, referenced_files, basedir,
+ collection_pattern, file_pattern, **kwargs):
self._pathmap = arvrunner.get_uploaded()
uploadfiles = []
for src in referenced_files:
if isinstance(src, basestring) and pdh_path.match(src):
- self._pathmap[src] = (src, "$(task.keep)/%s" % src[5:])
+ self._pathmap[src] = (src, collection_pattern % src[5:])
if src not in self._pathmap:
ab = cwltool.pathmapper.abspath(src, basedir)
- st = arvados.commands.run.statfile("", ab, fnPattern="$(task.keep)/%s/%s")
+ st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
if kwargs.get("conformance_test"):
self._pathmap[src] = (src, ab)
elif isinstance(st, arvados.commands.run.UploadFile):
arvrunner.api,
dry_run=kwargs.get("dry_run"),
num_retries=3,
- fnPattern="$(task.keep)/%s/%s",
+ fnPattern=file_pattern,
project=arvrunner.project_uuid)
for src, ab, st in uploadfiles:
return ArvadosJob(self.arvrunner)
    def makePathMapper(self, reffiles, input_basedir, **kwargs):
-        return ArvPathMapper(self.arvrunner, reffiles, input_basedir, **kwargs)
+        # Inside a crunch job, inputs are readable under the $(task.keep)
+        # mount, so map collection references ("%s" = locator) and uploaded
+        # files ("%s/%s" = locator/path) to $(task.keep)-based paths.
+        return ArvPathMapper(self.arvrunner, reffiles, input_basedir,
+                             "$(task.keep)/%s",
+                             "$(task.keep)/%s/%s",
+                             **kwargs)
class ArvCwlRunner(object):
def add_uploaded(self, src, pair):
self.uploaded[src] = pair
+    def upload_docker(self, tool):
+        # Placeholder: uploading the tool's docker image for submitted jobs
+        # is not implemented yet; submit() pins the stock "arvados/jobs"
+        # image in runtime_constraints instead.
+        pass
+
+    def submit(self, tool, job_order, input_basedir, args, **kwargs):
+        """Create (but do not wait on) an Arvados job that re-runs this
+        workflow via the cwl-runner crunch script, and print its uuid."""
+        files = set()
+
+        # Collector handed to adjustFiles: adjustFiles invokes it with the
+        # file path as the only argument, so it must not take 'self'.
+        def visitFiles(path):
+            files.add(path)
+
+        # Gather every file referenced by the tool document and job order.
+        # "run" must be wrapped in a one-element tuple: set(("run")) would
+        # iterate the string and produce {'r', 'u', 'n'}.
+        adjustFiles(process.scandeps("", tool.tool,
+                                     set(("run",)),
+                                     set(("$schemas", "path"))),
+                    visitFiles)
+        adjustFiles(job_order, visitFiles)
+
+        # Upload the referenced files and map them to $(task.keep) paths
+        # that will resolve inside the submitted job.
+        mapper = ArvPathMapper(self, files, "",
+                               "$(task.keep)/%s",
+                               "$(task.keep)/%s/%s",
+                               **kwargs)
+
+        # adjustFiles mutates job_order in place and returns None, so its
+        # result must not be assigned back over job_order (doing so would
+        # submit script_parameters as null).
+        # NOTE(review): confirm mapper.mapper(p) yields the keep-side path
+        # string here and not a (src, target) pair.
+        adjustFiles(job_order, lambda p: mapper.mapper(p))
+
+        response = self.api.jobs().create(body={
+            "script": "cwl-runner",
+            "script_version": "8654-arv-jobs-cwl-runner",
+            "repository": "arvados",
+            "script_parameters": job_order,
+            "runtime_constraints": {
+                "docker_image": "arvados/jobs"
+            }
+        }, find_or_create=args.enable_reuse).execute(num_retries=self.num_retries)
+        print response["uuid"]
+        return None
+
+
+
+
def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
+ if args.submit:
+ self.submit(tool, job_order, input_basedir, args, **kwargs)
+ return
+
events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
self.debug = args.debug
default=True, dest="enable_reuse",
help="")
parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs")
+    # --submit is consumed only as a boolean (arvExecutor tests
+    # "if args.submit:"), so it should be a flag, not a string option.
+    parser.add_argument("--submit", action="store_true", help="Submit job and print job uuid.")
try:
runner = ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()))