Merge branch 'wtsi/13113-acr-collectioncache-retries' refs #13113
author     Peter Amstutz <pamstutz@veritasgenetics.com>
           Fri, 23 Feb 2018 18:28:50 +0000 (13:28 -0500)
committer  Peter Amstutz <pamstutz@veritasgenetics.com>
           Fri, 23 Feb 2018 18:28:50 +0000 (13:28 -0500)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

16 files changed:
sdk/python/arvados/arvfile.py
sdk/python/arvados/collection.py
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-dispatch-slurm/usage.go
services/fuse/arvados_fuse/__init__.py
services/fuse/arvados_fuse/fresh.py
services/fuse/arvados_fuse/fusefile.py
services/keepstore/azure_blob_volume.go
services/keepstore/azure_blob_volume_test.go
services/keepstore/mounts_test.go
services/keepstore/s3_volume.go
services/keepstore/s3_volume_test.go
services/keepstore/volume.go
services/keepstore/volume_test.go
services/keepstore/volume_unix.go
services/keepstore/volume_unix_test.go

sdk/python/arvados/arvfile.py
index aa6bdad90bea0551f7043fbdd0889f6dea2ff6a8..f4580f346bbed43a5642e974d54c8e5922c24efd 100644
@@ -866,6 +866,9 @@ class ArvadosFile(object):
 
     """
 
+    __slots__ = ('parent', 'name', '_writers', '_committed',
+                 '_segments', 'lock', '_current_bblock', 'fuse_entry')
+
     def __init__(self, parent, name, stream=[], segments=[]):
         """
         ArvadosFile constructor.
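The __slots__ declaration above removes the per-instance __dict__, so each ArvadosFile keeps its attributes in fixed slots; with a FUSE mount holding hundreds of thousands of files (the InodeCache comment later in this changeset cites 240000), that per-object dictionary adds up. A minimal standalone sketch of the effect, not taken from the Arvados sources:

import sys

class Plain(object):
    def __init__(self, parent, name):
        self.parent = parent
        self.name = name

class Slotted(object):
    __slots__ = ('parent', 'name')
    def __init__(self, parent, name):
        self.parent = parent
        self.name = name

p = Plain(None, 'foo.txt')
s = Slotted(None, 'foo.txt')
print(hasattr(p, '__dict__'))     # True: every instance carries its own dict
print(hasattr(s, '__dict__'))     # False: attributes live in fixed slots
print(sys.getsizeof(p.__dict__))  # the per-instance overhead being saved, in bytes
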
sdk/python/arvados/collection.py
index 4be098d3511656e42a176b5fe46ea0de83355b10..33333ee86558c4b0244917a9ffb2c75645d321fa 100644
@@ -1531,6 +1531,10 @@ class Collection(RichCollectionBase):
 
         return text
 
+    _token_re = re.compile(r'(\S+)(\s+|$)')
+    _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
+    _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
+
     @synchronized
     def _import_manifest(self, manifest_text):
         """Import a manifest into a `Collection`.
@@ -1549,7 +1553,7 @@ class Collection(RichCollectionBase):
         stream_name = None
         state = STREAM_NAME
 
-        for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
+        for token_and_separator in self._token_re.finditer(manifest_text):
             tok = token_and_separator.group(1)
             sep = token_and_separator.group(2)
 
@@ -1564,7 +1568,7 @@ class Collection(RichCollectionBase):
                 continue
 
             if state == BLOCKS:
-                block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
+                block_locator = self._block_re.match(tok)
                 if block_locator:
                     blocksize = int(block_locator.group(1))
                     blocks.append(Range(tok, streamoffset, blocksize, 0))
@@ -1573,7 +1577,7 @@ class Collection(RichCollectionBase):
                     state = SEGMENTS
 
             if state == SEGMENTS:
-                file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
+                file_segment = self._segment_re.match(tok)
                 if file_segment:
                     pos = int(file_segment.group(1))
                     size = int(file_segment.group(2))
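Hoisting the three patterns to class attributes compiles them once at import time instead of paying re's per-call pattern-cache lookup for every token of a (possibly very large) manifest; note also that re.search(r'^...') becomes an equivalent .match() on the precompiled pattern. A rough self-contained timing sketch of the idea (illustrative only; numbers vary by machine and Python version):

import re
import timeit

_segment_re = re.compile(r'(\d+):(\d+):(\S+)')
tokens = ['0:3:foo.txt'] * 100000

def inline():
    # pays re's cache lookup on every call
    return sum(1 for t in tokens if re.match(r'(\d+):(\d+):(\S+)', t))

def precompiled():
    # reuses the already-compiled pattern object
    return sum(1 for t in tokens if _segment_re.match(t))

print(timeit.timeit(inline, number=10))
print(timeit.timeit(precompiled, number=10))
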
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 0879de20f9de4b884e38612f428cfe92b37bfe63..f77023697e0f54ccaa12e2e7bc1bf3dd39f71509 100644
@@ -47,6 +47,10 @@ type Dispatcher struct {
        // Example: []string{"crunch-run", "--cgroup-parent-subsystem=memory"}
        CrunchRunCommand []string
 
+       // Extra RAM to reserve (in Bytes) for SLURM job, in addition
+       // to the amount specified in the container's RuntimeConstraints
+       ReserveExtraRAM int64
+
        // Minimum time between two attempts to run the same container
        MinRetryPeriod arvados.Duration
 }
@@ -206,7 +210,7 @@ func (disp *Dispatcher) niceness(priority int) int {
 }
 
 func (disp *Dispatcher) sbatchArgs(container arvados.Container) ([]string, error) {
-       mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM) / float64(1048576)))
+       mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM+disp.ReserveExtraRAM) / float64(1048576)))
 
        var disk int64
        for _, m := range container.Mounts {
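For a sense of scale, the SLURM memory request is now ceil((RAM + KeepCacheRAM + ReserveExtraRAM) / 2^20) MiB. With illustrative values (assumptions, not taken from this changeset) of RAM = 8 GiB and KeepCacheRAM = 256 MiB, plus the 268435456-byte (256 MiB) ReserveExtraRAM used in the example config below:

    ceil((8589934592 + 268435456 + 268435456) / 1048576) = ceil(9126805504 / 1048576) = 8704 MiB
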
services/crunch-dispatch-slurm/usage.go
index f64c5023952b42d9ed7e4483941b6dfd561f98a1..032d86284d5e0a9fc8a3d712a0283597ec29d765 100644
@@ -20,7 +20,8 @@ var exampleConfigFile = []byte(`
        },
        "CrunchRunCommand": ["crunch-run"],
        "PollPeriod": "10s",
-       "SbatchArguments": ["--partition=foo", "--exclude=node13"]
+       "SbatchArguments": ["--partition=foo", "--exclude=node13"],
+       "ReserveExtraRAM": 268435456,
     }`)
 
 func usage(fs *flag.FlagSet) {
services/fuse/arvados_fuse/__init__.py
index 788d475e33c0d094d719503e6b9fc4dba386e1ec..f1e49f5afcffff32143b9033c5f83dddcd0c7c65 100644
@@ -156,12 +156,30 @@ class InodeCache(object):
 
     def _remove(self, obj, clear):
         if clear:
-            if obj.in_use():
-                _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
-                return
+            # Kernel behavior seems to be that if a file is
+            # referenced, its parents remain referenced too.  This
+            # means has_ref() exits early when a collection is not a
+            # candidate for eviction.
+            #
+            # By contrast, being in use does not propagate any count
+            # to parents, so in_use() requires a full tree walk to
+            # determine whether a collection is a candidate for
+            # eviction.  This takes 0.07s for 240000 files, which
+            # becomes a major drag when cap_cache is being called
+            # several times a second and there are multiple
+            # non-evictable collections in the cache.
+            #
+            # So it is important for performance that we do the
+            # has_ref() check first.
+
             if obj.has_ref(True):
                 _logger.debug("InodeCache cannot clear inode %i, still referenced", obj.inode)
                 return
+
+            if obj.in_use():
+                _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
+                return
+
             obj.kernel_invalidate()
             _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
             obj.clear()
@@ -202,7 +220,8 @@ class InodeCache(object):
                     if obj not in self._by_uuid[obj.cache_uuid]:
                         self._by_uuid[obj.cache_uuid].append(obj)
             self._total += obj.objsize()
-            _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i", obj.inode, obj.objsize(), obj.cache_uuid, self._total)
+            _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i (%i entries)",
+                          obj.inode, obj.objsize(), obj.cache_uuid, self._total, len(self._entries))
             self.cap_cache()
 
     def touch(self, obj):
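The reordering above rests on the comment's cost argument: has_ref() is a cheap counter check that stays true as long as the kernel references the collection or anything under it, while in_use() must walk the whole subtree. A hypothetical toy model of that trade-off, not the actual arvados_fuse implementation:

class Node(object):
    def __init__(self, children=()):
        self.ref_count = 0       # kernel refs: bumped on the node and its parents
        self.open_handles = 0    # bumped only on the node actually opened
        self.children = list(children)

    def has_ref(self):
        # O(1): a single counter read
        return self.ref_count > 0

    def in_use(self):
        # O(subtree): has to visit every descendant
        return self.open_handles > 0 or any(c.in_use() for c in self.children)

def evictable(node):
    # Doing the cheap has_ref() check first rejects still-referenced
    # collections without paying for the full tree walk.
    return not node.has_ref() and not node.in_use()
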
services/fuse/arvados_fuse/fresh.py
index 8b680f0663d25cf423e68251f1a82b8ed7384bc2..2a3a19c54c66005a6f96cd8d1dbd6de3c6345aad 100644
@@ -59,6 +59,10 @@ class FreshBase(object):
     * Clear the object contents (invalidates the object)
 
     """
+
+    __slots__ = ("_stale", "_poll", "_last_update", "_atime", "_poll_time", "use_count",
+                 "ref_count", "dead", "cache_size", "cache_uuid", "allow_attr_cache")
+
     def __init__(self):
         self._stale = True
         self._poll = False
services/fuse/arvados_fuse/fusefile.py
index 585536176007bdfcc889a47647f85114e6a34fb7..cedb4fb451cdf6fbdaefe0b4caa3a20ef424d69e 100644
@@ -15,6 +15,8 @@ _logger = logging.getLogger('arvados.arvados_fuse')
 class File(FreshBase):
     """Base for file objects."""
 
+    __slots__ = ("inode", "parent_inode", "_mtime")
+
     def __init__(self, parent_inode, _mtime=0):
         super(File, self).__init__()
         self.inode = None
@@ -46,6 +48,8 @@ class File(FreshBase):
 class FuseArvadosFile(File):
     """Wraps a ArvadosFile."""
 
+    __slots__ = ('arvfile',)
+
     def __init__(self, parent_inode, arvfile, _mtime):
         super(FuseArvadosFile, self).__init__(parent_inode, _mtime)
         self.arvfile = arvfile
services/keepstore/azure_blob_volume.go
index c4eb8b23d5fc10078d9891ed2fbc88ef315ba30d..f18d82c06b29b7948a90431be686001b1bd9e572 100644
@@ -105,6 +105,7 @@ type AzureBlobVolume struct {
        AzureReplication      int
        ReadOnly              bool
        RequestTimeout        arvados.Duration
+       StorageClasses        []string
 
        azClient  storage.Client
        container *azureContainer
@@ -590,6 +591,11 @@ func (v *AzureBlobVolume) Replication() int {
        return v.AzureReplication
 }
 
+// GetStorageClasses implements Volume
+func (v *AzureBlobVolume) GetStorageClasses() []string {
+       return v.StorageClasses
+}
+
 // If possible, translate an Azure SDK error to a recognizable error
 // like os.ErrNotExist.
 func (v *AzureBlobVolume) translateError(err error) error {
services/keepstore/azure_blob_volume_test.go
index 5c6e2ab44c7cc45e16982a63c808e4eea2b6393e..60a7911768f009ef6209292d6c1e04b6cccbe6e7 100644
@@ -27,6 +27,7 @@ import (
        "time"
 
        "github.com/Azure/azure-sdk-for-go/storage"
+       "github.com/ghodss/yaml"
        check "gopkg.in/check.v1"
 )
 
@@ -707,6 +708,18 @@ func (s *StubbedAzureBlobSuite) TestStats(c *check.C) {
        c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
 }
 
+func (s *StubbedAzureBlobSuite) TestConfig(c *check.C) {
+       var cfg Config
+       err := yaml.Unmarshal([]byte(`
+Volumes:
+  - Type: Azure
+    StorageClasses: ["class_a", "class_b"]
+`), &cfg)
+
+       c.Check(err, check.IsNil)
+       c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
+}
+
 func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
        v.azHandler.PutRaw(v.ContainerName, locator, data)
 }
services/keepstore/mounts_test.go
index 883aa712e3db0bdcb8fee2eb0a583affd3150e53..66a212456d51c78e09cc12dc41bdccb17bf953df 100644
@@ -46,11 +46,11 @@ func (s *MountsSuite) TestMounts(c *check.C) {
        resp := s.call("GET", "/mounts", "", nil)
        c.Check(resp.Code, check.Equals, http.StatusOK)
        var mntList []struct {
-               UUID        string
-               DeviceID    string
-               ReadOnly    bool
-               Replication int
-               Tier        int
+               UUID           string
+               DeviceID       string
+               ReadOnly       bool
+               Replication    int
+               StorageClasses []string
        }
        err := json.Unmarshal(resp.Body.Bytes(), &mntList)
        c.Assert(err, check.IsNil)
@@ -61,7 +61,7 @@ func (s *MountsSuite) TestMounts(c *check.C) {
                c.Check(m.DeviceID, check.Equals, "mock-device-id")
                c.Check(m.ReadOnly, check.Equals, false)
                c.Check(m.Replication, check.Equals, 1)
-               c.Check(m.Tier, check.Equals, 1)
+               c.Check(m.StorageClasses, check.DeepEquals, []string{"default"})
        }
        c.Check(mntList[0].UUID, check.Not(check.Equals), mntList[1].UUID)
 
services/keepstore/s3_volume.go
index 90e8a1b4f6913da10b6789a4111a7ac8aa479721..a60b2fc27e321f553c9784691702282ecb39a6e4 100644
@@ -152,6 +152,7 @@ type S3Volume struct {
        RaceWindow         arvados.Duration
        ReadOnly           bool
        UnsafeDelete       bool
+       StorageClasses     []string
 
        bucket *s3bucket
 
@@ -686,6 +687,11 @@ func (v *S3Volume) Replication() int {
        return v.S3Replication
 }
 
+// GetStorageClasses implements Volume
+func (v *S3Volume) GetStorageClasses() []string {
+       return v.StorageClasses
+}
+
 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
 
 func (v *S3Volume) isKeepBlock(s string) bool {
services/keepstore/s3_volume_test.go
index acc1b11df32526c132d763d970915f9f30735437..4081e1e63c4825a08712a93bd552de7818f018d5 100644
@@ -19,6 +19,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/AdRoll/goamz/s3"
        "github.com/AdRoll/goamz/s3/s3test"
+       "github.com/ghodss/yaml"
        check "gopkg.in/check.v1"
 )
 
@@ -435,6 +436,18 @@ func (s *StubbedS3Suite) newTestableVolume(c *check.C, raceWindow time.Duration,
        return v
 }
 
+func (s *StubbedS3Suite) TestConfig(c *check.C) {
+       var cfg Config
+       err := yaml.Unmarshal([]byte(`
+Volumes:
+  - Type: S3
+    StorageClasses: ["class_a", "class_b"]
+`), &cfg)
+
+       c.Check(err, check.IsNil)
+       c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
+}
+
 func (v *TestableS3Volume) Start() error {
        tmp, err := ioutil.TempFile("", "keepstore")
        v.c.Assert(err, check.IsNil)
services/keepstore/volume.go
index 69802abdd1b5c4e22422293331d6cb0eec371896..1f8fba5d067c2a0731cb05eeebf81cc76bc315b7 100644
@@ -240,6 +240,9 @@ type Volume interface {
        // Return a globally unique ID of the underlying storage
        // device if possible, otherwise "".
        DeviceID() string
+
+       // Get the storage classes associated with this volume
+       GetStorageClasses() []string
 }
 
 // A VolumeWithExamples provides example configs to display in the
@@ -284,12 +287,12 @@ type VolumeManager interface {
 
 // A VolumeMount is an attachment of a Volume to a VolumeManager.
 type VolumeMount struct {
-       UUID        string
-       DeviceID    string
-       ReadOnly    bool
-       Replication int
-       Tier        int
-       volume      Volume
+       UUID           string
+       DeviceID       string
+       ReadOnly       bool
+       Replication    int
+       StorageClasses []string
+       volume         Volume
 }
 
 // Generate a UUID the way API server would for a "KeepVolumeMount"
@@ -326,13 +329,17 @@ func MakeRRVolumeManager(volumes []Volume) *RRVolumeManager {
        }
        vm.mountMap = make(map[string]*VolumeMount)
        for _, v := range volumes {
+               sc := v.GetStorageClasses()
+               if len(sc) == 0 {
+                       sc = []string{"default"}
+               }
                mnt := &VolumeMount{
-                       UUID:        (*VolumeMount)(nil).generateUUID(),
-                       DeviceID:    v.DeviceID(),
-                       ReadOnly:    !v.Writable(),
-                       Replication: v.Replication(),
-                       Tier:        1,
-                       volume:      v,
+                       UUID:           (*VolumeMount)(nil).generateUUID(),
+                       DeviceID:       v.DeviceID(),
+                       ReadOnly:       !v.Writable(),
+                       Replication:    v.Replication(),
+                       StorageClasses: sc,
+                       volume:         v,
                }
                vm.iostats[v] = &ioStats{}
                vm.mounts = append(vm.mounts, mnt)
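With the fallback above, a volume that declares no storage classes is reported under "default" (which is what the updated mounts_test expects), while an explicit list passes through unchanged. A sketch of the corresponding volume configuration, modeled on the YAML snippets in the test files in this changeset (the surrounding keepstore config layout is assumed, not shown here):

Volumes:
  - Type: Directory
    # no StorageClasses -> MakeRRVolumeManager reports ["default"]
  - Type: S3
    StorageClasses: ["class_a", "class_b"]
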
services/keepstore/volume_test.go
index baed6a71b60ae556a28b0cc3478e147ad77f90af..43ddd090cc1cfd22419e80aa86f1e838ffebd479 100644
@@ -241,3 +241,7 @@ func (v *MockVolume) Replication() int {
 
 func (v *MockVolume) EmptyTrash() {
 }
+
+func (v *MockVolume) GetStorageClasses() []string {
+       return nil
+}
services/keepstore/volume_unix.go
index ea9aa489c5af8c357cd8c195851e94456537ce2a..b4f18ad13e6d0c93e0c0b40023b9153d3c7a6d99 100644
@@ -110,6 +110,7 @@ type UnixVolume struct {
        ReadOnly             bool
        Serialize            bool
        DirectoryReplication int
+       StorageClasses       []string
 
        // something to lock during IO, typically a sync.Mutex (or nil
        // to skip locking)
@@ -644,6 +645,11 @@ func (v *UnixVolume) Replication() int {
        return v.DirectoryReplication
 }
 
+// GetStorageClasses implements Volume
+func (v *UnixVolume) GetStorageClasses() []string {
+       return v.StorageClasses
+}
+
 // InternalStats returns I/O and filesystem ops counters.
 func (v *UnixVolume) InternalStats() interface{} {
        return &v.os.stats
services/keepstore/volume_unix_test.go
index ea3d91d98c2e42deed99417c87254af42b8148e8..7f1cd219644ab241f2c0a8a0e2353c8f4c16844f 100644
@@ -19,6 +19,7 @@ import (
        "testing"
        "time"
 
+       "github.com/ghodss/yaml"
        check "gopkg.in/check.v1"
 )
 
@@ -427,3 +428,15 @@ func (s *UnixVolumeSuite) TestStats(c *check.C) {
        c.Check(err, check.IsNil)
        c.Check(stats(), check.Matches, `.*"FlockOps":2,.*`)
 }
+
+func (s *UnixVolumeSuite) TestConfig(c *check.C) {
+       var cfg Config
+       err := yaml.Unmarshal([]byte(`
+Volumes:
+  - Type: Directory
+    StorageClasses: ["class_a", "class_b"]
+`), &cfg)
+
+       c.Check(err, check.IsNil)
+       c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
+}