# So we build this thing separately.
#
# Ward, 2016-03-17
-fpm_build schema_salad "" "" python 1.20.20161122192122 --depends "${PYTHON2_PKG_PREFIX}-lockfile >= 1:0.12.2-2"
+saladversion=$(cat "$WORKSPACE/sdk/cwl/setup.py" | grep schema-salad== | sed "s/.*==\(.*\)'.*/\1/")
+fpm_build schema_salad "" "" python $saladversion --depends "${PYTHON2_PKG_PREFIX}-lockfile >= 1:0.12.2-2"
# And schema_salad now depends on ruamel-yaml, which apparently has a braindead setup.py that requires special arguments to build (otherwise, it aborts with 'error: you have to install with "pip install ."'). Sigh.
# Ward, 2016-05-26
fpm_build cwltest "" "" python 1.0.20160907111242
# And for cwltool we have the same problem as for schema_salad. Ward, 2016-03-17
-cwltoolversion=$(cat "$WORKSPACE/sdk/cwl/setup.py" | grep cwltool== | sed "s/.*==\(1\.0\..*\)'.*/\1/")
+cwltoolversion=$(cat "$WORKSPACE/sdk/cwl/setup.py" | grep cwltool== | sed "s/.*==\(.*\)'.*/\1/")
fpm_build cwltool "" "" python $cwltoolversion
# FPM eats the trailing .0 in the python-rdflib-jsonld package when built with 'rdflib-jsonld>=0.3.0'. Force the version. Ward, 2016-03-25
-fpm_build rdflib-jsonld "" "" python 0.3.0
+fpm_build rdflib-jsonld "" "" python 0.4.0
# The PAM module
if [[ $TARGET =~ debian|ubuntu ]]; then
import arvados
import arvados.config
+from arvados.keep import KeepClient
from arvados.errors import ApiError
from .arvcontainer import ArvadosContainer, RunnerContainer
from. runner import Runner, upload_instance
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
-from .fsaccess import CollectionFsAccess
+from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
from .perf import Perf
from .pathmapper import FinalOutputPathMapper
from ._version import __version__
"""
- def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None):
+ def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
self.api = api_client
self.processes = {}
self.lock = threading.Lock()
self.final_output = None
self.final_status = None
self.uploaded = {}
- self.num_retries = 4
+ self.num_retries = num_retries
self.uuid = None
self.stop_polling = threading.Event()
self.poll_api = None
def arv_make_tool(self, toolpath_object, **kwargs):
kwargs["work_api"] = self.work_api
+ kwargs["fetcher_constructor"] = partial(CollectionFetcher,
+ api_client=self.api,
+ keep_client=self.keep_client)
if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
return ArvadosCommandTool(self, toolpath_object, **kwargs)
elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
try:
if api_client is None:
api_client=arvados.api('v1', model=OrderedJsonModel())
- runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client, output_name=arvargs.output_name, output_tags=arvargs.output_tags)
+ if keep_client is None:
+ keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
+ runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
+ num_retries=4, output_name=arvargs.output_name,
+ output_tags=arvargs.output_tags)
except Exception as e:
logger.error(e)
return 1
makeTool=runner.arv_make_tool,
versionfunc=versionstring,
job_order_object=job_order_object,
- make_fs_access=partial(CollectionFsAccess, api_client=api_client))
+ make_fs_access=partial(CollectionFsAccess,
+ api_client=api_client,
+ keep_client=keep_client),
+ fetcher_constructor=partial(CollectionFetcher,
+ api_client=api_client,
+ keep_client=keep_client),
+ resolver=partial(collectionResolver, api_client))
import json
import os
+import ruamel.yaml as yaml
+
from cwltool.errors import WorkflowException
from cwltool.process import get_feature, UnsupportedRequirement, shortname
from cwltool.pathmapper import adjustFiles
from .arvdocker import arv_docker_get_image
from . import done
from .runner import Runner, arvados_jobs_image
+from .fsaccess import CollectionFetcher
logger = logging.getLogger('arvados.cwl-runner')
json.dump(self.job_order, f, sort_keys=True, indent=4)
jobobj.save_new(owner_uuid=self.arvrunner.project_uuid)
- workflowname = os.path.basename(self.tool.tool["id"])
- workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
- workflowcollection = workflowmapper.mapper(self.tool.tool["id"])[1]
- workflowcollection = workflowcollection[5:workflowcollection.index('/')]
jobpath = "/var/lib/cwl/job/cwl.input.json"
- command = ["arvados-cwl-runner", "--local", "--api=containers"]
- if self.output_name:
- command.append("--output-name=" + self.output_name)
-
- if self.output_tags:
- command.append("--output-tags=" + self.output_tags)
-
- if self.enable_reuse:
- command.append("--enable-reuse")
- else:
- command.append("--disable-reuse")
-
- command.extend([workflowpath, jobpath])
-
- return {
- "command": command,
+ container_req = {
"owner_uuid": self.arvrunner.project_uuid,
"name": self.name,
"output_path": "/var/spool/cwl",
"state": "Committed",
"container_image": arvados_jobs_image(self.arvrunner),
"mounts": {
- "/var/lib/cwl/workflow": {
- "kind": "collection",
- "portable_data_hash": "%s" % workflowcollection
- },
jobpath: {
"kind": "collection",
"portable_data_hash": "%s/cwl.input.json" % jobobj.portable_data_hash()
}
}
+ workflowcollection = workflowmapper.mapper(self.tool.tool["id"])[1]
+ if workflowcollection.startswith("keep:"):
+ workflowcollection = workflowcollection[5:workflowcollection.index('/')]
+ workflowname = os.path.basename(self.tool.tool["id"])
+ workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
+ container_req["mounts"]["/var/lib/cwl/workflow"] = {
+ "kind": "collection",
+ "portable_data_hash": "%s" % workflowcollection
+ }
+ elif workflowcollection.startswith("arvwf:"):
+ workflowpath = "/var/lib/cwl/workflow.json#main"
+ fetcher = CollectionFetcher({}, None,
+ api_client=self.arvrunner.api,
+ keep_client=self.arvrunner.keep_client)
+ wfobj = yaml.safe_load(fetcher.fetch_text(workflowcollection))
+ container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
+ "kind": "json",
+ "json": wfobj
+ }
+
+ command = ["arvados-cwl-runner", "--local", "--api=containers"]
+ if self.output_name:
+ command.append("--output-name=" + self.output_name)
+
+ if self.output_tags:
+ command.append("--output-tags=" + self.output_tags)
+
+ if self.enable_reuse:
+ command.append("--enable-reuse")
+ else:
+ command.append("--disable-reuse")
+
+ command.extend([workflowpath, jobpath])
+
+ container_req["command"] = command
+
+ return container_req
+
+
def run(self, *args, **kwargs):
kwargs["keepprefix"] = "keep:"
job_spec = self.arvados_job_spec(*args, **kwargs)
import fnmatch
import os
import errno
+import urlparse
+import re
+
+import ruamel.yaml as yaml
import cwltool.stdfsaccess
from cwltool.pathmapper import abspath
+import cwltool.resolver
import arvados.util
import arvados.collection
import arvados.arvfile
+from schema_salad.ref_resolver import DefaultFetcher
+
class CollectionFsAccess(cwltool.stdfsaccess.StdFsAccess):
"""Implement the cwltool FsAccess interface for Arvados Collections."""
return path
else:
return os.path.realpath(path)
+
+class CollectionFetcher(DefaultFetcher):
+    # Extends schema-salad's DefaultFetcher with support for the Arvados
+    # "keep:" and "arvwf:" URI schemes; any other scheme is delegated to
+    # the superclass.
+    def __init__(self, cache, session, api_client=None, keep_client=None):
+        super(CollectionFetcher, self).__init__(cache, session)
+        self.api_client = api_client
+        self.fsaccess = CollectionFsAccess("", api_client=api_client, keep_client=keep_client)
+
+    def fetch_text(self, url):
+        # keep: URLs are read through the collection filesystem layer;
+        # arvwf: URLs return the stored workflow's "definition" text.
+        if url.startswith("keep:"):
+            with self.fsaccess.open(url, "r") as f:
+                return f.read()
+        if url.startswith("arvwf:"):
+            return self.api_client.workflows().get(uuid=url[6:]).execute()["definition"]
+        return super(CollectionFetcher, self).fetch_text(url)
+
+    def check_exists(self, url):
+        if url.startswith("keep:"):
+            return self.fsaccess.exists(url)
+        if url.startswith("arvwf:"):
+            # NOTE(review): when the fetched definition is empty/falsy this
+            # falls through to the superclass, which does not understand
+            # arvwf: -- confirm that is intended.
+            if self.fetch_text(url):
+                return True
+        return super(CollectionFetcher, self).check_exists(url)
+
+    def urljoin(self, base_url, url):
+        # Relative-reference resolution specialized for keep:/arvwf:
+        # locators, whose first path component is a portable data hash
+        # (keep:) or workflow UUID (arvwf:) rather than a host.
+        if not url:
+            return base_url
+
+        urlsp = urlparse.urlsplit(url)
+        if urlsp.scheme or not base_url:
+            return url
+
+        basesp = urlparse.urlsplit(base_url)
+        if basesp.scheme in ("keep", "arvwf"):
+            if not basesp.path:
+                raise IOError(errno.EINVAL, "Invalid Keep locator", base_url)
+
+            baseparts = basesp.path.split("/")
+            urlparts = urlsp.path.split("/") if urlsp.path else []
+
+            # First component is the locator/UUID and is always preserved.
+            pdh = baseparts.pop(0)
+
+            if basesp.scheme == "keep" and not arvados.util.keep_locator_pattern.match(pdh):
+                raise IOError(errno.EINVAL, "Invalid Keep locator", base_url)
+
+            if urlsp.path.startswith("/"):
+                # Absolute path: discard the base path, keep only the locator.
+                baseparts = []
+                urlparts.pop(0)
+
+            if baseparts and urlsp.path:
+                # Relative path: drop the base's final (file) component.
+                baseparts.pop()
+
+            path = "/".join([pdh] + baseparts + urlparts)
+            return urlparse.urlunsplit((basesp.scheme, "", path, "", urlsp.fragment))
+
+        return super(CollectionFetcher, self).urljoin(base_url, url)
+
+workflow_uuid_pattern = re.compile(r'[a-z0-9]{5}-7fd4e-[a-z0-9]{15}')
+
+def collectionResolver(api_client, document_loader, uri):
+    """Resolve an Arvados identifier to a fetchable URI.
+
+    Workflow UUIDs (xxxxx-7fd4e-xxxxxxxxxxxxxxx) become arvwf: references,
+    Keep locators and collection UUIDs become keep: references, and
+    anything else falls through to cwltool's default tool resolver.
+    """
+    if workflow_uuid_pattern.match(uri):
+        return "arvwf:%s#main" % (uri)
+
+    p = uri.split("/")
+    if arvados.util.keep_locator_pattern.match(p[0]):
+        return "keep:%s" % (uri)
+
+    if arvados.util.collection_uuid_pattern.match(p[0]):
+        # Fixed: was 'self.api_client...' which raises NameError ('self'
+        # is undefined in a module-level function), and the lookup passed
+        # the full 'uri' as uuid= even though 'uri' may carry a trailing
+        # path -- look up by the bare UUID (p[0]) and re-append the path.
+        return "keep:%s%s" % (api_client.collections().
+                              get(uuid=p[0]).execute()["portable_data_hash"],
+                              uri[len(p[0]):])
+
+    return cwltool.resolver.tool_resolver(document_loader, uri)
pass
else:
raise WorkflowException("File literal '%s' is missing contents" % src)
+ elif src.startswith("arvwf:"):
+ self._pathmap[src] = MapperEnt(src, src, "File")
else:
raise WorkflowException("Input file path '%s' is invalid" % st)
if "secondaryFiles" in srcobj:
loaded = set()
def loadref(b, u):
- joined = urlparse.urljoin(b, u)
+ joined = document_loader.fetcher.urljoin(b, u)
defrg, _ = urlparse.urldefrag(joined)
if defrg not in loaded:
loaded.add(defrg)
sc = scandeps(uri, scanobj,
loadref_fields,
set(("$include", "$schemas", "location")),
- loadref)
+ loadref, urljoin=document_loader.fetcher.urljoin)
normalizeFilesDirs(sc)
'bin/cwl-runner',
'bin/arvados-cwl-runner'
],
- # Make sure to update arvados/build/run-build-packages.sh as well
- # when updating the cwltool version pin.
+ # Note that arvados/build/run-build-packages.sh looks at this
+ # file to determine what version of cwltool and schema-salad to build.
install_requires=[
- 'cwltool==1.0.20161128202906',
+ 'cwltool==1.0.20161206212859',
+ 'schema-salad==1.21.20161206204028',
'arvados-python-client>=0.1.20160826210445'
],
data_files=[
arvjob.builder = mock.MagicMock()
arvjob.output_callback = mock.MagicMock()
arvjob.collect_outputs = mock.MagicMock()
+ arvjob.collect_outputs.return_value = {"out": "stuff"}
arvjob.done({
"state": "Complete",
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'name': 'Output 9999999 of testjob'})
+ arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
+
@mock.patch("arvados.collection.CollectionReader")
def test_done_use_existing_collection(self, reader):
api = mock.MagicMock()
arvjob.builder = mock.MagicMock()
arvjob.output_callback = mock.MagicMock()
arvjob.collect_outputs = mock.MagicMock()
+ arvjob.collect_outputs.return_value = {"out": "stuff"}
arvjob.done({
"state": "Complete",
self.assertFalse(api.collections().create.called)
+ arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
+
class TestWorkflow(unittest.TestCase):
# The test passes no builder.resources
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_container_request_uuid + '\n')
+    @mock.patch("arvados.collection.CollectionReader")
+    @mock.patch("time.sleep")
+    @stubs
+    def test_submit_file_keepref(self, stubs, tm, collectionReader):
+        # Smoke test: a workflow whose input is a keep: file reference
+        # submits cleanly in --api=containers mode (exit status only;
+        # the created container request body is not inspected here).
+        capture_stdout = cStringIO.StringIO()
+        exited = arvados_cwl.main(
+            ["--submit", "--no-wait", "--api=containers", "--debug",
+             "tests/wf/submit_keepref_wf.cwl"],
+            capture_stdout, sys.stderr, api_client=stubs.api)
+        self.assertEqual(exited, 0)
+
+
+    @mock.patch("arvados.collection.CollectionReader")
+    @mock.patch("time.sleep")
+    @stubs
+    def test_submit_keepref(self, stubs, tm, reader):
+        # Submitting a keep: workflow reference should mount the workflow
+        # collection at /var/lib/cwl/workflow and run it by path.
+        capture_stdout = cStringIO.StringIO()
+
+        with open("tests/wf/expect_arvworkflow.cwl") as f:
+            reader().open().__enter__().read.return_value = f.read()
+
+        exited = arvados_cwl.main(
+            ["--submit", "--no-wait", "--api=containers", "--debug",
+             "keep:99999999999999999999999999999994+99/expect_arvworkflow.cwl#main", "-x", "XxX"],
+            capture_stdout, sys.stderr, api_client=stubs.api)
+        self.assertEqual(exited, 0)
+
+        expect_container = {
+            'priority': 1,
+            'mounts': {
+                '/var/spool/cwl': {
+                    'writable': True,
+                    'kind': 'collection'
+                },
+                'stdout': {
+                    'path': '/var/spool/cwl/cwl.output.json',
+                    'kind': 'file'
+                },
+                '/var/lib/cwl/workflow': {
+                    'portable_data_hash': '99999999999999999999999999999994+99',
+                    'kind': 'collection'
+                },
+                '/var/lib/cwl/job/cwl.input.json': {
+                    'portable_data_hash': 'e5454f8ca7d5b181e21ecd45841e3373+58/cwl.input.json',
+                    'kind': 'collection'}
+            }, 'state': 'Committed',
+            'owner_uuid': None,
+            'output_path': '/var/spool/cwl',
+            'name': 'expect_arvworkflow.cwl#main',
+            'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
+            'command': ['arvados-cwl-runner', '--local', '--api=containers', '--enable-reuse', '/var/lib/cwl/workflow/expect_arvworkflow.cwl#main', '/var/lib/cwl/job/cwl.input.json'],
+            'cwd': '/var/spool/cwl',
+            'runtime_constraints': {
+                'API': True,
+                'vcpus': 1,
+                'ram': 1073741824
+            }
+        }
+
+        stubs.api.container_requests().create.assert_called_with(
+            body=expect_container)
+        self.assertEqual(capture_stdout.getvalue(),
+                         stubs.expect_container_request_uuid + '\n')
+
+
+    @mock.patch("time.sleep")
+    @stubs
+    def test_submit_arvworkflow(self, stubs, tm):
+        # Submitting a stored-workflow UUID should inline the workflow
+        # definition as a "json" mount at /var/lib/cwl/workflow.json
+        # rather than mounting a collection.
+        capture_stdout = cStringIO.StringIO()
+
+        with open("tests/wf/expect_arvworkflow.cwl") as f:
+            stubs.api.workflows().get().execute.return_value = {"definition": f.read()}
+
+        exited = arvados_cwl.main(
+            ["--submit", "--no-wait", "--api=containers", "--debug",
+             "962eh-7fd4e-gkbzl62qqtfig37", "-x", "XxX"],
+            capture_stdout, sys.stderr, api_client=stubs.api)
+        self.assertEqual(exited, 0)
+
+        expect_container = {
+            'priority': 1,
+            'mounts': {
+                '/var/spool/cwl': {
+                    'writable': True,
+                    'kind': 'collection'
+                },
+                'stdout': {
+                    'path': '/var/spool/cwl/cwl.output.json',
+                    'kind': 'file'
+                },
+                '/var/lib/cwl/workflow.json': {
+                    'kind': 'json',
+                    'json': {
+                        'cwlVersion': 'v1.0',
+                        '$graph': [
+                            {
+                                'inputs': [
+                                    {
+                                        'inputBinding': {'position': 1},
+                                        'type': 'string',
+                                        'id': '#submit_tool.cwl/x'}
+                                ],
+                                'requirements': [
+                                    {'dockerPull': 'debian:8', 'class': 'DockerRequirement'}
+                                ],
+                                'id': '#submit_tool.cwl',
+                                'outputs': [],
+                                'baseCommand': 'cat',
+                                'class': 'CommandLineTool'
+                            }, {
+                                'id': '#main',
+                                'inputs': [
+                                    {'type': 'string', 'id': '#main/x'}
+                                ],
+                                'steps': [
+                                    {'in': [{'source': '#main/x', 'id': '#main/step1/x'}],
+                                     'run': '#submit_tool.cwl',
+                                     'id': '#main/step1',
+                                     'out': []}
+                                ],
+                                'class': 'Workflow',
+                                'outputs': []
+                            }
+                        ]
+                    }
+                },
+                '/var/lib/cwl/job/cwl.input.json': {
+                    'portable_data_hash': 'e5454f8ca7d5b181e21ecd45841e3373+58/cwl.input.json',
+                    'kind': 'collection'}
+            }, 'state': 'Committed',
+            'owner_uuid': None,
+            'output_path': '/var/spool/cwl',
+            'name': 'arvwf:962eh-7fd4e-gkbzl62qqtfig37#main',
+            'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
+            'command': ['arvados-cwl-runner', '--local', '--api=containers', '--enable-reuse', '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/job/cwl.input.json'],
+            'cwd': '/var/spool/cwl',
+            'runtime_constraints': {
+                'API': True,
+                'vcpus': 1,
+                'ram': 1073741824
+            }
+        }
+
+        stubs.api.container_requests().create.assert_called_with(
+            body=expect_container)
+        self.assertEqual(capture_stdout.getvalue(),
+                         stubs.expect_container_request_uuid + '\n')
+
+
@mock.patch("arvados.commands.keepdocker.find_one_image_hash")
@mock.patch("cwltool.docker.get_image")
@mock.patch("arvados.api")
--- /dev/null
+import functools
+import mock
+import sys
+import unittest
+import json
+import logging
+import os
+
+import arvados
+import arvados.keep
+import arvados.collection
+import arvados_cwl
+
+from arvados_cwl.fsaccess import CollectionFetcher
+
+class TestUrljoin(unittest.TestCase):
+    # Unit tests for CollectionFetcher.urljoin covering relative files,
+    # fragments, absolute paths, and absolute keep: references.
+    def test_urljoin(self):
+        """Test path joining for keep references."""
+
+        # urljoin never touches the API or Keep clients, so None is fine.
+        cf = CollectionFetcher({}, None)
+
+        self.assertEquals("keep:99999999999999999999999999999991+99/hw.py",
+                          cf.urljoin("keep:99999999999999999999999999999991+99", "hw.py"))
+
+        self.assertEquals("keep:99999999999999999999999999999991+99/hw.py",
+                          cf.urljoin("keep:99999999999999999999999999999991+99/", "hw.py"))
+
+        self.assertEquals("keep:99999999999999999999999999999991+99/hw.py#main",
+                          cf.urljoin("keep:99999999999999999999999999999991+99", "hw.py#main"))
+
+        self.assertEquals("keep:99999999999999999999999999999991+99/hw.py#main",
+                          cf.urljoin("keep:99999999999999999999999999999991+99/hw.py", "#main"))
+
+        self.assertEquals("keep:99999999999999999999999999999991+99/dir/hw.py#main",
+                          cf.urljoin("keep:99999999999999999999999999999991+99/dir/hw.py", "#main"))
+
+        self.assertEquals("keep:99999999999999999999999999999991+99/dir/wh.py",
+                          cf.urljoin("keep:99999999999999999999999999999991+99/dir/hw.py", "wh.py"))
+
+        self.assertEquals("keep:99999999999999999999999999999991+99/wh.py",
+                          cf.urljoin("keep:99999999999999999999999999999991+99/dir/hw.py", "/wh.py"))
+
+        self.assertEquals("keep:99999999999999999999999999999991+99/wh.py#main",
+                          cf.urljoin("keep:99999999999999999999999999999991+99/dir/hw.py", "/wh.py#main"))
+
+        self.assertEquals("keep:99999999999999999999999999999991+99/wh.py",
+                          cf.urljoin("keep:99999999999999999999999999999991+99/hw.py#main", "wh.py"))
+
+        self.assertEquals("keep:99999999999999999999999999999992+99",
+                          cf.urljoin("keep:99999999999999999999999999999991+99", "keep:99999999999999999999999999999992+99"))
+
+        self.assertEquals("keep:99999999999999999999999999999991+99/dir/wh.py",
+                          cf.urljoin("keep:99999999999999999999999999999991+99/dir/", "wh.py"))
+
+    def test_resolver(self):
+        # TODO(review): placeholder -- collectionResolver has no coverage yet.
+        pass
--- /dev/null
+$graph:
+- baseCommand: cat
+ class: CommandLineTool
+ id: '#submit_tool.cwl'
+ inputs:
+ - id: '#submit_tool.cwl/x'
+ inputBinding: {position: 1}
+ type: string
+ outputs: []
+ requirements:
+ - {class: DockerRequirement, dockerPull: 'debian:8'}
+- class: Workflow
+ id: '#main'
+ inputs:
+ - id: '#main/x'
+ type: string
+ outputs: []
+ steps:
+ - id: '#main/step1'
+ in:
+ - {id: '#main/step1/x', source: '#main/x'}
+ out: []
+ run: '#submit_tool.cwl'
+cwlVersion: v1.0
type: File
outputs: []
requirements:
- - {class: DockerRequirement, dockerImageId: 'debian:8', dockerPull: 'debian:8'}
+ - {class: DockerRequirement, dockerPull: 'debian:8'}
- class: Workflow
id: '#main'
inputs:
--- /dev/null
+# Test case for arvados-cwl-runner
+#
+# Used to test whether scanning a workflow file for dependencies
+# (e.g. submit_tool.cwl) and uploading to Keep works as intended.
+
+class: Workflow
+cwlVersion: v1.0
+inputs:
+ x:
+ type: File
+ default:
+ class: File
+ location: keep:99999999999999999999999999999994+99/blorp.txt
+outputs: []
+steps:
+ step1:
+ in:
+ x: x
+ out: []
+ run: ../tool/submit_tool.cwl