18842: Respond to review comments
author Peter Amstutz <peter.amstutz@curii.com>
Tue, 1 Nov 2022 21:05:42 +0000 (17:05 -0400)
committer Peter Amstutz <peter.amstutz@curii.com>
Tue, 1 Nov 2022 21:05:42 +0000 (17:05 -0400)
* Fix test typo
* Add suffix .keepcacheblock
* Delete tmp file if an error is thrown before it is renamed
* Default disk cache size accounts for both free space and total disk size
* Handle errors and fall back to RAM caching
* Delete old tmp blocks if we find them
* Collection class gets keep client if initialized with ThreadSafeApiCache

Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

doc/admin/maintenance-and-upgrading.html.textile.liquid
sdk/python/arvados/collection.py
sdk/python/arvados/diskcache.py
sdk/python/arvados/keep.py
services/fuse/tests/mount_test_base.py

index 3cc80a35609f46c9909bdb2c0ce9785ba80ac8fa..2b634fb9e9e63f1e9373c890fef15cf107cc13f7 100644 (file)
@@ -60,7 +60,9 @@ Upgrading Arvados typically involves the following steps:
 # Wait for the cluster to be idle and stop Arvados services.
 # Make a backup of your database, as a precaution.
 # update the configuration file for the new release, if necessary (see "Maintaining Arvados":#maintaining above)
-# rebuild and deploy the "compute node image":{{site.baseurl}}/install/crunch2-cloud/install-compute-node.html (cloud only)
+# Update compute nodes
+## (cloud) Rebuild and deploy the "compute node image":{{site.baseurl}}/install/crunch2-cloud/install-compute-node.html
+## (slurm/LSF) Upgrade the @python3-arvados-fuse@ package used on your compute nodes
 # Install new packages using @apt-get upgrade@ or @yum upgrade@.
 # Wait for package installation scripts as they perform any necessary data migrations.
 # Run @arvados-server config-check@ to detect configuration errors or deprecated entries.
index 998481ab661105b68b0247d1a82c09211fa0d66e..e1138910aebfc501bdfd875c03bd568ea76c3f3e 100644 (file)
@@ -1308,6 +1308,11 @@ class Collection(RichCollectionBase):
         super(Collection, self).__init__(parent)
         self._api_client = api_client
         self._keep_client = keep_client
+
+        # Use the keep client from ThreadSafeApiCache
+        if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache):
+            self._keep_client = self._api_client.keep
+
         self._block_manager = block_manager
         self.replication_desired = replication_desired
         self._storage_classes_desired = storage_classes_desired
index 6f1ccb97e577cca3140867652161d06535a15ca7..9734d93a7742d6bf395b4fe2dbed2e907ab50a4d 100644 (file)
@@ -9,6 +9,12 @@ import traceback
 import stat
 import tempfile
 import fcntl
+import errno
+import logging
+
+_logger = logging.getLogger('arvados.keep')
+
+cacheblock_suffix = ".keepcacheblock"
 
 class DiskCacheSlot(object):
     __slots__ = ("locator", "ready", "content", "cachedir")
@@ -24,6 +30,7 @@ class DiskCacheSlot(object):
         return self.content
 
     def set(self, value):
+        tmpfile = None
         try:
             if value is None:
                 self.content = None
@@ -41,9 +48,9 @@ class DiskCacheSlot(object):
             blockdir = os.path.join(self.cachedir, self.locator[0:3])
             os.makedirs(blockdir, mode=0o700, exist_ok=True)
 
-            final = os.path.join(blockdir, self.locator)
+            final = os.path.join(blockdir, self.locator) + cacheblock_suffix
 
-            f = tempfile.NamedTemporaryFile(dir=blockdir, delete=False)
+            f = tempfile.NamedTemporaryFile(dir=blockdir, delete=False, prefix="tmp", suffix=cacheblock_suffix)
             tmpfile = f.name
             os.chmod(tmpfile, stat.S_IRUSR | stat.S_IWUSR)
 
@@ -54,11 +61,32 @@ class DiskCacheSlot(object):
             f.write(value)
             f.flush()
             os.rename(tmpfile, final)
+            tmpfile = None
 
             self.content = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
+        except OSError as e:
+            if e.errno == errno.ENODEV:
+                _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
+            elif e.errno == errno.ENOMEM:
+                _logger.error("Unable to use disk cache: The process's maximum number of mappings would have been exceeded.")
+            elif e.errno == errno.ENOSPC:
+                _logger.error("Unable to use disk cache: Out of disk space.")
+            else:
+                traceback.print_exc()
         except Exception as e:
             traceback.print_exc()
         finally:
