- for ks in keep_services)
-
- roots = ("{}://[{}]:{:d}/".format(
- 'https' if ks['service_ssl_flag'] else 'http',
- ks['service_host'],
- ks['service_port'])
- for ks in keep_services)
- self.service_roots = sorted(set(roots))
- _logger.debug(str(self.service_roots))
-
- def shuffled_service_roots(self, hash, force_rebuild=False):
- self.build_service_roots(force_rebuild)
-
- # Build an ordering with which to query the Keep servers based on the
- # contents of the hash.
- # "hash" is a hex-encoded number at least 8 digits
- # (32 bits) long
-
- # seed used to calculate the next keep server from 'pool'
- # to be added to 'pseq'
- seed = hash
-
- # Keep servers still to be added to the ordering
- pool = self.service_roots[:]
-
- # output probe sequence
- pseq = []
-
- # iterate while there are servers left to be assigned
- while len(pool) > 0:
- if len(seed) < 8:
- # ran out of digits in the seed
- if len(pseq) < len(hash) / 4:
- # the number of servers added to the probe sequence is less
- # than the number of 4-digit slices in 'hash' so refill the
- # seed with the last 4 digits and then append the contents
- # of 'hash'.
- seed = hash[-4:] + hash
- else:
- # refill the seed with the contents of 'hash'
- seed += hash
-
- # Take the next 8 digits (32 bytes) and interpret as an integer,
- # then modulus with the size of the remaining pool to get the next
- # selected server.
- probe = int(seed[0:8], 16) % len(pool)
-
- # Append the selected server to the probe sequence and remove it
- # from the pool.
- pseq += [pool[probe]]
- pool = pool[:probe] + pool[probe+1:]
-
- # Remove the digits just used from the seed
- seed = seed[8:]
- _logger.debug(str(pseq))
- return pseq
-
-
- def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
+ for ks in self._keep_services)
+ # For disk type services, max_replicas_per_service is 1
+ # It is unknown or unlimited for non-disk typed services.
+ for ks in accessible:
+ if ('disk' != ks.get('service_type')) and (not ks.get('read_only')):
+ self.max_replicas_per_service = None
+
+ def _service_weight(self, data_hash, service_uuid):
+ """Compute the weight of a Keep service endpoint for a data
+ block with a known hash.
+
+ The weight is the hex MD5 digest of data_hash concatenated with
+ the last 15 characters of the service endpoint's UUID.
+
+ hashlib.md5() only accepts bytes on Python 3, so a text key is
+ encoded before hashing. On Python 2 a str is already bytes and is
+ hashed unchanged, so the resulting weights are the same as before.
+ """
+ key = data_hash + service_uuid[-15:]
+ if not isinstance(key, bytes):
+ # Python 3 str (or Python 2 unicode): hash its encoded form.
+ key = key.encode()
+ return hashlib.md5(key).hexdigest()
+
+ def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
+ """Return an array of Keep service endpoints, in the order in
+ which they should be probed when reading or writing data with
+ the given hash+hints.
+
+ Arguments:
+ * locator: the block locator. Its `hints` are scanned for +K@...
+ remote service hints, and its `md5sum` is used to rank the
+ local services.
+ * force_rebuild: passed through to build_services_list().
+ * need_writable: if true, rank only self._writable_services
+ instead of all of self._keep_services.
+
+ Returns a list of service root URIs: roots derived from K@ hints
+ come first (in hint order), followed by the local services sorted
+ by weight, heaviest first.
+ """
+ self.build_services_list(force_rebuild)
+
+ sorted_roots = []
+ # Use the services indicated by the given +K@... remote
+ # service hints, if any are present and can be resolved to a
+ # URI.
+ for hint in locator.hints:
+ if hint.startswith('K@'):
+ if len(hint) == 7:
+ # 'K@' plus a 5-character cluster id: build the
+ # https://keep.<cluster>.arvadosapi.com/ root for it.
+ sorted_roots.append(
+ "https://keep.{}.arvadosapi.com/".format(hint[2:]))
+ elif len(hint) == 29:
+ # 'K@' plus a 27-character id: look it up in
+ # self._gateway_services. Ids that are not found are skipped.
+ svc = self._gateway_services.get(hint[2:])
+ if svc:
+ sorted_roots.append(svc['_service_root'])
+
+ # Sort the available local services by weight (heaviest first)
+ # for this locator, and return their service_roots (base URIs)
+ # in that order.
+ use_services = self._keep_services
+ if need_writable:
+ use_services = self._writable_services
+ sorted_roots.extend([
+ svc['_service_root'] for svc in sorted(
+ use_services,
+ reverse=True,
+ key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
+ # Pass the arguments to the logger lazily so the message (including
+ # the full root list) is only formatted when debug logging is enabled.
+ _logger.debug("%s: %s", locator, sorted_roots)
+ return sorted_roots
+
+ def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):