Merge branch 'main' into 18842-arv-mount-disk-config
author Peter Amstutz <peter.amstutz@curii.com>
Tue, 22 Nov 2022 18:49:01 +0000 (13:49 -0500)
committer Peter Amstutz <peter.amstutz@curii.com>
Tue, 22 Nov 2022 18:49:01 +0000 (13:49 -0500)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

27 files changed:
doc/admin/maintenance-and-upgrading.html.textile.liquid
lib/config/config.default.yml
lib/config/export.go
lib/crunchrun/crunchrun.go
lib/dispatchcloud/node_size.go
lib/lsf/dispatch_test.go
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/tests/test_container.py
sdk/go/arvados/config.go
sdk/go/arvados/container.go
sdk/python/arvados/collection.py
sdk/python/arvados/diskcache.py [new file with mode: 0644]
sdk/python/arvados/keep.py
sdk/python/setup.py
sdk/python/tests/arvados_testutil.py
sdk/python/tests/test_collections.py
sdk/python/tests/test_keep_client.py
services/api/app/models/arvados_model.rb
services/api/app/models/container.rb
services/api/test/fixtures/containers.yml
services/api/test/unit/container_request_test.rb
services/api/test/unit/container_test.rb
services/fuse/arvados_fuse/command.py
services/fuse/setup.py
services/fuse/tests/mount_test_base.py
services/fuse/tests/test_mount.py

index ae2f8276c585fc78873310a65d47b6ed34033fca..970568f0cd0b2f5904ca09d08adb055afbc162e6 100644 (file)
@@ -62,7 +62,9 @@ Upgrading Arvados typically involves the following steps:
 # Wait for the cluster to be idle and stop Arvados services.
 # Make a backup of your database, as a precaution.
 # Update the configuration file for the new release, if necessary (see "Maintaining Arvados":#maintaining above)
-# rebuild and deploy the "compute node image":{{site.baseurl}}/install/crunch2-cloud/install-compute-node.html (cloud only)
+# Update compute nodes
+## (cloud) Rebuild and deploy the "compute node image":{{site.baseurl}}/install/crunch2-cloud/install-compute-node.html
+## (slurm/LSF) Upgrade the @python3-arvados-fuse@ package used on your compute nodes
 # Install new packages using @apt-get upgrade@ or @yum upgrade@.
 # Wait for package installation scripts as they perform any necessary data migrations.
 # Run @arvados-server config-check@ to detect configuration errors or deprecated entries.
index 0246cb88d5736e158bb1f502d91423b7b7072832..f7c2beca3372f294bb16762d6f5366e7e989a84c 100644 (file)
@@ -965,8 +965,15 @@ Clusters:
       # troubleshooting purposes.
       LogReuseDecisions: false
 
-      # Default value for keep_cache_ram of a container's runtime_constraints.
-      DefaultKeepCacheRAM: 268435456
+      # Default value for keep_cache_ram of a container's
+      # runtime_constraints.  Note: this gets added to the RAM request
+      # used to allocate a VM or submit an HPC job.
+      DefaultKeepCacheRAM: 0
+
+      # Default value for keep_cache_disk of a container's
+      # runtime_constraints.  Note: this gets added to the disk
+      # request used to allocate a VM or submit an HPC job.
+      DefaultKeepCacheDisk: 8589934592
 
       # Number of times a container can be unlocked before being
       # automatically cancelled.
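
As a sanity check on the new defaults, the arithmetic works out as follows (a minimal Python sketch; the variable names are illustrative only, not from the Arvados codebase):

default_keep_cache_disk = 8589934592           # bytes, from the config above
assert default_keep_cache_disk == 8 * 1024**3  # i.e. exactly 8 GiB

# Since the default is added to the container's disk request, a job with a
# 256 MiB 'tmp' mount asks the scheduler for 256 + 8192 = 8448 MiB of
# scratch, matching the updated LSF test expectations later in this diff.
print((256 * 1024**2 + default_keep_cache_disk) // 1024**2)  # 8448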
index 6352406e90e95dc255d9eb43ff1ca13f0e721fd1..814fc6cd9b9dfc6ab0fbea0d9e29715236a906bd 100644 (file)
@@ -121,6 +121,7 @@ var whitelist = map[string]bool{
        "Containers.CrunchRunArgumentsList":        false,
        "Containers.CrunchRunCommand":              false,
        "Containers.DefaultKeepCacheRAM":           true,
+       "Containers.DefaultKeepCacheDisk":          true,
        "Containers.DispatchPrivateKey":            false,
        "Containers.JobsAPI":                       true,
        "Containers.JobsAPI.Enable":                true,
index 55790f727a61d289d5b0d5080fa2911ee7789515..51e154c0ecfb3b978844947480f1efe7fe2f6fa9 100644 (file)
@@ -428,7 +428,13 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
                arvMountCmd = append(arvMountCmd, "--allow-other")
        }
 
-       if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
+       if runner.Container.RuntimeConstraints.KeepCacheDisk > 0 {
+               keepcachedir, err := runner.MkTempDir(runner.parentTemp, "keepcache")
+               if err != nil {
+                       return nil, fmt.Errorf("while creating keep cache temp dir: %v", err)
+               }
+               arvMountCmd = append(arvMountCmd, "--disk-cache", "--disk-cache-dir", keepcachedir, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheDisk))
+       } else if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
                arvMountCmd = append(arvMountCmd, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM))
        }
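
For reference, a minimal sketch (in Python, with a hypothetical temp directory path) of the arv-mount command line the new branch assembles when keep_cache_disk is set:

keep_cache_disk = 8 * 1024**3                      # RuntimeConstraints.KeepCacheDisk
keepcachedir = "/tmp/crunch-run-ctr/keepcache123"  # hypothetical MkTempDir result

arv_mount_cmd = [
    "arv-mount",
    "--disk-cache",
    "--disk-cache-dir", keepcachedir,
    "--file-cache", "%d" % keep_cache_disk,
]
print(" ".join(arv_mount_cmd))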
 
index 8cc63dc208c8c48e3afc47a1dab1435b11a12996..0b394f4cfe4f76849fc2eb42541ed613e325921f 100644 (file)
@@ -56,7 +56,7 @@ func estimateDockerImageSize(collectionPDH string) int64 {
 // EstimateScratchSpace estimates how much available disk space (in
 // bytes) is needed to run the container by summing the capacity
 // requested by 'tmp' mounts plus disk space required to load the
-// Docker image.
+// Docker image, plus the arv-mount block cache.
 func EstimateScratchSpace(ctr *arvados.Container) (needScratch int64) {
        for _, m := range ctr.Mounts {
                if m.Kind == "tmp" {
@@ -80,6 +80,9 @@ func EstimateScratchSpace(ctr *arvados.Container) (needScratch int64) {
        // Now reserve space for the extracted image on disk.
        needScratch += dockerImageSize
 
+       // Now reserve space for the arv-mount disk cache.
+       needScratch += ctr.RuntimeConstraints.KeepCacheDisk
+
        return
 }
 
index a09a941ffb0333db432d4475801f9e5d0d5142cd..a381b25e9d075bc993f6327c579107025a62fe79 100644 (file)
@@ -180,10 +180,10 @@ func (stub lsfstub) stubCommand(s *suite, c *check.C) func(prog string, args ...
                                c.Check(args, check.DeepEquals, []string{
                                        "-J", s.crPending.ContainerUUID,
                                        "-n", "2",
-                                       "-D", "608MB",
-                                       "-R", "rusage[mem=608MB:tmp=256MB] span[hosts=1]",
-                                       "-R", "select[mem>=608MB]",
-                                       "-R", "select[tmp>=256MB]",
+                                       "-D", "352MB",
+                                       "-R", "rusage[mem=352MB:tmp=8448MB] span[hosts=1]",
+                                       "-R", "select[mem>=352MB]",
+                                       "-R", "select[tmp>=8448MB]",
                                        "-R", "select[ncpus>=2]"})
                                mtx.Lock()
                                fakejobq[nextjobid] = args[1]
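
The updated expectations follow directly from the new defaults; a quick arithmetic check (Python):

# DefaultKeepCacheRAM dropped from 256 MiB to 0, so the memory request
# shrinks by 256 MB:
print(608 - 256)   # 352  -> "-D 352MB" and "mem=352MB"

# DefaultKeepCacheDisk (8 GiB) is now added to the disk request, so the
# tmp request grows by 8192 MB:
print(256 + 8192)  # 8448 -> "tmp=8448MB"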
index 196bea03907848a8ff795ac3326337d169c0aedc..550ecba1c100c95df9fc5358564d6bcd4fe9bacc 100644 (file)
@@ -333,7 +333,8 @@ def main(args=sys.argv[1:],
             # Make an API object now so errors are reported early.
             api_client.users().current().execute()
         if keep_client is None:
-            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
+            block_cache = arvados.keep.KeepBlockCache(disk_cache=True)
+            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4, block_cache=block_cache)
         executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4, stdout=stdout)
     except WorkflowException as e:
         logger.error(e, exc_info=(sys.exc_info()[1] if arvargs.debug else False))
index f8715f7e7b805f8191e79cd68b4ae4324d04bc59..6fcf366e02aeed8aca3bc25a56d8562f0ba812f7 100644 (file)
@@ -267,7 +267,11 @@ class ArvadosContainer(JobBase):
         runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
         if runtime_req:
             if "keep_cache" in runtime_req:
-                runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
+                if self.arvrunner.api.config()["Containers"].get("DefaultKeepCacheDisk", 0) > 0:
+                    # If DefaultKeepCacheDisk is non-zero it means we should use disk cache.
+                    runtime_constraints["keep_cache_disk"] = math.ceil(runtime_req["keep_cache"] * 2**20)
+                else:
+                    runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
             if "outputDirType" in runtime_req:
                 if runtime_req["outputDirType"] == "local_output_dir":
                     # Currently the default behavior.
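
The keep_cache hint in cwltool's RuntimeConstraints is expressed in MiB, hence the 2**20 multiplier above; for example (Python):

import math

keep_cache = 512                      # MiB, from the CWL hint
print(math.ceil(keep_cache * 2**20))  # 536870912 bytes
# Depending on whether DefaultKeepCacheDisk is non-zero, this value lands
# in keep_cache_disk or keep_cache_ram in the container's runtime_constraints.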
index ae3eab6ed822f88685f1fb2743bb9e31d4d436f8..75371e2b7856ffd36fdb51f8f7a69b4d89624d07 100644 (file)
@@ -201,6 +201,7 @@ class TestContainer(unittest.TestCase):
         runner.intermediate_output_ttl = 3600
         runner.secret_store = cwltool.secrets.SecretStore()
         runner.api._rootDesc = {"revision": "20210628"}
+        runner.api.config.return_value = {"Containers": {"DefaultKeepCacheDisk": 0}}
 
         keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
         runner.api.collections().get().execute.return_value = {
index 2871356e9827059352026432082c5fcdee2f3fce..bc6aab298fdcab19991528c1949b9d7bbac5efaf 100644 (file)
@@ -447,6 +447,7 @@ type ContainersConfig struct {
        CrunchRunCommand              string
        CrunchRunArgumentsList        []string
        DefaultKeepCacheRAM           ByteSize
+       DefaultKeepCacheDisk          ByteSize
        DispatchPrivateKey            string
        LogReuseDecisions             bool
        MaxComputeVMs                 int
index 45f92017c4d02be4a6d4063439ea8cd515dbd268..165c8112e8f1ed39cde40e2b6a913072ced0fe32 100644 (file)
@@ -107,11 +107,12 @@ type CUDARuntimeConstraints struct {
 // RuntimeConstraints specify a container's compute resources (RAM,
 // CPU) and network connectivity.
 type RuntimeConstraints struct {
-       API          bool                   `json:"API"`
-       RAM          int64                  `json:"ram"`
-       VCPUs        int                    `json:"vcpus"`
-       KeepCacheRAM int64                  `json:"keep_cache_ram"`
-       CUDA         CUDARuntimeConstraints `json:"cuda"`
+       API           bool                   `json:"API"`
+       RAM           int64                  `json:"ram"`
+       VCPUs         int                    `json:"vcpus"`
+       KeepCacheRAM  int64                  `json:"keep_cache_ram"`
+       KeepCacheDisk int64                  `json:"keep_cache_disk"`
+       CUDA          CUDARuntimeConstraints `json:"cuda"`
 }
 
 // SchedulingParameters specify a container's scheduling parameters
index 998481ab661105b68b0247d1a82c09211fa0d66e..e1138910aebfc501bdfd875c03bd568ea76c3f3e 100644 (file)
@@ -1308,6 +1308,11 @@ class Collection(RichCollectionBase):
         super(Collection, self).__init__(parent)
         self._api_client = api_client
         self._keep_client = keep_client
+
+        # Use the keep client from ThreadSafeApiCache
+        if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache):
+            self._keep_client = self._api_client.keep
+
         self._block_manager = block_manager
         self.replication_desired = replication_desired
         self._storage_classes_desired = storage_classes_desired
diff --git a/sdk/python/arvados/diskcache.py b/sdk/python/arvados/diskcache.py
new file mode 100644 (file)
index 0000000..9734d93
--- /dev/null
@@ -0,0 +1,223 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import threading
+import mmap
+import os
+import traceback
+import stat
+import tempfile
+import fcntl
+import errno
+import logging
+import time
+
+_logger = logging.getLogger('arvados.keep')
+
+cacheblock_suffix = ".keepcacheblock"
+
+class DiskCacheSlot(object):
+    __slots__ = ("locator", "ready", "content", "cachedir")
+
+    def __init__(self, locator, cachedir):
+        self.locator = locator
+        self.ready = threading.Event()
+        self.content = None
+        self.cachedir = cachedir
+
+    def get(self):
+        self.ready.wait()
+        return self.content
+
+    def set(self, value):
+        tmpfile = None
+        try:
+            if value is None:
+                self.content = None
+                return
+
+            if len(value) == 0:
+                # Can't mmap a 0 length file
+                self.content = b''
+                return
+
+            if self.content is not None:
+                # Has been set already
+                return
+
+            blockdir = os.path.join(self.cachedir, self.locator[0:3])
+            os.makedirs(blockdir, mode=0o700, exist_ok=True)
+
+            final = os.path.join(blockdir, self.locator) + cacheblock_suffix
+
+            f = tempfile.NamedTemporaryFile(dir=blockdir, delete=False, prefix="tmp", suffix=cacheblock_suffix)
+            tmpfile = f.name
+            os.chmod(tmpfile, stat.S_IRUSR | stat.S_IWUSR)
+
+            # acquire a shared lock; this tells other processes that
+            # we're using this block and to please not delete it.
+            fcntl.flock(f, fcntl.LOCK_SH)
+
+            f.write(value)
+            f.flush()
+            os.rename(tmpfile, final)
+            tmpfile = None
+
+            self.content = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
+        except OSError as e:
+            if e.errno == errno.ENODEV:
+                _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
+            elif e.errno == errno.ENOMEM:
+                _logger.error("Unable to use disk cache: The process's maximum number of mappings would have been exceeded.")
+            elif e.errno == errno.ENOSPC:
+                _logger.error("Unable to use disk cache: Out of disk space.")
+            else:
+                traceback.print_exc()
+        except Exception as e:
+            traceback.print_exc()
+        finally:
+            if tmpfile is not None:
+                # If the tempfile hasn't been renamed on disk yet, try to delete it.
+                try:
+                    os.remove(tmpfile)
+                except:
+                    pass
+            if self.content is None:
+                # Something went wrong with the disk cache, fall back
+                # to RAM cache behavior (the alternative is to cache
+                # nothing and return a read error).
+                self.content = value
+            self.ready.set()
+
+    def size(self):
+        if self.content is None:
+            return 0
+        else:
+            return len(self.content)
+
+    def evict(self):
+        if self.content is not None and len(self.content) > 0:
+            # The mmap region might be in use when we decided to evict
+            # it.  This can happen if the cache is too small.
+            #
+            # If we call close() now, it'll throw an error if
+            # something tries to access it.
+            #
+            # However, we don't need to explicitly call mmap.close()
+            #
+            # I confirmed in mmapmodule.c that both close
+            # and deallocate do the same thing:
+            #
+            # a) close the file descriptor
+            # b) unmap the memory range
+            #
+            # So we can forget it in the cache and delete the file on
+            # disk; the mapping will be torn down once any other
+            # lingering Python references to the mapped memory are
+            # gone.
+
+            blockdir = os.path.join(self.cachedir, self.locator[0:3])
+            final = os.path.join(blockdir, self.locator) + cacheblock_suffix
+            try:
+                with open(final, "rb") as f:
+                    # unlock
+                    fcntl.flock(f, fcntl.LOCK_UN)
+                    self.content = None
+
+                    # try to get an exclusive lock; this ensures other
+                    # processes are not using the block.  It is
+                    # nonblocking and will throw an exception if we
+                    # can't get it, which is fine because that means
+                    # we just won't try to delete it.
+                    #
+                    # Note that the file locking is not strictly
+                    # necessary: we could just remove the file, and
+                    # the kernel would ensure that the underlying
+                    # inode remains available as long as other
+                    # processes still have the file open.  However, if
+                    # you have multiple processes sharing the cache
+                    # and deleting each other's files, you'll end up
+                    # with a bunch of ghost files that don't show up
+                    # in the file system but are still taking up
+                    # space, which isn't particularly user friendly.
+                    # The locking strategy ensures that cache blocks
+                    # in use remain visible.
+                    #
+                    fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
+
+                    os.remove(final)
+                    return True
+            except OSError:
+                pass
+            return False
+
+    @staticmethod
+    def get_from_disk(locator, cachedir):
+        blockdir = os.path.join(cachedir, locator[0:3])
+        final = os.path.join(blockdir, locator) + cacheblock_suffix
+
+        try:
+            filehandle = open(final, "rb")
+
+            # acquire a shared lock; this tells other processes that
+            # we're using this block and to please not delete it.
+            fcntl.flock(filehandle, fcntl.LOCK_SH)
+
+            content = mmap.mmap(filehandle.fileno(), 0, access=mmap.ACCESS_READ)
+            dc = DiskCacheSlot(locator, cachedir)
+            dc.content = content
+            dc.ready.set()
+            return dc
+        except FileNotFoundError:
+            pass
+        except Exception as e:
+            traceback.print_exc()
+
+        return None
+
+    @staticmethod
+    def init_cache(cachedir, maxslots):
+        # map in all the files in the cache directory, up to max slots.
+        # after max slots, try to delete the excess blocks.
+        #
+        # this gives the calling process ownership of all the blocks
+
+        blocks = []
+        for root, dirs, files in os.walk(cachedir):
+            for name in files:
+                if not name.endswith(cacheblock_suffix):
+                    continue
+
+                blockpath = os.path.join(root, name)
+                res = os.stat(blockpath)
+
+                if len(name) == (32+len(cacheblock_suffix)) and not name.startswith("tmp"):
+                    blocks.append((name[0:32], res.st_atime))
+                elif name.startswith("tmp") and ((time.time() - res.st_mtime) > 60):
+                    # found a temporary file more than 1 minute old,
+                    # try to delete it.
+                    try:
+                        os.remove(blockpath)
+                    except:
+                        pass
+
+        # sort by access time (atime), going from most recently
+        # accessed (highest timestamp) to least recently accessed
+        # (lowest timestamp).
+        blocks.sort(key=lambda x: x[1], reverse=True)
+
+        # Map in all the files we found, up to maxslots; past that,
+        # start throwing things out.
+        cachelist = []
+        for b in blocks:
+            got = DiskCacheSlot.get_from_disk(b[0], cachedir)
+            if got is None:
+                continue
+            if len(cachelist) < maxslots:
+                cachelist.append(got)
+            else:
+                # we already have maxslots blocks mapped, so try
+                # to evict this one from the cache.
+                got.evict()
+
+        return cachelist
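
A minimal usage sketch of the new module (normally KeepBlockCache drives these calls rather than user code; the temporary cache directory and locator here are just for illustration):

import tempfile
import arvados.diskcache

cachedir = tempfile.mkdtemp()
locator = "a" * 32   # hypothetical block hash

# Store a block, then re-open it from disk as a second process would.
slot = arvados.diskcache.DiskCacheSlot(locator, cachedir)
slot.set(b"example block data")
reopened = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, cachedir)
assert reopened.get()[:] == b"example block data"

# On startup, scan the directory and map in at most 512 slots.
slots = arvados.diskcache.DiskCacheSlot.init_cache(cachedir, 512)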
index 44e915776734fe87020ba46b5d95d9985f8e8dfe..dd99e8b928054ed03e42196608f0a9697f65870b 100644 (file)
@@ -26,6 +26,7 @@ import socket
 import ssl
 import sys
 import threading
+import resource
 from . import timer
 import urllib.parse
 
@@ -39,6 +40,7 @@ import arvados.config as config
 import arvados.errors
 import arvados.retry as retry
 import arvados.util
+import arvados.diskcache
 
 _logger = logging.getLogger('arvados.keep')
 global_client_object = None
@@ -174,11 +176,48 @@ class Keep(object):
         return Keep.global_client_object().put(data, **kwargs)
 
 class KeepBlockCache(object):
-    # Default RAM cache is 256MiB
-    def __init__(self, cache_max=(256 * 1024 * 1024)):
+    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
         self.cache_max = cache_max
         self._cache = []
         self._cache_lock = threading.Lock()
+        self._max_slots = max_slots
+        self._disk_cache = disk_cache
+        self._disk_cache_dir = disk_cache_dir
+
+        if self._disk_cache and self._disk_cache_dir is None:
+            self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
+            os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)
+
+        if self._max_slots == 0:
+            if self._disk_cache:
+                # default max slots to half of maximum file handles
+                # NOFILE typically defaults to 1024 on Linux so this
+                # will be 512 slots.
+                self._max_slots = resource.getrlimit(resource.RLIMIT_NOFILE)[0] // 2
+            else:
+                # RAM cache slots
+                self._max_slots = 512
+
+        if self.cache_max == 0:
+            if self._disk_cache:
+                fs = os.statvfs(self._disk_cache_dir)
+                avail = (fs.f_bavail * fs.f_bsize) // 4
+                maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
+                # pick smallest of:
+                # 10% of total disk size
+                # 25% of available space
+                # max_slots * 64 MiB
+                self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
+            else:
+                # 256 MiB in RAM
+                self.cache_max = (256 * 1024 * 1024)
+
+        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
+
+        if self._disk_cache:
+            self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
+            self.cap_cache()
+
 
     class CacheSlot(object):
         __slots__ = ("locator", "ready", "content")
@@ -202,6 +241,9 @@ class KeepBlockCache(object):
             else:
                 return len(self.content)
 
+        def evict(self):
+            return True
+
     def cap_cache(self):
         '''Cap the cache size to self.cache_max'''
         with self._cache_lock:
@@ -209,12 +251,27 @@ class KeepBlockCache(object):
             # None (that means there was an error reading the block).
             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
             sm = sum([slot.size() for slot in self._cache])
-            while len(self._cache) > 0 and sm > self.cache_max:
+            while len(self._cache) > 0 and (sm > self.cache_max or len(self._cache) > self._max_slots):
                 for i in range(len(self._cache)-1, -1, -1):
+                    # start from the back, find a slot that is a candidate to evict
                     if self._cache[i].ready.is_set():
+                        sz = self._cache[i].size()
+
+                        # If evict returns false it means the
+                        # underlying disk cache couldn't lock the file
+                        # for deletion because another process was using
+                        # it. Don't count it as reducing the amount
+                        # of data in the cache; find something else to
+                        # throw out.
+                        if self._cache[i].evict():
+                            sm -= sz
+
+                        # Either way we forget about it: either the
+                        # other process will delete it, or if we need
+                        # it again and it is still there, we'll find
+                        # it on disk.
                         del self._cache[i]
                         break
-                sm = sum([slot.size() for slot in self._cache])
 
     def _get(self, locator):
         # Test if the locator is already in the cache
@@ -226,6 +283,12 @@ class KeepBlockCache(object):
                     del self._cache[i]
                     self._cache.insert(0, n)
                 return n
+        if self._disk_cache:
+            # see if it exists on disk
+            n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
+            if n is not None:
+                self._cache.insert(0, n)
+                return n
         return None
 
     def get(self, locator):
@@ -241,7 +304,10 @@ class KeepBlockCache(object):
                 return n, False
             else:
                 # Add a new cache slot for the locator
-                n = KeepBlockCache.CacheSlot(locator)
+                if self._disk_cache:
+                    n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
+                else:
+                    n = KeepBlockCache.CacheSlot(locator)
                 self._cache.insert(0, n)
                 return n, True
 
index d85e2a5cde56056416fee47ce85fa0197d6e757d..1daafc97adcf89e2f6fac2f1899db2003967f25c 100644 (file)
@@ -64,6 +64,6 @@ setup(name='arvados-python-client',
           'Programming Language :: Python :: 3',
       ],
       test_suite='tests',
-      tests_require=['pbr<1.7.0', 'mock>=1.0,<4', 'PyYAML'],
+      tests_require=['pbr<1.7.0', 'mock>=1.0,<4', 'PyYAML', 'parameterized'],
       zip_safe=False
       )
index d9b3ca86c4f9055dde2fa9b54ad63ed65d16d755..3772761b88a2b9ef6be3b7b4ec724936f4e36d5a 100644 (file)
@@ -280,3 +280,18 @@ if sys.version_info < (3, 0):
         return self.assertNotRegexpMatches(*args, **kwargs)
     unittest.TestCase.assertRegex = assertRegex
     unittest.TestCase.assertNotRegex = assertNotRegex
+
+def binary_compare(a, b):
+    if len(a) != len(b):
+        return False
+    for i in range(0, len(a)):
+        if a[i] != b[i]:
+            return False
+    return True
+
+def make_block_cache(disk_cache):
+    if disk_cache:
+        disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
+        shutil.rmtree(disk_cache_dir, ignore_errors=True)
+    block_cache = arvados.keep.KeepBlockCache(disk_cache=disk_cache)
+    return block_cache
index b4849c21ff30909ab3c7f9bb0af8040582e82a58..8aded823bde3410a68dfa0e1110e0d0537d6c868 100644 (file)
@@ -16,11 +16,13 @@ import datetime
 import ciso8601
 import time
 import unittest
+import parameterized
 
 from . import run_test_server
 from arvados._ranges import Range, LocatorAndRange
 from arvados.collection import Collection, CollectionReader
 from . import arvados_testutil as tutil
+from .arvados_testutil import make_block_cache
 
 class TestResumableWriter(arvados.ResumableCollectionWriter):
     KEEP_BLOCK_SIZE = 1024  # PUT to Keep every 1K.
@@ -28,9 +30,10 @@ class TestResumableWriter(arvados.ResumableCollectionWriter):
     def current_state(self):
         return self.dump_state(copy.deepcopy)
 
-
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
                              tutil.ArvadosBaseTestCase):
+    disk_cache = False
     MAIN_SERVER = {}
 
     @classmethod
@@ -40,7 +43,8 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
         run_test_server.authorize_with('admin')
         cls.api_client = arvados.api('v1')
         cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
-                                             local_store=cls.local_store)
+                                             local_store=cls.local_store,
+                                             block_cache=make_block_cache(cls.disk_cache))
 
     def write_foo_bar_baz(self):
         cw = arvados.CollectionWriter(self.api_client)
index 87e4eefb29d666b4f3fa724151ba8cf268f7d20b..126116393df8ea476f62d959c108e0a0d063bd0a 100644 (file)
@@ -15,12 +15,15 @@ import os
 import pycurl
 import random
 import re
+import shutil
 import socket
 import sys
 import time
 import unittest
 import urllib.parse
 
+import parameterized
+
 import arvados
 import arvados.retry
 import arvados.util
@@ -28,7 +31,11 @@ from . import arvados_testutil as tutil
 from . import keepstub
 from . import run_test_server
 
+from .arvados_testutil import make_block_cache
+
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class KeepTestCase(run_test_server.TestCaseWithServers):
+    disk_cache = False
     MAIN_SERVER = {}
     KEEP_SERVER = {}
 
@@ -38,7 +45,8 @@ class KeepTestCase(run_test_server.TestCaseWithServers):
         run_test_server.authorize_with("admin")
         cls.api_client = arvados.api('v1')
         cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
-                                             proxy='', local_store='')
+                                             proxy='', local_store='',
+                                             block_cache=make_block_cache(cls.disk_cache))
 
     def test_KeepBasicRWTest(self):
         self.assertEqual(0, self.keep_client.upload_counter.get())
@@ -52,8 +60,8 @@ class KeepTestCase(run_test_server.TestCaseWithServers):
         self.assertEqual(6, self.keep_client.upload_counter.get())
 
         self.assertEqual(0, self.keep_client.download_counter.get())
-        self.assertEqual(self.keep_client.get(foo_locator),
-                         b'foo',
+        self.assertTrue(tutil.binary_compare(self.keep_client.get(foo_locator),
+                         b'foo'),
                          'wrong content from Keep.get(md5("foo"))')
         self.assertEqual(3, self.keep_client.download_counter.get())
 
@@ -128,13 +136,15 @@ class KeepTestCase(run_test_server.TestCaseWithServers):
                          b'test_head',
                          'wrong content from Keep.get for "test_head"')
 
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class KeepPermissionTestCase(run_test_server.TestCaseWithServers):
+    disk_cache = False
     MAIN_SERVER = {}
     KEEP_SERVER = {'blob_signing': True}
 
     def test_KeepBasicRWTest(self):
         run_test_server.authorize_with('active')
-        keep_client = arvados.KeepClient()
+        keep_client = arvados.KeepClient(block_cache=make_block_cache(self.disk_cache))
         foo_locator = keep_client.put('foo')
         self.assertRegex(
             foo_locator,
@@ -171,8 +181,9 @@ class KeepPermissionTestCase(run_test_server.TestCaseWithServers):
                           keep_client.get,
                           unsigned_bar_locator)
 
-
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class KeepProxyTestCase(run_test_server.TestCaseWithServers):
+    disk_cache = False
     MAIN_SERVER = {}
     KEEP_SERVER = {}
     KEEP_PROXY_SERVER = {}
@@ -190,7 +201,7 @@ class KeepProxyTestCase(run_test_server.TestCaseWithServers):
         # Will use ARVADOS_KEEP_SERVICES environment variable that
         # is set by setUpClass().
         keep_client = arvados.KeepClient(api_client=self.api_client,
-                                         local_store='')
+                                         local_store='', block_cache=make_block_cache(self.disk_cache))
         baz_locator = keep_client.put('baz')
         self.assertRegex(
             baz_locator,
@@ -206,7 +217,8 @@ class KeepProxyTestCase(run_test_server.TestCaseWithServers):
         # existing proxy setting and setting multiple proxies
         arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
         keep_client = arvados.KeepClient(api_client=self.api_client,
-                                         local_store='')
+                                         local_store='',
+                                         block_cache=make_block_cache(self.disk_cache))
         uris = [x['_service_root'] for x in keep_client._keep_services]
         self.assertEqual(uris, ['http://10.0.0.1/',
                                 'https://foo.example.org:1234/'])
@@ -215,12 +227,15 @@ class KeepProxyTestCase(run_test_server.TestCaseWithServers):
         arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
         with self.assertRaises(arvados.errors.ArgumentError):
             keep_client = arvados.KeepClient(api_client=self.api_client,
-                                             local_store='')
-
+                                             local_store='',
+                                             block_cache=make_block_cache(self.disk_cache))
 
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
+    disk_cache = False
+
     def get_service_roots(self, api_client):
-        keep_client = arvados.KeepClient(api_client=api_client)
+        keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
         services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
         return [urllib.parse.urlparse(url) for url in sorted(services)]
 
@@ -239,7 +254,8 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
 
     def test_recognize_proxy_services_in_controller_response(self):
         keep_client = arvados.KeepClient(api_client=self.mock_keep_services(
-            service_type='proxy', service_host='localhost', service_port=9, count=1))
+            service_type='proxy', service_host='localhost', service_port=9, count=1),
+                                         block_cache=make_block_cache(self.disk_cache))
         try:
             # this will fail, but it ensures we get the service
             # discovery response
@@ -254,7 +270,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
 
         api_client.insecure = True
         with tutil.mock_keep_responses(b'foo', 200) as mock:
-            keep_client = arvados.KeepClient(api_client=api_client)
+            keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
             keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
             self.assertEqual(
                 mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
@@ -265,7 +281,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
 
         api_client.insecure = False
         with tutil.mock_keep_responses(b'foo', 200) as mock:
-            keep_client = arvados.KeepClient(api_client=api_client)
+            keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
             keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
             # getopt()==None here means we didn't change the
             # default. If we were using real pycurl instead of a mock,
@@ -286,7 +302,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
         headers = {'X-Keep-Locator':local_loc}
         with tutil.mock_keep_responses('', 200, **headers):
             # Check that the translated locator gets returned
-            keep_client = arvados.KeepClient(api_client=api_client)
+            keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
             self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
             # Check that refresh_signature() uses the correct method and headers
             keep_client._get_or_head = mock.MagicMock()
@@ -305,7 +321,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
         api_client = self.mock_keep_services(count=1)
         force_timeout = socket.timeout("timed out")
         with tutil.mock_keep_responses(force_timeout, 0) as mock:
-            keep_client = arvados.KeepClient(api_client=api_client)
+            keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
             with self.assertRaises(arvados.errors.KeepReadError):
                 keep_client.get('ffffffffffffffffffffffffffffffff')
             self.assertEqual(
@@ -322,7 +338,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
         api_client = self.mock_keep_services(count=1)
         force_timeout = socket.timeout("timed out")
         with tutil.mock_keep_responses(force_timeout, 0) as mock:
-            keep_client = arvados.KeepClient(api_client=api_client)
+            keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
             with self.assertRaises(arvados.errors.KeepWriteError):
                 keep_client.put(b'foo')
             self.assertEqual(
@@ -339,7 +355,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
         api_client = self.mock_keep_services(count=1)
         force_timeout = socket.timeout("timed out")
         with tutil.mock_keep_responses(force_timeout, 0) as mock:
-            keep_client = arvados.KeepClient(api_client=api_client)
+            keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
             with self.assertRaises(arvados.errors.KeepReadError):
                 keep_client.head('ffffffffffffffffffffffffffffffff')
             self.assertEqual(
@@ -356,7 +372,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
         api_client = self.mock_keep_services(service_type='proxy', count=1)
         force_timeout = socket.timeout("timed out")
         with tutil.mock_keep_responses(force_timeout, 0) as mock:
-            keep_client = arvados.KeepClient(api_client=api_client)
+            keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
             with self.assertRaises(arvados.errors.KeepReadError):
                 keep_client.get('ffffffffffffffffffffffffffffffff')
             self.assertEqual(
@@ -373,7 +389,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
         api_client = self.mock_keep_services(service_type='proxy', count=1)
         force_timeout = socket.timeout("timed out")
         with tutil.mock_keep_responses(force_timeout, 0) as mock:
-            keep_client = arvados.KeepClient(api_client=api_client)
+            keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
             with self.assertRaises(arvados.errors.KeepReadError):
                 keep_client.head('ffffffffffffffffffffffffffffffff')
             self.assertEqual(
@@ -407,7 +423,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
         api_client = mock.MagicMock(name='api_client')
         api_client.keep_services().accessible().execute.side_effect = (
             arvados.errors.ApiError)
-        keep_client = arvados.KeepClient(api_client=api_client)
+        keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
         with self.assertRaises(exc_class) as err_check:
             getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
         self.assertEqual(0, len(err_check.exception.request_errors()))
@@ -427,7 +443,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
             "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
         with req_mock, tutil.skip_sleep, \
                 self.assertRaises(exc_class) as err_check:
-            keep_client = arvados.KeepClient(api_client=api_client)
+            keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
             getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                        num_retries=3)
         self.assertEqual([502, 502], [
@@ -450,7 +466,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
         api_client = self.mock_keep_services(count=3)
         with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
                 self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
-            keep_client = arvados.KeepClient(api_client=api_client)
+            keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
             keep_client.put(data)
         self.assertEqual(2, len(exc_check.exception.request_errors()))
 
@@ -460,7 +476,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
         api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
         with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
                 self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
-          keep_client = arvados.KeepClient(api_client=api_client)
+          keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
           keep_client.put(data)
         self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
         self.assertEqual(0, len(exc_check.exception.request_errors()))
@@ -469,7 +485,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
         body = b'oddball service get'
         api_client = self.mock_keep_services(service_type='fancynewblobstore')
         with tutil.mock_keep_responses(body, 200):
-            keep_client = arvados.KeepClient(api_client=api_client)
+            keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
             actual = keep_client.get(tutil.str_keep_locator(body))
         self.assertEqual(body, actual)
 
@@ -478,7 +494,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
         pdh = tutil.str_keep_locator(body)
         api_client = self.mock_keep_services(service_type='fancynewblobstore')
         with tutil.mock_keep_responses(pdh, 200):
-            keep_client = arvados.KeepClient(api_client=api_client)
+            keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
             actual = keep_client.put(body, copies=1)
         self.assertEqual(pdh, actual)
 
@@ -490,17 +506,19 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
         headers = {'x-keep-replicas-stored': 3}
         with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                        **headers) as req_mock:
-            keep_client = arvados.KeepClient(api_client=api_client)
+            keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
             actual = keep_client.put(body, copies=2)
         self.assertEqual(pdh, actual)
         self.assertEqual(1, req_mock.call_count)
 
-
 @tutil.skip_sleep
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
+    disk_cache = False
+
     def setUp(self):
         self.api_client = self.mock_keep_services(count=2)
-        self.keep_client = arvados.KeepClient(api_client=self.api_client)
+        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
         self.data = b'xyzzy'
         self.locator = '1271ed5ef305aadabc605b1609e24c52'
 
@@ -524,19 +542,22 @@ class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
     def test_head_and_then_get_return_different_responses(self, get_mock):
         head_resp = None
         get_resp = None
-        get_mock.side_effect = ['first response', 'second response']
+        get_mock.side_effect = [b'first response', b'second response']
         with tutil.mock_keep_responses(self.data, 200, 200):
             head_resp = self.keep_client.head(self.locator)
             get_resp = self.keep_client.get(self.locator)
-        self.assertEqual('first response', head_resp)
+        self.assertEqual(b'first response', head_resp)
         # First response was not cached because it was from a HEAD request.
         self.assertNotEqual(head_resp, get_resp)
 
 @tutil.skip_sleep
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock):
+    disk_cache = False
+
     def setUp(self):
         self.api_client = self.mock_keep_services(count=2)
-        self.keep_client = arvados.KeepClient(api_client=self.api_client)
+        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
         self.data = b'xyzzy'
         self.locator = '1271ed5ef305aadabc605b1609e24c52'
 
@@ -550,7 +571,7 @@ class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock):
             }
         }
         api_client = self.mock_keep_services(api_mock=api_mock, count=2)
-        keep_client = arvados.KeepClient(api_client=api_client)
+        keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
         resp_hdr = {
             'x-keep-storage-classes-confirmed': 'foo=1, bar=1',
             'x-keep-replicas-stored': 1
@@ -644,10 +665,13 @@ class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock):
                     self.keep_client.put(self.data, copies=w_copies, classes=w_classes)
 
 @tutil.skip_sleep
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock):
+    disk_cache = False
+
     def setUp(self):
         self.api_client = self.mock_keep_services(count=2)
-        self.keep_client = arvados.KeepClient(api_client=self.api_client)
+        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
         self.data = b'xyzzy'
         self.locator = '1271ed5ef305aadabc605b1609e24c52'
         self.test_id = arvados.util.new_request_id()
@@ -732,7 +756,9 @@ class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock):
 
 
 @tutil.skip_sleep
+#@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
+    disk_cache = False
 
     def setUp(self):
         # expected_order[i] is the probe order for
@@ -755,7 +781,7 @@ class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
             hashlib.md5(self.blocks[x]).hexdigest()
             for x in range(len(self.expected_order))]
         self.api_client = self.mock_keep_services(count=self.services)
-        self.keep_client = arvados.KeepClient(api_client=self.api_client)
+        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
 
     def test_weighted_service_roots_against_reference_set(self):
         # Confirm weighted_service_roots() returns the correct order
@@ -828,12 +854,12 @@ class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
             hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
         initial_services = 12
         self.api_client = self.mock_keep_services(count=initial_services)
-        self.keep_client = arvados.KeepClient(api_client=self.api_client)
+        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
         probes_before = [
             self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
         for added_services in range(1, 12):
             api_client = self.mock_keep_services(count=initial_services+added_services)
-            keep_client = arvados.KeepClient(api_client=api_client)
+            keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
             total_penalty = 0
             for hash_index in range(len(hashes)):
                 probe_after = keep_client.weighted_service_roots(
@@ -869,7 +895,7 @@ class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
         # Arbitrary port number:
         aport = random.randint(1024,65535)
         api_client = self.mock_keep_services(service_port=aport, count=self.services)
-        keep_client = arvados.KeepClient(api_client=api_client)
+        keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
         with mock.patch('pycurl.Curl') as curl_mock, \
              self.assertRaises(exc_class) as err_check:
             curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
@@ -885,8 +911,10 @@ class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
     def test_put_error_shows_probe_order(self):
         self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
 
-
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase):
+    disk_cache = False
+
     # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
     # 1s worth of data and then trigger bandwidth errors before running
     # out of data.
@@ -923,7 +951,7 @@ class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase):
     def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
         return arvados.KeepClient(
             api_client=self.api_client,
-            timeout=timeouts)
+            timeout=timeouts, block_cache=make_block_cache(self.disk_cache))
 
     def test_timeout_slow_connect(self):
         # Can't simulate TCP delays with our own socket. Leave our
@@ -1027,8 +1055,10 @@ class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase):
             with self.assertRaises(arvados.errors.KeepWriteError):
                 kc.put(self.DATA, copies=1, num_retries=0)
 
-
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
+    disk_cache = False
+
     def mock_disks_and_gateways(self, disks=3, gateways=1):
         self.gateways = [{
                 'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
@@ -1043,7 +1073,7 @@ class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
             for gw in self.gateways]
         self.api_client = self.mock_keep_services(
             count=disks, additional_services=self.gateways)
-        self.keepClient = arvados.KeepClient(api_client=self.api_client)
+        self.keepClient = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
 
     @mock.patch('pycurl.Curl')
     def test_get_with_gateway_hint_first(self, MockCurl):
@@ -1124,8 +1154,9 @@ class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
         self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                          MockCurl.return_value.getopt(pycurl.URL).decode())
 
-
 class KeepClientRetryTestMixin(object):
+    disk_cache = False
+
     # Testing with a local Keep store won't exercise the retry behavior.
     # Instead, our strategy is:
     # * Create a client with one proxy specified (pointed at a black
@@ -1150,6 +1181,7 @@ class KeepClientRetryTestMixin(object):
     def new_client(self, **caller_kwargs):
         kwargs = self.client_kwargs.copy()
         kwargs.update(caller_kwargs)
+        kwargs['block_cache'] = make_block_cache(self.disk_cache)
         return arvados.KeepClient(**kwargs)
 
     def run_method(self, *args, **kwargs):
@@ -1199,6 +1231,7 @@ class KeepClientRetryTestMixin(object):
 
 
 @tutil.skip_sleep
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
     DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
     DEFAULT_EXCEPTION = arvados.errors.KeepReadError
@@ -1243,6 +1276,7 @@ class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
             self.check_success(locator=self.HINTED_LOCATOR)
 
 @tutil.skip_sleep
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase):
     DEFAULT_EXPECT = True
     DEFAULT_EXCEPTION = arvados.errors.KeepReadError
@@ -1281,6 +1315,7 @@ class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase):
             self.check_success(locator=self.HINTED_LOCATOR)
 
 @tutil.skip_sleep
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
     DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
     DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
@@ -1369,13 +1404,16 @@ class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
 
 
 @tutil.skip_sleep
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
+    disk_cache = False
+
     # Test put()s that need two distinct servers to succeed, possibly
     # requiring multiple passes through the retry loop.
 
     def setUp(self):
         self.api_client = self.mock_keep_services(count=2)
-        self.keep_client = arvados.KeepClient(api_client=self.api_client)
+        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
 
     def test_success_after_exception(self):
         with tutil.mock_keep_responses(
@@ -1402,7 +1440,10 @@ class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
                 self.keep_client.put('foo', num_retries=1, copies=2)
         self.assertEqual(2, req_mock.call_count)
 
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class KeepClientAPIErrorTest(unittest.TestCase):
+    disk_cache = False
+
     def test_api_fail(self):
         class ApiMock(object):
             def __getattr__(self, r):
@@ -1415,7 +1456,8 @@ class KeepClientAPIErrorTest(unittest.TestCase):
                 else:
                     raise arvados.errors.KeepReadError()
         keep_client = arvados.KeepClient(api_client=ApiMock(),
-                                             proxy='', local_store='')
+                                         proxy='', local_store='',
+                                         block_cache=make_block_cache(self.disk_cache))
 
         # The bug this is testing for is that if an API (not
         # keepstore) exception is thrown as part of a get(), the next
index a369292fb38c09a417b32bd3d33ea165319de647..1ff46c3616975f4d90e23b5ef4603facd1ccc217 100644 (file)
@@ -950,6 +950,10 @@ class ArvadosModel < ApplicationRecord
   # value in the database to an implicit zero/false value in an update
   # request.
   def fill_container_defaults
+    # Make sure this is correctly sorted by key.  Because we merge in
+    # whatever is in the database on top of it, this determines the
+    # key order that gets used downstream, rather than the order the
+    # keys appear in the database.
     self.runtime_constraints = {
       'API' => false,
       'cuda' => {
@@ -957,6 +961,7 @@ class ArvadosModel < ApplicationRecord
         'driver_version' => '',
         'hardware_capability' => '',
       },
+      'keep_cache_disk' => 0,
       'keep_cache_ram' => 0,
       'ram' => 0,
       'vcpus' => 0,
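
The sorted-key requirement matters because container reuse (see container.rb below) matches candidates on an md5 of the serialized constraints; a Python illustration of the underlying idea (not the actual Rails serialization):

import hashlib, json

a = {"API": False, "keep_cache_disk": 0, "keep_cache_ram": 0}
b = {"keep_cache_ram": 0, "API": False, "keep_cache_disk": 0}

# Same contents, but naive serialization preserves insertion order and
# hashes differently...
print(hashlib.md5(json.dumps(a).encode()).hexdigest() ==
      hashlib.md5(json.dumps(b).encode()).hexdigest())   # False

# ...while key-sorted serialization is stable:
print(hashlib.md5(json.dumps(a, sort_keys=True).encode()).hexdigest() ==
      hashlib.md5(json.dumps(b, sort_keys=True).encode()).hexdigest())  # True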
index 3e3f73b838dab5f4809bef12cd8c3d3dc1b02b08..4b02ad52e56dc55925d9418e528434917897379e 100644 (file)
@@ -227,6 +227,10 @@ class Container < ArvadosModel
     if rc['keep_cache_ram'] == 0
       rc['keep_cache_ram'] = Rails.configuration.Containers.DefaultKeepCacheRAM
     end
+    if rc['keep_cache_disk'] == 0 and rc['keep_cache_ram'] == 0
+      # Only set if keep_cache_ram isn't set.
+      rc['keep_cache_disk'] = Rails.configuration.Containers.DefaultKeepCacheDisk
+    end
     rc
   end
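Only containers that request neither cache size pick up the new disk-cache default; an explicit keep_cache_ram leaves keep_cache_disk at zero. A hedged Python rendering of the same fall-through (constants from config.default.yml; function name invented):

    DEFAULT_KEEP_CACHE_RAM = 0
    DEFAULT_KEEP_CACHE_DISK = 8589934592  # 8 GiB

    def fill_cache_defaults(rc):
        if rc.get("keep_cache_ram", 0) == 0:
            rc["keep_cache_ram"] = DEFAULT_KEEP_CACHE_RAM
        if rc.get("keep_cache_disk", 0) == 0 and rc["keep_cache_ram"] == 0:
            # Apply the disk default only when no RAM cache was requested either.
            rc["keep_cache_disk"] = DEFAULT_KEEP_CACHE_DISK
        return rc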
 
@@ -306,6 +310,15 @@ class Container < ArvadosModel
       # records that don't have a 'cuda' section in runtime_constraints
       resolved_runtime_constraints << resolved_runtime_constraints[0].except('cuda')
     end
+    if resolved_runtime_constraints[0]['keep_cache_disk'] == 0
+      # If no disk cache requested, extend search to include older container
+      # records that don't have a 'keep_cache_disk' field in runtime_constraints
+      if resolved_runtime_constraints.length == 2
+        # also cover the variant that lacks the 'cuda' section
+        resolved_runtime_constraints << resolved_runtime_constraints[1].except('keep_cache_disk')
+      end
+      resolved_runtime_constraints << resolved_runtime_constraints[0].except('keep_cache_disk')
+    end
 
     candidates = candidates.where_serialized(:runtime_constraints, resolved_runtime_constraints, md5: true, multivalue: true)
     log_reuse_info(candidates) { "after filtering on runtime_constraints #{attrs[:runtime_constraints].inspect}" }
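The reuse query matches serialized runtime_constraints hashes, so the code enumerates every shape an equivalent older record could have: with or without 'cuda', and with or without 'keep_cache_disk'. A simplified Python sketch of that expansion (ordering differs from the Ruby; names illustrative):

    def constraint_variants(rc, include_cuda_less):
        variants = [rc]
        if include_cuda_less:
            # records written before the 'cuda' section existed
            variants.append({k: v for k, v in rc.items() if k != "cuda"})
        if rc.get("keep_cache_disk") == 0:
            # records written before 'keep_cache_disk' existed
            variants += [{k: v for k, v in w.items() if k != "keep_cache_disk"}
                         for w in list(variants)]
        return variants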
index a61fb07177bba1045aca1f949bcdaa9632efb1ff..73b17280bb28db534c7802b90b3df5061bd5b14a 100644 (file)
@@ -170,6 +170,7 @@ diagnostics_completed_requester:
            ]
   runtime_constraints:
     API: true
+    keep_cache_disk: 0
     keep_cache_ram: 268435456
     ram: 1342177280
     vcpus: 1
@@ -195,6 +196,7 @@ diagnostics_completed_hasher1:
            ]
   runtime_constraints:
     API: true
+    keep_cache_disk: 0
     keep_cache_ram: 268435456
     ram: 268435456
     vcpus: 1
@@ -220,6 +222,7 @@ diagnostics_completed_hasher2:
            ]
   runtime_constraints:
     API: true
+    keep_cache_disk: 0
     keep_cache_ram: 268435456
     ram: 268435456
     vcpus: 2
@@ -245,6 +248,7 @@ diagnostics_completed_hasher3:
            ]
   runtime_constraints:
     API: true
+    keep_cache_disk: 0
     keep_cache_ram: 268435456
     ram: 268435456
     vcpus: 1
@@ -281,6 +285,7 @@ diagnostics_completed_requester2:
            ]
   runtime_constraints:
     API: true
+    keep_cache_disk: 0
     keep_cache_ram: 268435456
     ram: 1342177280
     vcpus: 1
index e6db412179b663e39696e9c884af584af287ddbf..efc61eee8c80fcfa509478ebc0ec51e947e3cccb 100644 (file)
@@ -177,6 +177,9 @@ class ContainerRequestTest < ActiveSupport::TestCase
 
     assert ({"vcpus" => 2, "ram" => 30}.to_a - cr.runtime_constraints.to_a).empty?
 
+    assert_equal 0, Rails.configuration.Containers.DefaultKeepCacheRAM
+    assert_equal 8589934592, Rails.configuration.Containers.DefaultKeepCacheDisk
+
     assert_not_nil cr.container_uuid
     c = Container.find_by_uuid cr.container_uuid
     assert_not_nil c
@@ -186,7 +189,7 @@ class ContainerRequestTest < ActiveSupport::TestCase
     assert_equal({}, c.environment)
     assert_equal({"/out" => {"kind"=>"tmp", "capacity"=>1000000}}, c.mounts)
     assert_equal "/out", c.output_path
-    assert ({"keep_cache_ram"=>268435456, "vcpus" => 2, "ram" => 30}.to_a - c.runtime_constraints.to_a).empty?
+    assert ({"keep_cache_disk"=>8589934592, "keep_cache_ram"=>0, "vcpus" => 2, "ram" => 30}.to_a - c.runtime_constraints.to_a).empty?
     assert_operator 0, :<, c.priority
 
     assert_raises(ActiveRecord::RecordInvalid) do
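The (expected.to_a - actual.to_a).empty? assertions above check that the expected pairs are a subset of the container's constraints, tolerating extra keys. The equivalent check in Python terms:

    expected = {"keep_cache_disk": 8589934592, "keep_cache_ram": 0,
                "vcpus": 2, "ram": 30}
    actual = {"API": False, "keep_cache_disk": 8589934592, "keep_cache_ram": 0,
              "ram": 30, "vcpus": 2}
    # Subset comparison: every expected pair must appear in actual.
    assert set(expected.items()) <= set(actual.items())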
index a4c0ce17926092ec451583404a21f56374f79176..f804aca2d982fd8d000fcf10d19eec0d47173965 100644 (file)
@@ -24,6 +24,7 @@ class ContainerTest < ActiveSupport::TestCase
     output_path: "test",
     runtime_constraints: {
       "API" => false,
+      "keep_cache_disk" => 0,
       "keep_cache_ram" => 0,
       "ram" => 12000000000,
       "vcpus" => 4
@@ -229,7 +230,7 @@ class ContainerTest < ActiveSupport::TestCase
     set_user_from_auth :active
     env = {"C" => "3", "B" => "2", "A" => "1"}
     m = {"F" => {"kind" => "3"}, "E" => {"kind" => "2"}, "D" => {"kind" => "1"}}
-    rc = {"vcpus" => 1, "ram" => 1, "keep_cache_ram" => 1, "API" => true, "cuda" => {"device_count":0, "driver_version": "", "hardware_capability": ""}}
+    rc = {"vcpus" => 1, "ram" => 1, "keep_cache_ram" => 1, "keep_cache_disk" => 0, "API" => true, "cuda" => {"device_count":0, "driver_version": "", "hardware_capability": ""}}
     c, _ = minimal_new(environment: env, mounts: m, runtime_constraints: rc)
     c.reload
     assert_equal Container.deep_sort_hash(env).to_json, c.environment.to_json
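deep_sort_hash normalizes nested hashes to sorted key order before serialization, which is what makes the JSON comparison above order-insensitive. A Python equivalent of that normalization (name borrowed from the Ruby method; body assumed):

    def deep_sort_hash(value):
        if isinstance(value, dict):
            return {k: deep_sort_hash(value[k]) for k in sorted(value)}
        if isinstance(value, list):
            return [deep_sort_hash(v) for v in value]
        return value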
@@ -594,14 +595,14 @@ class ContainerTest < ActiveSupport::TestCase
     set_user_from_auth :active
     # No cuda
     no_cuda_attrs = REUSABLE_COMMON_ATTRS.merge({use_existing:false, priority:1, environment:{"var" => "queued"},
-                                                runtime_constraints: {"vcpus" => 1, "ram" => 1, "keep_cache_ram"=>268435456, "API" => false,
+                                                runtime_constraints: {"vcpus" => 1, "ram" => 1, "keep_cache_disk"=>0, "keep_cache_ram"=>268435456, "API" => false,
                                                                       "cuda" => {"device_count":0, "driver_version": "", "hardware_capability": ""}},})
     c1, _ = minimal_new(no_cuda_attrs)
     assert_equal Container::Queued, c1.state
 
     # has cuda
     cuda_attrs = REUSABLE_COMMON_ATTRS.merge({use_existing:false, priority:1, environment:{"var" => "queued"},
-                                                runtime_constraints: {"vcpus" => 1, "ram" => 1, "keep_cache_ram"=>268435456, "API" => false,
+                                                runtime_constraints: {"vcpus" => 1, "ram" => 1, "keep_cache_disk"=>0, "keep_cache_ram"=>268435456, "API" => false,
                                                                       "cuda" => {"device_count":1, "driver_version": "11.0", "hardware_capability": "9.0"}},})
     c2, _ = minimal_new(cuda_attrs)
     assert_equal Container::Queued, c2.state
index 5f0a1f80f6a4e9f693c91b8946ce41cac6c2f227..994c998823905e4f2398b15eb911768de6e03aa5 100644 (file)
@@ -83,13 +83,20 @@ class ArgumentParser(argparse.ArgumentParser):
                             type=str, metavar='PATH', action='append', default=[],
                             help="Create a new collection, mount it in read/write mode at mountpoint/PATH, and delete it when unmounting.")
 
         self.add_argument('--debug', action='store_true', help="""Debug mode""")
         self.add_argument('--logfile', help="""Write debug logs and errors to the specified file (default stderr).""")
         self.add_argument('--foreground', action='store_true', help="""Run in foreground (default is to daemonize unless --exec specified)""", default=False)
         self.add_argument('--encoding', type=str, help="Character encoding to use for filesystem, default is utf-8 (see Python codec registry for list of available encodings)", default="utf-8")
 
-        self.add_argument('--file-cache', type=int, help="File data cache size, in bytes (default 256MiB)", default=256*1024*1024)
-        self.add_argument('--directory-cache', type=int, help="Directory data cache size, in bytes (default 128MiB)", default=128*1024*1024)
+        self.add_argument('--file-cache', type=int, help="File data cache size, in bytes (default 8 GiB for disk-based cache or 256 MiB with RAM-only cache)", default=0)
+        self.add_argument('--directory-cache', type=int, help="Directory data cache size, in bytes (default 128 MiB)", default=128*1024*1024)
+
+        cachetype = self.add_mutually_exclusive_group()
+        cachetype.add_argument('--ram-cache', action='store_false', dest='disk_cache', help="Use in-memory caching only", default=True)
+        cachetype.add_argument('--disk-cache', action='store_true', dest='disk_cache', help="Use disk-based caching (default)", default=True)
+
+        self.add_argument('--disk-cache-dir', type=str, help="Disk cache location (default ~/.cache/arvados/keep)", default=None)
 
         self.add_argument('--disable-event-listening', action='store_true', help="Don't subscribe to events on the API server", dest="disable_event_listening", default=False)
 
@@ -213,8 +220,12 @@ class Mount(object):
         try:
             self.api = arvados.safeapi.ThreadSafeApiCache(
                 apiconfig=arvados.config.settings(),
+                # The default value of file_cache is 0, which tells
+                # KeepBlockCache to pick its own default size based on
+                # whether disk_cache is enabled.
                 keep_params={
-                    'block_cache': arvados.keep.KeepBlockCache(self.args.file_cache),
+                    'block_cache': arvados.keep.KeepBlockCache(cache_max=self.args.file_cache,
+                                                               disk_cache=self.args.disk_cache,
+                                                               disk_cache_dir=self.args.disk_cache_dir),
                     'num_retries': self.args.retries,
                 })
         except KeyError as e:
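Passing cache_max=0 leaves the size choice to KeepBlockCache, per the comment above. The actual selection logic lives in the Python SDK's keep module; a hedged approximation using the sizes quoted in the --file-cache help text (function name invented):

    def effective_cache_size(cache_max, disk_cache):
        if cache_max > 0:
            return cache_max           # explicit --file-cache always wins
        if disk_cache:
            return 8 * 1024 ** 3       # 8 GiB default for the disk-backed cache
        return 256 * 1024 ** 2         # 256 MiB default for the RAM-only cache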
index 545b4bfa01c70135585491dcb2c946bd847a4871..d0c46f132040aa400645473ddf347c53be135d23 100644 (file)
@@ -59,6 +59,6 @@ setup(name='arvados_fuse',
           'Programming Language :: Python :: 3',
       ],
       test_suite='tests',
-      tests_require=['pbr<1.7.0', 'mock>=1.0', 'PyYAML'],
+      tests_require=['pbr<1.7.0', 'mock>=1.0', 'PyYAML', 'parameterized',],
       zip_safe=False
       )
index 7cf8aa373a9e3b215593d507da0bb216531cf8d4..e82660408bbeb784f07dda1db344991de882f9c4 100644 (file)
@@ -4,6 +4,7 @@
 
 from __future__ import absolute_import
 import arvados
+import arvados.keep
 import arvados_fuse as fuse
 import arvados.safeapi
 import llfuse
@@ -24,7 +25,16 @@ logger = logging.getLogger('arvados.arv-mount')
 
 from .integration_test import workerPool
 
+def make_block_cache(disk_cache):
+    if disk_cache:
+        disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
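+        # Wipe any cache directory left by a previous run so each test starts cold.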
+        shutil.rmtree(disk_cache_dir, ignore_errors=True)
+    block_cache = arvados.keep.KeepBlockCache(disk_cache=disk_cache)
+    return block_cache
+
 class MountTestBase(unittest.TestCase):
+    disk_cache = False
+
     def setUp(self, api=None, local_store=True):
         # The underlying C implementation of open() makes a fstat() syscall
         # with the GIL still held.  When the GETATTR message comes back to
@@ -43,7 +53,8 @@ class MountTestBase(unittest.TestCase):
         self.mounttmp = tempfile.mkdtemp()
         run_test_server.run()
         run_test_server.authorize_with("admin")
-        self.api = api if api else arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
+
+        self.api = api if api else arvados.safeapi.ThreadSafeApiCache(arvados.config.settings(), keep_params={"block_cache": make_block_cache(self.disk_cache)})
         self.llfuse_thread = None
 
     # This is a copy of Mount's method.  TODO: Refactor MountTestBase
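Routing the block cache through keep_params means every KeepClient the mount creates shares one cache instance. A condensed usage sketch built from the calls in this patch (retry count arbitrary):

    import arvados
    import arvados.keep
    import arvados.safeapi

    block_cache = arvados.keep.KeepBlockCache(disk_cache=True)
    api = arvados.safeapi.ThreadSafeApiCache(
        arvados.config.settings(),
        keep_params={"block_cache": block_cache, "num_retries": 3})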
index 1601db59440be8b57c35b988869a1a56229ef92b..df3d4263417bcc271b77c05dc75aec0ee8343aea 100644 (file)
@@ -16,6 +16,7 @@ import subprocess
 import time
 import unittest
 import tempfile
+import parameterized
 
 import arvados
 import arvados_fuse as fuse
@@ -54,7 +55,7 @@ class AssertWithTimeout(object):
         else:
             self.done = True
 
-
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class FuseMountTest(MountTestBase):
     def setUp(self):
         super(FuseMountTest, self).setUp()
@@ -125,6 +126,7 @@ class FuseMountTest(MountTestBase):
                 self.assertEqual(v, f.read().decode())
 
 
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class FuseMagicTest(MountTestBase):
     def setUp(self, api=None):
         super(FuseMagicTest, self).setUp(api=api)
@@ -283,6 +285,7 @@ def fuseSharedTestHelper(mounttmp):
 
     Test().runTest()
 
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class FuseSharedTest(MountTestBase):
     def runTest(self):
         self.make_mount(fuse.SharedDirectory,
@@ -343,6 +346,7 @@ def fuseModifyFileTestHelperReadEndContents(mounttmp):
                 self.assertEqual("plnp", f.read())
     Test().runTest()
 
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class FuseModifyFileTest(MountTestBase):
     def runTest(self):
         collection = arvados.collection.Collection(api_client=self.api)
@@ -363,6 +367,7 @@ class FuseModifyFileTest(MountTestBase):
         self.pool.apply(fuseModifyFileTestHelperReadEndContents, (self.mounttmp,))
 
 
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class FuseAddFileToCollectionTest(MountTestBase):
     def runTest(self):
         collection = arvados.collection.Collection(api_client=self.api)
@@ -385,6 +390,7 @@ class FuseAddFileToCollectionTest(MountTestBase):
         self.assertEqual(["file1.txt", "file2.txt"], sorted(d1))
 
 
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class FuseRemoveFileFromCollectionTest(MountTestBase):
     def runTest(self):
         collection = arvados.collection.Collection(api_client=self.api)
@@ -416,6 +422,7 @@ def fuseCreateFileTestHelper(mounttmp):
                 pass
     Test().runTest()
 
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class FuseCreateFileTest(MountTestBase):
     def runTest(self):
         collection = arvados.collection.Collection(api_client=self.api)
@@ -459,6 +466,7 @@ def fuseWriteFileTestHelperReadFile(mounttmp):
                 self.assertEqual(f.read(), "Hello world!")
     Test().runTest()
 
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class FuseWriteFileTest(MountTestBase):
     def runTest(self):
         collection = arvados.collection.Collection(api_client=self.api)
@@ -507,6 +515,7 @@ def fuseUpdateFileTestHelper(mounttmp):
 
     Test().runTest()
 
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class FuseUpdateFileTest(MountTestBase):
     def runTest(self):
         collection = arvados.collection.Collection(api_client=self.api)