From c31c6528cac695bc86d4244516e07ea316cac979 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Tue, 8 Nov 2022 22:24:52 -0500 Subject: [PATCH] 19699: Add --prefer-cached-downloads, add tests Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- sdk/cwl/arvados_cwl/__init__.py | 3 + sdk/cwl/arvados_cwl/arvcontainer.py | 3 + sdk/cwl/arvados_cwl/context.py | 1 + sdk/cwl/arvados_cwl/executor.py | 7 +- sdk/cwl/arvados_cwl/http.py | 47 +++++--- sdk/cwl/arvados_cwl/pathmapper.py | 4 +- sdk/cwl/tests/test_http.py | 165 +++++++++++++++++++++++++++- sdk/cwl/tests/test_submit.py | 44 ++++++++ 8 files changed, 250 insertions(+), 24 deletions(-) diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py index 8de1b60bc4..196bea0390 100644 --- a/sdk/cwl/arvados_cwl/__init__.py +++ b/sdk/cwl/arvados_cwl/__init__.py @@ -224,6 +224,9 @@ def arg_parser(): # type: () -> argparse.ArgumentParser parser.add_argument("--varying-url-params", type=str, default="", help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.") + parser.add_argument("--prefer-cached-downloads", action="store_true", default=False, + help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).") + exgroup = parser.add_mutually_exclusive_group() exgroup.add_argument("--enable-preemptible", dest="enable_preemptible", default=None, action="store_true", help="Use preemptible instances. 
Control individual steps with arv:UsePreemptible hint.") exgroup.add_argument("--disable-preemptible", dest="enable_preemptible", default=None, action="store_false", help="Don't use preemptible instances.") diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py index b0ef4a22cc..f8715f7e7b 100644 --- a/sdk/cwl/arvados_cwl/arvcontainer.py +++ b/sdk/cwl/arvados_cwl/arvcontainer.py @@ -607,6 +607,9 @@ class RunnerContainer(Runner): if runtimeContext.varying_url_params: command.append("--varying-url-params="+runtimeContext.varying_url_params) + if runtimeContext.prefer_cached_downloads: + command.append("--prefer-cached-downloads") + command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"]) container_req["command"] = command diff --git a/sdk/cwl/arvados_cwl/context.py b/sdk/cwl/arvados_cwl/context.py index 66ccb26fae..3ce561f66d 100644 --- a/sdk/cwl/arvados_cwl/context.py +++ b/sdk/cwl/arvados_cwl/context.py @@ -41,6 +41,7 @@ class ArvRuntimeContext(RuntimeContext): self.copy_deps = None self.defer_downloads = False self.varying_url_params = "" + self.prefer_cached_downloads = False super(ArvRuntimeContext, self).__init__(kwargs) diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py index cbeee92427..b042401d6b 100644 --- a/sdk/cwl/arvados_cwl/executor.py +++ b/sdk/cwl/arvados_cwl/executor.py @@ -113,6 +113,8 @@ class ArvCwlExecutor(object): arvargs.thread_count = 1 arvargs.collection_cache_size = None arvargs.git_info = True + arvargs.submit = False + arvargs.defer_downloads = False self.api = api_client self.processes = {} @@ -207,7 +209,6 @@ The 'jobs' API is no longer supported. collection_cache=self.collection_cache) self.defer_downloads = arvargs.submit and arvargs.defer_downloads - self.varying_url_params = arvargs.varying_url_params validate_cluster_target(self, self.toplevel_runtimeContext) @@ -364,8 +365,8 @@ The 'jobs' API is no longer supported. 
page = keys[:pageSize] try: proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries) - except Exception: - logger.exception("Error checking states on API server: %s") + except Exception as e: + logger.exception("Error checking states on API server: %s", e) remain_wait = self.poll_interval continue diff --git a/sdk/cwl/arvados_cwl/http.py b/sdk/cwl/arvados_cwl/http.py index 34f9211333..f2415bcffe 100644 --- a/sdk/cwl/arvados_cwl/http.py +++ b/sdk/cwl/arvados_cwl/http.py @@ -74,17 +74,21 @@ def remember_headers(url, properties, headers, now): def changed(url, clean_url, properties, now): req = requests.head(url, allow_redirects=True) - remember_headers(url, properties, req.headers, now) if req.status_code != 200: # Sometimes endpoints are misconfigured and will deny HEAD but # allow GET so instead of failing here, we'll try GET If-None-Match return True - pr = properties[clean_url] - if "ETag" in pr and "ETag" in req.headers: - if pr["ETag"] == req.headers["ETag"]: - return False + etag = properties[url].get("ETag") + + if url in properties: + del properties[url] + remember_headers(clean_url, properties, req.headers, now) + + if "ETag" in req.headers and etag == req.headers["ETag"]: + # Didn't change + return False return True @@ -97,36 +101,51 @@ def etag_quote(etag): return '"' + etag + '"' -def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow, varying_url_params=""): +def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow, varying_url_params="", prefer_cached_downloads=False): varying_params = [s.strip() for s in varying_url_params.split(",")] parsed = urllib.parse.urlparse(url) query = [q for q in urllib.parse.parse_qsl(parsed.query) if q[0] not in varying_params] + clean_url = urllib.parse.urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params, - urllib.parse.urlencode(query), parsed.fragment)) + urllib.parse.urlencode(query, safe="/"), parsed.fragment)) - r = 
api.collections().list(filters=[["properties", "exists", clean_url]]).execute() + r1 = api.collections().list(filters=[["properties", "exists", url]]).execute() + + if clean_url == url: + items = r1["items"] + else: + r2 = api.collections().list(filters=[["properties", "exists", clean_url]]).execute() + items = r1["items"] + r2["items"] now = utcnow() etags = {} - for item in r["items"]: + for item in items: properties = item["properties"] - if fresh_cache(clean_url, properties, now): - # Do nothing + + if clean_url in properties: + cache_url = clean_url + elif url in properties: + cache_url = url + else: + continue # item has neither URL key; skip it rather than return False as a Keep reference + + if prefer_cached_downloads or fresh_cache(cache_url, properties, now): + # HTTP caching rules say we should use the cache cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api) return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0]) - if not changed(url, clean_url, properties, now): + if not changed(cache_url, clean_url, properties, now): # ETag didn't change, same content, just update headers api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute() cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api) return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0]) - if "ETag" in properties and len(properties["ETag"]) > 2: - etags[properties["ETag"]] = item + if "ETag" in properties[cache_url] and len(properties[cache_url]["ETag"]) > 2: + etags[properties[cache_url]["ETag"]] = item logger.debug("Found ETags %s", etags) diff --git a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py index 700b5b5f94..e2e287bf1d 100644 --- a/sdk/cwl/arvados_cwl/pathmapper.py +++ b/sdk/cwl/arvados_cwl/pathmapper.py @@ -109,7 +109,9 @@ class ArvPathMapper(PathMapper): # passthrough, we'll download it later. 
self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True) else: - keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src, varying_url_params=self.arvrunner.varying_url_params) + keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src, + varying_url_params=self.arvrunner.toplevel_runtimeContext.varying_url_params, + prefer_cached_downloads=self.arvrunner.toplevel_runtimeContext.prefer_cached_downloads) logger.info("%s is %s", src, keepref) self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True) except Exception as e: diff --git a/sdk/cwl/tests/test_http.py b/sdk/cwl/tests/test_http.py index 650b5f0598..5598b1f138 100644 --- a/sdk/cwl/tests/test_http.py +++ b/sdk/cwl/tests/test_http.py @@ -58,7 +58,7 @@ class TestHttpToKeep(unittest.TestCase): r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow) self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt") - getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True) + getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True, headers={}) cm.open.assert_called_with("file1.txt", "wb") cm.save_new.assert_called_with(name="Downloaded from http%3A%2F%2Fexample.com%2Ffile1.txt", @@ -186,7 +186,7 @@ class TestHttpToKeep(unittest.TestCase): r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow) self.assertEqual(r, "keep:99999999999999999999999999999997+99/file1.txt") - getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True) + getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True, headers={}) cm.open.assert_called_with("file1.txt", "wb") cm.save_new.assert_called_with(name="Downloaded from http%3A%2F%2Fexample.com%2Ffile1.txt", @@ -212,7 +212,7 @@ class TestHttpToKeep(unittest.TestCase): 'http://example.com/file1.txt': { 'Date': 
'Tue, 15 May 2018 00:00:00 GMT', 'Expires': 'Tue, 16 May 2018 00:00:00 GMT', - 'ETag': '123456' + 'ETag': '"123456"' } } }] @@ -229,7 +229,7 @@ class TestHttpToKeep(unittest.TestCase): req.headers = { 'Date': 'Tue, 17 May 2018 00:00:00 GMT', 'Expires': 'Tue, 19 May 2018 00:00:00 GMT', - 'ETag': '123456' + 'ETag': '"123456"' } headmock.return_value = req @@ -247,7 +247,7 @@ class TestHttpToKeep(unittest.TestCase): body={"collection":{"properties": {'http://example.com/file1.txt': { 'Date': 'Tue, 17 May 2018 00:00:00 GMT', 'Expires': 'Tue, 19 May 2018 00:00:00 GMT', - 'ETag': '123456' + 'ETag': '"123456"' }}}}) ]) @@ -277,7 +277,7 @@ class TestHttpToKeep(unittest.TestCase): r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/download?fn=/file1.txt", utcnow=utcnow) self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt") - getmock.assert_called_with("http://example.com/download?fn=/file1.txt", stream=True, allow_redirects=True) + getmock.assert_called_with("http://example.com/download?fn=/file1.txt", stream=True, allow_redirects=True, headers={}) cm.open.assert_called_with("file1.txt", "wb") cm.save_new.assert_called_with(name="Downloaded from http%3A%2F%2Fexample.com%2Fdownload%3Ffn%3D%2Ffile1.txt", @@ -287,3 +287,156 @@ class TestHttpToKeep(unittest.TestCase): mock.call(uuid=cm.manifest_locator(), body={"collection":{"properties": {"http://example.com/download?fn=/file1.txt": {'Date': 'Tue, 15 May 2018 00:00:00 GMT'}}}}) ]) + + @mock.patch("requests.get") + @mock.patch("requests.head") + @mock.patch("arvados.collection.CollectionReader") + def test_http_etag_if_none_match(self, collectionmock, headmock, getmock): + api = mock.MagicMock() + + api.collections().list().execute.return_value = { + "items": [{ + "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz3", + "portable_data_hash": "99999999999999999999999999999998+99", + "properties": { + 'http://example.com/file1.txt': { + 'Date': 'Tue, 15 May 2018 00:00:00 GMT', + 'Expires': 'Tue, 16 May 2018 
00:00:00 GMT', + 'ETag': '"123456"' + } + } + }] + } + + cm = mock.MagicMock() + cm.manifest_locator.return_value = "zzzzz-4zz18-zzzzzzzzzzzzzz3" + cm.portable_data_hash.return_value = "99999999999999999999999999999998+99" + cm.keys.return_value = ["file1.txt"] + collectionmock.return_value = cm + + # Head request fails, will try a conditional GET instead + req = mock.MagicMock() + req.status_code = 403 + req.headers = { + } + headmock.return_value = req + + utcnow = mock.MagicMock() + utcnow.return_value = datetime.datetime(2018, 5, 17) + + req = mock.MagicMock() + req.status_code = 304 + req.headers = { + 'Date': 'Tue, 17 May 2018 00:00:00 GMT', + 'Expires': 'Tue, 19 May 2018 00:00:00 GMT', + 'ETag': '"123456"' + } + getmock.return_value = req + + r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow) + self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt") + + getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True, headers={"If-None-Match": '"123456"'}) + cm.open.assert_not_called() + + api.collections().update.assert_has_calls([ + mock.call(uuid=cm.manifest_locator(), + body={"collection":{"properties": {'http://example.com/file1.txt': { + 'Date': 'Tue, 17 May 2018 00:00:00 GMT', + 'Expires': 'Tue, 19 May 2018 00:00:00 GMT', + 'ETag': '"123456"' + }}}}) + ]) + + + @mock.patch("requests.get") + @mock.patch("requests.head") + @mock.patch("arvados.collection.CollectionReader") + def test_http_prefer_cached_downloads(self, collectionmock, headmock, getmock): + api = mock.MagicMock() + + api.collections().list().execute.return_value = { + "items": [{ + "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz3", + "portable_data_hash": "99999999999999999999999999999998+99", + "properties": { + 'http://example.com/file1.txt': { + 'Date': 'Tue, 15 May 2018 00:00:00 GMT', + 'Expires': 'Tue, 16 May 2018 00:00:00 GMT', + 'ETag': '"123456"' + } + } + }] + } + + cm = mock.MagicMock() + 
cm.manifest_locator.return_value = "zzzzz-4zz18-zzzzzzzzzzzzzz3" + cm.portable_data_hash.return_value = "99999999999999999999999999999998+99" + cm.keys.return_value = ["file1.txt"] + collectionmock.return_value = cm + + utcnow = mock.MagicMock() + utcnow.return_value = datetime.datetime(2018, 5, 17) + + r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow, prefer_cached_downloads=True) + self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt") + + headmock.assert_not_called() + getmock.assert_not_called() + cm.open.assert_not_called() + api.collections().update.assert_not_called() + + @mock.patch("requests.get") + @mock.patch("requests.head") + @mock.patch("arvados.collection.CollectionReader") + def test_http_varying_url_params(self, collectionmock, headmock, getmock): + for prurl in ("http://example.com/file1.txt", "http://example.com/file1.txt?KeyId=123&Signature=456&Expires=789"): + api = mock.MagicMock() + + api.collections().list().execute.return_value = { + "items": [{ + "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz3", + "portable_data_hash": "99999999999999999999999999999998+99", + "properties": { + prurl: { + 'Date': 'Tue, 15 May 2018 00:00:00 GMT', + 'Expires': 'Tue, 16 May 2018 00:00:00 GMT', + 'ETag': '"123456"' + } + } + }] + } + + cm = mock.MagicMock() + cm.manifest_locator.return_value = "zzzzz-4zz18-zzzzzzzzzzzzzz3" + cm.portable_data_hash.return_value = "99999999999999999999999999999998+99" + cm.keys.return_value = ["file1.txt"] + collectionmock.return_value = cm + + req = mock.MagicMock() + req.status_code = 200 + req.headers = { + 'Date': 'Tue, 17 May 2018 00:00:00 GMT', + 'Expires': 'Tue, 19 May 2018 00:00:00 GMT', + 'ETag': '"123456"' + } + headmock.return_value = req + + utcnow = mock.MagicMock() + utcnow.return_value = datetime.datetime(2018, 5, 17) + + r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt?KeyId=123&Signature=456&Expires=789", + utcnow=utcnow, 
varying_url_params="KeyId,Signature,Expires") + self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt") + + getmock.assert_not_called() + cm.open.assert_not_called() + + api.collections().update.assert_has_calls([ + mock.call(uuid=cm.manifest_locator(), + body={"collection":{"properties": {'http://example.com/file1.txt': { + 'Date': 'Tue, 17 May 2018 00:00:00 GMT', + 'Expires': 'Tue, 19 May 2018 00:00:00 GMT', + 'ETag': '"123456"' + }}}}) + ]) diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py index 17c09f1ee7..dcbee726b6 100644 --- a/sdk/cwl/tests/test_submit.py +++ b/sdk/cwl/tests/test_submit.py @@ -1587,6 +1587,50 @@ class TestSubmit(unittest.TestCase): stubs.expect_container_request_uuid + '\n') self.assertEqual(exited, 0) + @stubs() + def test_submit_container_prefer_cached_downloads(self, stubs): + exited = arvados_cwl.main( + ["--submit", "--no-wait", "--api=containers", "--debug", "--prefer-cached-downloads", + "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"], + stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client) + + expect_container = copy.deepcopy(stubs.expect_container_spec) + expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', + '--no-log-timestamps', '--disable-validate', '--disable-color', + '--eval-timeout=20', '--thread-count=0', + '--enable-reuse', "--collection-cache-size=256", + '--output-name=Output from workflow submit_wf.cwl (%s)' % stubs.git_props["arv:gitDescribe"], + '--debug', "--on-error=continue", '--prefer-cached-downloads', + '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'] + + stubs.api.container_requests().create.assert_called_with( + body=JsonDiffMatcher(expect_container)) + self.assertEqual(stubs.capture_stdout.getvalue(), + stubs.expect_container_request_uuid + '\n') + self.assertEqual(exited, 0) + + @stubs() + def test_submit_container_varying_url_params(self, stubs): + exited = arvados_cwl.main( + 
["--submit", "--no-wait", "--api=containers", "--debug", "--varying-url-params", "KeyId,Signature", + "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"], + stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client) + + expect_container = copy.deepcopy(stubs.expect_container_spec) + expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', + '--no-log-timestamps', '--disable-validate', '--disable-color', + '--eval-timeout=20', '--thread-count=0', + '--enable-reuse', "--collection-cache-size=256", + '--output-name=Output from workflow submit_wf.cwl (%s)' % stubs.git_props["arv:gitDescribe"], + '--debug', "--on-error=continue", "--varying-url-params=KeyId,Signature", + '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'] + + stubs.api.container_requests().create.assert_called_with( + body=JsonDiffMatcher(expect_container)) + self.assertEqual(stubs.capture_stdout.getvalue(), + stubs.expect_container_request_uuid + '\n') + self.assertEqual(exited, 0) + class TestCreateWorkflow(unittest.TestCase): existing_workflow_uuid = "zzzzz-7fd4e-validworkfloyml" -- 2.30.2