Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima@veritasgenetics.com>
-[comment]: # (Copyright © The Arvados Authors. All rights reserved.)
+[comment]: # (Copyright (c) The Arvados Authors. All rights reserved.)
[comment]: # ()
[comment]: # (SPDX-License-Identifier: CC-BY-SA-3.0)
from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
-from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
+from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
if self.intermediate_output_ttl < 0:
raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
+ if kwargs.get("submit_request_uuid") and self.work_api != "containers":
+ raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
+
if not kwargs.get("name"):
kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
default=None)
+ parser.add_argument("--submit-request-uuid", type=str,
+ default=None,
+ help="Update and commit supplied container request instead of creating a new one (containers API only).")
+
parser.add_argument("--name", type=str,
help="Name to use for workflow execution instance.",
default=None)
container_request = {
"command": self.command_line,
- "owner_uuid": self.arvrunner.project_uuid,
"name": self.name,
"output_path": self.outdir,
"cwd": self.outdir,
}
runtime_constraints = {}
+ if self.arvrunner.project_uuid:
+ container_request["owner_uuid"] = self.arvrunner.project_uuid
+
if self.arvrunner.secret_store.has_secret(self.command_line):
raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")
self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
try:
- response = self.arvrunner.api.container_requests().create(
- body=container_request
- ).execute(num_retries=self.arvrunner.num_retries)
+ if kwargs.get("submit_request_uuid"):
+ response = self.arvrunner.api.container_requests().update(
+ uuid=kwargs["submit_request_uuid"],
+ body=container_request
+ ).execute(num_retries=self.arvrunner.num_retries)
+ else:
+ response = self.arvrunner.api.container_requests().create(
+ body=container_request
+ ).execute(num_retries=self.arvrunner.num_retries)
self.uuid = response["uuid"]
self.arvrunner.process_submitted(self)
self.job_order[param] = {"$include": mnt}
container_req = {
- "owner_uuid": self.arvrunner.project_uuid,
"name": self.name,
"output_path": "/var/spool/cwl",
"cwd": "/var/spool/cwl",
def run(self, **kwargs):
kwargs["keepprefix"] = "keep:"
job_spec = self.arvados_job_spec(**kwargs)
- job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
+ if self.arvrunner.project_uuid:
+ job_spec["owner_uuid"] = self.arvrunner.project_uuid
- response = self.arvrunner.api.container_requests().create(
- body=job_spec
- ).execute(num_retries=self.arvrunner.num_retries)
+ if kwargs.get("submit_request_uuid"):
+ response = self.arvrunner.api.container_requests().update(
+ uuid=kwargs["submit_request_uuid"],
+ body=job_spec
+ ).execute(num_retries=self.arvrunner.num_retries)
+ else:
+ response = self.arvrunner.api.container_requests().create(
+ body=job_spec
+ ).execute(num_retries=self.arvrunner.num_retries)
self.uuid = response["uuid"]
self.arvrunner.process_submitted(self)
})
kwargs["loader"] = self.doc_loader
kwargs["avsc_names"] = self.doc_schema
+ kwargs["metadata"] = self.metadata
return ArvadosCommandTool(self.arvrunner, wf_runner, **kwargs).job(joborder_resolved, output_callback, **kwargs)
else:
return super(ArvadosWorkflow, self).job(joborder, output_callback, **kwargs)
args.priority = arvados_cwl.DEFAULT_PRIORITY
args.do_validate = True
args.disable_js_validation = False
+ args.tmp_outdir_prefix = "tmp"
runner.arv_executor(t, job_order_object, **vars(args))
except Exception as e:
--- /dev/null
import calendar
import datetime
import email.utils
import logging
import re
import time
import urlparse

import requests

import arvados
import arvados.collection
+
+logger = logging.getLogger('arvados.cwl-runner')
+
def my_formatdate(dt):
    """Format a naive UTC datetime as an RFC 2822 (HTTP) date string.

    Uses calendar.timegm so `dt` is interpreted as UTC.  The previous
    time.mktime call interpreted the tuple in the process's local
    timezone while still formatting with usegmt=True, producing a
    shifted timestamp anywhere outside TZ=UTC.
    """
    return email.utils.formatdate(timeval=calendar.timegm(dt.timetuple()),
                                  localtime=False, usegmt=True)
