Merge branch '8840-lock-job-record' closes #8840
author Peter Amstutz <peter.amstutz@curoverse.com>
Thu, 31 Mar 2016 15:54:40 +0000 (11:54 -0400)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Thu, 31 Mar 2016 15:54:40 +0000 (11:54 -0400)
25 files changed:
apps/workbench/test/test_helper.rb
crunch_scripts/cwl-runner [new file with mode: 0755]
doc/api/schema/Job.html.textile.liquid
docker/jobs/Dockerfile
docker/jobs/apt.arvados.org.list
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/setup.py
sdk/cwl/test_with_arvbox.sh
sdk/cwl/tests/input/blorp.txt [new file with mode: 0644]
sdk/cwl/tests/submit_test_job.json [new file with mode: 0644]
sdk/cwl/tests/test_submit.py [new file with mode: 0644]
sdk/cwl/tests/tool/blub.txt [new file with mode: 0644]
sdk/cwl/tests/tool/submit_tool.cwl [new file with mode: 0644]
sdk/cwl/tests/wf/submit_wf.cwl [new file with mode: 0644]
sdk/python/arvados/commands/keepdocker.py
sdk/python/arvados/commands/run.py
sdk/python/setup.py
services/api/app/controllers/arvados/v1/jobs_controller.rb
services/api/app/models/job.rb
services/api/db/migrate/20160324144017_add_components_to_job.rb [new file with mode: 0644]
services/api/db/structure.sql
services/api/test/fixtures/jobs.yml
services/api/test/functional/arvados/v1/groups_controller_test.rb
services/api/test/functional/arvados/v1/jobs_controller_test.rb
services/api/test/test_helper.rb

index 41592af993261b6affd55fa6cba0f05780d475ec..78ef2d21f1a15a106c000710c440984b5a210b16 100644 (file)
@@ -233,12 +233,12 @@ end
 
 class ActionController::TestCase
   setup do
-    @counter = 0
+    @test_counter = 0
   end
 
   def check_counter action
-    @counter += 1
-    if @counter == 2
+    @test_counter += 1
+    if @test_counter == 2
       assert_equal 1, 2, "Multiple actions in controller test"
     end
   end
diff --git a/crunch_scripts/cwl-runner b/crunch_scripts/cwl-runner
new file mode 100755 (executable)
index 0000000..d628f1c
--- /dev/null
@@ -0,0 +1,91 @@
+#!/usr/bin/env python
+
+# Crunch script integration for running arvados-cwl-runner (importing
+# arvados_cwl module) inside a crunch job.
+#
+# This gets the job record, transforms the script parameters into a valid CWL
+# input object, then executes the CWL runner to run the underlying workflow or
+# tool.  When the workflow completes, record the output object in an output
+# collection for this runner job.
+
+import arvados
+import arvados_cwl
+import arvados.collection
+import arvados.util
+from cwltool.process import shortname
+import cwltool.main
+import logging
+import os
+import json
+import argparse
+from arvados.api import OrderedJsonModel
+from cwltool.process import adjustFiles
+
+# Print package versions
+logging.info(cwltool.main.versionstring())
+
+api = arvados.api("v1")
+
+try:
+    job_order_object = arvados.current_job()['script_parameters']
+
+    def keeppath(v):
+        if arvados.util.keep_locator_pattern.match(v):
+            return "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], v)
+
+    job_order_object["cwl:tool"] = keeppath(job_order_object["cwl:tool"])
+
+    adjustFiles(job_order_object, keeppath)
+
+    runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()))
+
+    t = cwltool.main.load_tool(job_order_object, False, True, runner.arvMakeTool, True)
+
+    args = argparse.Namespace()
+    args.project_uuid = arvados.current_job()["owner_uuid"]
+    args.enable_reuse = True
+    args.submit = False
+    args.debug = True
+    args.quiet = False
+    outputObj = runner.arvExecutor(t, job_order_object, "", args, cwl_runner_job={"uuid": arvados.current_job()["uuid"], "state": arvados.current_job()["state"]})
+
+    files = {}
+    def capture(path):
+        sp = path.split("/")
+        col = sp[0][5:]
+        if col not in files:
+            files[col] = set()
+        files[col].add("/".join(sp[1:]))
+        return path
+
+    adjustFiles(outputObj, capture)
+
+    final = arvados.collection.Collection()
+
+    for k,v in files.iteritems():
+        with arvados.collection.Collection(k) as c:
+            for f in c:
+                final.copy(f, f, c, True)
+
+    def makeRelative(path):
+        return "/".join(path.split("/")[1:])
+
+    adjustFiles(outputObj, makeRelative)
+
+    with final.open("cwl.output.json", "w") as f:
+        json.dump(outputObj, f, indent=4)
+
+    api.job_tasks().update(uuid=arvados.current_task()['uuid'],
+                                         body={
+                                             'output': final.save_new(create_collection_record=False),
+                                             'success': True,
+                                             'progress':1.0
+                                         }).execute()
+except Exception as e:
+    logging.exception("Unhandled exception")
+    api.job_tasks().update(uuid=arvados.current_task()['uuid'],
+                                         body={
+                                             'output': None,
+                                             'success': False,
+                                             'progress':1.0
+                                         }).execute()
index 5bc7611d0deedede7db25a38af830ffb877dc693..58b6a51cb444c2bc02cc9bf4472c3a474ad55b1e 100644 (file)
@@ -47,6 +47,7 @@ See "Specifying Git versions":#script_version below for more detail about accept
 |arvados_sdk_version|string|Git commit hash that specifies the SDK version to use from the Arvados repository|This is set by searching the Arvados repository for a match for the arvados_sdk_version runtime constraint.|
 |docker_image_locator|string|Portable data hash of the collection that contains the Docker image to use|This is set by searching readable collections for a match for the docker_image runtime constraint.|
 |runtime_constraints|hash|Constraints that must be satisfied by the job/task scheduler in order to run the job.|See below.|
