import stat
import tempfile
import fcntl
+import errno
+import logging
+
+_logger = logging.getLogger('arvados.keep')
+
+cacheblock_suffix = ".keepcacheblock"
class DiskCacheSlot(object):
__slots__ = ("locator", "ready", "content", "cachedir")
return self.content
def set(self, value):
+ tmpfile = None
try:
if value is None:
self.content = None
blockdir = os.path.join(self.cachedir, self.locator[0:3])
os.makedirs(blockdir, mode=0o700, exist_ok=True)
- final = os.path.join(blockdir, self.locator)
+ final = os.path.join(blockdir, self.locator) + cacheblock_suffix
- f = tempfile.NamedTemporaryFile(dir=blockdir, delete=False)
+ f = tempfile.NamedTemporaryFile(dir=blockdir, delete=False, prefix="tmp", suffix=cacheblock_suffix)
tmpfile = f.name
os.chmod(tmpfile, stat.S_IRUSR | stat.S_IWUSR)
f.write(value)
f.flush()
os.rename(tmpfile, final)
+ tmpfile = None
self.content = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
+ except OSError as e:
+ if e.errno == errno.ENODEV:
+ _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
+ elif e.errno == errno.ENOMEM:
+ _logger.error("Unable to use disk cache: The process's maximum number of mappings would have been exceeded.")
+ elif e.errno == errno.ENOSPC:
+ _logger.error("Unable to use disk cache: Out of disk space.")
+ else:
+ traceback.print_exc()
except Exception as e:
traceback.print_exc()
finally:
+ if tmpfile is not None:
+ # If the tempfile hasn't been renamed on disk yet, try to delete it.
+ try:
+ os.remove(tmpfile)
+ except:
+ pass
+ if self.content is None:
+ # Something went wrong with the disk cache, fall back
+ # to RAM cache behavior (the alternative is to cache
+ # nothing and return a read error).
+ self.content = value
self.ready.set()
def size(self):
# gone.
blockdir = os.path.join(self.cachedir, self.locator[0:3])
- final = os.path.join(blockdir, self.locator)
+ final = os.path.join(blockdir, self.locator) + cacheblock_suffix
try:
with open(final, "rb") as f:
- # unlock,
+ # unlock
fcntl.flock(f, fcntl.LOCK_UN)
+ self.content = None
# try to get an exclusive lock, this ensures other
# processes are not using the block. It is
@staticmethod
def get_from_disk(locator, cachedir):
blockdir = os.path.join(cachedir, locator[0:3])
- final = os.path.join(blockdir, locator)
+ final = os.path.join(blockdir, locator) + cacheblock_suffix
try:
filehandle = open(final, "rb")
blocks = []
for root, dirs, files in os.walk(cachedir):
for name in files:
+ if not name.endswith(cacheblock_suffix):
+ continue
+
blockpath = os.path.join(root, name)
res = os.stat(blockpath)
- blocks.append((name, res.st_atime))
+
+ if len(name) == (32+len(cacheblock_suffix)) and not name.startswith("tmp"):
+ blocks.append((name[0:32], res.st_atime))
+ elif name.startswith("tmp") and ((time.time() - res.st_mtime) > 60):
+ # found a temporary file more than 1 minute old,
+ # try to delete it.
+ try:
+ os.remove(blockpath)
+ except:
+ pass
# sort by access time (atime), going from most recently
# accessed (highest timestamp) to least recently accessed
if self._max_slots == 0:
if self._disk_cache:
- # default set max slots to half of maximum file handles
+ # default max slots to half of maximum file handles
+ # NOFILE typically defaults to 1024 on Linux so this
+ # will be 512 slots.
self._max_slots = resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 2
else:
- self._max_slots = 1024
+ # RAM cache slots
+ self._max_slots = 512
if self.cache_max == 0:
if self._disk_cache:
fs = os.statvfs(self._disk_cache_dir)
- avail = (fs.f_bavail * fs.f_bsize) / 2
- # Half the available space or max_slots * 64 MiB
- self.cache_max = min(avail, (self._max_slots * 64 * 1024 * 1024))
+ avail = (fs.f_bavail * fs.f_bsize) / 4
+ maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
+ # pick smallest of:
+ # 10% of total disk size
+ # 25% of available space
+ # max_slots * 64 MiB
+ self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
else:
# 256 MiB in RAM
self.cache_max = (256 * 1024 * 1024)