+
def my_parsedate(text):
    """Parse an RFC 2822/HTTP date string into a naive UTC datetime.

    Falls back to the epoch (1970-01-01) when `text` is missing or
    unparseable, which makes any freshness comparison treat the cached
    entry as stale.
    """
    parsed = email.utils.parsedate_tz(text)
    if parsed is None:
        return datetime.datetime(1970, 1, 1)
    stamp = datetime.datetime(*parsed[:6])
    if parsed[9]:
        # Normalize an explicit numeric UTC offset.  The previous
        # email.utils.parsedate call dropped it entirely, mis-reading
        # e.g. "-0500" dates by five hours.  HTTP dates are normally
        # GMT (offset 0), which takes the fast path above.
        stamp -= datetime.timedelta(seconds=parsed[9])
    return stamp
+
def fresh_cache(url, properties, now):
    """Return True if the cached download recorded for `url` is still fresh at `now`.

    `properties` maps the URL to the response headers that
    remember_headers() recorded when the content was downloaded.
    """
    pr = properties[url]
    expires = None

    logging.getLogger('arvados.cwl-runner').debug("Checking cache freshness for %s using %s", url, pr)

    if "Cache-Control" in pr:
        # Use re.search, not re.match: Cache-Control is a comma-separated
        # directive list, so "immutable" and "max-age" usually do not
        # appear at the start of the header (e.g. "public, max-age=3600").
        if re.search(r"immutable", pr["Cache-Control"]):
            return True

        g = re.search(r"(s-maxage|max-age)=(\d+)", pr["Cache-Control"])
        if g:
            expires = my_parsedate(pr["Date"]) + datetime.timedelta(seconds=int(g.group(2)))

    if expires is None and "Expires" in pr:
        expires = my_parsedate(pr["Expires"])

    if expires is None:
        # Use a default cache time of 24 hours if upstream didn't set
        # any cache headers, to reduce redundant downloads.
        expires = my_parsedate(pr["Date"]) + datetime.timedelta(hours=24)

    return now < expires
+
def remember_headers(url, properties, headers, now):
    # Record the cache-relevant response headers for `url` into the
    # `properties` dict (keyed by URL) for later freshness checks.
    properties.setdefault(url, {})
    for h in ("Cache-Control", "ETag", "Expires", "Date", "Content-Length"):
        if h in headers:
            properties[url][h] = headers[h]
    # If the server omitted a Date header, synthesize one from `now` so
    # fresh_cache() always has a reference point to compute expiry from.
    if "Date" not in headers:
        properties[url]["Date"] = my_formatdate(now)
+
+
def changed(url, properties, now):
    # Check whether the content at `url` differs from the cached copy
    # by issuing a HEAD request and comparing ETags.  As a side effect,
    # refreshes the remembered headers for `url` in `properties`.
    #
    # Returns False when the server reports the same ETag we recorded
    # earlier (content unchanged); True otherwise.  Raises Exception on
    # a non-200 HEAD response.
    req = requests.head(url, allow_redirects=True)
    remember_headers(url, properties, req.headers, now)

    if req.status_code != 200:
        raise Exception("Got status %s" % req.status_code)

    pr = properties[url]
    if "ETag" in pr and "ETag" in req.headers:
        if pr["ETag"] == req.headers["ETag"]:
            return False

    return True
