- def get(self, locator):
- if re.search(r',', locator):
- return ''.join(self.get(x) for x in locator.split(','))
- if 'KEEP_LOCAL_STORE' in os.environ:
- return KeepClient.local_store_get(locator)
- expect_hash = re.sub(r'\+.*', '', locator)
- for service_root in self.shuffled_service_roots(expect_hash):
- h = httplib2.Http()
- url = service_root + expect_hash
- api_token = config.get('ARVADOS_API_TOKEN')
- headers = {'Authorization': "OAuth2 %s" % api_token,
- 'Accept': 'application/octet-stream'}
+
+ def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
+ # Register any newly seen Keep services in roots_map and return the
+ # current list of local root strings. roots_map is a dictionary mapping
+ # Keep service root strings to KeepService objects; it is updated in
+ # place. Extra keyword arguments are forwarded to each new KeepService.
+ headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))  # only fills in Authorization if the caller did not supply one
+ local_roots = self.shuffled_service_roots(md5_s, force_rebuild)  # md5_s and force_rebuild are passed straight through
+ for root in local_roots:
+ if root not in roots_map:  # roots already in the map keep their existing KeepService
+ roots_map[root] = self.KeepService(root, **headers)
+ return local_roots
+
+ @staticmethod
+ def _check_loop_result(result):
+ # Result check that get() passes to retry.RetryLoop. KeepClient
+ # RetryLoops should save results as a 2-tuple: the actual result of the
+ # request, and the number of servers available to receive the request
+ # this round. Returns True if there's a real result, False if there
+ # are no more servers available, otherwise None (also for Exceptions).
+ if isinstance(result, Exception):  # an exception was saved instead of a 2-tuple: no verdict
+ return None
+ result, tried_server_count = result
+ if (result is not None) and (result is not False):  # identity tests: an empty string still counts as a real result
+ return True
+ elif tried_server_count < 1:  # no server could be tried this round
+ _logger.info("No more Keep services to try; giving up")
+ return False
+ else:
+ return None
+
+ def get(self, loc_s, num_retries=0):
+ """Get data from Keep.
+
+ This method fetches one or more blocks of data from Keep. It
+ sends a request to each Keep service registered with the API
+ server (or the proxy provided when this client was
+ instantiated), then each service named in location hints, in
+ sequence. As soon as one service provides the data, it's
+ returned.
+
+ Arguments:
+ * loc_s: A string of one or more comma-separated locators to fetch.
+ This method returns the concatenation of these blocks.
+ * num_retries: The number of times to retry GET requests to
+ *each* Keep server if it returns temporary failures, with
+ exponential backoff. Note that, in each loop, the method may try
+ to fetch data from every available Keep service, along with any
+ that are named in location hints in the locator. Default 0.
+ """
+ if ',' in loc_s:
+ return ''.join(self.get(x) for x in loc_s.split(','))
+ locator = KeepLocator(loc_s)
+ expect_hash = locator.md5sum
+
+ slot, first = self.block_cache.reserve_cache(expect_hash)
+ if not first:
+ v = slot.get()
+ return v
+
+ # See #3147 for a discussion of the loop implementation. Highlights:
+ # * Refresh the list of Keep services after each failure, in case
+ # it's being updated.
+ # * Retry until we succeed, we're out of retries, or every available
+ # service has returned permanent failure.
+ hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
+ for hint in locator.hints if hint.startswith('K@')]
+ # Map root URLs to their KeepService objects.
+ roots_map = {root: self.KeepService(root) for root in hint_roots}
+ blob = None
+ loop = retry.RetryLoop(num_retries, self._check_loop_result,
+ backoff_start=2)
+ for tries_left in loop: