Merge branch 'master' into 8654-arv-jobs-cwl-runner
author Peter Amstutz <peter.amstutz@curoverse.com>
Mon, 21 Mar 2016 16:37:38 +0000 (12:37 -0400)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Mon, 21 Mar 2016 16:37:38 +0000 (12:37 -0400)
crunch_scripts/cwl-runner [new file with mode: 0755]
docker/jobs/Dockerfile
docker/jobs/apt.arvados.org.list
sdk/cwl/arvados_cwl/__init__.py

diff --git a/crunch_scripts/cwl-runner b/crunch_scripts/cwl-runner
new file mode 100755 (executable)
index 0000000..d042426
--- /dev/null
@@ -0,0 +1,96 @@
+#!/usr/bin/env python
+
+# Crunch script: run a CWL workflow inside an Arvados job.
+#
+# The job's script_parameters are used as the CWL job order (including the
+# tool reference). The workflow is executed with ArvCwlRunner, the output
+# collections it references are merged into one collection together with
+# cwl.output.json, and that collection is recorded as the task output.
+
+import arvados
+import arvados_cwl
+import arvados.collection
+import arvados.util
+from cwltool.process import shortname
+import cwltool.main
+import logging
+import os
+import json
+import argparse
+from arvados.api import OrderedJsonModel
+from cwltool.process import adjustFiles
+
+api = arvados.api("v1")
+
+try:
+    job_order_object = arvados.current_job()['script_parameters']
+
+    print job_order_object
+
+    # Turn bare Keep locators into file:// paths under the task's Keep mount.
+    # Only string values can be locators; non-string parameters (numbers,
+    # File objects, lists) would make keep_locator_pattern.match() raise.
+    for k,v in job_order_object.items():
+        if isinstance(v, basestring) and arvados.util.keep_locator_pattern.match(v):
+            job_order_object[k] = "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], v)
+
+    runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()))
+
+    t = cwltool.main.load_tool(job_order_object, False, True, runner.arvMakeTool, True)
+
+    # arvExecutor reads args.submit and args.debug in addition to the fields
+    # below, so they must exist on this hand-built namespace.
+    # NOTE(review): confirm arvExecutor needs no other command-line options.
+    np = argparse.Namespace()
+    np.project_uuid = arvados.current_job()["owner_uuid"]
+    np.enable_reuse = True
+    np.submit = False
+    np.debug = False
+    outputObj = runner.arvExecutor(t, job_order_object, "", np)
+
+    # Record which collection (and which paths inside it) each output file
+    # comes from. Paths are presumably of the form "keep:<pdh>/<path>"; the
+    # first 5 characters of the first component are dropped to get the id.
+    files = {}
+    def capture(path):
+        sp = path.split("/")
+        col = sp[0][5:]
+        if col not in files:
+            files[col] = set()
+        files[col].add("/".join(sp[1:]))
+        return path
+
+    adjustFiles(outputObj, capture)
+
+    final = arvados.collection.Collection()
+
+    # Copy the entire contents of every referenced collection (not only the
+    # captured paths) into the merged output collection.
+    for k in files:
+        with arvados.collection.Collection(k) as c:
+            for f in c:
+                final.copy(f, f, c, True)
+
+    # Output paths become relative to the root of the merged collection.
+    def makeRelative(path):
+        return "/".join(path.split("/")[1:])
+
+    adjustFiles(outputObj, makeRelative)
+
+    with final.open("cwl.output.json", "w") as f:
+        json.dump(outputObj, f, indent=4)
+
+    api.job_tasks().update(uuid=arvados.current_task()['uuid'],
+                                         body={
+                                             'output': final.save_new(create_collection_record=False),
+                                             'success': True,
+                                             'progress':1.0
+                                         }).execute()
+except Exception:
+    logging.exception("Unhandled exception")
+    api.job_tasks().update(uuid=arvados.current_task()['uuid'],
+                                         body={
+                                             'output': None,
+                                             'success': False,
+                                             'progress':1.0
+                                         }).execute()
index 0d7295873f723e637cf76413e01c16c6a2be5d95..bbe7844b7dd8f5accd7745c331bf802271a2682a 100644 (file)
@@ -1,14 +1,13 @@
-# Based on Debian Wheezy
-FROM arvados/debian:wheezy
+# Based on Debian Jessie; packages come from the jessie suite in apt.arvados.org.list
+FROM debian:jessie
 MAINTAINER Ward Vandewege <ward@curoverse.com>
 
 ENV DEBIAN_FRONTEND noninteractive
 
 ADD apt.arvados.org.list /etc/apt/sources.list.d/
 RUN apt-key adv --keyserver pool.sks-keyservers.net --recv 1078ECD7
-RUN apt-get update -q
 
