Merge branch 'master' of git.curoverse.com:arvados into 13076-r-autogen-api
author Fuad Muhic <fmuhic@capeannenterprises.com>
Thu, 22 Feb 2018 15:27:08 +0000 (16:27 +0100)
committer Fuad Muhic <fmuhic@capeannenterprises.com>
Thu, 22 Feb 2018 15:27:08 +0000 (16:27 +0100)
Arvados-DCO-1.1-Signed-off-by: Fuad Muhic <fmuhic@capeannenterprises.com>

13 files changed:
sdk/go/arvados/collection_fs_test.go
sdk/python/arvados/keep.py
sdk/python/tests/test_keep_client.py
services/keepstore/azure_blob_volume.go
services/keepstore/azure_blob_volume_test.go
services/keepstore/mounts_test.go
services/keepstore/s3_volume.go
services/keepstore/s3_volume_test.go
services/keepstore/volume.go
services/keepstore/volume_test.go
services/keepstore/volume_unix.go
services/keepstore/volume_unix_test.go
tools/sync-groups/sync-groups.go

index bd5d08bcf5e8f278606be6bac2037ce7b9215ecb..5b9d0e2effc153ca5322a8ffea3955301def2000 100644 (file)
@@ -433,7 +433,7 @@ func (s *CollectionFSSuite) TestMkdir(c *check.C) {
        err = s.fs.Remove("foo/bar")
        c.Check(err, check.IsNil)
 
-       // mkdir succeds after the file is deleted
+       // mkdir succeeds after the file is deleted
        err = s.fs.Mkdir("foo/bar", 0755)
        c.Check(err, check.IsNil)
 
index 351f7f5dda8a96ebb805fd4d4896380cb3addbb8..e8e95afc7013650c67e753a3f2de4e7ec227fc44 100644 (file)
@@ -541,7 +541,7 @@ class KeepClient(object):
             self._lastheadername = name
             self._headers[name] = value
             # Returning None implies all bytes were written
-    
+
 
     class KeepWriterQueue(queue.Queue):
         def __init__(self, copies):
@@ -552,19 +552,19 @@ class KeepClient(object):
             self.successful_copies_lock = threading.Lock()
             self.pending_tries = copies
             self.pending_tries_notification = threading.Condition()
-        
+
         def write_success(self, response, replicas_nr):
             with self.successful_copies_lock:
                 self.successful_copies += replicas_nr
                 self.response = response
             with self.pending_tries_notification:
                 self.pending_tries_notification.notify_all()
-        
+
         def write_fail(self, ks):
             with self.pending_tries_notification:
                 self.pending_tries += 1
                 self.pending_tries_notification.notify()
-        
+
         def pending_copies(self):
             with self.successful_copies_lock:
                 return self.wanted_copies - self.successful_copies
@@ -613,25 +613,25 @@ class KeepClient(object):
             for _ in range(num_threads):
                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                 self.workers.append(w)
-        
+
         def add_task(self, ks, service_root):
             self.queue.put((ks, service_root))
             self.total_task_nr += 1
-        
+
         def done(self):
             return self.queue.successful_copies
-        
+
         def join(self):
             # Start workers
             for worker in self.workers:
                 worker.start()
             # Wait for finished work
             self.queue.join()
-        
+
         def response(self):
             return self.queue.response
-    
-    
+
+
     class KeepWriterThread(threading.Thread):
         TaskFailed = RuntimeError()
 
@@ -996,84 +996,90 @@ class KeepClient(object):
 
         self.get_counter.add(1)
 
-        locator = KeepLocator(loc_s)
-        if method == "GET":
-            slot, first = self.block_cache.reserve_cache(locator.md5sum)
-            if not first:
-                self.hits_counter.add(1)
-                v = slot.get()
-                return v
-
-        self.misses_counter.add(1)
-
-        headers = {
-            'X-Request-Id': (request_id or
-                             (hasattr(self, 'api_client') and self.api_client.request_id) or
-                             arvados.util.new_request_id()),
-        }
-
-        # If the locator has hints specifying a prefix (indicating a
-        # remote keepproxy) or the UUID of a local gateway service,
-        # read data from the indicated service(s) instead of the usual
-        # list of local disk services.
-        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
-                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
-        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
-                           for hint in locator.hints if (
-                                   hint.startswith('K@') and
-                                   len(hint) == 29 and
-                                   self._gateway_services.get(hint[2:])
-                                   )])
-        # Map root URLs to their KeepService objects.
-        roots_map = {
-            root: self.KeepService(root, self._user_agent_pool,
-                                   upload_counter=self.upload_counter,
-                                   download_counter=self.download_counter,
-                                   headers=headers)
-            for root in hint_roots
-        }
-
-        # See #3147 for a discussion of the loop implementation.  Highlights:
-        # * Refresh the list of Keep services after each failure, in case
-        #   it's being updated.
-        # * Retry until we succeed, we're out of retries, or every available
-        #   service has returned permanent failure.
-        sorted_roots = []
-        roots_map = {}
+        slot = None
         blob = None
