From: Peter Amstutz
Date: Fri, 23 Feb 2018 18:28:50 +0000 (-0500)
Subject: Merge branch 'wtsi/13113-acr-collectioncache-retries' refs #13113
X-Git-Tag: 1.1.4~67
X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/68cde7e2a496c4e57cb1576bb8bf29415d3c0b67?hp=b3f083ce49a50521d96daa28f896d857384222fb

Merge branch 'wtsi/13113-acr-collectioncache-retries' refs #13113

Arvados-DCO-1.1-Signed-off-by: Peter Amstutz
---

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index aa6bdad90b..f4580f346b 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -866,6 +866,9 @@ class ArvadosFile(object):
 
     """
 
+    __slots__ = ('parent', 'name', '_writers', '_committed',
+                 '_segments', 'lock', '_current_bblock', 'fuse_entry')
+
     def __init__(self, parent, name, stream=[], segments=[]):
         """
         ArvadosFile constructor.
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 4be098d351..33333ee865 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -1531,6 +1531,10 @@ class Collection(RichCollectionBase):
 
         return text
 
+    _token_re = re.compile(r'(\S+)(\s+|$)')
+    _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
+    _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
+
     @synchronized
     def _import_manifest(self, manifest_text):
        """Import a manifest into a `Collection`.
@@ -1549,7 +1553,7 @@
         stream_name = None
         state = STREAM_NAME
 
-        for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
+        for token_and_separator in self._token_re.finditer(manifest_text):
             tok = token_and_separator.group(1)
             sep = token_and_separator.group(2)
 
@@ -1564,7 +1568,7 @@
                 continue
 
             if state == BLOCKS:
-                block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
+                block_locator = self._block_re.match(tok)
                 if block_locator:
                     blocksize = int(block_locator.group(1))
                     blocks.append(Range(tok, streamoffset, blocksize, 0))
@@ -1573,7 +1577,7 @@
                     state = SEGMENTS
 
             if state == SEGMENTS:
-                file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
+                file_segment = self._segment_re.match(tok)
                 if file_segment:
                     pos = int(file_segment.group(1))
                     size = int(file_segment.group(2))
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 0879de20f9..f77023697e 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -47,6 +47,10 @@ type Dispatcher struct {
     // Example: []string{"crunch-run", "--cgroup-parent-subsystem=memory"}
     CrunchRunCommand []string
 
+    // Extra RAM to reserve (in Bytes) for SLURM job, in addition
+    // to the amount specified in the container's RuntimeConstraints
+    ReserveExtraRAM int64
+
     // Minimum time between two attempts to run the same container
     MinRetryPeriod arvados.Duration
 }
@@ -206,7 +210,7 @@ func (disp *Dispatcher) niceness(priority int) int {
 }
 
 func (disp *Dispatcher) sbatchArgs(container arvados.Container) ([]string, error) {
-    mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM) / float64(1048576)))
+    mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM+disp.ReserveExtraRAM) / float64(1048576)))
 
     var disk int64
     for _, m := range container.Mounts {
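A note on the arithmetic above: sbatchArgs converts the summed byte counts into whole MiB for sbatch's --mem flag, rounding up, so the new ReserveExtraRAM term simply raises that total. A minimal sketch of the same calculation, using hypothetical constraint values together with the 268435456-byte (256 MiB) reservation that the example config in the next hunk adds:

    import math

    # Hypothetical constraints: 1 GiB container RAM and a 256 MiB Keep
    # cache, plus the 256 MiB ReserveExtraRAM from the example config.
    ram = 1 << 30
    keep_cache_ram = 256 << 20
    reserve_extra_ram = 268435456

    # Same arithmetic as sbatchArgs: sum the byte counts, round up to MiB.
    mem_mib = int(math.ceil(float(ram + keep_cache_ram + reserve_extra_ram) / 1048576))
    print("--mem=%d" % mem_mib)  # --mem=1536

Rounding up once after summing, rather than per term, means the reservation never loses a partial MiB to truncation.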
diff --git a/services/crunch-dispatch-slurm/usage.go b/services/crunch-dispatch-slurm/usage.go
index f64c502395..032d86284d 100644
--- a/services/crunch-dispatch-slurm/usage.go
+++ b/services/crunch-dispatch-slurm/usage.go
@@ -20,7 +20,8 @@ var exampleConfigFile = []byte(`
     },
     "CrunchRunCommand": ["crunch-run"],
     "PollPeriod": "10s",
-    "SbatchArguments": ["--partition=foo", "--exclude=node13"]
+    "SbatchArguments": ["--partition=foo", "--exclude=node13"],
+    "ReserveExtraRAM": 268435456,
 }`)
 
 func usage(fs *flag.FlagSet) {
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index 788d475e33..f1e49f5afc 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -156,12 +156,30 @@ class InodeCache(object):
 
     def _remove(self, obj, clear):
         if clear:
-            if obj.in_use():
-                _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
-                return
+            # Kernel behavior seems to be that if a file is
+            # referenced, its parents remain referenced too. This
+            # means has_ref() exits early when a collection is not
+            # candidate for eviction.
+            #
+            # By contrast, in_use() doesn't increment references on
+            # parents, so it requires a full tree walk to determine if
+            # a collection is a candidate for eviction. This takes
+            # .07s for 240000 files, which becomes a major drag when
+            # cap_cache is being called several times a second and
+            # there are multiple non-evictable collections in the
+            # cache.
+            #
+            # So it is important for performance that we do the
+            # has_ref() check first.
+
             if obj.has_ref(True):
                 _logger.debug("InodeCache cannot clear inode %i, still referenced", obj.inode)
                 return
+
+            if obj.in_use():
+                _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
+                return
+
             obj.kernel_invalidate()
             _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
             obj.clear()
@@ -202,7 +220,8 @@
                     if obj not in self._by_uuid[obj.cache_uuid]:
                         self._by_uuid[obj.cache_uuid].append(obj)
             self._total += obj.objsize()
-            _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i", obj.inode, obj.objsize(), obj.cache_uuid, self._total)
+            _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i (%i entries)",
+                          obj.inode, obj.objsize(), obj.cache_uuid, self._total, len(self._entries))
             self.cap_cache()
 
     def touch(self, obj):
diff --git a/services/fuse/arvados_fuse/fresh.py b/services/fuse/arvados_fuse/fresh.py
index 8b680f0663..2a3a19c54c 100644
--- a/services/fuse/arvados_fuse/fresh.py
+++ b/services/fuse/arvados_fuse/fresh.py
@@ -59,6 +59,10 @@ class FreshBase(object):
 
     * Clear the object contents (invalidates the object)
     """
+
+    __slots__ = ("_stale", "_poll", "_last_update", "_atime", "_poll_time", "use_count",
+                 "ref_count", "dead", "cache_size", "cache_uuid", "allow_attr_cache")
+
     def __init__(self):
         self._stale = True
         self._poll = False
diff --git a/services/fuse/arvados_fuse/fusefile.py b/services/fuse/arvados_fuse/fusefile.py
index 5855361760..cedb4fb451 100644
--- a/services/fuse/arvados_fuse/fusefile.py
+++ b/services/fuse/arvados_fuse/fusefile.py
@@ -15,6 +15,8 @@ _logger = logging.getLogger('arvados.arvados_fuse')
 class File(FreshBase):
     """Base for file objects."""
 
+    __slots__ = ("inode", "parent_inode", "_mtime")
+
     def __init__(self, parent_inode, _mtime=0):
         super(File, self).__init__()
         self.inode = None
@@ -46,6 +48,8 @@ class File(FreshBase):
 class FuseArvadosFile(File):
     """Wraps a ArvadosFile."""
 
+    __slots__ = ('arvfile',)
+
     def __init__(self, parent_inode, arvfile, _mtime):
         super(FuseArvadosFile, self).__init__(parent_inode, _mtime)
         self.arvfile = arvfile
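The __slots__ declarations here and in arvfile.py and fresh.py all target the same problem: a busy FUSE mount can hold hundreds of thousands of file and directory objects, and dropping the per-instance __dict__ shrinks each one considerably. A standalone illustration of the effect (not Arvados code; exact byte counts vary by interpreter version):

    import sys

    class FileWithDict(object):
        def __init__(self):
            self.inode, self.parent_inode, self._mtime = None, None, 0

    class FileWithSlots(object):
        __slots__ = ("inode", "parent_inode", "_mtime")
        def __init__(self):
            self.inode, self.parent_inode, self._mtime = None, None, 0

    # With __slots__, attributes live at fixed offsets on the instance and
    # no per-object __dict__ is allocated, so each object is much smaller.
    d = FileWithDict()
    print(sys.getsizeof(d) + sys.getsizeof(d.__dict__))  # e.g. ~160 bytes on CPython 3
    print(sys.getsizeof(FileWithSlots()))                # e.g. ~56 bytes

The trade-off is that instances can no longer grow arbitrary attributes, which is why ArvadosFile's slots must explicitly include fuse_entry for the FUSE layer's benefit.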
diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index c4eb8b23d5..f18d82c06b 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -105,6 +105,7 @@ type AzureBlobVolume struct {
     AzureReplication int
     ReadOnly         bool
     RequestTimeout   arvados.Duration
+    StorageClasses   []string
 
     azClient  storage.Client
     container *azureContainer
@@ -590,6 +591,11 @@ func (v *AzureBlobVolume) Replication() int {
     return v.AzureReplication
 }
 
+// GetStorageClasses implements Volume
+func (v *AzureBlobVolume) GetStorageClasses() []string {
+    return v.StorageClasses
+}
+
 // If possible, translate an Azure SDK error to a recognizable error
 // like os.ErrNotExist.
 func (v *AzureBlobVolume) translateError(err error) error {
diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go
index 5c6e2ab44c..60a7911768 100644
--- a/services/keepstore/azure_blob_volume_test.go
+++ b/services/keepstore/azure_blob_volume_test.go
@@ -27,6 +27,7 @@ import (
     "time"
 
     "github.com/Azure/azure-sdk-for-go/storage"
+    "github.com/ghodss/yaml"
     check "gopkg.in/check.v1"
 )
 
@@ -707,6 +708,18 @@ func (s *StubbedAzureBlobSuite) TestStats(c *check.C) {
     c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
 }
 
+func (s *StubbedAzureBlobSuite) TestConfig(c *check.C) {
+    var cfg Config
+    err := yaml.Unmarshal([]byte(`
+Volumes:
+  - Type: Azure
+    StorageClasses: ["class_a", "class_b"]
+`), &cfg)
+
+    c.Check(err, check.IsNil)
+    c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
+}
+
 func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
     v.azHandler.PutRaw(v.ContainerName, locator, data)
 }
diff --git a/services/keepstore/mounts_test.go b/services/keepstore/mounts_test.go
index 883aa712e3..66a212456d 100644
--- a/services/keepstore/mounts_test.go
+++ b/services/keepstore/mounts_test.go
@@ -46,11 +46,11 @@ func (s *MountsSuite) TestMounts(c *check.C) {
     resp := s.call("GET", "/mounts", "", nil)
     c.Check(resp.Code, check.Equals, http.StatusOK)
     var mntList []struct {
-        UUID        string
-        DeviceID    string
-        ReadOnly    bool
-        Replication int
-        Tier        int
+        UUID           string
+        DeviceID       string
+        ReadOnly       bool
+        Replication    int
+        StorageClasses []string
     }
     err := json.Unmarshal(resp.Body.Bytes(), &mntList)
     c.Assert(err, check.IsNil)
@@ -61,7 +61,7 @@ func (s *MountsSuite) TestMounts(c *check.C) {
         c.Check(m.DeviceID, check.Equals, "mock-device-id")
         c.Check(m.ReadOnly, check.Equals, false)
         c.Check(m.Replication, check.Equals, 1)
-        c.Check(m.Tier, check.Equals, 1)
+        c.Check(m.StorageClasses, check.DeepEquals, []string{"default"})
     }
     c.Check(mntList[0].UUID, check.Not(check.Equals), mntList[1].UUID)
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 90e8a1b4f6..a60b2fc27e 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -152,6 +152,7 @@ type S3Volume struct {
     RaceWindow     arvados.Duration
     ReadOnly       bool
     UnsafeDelete   bool
+    StorageClasses []string
 
     bucket *s3bucket
 
@@ -686,6 +687,11 @@ func (v *S3Volume) Replication() int {
     return v.S3Replication
 }
 
+// GetStorageClasses implements Volume
+func (v *S3Volume) GetStorageClasses() []string {
+    return v.StorageClasses
+}
+
 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
 
 func (v *S3Volume) isKeepBlock(s string) bool {
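As the mounts_test.go hunk above shows, keepstore's /mounts endpoint now advertises a StorageClasses list per mount in place of the old numeric Tier field, with ["default"] as the fallback. A hedged client-side sketch of reading it; the host, port, and token below are placeholders, and depending on cluster configuration the endpoint may require a management token:

    import json
    try:
        from urllib.request import Request, urlopen  # Python 3
    except ImportError:
        from urllib2 import Request, urlopen         # Python 2

    # Placeholder keepstore address (25107 is keepstore's usual port)
    # and a placeholder token; neither comes from this commit.
    req = Request("http://keep0.example:25107/mounts",
                  headers={"Authorization": "Bearer xyzzy"})
    for mount in json.loads(urlopen(req).read()):
        # Each mount now carries StorageClasses instead of Tier.
        print(mount["UUID"], mount["StorageClasses"])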
diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go
index acc1b11df3..4081e1e63c 100644
--- a/services/keepstore/s3_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -19,6 +19,7 @@ import (
     "git.curoverse.com/arvados.git/sdk/go/arvados"
     "github.com/AdRoll/goamz/s3"
     "github.com/AdRoll/goamz/s3/s3test"
+    "github.com/ghodss/yaml"
     check "gopkg.in/check.v1"
 )
 
@@ -435,6 +436,18 @@ func (s *StubbedS3Suite) newTestableVolume(c *check.C, raceWindow time.Duration,
     return v
 }
 
+func (s *StubbedS3Suite) TestConfig(c *check.C) {
+    var cfg Config
+    err := yaml.Unmarshal([]byte(`
+Volumes:
+  - Type: S3
+    StorageClasses: ["class_a", "class_b"]
+`), &cfg)
+
+    c.Check(err, check.IsNil)
+    c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
+}
+
 func (v *TestableS3Volume) Start() error {
     tmp, err := ioutil.TempFile("", "keepstore")
     v.c.Assert(err, check.IsNil)
diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go
index 69802abdd1..1f8fba5d06 100644
--- a/services/keepstore/volume.go
+++ b/services/keepstore/volume.go
@@ -240,6 +240,9 @@ type Volume interface {
     // Return a globally unique ID of the underlying storage
     // device if possible, otherwise "".
     DeviceID() string
+
+    // Get the storage classes associated with this volume
+    GetStorageClasses() []string
 }
 
 // A VolumeWithExamples provides example configs to display in the
@@ -284,12 +287,12 @@ type VolumeManager interface {
 
 // A VolumeMount is an attachment of a Volume to a VolumeManager.
 type VolumeMount struct {
-    UUID        string
-    DeviceID    string
-    ReadOnly    bool
-    Replication int
-    Tier        int
-    volume      Volume
+    UUID           string
+    DeviceID       string
+    ReadOnly       bool
+    Replication    int
+    StorageClasses []string
+    volume         Volume
 }
 
 // Generate a UUID the way API server would for a "KeepVolumeMount"
@@ -326,13 +329,17 @@ func MakeRRVolumeManager(volumes []Volume) *RRVolumeManager {
     }
     vm.mountMap = make(map[string]*VolumeMount)
     for _, v := range volumes {
+        sc := v.GetStorageClasses()
+        if len(sc) == 0 {
+            sc = []string{"default"}
+        }
         mnt := &VolumeMount{
-            UUID:        (*VolumeMount)(nil).generateUUID(),
-            DeviceID:    v.DeviceID(),
-            ReadOnly:    !v.Writable(),
-            Replication: v.Replication(),
-            Tier:        1,
-            volume:      v,
+            UUID:           (*VolumeMount)(nil).generateUUID(),
+            DeviceID:       v.DeviceID(),
+            ReadOnly:       !v.Writable(),
+            Replication:    v.Replication(),
+            StorageClasses: sc,
+            volume:         v,
         }
         vm.iostats[v] = &ioStats{}
         vm.mounts = append(vm.mounts, mnt)
diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go
index baed6a71b6..43ddd090cc 100644
--- a/services/keepstore/volume_test.go
+++ b/services/keepstore/volume_test.go
@@ -241,3 +241,7 @@ func (v *MockVolume) Replication() int {
 
 func (v *MockVolume) EmptyTrash() {
 }
+
+func (v *MockVolume) GetStorageClasses() []string {
+    return nil
+}
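Because GetStorageClasses() is added to the Volume interface itself, every implementation must provide it, which is why even the test-only MockVolume gains a stub returning nil; MakeRRVolumeManager then normalizes any empty result to ["default"]. The same fallback expressed in a few lines of Python (illustrative only; the helper name is hypothetical):

    def mount_storage_classes(configured):
        """Mirror MakeRRVolumeManager's fallback: a volume with no
        StorageClasses configured is advertised as ["default"]."""
        return list(configured) if configured else ["default"]

    assert mount_storage_classes(None) == ["default"]
    assert mount_storage_classes([]) == ["default"]
    assert mount_storage_classes(["class_a", "class_b"]) == ["class_a", "class_b"]

Doing the normalization in the manager, rather than in each driver, keeps the drivers' GetStorageClasses methods as plain field accessors.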
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index ea9aa489c5..b4f18ad13e 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -110,6 +110,7 @@ type UnixVolume struct {
     ReadOnly             bool
     Serialize            bool
     DirectoryReplication int
+    StorageClasses       []string
 
     // something to lock during IO, typically a sync.Mutex (or nil
     // to skip locking)
@@ -644,6 +645,11 @@ func (v *UnixVolume) Replication() int {
     return v.DirectoryReplication
 }
 
+// GetStorageClasses implements Volume
+func (v *UnixVolume) GetStorageClasses() []string {
+    return v.StorageClasses
+}
+
 // InternalStats returns I/O and filesystem ops counters.
 func (v *UnixVolume) InternalStats() interface{} {
     return &v.os.stats
diff --git a/services/keepstore/volume_unix_test.go b/services/keepstore/volume_unix_test.go
index ea3d91d98c..7f1cd21964 100644
--- a/services/keepstore/volume_unix_test.go
+++ b/services/keepstore/volume_unix_test.go
@@ -19,6 +19,7 @@ import (
     "testing"
     "time"
 
+    "github.com/ghodss/yaml"
     check "gopkg.in/check.v1"
 )
 
@@ -427,3 +428,15 @@ func (s *UnixVolumeSuite) TestStats(c *check.C) {
     c.Check(err, check.IsNil)
     c.Check(stats(), check.Matches, `.*"FlockOps":2,.*`)
 }
+
+func (s *UnixVolumeSuite) TestConfig(c *check.C) {
+    var cfg Config
+    err := yaml.Unmarshal([]byte(`
+Volumes:
+  - Type: Directory
+    StorageClasses: ["class_a", "class_b"]
+`), &cfg)
+
+    c.Check(err, check.IsNil)
+    c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
+}
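Taken together, the Azure, S3, and Directory changes let any volume stanza in keepstore's configuration carry a StorageClasses list. A sketch of what such a config might look like, modeled on the YAML snippets embedded in the tests above; the Root and Bucket values are hypothetical, and each driver's other keys are omitted for brevity:

    Volumes:
      - Type: Directory
        Root: /mnt/keep0               # hypothetical path
        StorageClasses: ["class_a", "class_b"]
      - Type: S3
        Bucket: example-keep-bucket    # hypothetical bucket name
        StorageClasses: ["class_b"]

Any volume that omits StorageClasses is reported under the "default" class, so existing configs keep working unchanged.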