From: Fuad Muhic Date: Thu, 22 Feb 2018 15:27:08 +0000 (+0100) Subject: Merge branch 'master' of git.curoverse.com:arvados into 13076-r-autogen-api X-Git-Tag: 1.2.0~202^2~24 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/4df7d22706adfd8654b87d22c3400a9dc33035a9?hp=6c786c23b5b44ac27570be617d79d5cf5f9e2c7f Merge branch 'master' of git.curoverse.com:arvados into 13076-r-autogen-api Arvados-DCO-1.1-Signed-off-by: Fuad Muhic --- diff --git a/sdk/go/arvados/collection_fs_test.go b/sdk/go/arvados/collection_fs_test.go index bd5d08bcf5..5b9d0e2eff 100644 --- a/sdk/go/arvados/collection_fs_test.go +++ b/sdk/go/arvados/collection_fs_test.go @@ -433,7 +433,7 @@ func (s *CollectionFSSuite) TestMkdir(c *check.C) { err = s.fs.Remove("foo/bar") c.Check(err, check.IsNil) - // mkdir succeds after the file is deleted + // mkdir succeeds after the file is deleted err = s.fs.Mkdir("foo/bar", 0755) c.Check(err, check.IsNil) diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py index 351f7f5dda..e8e95afc70 100644 --- a/sdk/python/arvados/keep.py +++ b/sdk/python/arvados/keep.py @@ -541,7 +541,7 @@ class KeepClient(object): self._lastheadername = name self._headers[name] = value # Returning None implies all bytes were written - + class KeepWriterQueue(queue.Queue): def __init__(self, copies): @@ -552,19 +552,19 @@ class KeepClient(object): self.successful_copies_lock = threading.Lock() self.pending_tries = copies self.pending_tries_notification = threading.Condition() - + def write_success(self, response, replicas_nr): with self.successful_copies_lock: self.successful_copies += replicas_nr self.response = response with self.pending_tries_notification: self.pending_tries_notification.notify_all() - + def write_fail(self, ks): with self.pending_tries_notification: self.pending_tries += 1 self.pending_tries_notification.notify() - + def pending_copies(self): with self.successful_copies_lock: return self.wanted_copies - self.successful_copies @@ -613,25 +613,25 @@ class KeepClient(object): for _ in range(num_threads): w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout) self.workers.append(w) - + def add_task(self, ks, service_root): self.queue.put((ks, service_root)) self.total_task_nr += 1 - + def done(self): return self.queue.successful_copies - + def join(self): # Start workers for worker in self.workers: worker.start() # Wait for finished work self.queue.join() - + def response(self): return self.queue.response - - + + class KeepWriterThread(threading.Thread): TaskFailed = RuntimeError() @@ -996,84 +996,90 @@ class KeepClient(object): self.get_counter.add(1) - locator = KeepLocator(loc_s) - if method == "GET": - slot, first = self.block_cache.reserve_cache(locator.md5sum) - if not first: - self.hits_counter.add(1) - v = slot.get() - return v - - self.misses_counter.add(1) - - headers = { - 'X-Request-Id': (request_id or - (hasattr(self, 'api_client') and self.api_client.request_id) or - arvados.util.new_request_id()), - } - - # If the locator has hints specifying a prefix (indicating a - # remote keepproxy) or the UUID of a local gateway service, - # read data from the indicated service(s) instead of the usual - # list of local disk services. - hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:]) - for hint in locator.hints if hint.startswith('K@') and len(hint) == 7] - hint_roots.extend([self._gateway_services[hint[2:]]['_service_root'] - for hint in locator.hints if ( - hint.startswith('K@') and - len(hint) == 29 and - self._gateway_services.get(hint[2:]) - )]) - # Map root URLs to their KeepService objects. - roots_map = { - root: self.KeepService(root, self._user_agent_pool, - upload_counter=self.upload_counter, - download_counter=self.download_counter, - headers=headers) - for root in hint_roots - } - - # See #3147 for a discussion of the loop implementation. Highlights: - # * Refresh the list of Keep services after each failure, in case - # it's being updated. - # * Retry until we succeed, we're out of retries, or every available - # service has returned permanent failure. - sorted_roots = [] - roots_map = {} + slot = None blob = None - loop = retry.RetryLoop(num_retries, self._check_loop_result, - backoff_start=2) - for tries_left in loop: - try: - sorted_roots = self.map_new_services( - roots_map, locator, - force_rebuild=(tries_left < num_retries), - need_writable=False, - headers=headers) - except Exception as error: - loop.save_result(error) - continue + try: + locator = KeepLocator(loc_s) + if method == "GET": + slot, first = self.block_cache.reserve_cache(locator.md5sum) + if not first: + self.hits_counter.add(1) + blob = slot.get() + if blob is None: + raise arvados.errors.KeepReadError( + "failed to read {}".format(loc_s)) + return blob + + self.misses_counter.add(1) + + headers = { + 'X-Request-Id': (request_id or + (hasattr(self, 'api_client') and self.api_client.request_id) or + arvados.util.new_request_id()), + } + + # If the locator has hints specifying a prefix (indicating a + # remote keepproxy) or the UUID of a local gateway service, + # read data from the indicated service(s) instead of the usual + # list of local disk services. + hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:]) + for hint in locator.hints if hint.startswith('K@') and len(hint) == 7] + hint_roots.extend([self._gateway_services[hint[2:]]['_service_root'] + for hint in locator.hints if ( + hint.startswith('K@') and + len(hint) == 29 and + self._gateway_services.get(hint[2:]) + )]) + # Map root URLs to their KeepService objects. + roots_map = { + root: self.KeepService(root, self._user_agent_pool, + upload_counter=self.upload_counter, + download_counter=self.download_counter, + headers=headers) + for root in hint_roots + } + + # See #3147 for a discussion of the loop implementation. Highlights: + # * Refresh the list of Keep services after each failure, in case + # it's being updated. + # * Retry until we succeed, we're out of retries, or every available + # service has returned permanent failure. + sorted_roots = [] + roots_map = {} + loop = retry.RetryLoop(num_retries, self._check_loop_result, + backoff_start=2) + for tries_left in loop: + try: + sorted_roots = self.map_new_services( + roots_map, locator, + force_rebuild=(tries_left < num_retries), + need_writable=False, + headers=headers) + except Exception as error: + loop.save_result(error) + continue - # Query KeepService objects that haven't returned - # permanent failure, in our specified shuffle order. - services_to_try = [roots_map[root] - for root in sorted_roots - if roots_map[root].usable()] - for keep_service in services_to_try: - blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left)) - if blob is not None: - break - loop.save_result((blob, len(services_to_try))) - - # Always cache the result, then return it if we succeeded. - if method == "GET": - slot.set(blob) - self.block_cache.cap_cache() - if loop.success(): - if method == "HEAD": - return True - else: - return blob + # Query KeepService objects that haven't returned + # permanent failure, in our specified shuffle order. + services_to_try = [roots_map[root] + for root in sorted_roots + if roots_map[root].usable()] + for keep_service in services_to_try: + blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left)) + if blob is not None: + break + loop.save_result((blob, len(services_to_try))) + + # Always cache the result, then return it if we succeeded. + if loop.success(): + if method == "HEAD": + return True + else: + return blob + finally: + if slot is not None: + slot.set(blob) + self.block_cache.cap_cache() # Q: Including 403 is necessary for the Keep tests to continue # passing, but maybe they should expect KeepReadError instead? @@ -1144,7 +1150,7 @@ class KeepClient(object): loop.save_result(error) continue - writer_pool = KeepClient.KeepWriterThreadPool(data=data, + writer_pool = KeepClient.KeepWriterThreadPool(data=data, data_hash=data_hash, copies=copies - done, max_service_replicas=self.max_replicas_per_service, diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py index e0bb734b21..872c93bae2 100644 --- a/sdk/python/tests/test_keep_client.py +++ b/sdk/python/tests/test_keep_client.py @@ -1171,7 +1171,7 @@ class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock): def finished(self): return False - + def setUp(self): self.copies = 3 self.pool = arvados.KeepClient.KeepWriterThreadPool( @@ -1215,7 +1215,7 @@ class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock): self.pool.add_task(ks, None) self.pool.join() self.assertEqual(self.pool.done(), self.copies-1) - + @tutil.skip_sleep class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock): @@ -1250,3 +1250,27 @@ class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock): with self.assertRaises(arvados.errors.KeepWriteError): self.keep_client.put('foo', num_retries=1, copies=2) self.assertEqual(2, req_mock.call_count) + +class KeepClientAPIErrorTest(unittest.TestCase): + def test_api_fail(self): + class ApiMock(object): + def __getattr__(self, r): + if r == "api_token": + return "abc" + else: + raise arvados.errors.KeepReadError() + keep_client = arvados.KeepClient(api_client=ApiMock(), + proxy='', local_store='') + + # The bug this is testing for is that if an API (not + # keepstore) exception is thrown as part of a get(), the next + # attempt to get that same block will result in a deadlock. + # This is why there are two get()s in a row. Unfortunately, + # the failure mode for this test is that the test suite + # deadlocks, there isn't a good way to avoid that without + # adding a special case that has no use except for this test. + + with self.assertRaises(arvados.errors.KeepReadError): + keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3") + with self.assertRaises(arvados.errors.KeepReadError): + keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3") diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go index c4eb8b23d5..f18d82c06b 100644 --- a/services/keepstore/azure_blob_volume.go +++ b/services/keepstore/azure_blob_volume.go @@ -105,6 +105,7 @@ type AzureBlobVolume struct { AzureReplication int ReadOnly bool RequestTimeout arvados.Duration + StorageClasses []string azClient storage.Client container *azureContainer @@ -590,6 +591,11 @@ func (v *AzureBlobVolume) Replication() int { return v.AzureReplication } +// GetStorageClasses implements Volume +func (v *AzureBlobVolume) GetStorageClasses() []string { + return v.StorageClasses +} + // If possible, translate an Azure SDK error to a recognizable error // like os.ErrNotExist. func (v *AzureBlobVolume) translateError(err error) error { diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go index 5c6e2ab44c..60a7911768 100644 --- a/services/keepstore/azure_blob_volume_test.go +++ b/services/keepstore/azure_blob_volume_test.go @@ -27,6 +27,7 @@ import ( "time" "github.com/Azure/azure-sdk-for-go/storage" + "github.com/ghodss/yaml" check "gopkg.in/check.v1" ) @@ -707,6 +708,18 @@ func (s *StubbedAzureBlobSuite) TestStats(c *check.C) { c.Check(stats(), check.Matches, `.*"InBytes":6,.*`) } +func (s *StubbedAzureBlobSuite) TestConfig(c *check.C) { + var cfg Config + err := yaml.Unmarshal([]byte(` +Volumes: + - Type: Azure + StorageClasses: ["class_a", "class_b"] +`), &cfg) + + c.Check(err, check.IsNil) + c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"}) +} + func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) { v.azHandler.PutRaw(v.ContainerName, locator, data) } diff --git a/services/keepstore/mounts_test.go b/services/keepstore/mounts_test.go index 883aa712e3..66a212456d 100644 --- a/services/keepstore/mounts_test.go +++ b/services/keepstore/mounts_test.go @@ -46,11 +46,11 @@ func (s *MountsSuite) TestMounts(c *check.C) { resp := s.call("GET", "/mounts", "", nil) c.Check(resp.Code, check.Equals, http.StatusOK) var mntList []struct { - UUID string - DeviceID string - ReadOnly bool - Replication int - Tier int + UUID string + DeviceID string + ReadOnly bool + Replication int + StorageClasses []string } err := json.Unmarshal(resp.Body.Bytes(), &mntList) c.Assert(err, check.IsNil) @@ -61,7 +61,7 @@ func (s *MountsSuite) TestMounts(c *check.C) { c.Check(m.DeviceID, check.Equals, "mock-device-id") c.Check(m.ReadOnly, check.Equals, false) c.Check(m.Replication, check.Equals, 1) - c.Check(m.Tier, check.Equals, 1) + c.Check(m.StorageClasses, check.DeepEquals, []string{"default"}) } c.Check(mntList[0].UUID, check.Not(check.Equals), mntList[1].UUID) diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go index 90e8a1b4f6..a60b2fc27e 100644 --- a/services/keepstore/s3_volume.go +++ b/services/keepstore/s3_volume.go @@ -152,6 +152,7 @@ type S3Volume struct { RaceWindow arvados.Duration ReadOnly bool UnsafeDelete bool + StorageClasses []string bucket *s3bucket @@ -686,6 +687,11 @@ func (v *S3Volume) Replication() int { return v.S3Replication } +// GetStorageClasses implements Volume +func (v *S3Volume) GetStorageClasses() []string { + return v.StorageClasses +} + var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`) func (v *S3Volume) isKeepBlock(s string) bool { diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go index acc1b11df3..4081e1e63c 100644 --- a/services/keepstore/s3_volume_test.go +++ b/services/keepstore/s3_volume_test.go @@ -19,6 +19,7 @@ import ( "git.curoverse.com/arvados.git/sdk/go/arvados" "github.com/AdRoll/goamz/s3" "github.com/AdRoll/goamz/s3/s3test" + "github.com/ghodss/yaml" check "gopkg.in/check.v1" ) @@ -435,6 +436,18 @@ func (s *StubbedS3Suite) newTestableVolume(c *check.C, raceWindow time.Duration, return v } +func (s *StubbedS3Suite) TestConfig(c *check.C) { + var cfg Config + err := yaml.Unmarshal([]byte(` +Volumes: + - Type: S3 + StorageClasses: ["class_a", "class_b"] +`), &cfg) + + c.Check(err, check.IsNil) + c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"}) +} + func (v *TestableS3Volume) Start() error { tmp, err := ioutil.TempFile("", "keepstore") v.c.Assert(err, check.IsNil) diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go index 69802abdd1..1f8fba5d06 100644 --- a/services/keepstore/volume.go +++ b/services/keepstore/volume.go @@ -240,6 +240,9 @@ type Volume interface { // Return a globally unique ID of the underlying storage // device if possible, otherwise "". DeviceID() string + + // Get the storage classes associated with this volume + GetStorageClasses() []string } // A VolumeWithExamples provides example configs to display in the @@ -284,12 +287,12 @@ type VolumeManager interface { // A VolumeMount is an attachment of a Volume to a VolumeManager. type VolumeMount struct { - UUID string - DeviceID string - ReadOnly bool - Replication int - Tier int - volume Volume + UUID string + DeviceID string + ReadOnly bool + Replication int + StorageClasses []string + volume Volume } // Generate a UUID the way API server would for a "KeepVolumeMount" @@ -326,13 +329,17 @@ func MakeRRVolumeManager(volumes []Volume) *RRVolumeManager { } vm.mountMap = make(map[string]*VolumeMount) for _, v := range volumes { + sc := v.GetStorageClasses() + if len(sc) == 0 { + sc = []string{"default"} + } mnt := &VolumeMount{ - UUID: (*VolumeMount)(nil).generateUUID(), - DeviceID: v.DeviceID(), - ReadOnly: !v.Writable(), - Replication: v.Replication(), - Tier: 1, - volume: v, + UUID: (*VolumeMount)(nil).generateUUID(), + DeviceID: v.DeviceID(), + ReadOnly: !v.Writable(), + Replication: v.Replication(), + StorageClasses: sc, + volume: v, } vm.iostats[v] = &ioStats{} vm.mounts = append(vm.mounts, mnt) diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go index baed6a71b6..43ddd090cc 100644 --- a/services/keepstore/volume_test.go +++ b/services/keepstore/volume_test.go @@ -241,3 +241,7 @@ func (v *MockVolume) Replication() int { func (v *MockVolume) EmptyTrash() { } + +func (v *MockVolume) GetStorageClasses() []string { + return nil +} diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go index ea9aa489c5..b4f18ad13e 100644 --- a/services/keepstore/volume_unix.go +++ b/services/keepstore/volume_unix.go @@ -110,6 +110,7 @@ type UnixVolume struct { ReadOnly bool Serialize bool DirectoryReplication int + StorageClasses []string // something to lock during IO, typically a sync.Mutex (or nil // to skip locking) @@ -644,6 +645,11 @@ func (v *UnixVolume) Replication() int { return v.DirectoryReplication } +// GetStorageClasses implements Volume +func (v *UnixVolume) GetStorageClasses() []string { + return v.StorageClasses +} + // InternalStats returns I/O and filesystem ops counters. func (v *UnixVolume) InternalStats() interface{} { return &v.os.stats diff --git a/services/keepstore/volume_unix_test.go b/services/keepstore/volume_unix_test.go index ea3d91d98c..7f1cd21964 100644 --- a/services/keepstore/volume_unix_test.go +++ b/services/keepstore/volume_unix_test.go @@ -19,6 +19,7 @@ import ( "testing" "time" + "github.com/ghodss/yaml" check "gopkg.in/check.v1" ) @@ -427,3 +428,15 @@ func (s *UnixVolumeSuite) TestStats(c *check.C) { c.Check(err, check.IsNil) c.Check(stats(), check.Matches, `.*"FlockOps":2,.*`) } + +func (s *UnixVolumeSuite) TestConfig(c *check.C) { + var cfg Config + err := yaml.Unmarshal([]byte(` +Volumes: + - Type: Directory + StorageClasses: ["class_a", "class_b"] +`), &cfg) + + c.Check(err, check.IsNil) + c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"}) +} diff --git a/tools/sync-groups/sync-groups.go b/tools/sync-groups/sync-groups.go index 10569b2e13..af7b2e92eb 100644 --- a/tools/sync-groups/sync-groups.go +++ b/tools/sync-groups/sync-groups.go @@ -217,7 +217,7 @@ func SetParentGroup(cfg *ConfigParams) error { return fmt.Errorf("error searching for parent group: %s", err) } if len(gl.Items) == 0 { - // Default parent group not existant, create one. + // Default parent group does not exist, create it. if cfg.Verbose { log.Println("Default parent group not found, creating...") }