-        loop = retry.RetryLoop(num_retries, self._check_loop_result,
-                               backoff_start=2)
-        for tries_left in loop:
-            try:
-                sorted_roots = self.map_new_services(
-                    roots_map, locator,
-                    force_rebuild=(tries_left < num_retries),
-                    need_writable=False,
-                    headers=headers)
-            except Exception as error:
-                loop.save_result(error)
-                continue
+        try:
+            locator = KeepLocator(loc_s)
+            if method == "GET":
+                slot, first = self.block_cache.reserve_cache(locator.md5sum)
+                if not first:
+                    self.hits_counter.add(1)
+                    blob = slot.get()
+                    if blob is None:
+                        raise arvados.errors.KeepReadError(
+                            "failed to read {}".format(loc_s))
+                    return blob
+
+            self.misses_counter.add(1)
+
+            headers = {
+                'X-Request-Id': (request_id or
+                                 (hasattr(self, 'api_client') and self.api_client.request_id) or
+                                 arvados.util.new_request_id()),
+            }
+
+            # If the locator has hints specifying a prefix (indicating a
+            # remote keepproxy) or the UUID of a local gateway service,
+            # read data from the indicated service(s) instead of the usual
+            # list of local disk services.
+            hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
+                          for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
+            hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
+                               for hint in locator.hints if (
+                                       hint.startswith('K@') and
+                                       len(hint) == 29 and
+                                       self._gateway_services.get(hint[2:])
+                                       )])
+            # Map root URLs to their KeepService objects.
+            roots_map = {
+                root: self.KeepService(root, self._user_agent_pool,
+                                       upload_counter=self.upload_counter,
+                                       download_counter=self.download_counter,
+                                       headers=headers)
+                for root in hint_roots
+            }
+
+            # See #3147 for a discussion of the loop implementation.  Highlights:
+            # * Refresh the list of Keep services after each failure, in case
+            #   it's being updated.
+            # * Retry until we succeed, we're out of retries, or every available
+            #   service has returned permanent failure.
+            sorted_roots = []
+            roots_map = {}
+            loop = retry.RetryLoop(num_retries, self._check_loop_result,
+                                   backoff_start=2)
+            for tries_left in loop:
+                try:
+                    sorted_roots = self.map_new_services(
+                        roots_map, locator,
+                        force_rebuild=(tries_left < num_retries),
+                        need_writable=False,
+                        headers=headers)
+                except Exception as error:
+                    loop.save_result(error)
+                    continue
 
-            # Query KeepService objects that haven't returned
-            # permanent failure, in our specified shuffle order.
-            services_to_try = [roots_map[root]
-                               for root in sorted_roots
-                               if roots_map[root].usable()]
-            for keep_service in services_to_try:
-                blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
-                if blob is not None:
-                    break
-            loop.save_result((blob, len(services_to_try)))
-
-        # Always cache the result, then return it if we succeeded.
-        if method == "GET":
-            slot.set(blob)
-            self.block_cache.cap_cache()
-        if loop.success():
-            if method == "HEAD":
-                return True
-            else:
-                return blob
+                # Query KeepService objects that haven't returned
+                # permanent failure, in our specified shuffle order.
+                services_to_try = [roots_map[root]
+                                   for root in sorted_roots
+                                   if roots_map[root].usable()]
+                for keep_service in services_to_try:
+                    blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
+                    if blob is not None:
+                        break
+                loop.save_result((blob, len(services_to_try)))
+
+            # Always cache the result, then return it if we succeeded.
+            if loop.success():
+                if method == "HEAD":
+                    return True
+                else:
+                    return blob
+        finally:
+            if slot is not None:
+                slot.set(blob)
+                self.block_cache.cap_cache()
 
         # Q: Including 403 is necessary for the Keep tests to continue
         # passing, but maybe they should expect KeepReadError instead?