+
def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow):
    """Download the file at `url` into a Keep collection, with HTTP caching.

    Looks for existing collections whose properties record a prior
    download of `url`.  A still-fresh cached copy (per fresh_cache), or
    one whose ETag the server confirms unchanged (per changed), is
    reused instead of re-downloading.

    api: Arvados API client.
    project_uuid: owner_uuid for a newly created collection (may be None).
    url: http:// or https:// address to fetch.
    utcnow: clock hook, overridable for tests.

    Returns a "keep:<pdh>/<filename>" reference to the stored file.
    Raises Exception when the HTTP GET does not return status 200.
    """
    r = api.collections().list(filters=[["properties", "exists", url]]).execute()

    now = utcnow()

    for item in r["items"]:
        properties = item["properties"]
        if fresh_cache(url, properties, now):
            # Cached copy is still fresh; reuse it without any network I/O.
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return "keep:%s/%s" % (item["portable_data_hash"], cr.keys()[0])

        if not changed(url, properties, now):
            # ETag didn't change, same content, just update headers
            api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return "keep:%s/%s" % (item["portable_data_hash"], cr.keys()[0])

    # No usable cached copy: download the content into a new collection.
    properties = {}
    req = requests.get(url, stream=True, allow_redirects=True)

    if req.status_code != 200:
        raise Exception("Failed to download '%s' got status %s " % (url, req.status_code))

    remember_headers(url, properties, req.headers, now)

    if "Content-Length" in properties[url]:
        cl = int(properties[url]["Content-Length"])
        logger.info("Downloading %s (%s bytes)", url, cl)
    else:
        cl = None
        logger.info("Downloading %s (unknown size)", url)

    c = arvados.collection.Collection()

    # Prefer the server-suggested filename from Content-Disposition.
    # Guard against a header that doesn't match the filename pattern:
    # the previous code called grp.group(2) unconditionally and raised
    # AttributeError when re.search returned None.
    name = None
    if req.headers.get("Content-Disposition"):
        grp = re.search(r'filename=("((\"|[^"])+)"|([^][()<>@,;:\"/?={} ]+))', req.headers["Content-Disposition"])
        if grp is not None:
            # Group 2 is a quoted filename, group 4 a bare token.
            name = grp.group(2) or grp.group(4)
    if not name:
        # Fall back to the last path segment of the URL.
        name = urlparse.urlparse(url).path.split("/")[-1]

    count = 0
    start = time.time()
    checkpoint = start
    with c.open(name, "w") as f:
        for chunk in req.iter_content(chunk_size=1024):
            count += len(chunk)
            f.write(chunk)
            loopnow = time.time()
            # Emit a progress line at most once every 20 seconds.
            if (loopnow - checkpoint) > 20:
                bps = (float(count)/float(loopnow - start))
                if cl is not None:
                    logger.info("%2.1f%% complete, %3.2f MiB/s, %1.0f seconds left",
                                float(count * 100) / float(cl),
                                bps/(1024*1024),
                                (cl-count)/bps)
                else:
                    logger.info("%d downloaded, %3.2f MiB/s", count, bps/(1024*1024))
                checkpoint = loopnow

    c.save_new(name="Downloaded from %s" % url, owner_uuid=project_uuid, ensure_unique_name=True)

    # Attach the remembered headers so future runs can reuse this copy.
    api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()

    return "keep:%s/%s" % (c.portable_data_hash(), name)
