From: Lucas Di Pentima Date: Thu, 31 May 2018 19:24:07 +0000 (-0300) Subject: Merge branch 'master' into 7478-anm-spot-instances X-Git-Tag: 1.2.0~118^2~10 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/5836e576fe0b78c50383cf56e1c4fb4521daeca1?hp=00cca6a192eb1ab38559bf5ed9044711ed56fc4a Merge branch 'master' into 7478-anm-spot-instances Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima --- diff --git a/sdk/R/README.Rmd b/sdk/R/README.Rmd index c2fe078593..dcfa2186e9 100644 --- a/sdk/R/README.Rmd +++ b/sdk/R/README.Rmd @@ -1,4 +1,4 @@ -[comment]: # (Copyright © The Arvados Authors. All rights reserved.) +[comment]: # (Copyright (c) The Arvados Authors. All rights reserved.) [comment]: # () [comment]: # (SPDX-License-Identifier: CC-BY-SA-3.0) diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py index d509f400f1..5c60f7d2a0 100644 --- a/sdk/cwl/arvados_cwl/__init__.py +++ b/sdk/cwl/arvados_cwl/__init__.py @@ -37,7 +37,7 @@ import arvados.commands._util as arv_cmd from .arvcontainer import ArvadosContainer, RunnerContainer from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate -from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies +from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps from .arvtool import ArvadosCommandTool from .arvworkflow import ArvadosWorkflow, upload_workflow from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache @@ -401,6 +401,9 @@ class ArvCwlRunner(object): if self.intermediate_output_ttl < 0: raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl) + if kwargs.get("submit_request_uuid") and self.work_api != "containers": + raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api)) + if not kwargs.get("name"): kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"]) @@ -706,6 +709,10 @@ def arg_parser(): # type: () -> argparse.ArgumentParser help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__, default=None) + parser.add_argument("--submit-request-uuid", type=str, + default=None, + help="Update and commit supplied container request instead of creating a new one (containers API only).") + parser.add_argument("--name", type=str, help="Name to use for workflow execution instance.", default=None) diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py index 4e7811d2e8..0bec692643 100644 --- a/sdk/cwl/arvados_cwl/arvcontainer.py +++ b/sdk/cwl/arvados_cwl/arvcontainer.py @@ -51,7 +51,6 @@ class ArvadosContainer(object): container_request = { "command": self.command_line, - "owner_uuid": self.arvrunner.project_uuid, "name": self.name, "output_path": self.outdir, "cwd": self.outdir, @@ -61,6 +60,9 @@ class ArvadosContainer(object): } runtime_constraints = {} + if self.arvrunner.project_uuid: + container_request["owner_uuid"] = self.arvrunner.project_uuid + if self.arvrunner.secret_store.has_secret(self.command_line): raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets") @@ -251,9 +253,15 @@ class ArvadosContainer(object): self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback) try: - response = self.arvrunner.api.container_requests().create( - body=container_request - 
).execute(num_retries=self.arvrunner.num_retries) + if kwargs.get("submit_request_uuid"): + response = self.arvrunner.api.container_requests().update( + uuid=kwargs["submit_request_uuid"], + body=container_request + ).execute(num_retries=self.arvrunner.num_retries) + else: + response = self.arvrunner.api.container_requests().create( + body=container_request + ).execute(num_retries=self.arvrunner.num_retries) self.uuid = response["uuid"] self.arvrunner.process_submitted(self) @@ -343,7 +351,6 @@ class RunnerContainer(Runner): self.job_order[param] = {"$include": mnt} container_req = { - "owner_uuid": self.arvrunner.project_uuid, "name": self.name, "output_path": "/var/spool/cwl", "cwd": "/var/spool/cwl", @@ -442,11 +449,18 @@ class RunnerContainer(Runner): def run(self, **kwargs): kwargs["keepprefix"] = "keep:" job_spec = self.arvados_job_spec(**kwargs) - job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid) + if self.arvrunner.project_uuid: + job_spec["owner_uuid"] = self.arvrunner.project_uuid - response = self.arvrunner.api.container_requests().create( - body=job_spec - ).execute(num_retries=self.arvrunner.num_retries) + if kwargs.get("submit_request_uuid"): + response = self.arvrunner.api.container_requests().update( + uuid=kwargs["submit_request_uuid"], + body=job_spec + ).execute(num_retries=self.arvrunner.num_retries) + else: + response = self.arvrunner.api.container_requests().create( + body=job_spec + ).execute(num_retries=self.arvrunner.num_retries) self.uuid = response["uuid"] self.arvrunner.process_submitted(self) diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py index 6f731fd687..f675fb10e8 100644 --- a/sdk/cwl/arvados_cwl/arvworkflow.py +++ b/sdk/cwl/arvados_cwl/arvworkflow.py @@ -277,6 +277,7 @@ class ArvadosWorkflow(Workflow): }) kwargs["loader"] = self.doc_loader kwargs["avsc_names"] = self.doc_schema + kwargs["metadata"] = self.metadata return ArvadosCommandTool(self.arvrunner, wf_runner, **kwargs).job(joborder_resolved, output_callback, **kwargs) else: return super(ArvadosWorkflow, self).job(joborder, output_callback, **kwargs) diff --git a/sdk/cwl/arvados_cwl/crunch_script.py b/sdk/cwl/arvados_cwl/crunch_script.py index bf940eca4b..5024e95f77 100644 --- a/sdk/cwl/arvados_cwl/crunch_script.py +++ b/sdk/cwl/arvados_cwl/crunch_script.py @@ -132,6 +132,7 @@ def run(): args.priority = arvados_cwl.DEFAULT_PRIORITY args.do_validate = True args.disable_js_validation = False + args.tmp_outdir_prefix = "tmp" runner.arv_executor(t, job_order_object, **vars(args)) except Exception as e: diff --git a/sdk/cwl/arvados_cwl/http.py b/sdk/cwl/arvados_cwl/http.py new file mode 100644 index 0000000000..32fc1cf90d --- /dev/null +++ b/sdk/cwl/arvados_cwl/http.py @@ -0,0 +1,141 @@ +import requests +import email.utils +import time +import datetime +import re +import arvados +import arvados.collection +import urlparse +import logging + +logger = logging.getLogger('arvados.cwl-runner') + +def my_formatdate(dt): + return email.utils.formatdate(timeval=time.mktime(dt.timetuple()), + localtime=False, usegmt=True) + +def my_parsedate(text): + parsed = email.utils.parsedate(text) + if parsed: + return datetime.datetime(*parsed[:6]) + else: + return datetime.datetime(1970, 1, 1) + +def fresh_cache(url, properties, now): + pr = properties[url] + expires = None + + logger.debug("Checking cache freshness for %s using %s", url, pr) + + if "Cache-Control" in pr: + if re.match(r"immutable", pr["Cache-Control"]): + return True + + g = 
re.match(r"(s-maxage|max-age)=(\d+)", pr["Cache-Control"]) + if g: + expires = my_parsedate(pr["Date"]) + datetime.timedelta(seconds=int(g.group(2))) + + if expires is None and "Expires" in pr: + expires = my_parsedate(pr["Expires"]) + + if expires is None: + # Use a default cache time of 24 hours if upstream didn't set + # any cache headers, to reduce redundant downloads. + expires = my_parsedate(pr["Date"]) + datetime.timedelta(hours=24) + + if not expires: + return False + + return (now < expires) + +def remember_headers(url, properties, headers, now): + properties.setdefault(url, {}) + for h in ("Cache-Control", "ETag", "Expires", "Date", "Content-Length"): + if h in headers: + properties[url][h] = headers[h] + if "Date" not in headers: + properties[url]["Date"] = my_formatdate(now) + + +def changed(url, properties, now): + req = requests.head(url, allow_redirects=True) + remember_headers(url, properties, req.headers, now) + + if req.status_code != 200: + raise Exception("Got status %s" % req.status_code) + + pr = properties[url] + if "ETag" in pr and "ETag" in req.headers: + if pr["ETag"] == req.headers["ETag"]: + return False + + return True + +def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow): + r = api.collections().list(filters=[["properties", "exists", url]]).execute() + + now = utcnow() + + for item in r["items"]: + properties = item["properties"] + if fresh_cache(url, properties, now): + # Do nothing + cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api) + return "keep:%s/%s" % (item["portable_data_hash"], cr.keys()[0]) + + if not changed(url, properties, now): + # ETag didn't change, same content, just update headers + api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute() + cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api) + return "keep:%s/%s" % (item["portable_data_hash"], cr.keys()[0]) + + properties = {} + req = requests.get(url, stream=True, allow_redirects=True) + + if req.status_code != 200: + raise Exception("Failed to download '%s' got status %s " % (url, req.status_code)) + + remember_headers(url, properties, req.headers, now) + + if "Content-Length" in properties[url]: + cl = int(properties[url]["Content-Length"]) + logger.info("Downloading %s (%s bytes)", url, cl) + else: + cl = None + logger.info("Downloading %s (unknown size)", url) + + c = arvados.collection.Collection() + + if req.headers.get("Content-Disposition"): + grp = re.search(r'filename=("((\"|[^"])+)"|([^][()<>@,;:\"/?={} ]+))', req.headers["Content-Disposition"]) + if grp.group(2): + name = grp.group(2) + else: + name = grp.group(4) + else: + name = urlparse.urlparse(url).path.split("/")[-1] + + count = 0 + start = time.time() + checkpoint = start + with c.open(name, "w") as f: + for chunk in req.iter_content(chunk_size=1024): + count += len(chunk) + f.write(chunk) + loopnow = time.time() + if (loopnow - checkpoint) > 20: + bps = (float(count)/float(loopnow - start)) + if cl is not None: + logger.info("%2.1f%% complete, %3.2f MiB/s, %1.0f seconds left", + float(count * 100) / float(cl), + bps/(1024*1024), + (cl-count)/bps) + else: + logger.info("%d downloaded, %3.2f MiB/s", count, bps/(1024*1024)) + checkpoint = loopnow + + c.save_new(name="Downloaded from %s" % url, owner_uuid=project_uuid, ensure_unique_name=True) + + api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute() + + return "keep:%s/%s" % 
(c.portable_data_hash(), name) diff --git a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py index 6fedb12030..bd4b5283fb 100644 --- a/sdk/cwl/arvados_cwl/pathmapper.py +++ b/sdk/cwl/arvados_cwl/pathmapper.py @@ -16,6 +16,8 @@ from schema_salad.sourceline import SourceLine from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs from cwltool.workflow import WorkflowException +from .http import http_to_keep + logger = logging.getLogger('arvados.cwl-runner') def trim_listing(obj): @@ -81,6 +83,10 @@ class ArvPathMapper(PathMapper): raise WorkflowException("File literal '%s' is missing `contents`" % src) if srcobj["class"] == "Directory" and "listing" not in srcobj: raise WorkflowException("Directory literal '%s' is missing `listing`" % src) + elif src.startswith("http:") or src.startswith("https:"): + keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src) + logger.info("%s is %s", src, keepref) + self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True) else: self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True) diff --git a/sdk/cwl/arvados_version.py b/sdk/cwl/arvados_version.py index a24d53dad6..13e6d36c07 100644 --- a/sdk/cwl/arvados_version.py +++ b/sdk/cwl/arvados_version.py @@ -34,7 +34,7 @@ def get_version(setup_dir, module): else: try: save_version(setup_dir, module, git_latest_tag() + git_timestamp_tag()) - except subprocess.CalledProcessError: + except (subprocess.CalledProcessError, OSError): pass return read_version(setup_dir, module) diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py index 4df89ee755..625f27518d 100644 --- a/sdk/cwl/setup.py +++ b/sdk/cwl/setup.py @@ -33,9 +33,9 @@ setup(name='arvados-cwl-runner', # Note that arvados/build/run-build-packages.sh looks at this # file to determine what version of cwltool and schema-salad to build. 
install_requires=[ - 'cwltool==1.0.20180522135731', + 'cwltool==1.0.20180524215209', 'schema-salad==2.7.20180501211602', - 'typing==3.5.3.0', + 'typing >= 3.5.3', 'ruamel.yaml >=0.13.11, <0.15', 'arvados-python-client>=1.1.4.20180507184611', 'setuptools', diff --git a/sdk/cwl/tests/test_container.py b/sdk/cwl/tests/test_container.py index 522946a4f4..2295e934ac 100644 --- a/sdk/cwl/tests/test_container.py +++ b/sdk/cwl/tests/test_container.py @@ -53,7 +53,8 @@ class TestContainer(unittest.TestCase): make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0)) arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names, - basedir="", make_fs_access=make_fs_access, loader=Loader({})) + basedir="", make_fs_access=make_fs_access, loader=Loader({}), + metadata={"cwlVersion": "v1.0"}) arvtool.formatgraph = None for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_run_"+str(enable_reuse), make_fs_access=make_fs_access, tmpdir="/tmp"): @@ -139,7 +140,7 @@ class TestContainer(unittest.TestCase): collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0)) arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names, make_fs_access=make_fs_access, - loader=Loader({})) + loader=Loader({}), metadata={"cwlVersion": "v1.0"}) arvtool.formatgraph = None for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_resource_requirements", make_fs_access=make_fs_access, tmpdir="/tmp"): @@ -251,7 +252,7 @@ class TestContainer(unittest.TestCase): collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0)) arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names, make_fs_access=make_fs_access, - loader=Loader({})) + loader=Loader({}), metadata={"cwlVersion": "v1.0"}) arvtool.formatgraph = None for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_initial_work_dir", make_fs_access=make_fs_access, tmpdir="/tmp"): @@ -352,7 +353,8 @@ class TestContainer(unittest.TestCase): make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0)) arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names, - basedir="", make_fs_access=make_fs_access, loader=Loader({})) + basedir="", make_fs_access=make_fs_access, loader=Loader({}), + metadata={"cwlVersion": "v1.0"}) arvtool.formatgraph = None for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_run_redirect", make_fs_access=make_fs_access, tmpdir="/tmp"): @@ -477,7 +479,8 @@ class TestContainer(unittest.TestCase): make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0)) arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names, - basedir="", make_fs_access=make_fs_access, loader=Loader({})) + basedir="", make_fs_access=make_fs_access, loader=Loader({}), + metadata={"cwlVersion": "v1.0"}) arvtool.formatgraph = None job_order = { "p1": { @@ -584,7 +587,8 @@ class TestContainer(unittest.TestCase): make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0)) arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names, - basedir="", make_fs_access=make_fs_access, loader=Loader({})) + basedir="", 
make_fs_access=make_fs_access, loader=Loader({}), + metadata={"cwlVersion": "v1.0"}) arvtool.formatgraph = None job_order = {"pw": "blorp"} diff --git a/sdk/cwl/tests/test_http.py b/sdk/cwl/tests/test_http.py new file mode 100644 index 0000000000..0c66c39c0b --- /dev/null +++ b/sdk/cwl/tests/test_http.py @@ -0,0 +1,286 @@ +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +import copy +import cStringIO +import functools +import hashlib +import json +import logging +import mock +import sys +import unittest +import datetime + +import arvados +import arvados.collection +import arvados_cwl +import arvados_cwl.runner +import arvados.keep + +from .matcher import JsonDiffMatcher, StripYAMLComments +from .mock_discovery import get_rootDesc + +import arvados_cwl.http + +import ruamel.yaml as yaml + + +class TestHttpToKeep(unittest.TestCase): + + @mock.patch("requests.get") + @mock.patch("arvados.collection.Collection") + def test_http_get(self, collectionmock, getmock): + api = mock.MagicMock() + + api.collections().list().execute.return_value = { + "items": [] + } + + cm = mock.MagicMock() + cm.manifest_locator.return_value = "zzzzz-4zz18-zzzzzzzzzzzzzz3" + cm.portable_data_hash.return_value = "99999999999999999999999999999998+99" + collectionmock.return_value = cm + + req = mock.MagicMock() + req.status_code = 200 + req.headers = {} + req.iter_content.return_value = ["abc"] + getmock.return_value = req + + utcnow = mock.MagicMock() + utcnow.return_value = datetime.datetime(2018, 5, 15) + + r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow) + self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt") + + getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True) + + cm.open.assert_called_with("file1.txt", "w") + cm.save_new.assert_called_with(name="Downloaded from http://example.com/file1.txt", + owner_uuid=None, ensure_unique_name=True) + + api.collections().update.assert_has_calls([ + mock.call(uuid=cm.manifest_locator(), + body={"collection":{"properties": {'http://example.com/file1.txt': {'Date': 'Tue, 15 May 2018 00:00:00 GMT'}}}}) + ]) + + + @mock.patch("requests.get") + @mock.patch("arvados.collection.CollectionReader") + def test_http_expires(self, collectionmock, getmock): + api = mock.MagicMock() + + api.collections().list().execute.return_value = { + "items": [{ + "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz3", + "portable_data_hash": "99999999999999999999999999999998+99", + "properties": { + 'http://example.com/file1.txt': { + 'Date': 'Tue, 15 May 2018 00:00:00 GMT', + 'Expires': 'Tue, 17 May 2018 00:00:00 GMT' + } + } + }] + } + + cm = mock.MagicMock() + cm.manifest_locator.return_value = "zzzzz-4zz18-zzzzzzzzzzzzzz3" + cm.portable_data_hash.return_value = "99999999999999999999999999999998+99" + cm.keys.return_value = ["file1.txt"] + collectionmock.return_value = cm + + req = mock.MagicMock() + req.status_code = 200 + req.headers = {} + req.iter_content.return_value = ["abc"] + getmock.return_value = req + + utcnow = mock.MagicMock() + utcnow.return_value = datetime.datetime(2018, 5, 16) + + r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow) + self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt") + + getmock.assert_not_called() + + + @mock.patch("requests.get") + @mock.patch("arvados.collection.CollectionReader") + def test_http_cache_control(self, collectionmock, getmock): + api = 
mock.MagicMock() + + api.collections().list().execute.return_value = { + "items": [{ + "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz3", + "portable_data_hash": "99999999999999999999999999999998+99", + "properties": { + 'http://example.com/file1.txt': { + 'Date': 'Tue, 15 May 2018 00:00:00 GMT', + 'Cache-Control': 'max-age=172800' + } + } + }] + } + + cm = mock.MagicMock() + cm.manifest_locator.return_value = "zzzzz-4zz18-zzzzzzzzzzzzzz3" + cm.portable_data_hash.return_value = "99999999999999999999999999999998+99" + cm.keys.return_value = ["file1.txt"] + collectionmock.return_value = cm + + req = mock.MagicMock() + req.status_code = 200 + req.headers = {} + req.iter_content.return_value = ["abc"] + getmock.return_value = req + + utcnow = mock.MagicMock() + utcnow.return_value = datetime.datetime(2018, 5, 16) + + r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow) + self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt") + + getmock.assert_not_called() + + + @mock.patch("requests.get") + @mock.patch("requests.head") + @mock.patch("arvados.collection.Collection") + def test_http_expired(self, collectionmock, headmock, getmock): + api = mock.MagicMock() + + api.collections().list().execute.return_value = { + "items": [{ + "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz3", + "portable_data_hash": "99999999999999999999999999999998+99", + "properties": { + 'http://example.com/file1.txt': { + 'Date': 'Tue, 15 May 2018 00:00:00 GMT', + 'Expires': 'Tue, 16 May 2018 00:00:00 GMT' + } + } + }] + } + + cm = mock.MagicMock() + cm.manifest_locator.return_value = "zzzzz-4zz18-zzzzzzzzzzzzzz4" + cm.portable_data_hash.return_value = "99999999999999999999999999999997+99" + cm.keys.return_value = ["file1.txt"] + collectionmock.return_value = cm + + req = mock.MagicMock() + req.status_code = 200 + req.headers = {'Date': 'Tue, 17 May 2018 00:00:00 GMT'} + req.iter_content.return_value = ["def"] + getmock.return_value = req + headmock.return_value = req + + utcnow = mock.MagicMock() + utcnow.return_value = datetime.datetime(2018, 5, 17) + + r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow) + self.assertEqual(r, "keep:99999999999999999999999999999997+99/file1.txt") + + getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True) + + cm.open.assert_called_with("file1.txt", "w") + cm.save_new.assert_called_with(name="Downloaded from http://example.com/file1.txt", + owner_uuid=None, ensure_unique_name=True) + + api.collections().update.assert_has_calls([ + mock.call(uuid=cm.manifest_locator(), + body={"collection":{"properties": {'http://example.com/file1.txt': {'Date': 'Tue, 17 May 2018 00:00:00 GMT'}}}}) + ]) + + + @mock.patch("requests.get") + @mock.patch("requests.head") + @mock.patch("arvados.collection.CollectionReader") + def test_http_etag(self, collectionmock, headmock, getmock): + api = mock.MagicMock() + + api.collections().list().execute.return_value = { + "items": [{ + "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz3", + "portable_data_hash": "99999999999999999999999999999998+99", + "properties": { + 'http://example.com/file1.txt': { + 'Date': 'Tue, 15 May 2018 00:00:00 GMT', + 'Expires': 'Tue, 16 May 2018 00:00:00 GMT', + 'ETag': '123456' + } + } + }] + } + + cm = mock.MagicMock() + cm.manifest_locator.return_value = "zzzzz-4zz18-zzzzzzzzzzzzzz3" + cm.portable_data_hash.return_value = "99999999999999999999999999999998+99" + cm.keys.return_value = ["file1.txt"] + collectionmock.return_value = cm + + req = 
mock.MagicMock() + req.status_code = 200 + req.headers = { + 'Date': 'Tue, 17 May 2018 00:00:00 GMT', + 'Expires': 'Tue, 19 May 2018 00:00:00 GMT', + 'ETag': '123456' + } + headmock.return_value = req + + utcnow = mock.MagicMock() + utcnow.return_value = datetime.datetime(2018, 5, 17) + + r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow) + self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt") + + getmock.assert_not_called() + cm.open.assert_not_called() + + api.collections().update.assert_has_calls([ + mock.call(uuid=cm.manifest_locator(), + body={"collection":{"properties": {'http://example.com/file1.txt': { + 'Date': 'Tue, 17 May 2018 00:00:00 GMT', + 'Expires': 'Tue, 19 May 2018 00:00:00 GMT', + 'ETag': '123456' + }}}}) + ]) + + @mock.patch("requests.get") + @mock.patch("arvados.collection.Collection") + def test_http_content_disp(self, collectionmock, getmock): + api = mock.MagicMock() + + api.collections().list().execute.return_value = { + "items": [] + } + + cm = mock.MagicMock() + cm.manifest_locator.return_value = "zzzzz-4zz18-zzzzzzzzzzzzzz3" + cm.portable_data_hash.return_value = "99999999999999999999999999999998+99" + collectionmock.return_value = cm + + req = mock.MagicMock() + req.status_code = 200 + req.headers = {"Content-Disposition": "attachment; filename=file1.txt"} + req.iter_content.return_value = ["abc"] + getmock.return_value = req + + utcnow = mock.MagicMock() + utcnow.return_value = datetime.datetime(2018, 5, 15) + + r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/download?fn=/file1.txt", utcnow=utcnow) + self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt") + + getmock.assert_called_with("http://example.com/download?fn=/file1.txt", stream=True, allow_redirects=True) + + cm.open.assert_called_with("file1.txt", "w") + cm.save_new.assert_called_with(name="Downloaded from http://example.com/download?fn=/file1.txt", + owner_uuid=None, ensure_unique_name=True) + + api.collections().update.assert_has_calls([ + mock.call(uuid=cm.manifest_locator(), + body={"collection":{"properties": {"http://example.com/download?fn=/file1.txt": {'Date': 'Tue, 15 May 2018 00:00:00 GMT'}}}}) + ]) diff --git a/sdk/cwl/tests/test_job.py b/sdk/cwl/tests/test_job.py index abf96947eb..30930dd49a 100644 --- a/sdk/cwl/tests/test_job.py +++ b/sdk/cwl/tests/test_job.py @@ -59,7 +59,8 @@ class TestJob(unittest.TestCase): make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0)) arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names, - basedir="", make_fs_access=make_fs_access, loader=Loader({})) + basedir="", make_fs_access=make_fs_access, loader=Loader({}), + metadata={"cwlVersion": "v1.0"}) arvtool.formatgraph = None for j in arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access): j.run(enable_reuse=enable_reuse) @@ -150,7 +151,8 @@ class TestJob(unittest.TestCase): make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0)) arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names, - make_fs_access=make_fs_access, loader=Loader({})) + make_fs_access=make_fs_access, loader=Loader({}), + metadata={"cwlVersion": "v1.0"}) arvtool.formatgraph = None for j in arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access): j.run(enable_reuse=True) diff 
--git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py index 77bef075fa..f8b557f6cb 100644 --- a/sdk/cwl/tests/test_submit.py +++ b/sdk/cwl/tests/test_submit.py @@ -234,7 +234,6 @@ def stubs(func): }, 'secret_mounts': {}, 'state': 'Committed', - 'owner_uuid': None, 'command': ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps', '--disable-validate', '--eval-timeout=20', '--thread-count=4', @@ -751,7 +750,6 @@ class TestSubmit(unittest.TestCase): 'kind': 'json' } }, 'state': 'Committed', - 'owner_uuid': None, 'output_path': '/var/spool/cwl', 'name': 'expect_arvworkflow.cwl#main', 'container_image': 'arvados/jobs:'+arvados_cwl.__version__, @@ -870,7 +868,6 @@ class TestSubmit(unittest.TestCase): 'kind': 'json' } }, 'state': 'Committed', - 'owner_uuid': None, 'output_path': '/var/spool/cwl', 'name': 'a test workflow', 'container_image': 'arvados/jobs:'+arvados_cwl.__version__, @@ -1236,7 +1233,6 @@ class TestSubmit(unittest.TestCase): }, "name": "secret_wf.cwl", "output_path": "/var/spool/cwl", - "owner_uuid": None, "priority": 500, "properties": {}, "runtime_constraints": { @@ -1259,6 +1255,31 @@ class TestSubmit(unittest.TestCase): self.assertEqual(capture_stdout.getvalue(), stubs.expect_container_request_uuid + '\n') + @stubs + def test_submit_request_uuid(self, stubs): + stubs.expect_container_request_uuid = "zzzzz-xvhdp-yyyyyyyyyyyyyyy" + + stubs.api.container_requests().update().execute.return_value = { + "uuid": stubs.expect_container_request_uuid, + "container_uuid": "zzzzz-dz642-zzzzzzzzzzzzzzz", + "state": "Queued" + } + + capture_stdout = cStringIO.StringIO() + try: + exited = arvados_cwl.main( + ["--submit", "--no-wait", "--api=containers", "--debug", "--submit-request-uuid=zzzzz-xvhdp-yyyyyyyyyyyyyyy", + "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"], + capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client) + self.assertEqual(exited, 0) + except: + logging.exception("") + + stubs.api.container_requests().update.assert_called_with( + uuid="zzzzz-xvhdp-yyyyyyyyyyyyyyy", body=JsonDiffMatcher(stubs.expect_container_spec)) + self.assertEqual(capture_stdout.getvalue(), + stubs.expect_container_request_uuid + '\n') + class TestCreateTemplate(unittest.TestCase): existing_template_uuid = "zzzzz-d1hrv-validworkfloyml" diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go index 7b25d78852..d6a2dde9d7 100644 --- a/services/keep-balance/balance.go +++ b/services/keep-balance/balance.go @@ -365,27 +365,29 @@ func (bal *Balancer) ComputeChangeSets() { blkid arvados.SizedDigest blk *BlockState } - nWorkers := 1 + runtime.NumCPU() - todo := make(chan balanceTask, nWorkers) - results := make(chan balanceResult, 16) - var wg sync.WaitGroup - for i := 0; i < nWorkers; i++ { - wg.Add(1) - go func() { - for work := range todo { - results <- bal.balanceBlock(work.blkid, work.blk) + workers := runtime.GOMAXPROCS(-1) + todo := make(chan balanceTask, workers) + go func() { + bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) { + todo <- balanceTask{ + blkid: blkid, + blk: blk, } - wg.Done() - }() - } - bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) { - todo <- balanceTask{ - blkid: blkid, - blk: blk, - } - }) - close(todo) + }) + close(todo) + }() + results := make(chan balanceResult, workers) go func() { + var wg sync.WaitGroup + for i := 0; i < workers; i++ { + wg.Add(1) + go func() { + for work := range todo { + results <- bal.balanceBlock(work.blkid, 
work.blk) + } + wg.Done() + }() + } wg.Wait() close(results) }() diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go index 9d4d801928..bdab58927b 100644 --- a/services/keepstore/s3_volume.go +++ b/services/keepstore/s3_volume.go @@ -429,7 +429,7 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error { case <-ctx.Done(): theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err()) // Our pipe might be stuck in Write(), waiting for - // io.Copy() to read. If so, un-stick it. This means + // PutReader() to read. If so, un-stick it. This means // PutReader will get corrupt data, but that's OK: the // size and MD5 won't match, so the write will fail. go io.Copy(ioutil.Discard, bufr) @@ -438,6 +438,8 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error { theConfig.debugLogf("%s: abandoning PutReader goroutine", v) return ctx.Err() case <-ready: + // Unblock pipe in case PutReader did not consume it. + io.Copy(ioutil.Discard, bufr) return v.translateError(err) } } diff --git a/services/nodemanager/arvnodeman/computenode/__init__.py b/services/nodemanager/arvnodeman/computenode/__init__.py index 3c04118abe..b124c66540 100644 --- a/services/nodemanager/arvnodeman/computenode/__init__.py +++ b/services/nodemanager/arvnodeman/computenode/__init__.py @@ -33,7 +33,7 @@ def arvados_timestamp(timestr): subsecs = float(subsec_match.group(1)) timestr = timestr[:subsec_match.start()] + 'Z' return calendar.timegm(time.strptime(timestr + 'UTC', - ARVADOS_TIMEFMT + '%Z')) + ARVADOS_TIMEFMT + '%Z')) + subsecs def timestamp_fresh(timestamp, fresh_time): return (time.time() - timestamp) < fresh_time diff --git a/services/nodemanager/tests/test_computenode.py b/services/nodemanager/tests/test_computenode.py index 3f11ff6c2b..898112bdd8 100644 --- a/services/nodemanager/tests/test_computenode.py +++ b/services/nodemanager/tests/test_computenode.py @@ -37,3 +37,9 @@ class ShutdownTimerTestCase(unittest.TestCase): time_mock.return_value += 200 self.assertEqual(961, timer.next_opening()) self.assertFalse(timer.window_open()) + + +class ArvadosTimestamp(unittest.TestCase): + def test_arvados_timestamp(self): + self.assertEqual(1527710178, cnode.arvados_timestamp('2018-05-30T19:56:18Z')) + self.assertEqual(1527710178.999371, cnode.arvados_timestamp('2018-05-30T19:56:18.999371Z')) diff --git a/services/nodemanager/tests/test_daemon.py b/services/nodemanager/tests/test_daemon.py index 8050e69814..d09cbf7235 100644 --- a/services/nodemanager/tests/test_daemon.py +++ b/services/nodemanager/tests/test_daemon.py @@ -17,11 +17,24 @@ from arvnodeman.jobqueue import ServerCalculator from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor from . import testutil from . import test_status +from . 
import pykka_timeout import logging class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin, unittest.TestCase): + def assertwait(self, f, timeout=pykka_timeout*2): + deadline = time.time() + timeout + while True: + try: + return f() + except AssertionError: + if time.time() > deadline: + raise + pass + time.sleep(.1) + self.daemon.ping().get(self.TIMEOUT) + def busywait(self, f): for n in xrange(200): ok = f() @@ -146,8 +159,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin, self.assertIn('node_quota', status.tracker._latest) def check_monitors_arvados_nodes(self, *arv_nodes): - self.busywait(lambda: len(arv_nodes) == len(self.monitored_arvados_nodes())) - self.assertItemsEqual(arv_nodes, self.monitored_arvados_nodes()) + self.assertwait(lambda: self.assertItemsEqual(arv_nodes, self.monitored_arvados_nodes())) def test_node_pairing(self): cloud_node = testutil.cloud_node_mock(1) @@ -257,7 +269,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin, arvados_nodes=[testutil.arvados_node_mock(1), testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')], want_sizes=[size]) - self.busywait(lambda: 2 == self.paired_monitor_count()) + self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count())) for mon_ref in self.monitor_list(): self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT) self.assertEqual(1, self.node_shutdown.start.call_count) @@ -269,7 +281,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin, arvados_nodes=[testutil.arvados_node_mock(1), testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')], want_sizes=[size]) - self.busywait(lambda: 2 == self.paired_monitor_count()) + self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count())) get_cloud_node = mock.MagicMock(name="get_cloud_node") get_cloud_node.get.return_value = cloud_nodes[1] mock_node_monitor = mock.MagicMock() @@ -278,7 +290,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin, self.daemon.cloud_nodes.get()[cloud_nodes[1].id].shutdown_actor = mock_shutdown.proxy() - self.busywait(lambda: 2 == self.alive_monitor_count()) + self.assertwait(lambda: self.assertEqual(2, self.alive_monitor_count())) for mon_ref in self.monitor_list(): self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT) self.busywait(lambda: 1 == self.node_shutdown.start.call_count) @@ -298,8 +310,8 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin, arv_node = testutil.arvados_node_mock(2, job_uuid=True) self.make_daemon([testutil.cloud_node_mock(2, size=size)], [arv_node], [size], avail_sizes=[(size, {"cores":1})]) - self.busywait(lambda: 1 == self.paired_monitor_count()) - self.busywait(lambda: self.node_setup.start.called) + self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count())) + self.assertwait(lambda: self.assertEqual(1, self.node_setup.start.called)) def test_boot_new_node_below_min_nodes(self): min_size = testutil.MockSize(1) @@ -543,7 +555,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin, arv_node = testutil.arvados_node_mock(1) size = testutil.MockSize(1) self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], want_sizes=[size]) - self.busywait(lambda: 1 == self.paired_monitor_count()) + self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count())) monitor = self.monitor_list()[0].proxy() self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT) self.stop_proxy(self.daemon) @@ -553,7 +565,7 @@ class 
NodeManagerDaemonActorTestCase(testutil.ActorTestMixin, cloud_node = testutil.cloud_node_mock(1) arv_node = testutil.arvados_node_mock(1) self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], min_nodes=1) - self.busywait(lambda: 1 == self.paired_monitor_count()) + self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count())) monitor = self.monitor_list()[0].proxy() self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT) self.stop_proxy(self.daemon) @@ -572,7 +584,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin, arv_nodes = [testutil.arvados_node_mock(3, job_uuid=True), testutil.arvados_node_mock(4, job_uuid=None)] self.make_daemon(cloud_nodes, arv_nodes, [size]) - self.busywait(lambda: 2 == self.paired_monitor_count()) + self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count())) for mon_ref in self.monitor_list(): monitor = mon_ref.proxy() if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]: @@ -591,13 +603,13 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin, self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT) self.last_shutdown.success.get.return_value = False self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT) - self.busywait(lambda: 1 == self.paired_monitor_count()) + self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count())) self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT) self.last_shutdown.success.get.return_value = True self.last_shutdown.stop.side_effect = lambda: monitor.stop() self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT) - self.busywait(lambda: 0 == self.paired_monitor_count()) + self.assertwait(lambda: self.assertEqual(0, self.paired_monitor_count())) def test_nodes_shutting_down_replaced_below_max_nodes(self): size = testutil.MockSize(6) @@ -616,7 +628,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin, cloud_node = testutil.cloud_node_mock(7) self.make_daemon([cloud_node], [testutil.arvados_node_mock(7)], max_nodes=1) - self.busywait(lambda: 1 == self.paired_monitor_count()) + self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count())) monitor = self.monitor_list()[0].proxy() self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT) self.assertTrue(self.node_shutdown.start.called) @@ -630,7 +642,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin, arv_nodes = [testutil.arvados_node_mock(n, size=size) for n in [8, 9]] self.make_daemon(cloud_nodes, arv_nodes, [size], avail_sizes=[(size, {"cores":1})]) - self.busywait(lambda: 2 == self.paired_monitor_count()) + self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count())) for mon_ref in self.monitor_list(): self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT) self.assertEqual(1, self.node_shutdown.start.call_count) @@ -671,7 +683,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin, cloud_nodes = [testutil.cloud_node_mock(1, size=size)] arv_nodes = [testutil.arvados_node_mock(1, job_uuid=None)] self.make_daemon(cloud_nodes, arv_nodes, [size]) - self.busywait(lambda: 1 == self.paired_monitor_count()) + self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count())) for mon_ref in self.monitor_list(): monitor = mon_ref.proxy() if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]: @@ -770,7 +782,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin, testutil.arvados_node_mock(3)], want_sizes=[small, small, big], avail_sizes=avail_sizes) - self.busywait(lambda: 3 == 
self.paired_monitor_count()) + self.assertwait(lambda: self.assertEqual(3, self.paired_monitor_count())) self.daemon.update_server_wishlist([small, big, big]).get(self.TIMEOUT) self.assertEqual(0, self.node_shutdown.start.call_count) diff --git a/services/nodemanager/tests/test_failure.py b/services/nodemanager/tests/test_failure.py index 2d1a17eaec..8bf3ea8741 100644 --- a/services/nodemanager/tests/test_failure.py +++ b/services/nodemanager/tests/test_failure.py @@ -48,9 +48,12 @@ class ActorUnhandledExceptionTest(testutil.ActorTestMixin, unittest.TestCase): def test_nonfatal_error(self): status.tracker.update({'actor_exceptions': 0}) kill_mock = mock.Mock('os.kill') - act = BogusActor.start(OSError(errno.ENOENT, ""), killfunc=kill_mock).tell_proxy() + bgact = BogusActor.start(OSError(errno.ENOENT, ""), killfunc=kill_mock) + act_thread = bgact.proxy().get_thread().get() + act = bgact.tell_proxy() act.doStuff() act.actor_ref.stop(block=True) + act_thread.join() self.assertFalse(kill_mock.called) self.assertEqual(1, status.tracker.get('actor_exceptions'))
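
Usage note (not part of the diff above): this merge brings in the new sdk/cwl/arvados_cwl/http.py module, which ArvPathMapper uses to resolve http:// and https:// input references by downloading them into a Keep collection and caching the result against the server's Date, Expires, Cache-Control, and ETag headers. A minimal sketch of calling the helper directly, assuming a configured Arvados Python SDK client; the URL is a placeholder and passing None as the project UUID simply leaves owner_uuid unset, as in the tests above:

    import arvados
    from arvados_cwl.http import http_to_keep

    api = arvados.api("v1")
    # Returns a "keep:<portable data hash>/<filename>" reference; re-running
    # while the cached copy is still fresh (or while the ETag matches) reuses
    # the existing collection instead of downloading the file again.
    keepref = http_to_keep(api, None, "https://example.com/reference.fasta")
    print(keepref)

The other user-visible addition is the --submit-request-uuid option to arvados-cwl-runner (containers API only), which updates and commits an existing container request instead of creating a new one.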