21639: Use a more efficient data structure for the keep block cache
[arvados.git] / sdk / python / arvados / keep.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 import copy
8 from future import standard_library
9 from future.utils import native_str
10 standard_library.install_aliases()
11 from builtins import next
12 from builtins import str
13 from builtins import range
14 from builtins import object
15 import collections
16 import datetime
17 import hashlib
18 import errno
19 import io
20 import logging
21 import math
22 import os
23 import pycurl
24 import queue
25 import re
26 import socket
27 import ssl
28 import sys
29 import threading
30 import resource
31 from . import timer
32 import urllib.parse
33 import traceback
34 import weakref
35
36 if sys.version_info >= (3, 0):
37     from io import BytesIO
38 else:
39     from cStringIO import StringIO as BytesIO
40
41 import arvados
42 import arvados.config as config
43 import arvados.errors
44 import arvados.retry as retry
45 import arvados.util
46 import arvados.diskcache
47 from arvados._pycurlhelper import PyCurlHelper
48
49 _logger = logging.getLogger('arvados.keep')
50 global_client_object = None
51
52
# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    # macOS Python builds may omit these socket constants; presumably
    # the HTTP transport layer uses them for TCP keepalive tuning --
    # confirm against PyCurlHelper.  Only the missing ones are defined,
    # so a Python that does provide them keeps its own values.
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102
62
63
class KeepLocator(object):
    """Parse, validate, and re-serialize a Keep block locator.

    A locator string has the form ``<md5sum>[+<size>][+<hint>...]``.
    An ``A`` hint carries a permission signature and expiry timestamp;
    all other hints are preserved verbatim in ``self.hints``.
    """
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            # Bare locator with no size component.
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            native_str(s)
            for s in [self.md5sum, self.size,
                      self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        """Return the locator with all hints removed (md5sum[+size])."""
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Return the 'A<sig>@<hex timestamp>' hint, or None if unsigned."""
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        """Parse an 'A<signature>@<timestamp>' permission hint.

        Raises ValueError if the hint cannot be split or if the
        signature/timestamp fail validation.
        """
        try:
            # A missing '@' makes this unpacking raise ValueError (str
            # slicing/splitting can never raise IndexError, which is
            # what the previous code caught -- that handler was
            # unreachable, so malformed hints escaped with an unhelpful
            # "not enough values to unpack" message).
            perm_sig, perm_expiry = s[1:].split('@', 1)
        except ValueError:
            raise ValueError("bad permission hint {}".format(s))
        # Assign outside the try so validation errors raised by the
        # property setters are not masked by the message above.
        self.perm_sig = perm_sig
        self.perm_expiry = perm_expiry

    def permission_expired(self, as_of_dt=None):
        """Return True if the permission hint expired as of as_of_dt
        (default: now).  Unsigned locators never expire."""
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt
146
147
class Keep(object):
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED.  Create and use your own KeepClient
    instance with your own API client instead.  The global KeepClient
    builds its API client from the current Arvados configuration, which
    may not match the one you built.
    """
    _last_key = None

    @classmethod
    def global_client_object(cls):
        global global_client_object
        # KeepClient used to adjust its behavior at runtime when these
        # settings changed.  Emulate that here by rebuilding the shared
        # client whenever the observed configuration tuple differs from
        # the one seen last time.
        current_key = (
            config.get('ARVADOS_API_HOST'),
            config.get('ARVADOS_API_TOKEN'),
            config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
            config.get('ARVADOS_KEEP_PROXY'),
            os.environ.get('KEEP_LOCAL_STORE'),
        )
        if global_client_object is None or current_key != cls._last_key:
            global_client_object = KeepClient()
            cls._last_key = current_key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        """Fetch a block via the shared global KeepClient."""
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        """Store a block via the shared global KeepClient."""
        return Keep.global_client_object().put(data, **kwargs)
181
class KeepBlockCache(object):
    """LRU cache of Keep blocks, held either in RAM or in a disk cache.

    Entries live in an OrderedDict keyed by locator: a lookup moves the
    entry to the end of the dict, and eviction pops entries from the
    front, so the least recently used block is discarded first.
    """

    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
        """Set up the cache and compute default size limits.

        :cache_max: maximum cache size in bytes; 0 picks a default
          (256 MiB for RAM, a disk-usage heuristic for the disk cache).
          The effective value is never below 64 MiB.
        :max_slots: maximum number of cached blocks; 0 picks a default
          (512 for RAM, RLIMIT_NOFILE/8 for the disk cache).
        :disk_cache: if true, store blocks in files under
          disk_cache_dir instead of in RAM.
        :disk_cache_dir: disk cache directory; defaults to
          ~/.cache/arvados/keep.
        """
        self.cache_max = cache_max
        self._cache = collections.OrderedDict()
        self._cache_lock = threading.Lock()
        self._max_slots = max_slots
        self._disk_cache = disk_cache
        self._disk_cache_dir = disk_cache_dir
        # Wakes up threads waiting in reserve_cache() for a free slot;
        # shares _cache_lock so waiters re-check under the same lock.
        self._cache_updating = threading.Condition(self._cache_lock)

        if self._disk_cache and self._disk_cache_dir is None:
            self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
            os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)

        if self._max_slots == 0:
            if self._disk_cache:
                # Each block uses two file descriptors, one used to
                # open it initially and hold the flock(), and a second
                # hidden one used by mmap().
                #
                # Set max slots to 1/8 of maximum file handles.  This
                # means we'll use at most 1/4 of total file handles.
                #
                # NOFILE typically defaults to 1024 on Linux so this
                # is 128 slots (256 file handles), which means we can
                # cache up to 8 GiB of 64 MiB blocks.  This leaves
                # 768 file handles for sockets and other stuff.
                #
                # When we want the ability to have more cache (e.g. in
                # arv-mount) we'll increase rlimit before calling
                # this.
                self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
            else:
                # RAM cache slots
                self._max_slots = 512

        if self.cache_max == 0:
            if self._disk_cache:
                fs = os.statvfs(self._disk_cache_dir)
                # Calculation of available space incorporates existing cache usage
                existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
                avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
                maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
                # pick smallest of:
                # 10% of total disk size
                # 25% of available space
                # max_slots * 64 MiB
                self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
            else:
                # 256 MiB in RAM
                self.cache_max = (256 * 1024 * 1024)

        # Never shrink the budget below a single 64 MiB block.
        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)

        if self._disk_cache:
            # Pre-populate the cache index with blocks already on disk.
            self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
            self.cap_cache()


    class CacheSlot(object):
        """One RAM cache entry.

        A slot is created empty by reserve_cache(); readers block in
        get() until a writer supplies the block content via set().
        """
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            """Block until content is available, then return it."""
            self.ready.wait()
            return self.content

        def set(self, value):
            """Store the block content and wake any waiting readers."""
            self.content = value
            self.ready.set()

        def size(self):
            """Return the cached content's size in bytes (0 if empty)."""
            if self.content is None:
                return 0
            else:
                return len(self.content)

        def evict(self):
            """Release the content; returns True if the data is gone."""
            self.content = None
            return self.gone()

        def gone(self):
            """Return True if the underlying data has been released."""
            return (self.content is None)

    def _resize_cache(self, cache_max, max_slots):
        # Try and make sure the contents of the cache do not exceed
        # the supplied maximums.  Caller must hold self._cache_lock.

        _evict_candidates = collections.deque(self._cache.values())
        sm = sum([slot.size() for slot in _evict_candidates])
        while len(self._cache) > 0 and (sm > cache_max or len(self._cache) > max_slots):
            slot = _evict_candidates.popleft()
            if not slot.ready.is_set():
                # Still being fetched; don't evict a block another
                # thread is waiting on.
                continue

            if slot.content is None:
                # error
                del self._cache[slot.locator]
                continue

            sz = slot.size()

            # If evict returns false it means the
            # underlying disk cache couldn't lock the file
            # for deletion because another process was using
            # it. Don't count it as reducing the amount
            # of data in the cache, find something else to
            # throw out.
            if slot.evict():
                sm -= sz

            # check to make sure the underlying data is gone
            if slot.gone():
                # either way we forget about it.  either the
                # other process will delete it, or if we need
                # it again and it is still there, we'll find
                # it on disk.
                del self._cache[slot.locator]


    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_updating:
            self._resize_cache(self.cache_max, self._max_slots)
            # Eviction may have freed slots; wake reserve_cache()
            # waiters so they can re-check.
            self._cache_updating.notify_all()

    def _get(self, locator):
        # Test if the locator is already in the cache.  Caller must
        # hold self._cache_lock.
        if locator in self._cache:
            n = self._cache[locator]
            # Mark this entry most recently used.  (Fixed: OrderedDict
            # provides move_to_end(); the previous move_to_back() call
            # raised AttributeError on every RAM-cache hit.)
            self._cache.move_to_end(locator)
            return n
        if self._disk_cache:
            # see if it exists on disk
            n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
            if n is not None:
                self._cache[n.locator] = n
                return n
        return None

    def get(self, locator):
        """Return the cache slot for locator, or None if not cached."""
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.

        Returns a (slot, is_new) tuple; is_new is True when the slot
        was just created and the caller must fill it with set().
        '''
        with self._cache_updating:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                self._resize_cache(self.cache_max, self._max_slots-1)
                while len(self._cache) >= self._max_slots:
                    # If there isn't a slot available, need to wait
                    # for something to happen that releases one of the
                    # cache slots.  Idle for 200 ms or woken up by
                    # another thread
                    self._cache_updating.wait(timeout=0.2)
                    self._resize_cache(self.cache_max, self._max_slots-1)

                if self._disk_cache:
                    n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
                else:
                    n = KeepBlockCache.CacheSlot(locator)
                self._cache[n.locator] = n
                return n, True

    def set(self, slot, blob):
        """Store blob in slot, adapting cache limits on resource errors.

        If the first attempt fails with ENOMEM/ENOSPC the limits are
        reduced, the cache is capped, and the store is retried once.
        Raises arvados.errors.KeepCacheError if the retry also fails.
        """
        try:
            slot.set(blob)
            return
        except OSError as e:
            if e.errno == errno.ENOMEM:
                # Reduce max slots to current - 4, cap cache and retry
                with self._cache_lock:
                    self._max_slots = max(4, len(self._cache) - 4)
            elif e.errno == errno.ENOSPC:
                # Reduce disk max space to current - 256 MiB, cap cache and retry
                with self._cache_lock:
                    sm = sum([st.size() for st in self._cache.values()])
                    self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
            elif e.errno == errno.ENODEV:
                _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
        except Exception:
            # Deliberate best-effort: fall through to the retry below,
            # which raises KeepCacheError if it fails again.
            pass
        finally:
            # Check if we should evict things from the cache.  Either
            # because we added a new thing or there was an error and
            # we possibly adjusted the limits down, so we might need
            # to push something out.
            self.cap_cache()

        try:
            # Only gets here if there was an error the first time. The
            # exception handler adjusts limits downward in some cases
            # to free up resources, which would make the operation
            # succeed.
            slot.set(blob)
        except Exception as e:
            # It failed again.  Give up.
            slot.set(None)
            raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))

        self.cap_cache()
392
class Counter(object):
    """An integer counter that can be updated safely from multiple threads."""

    def __init__(self, v=0):
        # All reads and writes of _value happen under _lock.
        self._lock = threading.Lock()
        self._value = v

    def add(self, v):
        """Atomically add v (which may be negative) to the counter."""
        with self._lock:
            self._value += v

    def get(self):
        """Return a snapshot of the current counter value."""
        with self._lock:
            return self._value
405
406
407 class KeepClient(object):
408     DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
409     DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT
410
411     class KeepService(PyCurlHelper):
412         """Make requests to a single Keep service, and track results.
413
414         A KeepService is intended to last long enough to perform one
415         transaction (GET or PUT) against one Keep service. This can
416         involve calling either get() or put() multiple times in order
417         to retry after transient failures. However, calling both get()
418         and put() on a single instance -- or using the same instance
419         to access two different Keep services -- will not produce
420         sensible behavior.
421         """
422
423         HTTP_ERRORS = (
424             socket.error,
425             ssl.SSLError,
426             arvados.errors.HttpError,
427         )
428
429         def __init__(self, root, user_agent_pool=queue.LifoQueue(),
430                      upload_counter=None,
431                      download_counter=None,
432                      headers={},
433                      insecure=False):
434             super(KeepClient.KeepService, self).__init__()
435             self.root = root
436             self._user_agent_pool = user_agent_pool
437             self._result = {'error': None}
438             self._usable = True
439             self._session = None
440             self._socket = None
441             self.get_headers = {'Accept': 'application/octet-stream'}
442             self.get_headers.update(headers)
443             self.put_headers = headers
444             self.upload_counter = upload_counter
445             self.download_counter = download_counter
446             self.insecure = insecure
447
448         def usable(self):
449             """Is it worth attempting a request?"""
450             return self._usable
451
452         def finished(self):
453             """Did the request succeed or encounter permanent failure?"""
454             return self._result['error'] == False or not self._usable
455
456         def last_result(self):
457             return self._result
458
459         def _get_user_agent(self):
460             try:
461                 return self._user_agent_pool.get(block=False)
462             except queue.Empty:
463                 return pycurl.Curl()
464
465         def _put_user_agent(self, ua):
466             try:
467                 ua.reset()
468                 self._user_agent_pool.put(ua, block=False)
469             except:
470                 ua.close()
471
472         def get(self, locator, method="GET", timeout=None):
473             # locator is a KeepLocator object.
474             url = self.root + str(locator)
475             _logger.debug("Request: %s %s", method, url)
476             curl = self._get_user_agent()
477             ok = None
478             try:
479                 with timer.Timer() as t:
480                     self._headers = {}
481                     response_body = BytesIO()
482                     curl.setopt(pycurl.NOSIGNAL, 1)
483                     curl.setopt(pycurl.OPENSOCKETFUNCTION,
484                                 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
485                     curl.setopt(pycurl.URL, url.encode('utf-8'))
486                     curl.setopt(pycurl.HTTPHEADER, [
487                         '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
488                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
489                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
490                     if self.insecure:
491                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
492                         curl.setopt(pycurl.SSL_VERIFYHOST, 0)
493                     else:
494                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
495                     if method == "HEAD":
496                         curl.setopt(pycurl.NOBODY, True)
497                     else:
498                         curl.setopt(pycurl.HTTPGET, True)
499                     self._setcurltimeouts(curl, timeout, method=="HEAD")
500
501                     try:
502                         curl.perform()
503                     except Exception as e:
504                         raise arvados.errors.HttpError(0, str(e))
505                     finally:
506                         if self._socket:
507                             self._socket.close()
508                             self._socket = None
509                     self._result = {
510                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
511                         'body': response_body.getvalue(),
512                         'headers': self._headers,
513                         'error': False,
514                     }
515
516                 ok = retry.check_http_response_success(self._result['status_code'])
517                 if not ok:
518                     self._result['error'] = arvados.errors.HttpError(
519                         self._result['status_code'],
520                         self._headers.get('x-status-line', 'Error'))
521             except self.HTTP_ERRORS as e:
522                 self._result = {
523                     'error': e,
524                 }
525             self._usable = ok != False
526             if self._result.get('status_code', None):
527                 # The client worked well enough to get an HTTP status
528                 # code, so presumably any problems are just on the
529                 # server side and it's OK to reuse the client.
530                 self._put_user_agent(curl)
531             else:
532                 # Don't return this client to the pool, in case it's
533                 # broken.
534                 curl.close()
535             if not ok:
536                 _logger.debug("Request fail: GET %s => %s: %s",
537                               url, type(self._result['error']), str(self._result['error']))
538                 return None
539             if method == "HEAD":
540                 _logger.info("HEAD %s: %s bytes",
541                          self._result['status_code'],
542                          self._result.get('content-length'))
543                 if self._result['headers'].get('x-keep-locator'):
544                     # This is a response to a remote block copy request, return
545                     # the local copy block locator.
546                     return self._result['headers'].get('x-keep-locator')
547                 return True
548
549             _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
550                          self._result['status_code'],
551                          len(self._result['body']),
552                          t.msecs,
553                          1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
554
555             if self.download_counter:
556                 self.download_counter.add(len(self._result['body']))
557             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
558             if resp_md5 != locator.md5sum:
559                 _logger.warning("Checksum fail: md5(%s) = %s",
560                                 url, resp_md5)
561                 self._result['error'] = arvados.errors.HttpError(
562                     0, 'Checksum fail')
563                 return None
564             return self._result['body']
565
566         def put(self, hash_s, body, timeout=None, headers={}):
567             put_headers = copy.copy(self.put_headers)
568             put_headers.update(headers)
569             url = self.root + hash_s
570             _logger.debug("Request: PUT %s", url)
571             curl = self._get_user_agent()
572             ok = None
573             try:
574                 with timer.Timer() as t:
575                     self._headers = {}
576                     body_reader = BytesIO(body)
577                     response_body = BytesIO()
578                     curl.setopt(pycurl.NOSIGNAL, 1)
579                     curl.setopt(pycurl.OPENSOCKETFUNCTION,
580                                 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
581                     curl.setopt(pycurl.URL, url.encode('utf-8'))
582                     # Using UPLOAD tells cURL to wait for a "go ahead" from the
583                     # Keep server (in the form of a HTTP/1.1 "100 Continue"
584                     # response) instead of sending the request body immediately.
585                     # This allows the server to reject the request if the request
586                     # is invalid or the server is read-only, without waiting for
587                     # the client to send the entire block.
588                     curl.setopt(pycurl.UPLOAD, True)
589                     curl.setopt(pycurl.INFILESIZE, len(body))
590                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
591                     curl.setopt(pycurl.HTTPHEADER, [
592                         '{}: {}'.format(k,v) for k,v in put_headers.items()])
593                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
594                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
595                     if self.insecure:
596                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
597                         curl.setopt(pycurl.SSL_VERIFYHOST, 0)
598                     else:
599                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
600                     self._setcurltimeouts(curl, timeout)
601                     try:
602                         curl.perform()
603                     except Exception as e:
604                         raise arvados.errors.HttpError(0, str(e))
605                     finally:
606                         if self._socket:
607                             self._socket.close()
608                             self._socket = None
609                     self._result = {
610                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
611                         'body': response_body.getvalue().decode('utf-8'),
612                         'headers': self._headers,
613                         'error': False,
614                     }
615                 ok = retry.check_http_response_success(self._result['status_code'])
616                 if not ok:
617                     self._result['error'] = arvados.errors.HttpError(
618                         self._result['status_code'],
619                         self._headers.get('x-status-line', 'Error'))
620             except self.HTTP_ERRORS as e:
621                 self._result = {
622                     'error': e,
623                 }
624             self._usable = ok != False # still usable if ok is True or None
625             if self._result.get('status_code', None):
626                 # Client is functional. See comment in get().
627                 self._put_user_agent(curl)
628             else:
629                 curl.close()
630             if not ok:
631                 _logger.debug("Request fail: PUT %s => %s: %s",
632                               url, type(self._result['error']), str(self._result['error']))
633                 return False
634             _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
635                          self._result['status_code'],
636                          len(body),
637                          t.msecs,
638                          1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
639             if self.upload_counter:
640                 self.upload_counter.add(len(body))
641             return True
642
643
    class KeepWriterQueue(queue.Queue):
        """Task queue shared by the KeepWriterThreads of one put() call.

        Each task is a (KeepService, service_root) tuple.  The queue
        also tracks write progress -- replicas confirmed and storage
        classes satisfied -- so workers know when to stop.
        """
        def __init__(self, copies, classes=[]):
            queue.Queue.__init__(self) # Old-style superclass
            # Replication / storage-class goals for this write.
            self.wanted_copies = copies
            self.wanted_storage_classes = classes
            # Progress confirmed so far by successful writes.
            self.successful_copies = 0
            # Maps storage class name -> confirmed replica count.
            self.confirmed_storage_classes = {}
            self.response = None
            # Set False once any service fails to report class info.
            self.storage_classes_tracking = True
            self.queue_data_lock = threading.RLock()
            # Attempts still believed necessary; workers sleep on the
            # condition below when this reaches zero but work remains.
            self.pending_tries = max(copies, len(classes))
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr, classes_confirmed):
            """Record a successful write of replicas_nr copies.

            classes_confirmed maps storage class -> replicas written,
            or is None when the service reported no class information.
            """
            with self.queue_data_lock:
                self.successful_copies += replicas_nr
                if classes_confirmed is None:
                    self.storage_classes_tracking = False
                elif self.storage_classes_tracking:
                    for st_class, st_copies in classes_confirmed.items():
                        try:
                            self.confirmed_storage_classes[st_class] += st_copies
                        except KeyError:
                            self.confirmed_storage_classes[st_class] = st_copies
                    # Recompute remaining attempts from both goals.
                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            """Record a failed write; permit one more attempt."""
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            """Return how many more replicas are still needed."""
            with self.queue_data_lock:
                return self.wanted_copies - self.successful_copies

        def satisfied_classes(self):
            """Return the storage classes already fully satisfied.

            Returns None when storage-class tracking is disabled.
            """
            with self.queue_data_lock:
                if not self.storage_classes_tracking:
                    # Notifies disabled storage classes expectation to
                    # the outer loop.
                    return None
            # Lock is released here; pending_classes() re-acquires the
            # same RLock internally.
            return list(set(self.wanted_storage_classes) - set(self.pending_classes()))

        def pending_classes(self):
            """Return storage classes not yet confirmed at wanted_copies."""
            with self.queue_data_lock:
                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
                    return []
                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
                for st_class, st_copies in self.confirmed_storage_classes.items():
                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
                        unsatisfied_classes.remove(st_class)
                return unsatisfied_classes

        def get_next_task(self):
            """Return the next (service, service_root) task for a worker.

            Blocks while no attempt is currently warranted; raises
            queue.Empty once the goal is met or no tasks remain.
            """
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                            # Skip services that already succeeded or
                            # failed permanently.
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        self.pending_tries_notification.notify_all()
                        raise queue.Empty
                    else:
                        # Tasks exist but no attempt is warranted right
                        # now; wait for write_success()/write_fail().
                        self.pending_tries_notification.wait()
727
728
729     class KeepWriterThreadPool(object):
730         def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
731             self.total_task_nr = 0
732             if (not max_service_replicas) or (max_service_replicas >= copies):
733                 num_threads = 1
734             else:
735                 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
736             _logger.debug("Pool max threads is %d", num_threads)
737             self.workers = []
738             self.queue = KeepClient.KeepWriterQueue(copies, classes)
739             # Create workers
740             for _ in range(num_threads):
741                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
742                 self.workers.append(w)
743
744         def add_task(self, ks, service_root):
745             self.queue.put((ks, service_root))
746             self.total_task_nr += 1
747
748         def done(self):
749             return self.queue.successful_copies, self.queue.satisfied_classes()
750
751         def join(self):
752             # Start workers
753             for worker in self.workers:
754                 worker.start()
755             # Wait for finished work
756             self.queue.join()
757
758         def response(self):
759             return self.queue.response
760
761
    class KeepWriterThread(threading.Thread):
        """Worker thread that pulls (service, service_root) tasks from a
        KeepWriterQueue and attempts to PUT the block to each service.
        """
        # Raised internally when a PUT attempt fails, to distinguish an
        # expected request failure from an unexpected programming error.
        class TaskFailed(RuntimeError): pass

        def __init__(self, queue, data, data_hash, timeout=None):
            """Arguments:
            :queue: shared KeepWriterQueue of pending write attempts.
            :data: block content (bytes) to store.
            :data_hash: md5 hex digest of data.
            :timeout: per-request timeout passed to KeepService.put().
            """
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            # Daemon thread: don't block interpreter exit on a hung write.
            self.daemon = True

        def run(self):
            """Process queued write attempts until the queue signals completion."""
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except queue.Empty:
                    # Write goal reached, or nothing left to try.
                    return
                try:
                    locator, copies, classes = self.do_task(service, service_root)
                except Exception as e:
                    if not isinstance(e, self.TaskFailed):
                        _logger.exception("Exception in KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies, classes)
                finally:
                    self.queue.task_done()

        def do_task(self, service, service_root):
            """Attempt one PUT of the block to a single Keep service.

            Returns (locator, replicas_stored, classes_confirmed) on
            success; raises TaskFailed when the service reports an error.
            classes_confirmed is None when the server doesn't report
            storage-class confirmations (old keepstore).
            """
            classes = self.queue.pending_classes()
            headers = {}
            if len(classes) > 0:
                # Sort for a stable, canonical header value.
                classes.sort()
                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
            success = bool(service.put(self.data_hash,
                                        self.data,
                                        timeout=self.timeout,
                                        headers=headers))
            result = service.last_result()

            if not success:
                if result.get('status_code'):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result.get('status_code'),
                                  result.get('body'))
                raise self.TaskFailed()

            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            try:
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                # Header missing or malformed; assume the minimum of one copy.
                replicas_stored = 1

            classes_confirmed = {}
            try:
                # Header format: "class1=N, class2=M, ..."
                scch = result['headers']['x-keep-storage-classes-confirmed']
                for confirmation in scch.replace(' ', '').split(','):
                    if '=' in confirmation:
                        stored_class, stored_copies = confirmation.split('=')[:2]
                        classes_confirmed[stored_class] = int(stored_copies)
            except (KeyError, ValueError):
                # Storage classes confirmed header missing or corrupt
                classes_confirmed = None

            return result['body'].strip(), replicas_stored, classes_confirmed
832
833
    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=10, session=None, num_prefetch_threads=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you want to ensure KeepClient does not use a proxy, pass in an
          empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 10.

        :num_prefetch_threads:
          The number of background threads used to prefetch blocks into
          the cache.  Default 2.
        """
        self.lock = threading.Lock()
        # Resolve the proxy setting from the environment when not given
        # explicitly; ARVADOS_KEEP_SERVICES takes precedence.
        if proxy is None:
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        # An API token may come from the caller, the API client, or the
        # Arvados configuration -- but never from two places at once.
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        if api_client is None:
            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        else:
            self.insecure = api_client.insecure

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        # Pool of reusable CurlHelper user agents shared across requests.
        self._user_agent_pool = queue.LifoQueue()
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()
        self._storage_classes_unsupported_warning = False
        self._default_classes = []
        self.num_prefetch_threads = num_prefetch_threads or 2
        # Prefetch machinery is created lazily on first use.
        self._prefetch_queue = None
        self._prefetch_threads = None

        if local_store:
            # Local-directory mode (testing): replace the network methods
            # with their local-store equivalents.
            self.local_store = local_store
            self.head = self.local_store_head
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                # Proxy mode: build a static service list from the
                # whitespace-separated proxy URIs.
                proxy_uris = proxy.split()
                for i in range(len(proxy_uris)):
                    if not proxy_uris[i].endswith('/'):
                        proxy_uris[i] += '/'
                    # URL validation
                    url = urllib.parse.urlparse(proxy_uris[i])
                    if not (url.scheme and url.netloc):
                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': "00000-bi6l4-%015d" % idx,
                    'service_type': 'proxy',
                    '_service_root': uri,
                    } for idx, uri in enumerate(proxy_uris)]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                # Service lists are populated lazily by build_services_list().
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
                try:
                    self._default_classes = [
                        k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
                except KeyError:
                    # We're talking to an old cluster
                    pass
975
976     def current_timeout(self, attempt_number):
977         """Return the appropriate timeout to use for this client.
978
979         The proxy timeout setting if the backend service is currently a proxy,
980         the regular timeout setting otherwise.  The `attempt_number` indicates
981         how many times the operation has been tried already (starting from 0
982         for the first try), and scales the connection timeout portion of the
983         return value accordingly.
984
985         """
986         # TODO(twp): the timeout should be a property of a
987         # KeepService, not a KeepClient. See #4488.
988         t = self.proxy_timeout if self.using_proxy else self.timeout
989         if len(t) == 2:
990             return (t[0] * (1 << attempt_number), t[1])
991         else:
992             return (t[0] * (1 << attempt_number), t[1], t[2])
993     def _any_nondisk_services(self, service_list):
994         return any(ks.get('service_type', 'disk') != 'disk'
995                    for ks in service_list)
996
    def build_services_list(self, force_rebuild=False):
        """Populate the cached Keep service lists from the API server.

        No-op when a static (proxy) service list was configured at init,
        or when the list is already populated and force_rebuild is false.
        Raises NoKeepServersError when the API reports no services.
        """
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.values():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            # Gateway-typed services are excluded from normal probing.
            self._keep_services = [
                ks for ks in self._gateway_services.values()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1
1039
1040     def _service_weight(self, data_hash, service_uuid):
1041         """Compute the weight of a Keep service endpoint for a data
1042         block with a known hash.
1043
1044         The weight is md5(h + u) where u is the last 15 characters of
1045         the service endpoint's UUID.
1046         """
1047         return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
1048
1049     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
1050         """Return an array of Keep service endpoints, in the order in
1051         which they should be probed when reading or writing data with
1052         the given hash+hints.
1053         """
1054         self.build_services_list(force_rebuild)
1055
1056         sorted_roots = []
1057         # Use the services indicated by the given +K@... remote
1058         # service hints, if any are present and can be resolved to a
1059         # URI.
1060         for hint in locator.hints:
1061             if hint.startswith('K@'):
1062                 if len(hint) == 7:
1063                     sorted_roots.append(
1064                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
1065                 elif len(hint) == 29:
1066                     svc = self._gateway_services.get(hint[2:])
1067                     if svc:
1068                         sorted_roots.append(svc['_service_root'])
1069
1070         # Sort the available local services by weight (heaviest first)
1071         # for this locator, and return their service_roots (base URIs)
1072         # in that order.
1073         use_services = self._keep_services
1074         if need_writable:
1075             use_services = self._writable_services
1076         self.using_proxy = self._any_nondisk_services(use_services)
1077         sorted_roots.extend([
1078             svc['_service_root'] for svc in sorted(
1079                 use_services,
1080                 reverse=True,
1081                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
1082         _logger.debug("{}: {}".format(locator, sorted_roots))
1083         return sorted_roots
1084
1085     def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
1086         # roots_map is a dictionary, mapping Keep service root strings
1087         # to KeepService objects.  Poll for Keep services, and add any
1088         # new ones to roots_map.  Return the current list of local
1089         # root strings.
1090         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
1091         local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
1092         for root in local_roots:
1093             if root not in roots_map:
1094                 roots_map[root] = self.KeepService(
1095                     root, self._user_agent_pool,
1096                     upload_counter=self.upload_counter,
1097                     download_counter=self.download_counter,
1098                     headers=headers,
1099                     insecure=self.insecure)
1100         return local_roots
1101
1102     @staticmethod
1103     def _check_loop_result(result):
1104         # KeepClient RetryLoops should save results as a 2-tuple: the
1105         # actual result of the request, and the number of servers available
1106         # to receive the request this round.
1107         # This method returns True if there's a real result, False if
1108         # there are no more servers available, otherwise None.
1109         if isinstance(result, Exception):
1110             return None
1111         result, tried_server_count = result
1112         if (result is not None) and (result is not False):
1113             return True
1114         elif tried_server_count < 1:
1115             _logger.info("No more Keep services to try; giving up")
1116             return False
1117         else:
1118             return None
1119
1120     def get_from_cache(self, loc_s):
1121         """Fetch a block only if is in the cache, otherwise return None."""
1122         locator = KeepLocator(loc_s)
1123         slot = self.block_cache.get(locator.md5sum)
1124         if slot is not None and slot.ready.is_set():
1125             return slot.get()
1126         else:
1127             return None
1128
1129     def refresh_signature(self, loc):
1130         """Ask Keep to get the remote block and return its local signature"""
1131         now = datetime.datetime.utcnow().isoformat("T") + 'Z'
1132         return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
1133
    @retry.retry_method
    def head(self, loc_s, **kwargs):
        """Check whether a block is available in Keep; see _get_or_head."""
        return self._get_or_head(loc_s, method="HEAD", **kwargs)
1137
    @retry.retry_method
    def get(self, loc_s, **kwargs):
        """Fetch a block's data from Keep; see _get_or_head for arguments."""
        return self._get_or_head(loc_s, method="GET", **kwargs)
1141
    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep.  It
        sends a request each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence.  As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator.  The default value
          is set when the KeepClient is initialized.
        * method: "GET" to fetch block data, "HEAD" to only check existence.
        * request_id: X-Request-Id header value; generated when omitted.
        * prefetch: when True, return immediately (None) if another thread
          is already fetching this block into the cache.
        """
        # Multiple locators: fetch each one and concatenate.
        if ',' in loc_s:
            return ''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        if headers is None:
            headers = {}
        headers['X-Request-Id'] = request_id

        slot = None
        blob = None
        try:
            locator = KeepLocator(loc_s)
            if method == "GET":
                # Try to satisfy the request from the block cache; reserve a
                # slot so concurrent readers of the same block wait for one
                # fetch instead of all fetching.
                while slot is None:
                    slot, first = self.block_cache.reserve_cache(locator.md5sum)
                    if first:
                        # Fresh and empty "first time it is used" slot
                        break
                    if prefetch:
                        # this is request for a prefetch to fill in
                        # the cache, don't need to wait for the
                        # result, so if it is already in flight return
                        # immediately.  Clear 'slot' to prevent
                        # finally block from calling slot.set()
                        slot = None
                        return None

                    blob = slot.get()
                    if blob is not None:
                        self.hits_counter.add(1)
                        return blob

                    # If blob is None, this means either
                    #
                    # (a) another thread was fetching this block and
                    # failed with an error or
                    #
                    # (b) cache thrashing caused the slot to be
                    # evicted (content set to None) by another thread
                    # between the call to reserve_cache() and get().
                    #
                    # We'll handle these cases by reserving a new slot
                    # and then doing a full GET request.
                    slot = None

            self.misses_counter.add(1)

            # If the locator has hints specifying a prefix (indicating a
            # remote keepproxy) or the UUID of a local gateway service,
            # read data from the indicated service(s) instead of the usual
            # list of local disk services.
            hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                          for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
            hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                               for hint in locator.hints if (
                                       hint.startswith('K@') and
                                       len(hint) == 29 and
                                       self._gateway_services.get(hint[2:])
                                       )])
            # Map root URLs to their KeepService objects.
            roots_map = {
                root: self.KeepService(root, self._user_agent_pool,
                                       upload_counter=self.upload_counter,
                                       download_counter=self.download_counter,
                                       headers=headers,
                                       insecure=self.insecure)
                for root in hint_roots
            }

            # See #3147 for a discussion of the loop implementation.  Highlights:
            # * Refresh the list of Keep services after each failure, in case
            #   it's being updated.
            # * Retry until we succeed, we're out of retries, or every available
            #   service has returned permanent failure.
            sorted_roots = []
            # NOTE(review): this reset discards the hint-root KeepService
            # map built just above; map_new_services() re-creates entries
            # for hint roots returned by weighted_service_roots(), so this
            # appears redundant rather than fatal -- confirm intent.
            roots_map = {}
            loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                   backoff_start=2)
            for tries_left in loop:
                try:
                    sorted_roots = self.map_new_services(
                        roots_map, locator,
                        force_rebuild=(tries_left < num_retries),
                        need_writable=False,
                        headers=headers)
                except Exception as error:
                    loop.save_result(error)
                    continue

                # Query KeepService objects that haven't returned
                # permanent failure, in our specified shuffle order.
                services_to_try = [roots_map[root]
                                   for root in sorted_roots
                                   if roots_map[root].usable()]
                for keep_service in services_to_try:
                    blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                    if blob is not None:
                        break
                loop.save_result((blob, len(services_to_try)))

            # Always cache the result, then return it if we succeeded.
            if loop.success():
                return blob
        finally:
            # Publish the outcome (data or failure=None) to the reserved
            # cache slot so waiting threads can proceed.
            if slot is not None:
                self.block_cache.set(slot, blob)

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {}: no Keep services available ({})".format(
                    request_id, loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "[{}] {} not found".format(request_id, loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
1289
    @retry.retry_method
    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread.  Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body.  If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        * classes: An optional list of storage class names where copies should
          be written.
        """

        classes = classes or self._default_classes

        if not isinstance(data, bytes):
            data = data.encode()

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        # Zero copies requested: nothing to store, just return the locator.
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        headers = {
            'X-Request-Id': request_id,
            'X-Keep-Desired-Replicas': str(copies),
        }
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        # Running totals across retries; done_classes becomes None if the
        # cluster turns out not to support storage classes.
        done_copies = 0
        done_classes = []
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=True,
                    headers=headers)
            except Exception as error:
                loop.save_result(error)
                continue

            # Only ask this round's writers for the classes not yet satisfied.
            pending_classes = []
            if done_classes is not None:
                pending_classes = list(set(classes) - set(done_classes))
            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                        data_hash=data_hash,
                                                        copies=copies - done_copies,
                                                        max_service_replicas=self.max_replicas_per_service,
                                                        timeout=self.current_timeout(num_retries - tries_left),
                                                        classes=pending_classes)
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                writer_pool.add_task(ks, service_root)
            writer_pool.join()
            pool_copies, pool_classes = writer_pool.done()
            done_copies += pool_copies
            if (done_classes is not None) and (pool_classes is not None):
                done_classes += pool_classes
                loop.save_result(
                    (done_copies >= copies and set(done_classes) == set(classes),
                    writer_pool.total_task_nr))
            else:
                # Old keepstore contacted without storage classes support:
                # success is determined only by successful copies.
                #
                # Disable storage classes tracking from this point forward.
                if not self._storage_classes_unsupported_warning:
                    self._storage_classes_unsupported_warning = True
                    _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
                done_classes = None
                loop.save_result(
                    (done_copies >= copies, writer_pool.total_task_nr))

        if loop.success():
            return writer_pool.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {}: no Keep services available ({})".format(
                    request_id, data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
                    request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
1395
1396     def _block_prefetch_worker(self):
1397         """The background downloader thread."""
1398         while True:
1399             try:
1400                 b = self._prefetch_queue.get()
1401                 if b is None:
1402                     return
1403                 self.get(b, prefetch=True)
1404             except Exception:
1405                 _logger.exception("Exception doing block prefetch")
1406
1407     def _start_prefetch_threads(self):
1408         if self._prefetch_threads is None:
1409             with self.lock:
1410                 if self._prefetch_threads is not None:
1411                     return
1412                 self._prefetch_queue = queue.Queue()
1413                 self._prefetch_threads = []
1414                 for i in range(0, self.num_prefetch_threads):
1415                     thread = threading.Thread(target=self._block_prefetch_worker)
1416                     self._prefetch_threads.append(thread)
1417                     thread.daemon = True
1418                     thread.start()
1419
1420     def block_prefetch(self, locator):
1421         """
1422         This relies on the fact that KeepClient implements a block cache,
1423         so repeated requests for the same block will not result in repeated
1424         downloads (unless the block is evicted from the cache.)  This method
1425         does not block.
1426         """
1427
1428         if self.block_cache.get(locator) is not None:
1429             return
1430
1431         self._start_prefetch_threads()
1432         self._prefetch_queue.put(locator)
1433
1434     def stop_prefetch_threads(self):
1435         with self.lock:
1436             if self._prefetch_threads is not None:
1437                 for t in self._prefetch_threads:
1438                     self._prefetch_queue.put(None)
1439                 for t in self._prefetch_threads:
1440                     t.join()
1441             self._prefetch_threads = None
1442             self._prefetch_queue = None
1443
1444     def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
1445         """A stub for put().
1446
1447         This method is used in place of the real put() method when
1448         using local storage (see constructor's local_store argument).
1449
1450         copies and num_retries arguments are ignored: they are here
1451         only for the sake of offering the same call signature as
1452         put().
1453
1454         Data stored this way can be retrieved via local_store_get().
1455         """
1456         md5 = hashlib.md5(data).hexdigest()
1457         locator = '%s+%d' % (md5, len(data))
1458         with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1459             f.write(data)
1460         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1461                   os.path.join(self.local_store, md5))
1462         return locator
1463
1464     def local_store_get(self, loc_s, num_retries=None):
1465         """Companion to local_store_put()."""
1466         try:
1467             locator = KeepLocator(loc_s)
1468         except ValueError:
1469             raise arvados.errors.NotFoundError(
1470                 "Invalid data locator: '%s'" % loc_s)
1471         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1472             return b''
1473         with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1474             return f.read()
1475
1476     def local_store_head(self, loc_s, num_retries=None):
1477         """Companion to local_store_put()."""
1478         try:
1479             locator = KeepLocator(loc_s)
1480         except ValueError:
1481             raise arvados.errors.NotFoundError(
1482                 "Invalid data locator: '%s'" % loc_s)
1483         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1484             return True
1485         if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
1486             return True