from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
from cwltool.workflow import WorkflowException
+from .http import http_to_keep
+
logger = logging.getLogger('arvados.cwl-runner')
def trim_listing(obj):
raise WorkflowException("File literal '%s' is missing `contents`" % src)
if srcobj["class"] == "Directory" and "listing" not in srcobj:
raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
+ elif src.startswith("http:") or src.startswith("https:"):
+ keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
+ logger.info("%s is %s", src, keepref)
+ self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
else:
self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
else:
try:
save_version(setup_dir, module, git_latest_tag() + git_timestamp_tag())
- except subprocess.CalledProcessError:
+ except (subprocess.CalledProcessError, OSError):
pass
return read_version(setup_dir, module)
# Note that arvados/build/run-build-packages.sh looks at this
# file to determine what version of cwltool and schema-salad to build.
install_requires=[
- 'cwltool==1.0.20180522135731',
+ 'cwltool==1.0.20180524215209',
'schema-salad==2.7.20180501211602',
- 'typing==3.5.3.0',
+ 'typing >= 3.5.3',
'ruamel.yaml >=0.13.11, <0.15',
'arvados-python-client>=1.1.4.20180507184611',
'setuptools',
make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
- basedir="", make_fs_access=make_fs_access, loader=Loader({}))
+ basedir="", make_fs_access=make_fs_access, loader=Loader({}),
+ metadata={"cwlVersion": "v1.0"})
arvtool.formatgraph = None
for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_run_"+str(enable_reuse),
make_fs_access=make_fs_access, tmpdir="/tmp"):
collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers",
avsc_names=avsc_names, make_fs_access=make_fs_access,
- loader=Loader({}))
+ loader=Loader({}), metadata={"cwlVersion": "v1.0"})
arvtool.formatgraph = None
for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_resource_requirements",
make_fs_access=make_fs_access, tmpdir="/tmp"):
collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers",
avsc_names=avsc_names, make_fs_access=make_fs_access,
- loader=Loader({}))
+ loader=Loader({}), metadata={"cwlVersion": "v1.0"})
arvtool.formatgraph = None
for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_initial_work_dir",
make_fs_access=make_fs_access, tmpdir="/tmp"):
make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
- basedir="", make_fs_access=make_fs_access, loader=Loader({}))
+ basedir="", make_fs_access=make_fs_access, loader=Loader({}),
+ metadata={"cwlVersion": "v1.0"})
arvtool.formatgraph = None
for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_run_redirect",
make_fs_access=make_fs_access, tmpdir="/tmp"):
make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
- basedir="", make_fs_access=make_fs_access, loader=Loader({}))
+ basedir="", make_fs_access=make_fs_access, loader=Loader({}),
+ metadata={"cwlVersion": "v1.0"})
arvtool.formatgraph = None
job_order = {
"p1": {
make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
- basedir="", make_fs_access=make_fs_access, loader=Loader({}))
+ basedir="", make_fs_access=make_fs_access, loader=Loader({}),
+ metadata={"cwlVersion": "v1.0"})
arvtool.formatgraph = None
job_order = {"pw": "blorp"}
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import copy
+import cStringIO
+import functools
+import hashlib
+import json
+import logging
+import mock
+import sys
+import unittest
+import datetime
+
+import arvados
+import arvados.collection
+import arvados_cwl
+import arvados_cwl.runner
+import arvados.keep
+
+from .matcher import JsonDiffMatcher, StripYAMLComments
+from .mock_discovery import get_rootDesc
+
+import arvados_cwl.http
+
+import ruamel.yaml as yaml
+
+
class TestHttpToKeep(unittest.TestCase):
    """Unit tests for arvados_cwl.http.http_to_keep's HTTP caching behavior.

    All HTTP and Keep access is mocked; each test asserts which network
    and Keep calls http_to_keep makes for a particular combination of
    cached header properties and server responses.
    """

    @mock.patch("requests.get")
    @mock.patch("arvados.collection.Collection")
    def test_http_get(self, collectionmock, getmock):
        # No cached collection exists: expect a fresh GET, a new
        # collection, and the response headers saved as properties.
        api = mock.MagicMock()

        api.collections().list().execute.return_value = {
            "items": []
        }

        cm = mock.MagicMock()
        cm.manifest_locator.return_value = "zzzzz-4zz18-zzzzzzzzzzzzzz3"
        cm.portable_data_hash.return_value = "99999999999999999999999999999998+99"
        collectionmock.return_value = cm

        req = mock.MagicMock()
        req.status_code = 200
        req.headers = {}
        req.iter_content.return_value = ["abc"]
        getmock.return_value = req

        utcnow = mock.MagicMock()
        utcnow.return_value = datetime.datetime(2018, 5, 15)

        r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
        self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")

        getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True)

        cm.open.assert_called_with("file1.txt", "w")
        cm.save_new.assert_called_with(name="Downloaded from http://example.com/file1.txt",
                                       owner_uuid=None, ensure_unique_name=True)

        api.collections().update.assert_has_calls([
            mock.call(uuid=cm.manifest_locator(),
                      body={"collection":{"properties": {'http://example.com/file1.txt': {'Date': 'Tue, 15 May 2018 00:00:00 GMT'}}}})
        ])


    @mock.patch("requests.get")
    @mock.patch("arvados.collection.CollectionReader")
    def test_http_expires(self, collectionmock, getmock):
        # Cached copy's Expires header is still in the future: reuse it
        # with no HTTP traffic at all.
        api = mock.MagicMock()

        api.collections().list().execute.return_value = {
            "items": [{
                "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz3",
                "portable_data_hash": "99999999999999999999999999999998+99",
                "properties": {
                    'http://example.com/file1.txt': {
                        'Date': 'Tue, 15 May 2018 00:00:00 GMT',
                        'Expires': 'Tue, 17 May 2018 00:00:00 GMT'
                    }
                }
            }]
        }

        cm = mock.MagicMock()
        cm.manifest_locator.return_value = "zzzzz-4zz18-zzzzzzzzzzzzzz3"
        cm.portable_data_hash.return_value = "99999999999999999999999999999998+99"
        cm.keys.return_value = ["file1.txt"]
        collectionmock.return_value = cm

        req = mock.MagicMock()
        req.status_code = 200
        req.headers = {}
        req.iter_content.return_value = ["abc"]
        getmock.return_value = req

        utcnow = mock.MagicMock()
        utcnow.return_value = datetime.datetime(2018, 5, 16)

        r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
        self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")

        getmock.assert_not_called()


    @mock.patch("requests.get")
    @mock.patch("arvados.collection.CollectionReader")
    def test_http_cache_control(self, collectionmock, getmock):
        # Cached copy is within its Cache-Control max-age window: reuse
        # it with no HTTP traffic.
        api = mock.MagicMock()

        api.collections().list().execute.return_value = {
            "items": [{
                "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz3",
                "portable_data_hash": "99999999999999999999999999999998+99",
                "properties": {
                    'http://example.com/file1.txt': {
                        'Date': 'Tue, 15 May 2018 00:00:00 GMT',
                        'Cache-Control': 'max-age=172800'
                    }
                }
            }]
        }

        cm = mock.MagicMock()
        cm.manifest_locator.return_value = "zzzzz-4zz18-zzzzzzzzzzzzzz3"
        cm.portable_data_hash.return_value = "99999999999999999999999999999998+99"
        cm.keys.return_value = ["file1.txt"]
        collectionmock.return_value = cm

        req = mock.MagicMock()
        req.status_code = 200
        req.headers = {}
        req.iter_content.return_value = ["abc"]
        getmock.return_value = req

        utcnow = mock.MagicMock()
        utcnow.return_value = datetime.datetime(2018, 5, 16)

        r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
        self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")

        getmock.assert_not_called()


    @mock.patch("requests.get")
    @mock.patch("requests.head")
    @mock.patch("arvados.collection.Collection")
    def test_http_expired(self, collectionmock, headmock, getmock):
        # Cached copy is expired and there is no ETag to revalidate
        # against: expect a full re-download into a new collection.
        api = mock.MagicMock()

        api.collections().list().execute.return_value = {
            "items": [{
                "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz3",
                "portable_data_hash": "99999999999999999999999999999998+99",
                "properties": {
                    'http://example.com/file1.txt': {
                        'Date': 'Tue, 15 May 2018 00:00:00 GMT',
                        'Expires': 'Tue, 16 May 2018 00:00:00 GMT'
                    }
                }
            }]
        }

        cm = mock.MagicMock()
        cm.manifest_locator.return_value = "zzzzz-4zz18-zzzzzzzzzzzzzz4"
        cm.portable_data_hash.return_value = "99999999999999999999999999999997+99"
        cm.keys.return_value = ["file1.txt"]
        collectionmock.return_value = cm

        req = mock.MagicMock()
        req.status_code = 200
        req.headers = {'Date': 'Tue, 17 May 2018 00:00:00 GMT'}
        req.iter_content.return_value = ["def"]
        getmock.return_value = req
        headmock.return_value = req

        utcnow = mock.MagicMock()
        utcnow.return_value = datetime.datetime(2018, 5, 17)

        r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
        self.assertEqual(r, "keep:99999999999999999999999999999997+99/file1.txt")

        getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True)

        cm.open.assert_called_with("file1.txt", "w")
        cm.save_new.assert_called_with(name="Downloaded from http://example.com/file1.txt",
                                       owner_uuid=None, ensure_unique_name=True)

        api.collections().update.assert_has_calls([
            mock.call(uuid=cm.manifest_locator(),
                      body={"collection":{"properties": {'http://example.com/file1.txt': {'Date': 'Tue, 17 May 2018 00:00:00 GMT'}}}})
        ])


    @mock.patch("requests.get")
    @mock.patch("requests.head")
    @mock.patch("arvados.collection.CollectionReader")
    def test_http_etag(self, collectionmock, headmock, getmock):
        # Cached copy is expired but the HEAD response reports the same
        # ETag: reuse the existing collection and only refresh the
        # remembered headers.
        api = mock.MagicMock()

        api.collections().list().execute.return_value = {
            "items": [{
                "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz3",
                "portable_data_hash": "99999999999999999999999999999998+99",
                "properties": {
                    'http://example.com/file1.txt': {
                        'Date': 'Tue, 15 May 2018 00:00:00 GMT',
                        'Expires': 'Tue, 16 May 2018 00:00:00 GMT',
                        'ETag': '123456'
                    }
                }
            }]
        }

        cm = mock.MagicMock()
        cm.manifest_locator.return_value = "zzzzz-4zz18-zzzzzzzzzzzzzz3"
        cm.portable_data_hash.return_value = "99999999999999999999999999999998+99"
        cm.keys.return_value = ["file1.txt"]
        collectionmock.return_value = cm

        req = mock.MagicMock()
        req.status_code = 200
        req.headers = {
            'Date': 'Tue, 17 May 2018 00:00:00 GMT',
            'Expires': 'Tue, 19 May 2018 00:00:00 GMT',
            'ETag': '123456'
        }
        headmock.return_value = req

        utcnow = mock.MagicMock()
        utcnow.return_value = datetime.datetime(2018, 5, 17)

        r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
        self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")

        getmock.assert_not_called()
        cm.open.assert_not_called()

        api.collections().update.assert_has_calls([
            mock.call(uuid=cm.manifest_locator(),
                      body={"collection":{"properties": {'http://example.com/file1.txt': {
                          'Date': 'Tue, 17 May 2018 00:00:00 GMT',
                          'Expires': 'Tue, 19 May 2018 00:00:00 GMT',
                          'ETag': '123456'
                      }}}})
        ])

    @mock.patch("requests.get")
    @mock.patch("arvados.collection.Collection")
    def test_http_content_disp(self, collectionmock, getmock):
        # The filename should come from Content-Disposition, not from
        # the URL's query string.
        api = mock.MagicMock()

        api.collections().list().execute.return_value = {
            "items": []
        }

        cm = mock.MagicMock()
        cm.manifest_locator.return_value = "zzzzz-4zz18-zzzzzzzzzzzzzz3"
        cm.portable_data_hash.return_value = "99999999999999999999999999999998+99"
        collectionmock.return_value = cm

        req = mock.MagicMock()
        req.status_code = 200
        req.headers = {"Content-Disposition": "attachment; filename=file1.txt"}
        req.iter_content.return_value = ["abc"]
        getmock.return_value = req

        utcnow = mock.MagicMock()
        utcnow.return_value = datetime.datetime(2018, 5, 15)

        r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/download?fn=/file1.txt", utcnow=utcnow)
        self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")

        getmock.assert_called_with("http://example.com/download?fn=/file1.txt", stream=True, allow_redirects=True)

        cm.open.assert_called_with("file1.txt", "w")
        cm.save_new.assert_called_with(name="Downloaded from http://example.com/download?fn=/file1.txt",
                                       owner_uuid=None, ensure_unique_name=True)

        api.collections().update.assert_has_calls([
            mock.call(uuid=cm.manifest_locator(),
                      body={"collection":{"properties": {"http://example.com/download?fn=/file1.txt": {'Date': 'Tue, 15 May 2018 00:00:00 GMT'}}}})
        ])
