Merge branch '20937-arv-copy-http' refs #20937
author Peter Amstutz <peter.amstutz@curii.com>
Wed, 11 Oct 2023 13:20:29 +0000 (09:20 -0400)
committer Peter Amstutz <peter.amstutz@curii.com>
Tue, 28 Nov 2023 14:30:31 +0000 (09:30 -0500)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

doc/user/topics/arv-copy.html.textile.liquid
sdk/cwl/arvados_cwl/pathmapper.py
sdk/python/arvados/commands/arv_copy.py
sdk/python/arvados/http_to_keep.py
sdk/python/tests/test_http.py

index 15c9623224dd9440703b883c8d97dff2b97fab0c..86b68f2854a4fee7eb73e0006d2dd3516521fe6e 100644 (file)
@@ -15,7 +15,7 @@ This tutorial describes how to copy Arvados objects from one cluster to another
 
 h2. arv-copy
 
-@arv-copy@ allows users to copy collections, workflow definitions and projects from one cluster to another.
+@arv-copy@ allows users to copy collections, workflow definitions and projects from one cluster to another.  You can also use @arv-copy@ to import resources from HTTP URLs into Keep.
 
 For projects, @arv-copy@ will copy all the collections and workflow definitions owned by the project, and recursively copy subprojects.
 
@@ -92,7 +92,7 @@ h3. How to copy a project
 We will use the uuid @jutro-j7d0g-xj19djofle3aryq@ as an example project.
 
 <notextile>
-<pre><code>~$ <span class="userinput">peteramstutz@shell:~$ arv-copy --project-uuid pirca-j7d0g-lr8sq3tx3ovn68k jutro-j7d0g-xj19djofle3aryq
+<pre><code>~$ <span class="userinput">arv-copy --project-uuid pirca-j7d0g-lr8sq3tx3ovn68k jutro-j7d0g-xj19djofle3aryq
 2021-09-08 21:29:32 arvados.arv-copy[6377] INFO:
 2021-09-08 21:29:32 arvados.arv-copy[6377] INFO: Success: created copy with uuid pirca-j7d0g-ig9gvu5piznducp
 </code></pre>
@@ -101,3 +101,23 @@ We will use the uuid @jutro-j7d0g-xj19djofle3aryq@ as an example project.
 The name and description of the original project will be used for the destination copy.  If a project with the same name already exists, collections and workflow definitions will be copied into that existing project.
 
 If you would like to copy the project but not its subprojects, you can use the @--no-recursive@ flag, as in the illustrative example below.
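+
+For example, reusing the example project above, a non-recursive copy looks like this:
+
+<notextile>
+<pre><code>~$ <span class="userinput">arv-copy --no-recursive --project-uuid pirca-j7d0g-lr8sq3tx3ovn68k jutro-j7d0g-xj19djofle3aryq</span>
+</code></pre>
+</notextile>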
+
+h3. Importing HTTP resources to Keep
+
+You can also use @arv-copy@ to copy the contents of an HTTP URL into Keep.  When you do this, Arvados keeps track of the original URL the resource came from.  This allows you to refer to the resource by its original URL in workflow inputs, while actually reading from the local copy in Keep.
+
+<notextile>
+<pre><code>~$ <span class="userinput">arv-copy --project-uuid tordo-j7d0g-lr8sq3tx3ovn68k https://example.com/index.html
+tordo-4zz18-dhpb6y9km2byb94
+2023-10-06 10:15:36 arvados.arv-copy[374147] INFO: Success: created copy with uuid tordo-4zz18-dhpb6y9km2byb94
+</code></pre>
+</notextile>
+
+In addition, when importing from HTTP URLs, you may provide a cluster other than the destination in @--src@.  This tells @arv-copy@ to search that cluster for a collection associated with the URL and, if found, copy the collection from that cluster instead of downloading from the original URL.
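+
+For example, this (illustrative) command checks the @pirca@ cluster for an existing copy of the URL before falling back to downloading it:
+
+<notextile>
+<pre><code>~$ <span class="userinput">arv-copy --src pirca --project-uuid tordo-j7d0g-lr8sq3tx3ovn68k https://example.com/index.html</span>
+</code></pre>
+</notextile>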
+
+The following @arv-copy@ command line options affect the behavior of HTTP import.
+
+table(table table-bordered table-condensed).
+|_. Option |_. Description |
+|==--varying-url-params== VARYING_URL_PARAMS|A comma-separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.|
+|==--prefer-cached-downloads==|If an HTTP URL is found in Keep, skip the upstream URL freshness check (this will not notice if the upstream has changed, but also will not fail if the upstream is unavailable).|
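+
+For example, this (illustrative) command ignores the signing parameters of a pre-signed URL when matching it against Keep, and prefers any cached copy it finds:
+
+<notextile>
+<pre><code>~$ <span class="userinput">arv-copy --project-uuid tordo-j7d0g-lr8sq3tx3ovn68k --varying-url-params "KeyId,Signature,Expires" --prefer-cached-downloads https://example.com/index.html</span>
+</code></pre>
+</notextile>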
index 539188fddd995b9cda5c58c89f1f8ef1dd96293a..448facf776823c68f5c706cc0ec1707460222cf7 100644 (file)
@@ -109,9 +109,10 @@ class ArvPathMapper(PathMapper):
                         # passthrough, we'll download it later.
                         self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
                     else:
-                        keepref = "keep:%s/%s" % http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src,
+                        results = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src,
                                                               varying_url_params=self.arvrunner.toplevel_runtimeContext.varying_url_params,
                                                               prefer_cached_downloads=self.arvrunner.toplevel_runtimeContext.prefer_cached_downloads)
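+                        # results is (portable_data_hash, file_name, collection_uuid, clean_url, timestamp)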
+                        keepref = "keep:%s/%s" % (results[0], results[1])
                         logger.info("%s is %s", src, keepref)
                         self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
                 except Exception as e:
index 10fe9d702490fe5b15302d03dd8af495a89b966a..ef3936e267ce803f48f1ba4bf537d06f446d90b1 100755 (executable)
@@ -36,6 +36,9 @@ import logging
 import tempfile
 import urllib.parse
 import io
+import json
+import queue
+import threading
 
 import arvados
 import arvados.config
@@ -43,6 +46,7 @@ import arvados.keep
 import arvados.util
 import arvados.commands._util as arv_cmd
 import arvados.commands.keepdocker
+import arvados.http_to_keep
 import ruamel.yaml as yaml
 
 from arvados.api import OrderedJsonModel
@@ -106,6 +110,11 @@ def main():
     copy_opts.add_argument(
         '--storage-classes', dest='storage_classes',
         help='Comma-separated list of storage classes to be used when saving data to the destination Arvados instance.')
+    copy_opts.add_argument("--varying-url-params", type=str, default="",
+                        help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")
+
+    copy_opts.add_argument("--prefer-cached-downloads", action="store_true", default=False,
+                        help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")
 
     copy_opts.add_argument(
         'object_uuid',
@@ -126,7 +135,7 @@ def main():
     else:
         logger.setLevel(logging.INFO)
 
-    if not args.source_arvados:
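+    # args.object_uuid may be an HTTP URL, in which case there is no
+    # UUID prefix to infer the source cluster from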
+    if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid):
         args.source_arvados = args.object_uuid[:5]
 
     # Create API clients for the source and destination instances
@@ -138,19 +147,26 @@ def main():
 
     # Identify the kind of object we have been given, and begin copying.
     t = uuid_type(src_arv, args.object_uuid)
-    if t == 'Collection':
-        set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
-        result = copy_collection(args.object_uuid,
-                                 src_arv, dst_arv,
-                                 args)
-    elif t == 'Workflow':
-        set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
-        result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
-    elif t == 'Group':
-        set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
-        result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
-    else:
-        abort("cannot copy object {} of type {}".format(args.object_uuid, t))
+
+    try:
+        if t == 'Collection':
+            set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
+            result = copy_collection(args.object_uuid,
+                                     src_arv, dst_arv,
+                                     args)
+        elif t == 'Workflow':
+            set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
+            result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
+        elif t == 'Group':
+            set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
+            result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
+        elif t == 'httpURL':
+            result = copy_from_http(args.object_uuid, src_arv, dst_arv, args)
+        else:
+            abort("cannot copy object {} of type {}".format(args.object_uuid, t))
+    except Exception as e:
+        logger.error("%s", e, exc_info=args.verbose)
+        exit(1)
 
     # Clean up any outstanding temp git repositories.
     for d in listvalues(local_repo_dir):
@@ -158,8 +174,9 @@ def main():
 
     # If no exception was thrown and the response does not have an
     # error_token field, presume success
-    if 'error_token' in result or 'uuid' not in result:
-        logger.error("API server returned an error result: {}".format(result))
+    if result is None or 'error_token' in result or 'uuid' not in result:
+        if result:
+            logger.error("API server returned an error result: {}".format(result))
         exit(1)
 
     print(result['uuid'])
@@ -567,6 +584,125 @@ def copy_collection(obj_uuid, src, dst, args):
     else:
         progress_writer = None
 
+    # Plan:
+    # - walk the manifest and put each block locator on the 'get' queue
+    # - 'get' threads fetch blocks and put them on the 'put' queue
+    # - 'put' threads store blocks and record the new locators in dst_locators
+    #
+    # after the whole manifest has been transferred, walk it again
+    # and build dst_manifest using dst_locators
+
+    lock = threading.Lock()
+
+    # the get queue should be unbounded because we'll add all the
+    # block hashes we want to get, but these are small
+    get_queue = queue.Queue()
+
+    threadcount = 4
+
+    # the put queue contains full data blocks, so if 'get' is faster
+    # than 'put' we could consume a great deal of RAM if the queue
+    # weren't bounded.
+    put_queue = queue.Queue(threadcount)
+    transfer_error = []
+
+    def get_thread():
+        while True:
+            word = get_queue.get()
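+            # a None sentinel means no more work; forward it so one
+            # 'put' thread also shuts down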
+            if word is None:
+                put_queue.put(None)
+                get_queue.task_done()
+                return
+
+            blockhash = arvados.KeepLocator(word).md5sum
+            with lock:
+                if blockhash in dst_locators:
+                    # Already uploaded
+                    get_queue.task_done()
+                    continue
+
+            try:
+                logger.debug("Getting block %s", word)
+                data = src_keep.get(word)
+                put_queue.put((word, data))
+            except Exception as e:
+                logger.error("Error getting block %s: %s", word, e)
+                transfer_error.append(e)
+                try:
+                    # Drain the 'get' queue so we end early
+                    while True:
+                        get_queue.get(False)
+                        get_queue.task_done()
+                except queue.Empty:
+                    pass
+            finally:
+                get_queue.task_done()
+
+    def put_thread():
+        nonlocal bytes_written
+        while True:
+            item = put_queue.get()
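+            # a None sentinel forwarded from a 'get' thread means we are done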
+            if item is None:
+                put_queue.task_done()
+                return
+
+            word, data = item
+            loc = arvados.KeepLocator(word)
+            blockhash = loc.md5sum
+            with lock:
+                if blockhash in dst_locators:
+                    # Already uploaded
+                    put_queue.task_done()
+                    continue
+
+            try:
+                logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
+                dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
+                with lock:
+                    dst_locators[blockhash] = dst_locator
+                    bytes_written += loc.size
+                    if progress_writer:
+                        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
+            except Exception as e:
+                logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
+                try:
+                    # Drain the 'get' queue so we end early
+                    while True:
+                        get_queue.get(False)
+                        get_queue.task_done()
+                except queue.Empty:
+                    pass
+                transfer_error.append(e)
+            finally:
+                put_queue.task_done()
+
+    for line in manifest.splitlines():
+        words = line.split()
+        for word in words[1:]:
+            try:
+                loc = arvados.KeepLocator(word)
+            except ValueError:
+                # If 'word' can't be parsed as a locator,
+                # presume it's a filename.
+                continue
+
+            get_queue.put(word)
+
+    for i in range(0, threadcount):
+        get_queue.put(None)
+
+    for i in range(0, threadcount):
+        threading.Thread(target=get_thread, daemon=True).start()
+
+    for i in range(0, threadcount):
+        threading.Thread(target=put_thread, daemon=True).start()
+
+    get_queue.join()
+    put_queue.join()
+
+    if len(transfer_error) > 0:
+        return {"error_token": "Failed to transfer blocks"}
+
     for line in manifest.splitlines():
         words = line.split()
         dst_manifest.write(words[0])
@@ -580,16 +716,6 @@ def copy_collection(obj_uuid, src, dst, args):
                 dst_manifest.write(word)
                 continue
             blockhash = loc.md5sum
-            # copy this block if we haven't seen it before
-            # (otherwise, just reuse the existing dst_locator)
-            if blockhash not in dst_locators:
-                logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
-                if progress_writer:
-                    progress_writer.report(obj_uuid, bytes_written, bytes_expected)
-                data = src_keep.get(word)
-                dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
-                dst_locators[blockhash] = dst_locator
-                bytes_written += loc.size
             dst_manifest.write(' ')
             dst_manifest.write(dst_locators[blockhash])
         dst_manifest.write("\n")
@@ -758,6 +884,10 @@ def git_rev_parse(rev, repo):
 def uuid_type(api, object_uuid):
     if re.match(arvados.util.keep_locator_pattern, object_uuid):
         return 'Collection'
+
+    if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
+        return 'httpURL'
+
     p = object_uuid.split('-')
     if len(p) == 3:
         type_prefix = p[1]
@@ -767,6 +897,27 @@ def uuid_type(api, object_uuid):
                 return k
     return None
 
+
+def copy_from_http(url, src, dst, args):
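+    """Import an HTTP URL, reusing a cached copy when possible.
+
+    If the source cluster already has a collection associated with the
+    URL, copy that collection to the destination.  Otherwise, download
+    the URL directly into Keep on the destination cluster.
+    """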
+
+    project_uuid = args.project_uuid
+    varying_url_params = args.varying_url_params
+    prefer_cached_downloads = args.prefer_cached_downloads
+
+    cached = arvados.http_to_keep.check_cached_url(src, project_uuid, url, {},
+                                                   varying_url_params=varying_url_params,
+                                                   prefer_cached_downloads=prefer_cached_downloads)
+    if cached[2] is not None:
+        return copy_collection(cached[2], src, dst, args)
+
+    cached = arvados.http_to_keep.http_to_keep(dst, project_uuid, url,
+                                               varying_url_params=varying_url_params,
+                                               prefer_cached_downloads=prefer_cached_downloads)
+
+    if cached is not None:
+        return {"uuid": cached[2]}
+
+
 def abort(msg, code=1):
     logger.info("arv-copy: %s", msg)
     exit(code)
index 16c3dc4778cede64160d331456f334d84dfc5832..1da8cf4946c652bfca3208ca632821144ccab1f5 100644 (file)
@@ -182,6 +182,10 @@ class _Downloader(PyCurlHelper):
         mt = re.match(r'^HTTP\/(\d(\.\d)?) ([1-5]\d\d) ([^\r\n\x00-\x08\x0b\x0c\x0e-\x1f\x7f]*)\r\n$', self._headers["x-status-line"])
         code = int(mt.group(3))
 
+        if not self.name:
+            logger.error("Cannot determine filename from URL or headers")
+            return
+
         if code == 200:
             self.target = self.collection.open(self.name, "wb")
 
@@ -191,6 +195,13 @@ class _Downloader(PyCurlHelper):
             self._first_chunk = False
 
         self.count += len(chunk)
+
+        if self.target is None:
+            # "If this number is not equal to the size of the byte
+            # string, this signifies an error and libcurl will abort
+            # the request."
+            return 0
+
         self.target.write(chunk)
         loopnow = time.time()
         if (loopnow - self.checkpoint) < 20:
@@ -238,16 +249,10 @@ def _etag_quote(etag):
         return '"' + etag + '"'
 
 
-def http_to_keep(api, project_uuid, url,
-                 utcnow=datetime.datetime.utcnow, varying_url_params="",
-                 prefer_cached_downloads=False):
-    """Download a file over HTTP and upload it to keep, with HTTP headers as metadata.
-
-    Before downloading the URL, checks to see if the URL already
-    exists in Keep and applies HTTP caching policy, the
-    varying_url_params and prefer_cached_downloads flags in order to
-    decide whether to use the version in Keep or re-download it.
-    """
+def check_cached_url(api, project_uuid, url, etags,
+                     utcnow=datetime.datetime.utcnow,
+                     varying_url_params="",
+                     prefer_cached_downloads=False):
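+    """Check Keep for a collection cached from a previous download of url.
+
+    Returns a 5-tuple of (portable_data_hash, file_name,
+    collection_uuid, clean_url, now).  The first three fields are None
+    if no usable cached copy was found; any ETag values found along the
+    way are added to the caller's etags dict.
+    """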
 
     logger.info("Checking Keep for %s", url)
 
@@ -270,8 +275,6 @@ def http_to_keep(api, project_uuid, url,
 
     now = utcnow()
 
-    etags = {}
-
     curldownloader = _Downloader(api)
 
     for item in items:
@@ -287,13 +290,13 @@ def http_to_keep(api, project_uuid, url,
         if prefer_cached_downloads or _fresh_cache(cache_url, properties, now):
             # HTTP caching rules say we should use the cache
             cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
-            return (item["portable_data_hash"], next(iter(cr.keys())) )
+            return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now)
 
         if not _changed(cache_url, clean_url, properties, now, curldownloader):
             # Etag didn't change, same content, just update headers
             api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
             cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
-            return (item["portable_data_hash"], next(iter(cr.keys())))
+            return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now)
 
         for etagstr in ("Etag", "ETag"):
             if etagstr in properties[cache_url] and len(properties[cache_url][etagstr]) > 2:
@@ -301,6 +304,31 @@ def http_to_keep(api, project_uuid, url,
 
     logger.debug("Found ETag values %s", etags)
 
+    return (None, None, None, clean_url, now)
+
+
+def http_to_keep(api, project_uuid, url,
+                 utcnow=datetime.datetime.utcnow, varying_url_params="",
+                 prefer_cached_downloads=False):
+    """Download a file over HTTP and upload it to keep, with HTTP headers as metadata.
+
+    Before downloading the URL, checks to see if the URL already
+    exists in Keep and applies HTTP caching policy, the
+    varying_url_params and prefer_cached_downloads flags in order to
+    decide whether to use the version in Keep or re-download it.
+    """
+
+    etags = {}
+    cache_result = check_cached_url(api, project_uuid, url, etags,
+                                    utcnow, varying_url_params,
+                                    prefer_cached_downloads)
+
+    if cache_result[0] is not None:
+        return cache_result
+
+    clean_url = cache_result[3]
+    now = cache_result[4]
+
     properties = {}
     headers = {}
     if etags:
@@ -309,6 +337,8 @@ def http_to_keep(api, project_uuid, url,
 
     logger.info("Beginning download of %s", url)
 
+    curldownloader = _Downloader(api)
+
     req = curldownloader.download(url, headers)
 
     c = curldownloader.collection
@@ -326,7 +356,7 @@ def http_to_keep(api, project_uuid, url,
         item["properties"].update(properties)
         api.collections().update(uuid=item["uuid"], body={"collection":{"properties": item["properties"]}}).execute()
         cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
-        return (item["portable_data_hash"], list(cr.keys())[0])
+        return (item["portable_data_hash"], list(cr.keys())[0], item["uuid"], clean_url, now)
 
     logger.info("Download complete")
 
@@ -344,4 +374,4 @@ def http_to_keep(api, project_uuid, url,
 
     api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()
 
-    return (c.portable_data_hash(), curldownloader.name)
+    return (c.portable_data_hash(), curldownloader.name, c.manifest_locator(), clean_url, now)
index 381a61e2aa044103d53470d4bfe84fa99569344a..bce57eda61b7be549af205525bb0c81eba9e035d 100644 (file)
@@ -97,7 +97,9 @@ class TestHttpToKeep(unittest.TestCase):
         utcnow.return_value = datetime.datetime(2018, 5, 15)
 
         r = http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
-        self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt"))
+        self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt",
+                             'zzzzz-4zz18-zzzzzzzzzzzzzz3', 'http://example.com/file1.txt',
+                             datetime.datetime(2018, 5, 15, 0, 0)))
 
         assert mockobj.url == b"http://example.com/file1.txt"
         assert mockobj.perform_was_called is True
@@ -146,7 +148,9 @@ class TestHttpToKeep(unittest.TestCase):
         utcnow.return_value = datetime.datetime(2018, 5, 16)
 
         r = http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
-        self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt"))
+        self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt",
+                             'zzzzz-4zz18-zzzzzzzzzzzzzz3', 'http://example.com/file1.txt',
+                             datetime.datetime(2018, 5, 16, 0, 0)))
 
         assert mockobj.perform_was_called is False
 
@@ -185,7 +189,8 @@ class TestHttpToKeep(unittest.TestCase):
         utcnow.return_value = datetime.datetime(2018, 5, 16)
 
         r = http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
-        self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt"))
+        self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt", 'zzzzz-4zz18-zzzzzzzzzzzzzz3',
+                             'http://example.com/file1.txt', datetime.datetime(2018, 5, 16, 0, 0)))
 
         assert mockobj.perform_was_called is False
 
@@ -224,7 +229,10 @@ class TestHttpToKeep(unittest.TestCase):
         utcnow.return_value = datetime.datetime(2018, 5, 17)
 
         r = http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
-        self.assertEqual(r, ("99999999999999999999999999999997+99", "file1.txt"))
+        self.assertEqual(r, ("99999999999999999999999999999997+99", "file1.txt",
+                             'zzzzz-4zz18-zzzzzzzzzzzzzz4',
+                             'http://example.com/file1.txt', datetime.datetime(2018, 5, 17, 0, 0)))
 
         assert mockobj.url == b"http://example.com/file1.txt"
         assert mockobj.perform_was_called is True
@@ -278,7 +286,9 @@ class TestHttpToKeep(unittest.TestCase):
         utcnow.return_value = datetime.datetime(2018, 5, 17)
 
         r = http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
-        self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt"))
+        self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt",
+                             'zzzzz-4zz18-zzzzzzzzzzzzzz3', 'http://example.com/file1.txt',
+                             datetime.datetime(2018, 5, 17, 0, 0)))
 
         cm.open.assert_not_called()
 
@@ -315,7 +325,10 @@ class TestHttpToKeep(unittest.TestCase):
         utcnow.return_value = datetime.datetime(2018, 5, 15)
 
         r = http_to_keep(api, None, "http://example.com/download?fn=/file1.txt", utcnow=utcnow)
-        self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt"))
+        self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt",
+                             'zzzzz-4zz18-zzzzzzzzzzzzzz3',
+                             'http://example.com/download?fn=/file1.txt',
+                             datetime.datetime(2018, 5, 15, 0, 0)))
 
         assert mockobj.url == b"http://example.com/download?fn=/file1.txt"
 
@@ -369,7 +382,9 @@ class TestHttpToKeep(unittest.TestCase):
         utcnow.return_value = datetime.datetime(2018, 5, 17)
 
         r = http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
-        self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt"))
+        self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt",
+                             'zzzzz-4zz18-zzzzzzzzzzzzzz3', 'http://example.com/file1.txt',
+                             datetime.datetime(2018, 5, 17, 0, 0)))
 
         print(mockobj.req_headers)
         assert mockobj.req_headers == ["Accept: application/octet-stream", "If-None-Match: \"123456\""]
@@ -418,7 +433,8 @@ class TestHttpToKeep(unittest.TestCase):
         utcnow.return_value = datetime.datetime(2018, 5, 17)
 
         r = http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow, prefer_cached_downloads=True)
-        self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt"))
+        self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt", 'zzzzz-4zz18-zzzzzzzzzzzzzz3',
+                             'http://example.com/file1.txt', datetime.datetime(2018, 5, 17, 0, 0)))
 
         assert mockobj.perform_was_called is False
         cm.open.assert_not_called()
@@ -465,7 +481,8 @@ class TestHttpToKeep(unittest.TestCase):
 
             r = http_to_keep(api, None, "http://example.com/file1.txt?KeyId=123&Signature=456&Expires=789",
                                               utcnow=utcnow, varying_url_params="KeyId,Signature,Expires")
-            self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt"))
+            self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt", 'zzzzz-4zz18-zzzzzzzzzzzzzz3',
+                                 'http://example.com/file1.txt', datetime.datetime(2018, 5, 17, 0, 0)))
 
             assert mockobj.perform_was_called is True
             cm.open.assert_not_called()