-RUN apt-get install -qy git python-pip python-virtualenv python-arvados-python-client python-dev libcurl4-gnutls-dev
+RUN apt-get update -q && apt-get install -qy git python-pip python-virtualenv python-arvados-python-client python-dev libcurl4-gnutls-dev nodejs python-arvados-cwl-runner
 
 RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3
 
index 7eb8716071ac41adeaba2ece2428098002c73ed4..3ae6df42160b2c66025f832fa159e739aa975cb5 100644 (file)
@@ -1,2 +1,2 @@
 # apt.arvados.org
-deb http://apt.arvados.org/ wheezy main
+deb http://apt.arvados.org/ jessie main
index 5ed83abb920a604f6a95d5d0d9162cd69985d31a..c0adb33550f10e2a95a26becbca3c961a4379ac3 100644 (file)
@@ -248,7 +248,8 @@ class ArvadosJob(object):
 
 
 class ArvPathMapper(cwltool.pathmapper.PathMapper):
-    def __init__(self, arvrunner, referenced_files, basedir, **kwargs):
+    def __init__(self, arvrunner, referenced_files, basedir,
+                 collection_pattern, file_pattern, **kwargs):
         self._pathmap = arvrunner.get_uploaded()
         uploadfiles = []
 
@@ -256,10 +257,10 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
 
         for src in referenced_files:
             if isinstance(src, basestring) and pdh_path.match(src):
-                self._pathmap[src] = (src, "$(task.keep)/%s" % src[5:])
+                self._pathmap[src] = (src, collection_pattern % src[5:])
             if src not in self._pathmap:
                 ab = cwltool.pathmapper.abspath(src, basedir)
-                st = arvados.commands.run.statfile("", ab, fnPattern="$(task.keep)/%s/%s")
+                st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
                 if kwargs.get("conformance_test"):
                     self._pathmap[src] = (src, ab)
                 elif isinstance(st, arvados.commands.run.UploadFile):
@@ -274,7 +275,7 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
                                              arvrunner.api,
                                              dry_run=kwargs.get("dry_run"),
                                              num_retries=3,
-                                             fnPattern="$(task.keep)/%s/%s",
+                                             fnPattern=file_pattern,
                                              project=arvrunner.project_uuid)
 
         for src, ab, st in uploadfiles:
@@ -301,7 +302,10 @@ class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
         return ArvadosJob(self.arvrunner)
 
     def makePathMapper(self, reffiles, input_basedir, **kwargs):
-        return ArvPathMapper(self.arvrunner, reffiles, input_basedir, **kwargs)
+        return ArvPathMapper(self.arvrunner, reffiles, input_basedir,
+                             collection_pattern="$(task.keep)/%s",
+                             file_pattern="$(task.keep)/%s/%s",
+                             **kwargs)
 
 
 class ArvCwlRunner(object):
@@ -360,7 +364,45 @@ class ArvCwlRunner(object):
     def add_uploaded(self, src, pair):
         self.uploaded[src] = pair
 
+    def upload_docker(self, tool):
+        pass  # placeholder: not implemented yet
+
+    def submit(self, tool, job_order, input_basedir, args, **kwargs):
+        """Upload files referenced by tool/job_order, submit a cwl-runner job, print its uuid."""
+        files = set()
+        def visitFiles(path):
+            files.add(path)
+            return path  # adjustFiles stores the return value back into "path"
+        adjustFiles(process.scandeps("", tool.tool,
+                                     set(("run",)),  # 1-tuple; set(("run")) would be {'r','u','n'}
+                                     set(("$schemas", "path"))),
+                    visitFiles)
+        adjustFiles(job_order, visitFiles)
+
+        mapper = ArvPathMapper(self, files, "",
+                               "$(task.keep)/%s",
+                               "$(task.keep)/%s/%s",
+                               **kwargs)
+        # adjustFiles edits job_order in place (returns None); mapper() gives (resolved, target).
+        adjustFiles(job_order, lambda p: mapper.mapper(p)[1])
+
+        response = self.api.jobs().create(body={
+            "script": "cwl-runner",
+            "script_version": "8654-arv-jobs-cwl-runner",
+            "repository": "arvados",
+            "script_parameters": job_order,
+            "runtime_constraints": {
+                "docker_image": "arvados/jobs"
+            }
+        }, find_or_create=args.enable_reuse).execute(num_retries=self.num_retries)
+        print response["uuid"]
+        return None
+
     def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
+        if getattr(args, "submit", False):
+            self.submit(tool, job_order, input_basedir, args, **kwargs)
+            return
+
         events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
 
         self.debug = args.debug
@@ -458,6 +500,7 @@ def main(args, stdout, stderr, api_client=None):
                         default=True, dest="enable_reuse",
                         help="")
     parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs")
+    parser.add_argument("--submit", action="store_true", default=False, help="Submit job and print job uuid.")
 
     try:
         runner = ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()))