make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
- basedir="", make_fs_access=make_fs_access, loader=Loader({}))
+ basedir="", make_fs_access=make_fs_access, loader=Loader({}),
+ metadata={"cwlVersion": "v1.0"})
arvtool.formatgraph = None
for j in arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access):
j.run(enable_reuse=enable_reuse)
make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
- make_fs_access=make_fs_access, loader=Loader({}))
+ make_fs_access=make_fs_access, loader=Loader({}),
+ metadata={"cwlVersion": "v1.0"})
arvtool.formatgraph = None
for j in arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access):
j.run(enable_reuse=True)
},
'secret_mounts': {},
'state': 'Committed',
- 'owner_uuid': None,
'command': ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate',
'--eval-timeout=20', '--thread-count=4',
'kind': 'json'
}
}, 'state': 'Committed',
- 'owner_uuid': None,
'output_path': '/var/spool/cwl',
'name': 'expect_arvworkflow.cwl#main',
'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
'kind': 'json'
}
}, 'state': 'Committed',
- 'owner_uuid': None,
'output_path': '/var/spool/cwl',
'name': 'a test workflow',
'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
},
"name": "secret_wf.cwl",
"output_path": "/var/spool/cwl",
- "owner_uuid": None,
"priority": 500,
"properties": {},
"runtime_constraints": {
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_container_request_uuid + '\n')
    @stubs
    def test_submit_request_uuid(self, stubs):
        # --submit-request-uuid should cause the runner to update the
        # supplied container request (via container_requests().update)
        # instead of creating a new one.
        stubs.expect_container_request_uuid = "zzzzz-xvhdp-yyyyyyyyyyyyyyy"

        stubs.api.container_requests().update().execute.return_value = {
            "uuid": stubs.expect_container_request_uuid,
            "container_uuid": "zzzzz-dz642-zzzzzzzzzzzzzzz",
            "state": "Queued"
        }

        capture_stdout = cStringIO.StringIO()
        try:
            exited = arvados_cwl.main(
                ["--submit", "--no-wait", "--api=containers", "--debug", "--submit-request-uuid=zzzzz-xvhdp-yyyyyyyyyyyyyyy",
                "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
                capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
            self.assertEqual(exited, 0)
        except:
            # NOTE(review): this bare except also swallows a failing
            # assertEqual(exited, 0) above, not just main() errors --
            # TODO confirm that is intended.
            logging.exception("")

        stubs.api.container_requests().update.assert_called_with(
            uuid="zzzzz-xvhdp-yyyyyyyyyyyyyyy", body=JsonDiffMatcher(stubs.expect_container_spec))
        self.assertEqual(capture_stdout.getvalue(),
                         stubs.expect_container_request_uuid + '\n')
+
class TestCreateTemplate(unittest.TestCase):
existing_template_uuid = "zzzzz-d1hrv-validworkfloyml"
blkid arvados.SizedDigest
blk *BlockState
}
- nWorkers := 1 + runtime.NumCPU()
- todo := make(chan balanceTask, nWorkers)
- results := make(chan balanceResult, 16)
- var wg sync.WaitGroup
- for i := 0; i < nWorkers; i++ {
- wg.Add(1)
- go func() {
- for work := range todo {
- results <- bal.balanceBlock(work.blkid, work.blk)
+ workers := runtime.GOMAXPROCS(-1)
+ todo := make(chan balanceTask, workers)
+ go func() {
+ bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
+ todo <- balanceTask{
+ blkid: blkid,
+ blk: blk,
}
- wg.Done()
- }()
- }
- bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
- todo <- balanceTask{
- blkid: blkid,
- blk: blk,
- }
- })
- close(todo)
+ })
+ close(todo)
+ }()
+ results := make(chan balanceResult, workers)
go func() {
+ var wg sync.WaitGroup
+ for i := 0; i < workers; i++ {
+ wg.Add(1)
+ go func() {
+ for work := range todo {
+ results <- bal.balanceBlock(work.blkid, work.blk)
+ }
+ wg.Done()
+ }()
+ }
wg.Wait()
close(results)
}()
case <-ctx.Done():
theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
// Our pipe might be stuck in Write(), waiting for
- // io.Copy() to read. If so, un-stick it. This means
+ // PutReader() to read. If so, un-stick it. This means
// PutReader will get corrupt data, but that's OK: the
// size and MD5 won't match, so the write will fail.
go io.Copy(ioutil.Discard, bufr)
theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
return ctx.Err()
case <-ready:
+ // Unblock pipe in case PutReader did not consume it.
+ io.Copy(ioutil.Discard, bufr)
return v.translateError(err)
}
}
subsecs = float(subsec_match.group(1))
timestr = timestr[:subsec_match.start()] + 'Z'
return calendar.timegm(time.strptime(timestr + 'UTC',
- ARVADOS_TIMEFMT + '%Z'))
+ ARVADOS_TIMEFMT + '%Z')) + subsecs
def timestamp_fresh(timestamp, fresh_time):
return (time.time() - timestamp) < fresh_time
time_mock.return_value += 200
self.assertEqual(961, timer.next_opening())
self.assertFalse(timer.window_open())
+
+
class ArvadosTimestamp(unittest.TestCase):
    """Check arvados_timestamp() parsing with and without fractional seconds."""
    def test_arvados_timestamp(self):
        self.assertEqual(1527710178, cnode.arvados_timestamp('2018-05-30T19:56:18Z'))
        self.assertEqual(1527710178.999371, cnode.arvados_timestamp('2018-05-30T19:56:18.999371Z'))