+|components|hash|Name and uuid pairs representing the child work units of this job. The uuids can be of different object types.|Example components hash: @{"name1": "zzzzz-8i9sb-xyz...", "name2": "zzzzz-d1hrv-xyz...",}@|
 
 h3(#script_version). Specifying Git versions
 
index 30c2416e99511b2f1919591671296d7d13a659a3..d80c3a882defe43676476df144401eee64d97728 100644 (file)
@@ -1,5 +1,5 @@
-# Based on Debian Wheezy
-FROM arvados/debian:wheezy
+# Based on Debian Jessie
+FROM debian:jessie
 MAINTAINER Ward Vandewege <ward@curoverse.com>
 
 ENV DEBIAN_FRONTEND noninteractive
@@ -11,7 +11,7 @@ RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3
 ARG COMMIT=latest
 RUN echo $COMMIT && apt-get update -q
 
-RUN apt-get install -qy git python-pip python-virtualenv python-arvados-python-client python-dev libcurl4-gnutls-dev
+RUN apt-get install -qy git python-pip python-virtualenv python-arvados-python-client python-dev libcurl4-gnutls-dev nodejs python-arvados-cwl-runner
 
 # Install dependencies and set up system.
 RUN /usr/sbin/adduser --disabled-password \
index 7eb8716071ac41adeaba2ece2428098002c73ed4..3ae6df42160b2c66025f832fa159e739aa975cb5 100644 (file)
@@ -1,2 +1,2 @@
 # apt.arvados.org
-deb http://apt.arvados.org/ wheezy main
+deb http://apt.arvados.org/ jessie main
index e3fd1fccd372d75abf06445f32fe2a13f8a8e66c..511f173eb0008995a60e55652a9c3af6be40bb06 100644 (file)
@@ -1,5 +1,7 @@
 #!/usr/bin/env python
 
+# Implement cwl-runner interface for submitting and running jobs on Arvados.
+
 import argparse
 import arvados
 import arvados.events
@@ -19,8 +21,11 @@ import logging
 import re
 import os
 import sys
+import functools
+import json
+import pkg_resources  # part of setuptools
 
-from cwltool.process import get_feature
+from cwltool.process import get_feature, adjustFiles, scandeps
 from arvados.api import OrderedJsonModel
 
 logger = logging.getLogger('arvados.cwl-runner')
@@ -32,6 +37,8 @@ keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.ke
 
 
 def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
+    """Check if a Docker image is available in Keep, if not, upload it using arv-keepdocker."""
+
     if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
         dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
 
@@ -49,12 +56,14 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
         if image_tag:
             args.append(image_tag)
         logger.info("Uploading Docker image %s", ":".join(args[1:]))
-        arvados.commands.keepdocker.main(args)
+        arvados.commands.keepdocker.main(args, stdout=sys.stderr)
 
     return dockerRequirement["dockerImageId"]
 
 
 class CollectionFsAccess(cwltool.process.StdFsAccess):
+    """Implement the cwltool FsAccess interface for Arvados Collections."""
+
     def __init__(self, basedir):
         self.collections = {}
         self.basedir = basedir
@@ -111,6 +120,8 @@ class CollectionFsAccess(cwltool.process.StdFsAccess):
             return os.path.exists(self._abs(fn))
 
 class ArvadosJob(object):
+    """Submit and manage a Crunch job for executing a CWL CommandLineTool."""
+
     def __init__(self, runner):
         self.arvrunner = runner
         self.running = False
@@ -279,23 +290,128 @@ class ArvadosJob(object):
             del self.arvrunner.jobs[record["uuid"]]
 
 
+class RunnerJob(object):
+    """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
+
+    def __init__(self, runner, tool, job_order, enable_reuse):
+        self.arvrunner = runner
+        self.tool = tool
+        self.job_order = job_order
+        self.running = False
+        self.enable_reuse = enable_reuse
+
+    def update_pipeline_component(self, record):
+        pass
+
+    def upload_docker(self, tool):
+        if isinstance(tool, cwltool.draft2tool.CommandLineTool):
+            (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
+            if docker_req:
+                arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
+        elif isinstance(tool, cwltool.workflow.Workflow):
+            for s in tool.steps:
+                self.upload_docker(s.embedded_tool)
+
+    def run(self, dry_run=False, pull_image=True, **kwargs):
+        self.upload_docker(self.tool)
+
+        workflowfiles = set()
+        jobfiles = set()
+        workflowfiles.add(self.tool.tool["id"])
+
+        self.name = os.path.basename(self.tool.tool["id"])
+
+        def visitFiles(files, path):
+            files.add(path)
+            return path
+
+        document_loader, _, _ = cwltool.process.get_schema()
+        def loadref(b, u):
+            return document_loader.resolve_ref(u, base_url=b)[0]
+
+        sc = scandeps("", self.tool.tool,
+                      set(("$import", "run")),
+                      set(("$include", "$schemas", "path")),
+                      loadref)
+        adjustFiles(sc, functools.partial(visitFiles, workflowfiles))
+        adjustFiles(self.job_order, functools.partial(visitFiles, jobfiles))
+
+        workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
+                                       "%s",
+                                       "%s/%s",
+                                       name=self.name,
+                                       **kwargs)
+
+        jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
+                                  "%s",
+                                  "%s/%s",
+                                  name=os.path.basename(self.job_order.get("id", "#")),
+                                  **kwargs)
+
+        adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])
+
+        if "id" in self.job_order:
+            del self.job_order["id"]
+
+        self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]
+
+        response = self.arvrunner.api.jobs().create(body={
+            "script": "cwl-runner",
+            "script_version": "master",
+            "repository": "arvados",
+            "script_parameters": self.job_order,
+            "runtime_constraints": {
+                "docker_image": "arvados/jobs"
+            }
+        }, find_or_create=self.enable_reuse).execute(num_retries=self.arvrunner.num_retries)
+
+        self.arvrunner.jobs[response["uuid"]] = self
+
+        logger.info("Submitted job %s", response["uuid"])
+
+        if response["state"] in ("Complete", "Failed", "Cancelled"):
+            self.done(response)
+
+    def done(self, record):
+        if record["state"] == "Complete":
+            processStatus = "success"
+        else:
+            processStatus = "permanentFail"
+
+        outputs = None
+        try:
+            try:
+                outc = arvados.collection.Collection(record["output"])
+                with outc.open("cwl.output.json") as f:
+                    outputs = json.load(f)
+            except Exception as e:
+                logger.error("While getting final output object: %s", e)
+            self.arvrunner.output_callback(outputs, processStatus)
+        finally:
+            del self.arvrunner.jobs[record["uuid"]]
+
 class ArvPathMapper(cwltool.pathmapper.PathMapper):
-    def __init__(self, arvrunner, referenced_files, basedir, **kwargs):
+    """Convert container-local paths to and from Keep collection ids."""
+
+    def __init__(self, arvrunner, referenced_files, basedir,
+                 collection_pattern, file_pattern, name=None, **kwargs):
         self._pathmap = arvrunner.get_uploaded()
-        uploadfiles = []
+        uploadfiles = set()
 
         pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')
 
         for src in referenced_files:
             if isinstance(src, basestring) and pdh_path.match(src):
-                self._pathmap[src] = (src, "$(task.keep)/%s" % src[5:])
+                self._pathmap[src] = (src, collection_pattern % src[5:])
+            if "#" in src:
+                src = src[:src.index("#")]
             if src not in self._pathmap:
                 ab = cwltool.pathmapper.abspath(src, basedir)
-                st = arvados.commands.run.statfile("", ab, fnPattern="$(task.keep)/%s/%s")
+                st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
                 if kwargs.get("conformance_test"):
                     self._pathmap[src] = (src, ab)
                 elif isinstance(st, arvados.commands.run.UploadFile):
-                    uploadfiles.append((src, ab, st))
+                    uploadfiles.add((src, ab, st))
                 elif isinstance(st, arvados.commands.run.ArvFile):
                     self._pathmap[src] = (ab, st.fn)
                 else:
@@ -306,7 +422,8 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
                                              arvrunner.api,
                                              dry_run=kwargs.get("dry_run"),
                                              num_retries=3,
-                                             fnPattern="$(task.keep)/%s/%s",
+                                             fnPattern=file_pattern,
+                                             name=name,
                                              project=arvrunner.project_uuid)
 
         for src, ab, st in uploadfiles:
@@ -325,6 +442,8 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
 
 
 class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
+    """Wrap cwltool CommandLineTool to override selected methods."""
+
     def __init__(self, arvrunner, toolpath_object, **kwargs):
         super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
         self.arvrunner = arvrunner
@@ -333,10 +452,16 @@ class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
         return ArvadosJob(self.arvrunner)
 
     def makePathMapper(self, reffiles, input_basedir, **kwargs):
-        return ArvPathMapper(self.arvrunner, reffiles, input_basedir, **kwargs)
+        return ArvPathMapper(self.arvrunner, reffiles, input_basedir,
+                             "$(task.keep)/%s",
+                             "$(task.keep)/%s/%s",
+                             **kwargs)
 
 
 class ArvCwlRunner(object):
+    """Execute a CWL tool or workflow, submit crunch jobs, wait for them to
+    complete, and report output."""
+
     def __init__(self, api_client):
         self.api = api_client
         self.jobs = {}
@@ -355,13 +480,15 @@ class ArvCwlRunner(object):
     def output_callback(self, out, processStatus):
         if processStatus == "success":
             logger.info("Overall job status is %s", processStatus)
-            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
-                                                 body={"state": "Complete"}).execute(num_retries=self.num_retries)
+            if self.pipeline:
+                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
 
         else:
             logger.warn("Overall job status is %s", processStatus)
-            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
-                                                 body={"state": "Failed"}).execute(num_retries=self.num_retries)
+            if self.pipeline:
+                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
         self.final_output = out
 
 
@@ -393,9 +520,24 @@ class ArvCwlRunner(object):
         self.uploaded[src] = pair
 
     def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
+        self.debug = args.debug
+
+        if args.quiet:
+            logger.setLevel(logging.WARN)
+            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
+
+        useruuid = self.api.users().current().execute()["uuid"]
+        self.project_uuid = args.project_uuid if args.project_uuid else useruuid
+        self.pipeline = None
+
+        if args.submit:
+            runnerjob = RunnerJob(self, tool, job_order, args.enable_reuse)
+            if not args.wait:
+                runnerjob.run()
+                return
+
         events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
 
-        self.debug = args.debug
         self.fs_access = CollectionFsAccess(input_basedir)
 
         kwargs["fs_access"] = self.fs_access
@@ -404,26 +546,30 @@ class ArvCwlRunner(object):
         kwargs["outdir"] = "$(task.outdir)"
         kwargs["tmpdir"] = "$(task.tmpdir)"
 
-        useruuid = self.api.users().current().execute()["uuid"]
-        self.project_uuid = args.project_uuid if args.project_uuid else useruuid
-
         if kwargs.get("conformance_test"):
             return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)
         else:
-            self.pipeline = self.api.pipeline_instances().create(
-                body={
-                    "owner_uuid": self.project_uuid,
-                    "name": shortname(tool.tool["id"]),
-                    "components": {},
-                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
+            if args.submit:
+                jobiter = iter((runnerjob,))
+            else:
+                components = {}
+                if "cwl_runner_job" in kwargs:
+                    components[os.path.basename(tool.tool["id"])] = {"job": kwargs["cwl_runner_job"]}
+
+                self.pipeline = self.api.pipeline_instances().create(
+                    body={
+                        "owner_uuid": self.project_uuid,
+                        "name": shortname(tool.tool["id"]),
+                        "components": components,
+                        "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
 
-            logger.info("Pipeline instance %s", self.pipeline["uuid"])
+                logger.info("Pipeline instance %s", self.pipeline["uuid"])
 
-            jobiter = tool.job(job_order,
-                               input_basedir,
-                               self.output_callback,
-                               docker_outdir="$(task.outdir)",
-                               **kwargs)
+                jobiter = tool.job(job_order,
+                                   input_basedir,
+                                   self.output_callback,
+                                   docker_outdir="$(task.outdir)",
+                                   **kwargs)
 
             try:
                 self.cond.acquire()
@@ -455,17 +601,29 @@ class ArvCwlRunner(object):
                     logger.error("Interrupted, marking pipeline as failed")
                 else:
                     logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[0], exc_info=(sys.exc_info()[1] if self.debug else False))
-                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
-                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
+                if self.pipeline:
+                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                         body={"state": "Failed"}).execute(num_retries=self.num_retries)
             finally:
                 self.cond.release()
 
             return self.final_output
 
+def versionstring():
+    """Print version string of key packages for provenance and debugging."""
+
+    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
+    arvpkg = pkg_resources.require("arvados-python-client")
+    cwlpkg = pkg_resources.require("cwltool")
+
+    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
+                                    "arvados-python-client", arvpkg[0].version,
+                                    "cwltool", cwlpkg[0].version)
 
 def main(args, stdout, stderr, api_client=None):
     args.insert(0, "--leave-outputs")
     parser = cwltool.main.arg_parser()
+
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
@@ -473,12 +631,33 @@ def main(args, stdout, stderr, api_client=None):
     exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="")
+
     parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs")
 
+    exgroup = parser.add_mutually_exclusive_group()
+    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
+                        default=True, dest="submit")
+    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
+                        default=True, dest="submit")
+
+    exgroup = parser.add_mutually_exclusive_group()
+    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
+                        default=True, dest="wait")
+    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
+                        default=True, dest="wait")
+
     try:
-        runner = ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()))
+        if api_client is None:
+            api_client=arvados.api('v1', model=OrderedJsonModel())
+        runner = ArvCwlRunner(api_client)
     except Exception as e:
         logger.error(e)
         return 1
 
-    return cwltool.main.main(args, executor=runner.arvExecutor, makeTool=runner.arvMakeTool, parser=parser)
+    return cwltool.main.main(args,
+                             stdout=stdout,
+                             stderr=stderr,
+                             executor=runner.arvExecutor,
+                             makeTool=runner.arvMakeTool,
+                             parser=parser,
+                             versionfunc=versionstring)
index 3fc7433adfcd054c1e28bcd11acb49807506732e..c061309f70ddb548ca1f543d4ffee54ba3447504 100644 (file)
@@ -30,8 +30,8 @@ setup(name='arvados-cwl-runner',
           'bin/arvados-cwl-runner'
       ],
       install_requires=[
-          'cwltool>=1.0.20160311170456',
-          'arvados-python-client>=0.1.20160219154918'
+          'cwltool>=1.0.20160325200114',
+          'arvados-python-client>=0.1.20160322001610'
       ],
       test_suite='tests',
       tests_require=['mock>=1.0'],
index aef27001e00e1a2b330296a3eafa8decea22f518..bc0289e20415a88f9379504707c88a4befee1248 100755 (executable)
@@ -58,6 +58,9 @@ git pull
 export ARVADOS_API_HOST=localhost:8000
 export ARVADOS_API_HOST_INSECURE=1
 export ARVADOS_API_TOKEN=\$(cat /var/lib/arvados/superuser_token)
+
+arv-keepdocker --pull arvados/jobs
+
 env
 exec ./run_test.sh "$@"
 EOF
diff --git a/sdk/cwl/tests/input/blorp.txt b/sdk/cwl/tests/input/blorp.txt
new file mode 100644 (file)
index 0000000..09fc24d
--- /dev/null
@@ -0,0 +1 @@
+blopper blubber
diff --git a/sdk/cwl/tests/submit_test_job.json b/sdk/cwl/tests/submit_test_job.json
new file mode 100644 (file)
index 0000000..95ff0ff
--- /dev/null
@@ -0,0 +1,6 @@
+{
+    "x": {
+        "class": "File",
+        "path": "input/blorp.txt"
+    }
+}
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
new file mode 100644 (file)
index 0000000..4e343b6
--- /dev/null
@@ -0,0 +1,60 @@
+import unittest
+import mock
+import arvados_cwl
+import sys
+import arvados
+import arvados.keep
+import arvados.collection
+import hashlib
+
+class TestSubmit(unittest.TestCase):
+    @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
+    @mock.patch("arvados.collection.KeepClient")
+    @mock.patch("arvados.events.subscribe")
+    def test_submit(self, events, keep, keepdocker):
+        api = mock.MagicMock()
+        def putstub(p, **kwargs):
+            return "%s+%i" % (hashlib.md5(p).hexdigest(), len(p))
+        keep().put.side_effect = putstub
+        keepdocker.return_value = True
+        api.users().current().execute.return_value = {"uuid": "zzzzz-tpzed-zzzzzzzzzzzzzzz"}
+        api.collections().list().execute.return_value = {"items": []}
+        api.collections().create().execute.side_effect = ({"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz1",
+                                                           "portable_data_hash": "99999999999999999999999999999991+99"},
+                                                          {"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2",
+                                                           "portable_data_hash": "99999999999999999999999999999992+99"})
+        api.jobs().create().execute.return_value = {"uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz", "state": "Queued"}
+
+        arvados_cwl.main(["--debug", "--submit", "--no-wait", "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+                         sys.stdout, sys.stderr, api_client=api)
+
+        api.collections().create.assert_has_calls([
+            mock.call(),
+            mock.call(body={'manifest_text': './tool 84ec4df683711de31b782505389a8843+429 0:16:blub.txt 16:413:submit_tool.cwl\n./wf 81d977a245a41b8e79859fbe00623fd0+344 0:344:submit_wf.cwl\n',
+                            'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
+                            'name': 'submit_wf.cwl'
+                        }, ensure_unique_name=True),
+            mock.call().execute(),
+            mock.call(body={'manifest_text': '. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
+                            'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
+                            'name': '#'
+                        }, ensure_unique_name=True),
+            mock.call().execute()])
+
+        api.jobs().create.assert_called_with(
+            body={
+                'runtime_constraints': {
+                    'docker_image': 'arvados/jobs'
+                },
+            'script_parameters': {
+                'x': {
+                    'path': '99999999999999999999999999999992+99/blorp.txt',
+                    'class': 'File'
+                },
+                'cwl:tool': '99999999999999999999999999999991+99/wf/submit_wf.cwl'
+            },
+            'repository': 'arvados',
+                'script_version': 'master',
+                'script': 'cwl-runner'
+            },
+            find_or_create=True)
diff --git a/sdk/cwl/tests/tool/blub.txt b/sdk/cwl/tests/tool/blub.txt
new file mode 100644 (file)
index 0000000..f12927b
--- /dev/null
@@ -0,0 +1 @@
+blibber blubber
diff --git a/sdk/cwl/tests/tool/submit_tool.cwl b/sdk/cwl/tests/tool/submit_tool.cwl
new file mode 100644 (file)
index 0000000..28fecff
--- /dev/null
@@ -0,0 +1,19 @@
+# Test case for arvados-cwl-runner
+#
+# Used to test whether scanning a tool file for dependencies (e.g. default
+# value blub.txt) and uploading to Keep works as intended.
+
+class: CommandLineTool
+requirements:
+  - class: DockerRequirement
+    dockerPull: debian:8
+inputs:
+  - id: x
+    type: File
+    default:
+      class: File
+      path: blub.txt
+    inputBinding:
+      position: 1
+outputs: []
+baseCommand: cat
diff --git a/sdk/cwl/tests/wf/submit_wf.cwl b/sdk/cwl/tests/wf/submit_wf.cwl
new file mode 100644 (file)
index 0000000..e1ff188
--- /dev/null
@@ -0,0 +1,16 @@
+# Test case for arvados-cwl-runner
+#
+# Used to test whether scanning a workflow file for dependencies
+# (e.g. submit_tool.cwl) and uploading to Keep works as intended.
+
+class: Workflow
+inputs:
+  - id: x
+    type: File
+outputs: []
+steps:
+  - id: step1
+    inputs:
+      - { id: x, source: "#x" }
+    outputs: []
+    run: ../tool/submit_tool.cwl
index e48a6d15472cc2c90cd4af7e35251b653aa87cce..f66554115306796b94a0bdaef6aa0a194e5fabe7 100644 (file)
@@ -283,15 +283,15 @@ def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None)
     return [(image['collection'], image) for image in images
             if image['collection'] in existing_coll_uuids]
 
-def main(arguments=None):
+def main(arguments=None, stdout=sys.stdout):
     args = arg_parser.parse_args(arguments)
     api = arvados.api('v1')
 
     if args.image is None or args.image == 'images':
-        fmt = "{:30}  {:10}  {:12}  {:29}  {:20}"
-        print fmt.format("REPOSITORY", "TAG", "IMAGE ID", "COLLECTION", "CREATED")
+        fmt = "{:30}  {:10}  {:12}  {:29}  {:20}\n"
+        stdout.write(fmt.format("REPOSITORY", "TAG", "IMAGE ID", "COLLECTION", "CREATED"))
         for i, j in list_images_in_arv(api, args.retries):
-            print(fmt.format(j["repo"], j["tag"], j["dockerhash"][0:12], i, j["timestamp"].strftime("%c")))
+            stdout.write(fmt.format(j["repo"], j["tag"], j["dockerhash"][0:12], i, j["timestamp"].strftime("%c")))
         sys.exit(0)
 
     # Pull the image if requested, unless the image is specified as a hash
@@ -375,7 +375,7 @@ def main(arguments=None):
                     make_link(api, args.retries, 'docker_image_repo+tag',
                               image_repo_tag, **link_base)
 
-                print(coll_uuid)
+                stdout.write(coll_uuid + "\n")
 
                 sys.exit(0)
 
index ef39be81a4650cda86e20c6d13a7d23848398ecb..5d29c45117acd71e924838bb9b758af77d8e9b91 100644 (file)
@@ -34,6 +34,12 @@ class ArvFile(object):
         self.prefix = prefix
         self.fn = fn
 
+    def __hash__(self):
+        return (self.prefix+self.fn).__hash__()
+
+    def __eq__(self, other):
+        return (self.prefix == other.prefix) and (self.fn == other.fn)
+
 class UploadFile(ArvFile):
     pass
 
@@ -101,10 +107,10 @@ def statfile(prefix, fn, fnPattern="$(file %s/%s)", dirPattern="$(dir %s/%s/)"):
 
     return prefix+fn
 
-def uploadfiles(files, api, dry_run=False, num_retries=0, project=None, fnPattern="$(file %s/%s)"):
+def uploadfiles(files, api, dry_run=False, num_retries=0, project=None, fnPattern="$(file %s/%s)", name=None):
     # Find the smallest path prefix that includes all the files that need to be uploaded.
     # This starts at the root and iteratively removes common parent directory prefixes
-    # until all file pathes no longer have a common parent.
+    # until all file paths no longer have a common parent.
     n = True
     pathprefix = "/"
     while n:
@@ -148,9 +154,21 @@ def uploadfiles(files, api, dry_run=False, num_retries=0, project=None, fnPatter
                 stream = sp[0]
                 collection.start_new_stream(stream)
             collection.write_file(f.fn, sp[1])
-        item = api.collections().create(body={"owner_uuid": project, "manifest_text": collection.manifest_text()}).execute()
+
+        exists = api.collections().list(filters=[["owner_uuid", "=", project],
+                                                 ["portable_data_hash", "=", collection.portable_data_hash()],
+                                                 ["name", "=", name]]).execute(num_retries=num_retries)
+        if exists["items"]:
+            item = exists["items"][0]
+            logger.info("Using collection %s", item["uuid"])
+        else:
+            body = {"owner_uuid": project, "manifest_text": collection.manifest_text()}
+            if name is not None:
+                body["name"] = name
+            item = api.collections().create(body=body, ensure_unique_name=True).execute()
+            logger.info("Uploaded to %s", item["uuid"])
+
         pdh = item["portable_data_hash"]
-        logger.info("Uploaded to %s", item["uuid"])
 
     for c in files:
         c.fn = fnPattern % (pdh, c.fn)
index 759e8ff67edf1ec8b99b0de86ee8a3e4602b73b7..17f9cb43a084710c764db9bd565d0fe08f2b7627 100644 (file)
@@ -41,6 +41,7 @@ setup(name='arvados-python-client',
       install_requires=[
           'google-api-python-client==1.4.2',
           'oauth2client >=1.4.6, <2',
+          'pyasn1-modules==0.0.5',
           'ciso8601',
           'httplib2',
           'pycurl >=7.19.5.1, <7.21.5',
index 0190537e8bd918130e44f4855b713e29cb079099..67963388639f9fb352ca42b0abff2a05e86d140c 100644 (file)
@@ -1,4 +1,5 @@
 class Arvados::V1::JobsController < ApplicationController
+  accept_attribute_as_json :components, Hash
   accept_attribute_as_json :script_parameters, Hash
   accept_attribute_as_json :runtime_constraints, Hash
   accept_attribute_as_json :tasks_summary, Hash
index 01154077fd5dd96b888912740315f3643d1a2ecb..0ed53535778335d11b2b12d3007058e9ad76adfc 100644 (file)
@@ -2,6 +2,7 @@ class Job < ArvadosModel
   include HasUuid
   include KindAndEtag
   include CommonApiTemplate
+  serialize :components, Hash
   attr_protected :arvados_sdk_version, :docker_image_locator
   serialize :script_parameters, Hash
   serialize :runtime_constraints, Hash
@@ -52,6 +53,7 @@ class Job < ArvadosModel
     t.add :queue_position
     t.add :node_uuids
     t.add :description
+    t.add :components
   end
 
   # Supported states for a job
@@ -238,7 +240,8 @@ class Job < ArvadosModel
           output_changed? or
           log_changed? or
           tasks_summary_changed? or
-          state_changed?
+          state_changed? or
+          components_changed?
         logger.warn "User #{current_user.uuid if current_user} tried to change protected job attributes on locked #{self.class.to_s} #{uuid_was}"
         return false
       end
diff --git a/services/api/db/migrate/20160324144017_add_components_to_job.rb b/services/api/db/migrate/20160324144017_add_components_to_job.rb
new file mode 100644 (file)
index 0000000..9595d7f
--- /dev/null
@@ -0,0 +1,11 @@
+# Adds a text "components" column to the jobs table.
+class AddComponentsToJob < ActiveRecord::Migration
+  def up
+    add_column(:jobs, :components, :text)
+  end
+
+  # Drops the column again, but only when it is actually present.
+  def down
+    remove_column(:jobs, :components) if column_exists?(:jobs, :components)
+  end
+end
index e482e6e607b4141bbbc00f9b70352852be62ac90..3ec420c1ddcf1d3ac84991359e02bb03ce1b10e9 100644 (file)
@@ -536,7 +536,8 @@ CREATE TABLE jobs (
     priority integer DEFAULT 0 NOT NULL,
     description character varying(524288),
     state character varying(255),
-    arvados_sdk_version character varying(255)
+    arvados_sdk_version character varying(255),
+    components text
 );
 
 
@@ -2580,4 +2581,6 @@ INSERT INTO schema_migrations (version) VALUES ('20151229214707');
 
 INSERT INTO schema_migrations (version) VALUES ('20160208210629');
 
-INSERT INTO schema_migrations (version) VALUES ('20160209155729');
\ No newline at end of file
+INSERT INTO schema_migrations (version) VALUES ('20160209155729');
+
+INSERT INTO schema_migrations (version) VALUES ('20160324144017');
\ No newline at end of file
index 12493e35514714dfdffc1e6f5aeabdf09d0dd223..13fe1abba8b9bfc5903dc2ebf96a388be67226cf 100644 (file)
@@ -499,6 +499,35 @@ job_in_publicly_accessible_project_but_other_objects_elsewhere:
   log: zzzzz-4zz18-fy296fx3hot09f7
   output: zzzzz-4zz18-bv31uwvy3neko21
 
+running_job_with_components:  # job fixture in state Running that already has a components hash
+  uuid: zzzzz-8i9sb-with2components
+  owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+  cancelled_at: ~
+  cancelled_by_user_uuid: ~
+  cancelled_by_client_uuid: ~
+  created_at: <%= 3.minute.ago.to_s(:db) %>
+  started_at: <%= 3.minute.ago.to_s(:db) %>
+  finished_at: ~
+  script: hash
+  repository: active/foo
+  script_version: 1de84a854e2b440dc53bf42f8548afa4c17da332
+  running: true
+  success: ~
+  output: ~
+  priority: 0
+  log: ~
+  is_locked_by_uuid: zzzzz-tpzed-xurymjxw79nv3jz  # same uuid as owner_uuid above
+  tasks_summary:
+    failed: 0
+    todo: 3
+    running: 1
+    done: 1
+  runtime_constraints: {}
+  state: Running
+  components:  # two entries: component name => uuid-style string
+    component1: zzzzz-8i9sb-jobuuid00000001
+    component2: zzzzz-d1hrv-pipelineuuid001
+
 # Test Helper trims the rest of the file
 
 # Do not add your fixtures below this line as the rest of this file will be trimmed by test_helper
index 6623c726df01923b7227d33f17e6f2098cab649e..00846795b4d7f7501964d0b888ba87739ce6c9d7 100644 (file)
@@ -389,7 +389,7 @@ class Arvados::V1::GroupsControllerTest < ActionController::TestCase
     assert_response :success
 
     # verify that the user can no longer see the project
-    @counter = 0  # Reset executed action counter
+    @test_counter = 0  # Reset executed action counter
     @controller = Arvados::V1::GroupsController.new
     authorize_with :project_viewer
     get :index, filters: [['group_class', '=', 'project']], format: :json
@@ -401,7 +401,7 @@ class Arvados::V1::GroupsControllerTest < ActionController::TestCase
     assert_equal false, found_projects.include?(groups(:starred_and_shared_active_user_project).uuid)
 
     # share the project
-    @counter = 0
+    @test_counter = 0
     @controller = Arvados::V1::LinksController.new
     authorize_with :system_user
     post :create, link: {
@@ -412,7 +412,7 @@ class Arvados::V1::GroupsControllerTest < ActionController::TestCase
     }
 
     # verify that project_viewer user can now see shared project again
-    @counter = 0
+    @test_counter = 0
     @controller = Arvados::V1::GroupsController.new
     authorize_with :project_viewer
     get :index, filters: [['group_class', '=', 'project']], format: :json
index 1e1425e92b7d27057e89a335c2480b8024b0c444..601f9a7af56f3f4260724eb8eae3bc28f5014ea7 100644 (file)
@@ -433,4 +433,79 @@ class Arvados::V1::JobsControllerTest < ActionController::TestCase
     assert_equal('077ba2ad3ea24a929091a9e6ce545c93199b8e57',
                  internal_tag(json_response['uuid']))
   end
+
+  # Showing a job fixture that has components must return both component
+  # names, in order, in the JSON response.
+  test 'get job with components' do
+    authorize_with :active
+    job = jobs(:running_job_with_components)
+    get :show, {id: job.uuid}
+    assert_response :success
+    components = json_response["components"]
+    assert_not_nil components
+    assert_equal ["component1", "component2"], components.keys
+  end
+
+  # One generated test per user: each tries to set components on
+  # jobs(:running) and checks the expected response. On success the
+  # response must echo the new components back.
+  {
+    active: :success,
+    system_user: :success,
+    admin: 403,
+  }.each do |user, expected|
+    test "add components to job locked by active user as #{user} user and expect #{expected}" do
+      authorize_with user
+      new_components = {"component1" => "value1", "component2" => "value2"}
+      put :update, {id: jobs(:running).uuid, job: {components: new_components}}
+      assert_response expected
+      if expected == :success
+        components = json_response["components"]
+        assert_not_nil components
+        assert_equal ["component1", "component2"], components.keys
+        assert_equal "value1", components["component1"]
+      end
+    end
+  end
+
+  # Walks one job through: show -> drop one component -> show ->
+  # drop all components -> show, checking the component names each time.
+  test 'get_delete components_get again for job with components' do
+    authorize_with :active
+    job_uuid = jobs(:running_job_with_components).uuid
+
+    # The test helper's check_counter fails a test once @test_counter
+    # reaches 2, so the counter is reset (and a fresh controller created)
+    # before every additional request in this test.
+    fresh_action = lambda do
+      @test_counter = 0  # Reset executed action counter
+      @controller = Arvados::V1::JobsController.new
+    end
+
+    get :show, {id: job_uuid}
+    assert_response :success
+    components = json_response["components"]
+    assert_not_nil components
+    assert_equal ["component1", "component2"], components.keys
+
+    # delete second component
+    fresh_action.call
+    remaining = {"component1" => "zzzzz-8i9sb-jobuuid00000001"}
+    put :update, {id: job_uuid, job: {components: remaining}}
+    assert_response :success
+
+    fresh_action.call
+    get :show, {id: job_uuid}
+    assert_response :success
+    components = json_response["components"]
+    assert_not_nil components
+    assert_equal ["component1"], components.keys
+
+    # delete all components
+    fresh_action.call
+    put :update, {id: job_uuid, job: {components: {}}}
+    assert_response :success
+
+    fresh_action.call
+    get :show, {id: job_uuid}
+    assert_response :success
+    components = json_response["components"]
+    assert_not_nil components
+    assert_equal [], components.keys
+  end
 end
index 68d4bbf5af4b03349b11259f82357e917dd52cf7..7579abf1ffd0234b3f1857516922416e1ff263e1 100644 (file)
@@ -106,12 +106,12 @@ end
 
 class ActionController::TestCase
   setup do
-    @counter = 0
+    @test_counter = 0
   end
 
   def check_counter action
-    @counter += 1
-    if @counter == 2
+    @test_counter += 1
+    if @test_counter == 2
       assert_equal 1, 2, "Multiple actions in functional test"
     end
   end