@@ -1144,7 +1150,7 @@ class KeepClient(object):
                 loop.save_result(error)
                 continue
 
-            writer_pool = KeepClient.KeepWriterThreadPool(data=data, 
+            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                         data_hash=data_hash,
                                                         copies=copies - done,
                                                         max_service_replicas=self.max_replicas_per_service,
index e0bb734b21fbf2671c51a4ce22dd5c954432a488..872c93bae25b5480de1cbf91400f716543415700 100644 (file)
@@ -1171,7 +1171,7 @@ class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
 
         def finished(self):
             return False
-    
+
     def setUp(self):
         self.copies = 3
         self.pool = arvados.KeepClient.KeepWriterThreadPool(
@@ -1215,7 +1215,7 @@ class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
             self.pool.add_task(ks, None)
         self.pool.join()
         self.assertEqual(self.pool.done(), self.copies-1)
-    
+
 
 @tutil.skip_sleep
 class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
@@ -1250,3 +1250,27 @@ class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
             with self.assertRaises(arvados.errors.KeepWriteError):
                 self.keep_client.put('foo', num_retries=1, copies=2)
         self.assertEqual(2, req_mock.call_count)
+
+class KeepClientAPIErrorTest(unittest.TestCase):
+    def test_api_fail(self):
+        class ApiMock(object):
+            def __getattr__(self, r):
+                if r == "api_token":
+                    return "abc"
+                else:
+                    raise arvados.errors.KeepReadError()
+        keep_client = arvados.KeepClient(api_client=ApiMock(),
+                                             proxy='', local_store='')
+
+        # The bug this is testing for is that if an API (not
+        # keepstore) exception is thrown as part of a get(), the next
+        # attempt to get that same block will result in a deadlock.
+        # This is why there are two get()s in a row.  Unfortunately,
+        # the failure mode for this test is that the test suite
+        # deadlocks, there isn't a good way to avoid that without
+        # adding a special case that has no use except for this test.
+
+        with self.assertRaises(arvados.errors.KeepReadError):
+            keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
+        with self.assertRaises(arvados.errors.KeepReadError):
+            keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
index c4eb8b23d5fc10078d9891ed2fbc88ef315ba30d..f18d82c06b29b7948a90431be686001b1bd9e572 100644 (file)
@@ -105,6 +105,7 @@ type AzureBlobVolume struct {
        AzureReplication      int
        ReadOnly              bool
        RequestTimeout        arvados.Duration
+       StorageClasses        []string
 
        azClient  storage.Client
        container *azureContainer
@@ -590,6 +591,11 @@ func (v *AzureBlobVolume) Replication() int {
        return v.AzureReplication
 }
 
+// GetStorageClasses implements Volume
+func (v *AzureBlobVolume) GetStorageClasses() []string {
+       return v.StorageClasses
+}
+
 // If possible, translate an Azure SDK error to a recognizable error
 // like os.ErrNotExist.
 func (v *AzureBlobVolume) translateError(err error) error {
index 5c6e2ab44c7cc45e16982a63c808e4eea2b6393e..60a7911768f009ef6209292d6c1e04b6cccbe6e7 100644 (file)
@@ -27,6 +27,7 @@ import (
        "time"
 
        "github.com/Azure/azure-sdk-for-go/storage"
+       "github.com/ghodss/yaml"
        check "gopkg.in/check.v1"
 )
 
@@ -707,6 +708,18 @@ func (s *StubbedAzureBlobSuite) TestStats(c *check.C) {
        c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
 }
 
+func (s *StubbedAzureBlobSuite) TestConfig(c *check.C) {
+       var cfg Config
+       err := yaml.Unmarshal([]byte(`
+Volumes:
+  - Type: Azure
+    StorageClasses: ["class_a", "class_b"]
+`), &cfg)
+
+       c.Check(err, check.IsNil)
+       c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
+}
+
 func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
        v.azHandler.PutRaw(v.ContainerName, locator, data)
 }
index 883aa712e3db0bdcb8fee2eb0a583affd3150e53..66a212456d51c78e09cc12dc41bdccb17bf953df 100644 (file)
@@ -46,11 +46,11 @@ func (s *MountsSuite) TestMounts(c *check.C) {
        resp := s.call("GET", "/mounts", "", nil)
        c.Check(resp.Code, check.Equals, http.StatusOK)
        var mntList []struct {
-               UUID        string
-               DeviceID    string
-               ReadOnly    bool
-               Replication int
-               Tier        int
+               UUID           string
+               DeviceID       string
+               ReadOnly       bool
+               Replication    int
+               StorageClasses []string
        }
        err := json.Unmarshal(resp.Body.Bytes(), &mntList)
        c.Assert(err, check.IsNil)
@@ -61,7 +61,7 @@ func (s *MountsSuite) TestMounts(c *check.C) {
                c.Check(m.DeviceID, check.Equals, "mock-device-id")
                c.Check(m.ReadOnly, check.Equals, false)
                c.Check(m.Replication, check.Equals, 1)
-               c.Check(m.Tier, check.Equals, 1)
+               c.Check(m.StorageClasses, check.DeepEquals, []string{"default"})
        }
        c.Check(mntList[0].UUID, check.Not(check.Equals), mntList[1].UUID)
 
index 90e8a1b4f6913da10b6789a4111a7ac8aa479721..a60b2fc27e321f553c9784691702282ecb39a6e4 100644 (file)
@@ -152,6 +152,7 @@ type S3Volume struct {
        RaceWindow         arvados.Duration
        ReadOnly           bool
        UnsafeDelete       bool
+       StorageClasses     []string
 
        bucket *s3bucket
 
@@ -686,6 +687,11 @@ func (v *S3Volume) Replication() int {
        return v.S3Replication
 }
 
+// GetStorageClasses implements Volume
+func (v *S3Volume) GetStorageClasses() []string {
+       return v.StorageClasses
+}
+
 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
 
 func (v *S3Volume) isKeepBlock(s string) bool {
index acc1b11df32526c132d763d970915f9f30735437..4081e1e63c4825a08712a93bd552de7818f018d5 100644 (file)
@@ -19,6 +19,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/AdRoll/goamz/s3"
        "github.com/AdRoll/goamz/s3/s3test"
+       "github.com/ghodss/yaml"
        check "gopkg.in/check.v1"
 )
 
@@ -435,6 +436,18 @@ func (s *StubbedS3Suite) newTestableVolume(c *check.C, raceWindow time.Duration,
        return v
 }
 
+func (s *StubbedS3Suite) TestConfig(c *check.C) {
+       var cfg Config
+       err := yaml.Unmarshal([]byte(`
+Volumes:
+  - Type: S3
+    StorageClasses: ["class_a", "class_b"]
+`), &cfg)
+
+       c.Check(err, check.IsNil)
+       c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
+}
+
 func (v *TestableS3Volume) Start() error {
        tmp, err := ioutil.TempFile("", "keepstore")
        v.c.Assert(err, check.IsNil)
index 69802abdd1b5c4e22422293331d6cb0eec371896..1f8fba5d067c2a0731cb05eeebf81cc76bc315b7 100644 (file)
@@ -240,6 +240,9 @@ type Volume interface {
        // Return a globally unique ID of the underlying storage
        // device if possible, otherwise "".
        DeviceID() string
+
+       // Get the storage classes associated with this volume
+       GetStorageClasses() []string
 }
 
 // A VolumeWithExamples provides example configs to display in the
@@ -284,12 +287,12 @@ type VolumeManager interface {
 
 // A VolumeMount is an attachment of a Volume to a VolumeManager.
 type VolumeMount struct {
-       UUID        string
-       DeviceID    string
-       ReadOnly    bool
-       Replication int
-       Tier        int
-       volume      Volume
+       UUID           string
+       DeviceID       string
+       ReadOnly       bool
+       Replication    int
+       StorageClasses []string
+       volume         Volume
 }
 
 // Generate a UUID the way API server would for a "KeepVolumeMount"
@@ -326,13 +329,17 @@ func MakeRRVolumeManager(volumes []Volume) *RRVolumeManager {
        }
        vm.mountMap = make(map[string]*VolumeMount)
        for _, v := range volumes {
+               sc := v.GetStorageClasses()
+               if len(sc) == 0 {
+                       sc = []string{"default"}
+               }
                mnt := &VolumeMount{
-                       UUID:        (*VolumeMount)(nil).generateUUID(),
-                       DeviceID:    v.DeviceID(),
-                       ReadOnly:    !v.Writable(),
-                       Replication: v.Replication(),
-                       Tier:        1,
-                       volume:      v,
+                       UUID:           (*VolumeMount)(nil).generateUUID(),
+                       DeviceID:       v.DeviceID(),
+                       ReadOnly:       !v.Writable(),
+                       Replication:    v.Replication(),
+                       StorageClasses: sc,
+                       volume:         v,
                }
                vm.iostats[v] = &ioStats{}
                vm.mounts = append(vm.mounts, mnt)
index baed6a71b60ae556a28b0cc3478e147ad77f90af..43ddd090cc1cfd22419e80aa86f1e838ffebd479 100644 (file)
@@ -241,3 +241,7 @@ func (v *MockVolume) Replication() int {
 
 func (v *MockVolume) EmptyTrash() {
 }
+
+func (v *MockVolume) GetStorageClasses() []string {
+       return nil
+}
index ea9aa489c5af8c357cd8c195851e94456537ce2a..b4f18ad13e6d0c93e0c0b40023b9153d3c7a6d99 100644 (file)
@@ -110,6 +110,7 @@ type UnixVolume struct {
        ReadOnly             bool
        Serialize            bool
        DirectoryReplication int
+       StorageClasses       []string
 
        // something to lock during IO, typically a sync.Mutex (or nil
        // to skip locking)
@@ -644,6 +645,11 @@ func (v *UnixVolume) Replication() int {
        return v.DirectoryReplication
 }
 
+// GetStorageClasses implements Volume
+func (v *UnixVolume) GetStorageClasses() []string {
+       return v.StorageClasses
+}
+
 // InternalStats returns I/O and filesystem ops counters.
 func (v *UnixVolume) InternalStats() interface{} {
        return &v.os.stats
index ea3d91d98c2e42deed99417c87254af42b8148e8..7f1cd219644ab241f2c0a8a0e2353c8f4c16844f 100644 (file)
@@ -19,6 +19,7 @@ import (
        "testing"
        "time"
 
+       "github.com/ghodss/yaml"
        check "gopkg.in/check.v1"
 )
 
@@ -427,3 +428,15 @@ func (s *UnixVolumeSuite) TestStats(c *check.C) {
        c.Check(err, check.IsNil)
        c.Check(stats(), check.Matches, `.*"FlockOps":2,.*`)
 }
+
+func (s *UnixVolumeSuite) TestConfig(c *check.C) {
+       var cfg Config
+       err := yaml.Unmarshal([]byte(`
+Volumes:
+  - Type: Directory
+    StorageClasses: ["class_a", "class_b"]
+`), &cfg)
+
+       c.Check(err, check.IsNil)
+       c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
+}
index 10569b2e139b89a2904820ab62dab6cbc3a747b5..af7b2e92ebeb0cb60697ac03dacc25e33553782b 100644 (file)
@@ -217,7 +217,7 @@ func SetParentGroup(cfg *ConfigParams) error {
                        return fmt.Errorf("error searching for parent group: %s", err)
                }
                if len(gl.Items) == 0 {
-                       // Default parent group not existant, create one.
+                       // Default parent group does not exist, create it.
                        if cfg.Verbose {
                                log.Println("Default parent group not found, creating...")
                        }