from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor
from . import testutil
from . import test_status
+from . import pykka_timeout
import logging
class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
unittest.TestCase):
    def assertwait(self, f, timeout=pykka_timeout*2):
        # Poll `f` (a zero-argument callable that raises AssertionError
        # while the expected condition has not been reached yet) every
        # 0.1s, returning its result as soon as it passes.  The last
        # AssertionError is re-raised once `timeout` seconds elapse.
        deadline = time.time() + timeout
        while True:
            try:
                return f()
            except AssertionError:
                if time.time() > deadline:
                    raise
                pass
            time.sleep(.1)
+ self.daemon.ping().get(self.TIMEOUT)
+
def busywait(self, f):
for n in xrange(200):
ok = f()
self.assertIn('node_quota', status.tracker._latest)
def check_monitors_arvados_nodes(self, *arv_nodes):
- self.busywait(lambda: len(arv_nodes) == len(self.monitored_arvados_nodes()))
- self.assertItemsEqual(arv_nodes, self.monitored_arvados_nodes())
+ self.assertwait(lambda: self.assertItemsEqual(arv_nodes, self.monitored_arvados_nodes()))
def test_node_pairing(self):
cloud_node = testutil.cloud_node_mock(1)
arvados_nodes=[testutil.arvados_node_mock(1),
testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
want_sizes=[size])
- self.busywait(lambda: 2 == self.paired_monitor_count())
+ self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count()))
for mon_ref in self.monitor_list():
self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
self.assertEqual(1, self.node_shutdown.start.call_count)
arvados_nodes=[testutil.arvados_node_mock(1),
testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
want_sizes=[size])
- self.busywait(lambda: 2 == self.paired_monitor_count())
+ self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count()))
get_cloud_node = mock.MagicMock(name="get_cloud_node")
get_cloud_node.get.return_value = cloud_nodes[1]
mock_node_monitor = mock.MagicMock()
self.daemon.cloud_nodes.get()[cloud_nodes[1].id].shutdown_actor = mock_shutdown.proxy()
- self.busywait(lambda: 2 == self.alive_monitor_count())
+ self.assertwait(lambda: self.assertEqual(2, self.alive_monitor_count()))
for mon_ref in self.monitor_list():
self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
self.busywait(lambda: 1 == self.node_shutdown.start.call_count)
arv_node = testutil.arvados_node_mock(2, job_uuid=True)
self.make_daemon([testutil.cloud_node_mock(2, size=size)], [arv_node],
[size], avail_sizes=[(size, {"cores":1})])
- self.busywait(lambda: 1 == self.paired_monitor_count())
- self.busywait(lambda: self.node_setup.start.called)
+ self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
+ self.assertwait(lambda: self.assertEqual(1, self.node_setup.start.called))
def test_boot_new_node_below_min_nodes(self):
min_size = testutil.MockSize(1)
arv_node = testutil.arvados_node_mock(1)
size = testutil.MockSize(1)
self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], want_sizes=[size])
- self.busywait(lambda: 1 == self.paired_monitor_count())
+ self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
monitor = self.monitor_list()[0].proxy()
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
self.stop_proxy(self.daemon)
cloud_node = testutil.cloud_node_mock(1)
arv_node = testutil.arvados_node_mock(1)
self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], min_nodes=1)
- self.busywait(lambda: 1 == self.paired_monitor_count())
+ self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
monitor = self.monitor_list()[0].proxy()
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
self.stop_proxy(self.daemon)
arv_nodes = [testutil.arvados_node_mock(3, job_uuid=True),
testutil.arvados_node_mock(4, job_uuid=None)]
self.make_daemon(cloud_nodes, arv_nodes, [size])
- self.busywait(lambda: 2 == self.paired_monitor_count())
+ self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count()))
for mon_ref in self.monitor_list():
monitor = mon_ref.proxy()
if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
self.last_shutdown.success.get.return_value = False
self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
- self.busywait(lambda: 1 == self.paired_monitor_count())
+ self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
self.last_shutdown.success.get.return_value = True
self.last_shutdown.stop.side_effect = lambda: monitor.stop()
self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
- self.busywait(lambda: 0 == self.paired_monitor_count())
+ self.assertwait(lambda: self.assertEqual(0, self.paired_monitor_count()))
def test_nodes_shutting_down_replaced_below_max_nodes(self):
size = testutil.MockSize(6)
cloud_node = testutil.cloud_node_mock(7)
self.make_daemon([cloud_node], [testutil.arvados_node_mock(7)],
max_nodes=1)
- self.busywait(lambda: 1 == self.paired_monitor_count())
+ self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
monitor = self.monitor_list()[0].proxy()
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
self.assertTrue(self.node_shutdown.start.called)
arv_nodes = [testutil.arvados_node_mock(n, size=size) for n in [8, 9]]
self.make_daemon(cloud_nodes, arv_nodes, [size],
avail_sizes=[(size, {"cores":1})])
- self.busywait(lambda: 2 == self.paired_monitor_count())
+ self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count()))
for mon_ref in self.monitor_list():
self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
self.assertEqual(1, self.node_shutdown.start.call_count)
cloud_nodes = [testutil.cloud_node_mock(1, size=size)]
arv_nodes = [testutil.arvados_node_mock(1, job_uuid=None)]
self.make_daemon(cloud_nodes, arv_nodes, [size])
- self.busywait(lambda: 1 == self.paired_monitor_count())
+ self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
for mon_ref in self.monitor_list():
monitor = mon_ref.proxy()
if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
testutil.arvados_node_mock(3)],
want_sizes=[small, small, big],
avail_sizes=avail_sizes)
- self.busywait(lambda: 3 == self.paired_monitor_count())
+ self.assertwait(lambda: self.assertEqual(3, self.paired_monitor_count()))
self.daemon.update_server_wishlist([small, big, big]).get(self.TIMEOUT)
self.assertEqual(0, self.node_shutdown.start.call_count)
def test_nonfatal_error(self):
status.tracker.update({'actor_exceptions': 0})
kill_mock = mock.Mock('os.kill')
- act = BogusActor.start(OSError(errno.ENOENT, ""), killfunc=kill_mock).tell_proxy()
+ bgact = BogusActor.start(OSError(errno.ENOENT, ""), killfunc=kill_mock)
+ act_thread = bgact.proxy().get_thread().get()
+ act = bgact.tell_proxy()
act.doStuff()
act.actor_ref.stop(block=True)
+ act_thread.join()
self.assertFalse(kill_mock.called)
self.assertEqual(1, status.tracker.get('actor_exceptions'))