import collections
import datetime
import hashlib
+import errno
import io
import logging
import math
import resource
from . import timer
import urllib.parse
+import traceback
if sys.version_info >= (3, 0):
from io import BytesIO
if self.cache_max == 0:
if self._disk_cache:
fs = os.statvfs(self._disk_cache_dir)
- avail = (fs.f_bavail * fs.f_bsize) / 4
+ # Calculation of available space incorporates existing cache usage
+ existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
+ avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
# pick smallest of:
# 10% of total disk size
# 25% of available space (counting existing cache usage as available)
# max_slots * 64 MiB
self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
else:
- # 256 GiB in RAM
+ # 256 MiB in RAM
self.cache_max = (256 * 1024 * 1024)
self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
self._cache.insert(0, n)
return n, True
def set(self, slot, blob):
    """Store ``blob`` in ``slot``, retrying once after tightening cache limits.

    ``slot.set(blob)`` is attempted first.  If it raises ``OSError`` the
    cache limits are adjusted according to ``errno``:

    * ``ENOMEM``: ``self._max_slots`` becomes four fewer than the number
      of slots currently in ``self._cache`` (never below 4).
    * ``ENOSPC``: ``self.cache_max`` becomes 256 MiB less than the summed
      ``size()`` of the slots in ``self._cache`` (never below 256 MiB).
    * ``ENODEV``: an error is logged and the limits are left unchanged.

    Any other exception from the first attempt is also treated as
    retryable.  ``self.cap_cache()`` is then called (after a successful
    first attempt as well), and if the first attempt failed
    ``slot.set(blob)`` is tried one more time.

    Raises:
        arvados.errors.KeepCacheError: if the second attempt also fails.
    """
    step = 256 * 1024 * 1024  # 256 MiB: both the shrink step and the floor
    retry = False

    try:
        slot.set(blob)
    except OSError as e:
        retry = True
        if e.errno == errno.ENOMEM:
            # Reduce max slots to current - 4, cap cache and retry
            with self._cache_lock:
                self._max_slots = max(4, len(self._cache) - 4)
        elif e.errno == errno.ENOSPC:
            # Reduce disk max space to current - 256 MiB, cap cache and retry
            with self._cache_lock:
                in_use = sum(st.size() for st in self._cache)
                self.cache_max = max(step, in_use - step)
        elif e.errno == errno.ENODEV:
            _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
    except Exception:
        # Deliberately best-effort: any other failure gets one retry
        # below, and that retry reports the error if it fails again.
        retry = True

    # Check if we should evict things from the cache.  Either
    # because we added a new thing or we adjusted the limits down,
    # so we might need to push something out.
    self.cap_cache()

    if not retry:
        # Done
        return

    try:
        # There was an error, we ran cap_cache so try one more time.
        slot.set(blob)
    except Exception as e:
        # It failed again.  Give up.
        raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
    finally:
        # Set the notice that we are done with the cache
        # slot one way or another.
        slot.ready.set()
+
+
class Counter(object):
def __init__(self, v=0):
self._lk = threading.Lock()
return blob
finally:
if slot is not None:
- slot.set(blob)
- self.block_cache.cap_cache()
+ self.block_cache.set(slot, blob)
# Q: Including 403 is necessary for the Keep tests to continue
# passing, but maybe they should expect KeepReadError instead?