7696: PySDK KeepClient uses all service types.
[arvados.git] / sdk / python / arvados / keep.py
index c55b8161560a3c2b4b2ad18226fe68af4bb2cc46..b3d64a41981b7bcbaf293a2f90f43abc79128f1a 100644 (file)
@@ -1,28 +1,15 @@
-import bz2
+import cStringIO
 import datetime
-import fcntl
-import functools
-import gflags
 import hashlib
-import json
 import logging
 import os
-import pprint
 import pycurl
 import Queue
 import re
 import socket
 import ssl
-import string
-import cStringIO
-import subprocess
-import sys
 import threading
-import time
 import timer
-import types
-import UserDict
-import zlib
 
 import arvados
 import arvados.config as config
@@ -257,9 +244,9 @@ class KeepClient(object):
                 # we wait here until N other threads have started.
                 while self._started < self._local.sequence:
                     self._start_lock.wait()
+            self._todo_lock.acquire()
             self._started += 1
             self._start_lock.notifyAll()
-            self._todo_lock.acquire()
             self._start_lock.release()
             return self
 
@@ -665,12 +652,13 @@ class KeepClient(object):
                 self._gateway_services = {}
                 self._keep_services = [{
                     'uuid': 'proxy',
+                    'service_type': 'proxy',
                     '_service_root': proxy,
                     }]
                 self._writable_services = self._keep_services
                 self.using_proxy = True
                 self._static_services_list = True
-                self.max_replicas_per_service = 1
+                self.max_replicas_per_service = None
             else:
                 # It's important to avoid instantiating an API client
                 # unless we actually need one, for testing's sake.
@@ -700,6 +688,10 @@ class KeepClient(object):
         t = self.proxy_timeout if self.using_proxy else self.timeout
         return (t[0] * (1 << attempt_number), t[1])
 
+    def _any_nondisk_services(self, service_list):
+        kinds = (svc.get('service_type', 'disk') for svc in service_list)
+        return any(kind != 'disk' for kind in kinds)
+
     def build_services_list(self, force_rebuild=False):
         if (self._static_services_list or
               (self._keep_services and not force_rebuild)):
@@ -710,12 +702,16 @@ class KeepClient(object):
             except Exception:  # API server predates Keep services.
                 keep_services = self.api_client.keep_disks().list()
 
-            accessible = keep_services.execute().get('items')
-            if not accessible:
+            # Gateway services are only used when specified by UUID,
+            # so there's nothing to gain by filtering them by
+            # service_type.
+            self._gateway_services = {ks['uuid']: ks for ks in
+                                      keep_services.execute()['items']}
+            if not self._gateway_services:
                 raise arvados.errors.NoKeepServersError()
 
             # Precompute the base URI for each service.
-            for r in accessible:
+            for r in self._gateway_services.itervalues():
                 host = r['service_host']
                 if not host.startswith('[') and host.find(':') >= 0:
                     # IPv6 URIs must be formatted like http://[::1]:80/...
@@ -725,27 +721,19 @@ class KeepClient(object):
                     host,
                     r['service_port'])
 
-            # Gateway services are only used when specified by UUID,
-            # so there's nothing to gain by filtering them by
-            # service_type.
-            self._gateway_services = {ks.get('uuid'): ks for ks in accessible}
             _logger.debug(str(self._gateway_services))
-
             self._keep_services = [
-                ks for ks in accessible
-                if ks.get('service_type') in ['disk', 'proxy']]
-            self._writable_services = [
-                ks for ks in accessible
-                if (ks.get('service_type') in ['disk', 'proxy']) and (True != ks.get('read_only'))]
-            _logger.debug(str(self._keep_services))
-
-            self.using_proxy = any(ks.get('service_type') == 'proxy'
-                                   for ks in self._keep_services)
+                ks for ks in self._gateway_services.itervalues()
+                if not ks.get('service_type', '').startswith('gateway:')]
+            self._writable_services = [ks for ks in self._keep_services
+                                       if not ks.get('read_only')]
+
             # For disk type services, max_replicas_per_service is 1
-            # It is unknown or unlimited for non-disk typed services.
-            for ks in accessible:
-                if ('disk' != ks.get('service_type')) and (not ks.get('read_only')):
-                    self.max_replicas_per_service = None
+            # It is unknown (unlimited) for other service types.
+            if self._any_nondisk_services(self._writable_services):
+                self.max_replicas_per_service = None
+            else:
+                self.max_replicas_per_service = 1
 
     def _service_weight(self, data_hash, service_uuid):
         """Compute the weight of a Keep service endpoint for a data
@@ -782,7 +770,8 @@ class KeepClient(object):
         # in that order.
         use_services = self._keep_services
         if need_writable:
-          use_services = self._writable_services
+            use_services = self._writable_services
+        self.using_proxy = self._any_nondisk_services(use_services)
         sorted_roots.extend([
             svc['_service_root'] for svc in sorted(
                 use_services,