class ActionController::TestCase
setup do
- @counter = 0
+ @test_counter = 0
end
def check_counter action
- @counter += 1
- if @counter == 2
+ @test_counter += 1
+ if @test_counter == 2
assert_equal 1, 2, "Multiple actions in controller test"
end
end
--- /dev/null
+#!/usr/bin/env python
+
+# Crunch script integration for running arvados-cwl-runner (importing
+# arvados_cwl module) inside a crunch job.
+#
+# This gets the job record, transforms the script parameters into a valid CWL
+# input object, then executes the CWL runner to run the underlying workflow or
+# tool.  When the workflow completes, record the output object in an output
+# collection for this runner job.
+
+import arvados
+import arvados_cwl
+import arvados.collection
+import arvados.util
+from cwltool.process import shortname
+import cwltool.main
+import logging
+import os
+import json
+import argparse
+from arvados.api import OrderedJsonModel
+from cwltool.process import adjustFiles
+
+# Print package versions.
+# NOTE(review): this logs on the root logger at INFO level, so it is only
+# emitted if logging has been configured to show INFO messages.
+logging.info(cwltool.main.versionstring())
+
+api = arvados.api("v1")
+
+try:
+    job = arvados.current_job()
+    job_order_object = job['script_parameters']
+
+    def keeppath(v):
+        """Turn a Keep locator path ("<pdh>/<file>") into a file:// URL under
+        the task's Keep mount (TASK_KEEPMOUNT).
+
+        Paths that are not Keep locators are returned unchanged, so
+        adjustFiles() never replaces them with None.
+        """
+        if arvados.util.keep_locator_pattern.match(v):
+            return "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], v)
+        return v
+
+    job_order_object["cwl:tool"] = keeppath(job_order_object["cwl:tool"])
+
+    adjustFiles(job_order_object, keeppath)
+
+    runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()))
+
+    t = cwltool.main.load_tool(job_order_object, False, True, runner.arvMakeTool, True)
+
+    # Run locally inside this job (submit=False); sub-jobs are recorded as
+    # components of a pipeline linked back to this runner job.
+    args = argparse.Namespace()
+    args.project_uuid = job["owner_uuid"]
+    args.enable_reuse = True
+    args.submit = False
+    args.debug = True
+    args.quiet = False
+    outputObj = runner.arvExecutor(t, job_order_object, "", args,
+                                   cwl_runner_job={"uuid": job["uuid"],
+                                                   "state": job["state"]})
+
+    # Record, per source collection, which paths the output object refers
+    # to.  Output paths are expected to look like "keep:<pdh>/<path>"; the
+    # first 5 characters ("keep:") of the first component are dropped.
+    files = {}
+    def capture(path):
+        sp = path.split("/")
+        col = sp[0][5:]
+        files.setdefault(col, set()).add("/".join(sp[1:]))
+        return path
+
+    adjustFiles(outputObj, capture)
+
+    # Build this runner job's output collection by copying in the full
+    # contents of every collection referenced by the output object.
+    final = arvados.collection.Collection()
+
+    for k in files:
+        with arvados.collection.Collection(k) as c:
+            for f in c:
+                final.copy(f, f, c, True)
+
+    def makeRelative(path):
+        """Drop the leading "keep:<pdh>" component so paths are relative to
+        the combined output collection."""
+        return "/".join(path.split("/")[1:])
+
+    adjustFiles(outputObj, makeRelative)
+
+    with final.open("cwl.output.json", "w") as f:
+        json.dump(outputObj, f, indent=4)
+
+    api.job_tasks().update(uuid=arvados.current_task()['uuid'],
+                           body={
+                               'output': final.save_new(create_collection_record=False),
+                               'success': True,
+                               'progress': 1.0
+                           }).execute()
+except Exception:
+    logging.exception("Unhandled exception")
+    api.job_tasks().update(uuid=arvados.current_task()['uuid'],
+                           body={
+                               'output': None,
+                               'success': False,
+                               'progress': 1.0
+                           }).execute()
|arvados_sdk_version|string|Git commit hash that specifies the SDK version to use from the Arvados repository|This is set by searching the Arvados repository for a match for the arvados_sdk_version runtime constraint.|
|docker_image_locator|string|Portable data hash of the collection that contains the Docker image to use|This is set by searching readable collections for a match for the docker_image runtime constraint.|
|runtime_constraints|hash|Constraints that must be satisfied by the job/task scheduler in order to run the job.|See below.|
+|components|hash|Map of names to the UUIDs of this job's child work units. The UUIDs can refer to different object types (for example, jobs or pipeline instances).|Example components hash: @{"name1": "zzzzz-8i9sb-xyz...", "name2": "zzzzz-d1hrv-xyz..."}@|
h3(#script_version). Specifying Git versions
-# Based on Debian Wheezy
-FROM arvados/debian:wheezy
+# Based on Debian Jessie
+FROM debian:jessie
MAINTAINER Ward Vandewege <ward@curoverse.com>
ENV DEBIAN_FRONTEND noninteractive
ARG COMMIT=latest
RUN echo $COMMIT && apt-get update -q
-RUN apt-get install -qy git python-pip python-virtualenv python-arvados-python-client python-dev libcurl4-gnutls-dev
+RUN apt-get install -qy git python-pip python-virtualenv python-arvados-python-client python-dev libcurl4-gnutls-dev nodejs python-arvados-cwl-runner
# Install dependencies and set up system.
RUN /usr/sbin/adduser --disabled-password \
# apt.arvados.org
-deb http://apt.arvados.org/ wheezy main
+deb http://apt.arvados.org/ jessie main
#!/usr/bin/env python
+# Implement cwl-runner interface for submitting and running jobs on Arvados.
+
import argparse
import arvados
import arvados.events
import re
import os
import sys
+import functools
+import json
+import pkg_resources # part of setuptools
-from cwltool.process import get_feature
+from cwltool.process import get_feature, adjustFiles, scandeps
from arvados.api import OrderedJsonModel
logger = logging.getLogger('arvados.cwl-runner')
def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
+ """Check if a Docker image is available in Keep, if not, upload it using arv-keepdocker."""
+
if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
if image_tag:
args.append(image_tag)
logger.info("Uploading Docker image %s", ":".join(args[1:]))
- arvados.commands.keepdocker.main(args)
+ arvados.commands.keepdocker.main(args, stdout=sys.stderr)
return dockerRequirement["dockerImageId"]
class CollectionFsAccess(cwltool.process.StdFsAccess):
+ """Implement the cwltool FsAccess interface for Arvados Collections."""
+
def __init__(self, basedir):
self.collections = {}
self.basedir = basedir
return os.path.exists(self._abs(fn))
class ArvadosJob(object):
+ """Submit and manage a Crunch job for executing a CWL CommandLineTool."""
+
def __init__(self, runner):
self.arvrunner = runner
self.running = False
del self.arvrunner.jobs[record["uuid"]]
+class RunnerJob(object):
+    """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
+
+    def __init__(self, runner, tool, job_order, enable_reuse):
+        self.arvrunner = runner
+        self.tool = tool
+        self.job_order = job_order
+        self.running = False
+        self.enable_reuse = enable_reuse
+
+    def update_pipeline_component(self, record):
+        # No pipeline instance is created when submitting a runner job, so
+        # there is no component to update.
+        pass
+
+    def upload_docker(self, tool):
+        """Ensure the Docker image of every CommandLineTool reachable from
+        `tool` (recursing into workflow steps) is available in Keep."""
+        if isinstance(tool, cwltool.draft2tool.CommandLineTool):
+            (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
+            if docker_req:
+                arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
+        elif isinstance(tool, cwltool.workflow.Workflow):
+            for s in tool.steps:
+                self.upload_docker(s.embedded_tool)
+
+    def run(self, dry_run=False, pull_image=True, **kwargs):
+        """Upload the workflow, its dependencies and the job inputs to Keep,
+        then create (or, with enable_reuse, find) the cwl-runner Crunch job.
+
+        Extra kwargs are passed through to ArvPathMapper.
+        NOTE(review): dry_run and pull_image are accepted but not used or
+        forwarded here.
+        """
+        self.upload_docker(self.tool)
+
+        workflowfiles = set()
+        jobfiles = set()
+        workflowfiles.add(self.tool.tool["id"])
+
+        self.name = os.path.basename(self.tool.tool["id"])
+
+        def visitFiles(files, path):
+            files.add(path)
+            return path
+
+        document_loader, _, _ = cwltool.process.get_schema()
+        def loadref(b, u):
+            return document_loader.resolve_ref(u, base_url=b)[0]
+
+        # Collect documents and files the tool references through the
+        # "$import"/"run" and "$include"/"$schemas"/"path" fields so they
+        # are uploaded together with the workflow.
+        sc = scandeps("", self.tool.tool,
+                      set(("$import", "run")),
+                      set(("$include", "$schemas", "path")),
+                      loadref)
+        adjustFiles(sc, functools.partial(visitFiles, workflowfiles))
+        adjustFiles(self.job_order, functools.partial(visitFiles, jobfiles))
+
+        workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
+                                       "%s",
+                                       "%s/%s",
+                                       name=self.name,
+                                       **kwargs)
+
+        jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
+                                  "%s",
+                                  "%s/%s",
+                                  name=os.path.basename(self.job_order.get("id", "#")),
+                                  **kwargs)
+
+        adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])
+
+        if "id" in self.job_order:
+            del self.job_order["id"]
+
+        self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]
+
+        response = self.arvrunner.api.jobs().create(body={
+            "script": "cwl-runner",
+            "script_version": "master",
+            "repository": "arvados",
+            "script_parameters": self.job_order,
+            "runtime_constraints": {
+                "docker_image": "arvados/jobs"
+            }
+        }, find_or_create=self.enable_reuse).execute(num_retries=self.arvrunner.num_retries)
+
+        self.arvrunner.jobs[response["uuid"]] = self
+
+        logger.info("Submitted job %s", response["uuid"])
+
+        # A reused job may already be finished; report it right away.
+        if response["state"] in ("Complete", "Failed", "Cancelled"):
+            self.done(response)
+
+    def done(self, record):
+        """Handle the runner job finishing.
+
+        Loads cwl.output.json from the job's output collection when there is
+        one, passes it (or None) with the process status to the runner's
+        output_callback, and always stops tracking the job.
+        """
+        if record["state"] == "Complete":
+            processStatus = "success"
+        else:
+            processStatus = "permanentFail"
+
+        outputs = None
+        try:
+            if record.get("output"):
+                try:
+                    outc = arvados.collection.Collection(record["output"])
+                    with outc.open("cwl.output.json") as f:
+                        outputs = json.load(f)
+                except Exception as e:
+                    logger.error("While getting final output object: %s", e)
+            elif processStatus == "success":
+                # Only a successful job is expected to have produced output;
+                # failed/cancelled jobs without output are not an error here.
+                logger.error("Runner job %s completed without an output collection", record["uuid"])
+            self.arvrunner.output_callback(outputs, processStatus)
+        finally:
+            del self.arvrunner.jobs[record["uuid"]]
+
class ArvPathMapper(cwltool.pathmapper.PathMapper):
- def __init__(self, arvrunner, referenced_files, basedir, **kwargs):
+ """Convert container-local paths to and from Keep collection ids."""
+
+ def __init__(self, arvrunner, referenced_files, basedir,
+ collection_pattern, file_pattern, name=None, **kwargs):
self._pathmap = arvrunner.get_uploaded()
- uploadfiles = []
+ uploadfiles = set()
pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')
for src in referenced_files:
if isinstance(src, basestring) and pdh_path.match(src):
- self._pathmap[src] = (src, "$(task.keep)/%s" % src[5:])
+ self._pathmap[src] = (src, collection_pattern % src[5:])
+ if "#" in src:
+ src = src[:src.index("#")]
if src not in self._pathmap:
ab = cwltool.pathmapper.abspath(src, basedir)
- st = arvados.commands.run.statfile("", ab, fnPattern="$(task.keep)/%s/%s")
+ st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
if kwargs.get("conformance_test"):
self._pathmap[src] = (src, ab)
elif isinstance(st, arvados.commands.run.UploadFile):
- uploadfiles.append((src, ab, st))
+ uploadfiles.add((src, ab, st))
elif isinstance(st, arvados.commands.run.ArvFile):
self._pathmap[src] = (ab, st.fn)
else:
arvrunner.api,
dry_run=kwargs.get("dry_run"),
num_retries=3,
- fnPattern="$(task.keep)/%s/%s",
+ fnPattern=file_pattern,
+ name=name,
project=arvrunner.project_uuid)
for src, ab, st in uploadfiles:
class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
+ """Wrap cwltool CommandLineTool to override selected methods."""
+
def __init__(self, arvrunner, toolpath_object, **kwargs):
super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
self.arvrunner = arvrunner
return ArvadosJob(self.arvrunner)
def makePathMapper(self, reffiles, input_basedir, **kwargs):
- return ArvPathMapper(self.arvrunner, reffiles, input_basedir, **kwargs)
+ return ArvPathMapper(self.arvrunner, reffiles, input_basedir,
+ "$(task.keep)/%s",
+ "$(task.keep)/%s/%s",
+ **kwargs)
class ArvCwlRunner(object):
+ """Execute a CWL tool or workflow, submit crunch jobs, wait for them to
+ complete, and report output."""
+
def __init__(self, api_client):
self.api = api_client
self.jobs = {}
def output_callback(self, out, processStatus):
if processStatus == "success":
logger.info("Overall job status is %s", processStatus)
- self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
- body={"state": "Complete"}).execute(num_retries=self.num_retries)
+ if self.pipeline:
+ self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+ body={"state": "Complete"}).execute(num_retries=self.num_retries)
else:
logger.warn("Overall job status is %s", processStatus)
- self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
- body={"state": "Failed"}).execute(num_retries=self.num_retries)
+ if self.pipeline:
+ self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+ body={"state": "Failed"}).execute(num_retries=self.num_retries)
self.final_output = out
self.uploaded[src] = pair
def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
+ self.debug = args.debug
+
+ if args.quiet:
+ logger.setLevel(logging.WARN)
+ logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
+
+ useruuid = self.api.users().current().execute()["uuid"]
+ self.project_uuid = args.project_uuid if args.project_uuid else useruuid
+ self.pipeline = None
+
+ if args.submit:
+ runnerjob = RunnerJob(self, tool, job_order, args.enable_reuse)
+ if not args.wait:
+ runnerjob.run()
+ return
+
events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
- self.debug = args.debug
self.fs_access = CollectionFsAccess(input_basedir)
kwargs["fs_access"] = self.fs_access
kwargs["outdir"] = "$(task.outdir)"
kwargs["tmpdir"] = "$(task.tmpdir)"
- useruuid = self.api.users().current().execute()["uuid"]
- self.project_uuid = args.project_uuid if args.project_uuid else useruuid
-
if kwargs.get("conformance_test"):
return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)
else:
- self.pipeline = self.api.pipeline_instances().create(
- body={
- "owner_uuid": self.project_uuid,
- "name": shortname(tool.tool["id"]),
- "components": {},
- "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
+ if args.submit:
+ jobiter = iter((runnerjob,))
+ else:
+ components = {}
+ if "cwl_runner_job" in kwargs:
+ components[os.path.basename(tool.tool["id"])] = {"job": kwargs["cwl_runner_job"]}
+
+ self.pipeline = self.api.pipeline_instances().create(
+ body={
+ "owner_uuid": self.project_uuid,
+ "name": shortname(tool.tool["id"]),
+ "components": components,
+ "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
- logger.info("Pipeline instance %s", self.pipeline["uuid"])
+ logger.info("Pipeline instance %s", self.pipeline["uuid"])
- jobiter = tool.job(job_order,
- input_basedir,
- self.output_callback,
- docker_outdir="$(task.outdir)",
- **kwargs)
+ jobiter = tool.job(job_order,
+ input_basedir,
+ self.output_callback,
+ docker_outdir="$(task.outdir)",
+ **kwargs)
try:
self.cond.acquire()
logger.error("Interrupted, marking pipeline as failed")
else:
logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[0], exc_info=(sys.exc_info()[1] if self.debug else False))
- self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
- body={"state": "Failed"}).execute(num_retries=self.num_retries)
+ if self.pipeline:
+ self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+ body={"state": "Failed"}).execute(num_retries=self.num_retries)
finally:
self.cond.release()
return self.final_output
+def versionstring():
+    """Return a version string of key packages for provenance and debugging.
+
+    The result looks like
+    "<argv[0]> <arvados-cwl-runner version>, arvados-python-client <version>, cwltool <version>".
+
+    Raises pkg_resources.DistributionNotFound if any of the packages is not
+    installed.
+    """
+    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
+    arvpkg = pkg_resources.require("arvados-python-client")
+    cwlpkg = pkg_resources.require("cwltool")
+
+    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
+                                    "arvados-python-client", arvpkg[0].version,
+                                    "cwltool", cwlpkg[0].version)
def main(args, stdout, stderr, api_client=None):
args.insert(0, "--leave-outputs")
parser = cwltool.main.arg_parser()
+
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--enable-reuse", action="store_true",
default=True, dest="enable_reuse",
exgroup.add_argument("--disable-reuse", action="store_false",
default=True, dest="enable_reuse",
help="")
+
parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs")
+ exgroup = parser.add_mutually_exclusive_group()
+ exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
+ default=True, dest="submit")
+ exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
+ default=True, dest="submit")
+
+ exgroup = parser.add_mutually_exclusive_group()
+ exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
+ default=True, dest="wait")
+ exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
+ default=True, dest="wait")
+
try:
- runner = ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()))
+ if api_client is None:
+ api_client=arvados.api('v1', model=OrderedJsonModel())
+ runner = ArvCwlRunner(api_client)
except Exception as e:
logger.error(e)
return 1
- return cwltool.main.main(args, executor=runner.arvExecutor, makeTool=runner.arvMakeTool, parser=parser)
+ return cwltool.main.main(args,
+ stdout=stdout,
+ stderr=stderr,
+ executor=runner.arvExecutor,
+ makeTool=runner.arvMakeTool,
+ parser=parser,
+ versionfunc=versionstring)
'bin/arvados-cwl-runner'
],
install_requires=[
- 'cwltool>=1.0.20160311170456',
- 'arvados-python-client>=0.1.20160219154918'
+ 'cwltool>=1.0.20160325200114',
+ 'arvados-python-client>=0.1.20160322001610'
],
test_suite='tests',
tests_require=['mock>=1.0'],
export ARVADOS_API_HOST=localhost:8000
export ARVADOS_API_HOST_INSECURE=1
export ARVADOS_API_TOKEN=\$(cat /var/lib/arvados/superuser_token)
+
+arv-keepdocker --pull arvados/jobs
+
env
exec ./run_test.sh "$@"
EOF
--- /dev/null
+blopper blubber
--- /dev/null
+{
+ "x": {
+ "class": "File",
+ "path": "input/blorp.txt"
+ }
+}
--- /dev/null
+import unittest
+import mock
+import arvados_cwl
+import sys
+import arvados
+import arvados.keep
+import arvados.collection
+import hashlib
+
+# Tests for "arvados-cwl-runner --submit": uploading a workflow and its job
+# order to Keep and creating the cwl-runner Crunch job, with the API client
+# and Keep client mocked out.
+class TestSubmit(unittest.TestCase):
+ # mock.patch decorators apply bottom-up, so the mocks arrive as
+ # (events, keep, keepdocker).
+ # NOTE(review): the CWL/JSON inputs are given as paths relative to the
+ # working directory (tests/...), so this presumably must be run from the
+ # directory that contains tests/.
+ @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
+ @mock.patch("arvados.collection.KeepClient")
+ @mock.patch("arvados.events.subscribe")
+ def test_submit(self, events, keep, keepdocker):
+ api = mock.MagicMock()
+ # Fake Keep put(): return an "<md5>+<size>" locator for the data so
+ # the manifests asserted below are deterministic.
+ def putstub(p, **kwargs):
+ return "%s+%i" % (hashlib.md5(p).hexdigest(), len(p))
+ keep().put.side_effect = putstub
+ # Presumably makes the Docker image look already present in Keep so
+ # nothing is uploaded -- confirm against arv_docker_get_image.
+ keepdocker.return_value = True
+ # Canned API responses: the current user; no existing collections (so
+ # every file set is uploaded as a new collection); the two created
+ # collections, in creation order; and the created runner job.
+ api.users().current().execute.return_value = {"uuid": "zzzzz-tpzed-zzzzzzzzzzzzzzz"}
+ api.collections().list().execute.return_value = {"items": []}
+ api.collections().create().execute.side_effect = ({"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz1",
+ "portable_data_hash": "99999999999999999999999999999991+99"},
+ {"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2",
+ "portable_data_hash": "99999999999999999999999999999992+99"})
+ api.jobs().create().execute.return_value = {"uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz", "state": "Queued"}
+
+ # --no-wait: submit the runner job and return without waiting for it.
+ arvados_cwl.main(["--debug", "--submit", "--no-wait", "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ sys.stdout, sys.stderr, api_client=api)
+
+ # The leading mock.call() was recorded by the setup line above, which
+ # calls api.collections().create() to configure its side_effect.  The
+ # real uploads follow: first the workflow plus the tool files it
+ # references (named after the workflow file), then the job input file
+ # (named '#', the basename of the default job order id).
+ api.collections().create.assert_has_calls([
+ mock.call(),
+ mock.call(body={'manifest_text': './tool 84ec4df683711de31b782505389a8843+429 0:16:blub.txt 16:413:submit_tool.cwl\n./wf 81d977a245a41b8e79859fbe00623fd0+344 0:344:submit_wf.cwl\n',
+ 'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
+ 'name': 'submit_wf.cwl'
+ }, ensure_unique_name=True),
+ mock.call().execute(),
+ mock.call(body={'manifest_text': '. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
+ 'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
+ 'name': '#'
+ }, ensure_unique_name=True),
+ mock.call().execute()])
+
+ # Paths in script_parameters are rewritten to "<pdh>/<path>" using the
+ # portable data hashes returned by the mocked create() calls above:
+ # ...991+99 for the workflow collection, ...992+99 for the job inputs.
+ api.jobs().create.assert_called_with(
+ body={
+ 'runtime_constraints': {
+ 'docker_image': 'arvados/jobs'
+ },
+ 'script_parameters': {
+ 'x': {
+ 'path': '99999999999999999999999999999992+99/blorp.txt',
+ 'class': 'File'
+ },
+ 'cwl:tool': '99999999999999999999999999999991+99/wf/submit_wf.cwl'
+ },
+ 'repository': 'arvados',
+ 'script_version': 'master',
+ 'script': 'cwl-runner'
+ },
+ find_or_create=True)
--- /dev/null
+blibber blubber
--- /dev/null
+# Test case for arvados-cwl-runner
+#
+# Used to test whether scanning a tool file for dependencies (e.g. default
+# value blub.txt) and uploading to Keep works as intended.
+
+class: CommandLineTool
+requirements:
+ - class: DockerRequirement
+ dockerPull: debian:8
+inputs:
+ - id: x
+ type: File
+ default:
+ class: File
+ path: blub.txt
+ inputBinding:
+ position: 1
+outputs: []
+baseCommand: cat
--- /dev/null
+# Test case for arvados-cwl-runner
+#
+# Used to test whether scanning a workflow file for dependencies
+# (e.g. submit_tool.cwl) and uploading to Keep works as intended.
+
+class: Workflow
+inputs:
+ - id: x
+ type: File
+outputs: []
+steps:
+ - id: step1
+ inputs:
+ - { id: x, source: "#x" }
+ outputs: []
+ run: ../tool/submit_tool.cwl
return [(image['collection'], image) for image in images
if image['collection'] in existing_coll_uuids]
-def main(arguments=None):
+def main(arguments=None, stdout=sys.stdout):
args = arg_parser.parse_args(arguments)
api = arvados.api('v1')
if args.image is None or args.image == 'images':
- fmt = "{:30} {:10} {:12} {:29} {:20}"
- print fmt.format("REPOSITORY", "TAG", "IMAGE ID", "COLLECTION", "CREATED")
+ fmt = "{:30} {:10} {:12} {:29} {:20}\n"
+ stdout.write(fmt.format("REPOSITORY", "TAG", "IMAGE ID", "COLLECTION", "CREATED"))
for i, j in list_images_in_arv(api, args.retries):
- print(fmt.format(j["repo"], j["tag"], j["dockerhash"][0:12], i, j["timestamp"].strftime("%c")))
+ stdout.write(fmt.format(j["repo"], j["tag"], j["dockerhash"][0:12], i, j["timestamp"].strftime("%c")))
sys.exit(0)
# Pull the image if requested, unless the image is specified as a hash
make_link(api, args.retries, 'docker_image_repo+tag',
image_repo_tag, **link_base)
- print(coll_uuid)
+ stdout.write(coll_uuid + "\n")
sys.exit(0)
self.prefix = prefix
self.fn = fn
+    def __hash__(self):
+        # Hash exactly the fields __eq__ compares, so equal ArvFiles hash
+        # equally (they are stored in sets, e.g. ArvPathMapper's uploadfiles).
+        # NOTE: fn is mutated after upload (c.fn = fnPattern % ...), which
+        # changes the hash; do not look an ArvFile up in a set/dict after that.
+        return hash((self.prefix, self.fn))
+
+    def __eq__(self, other):
+        """ArvFiles are equal when both prefix and fn match; comparison with
+        any other type is delegated (NotImplemented) instead of raising."""
+        if not isinstance(other, ArvFile):
+            return NotImplemented
+        return (self.prefix == other.prefix) and (self.fn == other.fn)
+
+    def __ne__(self, other):
+        # Python 2 does not derive != from __eq__, so define it explicitly.
+        result = self.__eq__(other)
+        if result is NotImplemented:
+            return result
+        return not result
+
class UploadFile(ArvFile):
pass
return prefix+fn
-def uploadfiles(files, api, dry_run=False, num_retries=0, project=None, fnPattern="$(file %s/%s)"):
+def uploadfiles(files, api, dry_run=False, num_retries=0, project=None, fnPattern="$(file %s/%s)", name=None):
# Find the smallest path prefix that includes all the files that need to be uploaded.
# This starts at the root and iteratively removes common parent directory prefixes
- # until all file pathes no longer have a common parent.
+ # until all file paths no longer have a common parent.
n = True
pathprefix = "/"
while n:
stream = sp[0]
collection.start_new_stream(stream)
collection.write_file(f.fn, sp[1])
- item = api.collections().create(body={"owner_uuid": project, "manifest_text": collection.manifest_text()}).execute()
+
+ exists = api.collections().list(filters=[["owner_uuid", "=", project],
+ ["portable_data_hash", "=", collection.portable_data_hash()],
+ ["name", "=", name]]).execute(num_retries=num_retries)
+ if exists["items"]:
+ item = exists["items"][0]
+ logger.info("Using collection %s", item["uuid"])
+ else:
+ body = {"owner_uuid": project, "manifest_text": collection.manifest_text()}
+ if name is not None:
+ body["name"] = name
+ item = api.collections().create(body=body, ensure_unique_name=True).execute()
+ logger.info("Uploaded to %s", item["uuid"])
+
pdh = item["portable_data_hash"]
- logger.info("Uploaded to %s", item["uuid"])
for c in files:
c.fn = fnPattern % (pdh, c.fn)
install_requires=[
'google-api-python-client==1.4.2',
'oauth2client >=1.4.6, <2',
+ 'pyasn1-modules==0.0.5',
'ciso8601',
'httplib2',
'pycurl >=7.19.5.1, <7.21.5',
class Arvados::V1::JobsController < ApplicationController
+ accept_attribute_as_json :components, Hash
accept_attribute_as_json :script_parameters, Hash
accept_attribute_as_json :runtime_constraints, Hash
accept_attribute_as_json :tasks_summary, Hash
include HasUuid
include KindAndEtag
include CommonApiTemplate
+ serialize :components, Hash
attr_protected :arvados_sdk_version, :docker_image_locator
serialize :script_parameters, Hash
serialize :runtime_constraints, Hash
t.add :queue_position
t.add :node_uuids
t.add :description
+ t.add :components
end
# Supported states for a job
output_changed? or
log_changed? or
tasks_summary_changed? or
- state_changed?
+ state_changed? or
+ components_changed?
logger.warn "User #{current_user.uuid if current_user} tried to change protected job attributes on locked #{self.class.to_s} #{uuid_was}"
return false
end
--- /dev/null
+class AddComponentsToJob < ActiveRecord::Migration
+  # Add the text column that backs Job#components (serialized as a Hash in
+  # the Job model).
+  def up
+    add_column :jobs, :components, :text
+  end
+
+  # Remove the column, tolerating a schema where it is already absent.
+  def down
+    remove_column :jobs, :components if column_exists?(:jobs, :components)
+  end
+end
priority integer DEFAULT 0 NOT NULL,
description character varying(524288),
state character varying(255),
- arvados_sdk_version character varying(255)
+ arvados_sdk_version character varying(255),
+ components text
);
INSERT INTO schema_migrations (version) VALUES ('20160208210629');
-INSERT INTO schema_migrations (version) VALUES ('20160209155729');
\ No newline at end of file
+INSERT INTO schema_migrations (version) VALUES ('20160209155729');
+
+INSERT INTO schema_migrations (version) VALUES ('20160324144017');
\ No newline at end of file
log: zzzzz-4zz18-fy296fx3hot09f7
output: zzzzz-4zz18-bv31uwvy3neko21
+running_job_with_components:
+ uuid: zzzzz-8i9sb-with2components
+ owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ cancelled_at: ~
+ cancelled_by_user_uuid: ~
+ cancelled_by_client_uuid: ~
+ created_at: <%= 3.minute.ago.to_s(:db) %>
+ started_at: <%= 3.minute.ago.to_s(:db) %>
+ finished_at: ~
+ script: hash
+ repository: active/foo
+ script_version: 1de84a854e2b440dc53bf42f8548afa4c17da332
+ running: true
+ success: ~
+ output: ~
+ priority: 0
+ log: ~
+ is_locked_by_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ tasks_summary:
+ failed: 0
+ todo: 3
+ running: 1
+ done: 1
+ runtime_constraints: {}
+ state: Running
+ components:
+ component1: zzzzz-8i9sb-jobuuid00000001
+ component2: zzzzz-d1hrv-pipelineuuid001
+
# Test Helper trims the rest of the file
# Do not add your fixtures below this line as the rest of this file will be trimmed by test_helper
assert_response :success
# verify that the user can no longer see the project
- @counter = 0 # Reset executed action counter
+ @test_counter = 0 # Reset executed action counter
@controller = Arvados::V1::GroupsController.new
authorize_with :project_viewer
get :index, filters: [['group_class', '=', 'project']], format: :json
assert_equal false, found_projects.include?(groups(:starred_and_shared_active_user_project).uuid)
# share the project
- @counter = 0
+ @test_counter = 0
@controller = Arvados::V1::LinksController.new
authorize_with :system_user
post :create, link: {
}
# verify that project_viewer user can now see shared project again
- @counter = 0
+ @test_counter = 0
@controller = Arvados::V1::GroupsController.new
authorize_with :project_viewer
get :index, filters: [['group_class', '=', 'project']], format: :json
assert_equal('077ba2ad3ea24a929091a9e6ce545c93199b8e57',
internal_tag(json_response['uuid']))
end
+
+  # The fixture job carries two components; both names must be returned.
+  test 'get job with components' do
+    authorize_with :active
+    job_uuid = jobs(:running_job_with_components).uuid
+    get :show, {id: job_uuid}
+    assert_response :success
+    components = json_response["components"]
+    assert_not_nil components
+    assert_equal %w[component1 component2], components.keys
+  end
+
+  # Updating components on a job locked by the active user: the active user
+  # and the system user may do it, admin is refused with 403.
+  {
+    active: :success,
+    system_user: :success,
+    admin: 403,
+  }.each do |user, expected|
+    test "add components to job locked by active user as #{user} user and expect #{expected}" do
+      authorize_with user
+      put :update, {
+        id: jobs(:running).uuid,
+        job: {
+          components: {"component1" => "value1", "component2" => "value2"}
+        }
+      }
+      assert_response expected
+      next unless expected == :success
+
+      components = json_response["components"]
+      assert_not_nil components
+      assert_equal %w[component1 component2], components.keys
+      assert_equal "value1", components["component1"]
+    end
+  end
+
+  # Shrink a job's components step by step (two -> one -> none), checking the
+  # stored keys after each update.
+  test 'get_delete components_get again for job with components' do
+    authorize_with :active
+    job_uuid = jobs(:running_job_with_components).uuid
+
+    # The test helper's check_counter fails once @test_counter reaches 2
+    # ("Multiple actions in functional test"), so every request after the
+    # first gets a reset counter and a fresh controller.
+    fresh_controller = lambda do
+      @test_counter = 0 # Reset executed action counter
+      @controller = Arvados::V1::JobsController.new
+    end
+
+    # Fetch the job and check which component names it reports.
+    assert_component_keys = lambda do |expected_keys|
+      get :show, {id: job_uuid}
+      assert_response :success
+      assert_not_nil json_response["components"]
+      assert_equal expected_keys, json_response["components"].keys
+    end
+
+    # Replace the job's components and expect the update to succeed.
+    update_components = lambda do |components|
+      put :update, {id: job_uuid, job: {components: components}}
+      assert_response :success
+    end
+
+    assert_component_keys.call(%w[component1 component2])
+
+    # delete second component
+    fresh_controller.call
+    update_components.call({"component1" => "zzzzz-8i9sb-jobuuid00000001"})
+
+    fresh_controller.call
+    assert_component_keys.call(%w[component1])
+
+    # delete all components
+    fresh_controller.call
+    update_components.call({})
+
+    fresh_controller.call
+    assert_component_keys.call([])
+  end
end
class ActionController::TestCase
setup do
- @counter = 0
+ @test_counter = 0
end
def check_counter action
- @counter += 1
- if @counter == 2
+ @test_counter += 1
+ if @test_counter == 2
assert_equal 1, 2, "Multiple actions in functional test"
end
end