Merge branch '11645-keepstore-storageclasses' closes #11645
author     Peter Amstutz <pamstutz@veritasgenetics.com>
           Wed, 21 Feb 2018 20:46:32 +0000 (15:46 -0500)
committer  Peter Amstutz <pamstutz@veritasgenetics.com>
           Wed, 21 Feb 2018 20:46:32 +0000 (15:46 -0500)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

sdk/go/arvados/collection_fs_test.go
sdk/python/arvados/keep.py
sdk/python/tests/test_keep_client.py
tools/sync-groups/sync-groups.go

index bd5d08bcf5e8f278606be6bac2037ce7b9215ecb..5b9d0e2effc153ca5322a8ffea3955301def2000 100644 (file)
@@ -433,7 +433,7 @@ func (s *CollectionFSSuite) TestMkdir(c *check.C) {
        err = s.fs.Remove("foo/bar")
        c.Check(err, check.IsNil)
 
-       // mkdir succeds after the file is deleted
+       // mkdir succeeds after the file is deleted
        err = s.fs.Mkdir("foo/bar", 0755)
        c.Check(err, check.IsNil)
 
index 351f7f5dda8a96ebb805fd4d4896380cb3addbb8..e8e95afc7013650c67e753a3f2de4e7ec227fc44 100644 (file)
@@ -541,7 +541,7 @@ class KeepClient(object):
             self._lastheadername = name
             self._headers[name] = value
             # Returning None implies all bytes were written
-    
+
 
     class KeepWriterQueue(queue.Queue):
         def __init__(self, copies):
@@ -552,19 +552,19 @@ class KeepClient(object):
             self.successful_copies_lock = threading.Lock()
             self.pending_tries = copies
             self.pending_tries_notification = threading.Condition()
-        
+
         def write_success(self, response, replicas_nr):
             with self.successful_copies_lock:
                 self.successful_copies += replicas_nr
                 self.response = response
             with self.pending_tries_notification:
                 self.pending_tries_notification.notify_all()
-        
+
         def write_fail(self, ks):
             with self.pending_tries_notification:
                 self.pending_tries += 1
                 self.pending_tries_notification.notify()
-        
+
         def pending_copies(self):
             with self.successful_copies_lock:
                 return self.wanted_copies - self.successful_copies
@@ -613,25 +613,25 @@ class KeepClient(object):
             for _ in range(num_threads):
                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                 self.workers.append(w)
-        
+
         def add_task(self, ks, service_root):
             self.queue.put((ks, service_root))
             self.total_task_nr += 1
-        
+
         def done(self):
             return self.queue.successful_copies
-        
+
         def join(self):
             # Start workers
             for worker in self.workers:
                 worker.start()
             # Wait for finished work
             self.queue.join()
-        
+
         def response(self):
             return self.queue.response
-    
-    
+
+
     class KeepWriterThread(threading.Thread):
         TaskFailed = RuntimeError()
 
@@ -996,84 +996,90 @@ class KeepClient(object):
 
         self.get_counter.add(1)
 
-        locator = KeepLocator(loc_s)
-        if method == "GET":
-            slot, first = self.block_cache.reserve_cache(locator.md5sum)
-            if not first:
-                self.hits_counter.add(1)
-                v = slot.get()
-                return v
-
-        self.misses_counter.add(1)
-
-        headers = {
-            'X-Request-Id': (request_id or
-                             (hasattr(self, 'api_client') and self.api_client.request_id) or
-                             arvados.util.new_request_id()),
-        }
-
-        # If the locator has hints specifying a prefix (indicating a
-        # remote keepproxy) or the UUID of a local gateway service,
-        # read data from the indicated service(s) instead of the usual
-        # list of local disk services.
-        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
-                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
-        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
-                           for hint in locator.hints if (
-                                   hint.startswith('K@') and
-                                   len(hint) == 29 and
-                                   self._gateway_services.get(hint[2:])
-                                   )])
-        # Map root URLs to their KeepService objects.
-        roots_map = {
-            root: self.KeepService(root, self._user_agent_pool,
-                                   upload_counter=self.upload_counter,
-                                   download_counter=self.download_counter,
-                                   headers=headers)
-            for root in hint_roots
-        }
-
-        # See #3147 for a discussion of the loop implementation.  Highlights:
-        # * Refresh the list of Keep services after each failure, in case
-        #   it's being updated.
-        # * Retry until we succeed, we're out of retries, or every available
-        #   service has returned permanent failure.
-        sorted_roots = []
-        roots_map = {}
+        slot = None
         blob = None
-        loop = retry.RetryLoop(num_retries, self._check_loop_result,
-                               backoff_start=2)
-        for tries_left in loop:
-            try:
-                sorted_roots = self.map_new_services(
-                    roots_map, locator,
-                    force_rebuild=(tries_left < num_retries),
-                    need_writable=False,
-                    headers=headers)
-            except Exception as error:
-                loop.save_result(error)
-                continue
+        try:
+            locator = KeepLocator(loc_s)
+            if method == "GET":
+                slot, first = self.block_cache.reserve_cache(locator.md5sum)
+                if not first:
+                    self.hits_counter.add(1)
+                    blob = slot.get()
+                    if blob is None:
+                        raise arvados.errors.KeepReadError(
+                            "failed to read {}".format(loc_s))
+                    return blob
+
+            self.misses_counter.add(1)
+
+            headers = {
+                'X-Request-Id': (request_id or
+                                 (hasattr(self, 'api_client') and self.api_client.request_id) or
+                                 arvados.util.new_request_id()),
+            }
+
+            # If the locator has hints specifying a prefix (indicating a
+            # remote keepproxy) or the UUID of a local gateway service,
+            # read data from the indicated service(s) instead of the usual
+            # list of local disk services.
+            hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
+                          for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
+            hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
+                               for hint in locator.hints if (
+                                       hint.startswith('K@') and
+                                       len(hint) == 29 and
+                                       self._gateway_services.get(hint[2:])
+                                       )])
+            # Map root URLs to their KeepService objects.
+            roots_map = {
+                root: self.KeepService(root, self._user_agent_pool,
+                                       upload_counter=self.upload_counter,
+                                       download_counter=self.download_counter,
+                                       headers=headers)
+                for root in hint_roots
+            }
+
+            # See #3147 for a discussion of the loop implementation.  Highlights:
+            # * Refresh the list of Keep services after each failure, in case
+            #   it's being updated.
+            # * Retry until we succeed, we're out of retries, or every available
+            #   service has returned permanent failure.
+            sorted_roots = []
+            roots_map = {}
+            loop = retry.RetryLoop(num_retries, self._check_loop_result,
+                                   backoff_start=2)
+            for tries_left in loop:
+                try:
+                    sorted_roots = self.map_new_services(
+                        roots_map, locator,
+                        force_rebuild=(tries_left < num_retries),
+                        need_writable=False,
+                        headers=headers)
+                except Exception as error:
+                    loop.save_result(error)
+                    continue
 
-            # Query KeepService objects that haven't returned
-            # permanent failure, in our specified shuffle order.
-            services_to_try = [roots_map[root]
-                               for root in sorted_roots
-                               if roots_map[root].usable()]
-            for keep_service in services_to_try:
-                blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
-                if blob is not None:
-                    break
-            loop.save_result((blob, len(services_to_try)))
-
-        # Always cache the result, then return it if we succeeded.
-        if method == "GET":
-            slot.set(blob)
-            self.block_cache.cap_cache()
-        if loop.success():
-            if method == "HEAD":
-                return True
-            else:
-                return blob
+                # Query KeepService objects that haven't returned
+                # permanent failure, in our specified shuffle order.
+                services_to_try = [roots_map[root]
+                                   for root in sorted_roots
+                                   if roots_map[root].usable()]
+                for keep_service in services_to_try:
+                    blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
+                    if blob is not None:
+                        break
+                loop.save_result((blob, len(services_to_try)))
+
+            # Always cache the result, then return it if we succeeded.
+            if loop.success():
+                if method == "HEAD":
+                    return True
+                else:
+                    return blob
+        finally:
+            if slot is not None:
+                slot.set(blob)
+                self.block_cache.cap_cache()
 
         # Q: Including 403 is necessary for the Keep tests to continue
         # passing, but maybe they should expect KeepReadError instead?
@@ -1144,7 +1150,7 @@ class KeepClient(object):
                 loop.save_result(error)
                 continue
 
-            writer_pool = KeepClient.KeepWriterThreadPool(data=data, 
+            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                         data_hash=data_hash,
                                                         copies=copies - done,
                                                         max_service_replicas=self.max_replicas_per_service,
index e0bb734b21fbf2671c51a4ce22dd5c954432a488..872c93bae25b5480de1cbf91400f716543415700 100644 (file)
@@ -1171,7 +1171,7 @@ class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
 
         def finished(self):
             return False
-    
+
     def setUp(self):
         self.copies = 3
         self.pool = arvados.KeepClient.KeepWriterThreadPool(
@@ -1215,7 +1215,7 @@ class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
             self.pool.add_task(ks, None)
         self.pool.join()
         self.assertEqual(self.pool.done(), self.copies-1)
-    
+
 
 @tutil.skip_sleep
 class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
@@ -1250,3 +1250,27 @@ class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
             with self.assertRaises(arvados.errors.KeepWriteError):
                 self.keep_client.put('foo', num_retries=1, copies=2)
         self.assertEqual(2, req_mock.call_count)
+
+class KeepClientAPIErrorTest(unittest.TestCase):
+    def test_api_fail(self):
+        class ApiMock(object):
+            def __getattr__(self, r):
+                if r == "api_token":
+                    return "abc"
+                else:
+                    raise arvados.errors.KeepReadError()
+        keep_client = arvados.KeepClient(api_client=ApiMock(),
+                                             proxy='', local_store='')
+
+        # The bug this is testing for is that if an API (not
+        # keepstore) exception is thrown as part of a get(), the next
+        # attempt to get that same block will result in a deadlock.
+        # This is why there are two get()s in a row.  Unfortunately,
+        # the failure mode for this test is that the test suite
+        # deadlocks, there isn't a good way to avoid that without
+        # adding a special case that has no use except for this test.
+
+        with self.assertRaises(arvados.errors.KeepReadError):
+            keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
+        with self.assertRaises(arvados.errors.KeepReadError):
+            keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
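The keep.py changes above move the body of get() inside a try/finally so that the reserved cache slot is always filled, even when an exception (such as an API error) escapes before any Keep service is contacted; the new KeepClientAPIErrorTest calls get() twice on the same locator to confirm the second call no longer hangs. Below is a minimal, self-contained sketch of that pattern; the Slot and BlockCache classes and the get_block() helper are simplified stand-ins for illustration, not the actual Arvados APIs:

    import threading

    class Slot:
        """One cache entry; readers block until the fetching thread calls set()."""
        def __init__(self):
            self._ready = threading.Event()
            self._content = None

        def get(self):
            self._ready.wait()           # blocks until set() is called
            return self._content

        def set(self, content):
            self._content = content
            self._ready.set()            # wake every waiting reader

    class BlockCache:
        """Simplified stand-in for a block cache keyed by content hash."""
        def __init__(self):
            self._lock = threading.Lock()
            self._slots = {}

        def reserve_cache(self, md5sum):
            # Returns (slot, first). first=True means the caller is expected
            # to fetch the block and call slot.set(); later callers wait on get().
            with self._lock:
                if md5sum not in self._slots:
                    self._slots[md5sum] = Slot()
                    return self._slots[md5sum], True
                return self._slots[md5sum], False

    def get_block(cache, md5sum, fetch):
        slot, first = cache.reserve_cache(md5sum)
        if not first:
            content = slot.get()
            if content is None:
                # A previous fetch failed; report it instead of returning None.
                raise IOError("failed to read {}".format(md5sum))
            return content
        content = None
        try:
            content = fetch(md5sum)      # may raise, e.g. an API error
            return content
        finally:
            # Always release the slot, even on failure; otherwise the next
            # get_block() for the same hash would block forever in slot.get().
            slot.set(content)

    if __name__ == "__main__":
        cache = BlockCache()

        def failing_fetch(md5sum):
            raise RuntimeError("API error before any keepstore was contacted")

        # Mirrors the two consecutive get() calls in the new test: both raise
        # promptly instead of the second one deadlocking.
        for _ in range(2):
            try:
                get_block(cache, "acbd18db4cc2f85cedef654fccc4a4d8", failing_fetch)
            except (RuntimeError, IOError) as exc:
                print("raised instead of hanging:", exc)

Without the finally, an exception raised by fetch() would leave the slot unset, and the next call for the same hash would block forever in slot.get(); that is the deadlock the restructured get() and the new test guard against.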
index 10569b2e139b89a2904820ab62dab6cbc3a747b5..af7b2e92ebeb0cb60697ac03dacc25e33553782b 100644 (file)
@@ -217,7 +217,7 @@ func SetParentGroup(cfg *ConfigParams) error {
                        return fmt.Errorf("error searching for parent group: %s", err)
                }
                if len(gl.Items) == 0 {
-                       // Default parent group not existant, create one.
+                       // Default parent group does not exist, create it.
                        if cfg.Verbose {
                                log.Println("Default parent group not found, creating...")
                        }