# Wait for the cluster to be idle and stop Arvados services.
# Make a backup of your database, as a precaution.
# Update the configuration file for the new release, if necessary (see "Maintaining Arvados":#maintaining above)
-# rebuild and deploy the "compute node image":{{site.baseurl}}/install/crunch2-cloud/install-compute-node.html (cloud only)
+# Update compute nodes
+## (cloud) Rebuild and deploy the "compute node image":{{site.baseurl}}/install/crunch2-cloud/install-compute-node.html
+## (Slurm/LSF) Upgrade the @python3-arvados-fuse@ package used on your compute nodes
# Install new packages using @apt-get upgrade@ or @yum upgrade@.
# Wait for package installation scripts as they perform any necessary data migrations.
# Run @arvados-server config-check@ to detect configuration errors or deprecated entries.
# troubleshooting purposes.
LogReuseDecisions: false
- # Default value for keep_cache_ram of a container's runtime_constraints.
- DefaultKeepCacheRAM: 268435456
+ # Default value for keep_cache_ram of a container's
+ # runtime_constraints. Note: this amount is added to the RAM
+ # request used to allocate a VM or submit an HPC job.
+ DefaultKeepCacheRAM: 0
+
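+ # Default value for keep_cache_disk of a container's
+ # runtime_constraints. Note: this amount is added to the disk
+ # request used to allocate a VM or submit an HPC job. When
+ # keep_cache_disk is non-zero, arv-mount uses an on-disk block
+ # cache instead of the keep_cache_ram RAM cache.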
+ DefaultKeepCacheDisk: 8589934592
# Number of times a container can be unlocked before being
# automatically cancelled.
"Containers.CrunchRunArgumentsList": false,
"Containers.CrunchRunCommand": false,
"Containers.DefaultKeepCacheRAM": true,
+ "Containers.DefaultKeepCacheDisk": true,
"Containers.DispatchPrivateKey": false,
"Containers.JobsAPI": true,
"Containers.JobsAPI.Enable": true,
arvMountCmd = append(arvMountCmd, "--allow-other")
}
- if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
+ if runner.Container.RuntimeConstraints.KeepCacheDisk > 0 {
+ keepcachedir, err := runner.MkTempDir(runner.parentTemp, "keepcache")
+ if err != nil {
+ return nil, fmt.Errorf("while creating keep cache temp dir: %v", err)
+ }
+ arvMountCmd = append(arvMountCmd, "--disk-cache", "--disk-cache-dir", keepcachedir, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheDisk))
+ } else if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
arvMountCmd = append(arvMountCmd, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM))
}
// EstimateScratchSpace estimates how much available disk space (in
// bytes) is needed to run the container by summing the capacity
// requested by 'tmp' mounts plus disk space required to load the
-// Docker image.
+// Docker image, plus the arv-mount disk cache (KeepCacheDisk).
func EstimateScratchSpace(ctr *arvados.Container) (needScratch int64) {
for _, m := range ctr.Mounts {
if m.Kind == "tmp" {
// Now reserve space for the extracted image on disk.
needScratch += dockerImageSize
+ // Now reserve space for the arv-mount disk cache.
+ needScratch += ctr.RuntimeConstraints.KeepCacheDisk
+
return
}
c.Check(args, check.DeepEquals, []string{
"-J", s.crPending.ContainerUUID,
"-n", "2",
- "-D", "608MB",
- "-R", "rusage[mem=608MB:tmp=256MB] span[hosts=1]",
- "-R", "select[mem>=608MB]",
- "-R", "select[tmp>=256MB]",
+ "-D", "352MB",
+ "-R", "rusage[mem=352MB:tmp=8448MB] span[hosts=1]",
+ "-R", "select[mem>=352MB]",
+ "-R", "select[tmp>=8448MB]",
"-R", "select[ncpus>=2]"})
mtx.Lock()
fakejobq[nextjobid] = args[1]
# Make an API object now so errors are reported early.
api_client.users().current().execute()
if keep_client is None:
- keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
+ block_cache = arvados.keep.KeepBlockCache(disk_cache=True)
+ keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4, block_cache=block_cache)
executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4, stdout=stdout)
except WorkflowException as e:
logger.error(e, exc_info=(sys.exc_info()[1] if arvargs.debug else False))
runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
if runtime_req:
if "keep_cache" in runtime_req:
- runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
+ if self.arvrunner.api.config()["Containers"].get("DefaultKeepCacheDisk", 0) > 0:
+ # If DefaultKeepCacheDisk is non-zero it means we should use disk cache.
+ runtime_constraints["keep_cache_disk"] = math.ceil(runtime_req["keep_cache"] * 2**20)
+ else:
+ runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
if "outputDirType" in runtime_req:
if runtime_req["outputDirType"] == "local_output_dir":
# Currently the default behavior.
runner.intermediate_output_ttl = 3600
runner.secret_store = cwltool.secrets.SecretStore()
runner.api._rootDesc = {"revision": "20210628"}
+ runner.api.config.return_value = {"Containers": {"DefaultKeepCacheDisk": 0}}
keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
runner.api.collections().get().execute.return_value = {
CrunchRunCommand string
CrunchRunArgumentsList []string
DefaultKeepCacheRAM ByteSize
+ DefaultKeepCacheDisk ByteSize
DispatchPrivateKey string
LogReuseDecisions bool
MaxComputeVMs int
// RuntimeConstraints specify a container's compute resources (RAM,
// CPU) and network connectivity.
type RuntimeConstraints struct {
- API bool `json:"API"`
- RAM int64 `json:"ram"`
- VCPUs int `json:"vcpus"`
- KeepCacheRAM int64 `json:"keep_cache_ram"`
- CUDA CUDARuntimeConstraints `json:"cuda"`
+ API bool `json:"API"`
+ RAM int64 `json:"ram"`
+ VCPUs int `json:"vcpus"`
+ // KeepCacheRAM is the size in bytes of arv-mount's in-memory
+ // block cache (passed as --file-cache). crunch-run only uses it
+ // when KeepCacheDisk is not positive.
+ KeepCacheRAM int64 `json:"keep_cache_ram"`
+ // KeepCacheDisk, when positive, makes crunch-run start arv-mount
+ // with an on-disk block cache of this many bytes (--disk-cache
+ // --file-cache). It is also counted in EstimateScratchSpace.
+ KeepCacheDisk int64 `json:"keep_cache_disk"`
+ CUDA CUDARuntimeConstraints `json:"cuda"`
}
// SchedulingParameters specify a container's scheduling parameters
super(Collection, self).__init__(parent)
self._api_client = api_client
self._keep_client = keep_client
+
+ # Use the keep client from ThreadSafeApiCache
+ if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache):
+ self._keep_client = self._api_client.keep
+
self._block_manager = block_manager
self.replication_desired = replication_desired
self._storage_classes_desired = storage_classes_desired
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import errno
+import fcntl
+import logging
+import mmap
+import os
+import stat
+import tempfile
+import threading
+import time
+import traceback
+
+_logger = logging.getLogger('arvados.keep')
+
+cacheblock_suffix = ".keepcacheblock"
+
+class DiskCacheSlot(object):
+ __slots__ = ("locator", "ready", "content", "cachedir")
+
+ def __init__(self, locator, cachedir):
+ self.locator = locator
+ self.ready = threading.Event()
+ self.content = None
+ self.cachedir = cachedir
+
+ def get(self):
+ self.ready.wait()
+ return self.content
+
+ def set(self, value):
+ tmpfile = None
+ try:
+ if value is None:
+ self.content = None
+ return
+
+ if len(value) == 0:
+ # Can't mmap a 0 length file
+ self.content = b''
+ return
+
+ if self.content is not None:
+ # Has been set already
+ return
+
+ blockdir = os.path.join(self.cachedir, self.locator[0:3])
+ os.makedirs(blockdir, mode=0o700, exist_ok=True)
+
+ final = os.path.join(blockdir, self.locator) + cacheblock_suffix
+
+ f = tempfile.NamedTemporaryFile(dir=blockdir, delete=False, prefix="tmp", suffix=cacheblock_suffix)
+ tmpfile = f.name
+ os.chmod(tmpfile, stat.S_IRUSR | stat.S_IWUSR)
+
+ # aquire a shared lock, this tells other processes that
+ # we're using this block and to please not delete it.
+ fcntl.flock(f, fcntl.LOCK_SH)
+
+ f.write(value)
+ f.flush()
+ os.rename(tmpfile, final)
+ tmpfile = None
+
+ self.content = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
+ except OSError as e:
+ if e.errno == errno.ENODEV:
+ _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
+ elif e.errno == errno.ENOMEM:
+ _logger.error("Unable to use disk cache: The process's maximum number of mappings would have been exceeded.")
+ elif e.errno == errno.ENOSPC:
+ _logger.error("Unable to use disk cache: Out of disk space.")
+ else:
+ traceback.print_exc()
+ except Exception as e:
+ traceback.print_exc()
+ finally:
+ if tmpfile is not None:
+ # If the tempfile hasn't been renamed on disk yet, try to delete it.
+ try:
+ os.remove(tmpfile)
+ except:
+ pass
+ if self.content is None:
+ # Something went wrong with the disk cache, fall back
+ # to RAM cache behavior (the alternative is to cache
+ # nothing and return a read error).
+ self.content = value
+ self.ready.set()
+
+ def size(self):
+ if self.content is None:
+ return 0
+ else:
+ return len(self.content)
+
+ def evict(self):
+ if self.content is not None and len(self.content) > 0:
+ # The mmap region might be in use when we decided to evict
+ # it. This can happen if the cache is too small.
+ #
+ # If we call close() now, it'll throw an error if
+ # something tries to access it.
+ #
+ # However, we don't need to explicitly call mmap.close()
+ #
+ # I confirmed in mmapmodule.c that that both close
+ # and deallocate do the same thing:
+ #
+ # a) close the file descriptor
+ # b) unmap the memory range
+ #
+ # So we can forget it in the cache and delete the file on
+ # disk, and it will tear it down after any other
+ # lingering Python references to the mapped memory are
+ # gone.
+
+ blockdir = os.path.join(self.cachedir, self.locator[0:3])
+ final = os.path.join(blockdir, self.locator) + cacheblock_suffix
+ try:
+ with open(final, "rb") as f:
+ # unlock
+ fcntl.flock(f, fcntl.LOCK_UN)
+ self.content = None
+
+ # try to get an exclusive lock, this ensures other
+ # processes are not using the block. It is
+ # nonblocking and will throw an exception if we
+ # can't get it, which is fine because that means
+ # we just won't try to delete it.
+ #
+ # I should note here, the file locking is not
+ # strictly necessary, we could just remove it and
+ # the kernel would ensure that the underlying
+ # inode remains available as long as other
+ # processes still have the file open. However, if
+ # you have multiple processes sharing the cache
+ # and deleting each other's files, you'll end up
+ # with a bunch of ghost files that don't show up
+ # in the file system but are still taking up
+ # space, which isn't particularly user friendly.
+ # The locking strategy ensures that cache blocks
+ # in use remain visible.
+ #
+ fcntl.flock(filehandle, fcntl.LOCK_EX | fcntl.LOCK_NB)
+
+ os.remove(final)
+ return True
+ except OSError:
+ pass
+ return False
+
+ @staticmethod
+ def get_from_disk(locator, cachedir):
+ blockdir = os.path.join(cachedir, locator[0:3])
+ final = os.path.join(blockdir, locator) + cacheblock_suffix
+
+ try:
+ filehandle = open(final, "rb")
+
+ # aquire a shared lock, this tells other processes that
+ # we're using this block and to please not delete it.
+ fcntl.flock(filehandle, fcntl.LOCK_SH)
+
+ content = mmap.mmap(filehandle.fileno(), 0, access=mmap.ACCESS_READ)
+ dc = DiskCacheSlot(locator, cachedir)
+ dc.content = content
+ dc.ready.set()
+ return dc
+ except FileNotFoundError:
+ pass
+ except Exception as e:
+ traceback.print_exc()
+
+ return None
+
+ @staticmethod
+ def init_cache(cachedir, maxslots):
+ # map in all the files in the cache directory, up to max slots.
+ # after max slots, try to delete the excess blocks.
+ #
+ # this gives the calling process ownership of all the blocks
+
+ blocks = []
+ for root, dirs, files in os.walk(cachedir):
+ for name in files:
+ if not name.endswith(cacheblock_suffix):
+ continue
+
+ blockpath = os.path.join(root, name)
+ res = os.stat(blockpath)
+
+ if len(name) == (32+len(cacheblock_suffix)) and not name.startswith("tmp"):
+ blocks.append((name[0:32], res.st_atime))
+ elif name.startswith("tmp") and ((time.time() - res.st_mtime) > 60):
+ # found a temporary file more than 1 minute old,
+ # try to delete it.
+ try:
+ os.remove(blockpath)
+ except:
+ pass
+
+ # sort by access time (atime), going from most recently
+ # accessed (highest timestamp) to least recently accessed
+ # (lowest timestamp).
+ blocks.sort(key=lambda x: x[1], reverse=True)
+
+ # Map in all the files we found, up to maxslots, if we exceed
+ # maxslots, start throwing things out.
+ cachelist = []
+ for b in blocks:
+ got = DiskCacheSlot.get_from_disk(b[0], cachedir)
+ if got is None:
+ continue
+ if len(cachelist) < maxslots:
+ cachelist.append(got)
+ else:
+ # we found more blocks than maxslots, try to
+ # throw it out of the cache.
+ got.evict()
+
+ return cachelist
import ssl
import sys
import threading
+import resource
from . import timer
import urllib.parse
import arvados.errors
import arvados.retry as retry
import arvados.util
+import arvados.diskcache
_logger = logging.getLogger('arvados.keep')
global_client_object = None
return Keep.global_client_object().put(data, **kwargs)
class KeepBlockCache(object):
- # Default RAM cache is 256MiB
- def __init__(self, cache_max=(256 * 1024 * 1024)):
+ def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
self.cache_max = cache_max
self._cache = []
self._cache_lock = threading.Lock()
+ self._max_slots = max_slots
+ self._disk_cache = disk_cache
+ self._disk_cache_dir = disk_cache_dir
+
+ if self._disk_cache and self._disk_cache_dir is None:
+ self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
+ os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)
+
+ if self._max_slots == 0:
+ if self._disk_cache:
+ # default max slots to half of maximum file handles
+ # NOFILE typically defaults to 1024 on Linux so this
+ # will be 512 slots.
+ self._max_slots = resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 2
+ else:
+ # RAM cache slots
+ self._max_slots = 512
+
+ if self.cache_max == 0:
+ if self._disk_cache:
+ fs = os.statvfs(self._disk_cache_dir)
+ avail = (fs.f_bavail * fs.f_bsize) / 4
+ maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
+ # pick smallest of:
+ # 10% of total disk size
+ # 25% of available space
+ # max_slots * 64 MiB
+ self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
+ else:
+ # 256 GiB in RAM
+ self.cache_max = (256 * 1024 * 1024)
+
+ self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
+
+ if self._disk_cache:
+ self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
+ self.cap_cache()
+
class CacheSlot(object):
__slots__ = ("locator", "ready", "content")
else:
return len(self.content)
+ def evict(self):
+ return True
+
def cap_cache(self):
'''Cap the cache size to self.cache_max'''
with self._cache_lock:
# None (that means there was an error reading the block).
self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
sm = sum([slot.size() for slot in self._cache])
- while len(self._cache) > 0 and sm > self.cache_max:
+ while len(self._cache) > 0 and (sm > self.cache_max or len(self._cache) > self._max_slots):
for i in range(len(self._cache)-1, -1, -1):
+ # start from the back, find a slot that is a candidate to evict
if self._cache[i].ready.is_set():
+ sz = self._cache[i].size()
+
+ # If evict returns false it means the
+ # underlying disk cache couldn't lock the file
+ # for deletion because another process was using
+ # it. Don't count it as reducing the amount
+ # of data in the cache, find something else to
+ # throw out.
+ if self._cache[i].evict():
+ sm -= sz
+
+ # either way we forget about it. either the
+ # other process will delete it, or if we need
+ # it again and it is still there, we'll find
+ # it on disk.
del self._cache[i]
break
- sm = sum([slot.size() for slot in self._cache])
def _get(self, locator):
# Test if the locator is already in the cache
del self._cache[i]
self._cache.insert(0, n)
return n
+ if self._disk_cache:
+ # see if it exists on disk
+ n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
+ if n is not None:
+ self._cache.insert(0, n)
+ return n
return None
def get(self, locator):
return n, False
else:
# Add a new cache slot for the locator
- n = KeepBlockCache.CacheSlot(locator)
+ if self._disk_cache:
+ n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
+ else:
+ n = KeepBlockCache.CacheSlot(locator)
self._cache.insert(0, n)
return n, True
'Programming Language :: Python :: 3',
],
test_suite='tests',
- tests_require=['pbr<1.7.0', 'mock>=1.0,<4', 'PyYAML'],
+ tests_require=['pbr<1.7.0', 'mock>=1.0,<4', 'PyYAML', 'parameterized'],
zip_safe=False
)
return self.assertNotRegexpMatches(*args, **kwargs)
unittest.TestCase.assertRegex = assertRegex
unittest.TestCase.assertNotRegex = assertNotRegex
+
+def binary_compare(a, b):
+ if len(a) != len(b):
+ return False
+ for i in range(0, len(a)):
+ if a[i] != b[i]:
+ return False
+ return True
+
+def make_block_cache(disk_cache):
+ if disk_cache:
+ disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
+ shutil.rmtree(disk_cache_dir, ignore_errors=True)
+ block_cache = arvados.keep.KeepBlockCache(disk_cache=disk_cache)
+ return block_cache
import ciso8601
import time
import unittest
+import parameterized
from . import run_test_server
from arvados._ranges import Range, LocatorAndRange
from arvados.collection import Collection, CollectionReader
from . import arvados_testutil as tutil
+from .arvados_testutil import make_block_cache
class TestResumableWriter(arvados.ResumableCollectionWriter):
KEEP_BLOCK_SIZE = 1024 # PUT to Keep every 1K.
def current_state(self):
return self.dump_state(copy.deepcopy)
-
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
tutil.ArvadosBaseTestCase):
+ disk_cache = False
MAIN_SERVER = {}
@classmethod
run_test_server.authorize_with('admin')
cls.api_client = arvados.api('v1')
cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
- local_store=cls.local_store)
+ local_store=cls.local_store,
+ block_cache=make_block_cache(cls.disk_cache))
def write_foo_bar_baz(self):
cw = arvados.CollectionWriter(self.api_client)
import pycurl
import random
import re
+import shutil
import socket
import sys
import time
import unittest
import urllib.parse
+import parameterized
+
import arvados
import arvados.retry
import arvados.util
from . import keepstub
from . import run_test_server
+from .arvados_testutil import make_block_cache
+
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepTestCase(run_test_server.TestCaseWithServers):
+ disk_cache = False
MAIN_SERVER = {}
KEEP_SERVER = {}
run_test_server.authorize_with("admin")
cls.api_client = arvados.api('v1')
cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
- proxy='', local_store='')
+ proxy='', local_store='',
+ block_cache=make_block_cache(cls.disk_cache))
def test_KeepBasicRWTest(self):
self.assertEqual(0, self.keep_client.upload_counter.get())
self.assertEqual(6, self.keep_client.upload_counter.get())
self.assertEqual(0, self.keep_client.download_counter.get())
- self.assertEqual(self.keep_client.get(foo_locator),
- b'foo',
+ self.assertTrue(tutil.binary_compare(self.keep_client.get(foo_locator),
+ b'foo'),
'wrong content from Keep.get(md5("foo"))')
self.assertEqual(3, self.keep_client.download_counter.get())
b'test_head',
'wrong content from Keep.get for "test_head"')
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepPermissionTestCase(run_test_server.TestCaseWithServers):
+ disk_cache = False
MAIN_SERVER = {}
KEEP_SERVER = {'blob_signing': True}
def test_KeepBasicRWTest(self):
run_test_server.authorize_with('active')
- keep_client = arvados.KeepClient()
+ keep_client = arvados.KeepClient(block_cache=make_block_cache(self.disk_cache))
foo_locator = keep_client.put('foo')
self.assertRegex(
foo_locator,
keep_client.get,
unsigned_bar_locator)
-
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepProxyTestCase(run_test_server.TestCaseWithServers):
+ disk_cache = False
MAIN_SERVER = {}
KEEP_SERVER = {}
KEEP_PROXY_SERVER = {}
# Will use ARVADOS_KEEP_SERVICES environment variable that
# is set by setUpClass().
keep_client = arvados.KeepClient(api_client=self.api_client,
- local_store='')
+ local_store='', block_cache=make_block_cache(self.disk_cache))
baz_locator = keep_client.put('baz')
self.assertRegex(
baz_locator,
# existing proxy setting and setting multiple proxies
arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
keep_client = arvados.KeepClient(api_client=self.api_client,
- local_store='')
+ local_store='',
+ block_cache=make_block_cache(self.disk_cache))
uris = [x['_service_root'] for x in keep_client._keep_services]
self.assertEqual(uris, ['http://10.0.0.1/',
'https://foo.example.org:1234/'])
arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
with self.assertRaises(arvados.errors.ArgumentError):
keep_client = arvados.KeepClient(api_client=self.api_client,
- local_store='')
-
+ local_store='',
+ block_cache=make_block_cache(self.disk_cache))
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
+ disk_cache = False
+
def get_service_roots(self, api_client):
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
return [urllib.parse.urlparse(url) for url in sorted(services)]
def test_recognize_proxy_services_in_controller_response(self):
keep_client = arvados.KeepClient(api_client=self.mock_keep_services(
- service_type='proxy', service_host='localhost', service_port=9, count=1))
+ service_type='proxy', service_host='localhost', service_port=9, count=1),
+ block_cache=make_block_cache(self.disk_cache))
try:
# this will fail, but it ensures we get the service
# discovery response
api_client.insecure = True
with tutil.mock_keep_responses(b'foo', 200) as mock:
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
self.assertEqual(
mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
api_client.insecure = False
with tutil.mock_keep_responses(b'foo', 200) as mock:
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
# getopt()==None here means we didn't change the
# default. If we were using real pycurl instead of a mock,
headers = {'X-Keep-Locator':local_loc}
with tutil.mock_keep_responses('', 200, **headers):
# Check that the translated locator gets returned
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
# Check that refresh_signature() uses the correct method and headers
keep_client._get_or_head = mock.MagicMock()
api_client = self.mock_keep_services(count=1)
force_timeout = socket.timeout("timed out")
with tutil.mock_keep_responses(force_timeout, 0) as mock:
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
with self.assertRaises(arvados.errors.KeepReadError):
keep_client.get('ffffffffffffffffffffffffffffffff')
self.assertEqual(
api_client = self.mock_keep_services(count=1)
force_timeout = socket.timeout("timed out")
with tutil.mock_keep_responses(force_timeout, 0) as mock:
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
with self.assertRaises(arvados.errors.KeepWriteError):
keep_client.put(b'foo')
self.assertEqual(
api_client = self.mock_keep_services(count=1)
force_timeout = socket.timeout("timed out")
with tutil.mock_keep_responses(force_timeout, 0) as mock:
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
with self.assertRaises(arvados.errors.KeepReadError):
keep_client.head('ffffffffffffffffffffffffffffffff')
self.assertEqual(
api_client = self.mock_keep_services(service_type='proxy', count=1)
force_timeout = socket.timeout("timed out")
with tutil.mock_keep_responses(force_timeout, 0) as mock:
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
with self.assertRaises(arvados.errors.KeepReadError):
keep_client.get('ffffffffffffffffffffffffffffffff')
self.assertEqual(
api_client = self.mock_keep_services(service_type='proxy', count=1)
force_timeout = socket.timeout("timed out")
with tutil.mock_keep_responses(force_timeout, 0) as mock:
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
with self.assertRaises(arvados.errors.KeepReadError):
keep_client.head('ffffffffffffffffffffffffffffffff')
self.assertEqual(
api_client = mock.MagicMock(name='api_client')
api_client.keep_services().accessible().execute.side_effect = (
arvados.errors.ApiError)
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
with self.assertRaises(exc_class) as err_check:
getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
self.assertEqual(0, len(err_check.exception.request_errors()))
"retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
with req_mock, tutil.skip_sleep, \
self.assertRaises(exc_class) as err_check:
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
num_retries=3)
self.assertEqual([502, 502], [
api_client = self.mock_keep_services(count=3)
with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
keep_client.put(data)
self.assertEqual(2, len(exc_check.exception.request_errors()))
api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
keep_client.put(data)
self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
self.assertEqual(0, len(exc_check.exception.request_errors()))
body = b'oddball service get'
api_client = self.mock_keep_services(service_type='fancynewblobstore')
with tutil.mock_keep_responses(body, 200):
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
actual = keep_client.get(tutil.str_keep_locator(body))
self.assertEqual(body, actual)
pdh = tutil.str_keep_locator(body)
api_client = self.mock_keep_services(service_type='fancynewblobstore')
with tutil.mock_keep_responses(pdh, 200):
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
actual = keep_client.put(body, copies=1)
self.assertEqual(pdh, actual)
headers = {'x-keep-replicas-stored': 3}
with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
**headers) as req_mock:
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
actual = keep_client.put(body, copies=2)
self.assertEqual(pdh, actual)
self.assertEqual(1, req_mock.call_count)
-
@tutil.skip_sleep
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
+ disk_cache = False
+
def setUp(self):
self.api_client = self.mock_keep_services(count=2)
- self.keep_client = arvados.KeepClient(api_client=self.api_client)
+ self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
self.data = b'xyzzy'
self.locator = '1271ed5ef305aadabc605b1609e24c52'
def test_head_and_then_get_return_different_responses(self, get_mock):
head_resp = None
get_resp = None
- get_mock.side_effect = ['first response', 'second response']
+ get_mock.side_effect = [b'first response', b'second response']
with tutil.mock_keep_responses(self.data, 200, 200):
head_resp = self.keep_client.head(self.locator)
get_resp = self.keep_client.get(self.locator)
- self.assertEqual('first response', head_resp)
+ self.assertEqual(b'first response', head_resp)
# First reponse was not cached because it was from a HEAD request.
self.assertNotEqual(head_resp, get_resp)
@tutil.skip_sleep
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock):
+ disk_cache = False
+
def setUp(self):
self.api_client = self.mock_keep_services(count=2)
- self.keep_client = arvados.KeepClient(api_client=self.api_client)
+ self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
self.data = b'xyzzy'
self.locator = '1271ed5ef305aadabc605b1609e24c52'
}
}
api_client = self.mock_keep_services(api_mock=api_mock, count=2)
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
resp_hdr = {
'x-keep-storage-classes-confirmed': 'foo=1, bar=1',
'x-keep-replicas-stored': 1
self.keep_client.put(self.data, copies=w_copies, classes=w_classes)
@tutil.skip_sleep
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock):
+ disk_cache = False
+
def setUp(self):
self.api_client = self.mock_keep_services(count=2)
- self.keep_client = arvados.KeepClient(api_client=self.api_client)
+ self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
self.data = b'xyzzy'
self.locator = '1271ed5ef305aadabc605b1609e24c52'
self.test_id = arvados.util.new_request_id()
@tutil.skip_sleep
+#@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
+ disk_cache = False
def setUp(self):
# expected_order[i] is the probe order for
hashlib.md5(self.blocks[x]).hexdigest()
for x in range(len(self.expected_order))]
self.api_client = self.mock_keep_services(count=self.services)
- self.keep_client = arvados.KeepClient(api_client=self.api_client)
+ self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
def test_weighted_service_roots_against_reference_set(self):
# Confirm weighted_service_roots() returns the correct order
hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
initial_services = 12
self.api_client = self.mock_keep_services(count=initial_services)
- self.keep_client = arvados.KeepClient(api_client=self.api_client)
+ self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
probes_before = [
self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
for added_services in range(1, 12):
api_client = self.mock_keep_services(count=initial_services+added_services)
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
total_penalty = 0
for hash_index in range(len(hashes)):
probe_after = keep_client.weighted_service_roots(
# Arbitrary port number:
aport = random.randint(1024,65535)
api_client = self.mock_keep_services(service_port=aport, count=self.services)
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
with mock.patch('pycurl.Curl') as curl_mock, \
self.assertRaises(exc_class) as err_check:
curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
def test_put_error_shows_probe_order(self):
self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
-
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase):
+ disk_cache = False
+
# BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
# 1s worth of data and then trigger bandwidth errors before running
# out of data.
def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
return arvados.KeepClient(
api_client=self.api_client,
- timeout=timeouts)
+ timeout=timeouts, block_cache=make_block_cache(self.disk_cache))
def test_timeout_slow_connect(self):
# Can't simulate TCP delays with our own socket. Leave our
with self.assertRaises(arvados.errors.KeepWriteError):
kc.put(self.DATA, copies=1, num_retries=0)
-
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
+ disk_cache = False
+
def mock_disks_and_gateways(self, disks=3, gateways=1):
self.gateways = [{
'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
for gw in self.gateways]
self.api_client = self.mock_keep_services(
count=disks, additional_services=self.gateways)
- self.keepClient = arvados.KeepClient(api_client=self.api_client)
+ self.keepClient = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
@mock.patch('pycurl.Curl')
def test_get_with_gateway_hint_first(self, MockCurl):
self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
MockCurl.return_value.getopt(pycurl.URL).decode())
-
class KeepClientRetryTestMixin(object):
+ disk_cache = False
+
# Testing with a local Keep store won't exercise the retry behavior.
# Instead, our strategy is:
# * Create a client with one proxy specified (pointed at a black
def new_client(self, **caller_kwargs):
kwargs = self.client_kwargs.copy()
kwargs.update(caller_kwargs)
+ kwargs['block_cache'] = make_block_cache(self.disk_cache)
return arvados.KeepClient(**kwargs)
def run_method(self, *args, **kwargs):
@tutil.skip_sleep
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
DEFAULT_EXCEPTION = arvados.errors.KeepReadError
self.check_success(locator=self.HINTED_LOCATOR)
@tutil.skip_sleep
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase):
DEFAULT_EXPECT = True
DEFAULT_EXCEPTION = arvados.errors.KeepReadError
self.check_success(locator=self.HINTED_LOCATOR)
@tutil.skip_sleep
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
@tutil.skip_sleep
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
+ # Default for the undecorated base class; parameterized_class sets it per generated subclass.
+ disk_cache = False
+
# Test put()s that need two distinct servers to succeed, possibly
# requiring multiple passes through the retry loop.
def setUp(self):
self.api_client = self.mock_keep_services(count=2)
- self.keep_client = arvados.KeepClient(api_client=self.api_client)
+ self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
def test_success_after_exception(self):
with tutil.mock_keep_responses(
self.keep_client.put('foo', num_retries=1, copies=2)
self.assertEqual(2, req_mock.call_count)
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientAPIErrorTest(unittest.TestCase):
+ disk_cache = False
+
def test_api_fail(self):
class ApiMock(object):
def __getattr__(self, r):
else:
raise arvados.errors.KeepReadError()
keep_client = arvados.KeepClient(api_client=ApiMock(),
- proxy='', local_store='')
+ proxy='', local_store='',
+ block_cache=make_block_cache(self.disk_cache))
# The bug this is testing for is that if an API (not
# keepstore) exception is thrown as part of a get(), the next
# value in the database to an implicit zero/false value in an update
# request.
def fill_container_defaults
+ # Keep these keys sorted. Whatever is stored in the database is
+ # merged on top of this hash, so the key order defined here (not
+ # the order of keys in the database record) is the order used
+ # downstream.
self.runtime_constraints = {
'API' => false,
'cuda' => {
'driver_version' => '',
'hardware_capability' => '',
},
+ 'keep_cache_disk' => 0,
'keep_cache_ram' => 0,
'ram' => 0,
'vcpus' => 0,
if rc['keep_cache_ram'] == 0
rc['keep_cache_ram'] = Rails.configuration.Containers.DefaultKeepCacheRAM
end
+ if rc['keep_cache_disk'] == 0 and rc['keep_cache_ram'] == 0
+ # Apply the disk cache default only if keep_cache_ram is still 0 after the RAM default above.
+ rc['keep_cache_disk'] = Rails.configuration.Containers.DefaultKeepCacheDisk
+ end
rc
end
# records that don't have a 'cuda' section in runtime_constraints
resolved_runtime_constraints << resolved_runtime_constraints[0].except('cuda')
end
+ if resolved_runtime_constraints[0]['keep_cache_disk'] == 0
+ # If no disk cache requested, extend search to include older container
+ # records that don't have a 'keep_cache_disk' field in runtime_constraints
+ if resolved_runtime_constraints.length == 2
+ # Index 1 is assumed to be the CUDA-less variant; also match records
+ # that lack both 'cuda' and 'keep_cache_disk'.
+ resolved_runtime_constraints << resolved_runtime_constraints[1].except('keep_cache_disk')
+ end
+ resolved_runtime_constraints << resolved_runtime_constraints[0].except('keep_cache_disk')
+ end
candidates = candidates.where_serialized(:runtime_constraints, resolved_runtime_constraints, md5: true, multivalue: true)
log_reuse_info(candidates) { "after filtering on runtime_constraints #{attrs[:runtime_constraints].inspect}" }
]
runtime_constraints:
API: true
+ keep_cache_disk: 0
keep_cache_ram: 268435456
ram: 1342177280
vcpus: 1
]
runtime_constraints:
API: true
+ keep_cache_disk: 0
keep_cache_ram: 268435456
ram: 268435456
vcpus: 1
]
runtime_constraints:
API: true
+ keep_cache_disk: 0
keep_cache_ram: 268435456
ram: 268435456
vcpus: 2
]
runtime_constraints:
API: true
+ keep_cache_disk: 0
keep_cache_ram: 268435456
ram: 268435456
vcpus: 1
]
runtime_constraints:
API: true
+ keep_cache_disk: 0
keep_cache_ram: 268435456
ram: 1342177280
vcpus: 1
assert ({"vcpus" => 2, "ram" => 30}.to_a - cr.runtime_constraints.to_a).empty?
+ assert_equal 0, Rails.configuration.Containers.DefaultKeepCacheRAM
+ assert_equal 8589934592, Rails.configuration.Containers.DefaultKeepCacheDisk
+
assert_not_nil cr.container_uuid
c = Container.find_by_uuid cr.container_uuid
assert_not_nil c
assert_equal({}, c.environment)
assert_equal({"/out" => {"kind"=>"tmp", "capacity"=>1000000}}, c.mounts)
assert_equal "/out", c.output_path
- assert ({"keep_cache_ram"=>268435456, "vcpus" => 2, "ram" => 30}.to_a - c.runtime_constraints.to_a).empty?
+ assert ({"keep_cache_disk"=>8589934592, "keep_cache_ram"=>0, "vcpus" => 2, "ram" => 30}.to_a - c.runtime_constraints.to_a).empty?
assert_operator 0, :<, c.priority
assert_raises(ActiveRecord::RecordInvalid) do
output_path: "test",
runtime_constraints: {
"API" => false,
+ "keep_cache_disk" => 0,
"keep_cache_ram" => 0,
"ram" => 12000000000,
"vcpus" => 4
set_user_from_auth :active
env = {"C" => "3", "B" => "2", "A" => "1"}
m = {"F" => {"kind" => "3"}, "E" => {"kind" => "2"}, "D" => {"kind" => "1"}}
- rc = {"vcpus" => 1, "ram" => 1, "keep_cache_ram" => 1, "API" => true, "cuda" => {"device_count":0, "driver_version": "", "hardware_capability": ""}}
+ rc = {"vcpus" => 1, "ram" => 1, "keep_cache_ram" => 1, "keep_cache_disk" => 0, "API" => true, "cuda" => {"device_count":0, "driver_version": "", "hardware_capability": ""}}
c, _ = minimal_new(environment: env, mounts: m, runtime_constraints: rc)
c.reload
assert_equal Container.deep_sort_hash(env).to_json, c.environment.to_json
set_user_from_auth :active
# No cuda
no_cuda_attrs = REUSABLE_COMMON_ATTRS.merge({use_existing:false, priority:1, environment:{"var" => "queued"},
- runtime_constraints: {"vcpus" => 1, "ram" => 1, "keep_cache_ram"=>268435456, "API" => false,
+ runtime_constraints: {"vcpus" => 1, "ram" => 1, "keep_cache_disk"=>0, "keep_cache_ram"=>268435456, "API" => false,
"cuda" => {"device_count":0, "driver_version": "", "hardware_capability": ""}},})
c1, _ = minimal_new(no_cuda_attrs)
assert_equal Container::Queued, c1.state
# has cuda
cuda_attrs = REUSABLE_COMMON_ATTRS.merge({use_existing:false, priority:1, environment:{"var" => "queued"},
- runtime_constraints: {"vcpus" => 1, "ram" => 1, "keep_cache_ram"=>268435456, "API" => false,
+ runtime_constraints: {"vcpus" => 1, "ram" => 1, "keep_cache_disk"=>0, "keep_cache_ram"=>268435456, "API" => false,
"cuda" => {"device_count":1, "driver_version": "11.0", "hardware_capability": "9.0"}},})
c2, _ = minimal_new(cuda_attrs)
assert_equal Container::Queued, c2.state
type=str, metavar='PATH', action='append', default=[],
help="Create a new collection, mount it in read/write mode at mountpoint/PATH, and delete it when unmounting.")
+
self.add_argument('--debug', action='store_true', help="""Debug mode""")
self.add_argument('--logfile', help="""Write debug logs and errors to the specified file (default stderr).""")
self.add_argument('--foreground', action='store_true', help="""Run in foreground (default is to daemonize unless --exec specified)""", default=False)
self.add_argument('--encoding', type=str, help="Character encoding to use for filesystem, default is utf-8 (see Python codec registry for list of available encodings)", default="utf-8")
- self.add_argument('--file-cache', type=int, help="File data cache size, in bytes (default 256MiB)", default=256*1024*1024)
- self.add_argument('--directory-cache', type=int, help="Directory data cache size, in bytes (default 128MiB)", default=128*1024*1024)
+ self.add_argument('--file-cache', type=int, help="File data cache size, in bytes (default 8 GiB for disk-based cache or 256 MiB with RAM-only cache)", default=0)
+ self.add_argument('--directory-cache', type=int, help="Directory data cache size, in bytes (default 128 MiB)", default=128*1024*1024)
+
+ cachetype = self.add_mutually_exclusive_group()
+ cachetype.add_argument('--ram-cache', action='store_false', dest='disk_cache', help="Use in-memory caching only", default=True)
+ cachetype.add_argument('--disk-cache', action='store_true', dest='disk_cache', help="Use disk based caching (default)", default=True)
+
+ self.add_argument('--disk-cache-dir', type=str, help="Disk cache location (default ~/.cache/arvados/keep)", default=None)
self.add_argument('--disable-event-listening', action='store_true', help="Don't subscribe to events on the API server", dest="disable_event_listening", default=False)
try:
self.api = arvados.safeapi.ThreadSafeApiCache(
apiconfig=arvados.config.settings(),
+ # --file-cache defaults to 0, which tells KeepBlockCache to choose its own
+ # size based on whether disk_cache is enabled (see the --file-cache help).
keep_params={
- 'block_cache': arvados.keep.KeepBlockCache(self.args.file_cache),
+ 'block_cache': arvados.keep.KeepBlockCache(cache_max=self.args.file_cache,
+ disk_cache=self.args.disk_cache,
+ disk_cache_dir=self.args.disk_cache_dir),
'num_retries': self.args.retries,
})
except KeyError as e:
'Programming Language :: Python :: 3',
],
test_suite='tests',
- tests_require=['pbr<1.7.0', 'mock>=1.0', 'PyYAML'],
+ tests_require=['pbr<1.7.0', 'mock>=1.0', 'PyYAML', 'parameterized',],
zip_safe=False
)
from __future__ import absolute_import
+import atexit
import arvados
+import arvados.keep
import arvados_fuse as fuse
import arvados.safeapi
import llfuse
from .integration_test import workerPool
+def make_block_cache(disk_cache):
+    """Return a KeepBlockCache for tests, disk-backed if `disk_cache` is true.
+
+    A disk-backed cache gets its own fresh temporary directory, removed at
+    interpreter exit. Each test therefore starts with a cold cache.
+    Caches created in the same test never share or wipe each other's
+    files, and the user's real cache in ~/.cache/arvados/keep is never
+    deleted.
+    """
+    disk_cache_dir = None
+    if disk_cache:
+        disk_cache_dir = tempfile.mkdtemp(prefix="arvados-keep-cache-test-")
+        atexit.register(shutil.rmtree, disk_cache_dir, ignore_errors=True)
+    return arvados.keep.KeepBlockCache(disk_cache=disk_cache,
+                                       disk_cache_dir=disk_cache_dir)
+
class MountTestBase(unittest.TestCase):
+ disk_cache = False
+
def setUp(self, api=None, local_store=True):
# The underlying C implementation of open() makes a fstat() syscall
# with the GIL still held. When the GETATTR message comes back to
self.mounttmp = tempfile.mkdtemp()
run_test_server.run()
run_test_server.authorize_with("admin")
- self.api = api if api else arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
+
+ self.api = api if api else arvados.safeapi.ThreadSafeApiCache(arvados.config.settings(), keep_params={"block_cache": make_block_cache(self.disk_cache)})
self.llfuse_thread = None
# This is a copy of Mount's method. TODO: Refactor MountTestBase
import time
import unittest
import tempfile
+import parameterized
import arvados
import arvados_fuse as fuse
else:
self.done = True
-
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class FuseMountTest(MountTestBase):
def setUp(self):
super(FuseMountTest, self).setUp()
self.assertEqual(v, f.read().decode())
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class FuseMagicTest(MountTestBase):
def setUp(self, api=None):
super(FuseMagicTest, self).setUp(api=api)
Test().runTest()
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class FuseSharedTest(MountTestBase):
def runTest(self):
self.make_mount(fuse.SharedDirectory,
self.assertEqual("plnp", f.read())
Test().runTest()
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class FuseModifyFileTest(MountTestBase):
def runTest(self):
collection = arvados.collection.Collection(api_client=self.api)
self.pool.apply(fuseModifyFileTestHelperReadEndContents, (self.mounttmp,))
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class FuseAddFileToCollectionTest(MountTestBase):
def runTest(self):
collection = arvados.collection.Collection(api_client=self.api)
self.assertEqual(["file1.txt", "file2.txt"], sorted(d1))
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class FuseRemoveFileFromCollectionTest(MountTestBase):
def runTest(self):
collection = arvados.collection.Collection(api_client=self.api)
pass
Test().runTest()
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class FuseCreateFileTest(MountTestBase):
def runTest(self):
collection = arvados.collection.Collection(api_client=self.api)
self.assertEqual(f.read(), "Hello world!")
Test().runTest()
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class FuseWriteFileTest(MountTestBase):
def runTest(self):
collection = arvados.collection.Collection(api_client=self.api)
Test().runTest()
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class FuseUpdateFileTest(MountTestBase):
def runTest(self):
collection = arvados.collection.Collection(api_client=self.api)