import collections
import datetime
import hashlib
+import errno
import io
import logging
import math
import resource
from . import timer
import urllib.parse
+import traceback
+import weakref
if sys.version_info >= (3, 0):
from io import BytesIO
self._max_slots = max_slots
self._disk_cache = disk_cache
self._disk_cache_dir = disk_cache_dir
+ self._cache_updating = threading.Condition(self._cache_lock)
if self._disk_cache and self._disk_cache_dir is None:
self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
if self._max_slots == 0:
if self._disk_cache:
- # default set max slots to half of maximum file handles
- self._max_slots = resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 2
+ # Each block uses two file descriptors, one used to
+ # open it initially and hold the flock(), and a second
+ # hidden one used by mmap().
+ #
+ # Set max slots to 1/8 of maximum file handles. This
+ # means we'll use at most 1/4 of total file handles.
+ #
+ # NOFILE typically defaults to 1024 on Linux so this
+ # is 128 slots (256 file handles), which means we can
+ # cache up to 8 GiB of 64 MiB blocks. This leaves
+ # 768 file handles for sockets and other stuff.
+ #
+ # When we want the ability to have more cache (e.g. in
+ # arv-mount) we'll increase rlimit before calling
+ # this.
+ self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
else:
- self._max_slots = 1024
+ # RAM cache slots
+ self._max_slots = 512
if self.cache_max == 0:
if self._disk_cache:
fs = os.statvfs(self._disk_cache_dir)
- avail = (fs.f_bavail * fs.f_bsize) / 2
- # Half the available space or max_slots * 64 MiB
- self.cache_max = min(avail, (self._max_slots * 64 * 1024 * 1024))
+ # Calculation of available space incorporates existing cache usage
+ existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
+ avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
+ maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
+ # pick smallest of:
+ # 10% of total disk size
+ # 25% of available space
+ # max_slots * 64 MiB
+ self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
else:
- # 256 GiB in RAM
+ # 256 MiB in RAM
self.cache_max = (256 * 1024 * 1024)
self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
return len(self.content)
def evict(self):
- return True
-
- def cap_cache(self):
- '''Cap the cache size to self.cache_max'''
- with self._cache_lock:
- # Select all slots except those where ready.is_set() and content is
- # None (that means there was an error reading the block).
- self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
- sm = sum([slot.size() for slot in self._cache])
- while len(self._cache) > 0 and (sm > self.cache_max or len(self._cache) > self._max_slots):
- for i in range(len(self._cache)-1, -1, -1):
- # start from the back, find a slot that is a candidate to evict
- if self._cache[i].ready.is_set():
- sz = self._cache[i].size()
-
- # If evict returns false it means the
- # underlying disk cache couldn't lock the file
- # for deletion because another process was using
- # it. Don't count it as reducing the amount
- # of data in the cache, find something else to
- # throw out.
- if self._cache[i].evict():
- sm -= sz
-
+ # Drop this slot's reference to the block data, then report
+ # whether the slot is now empty (gone() is true once content
+ # is None).
+ self.content = None
+ return self.gone()
+
+ def gone(self):
+     '''Report whether this slot no longer holds any block content.'''
+     return self.content is None
+
+ def _resize_cache(self, cache_max, max_slots):
+ # Try and make sure the contents of the cache do not exceed
+ # the supplied maximums.
+ #
+ # cache_max is compared against the summed size() of all slots;
+ # max_slots is compared against the number of slots.
+ #
+ # NOTE(review): no lock is taken here; the callers visible in
+ # this change (cap_cache, reserve_cache) hold
+ # self._cache_updating while calling -- confirm for any others.
+
+ # Select all slots except those where ready.is_set() and content is
+ # None (that means there was an error reading the block).
+ self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
+ sm = sum([slot.size() for slot in self._cache])
+ # NOTE(review): a scan that finds no slot with ready set changes
+ # neither sm nor the slot list, so the while condition is simply
+ # re-tested -- confirm this cannot busy-loop while every slot is
+ # still pending.
+ while len(self._cache) > 0 and (sm > cache_max or len(self._cache) > max_slots):
+ for i in range(len(self._cache)-1, -1, -1):
+ # start from the back, find a slot that is a candidate to evict
+ if self._cache[i].ready.is_set():
+ sz = self._cache[i].size()
+
+ # If evict returns false it means the
+ # underlying disk cache couldn't lock the file
+ # for deletion because another process was using
+ # it. Don't count it as reducing the amount
+ # of data in the cache, find something else to
+ # throw out.
+ if self._cache[i].evict():
+ sm -= sz
+
+ # check to make sure the underlying data is gone
+ if self._cache[i].gone():
# either way we forget about it. either the
# other process will delete it, or if we need
# it again and it is still there, we'll find
# it on disk.
del self._cache[i]
- break
+ break
+
+
+ def cap_cache(self):
+     '''Trim the cache to self.cache_max and self._max_slots, then
+     wake every thread waiting on the cache condition.'''
+     updating = self._cache_updating
+     with updating:
+         self._resize_cache(self.cache_max, self._max_slots)
+         # reserve_cache() waits on this condition for a free slot,
+         # so announce that the cache contents may have changed.
+         updating.notify_all()
def _get(self, locator):
# Test if the locator is already in the cache
def reserve_cache(self, locator):
'''Reserve a cache slot for the specified locator,
or return the existing slot.'''
- with self._cache_lock:
+ with self._cache_updating:
n = self._get(locator)
if n:
return n, False
else:
# Add a new cache slot for the locator
+ self._resize_cache(self.cache_max, self._max_slots-1)
+ while len(self._cache) >= self._max_slots:
+ # If there isn't a slot available, we need to wait
+ # for something to happen that releases one of the
+ # cache slots. Idle for up to 200 ms, or until woken
+ # by another thread.
+ self._cache_updating.wait(timeout=0.2)
+ self._resize_cache(self.cache_max, self._max_slots-1)
+
if self._disk_cache:
n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
else:
self._cache.insert(0, n)
return n, True
+ def set(self, slot, blob):
+     '''Store blob in slot, retrying once after shrinking the cache.
+
+     If the first slot.set() fails, the cache limits may be lowered
+     (ENOMEM lowers the slot limit, ENOSPC lowers cache_max), the
+     cache is capped, and slot.set() is attempted a second time.
+
+     Raises arvados.errors.KeepCacheError if the second attempt also
+     fails; the slot is set to None before raising.
+     '''
+     try:
+         slot.set(blob)
+         return
+     except OSError as e:
+         if e.errno == errno.ENOMEM:
+             # Reduce max slots to current - 4, cap cache and retry
+             with self._cache_lock:
+                 self._max_slots = max(4, len(self._cache) - 4)
+         elif e.errno == errno.ENOSPC:
+             # Reduce disk max space to current - 256 MiB, cap cache and retry
+             with self._cache_lock:
+                 sm = sum(st.size() for st in self._cache)
+                 self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
+         elif e.errno == errno.ENODEV:
+             _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
+         else:
+             # No limit to adjust for this errno; record the failure
+             # so it is not lost if the retry below succeeds.
+             _logger.debug("First attempt to cache block %s failed, retrying: %s", slot.locator, e)
+     except Exception as e:
+         # Not fatal yet: the retry below decides whether to give
+         # up, but keep a trace of what went wrong the first time.
+         _logger.debug("First attempt to cache block %s failed, retrying: %s", slot.locator, e)
+     finally:
+         # Check if we should evict things from the cache.  Either
+         # because we added a new thing or there was an error and
+         # we possibly adjusted the limits down, so we might need
+         # to push something out.
+         self.cap_cache()
+
+     try:
+         # Only gets here if there was an error the first time. The
+         # exception handler adjusts limits downward in some cases
+         # to free up resources, which would make the operation
+         # succeed.
+         slot.set(blob)
+     except Exception as e:
+         # It failed again.  Give up.
+         slot.set(None)
+         raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
+
+     self.cap_cache()
+
class Counter(object):
def __init__(self, v=0):
self._lk = threading.Lock()
return blob
finally:
if slot is not None:
- slot.set(blob)
- self.block_cache.cap_cache()
+ self.block_cache.set(slot, blob)
# Q: Including 403 is necessary for the Keep tests to continue
# passing, but maybe they should expect KeepReadError instead?