summary |
shortlog |
log |
commit | commitdiff |
tree
raw |
patch |
inline | side by side (from parent 1:
858a8ce)
* Fix test typo
* Add suffix .keepcache
* Delete tmp file if an error is thrown before it is renamed
* Default disk cache size accounts for both free space and total disk size
* Handle errors and fall back to RAM caching
* Delete old tmp blocks if we find them
* Collection class gets keep client if initialized with ThreadSafeApiCache
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>
# Wait for the cluster to be idle and stop Arvados services.
# Make a backup of your database, as a precaution.
# Update the configuration file for the new release, if necessary (see "Maintaining Arvados":#maintaining above)
# Wait for the cluster to be idle and stop Arvados services.
# Make a backup of your database, as a precaution.
# Update the configuration file for the new release, if necessary (see "Maintaining Arvados":#maintaining above)
-# rebuild and deploy the "compute node image":{{site.baseurl}}/install/crunch2-cloud/install-compute-node.html (cloud only)
+# Update compute nodes
+## (cloud) Rebuild and deploy the "compute node image":{{site.baseurl}}/install/crunch2-cloud/install-compute-node.html
+## (slurm/LSF) Upgrade the @python3-arvados-fuse@ package used on your compute nodes
# Install new packages using @apt-get upgrade@ or @yum upgrade@.
# Wait for package installation scripts as they perform any necessary data migrations.
# Run @arvados-server config-check@ to detect configuration errors or deprecated entries.
# Install new packages using @apt-get upgrade@ or @yum upgrade@.
# Wait for package installation scripts as they perform any necessary data migrations.
# Run @arvados-server config-check@ to detect configuration errors or deprecated entries.
super(Collection, self).__init__(parent)
self._api_client = api_client
self._keep_client = keep_client
super(Collection, self).__init__(parent)
self._api_client = api_client
self._keep_client = keep_client
+
+ # Use the keep client from ThreadSafeApiCache
+ if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache):
+ self._keep_client = self._api_client.keep
+
self._block_manager = block_manager
self.replication_desired = replication_desired
self._storage_classes_desired = storage_classes_desired
self._block_manager = block_manager
self.replication_desired = replication_desired
self._storage_classes_desired = storage_classes_desired
import stat
import tempfile
import fcntl
import stat
import tempfile
import fcntl
+import errno
+import logging
+
+_logger = logging.getLogger('arvados.keep')
+
+cacheblock_suffix = ".keepcacheblock"
class DiskCacheSlot(object):
__slots__ = ("locator", "ready", "content", "cachedir")
class DiskCacheSlot(object):
__slots__ = ("locator", "ready", "content", "cachedir")
return self.content
def set(self, value):
return self.content
def set(self, value):
try:
if value is None:
self.content = None
try:
if value is None:
self.content = None
blockdir = os.path.join(self.cachedir, self.locator[0:3])
os.makedirs(blockdir, mode=0o700, exist_ok=True)
blockdir = os.path.join(self.cachedir, self.locator[0:3])
os.makedirs(blockdir, mode=0o700, exist_ok=True)
- final = os.path.join(blockdir, self.locator)
+ final = os.path.join(blockdir, self.locator) + cacheblock_suffix
- f = tempfile.NamedTemporaryFile(dir=blockdir, delete=False)
+ f = tempfile.NamedTemporaryFile(dir=blockdir, delete=False, prefix="tmp", suffix=cacheblock_suffix)
tmpfile = f.name
os.chmod(tmpfile, stat.S_IRUSR | stat.S_IWUSR)
tmpfile = f.name
os.chmod(tmpfile, stat.S_IRUSR | stat.S_IWUSR)
f.write(value)
f.flush()
os.rename(tmpfile, final)
f.write(value)
f.flush()
os.rename(tmpfile, final)
self.content = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
self.content = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
+ except OSError as e:
+ if e.errno == errno.ENODEV:
+ _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
+ elif e.errno == errno.ENOMEM:
+ _logger.error("Unable to use disk cache: The process's maximum number of mappings would have been exceeded.")
+ elif e.errno == errno.ENOSPC:
+ _logger.error("Unable to use disk cache: Out of disk space.")
+ else:
+ traceback.print_exc()
except Exception as e:
traceback.print_exc()
finally:
except Exception as e:
traceback.print_exc()
finally:
+ if tmpfile is not None:
+ # If the tempfile hasn't been renamed on disk yet, try to delete it.
+ try:
+ os.remove(tmpfile)
+ except:
+ pass
+ if self.content is None:
+ # Something went wrong with the disk cache, fall back
+ # to RAM cache behavior (the alternative is to cache
+ # nothing and return a read error).
+ self.content = value
self.ready.set()
def size(self):
self.ready.set()
def size(self):
# gone.
blockdir = os.path.join(self.cachedir, self.locator[0:3])
# gone.
blockdir = os.path.join(self.cachedir, self.locator[0:3])
- final = os.path.join(blockdir, self.locator)
+ final = os.path.join(blockdir, self.locator) + cacheblock_suffix
try:
with open(final, "rb") as f:
try:
with open(final, "rb") as f:
fcntl.flock(f, fcntl.LOCK_UN)
fcntl.flock(f, fcntl.LOCK_UN)
# try to get an exclusive lock, this ensures other
# processes are not using the block. It is
# try to get an exclusive lock, this ensures other
# processes are not using the block. It is
@staticmethod
def get_from_disk(locator, cachedir):
blockdir = os.path.join(cachedir, locator[0:3])
@staticmethod
def get_from_disk(locator, cachedir):
blockdir = os.path.join(cachedir, locator[0:3])
- final = os.path.join(blockdir, locator)
+ final = os.path.join(blockdir, locator) + cacheblock_suffix
try:
filehandle = open(final, "rb")
try:
filehandle = open(final, "rb")
blocks = []
for root, dirs, files in os.walk(cachedir):
for name in files:
blocks = []
for root, dirs, files in os.walk(cachedir):
for name in files:
+ if not name.endswith(cacheblock_suffix):
+ continue
+
blockpath = os.path.join(root, name)
res = os.stat(blockpath)
blockpath = os.path.join(root, name)
res = os.stat(blockpath)
- blocks.append((name, res.st_atime))
+
+ if len(name) == (32+len(cacheblock_suffix)) and not name.startswith("tmp"):
+ blocks.append((name[0:32], res.st_atime))
+ elif name.startswith("tmp") and ((time.time() - res.st_mtime) > 60):
+ # found a temporary file more than 1 minute old,
+ # try to delete it.
+ try:
+ os.remove(blockpath)
+ except:
+ pass
# sort by access time (atime), going from most recently
# accessed (highest timestamp) to least recently accessed
# sort by access time (atime), going from most recently
# accessed (highest timestamp) to least recently accessed
if self._max_slots == 0:
if self._disk_cache:
if self._max_slots == 0:
if self._disk_cache:
- # default set max slots to half of maximum file handles
+ # default max slots to half of maximum file handles
+ # NOFILE typically defaults to 1024 on Linux so this
+ # will be 512 slots.
self._max_slots = resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 2
else:
self._max_slots = resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 2
else:
+ # RAM cache slots
+ self._max_slots = 512
if self.cache_max == 0:
if self._disk_cache:
fs = os.statvfs(self._disk_cache_dir)
if self.cache_max == 0:
if self._disk_cache:
fs = os.statvfs(self._disk_cache_dir)
- avail = (fs.f_bavail * fs.f_bsize) / 2
- # Half the available space or max_slots * 64 MiB
- self.cache_max = min(avail, (self._max_slots * 64 * 1024 * 1024))
+ avail = (fs.f_bavail * fs.f_bsize) / 4
+ maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
+ # pick smallest of:
+ # 10% of total disk size
+ # 25% of available space
+ # max_slots * 64 MiB
+ self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
else:
# 256 MiB in RAM
self.cache_max = (256 * 1024 * 1024)
else:
# 256 MiB in RAM
self.cache_max = (256 * 1024 * 1024)
return block_cache
class MountTestBase(unittest.TestCase):
return block_cache
class MountTestBase(unittest.TestCase):
def setUp(self, api=None, local_store=True):
# The underlying C implementation of open() makes a fstat() syscall
def setUp(self, api=None, local_store=True):
# The underlying C implementation of open() makes a fstat() syscall
run_test_server.run()
run_test_server.authorize_with("admin")
run_test_server.run()
run_test_server.authorize_with("admin")
- self.api = api if api else arvados.safeapi.ThreadSafeApiCache(arvados.config.settings(), keep_params={"block_cache": make_block_cache(self.block_cache)})
+ self.api = api if api else arvados.safeapi.ThreadSafeApiCache(arvados.config.settings(), keep_params={"block_cache": make_block_cache(self.disk_cache)})
self.llfuse_thread = None
# This is a copy of Mount's method. TODO: Refactor MountTestBase
self.llfuse_thread = None
# This is a copy of Mount's method. TODO: Refactor MountTestBase