19699: Add --prefer-cached-downloads, add tests (branch: 19699-cwl-http-dl)
author     Peter Amstutz <peter.amstutz@curii.com>
           Wed, 9 Nov 2022 03:24:52 +0000 (22:24 -0500)
committer  Peter Amstutz <peter.amstutz@curii.com>
           Mon, 14 Nov 2022 16:27:45 +0000 (11:27 -0500)
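
When an HTTP URL referenced by the workflow is already cached in Keep,
--prefer-cached-downloads makes arvados-cwl-runner use the cached collection
without contacting the upstream server to re-check freshness, so it will not
notice upstream changes, but it also will not fail if the upstream is
unavailable. On submit, the flag is forwarded to the runner container's
command line.

Illustrative usage (workflow.cwl and job.yml are placeholder arguments, not
part of this change; the flags are the ones handled by this commit):

    arvados-cwl-runner --submit --prefer-cached-downloads \
        --varying-url-params=KeyId,Signature,Expires \
        workflow.cwl job.yml
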
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/context.py
sdk/cwl/arvados_cwl/executor.py
sdk/cwl/arvados_cwl/http.py
sdk/cwl/arvados_cwl/pathmapper.py
sdk/cwl/tests/test_http.py
sdk/cwl/tests/test_submit.py

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 8de1b60bc4bbc4c8a7f5688d845a15fcad9864f9..196bea03907848a8ff795ac3326337d169c0aedc 100644
@@ -224,6 +224,9 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     parser.add_argument("--varying-url-params", type=str, default="",
                         help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")
 
+    parser.add_argument("--prefer-cached-downloads", action="store_true", default=False,
+                        help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")
+
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--enable-preemptible", dest="enable_preemptible", default=None, action="store_true", help="Use preemptible instances. Control individual steps with arv:UsePreemptible hint.")
     exgroup.add_argument("--disable-preemptible", dest="enable_preemptible", default=None, action="store_false", help="Don't use preemptible instances.")
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index b0ef4a22cc1f7c55bcfa0d1746fcb7cb5fe082f5..f8715f7e7b805f8191e79cd68b4ae4324d04bc59 100644
@@ -607,6 +607,9 @@ class RunnerContainer(Runner):
         if runtimeContext.varying_url_params:
             command.append("--varying-url-params="+runtimeContext.varying_url_params)
 
+        if runtimeContext.prefer_cached_downloads:
+            command.append("--prefer-cached-downloads")
+
         command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
 
         container_req["command"] = command
diff --git a/sdk/cwl/arvados_cwl/context.py b/sdk/cwl/arvados_cwl/context.py
index 66ccb26fae1e6884955faabde9af10b4224a9d68..3ce561f66d3404e03c4aab19470439af22bf83dd 100644
@@ -41,6 +41,7 @@ class ArvRuntimeContext(RuntimeContext):
         self.copy_deps = None
         self.defer_downloads = False
         self.varying_url_params = ""
+        self.prefer_cached_downloads = False
 
         super(ArvRuntimeContext, self).__init__(kwargs)
 
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index cbeee924278d12fba688ce5389fb14e42e47775e..b042401d6b74ddc7bfa617fad91ccd40dbbb4b0c 100644
@@ -113,6 +113,8 @@ class ArvCwlExecutor(object):
             arvargs.thread_count = 1
             arvargs.collection_cache_size = None
             arvargs.git_info = True
+            arvargs.submit = False
+            arvargs.defer_downloads = False
 
         self.api = api_client
         self.processes = {}
@@ -207,7 +209,6 @@ The 'jobs' API is no longer supported.
                                                      collection_cache=self.collection_cache)
 
         self.defer_downloads = arvargs.submit and arvargs.defer_downloads
-        self.varying_url_params = arvargs.varying_url_params
 
         validate_cluster_target(self, self.toplevel_runtimeContext)
 
@@ -364,8 +365,8 @@ The 'jobs' API is no longer supported.
                     page = keys[:pageSize]
                     try:
                         proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
-                    except Exception:
-                        logger.exception("Error checking states on API server: %s")
+                    except Exception as e:
+                        logger.exception("Error checking states on API server: %s", e)
                         remain_wait = self.poll_interval
                         continue
 
diff --git a/sdk/cwl/arvados_cwl/http.py b/sdk/cwl/arvados_cwl/http.py
index 34f921133344705ac114bf6e2e8cfff501deb551..f2415bcffef40ef805b4e3a0213778caac16f63e 100644
@@ -74,17 +74,21 @@ def remember_headers(url, properties, headers, now):
 
 def changed(url, clean_url, properties, now):
     req = requests.head(url, allow_redirects=True)
-    remember_headers(url, properties, req.headers, now)
 
     if req.status_code != 200:
         # Sometimes endpoints are misconfigured and will deny HEAD but
         # allow GET so instead of failing here, we'll try GET If-None-Match
         return True
 
-    pr = properties[clean_url]
-    if "ETag" in pr and "ETag" in req.headers:
-        if pr["ETag"] == req.headers["ETag"]:
-            return False
+    etag = properties[url].get("ETag")
+
+    if url in properties:
+        del properties[url]
+    remember_headers(clean_url, properties, req.headers, now)
+
+    if "ETag" in req.headers and etag == req.headers["ETag"]:
+        # Didn't change
+        return False
 
     return True
 
@@ -97,36 +101,51 @@ def etag_quote(etag):
         return '"' + etag + '"'
 
 
-def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow, varying_url_params=""):
+def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow, varying_url_params="", prefer_cached_downloads=False):
     varying_params = [s.strip() for s in varying_url_params.split(",")]
 
     parsed = urllib.parse.urlparse(url)
     query = [q for q in urllib.parse.parse_qsl(parsed.query)
              if q[0] not in varying_params]
+
     clean_url = urllib.parse.urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params,
-                                         urllib.parse.urlencode(query),  parsed.fragment))
+                                         urllib.parse.urlencode(query, safe="/"),  parsed.fragment))
 
-    r = api.collections().list(filters=[["properties", "exists", clean_url]]).execute()
+    r1 = api.collections().list(filters=[["properties", "exists", url]]).execute()
+
+    if clean_url == url:
+        items = r1["items"]
+    else:
+        r2 = api.collections().list(filters=[["properties", "exists", clean_url]]).execute()
+        items = r1["items"] + r2["items"]
 
     now = utcnow()
 
     etags = {}
 
-    for item in r["items"]:
+    for item in items:
         properties = item["properties"]
-        if fresh_cache(clean_url, properties, now):
-            # Do nothing
+
+        if clean_url in properties:
+            cache_url = clean_url
+        elif url in properties:
+            cache_url = url
+        else:
+            return False
+
+        if prefer_cached_downloads or fresh_cache(cache_url, properties, now):
+            # HTTP caching rules say we should use the cache
             cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
             return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])
 
-        if not changed(url, clean_url, properties, now):
+        if not changed(cache_url, clean_url, properties, now):
             # ETag didn't change, same content, just update headers
             api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
             cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
             return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])
 
-        if "ETag" in properties and len(properties["ETag"]) > 2:
-            etags[properties["ETag"]] = item
+        if "ETag" in properties[cache_url] and len(properties[cache_url]["ETag"]) > 2:
+            etags[properties[cache_url]["ETag"]] = item
 
     logger.debug("Found ETags %s", etags)
 
diff --git a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py
index 700b5b5f94aab099a85ab1a0e239337400517752..e2e287bf1dbd9cbcfbe63275ae40087393bb1d1f 100644
@@ -109,7 +109,9 @@ class ArvPathMapper(PathMapper):
                         # passthrough, we'll download it later.
                         self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
                     else:
-                        keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src, varying_url_params=self.arvrunner.varying_url_params)
+                        keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src,
+                                               varying_url_params=self.arvrunner.toplevel_runtimeContext.varying_url_params,
+                                               prefer_cached_downloads=self.arvrunner.toplevel_runtimeContext.prefer_cached_downloads)
                         logger.info("%s is %s", src, keepref)
                         self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
                 except Exception as e:
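
Review note (not part of the patch): a minimal sketch of the per-collection
decision order that the reworked http_to_keep() loop in http.py above
implements, now that ArvPathMapper calls it with varying_url_params and
prefer_cached_downloads. Here fresh and changed are simplified stand-ins for
that module's fresh_cache() and changed() helpers, and the returned strings
are labels for illustration only.

    def cache_decision(properties, url, clean_url, prefer_cached_downloads,
                       fresh, changed):
        """Label what http_to_keep does with one candidate cached collection."""
        # The cache entry may be keyed by the cleaned URL (varying query
        # parameters stripped) or by the original URL.
        if clean_url in properties:
            cache_url = clean_url
        elif url in properties:
            cache_url = url
        else:
            return "no usable cache entry"

        if prefer_cached_downloads or fresh(cache_url):
            # Use the cached collection without contacting upstream at all.
            return "use cached collection"

        if not changed(cache_url):
            # Upstream ETag matched: same content, just refresh stored headers.
            return "reuse content, update headers"

        # Otherwise fall through to re-download (possibly with If-None-Match).
        return "re-download"
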
diff --git a/sdk/cwl/tests/test_http.py b/sdk/cwl/tests/test_http.py
index 650b5f0598514bbe9fd5ea0de96ab848d2375ad0..5598b1f1387a33a4c53d45eac5fe7dbc042dbeef 100644
@@ -58,7 +58,7 @@ class TestHttpToKeep(unittest.TestCase):
         r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
         self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
 
-        getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True)
+        getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True, headers={})
 
         cm.open.assert_called_with("file1.txt", "wb")
         cm.save_new.assert_called_with(name="Downloaded from http%3A%2F%2Fexample.com%2Ffile1.txt",
@@ -186,7 +186,7 @@ class TestHttpToKeep(unittest.TestCase):
         r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
         self.assertEqual(r, "keep:99999999999999999999999999999997+99/file1.txt")
 
-        getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True)
+        getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True, headers={})
 
         cm.open.assert_called_with("file1.txt", "wb")
         cm.save_new.assert_called_with(name="Downloaded from http%3A%2F%2Fexample.com%2Ffile1.txt",
@@ -212,7 +212,7 @@ class TestHttpToKeep(unittest.TestCase):
                     'http://example.com/file1.txt': {
                         'Date': 'Tue, 15 May 2018 00:00:00 GMT',
                         'Expires': 'Tue, 16 May 2018 00:00:00 GMT',
-                        'ETag': '123456'
+                        'ETag': '"123456"'
                     }
                 }
             }]
@@ -229,7 +229,7 @@ class TestHttpToKeep(unittest.TestCase):
         req.headers = {
             'Date': 'Tue, 17 May 2018 00:00:00 GMT',
             'Expires': 'Tue, 19 May 2018 00:00:00 GMT',
-            'ETag': '123456'
+            'ETag': '"123456"'
         }
         headmock.return_value = req
 
@@ -247,7 +247,7 @@ class TestHttpToKeep(unittest.TestCase):
                       body={"collection":{"properties": {'http://example.com/file1.txt': {
                           'Date': 'Tue, 17 May 2018 00:00:00 GMT',
                           'Expires': 'Tue, 19 May 2018 00:00:00 GMT',
-                          'ETag': '123456'
+                          'ETag': '"123456"'
                       }}}})
                       ])
 
@@ -277,7 +277,7 @@ class TestHttpToKeep(unittest.TestCase):
         r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/download?fn=/file1.txt", utcnow=utcnow)
         self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
 
-        getmock.assert_called_with("http://example.com/download?fn=/file1.txt", stream=True, allow_redirects=True)
+        getmock.assert_called_with("http://example.com/download?fn=/file1.txt", stream=True, allow_redirects=True, headers={})
 
         cm.open.assert_called_with("file1.txt", "wb")
         cm.save_new.assert_called_with(name="Downloaded from http%3A%2F%2Fexample.com%2Fdownload%3Ffn%3D%2Ffile1.txt",
@@ -287,3 +287,156 @@ class TestHttpToKeep(unittest.TestCase):
             mock.call(uuid=cm.manifest_locator(),
                       body={"collection":{"properties": {"http://example.com/download?fn=/file1.txt": {'Date': 'Tue, 15 May 2018 00:00:00 GMT'}}}})
         ])
+
+    @mock.patch("requests.get")
+    @mock.patch("requests.head")
+    @mock.patch("arvados.collection.CollectionReader")
+    def test_http_etag_if_none_match(self, collectionmock, headmock, getmock):
+        api = mock.MagicMock()
+
+        api.collections().list().execute.return_value = {
+            "items": [{
+                "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz3",
+                "portable_data_hash": "99999999999999999999999999999998+99",
+                "properties": {
+                    'http://example.com/file1.txt': {
+                        'Date': 'Tue, 15 May 2018 00:00:00 GMT',
+                        'Expires': 'Tue, 16 May 2018 00:00:00 GMT',
+                        'ETag': '"123456"'
+                    }
+                }
+            }]
+        }
+
+        cm = mock.MagicMock()
+        cm.manifest_locator.return_value = "zzzzz-4zz18-zzzzzzzzzzzzzz3"
+        cm.portable_data_hash.return_value = "99999999999999999999999999999998+99"
+        cm.keys.return_value = ["file1.txt"]
+        collectionmock.return_value = cm
+
+        # Head request fails, will try a conditional GET instead
+        req = mock.MagicMock()
+        req.status_code = 403
+        req.headers = {
+        }
+        headmock.return_value = req
+
+        utcnow = mock.MagicMock()
+        utcnow.return_value = datetime.datetime(2018, 5, 17)
+
+        req = mock.MagicMock()
+        req.status_code = 304
+        req.headers = {
+            'Date': 'Tue, 17 May 2018 00:00:00 GMT',
+            'Expires': 'Tue, 19 May 2018 00:00:00 GMT',
+            'ETag': '"123456"'
+        }
+        getmock.return_value = req
+
+        r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
+        self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
+
+        getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True, headers={"If-None-Match": '"123456"'})
+        cm.open.assert_not_called()
+
+        api.collections().update.assert_has_calls([
+            mock.call(uuid=cm.manifest_locator(),
+                      body={"collection":{"properties": {'http://example.com/file1.txt': {
+                          'Date': 'Tue, 17 May 2018 00:00:00 GMT',
+                          'Expires': 'Tue, 19 May 2018 00:00:00 GMT',
+                          'ETag': '"123456"'
+                      }}}})
+                      ])
+
+
+    @mock.patch("requests.get")
+    @mock.patch("requests.head")
+    @mock.patch("arvados.collection.CollectionReader")
+    def test_http_prefer_cached_downloads(self, collectionmock, headmock, getmock):
+        api = mock.MagicMock()
+
+        api.collections().list().execute.return_value = {
+            "items": [{
+                "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz3",
+                "portable_data_hash": "99999999999999999999999999999998+99",
+                "properties": {
+                    'http://example.com/file1.txt': {
+                        'Date': 'Tue, 15 May 2018 00:00:00 GMT',
+                        'Expires': 'Tue, 16 May 2018 00:00:00 GMT',
+                        'ETag': '"123456"'
+                    }
+                }
+            }]
+        }
+
+        cm = mock.MagicMock()
+        cm.manifest_locator.return_value = "zzzzz-4zz18-zzzzzzzzzzzzzz3"
+        cm.portable_data_hash.return_value = "99999999999999999999999999999998+99"
+        cm.keys.return_value = ["file1.txt"]
+        collectionmock.return_value = cm
+
+        utcnow = mock.MagicMock()
+        utcnow.return_value = datetime.datetime(2018, 5, 17)
+
+        r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow, prefer_cached_downloads=True)
+        self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
+
+        headmock.assert_not_called()
+        getmock.assert_not_called()
+        cm.open.assert_not_called()
+        api.collections().update.assert_not_called()
+
+    @mock.patch("requests.get")
+    @mock.patch("requests.head")
+    @mock.patch("arvados.collection.CollectionReader")
+    def test_http_varying_url_params(self, collectionmock, headmock, getmock):
+        for prurl in ("http://example.com/file1.txt", "http://example.com/file1.txt?KeyId=123&Signature=456&Expires=789"):
+            api = mock.MagicMock()
+
+            api.collections().list().execute.return_value = {
+                "items": [{
+                    "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz3",
+                    "portable_data_hash": "99999999999999999999999999999998+99",
+                    "properties": {
+                        prurl: {
+                            'Date': 'Tue, 15 May 2018 00:00:00 GMT',
+                            'Expires': 'Tue, 16 May 2018 00:00:00 GMT',
+                            'ETag': '"123456"'
+                        }
+                    }
+                }]
+            }
+
+            cm = mock.MagicMock()
+            cm.manifest_locator.return_value = "zzzzz-4zz18-zzzzzzzzzzzzzz3"
+            cm.portable_data_hash.return_value = "99999999999999999999999999999998+99"
+            cm.keys.return_value = ["file1.txt"]
+            collectionmock.return_value = cm
+
+            req = mock.MagicMock()
+            req.status_code = 200
+            req.headers = {
+                'Date': 'Tue, 17 May 2018 00:00:00 GMT',
+                'Expires': 'Tue, 19 May 2018 00:00:00 GMT',
+                'ETag': '"123456"'
+            }
+            headmock.return_value = req
+
+            utcnow = mock.MagicMock()
+            utcnow.return_value = datetime.datetime(2018, 5, 17)
+
+            r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt?KeyId=123&Signature=456&Expires=789",
+                                              utcnow=utcnow, varying_url_params="KeyId,Signature,Expires")
+            self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
+
+            getmock.assert_not_called()
+            cm.open.assert_not_called()
+
+            api.collections().update.assert_has_calls([
+                mock.call(uuid=cm.manifest_locator(),
+                          body={"collection":{"properties": {'http://example.com/file1.txt': {
+                              'Date': 'Tue, 17 May 2018 00:00:00 GMT',
+                              'Expires': 'Tue, 19 May 2018 00:00:00 GMT',
+                              'ETag': '"123456"'
+                          }}}})
+                          ])
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 17c09f1ee78db4c5a1b45b1d71769f33d71fd8d7..dcbee726b6ce4962692d8255d7be2b41b76c5f09 100644
@@ -1587,6 +1587,50 @@ class TestSubmit(unittest.TestCase):
                          stubs.expect_container_request_uuid + '\n')
         self.assertEqual(exited, 0)
 
+    @stubs()
+    def test_submit_container_prefer_cached_downloads(self, stubs):
+        exited = arvados_cwl.main(
+            ["--submit", "--no-wait", "--api=containers", "--debug", "--prefer-cached-downloads",
+                "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+            stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+
+        expect_container = copy.deepcopy(stubs.expect_container_spec)
+        expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
+                                       '--no-log-timestamps', '--disable-validate', '--disable-color',
+                                       '--eval-timeout=20', '--thread-count=0',
+                                       '--enable-reuse', "--collection-cache-size=256",
+                                       '--output-name=Output from workflow submit_wf.cwl (%s)' % stubs.git_props["arv:gitDescribe"],
+                                       '--debug', "--on-error=continue", '--prefer-cached-downloads',
+                                       '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+
+        stubs.api.container_requests().create.assert_called_with(
+            body=JsonDiffMatcher(expect_container))
+        self.assertEqual(stubs.capture_stdout.getvalue(),
+                         stubs.expect_container_request_uuid + '\n')
+        self.assertEqual(exited, 0)
+
+    @stubs()
+    def test_submit_container_varying_url_params(self, stubs):
+        exited = arvados_cwl.main(
+            ["--submit", "--no-wait", "--api=containers", "--debug", "--varying-url-params", "KeyId,Signature",
+                "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+            stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+
+        expect_container = copy.deepcopy(stubs.expect_container_spec)
+        expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
+                                       '--no-log-timestamps', '--disable-validate', '--disable-color',
+                                       '--eval-timeout=20', '--thread-count=0',
+                                       '--enable-reuse', "--collection-cache-size=256",
+                                       '--output-name=Output from workflow submit_wf.cwl (%s)' % stubs.git_props["arv:gitDescribe"],
+                                       '--debug', "--on-error=continue", "--varying-url-params=KeyId,Signature",
+                                       '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+
+        stubs.api.container_requests().create.assert_called_with(
+            body=JsonDiffMatcher(expect_container))
+        self.assertEqual(stubs.capture_stdout.getvalue(),
+                         stubs.expect_container_request_uuid + '\n')
+        self.assertEqual(exited, 0)
+
 
 class TestCreateWorkflow(unittest.TestCase):
     existing_workflow_uuid = "zzzzz-7fd4e-validworkfloyml"