+            if tmpfile is not None:
+                # If the tempfile hasn't been renamed on disk yet, try to delete it.
+                try:
+                    os.remove(tmpfile)
+                except:
+                    pass
+            if self.content is None:
+                # Something went wrong with the disk cache, fall back
+                # to RAM cache behavior (the alternative is to cache
+                # nothing and return a read error).
+                self.content = value
             self.ready.set()
 
     def size(self):
@@ -89,11 +117,12 @@ class DiskCacheSlot(object):
             # gone.
 
             blockdir = os.path.join(self.cachedir, self.locator[0:3])
-            final = os.path.join(blockdir, self.locator)
+            final = os.path.join(blockdir, self.locator) + cacheblock_suffix
             try:
                 with open(final, "rb") as f:
-                    # unlock,
+                    # unlock
                     fcntl.flock(f, fcntl.LOCK_UN)
+                    self.content = None
 
                     # try to get an exclusive lock, this ensures other
                     # processes are not using the block.  It is
@@ -125,7 +154,7 @@ class DiskCacheSlot(object):
     @staticmethod
     def get_from_disk(locator, cachedir):
         blockdir = os.path.join(cachedir, locator[0:3])
-        final = os.path.join(blockdir, locator)
+        final = os.path.join(blockdir, locator) + cacheblock_suffix
 
         try:
             filehandle = open(final, "rb")
@@ -156,9 +185,21 @@ class DiskCacheSlot(object):
         blocks = []
         for root, dirs, files in os.walk(cachedir):
             for name in files:
+                if not name.endswith(cacheblock_suffix):
+                    continue
+
                 blockpath = os.path.join(root, name)
                 res = os.stat(blockpath)
-                blocks.append((name, res.st_atime))
+
+                if len(name) == (32+len(cacheblock_suffix)) and not name.startswith("tmp"):
+                    blocks.append((name[0:32], res.st_atime))
+                elif name.startswith("tmp") and ((time.time() - res.st_mtime) > 60):
+                    # found a temporary file more than 1 minute old,
+                    # try to delete it.
+                    try:
+                        os.remove(blockpath)
+                    except:
+                        pass
 
         # sort by access time (atime), going from most recently
         # accessed (highest timestamp) to least recently accessed
index b6c98d14360dea0dd96d5fe890216e589328720c..dd99e8b928054ed03e42196608f0a9697f65870b 100644 (file)
@@ -190,17 +190,24 @@ class KeepBlockCache(object):
 
         if self._max_slots == 0:
             if self._disk_cache:
-                # default set max slots to half of maximum file handles
+                # default max slots to half of maximum file handles
+                # NOFILE typically defaults to 1024 on Linux so this
+                # will be 512 slots.
                 self._max_slots = resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 2
             else:
-                self._max_slots = 1024
+                # RAM cache slots
+                self._max_slots = 512
 
         if self.cache_max == 0:
             if self._disk_cache:
                 fs = os.statvfs(self._disk_cache_dir)
-                avail = (fs.f_bavail * fs.f_bsize) / 2
-                # Half the available space or max_slots * 64 MiB
-                self.cache_max = min(avail, (self._max_slots * 64 * 1024 * 1024))
+                avail = (fs.f_bavail * fs.f_bsize) / 4
+                maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
+                # pick smallest of:
+                # 10% of total disk size
+                # 25% of available space
+                # max_slots * 64 MiB
+                self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
             else:
                 # 256 GiB in RAM
                 self.cache_max = (256 * 1024 * 1024)
index b1383d36bbbc7cffafb5819b1388d273542aa3ce..e82660408bbeb784f07dda1db344991de882f9c4 100644 (file)
@@ -33,7 +33,7 @@ def make_block_cache(disk_cache):
     return block_cache
 
 class MountTestBase(unittest.TestCase):
-    block_cache = False
+    disk_cache = False
 
     def setUp(self, api=None, local_store=True):
         # The underlying C implementation of open() makes a fstat() syscall
@@ -54,7 +54,7 @@ class MountTestBase(unittest.TestCase):
         run_test_server.run()
         run_test_server.authorize_with("admin")
 
-        self.api = api if api else arvados.safeapi.ThreadSafeApiCache(arvados.config.settings(), keep_params={"block_cache": make_block_cache(self.block_cache)})
+        self.api = api if api else arvados.safeapi.ThreadSafeApiCache(arvados.config.settings(), keep_params={"block_cache": make_block_cache(self.disk_cache)})
         self.llfuse_thread = None
 
     # This is a copy of Mount's method.  TODO: Refactor MountTestBase