21639: Don't return broken cache slots
arvados.git: sdk/python/arvados/keep.py
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import absolute_import
from __future__ import division
import copy
from future import standard_library
from future.utils import native_str
standard_library.install_aliases()
from builtins import next
from builtins import str
from builtins import range
from builtins import object
import collections
import datetime
import hashlib
import errno
import io
import logging
import math
import os
import pycurl
import queue
import re
import socket
import ssl
import sys
import threading
import resource
from . import timer
import urllib.parse
import traceback
import weakref

if sys.version_info >= (3, 0):
    from io import BytesIO
else:
    from cStringIO import StringIO as BytesIO

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util
import arvados.diskcache
from arvados._pycurlhelper import PyCurlHelper

_logger = logging.getLogger('arvados.keep')
global_client_object = None


# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102

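# Illustrative sketch (not part of this module): these constants are the kind
# consumed by socket keepalive tuning, presumably in the socket-setup code in
# PyCurlHelper (an assumption; that code lives outside this file).  On a raw
# socket they would be used like this; the interval/count values are made up:
#
#   sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#   sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
#   sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPALIVE, 75)  # idle secs
#   sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)  # probe gap
#   sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 9)     # probes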

class KeepLocator(object):
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            native_str(s)
            for s in [self.md5sum, self.size,
                      self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except (IndexError, ValueError):
            # A hint without '@' raises ValueError from tuple unpacking,
            # not IndexError, so catch both and report a uniform error.
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt

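# Illustrative example (not part of the module): round-tripping a signed
# locator.  The hash, size, signature, and timestamp below are made up, but
# have the right shapes (32-hex md5, 40-hex signature, 8-hex expiry).
#
#   loc = KeepLocator(
#       "d41d8cd98f00b204e9800998ecf8427e+0"
#       "+A1234567890abcdef1234567890abcdef12345678@565f2b08")
#   loc.md5sum                # 'd41d8cd98f00b204e9800998ecf8427e'
#   loc.size                  # 0
#   loc.stripped()            # 'd41d8cd98f00b204e9800998ecf8427e+0'
#   str(loc)                  # reassembles the original locator string
#   loc.permission_expired()  # True once the expiry timestamp has passed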

class Keep(object):
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
    own API client.  The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """
    _last_key = None

    @classmethod
    def global_client_object(cls):
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings.  We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them have changed.
        key = (config.get('ARVADOS_API_HOST'),
               config.get('ARVADOS_API_TOKEN'),
               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
               config.get('ARVADOS_KEEP_PROXY'),
               os.environ.get('KEEP_LOCAL_STORE'))
        if (global_client_object is None) or (cls._last_key != key):
            global_client_object = KeepClient()
            cls._last_key = key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        return Keep.global_client_object().put(data, **kwargs)

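# Illustrative migration sketch (not part of the module): instead of the
# deprecated global helpers above, instantiate a KeepClient tied to your own
# API client.  arvados.api('v1') is the standard SDK entry point.
#
#   api = arvados.api('v1')
#   kc = KeepClient(api_client=api)
#   locator = kc.put(b'foo')          # store a block, get a signed locator
#   assert kc.get(locator) == b'foo'  # read it back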
class KeepBlockCache(object):
    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
        self.cache_max = cache_max
        self._cache = collections.OrderedDict()
        self._cache_lock = threading.Lock()
        self._max_slots = max_slots
        self._disk_cache = disk_cache
        self._disk_cache_dir = disk_cache_dir
        self._cache_updating = threading.Condition(self._cache_lock)

        if self._disk_cache and self._disk_cache_dir is None:
            self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
            os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)

        if self._max_slots == 0:
            if self._disk_cache:
                # Each block uses two file descriptors, one used to
                # open it initially and hold the flock(), and a second
                # hidden one used by mmap().
                #
                # Set max slots to 1/8 of maximum file handles.  This
                # means we'll use at most 1/4 of total file handles.
                #
                # NOFILE typically defaults to 1024 on Linux so this
                # is 128 slots (256 file handles), which means we can
                # cache up to 8 GiB of 64 MiB blocks.  This leaves
                # 768 file handles for sockets and other stuff.
                #
                # When we want the ability to have more cache (e.g. in
                # arv-mount) we'll increase rlimit before calling
                # this.
                self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
            else:
                # RAM cache slots
                self._max_slots = 512

        if self.cache_max == 0:
            if self._disk_cache:
                fs = os.statvfs(self._disk_cache_dir)
                # Calculation of available space incorporates existing cache usage
                existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
                avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
                maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
                # pick smallest of:
                # 10% of total disk size
                # 25% of available space
                # max_slots * 64 MiB
                self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
            else:
                # 256 MiB in RAM
                self.cache_max = (256 * 1024 * 1024)

        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)

        if self._disk_cache:
            self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
            self.cap_cache()

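    # Illustrative numbers (not part of the module) for the disk-cache sizing
    # above, assuming a 100 GiB filesystem with 40 GiB free, no existing
    # cache, and the default 128 slots:
    #
    #   maxdisk   = 100 GiB * 0.10               = 10 GiB
    #   avail     = (40 GiB + 0) / 4             = 10 GiB
    #   slots     = 128 * 64 MiB                 = 8 GiB
    #   cache_max = min(10 GiB, 10 GiB, 8 GiB)   = 8 GiB
    #
    # and cache_max is never allowed below 64 MiB (one full block).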
    class CacheSlot(object):
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

        def evict(self):
            self.content = None
            return self.gone()

        def gone(self):
            return (self.content is None)

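    # Illustrative sketch (not part of the module) of the slot protocol: one
    # thread reserves a slot and fetches the block while other threads block
    # in get() until set() fires the Event.  A slot whose content is None
    # after ready is set signals a failed fetch (see _get() below).
    #
    #   slot = KeepBlockCache.CacheSlot(locator)
    #   # fetching thread:            # waiting thread:
    #   slot.set(block_bytes)         data = slot.get()  # wakes when set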
    def _resize_cache(self, cache_max, max_slots):
        # Try to make sure the contents of the cache do not exceed
        # the supplied maximums.

        _evict_candidates = collections.deque(self._cache.values())
        sm = sum([slot.size() for slot in _evict_candidates])
        while len(_evict_candidates) > 0 and (sm > cache_max or len(self._cache) > max_slots):
            slot = _evict_candidates.popleft()
            if not slot.ready.is_set():
                continue

            if slot.content is None:
                # error
                del self._cache[slot.locator]
                continue

            sz = slot.size()

            # If evict() returns False, the underlying disk cache
            # couldn't lock the file for deletion because another
            # process was using it.  Don't count it as reducing the
            # amount of data in the cache; find something else to
            # throw out.
            if slot.evict():
                sm -= sz

            # Check to make sure the underlying data is gone.
            if slot.gone():
                # Either way we forget about it: either the other
                # process will delete it, or if we need it again and
                # it is still there, we'll find it on disk.
                del self._cache[slot.locator]


    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_updating:
            self._resize_cache(self.cache_max, self._max_slots)
            self._cache_updating.notify_all()

    def _get(self, locator):
        # Test if the locator is already in the cache
        if locator in self._cache:
            n = self._cache[locator]
            if n.ready.is_set() and n.content is None:
                # The fetch that populated this slot failed; drop the
                # broken slot instead of returning it.
                del self._cache[n.locator]
                return None
            self._cache.move_to_end(locator)
            return n
        if self._disk_cache:
            # see if it exists on disk
            n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
            if n is not None:
                self._cache[n.locator] = n
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_updating:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                self._resize_cache(self.cache_max, self._max_slots-1)
                while len(self._cache) >= self._max_slots:
                    # If there isn't a slot available, need to wait
                    # for something to happen that releases one of the
                    # cache slots.  Idle for 200 ms or woken up by
                    # another thread
                    self._cache_updating.wait(timeout=0.2)
                    self._resize_cache(self.cache_max, self._max_slots-1)

                if self._disk_cache:
                    n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
                else:
                    n = KeepBlockCache.CacheSlot(locator)
                self._cache[n.locator] = n
                return n, True

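    # Illustrative usage pattern (not part of the module), mirroring what
    # KeepClient._get_or_head() does with this API; fetch_from_keep is a
    # hypothetical helper standing in for the real HTTP fetch:
    #
    #   slot, first = cache.reserve_cache(locator.md5sum)
    #   if first:
    #       blob = fetch_from_keep(locator)   # hypothetical fetch helper
    #       cache.set(slot, blob)             # populates slot, caps cache
    #   else:
    #       blob = slot.get()                 # waits for whoever fetched it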
    def set(self, slot, blob):
        try:
            slot.set(blob)
            return
        except OSError as e:
            if e.errno == errno.ENOMEM:
                # Reduce max slots to current - 4, cap cache and retry
                with self._cache_lock:
                    self._max_slots = max(4, len(self._cache) - 4)
            elif e.errno == errno.ENOSPC:
                # Reduce disk max space to current - 256 MiB, cap cache and retry
                with self._cache_lock:
                    sm = sum([st.size() for st in self._cache.values()])
                    self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
            elif e.errno == errno.ENODEV:
                _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
        except Exception as e:
            pass
        finally:
            # Check if we should evict things from the cache.  Either
            # because we added a new thing or there was an error and
            # we possibly adjusted the limits down, so we might need
            # to push something out.
            self.cap_cache()

        try:
            # Only gets here if there was an error the first time. The
            # exception handler adjusts limits downward in some cases
            # to free up resources, which may allow the retry to
            # succeed.
            slot.set(blob)
        except Exception as e:
            # It failed again.  Give up.
            slot.set(None)
            raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))

        self.cap_cache()

class Counter(object):
    def __init__(self, v=0):
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        with self._lk:
            self._val += v

    def get(self):
        with self._lk:
            return self._val

class KeepClient(object):
    DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
    DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT

    class KeepService(PyCurlHelper):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """

        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                     upload_counter=None,
                     download_counter=None,
                     headers={},
                     insecure=False):
            super(KeepClient.KeepService, self).__init__()
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self._session = None
            self._socket = None
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter
            self.insecure = insecure

        def usable(self):
            """Is it worth attempting a request?"""
            return self._usable

        def finished(self):
            """Did the request succeed or encounter permanent failure?"""
            return self._result['error'] == False or not self._usable

        def last_result(self):
            return self._result

        def _get_user_agent(self):
            try:
                return self._user_agent_pool.get(block=False)
            except queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            try:
                ua.reset()
                self._user_agent_pool.put(ua, block=False)
            except:
                ua.close()

        def get(self, locator, method="GET", timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    if method == "HEAD":
                        curl.setopt(pycurl.NOBODY, True)
                    else:
                        curl.setopt(pycurl.HTTPGET, True)
                    self._setcurltimeouts(curl, timeout, method=="HEAD")

                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }

                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False  # still usable if ok is True or None
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            if method == "HEAD":
                _logger.info("HEAD %s: %s bytes",
                             self._result['status_code'],
                             self._result['headers'].get('content-length'))
                if self._result['headers'].get('x-keep-locator'):
                    # This is a response to a remote block copy request, return
                    # the local copy block locator.
                    return self._result['headers'].get('x-keep-locator')
                return True

            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)

            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

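        # Illustrative check (not part of the module): the integrity test
        # above is plain content addressing; a block is valid iff its MD5
        # equals the locator hash.
        #
        #   import hashlib
        #   body = b'foo'
        #   hashlib.md5(body).hexdigest()  # 'acbd18db4cc2f85cedef654fccc4a4d8'
        #   # ...so 'acbd18db4cc2f85cedef654fccc4a4d8+3' names this block.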
        def put(self, hash_s, body, timeout=None, headers={}):
            put_headers = copy.copy(self.put_headers)
            put_headers.update(headers)
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    body_reader = BytesIO(body)
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in put_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue().decode('utf-8'),
                        'headers': self._headers,
                        'error': False,
                    }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True

    class KeepWriterQueue(queue.Queue):
        def __init__(self, copies, classes=[]):
            queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies
            self.wanted_storage_classes = classes
            self.successful_copies = 0
            self.confirmed_storage_classes = {}
            self.response = None
            self.storage_classes_tracking = True
            self.queue_data_lock = threading.RLock()
            self.pending_tries = max(copies, len(classes))
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr, classes_confirmed):
            with self.queue_data_lock:
                self.successful_copies += replicas_nr
                if classes_confirmed is None:
                    self.storage_classes_tracking = False
                elif self.storage_classes_tracking:
                    for st_class, st_copies in classes_confirmed.items():
                        try:
                            self.confirmed_storage_classes[st_class] += st_copies
                        except KeyError:
                            self.confirmed_storage_classes[st_class] = st_copies
                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            with self.queue_data_lock:
                return self.wanted_copies - self.successful_copies

        def satisfied_classes(self):
            with self.queue_data_lock:
                if not self.storage_classes_tracking:
                    # Tell the outer loop that storage class tracking
                    # is disabled.
                    return None
            return list(set(self.wanted_storage_classes) - set(self.pending_classes()))

        def pending_classes(self):
            with self.queue_data_lock:
                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
                    return []
                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
                for st_class, st_copies in self.confirmed_storage_classes.items():
                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
                        unsatisfied_classes.remove(st_class)
                return unsatisfied_classes

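        # Illustrative walk-through (not part of the module), with
        # wanted_copies=2 and wanted_storage_classes=['default', 'archive']:
        #
        #   confirmed_storage_classes == {'default': 2, 'archive': 1}
        #   pending_classes()   -> ['archive']   # only 1 of 2 copies confirmed
        #   satisfied_classes() -> ['default']
        #
        # A server that omits the X-Keep-Storage-Classes-Confirmed header
        # turns tracking off, and satisfied_classes() returns None.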
        def get_next_task(self):
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        self.pending_tries_notification.notify_all()
                        raise queue.Empty
                    else:
                        self.pending_tries_notification.wait()


    class KeepWriterThreadPool(object):
        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
            self.total_task_nr = 0
            if (not max_service_replicas) or (max_service_replicas >= copies):
                num_threads = 1
            else:
                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
            _logger.debug("Pool max threads is %d", num_threads)
            self.workers = []
            self.queue = KeepClient.KeepWriterQueue(copies, classes)
            # Create workers
            for _ in range(num_threads):
                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                self.workers.append(w)

        def add_task(self, ks, service_root):
            self.queue.put((ks, service_root))
            self.total_task_nr += 1

        def done(self):
            return self.queue.successful_copies, self.queue.satisfied_classes()

        def join(self):
            # Start workers
            for worker in self.workers:
                worker.start()
            # Wait for finished work
            self.queue.join()

        def response(self):
            return self.queue.response

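    # Illustrative driver (not part of the module), mirroring how put()
    # presumably uses the pool: queue one task per candidate service, then
    # join and inspect the results.  sorted_roots and roots_map stand in for
    # the shuffle-ordered service list built elsewhere in this class.
    #
    #   pool = KeepClient.KeepWriterThreadPool(
    #       data, data_hash, copies=2, max_service_replicas=1)
    #   for root in sorted_roots:
    #       pool.add_task(roots_map[root], root)
    #   pool.join()
    #   copies_done, classes_done = pool.done()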
    class KeepWriterThread(threading.Thread):
        class TaskFailed(RuntimeError): pass

        def __init__(self, queue, data, data_hash, timeout=None):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            self.daemon = True

        def run(self):
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except queue.Empty:
                    return
                try:
                    locator, copies, classes = self.do_task(service, service_root)
                except Exception as e:
                    if not isinstance(e, self.TaskFailed):
                        _logger.exception("Exception in KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies, classes)
                finally:
                    self.queue.task_done()

        def do_task(self, service, service_root):
            classes = self.queue.pending_classes()
            headers = {}
            if len(classes) > 0:
                classes.sort()
                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
            success = bool(service.put(self.data_hash,
                                        self.data,
                                        timeout=self.timeout,
                                        headers=headers))
            result = service.last_result()

            if not success:
                if result.get('status_code'):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result.get('status_code'),
                                  result.get('body'))
                raise self.TaskFailed()

            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            try:
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1

            classes_confirmed = {}
            try:
                scch = result['headers']['x-keep-storage-classes-confirmed']
                for confirmation in scch.replace(' ', '').split(','):
                    if '=' in confirmation:
                        stored_class, stored_copies = confirmation.split('=')[:2]
                        classes_confirmed[stored_class] = int(stored_copies)
            except (KeyError, ValueError):
                # Storage classes confirmed header missing or corrupt
                classes_confirmed = None

            return result['body'].strip(), replicas_stored, classes_confirmed

    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=10, session=None, num_prefetch_threads=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you want to ensure KeepClient does not use a proxy, pass in an
          empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 10.
        """
        self.lock = threading.Lock()
        if proxy is None:
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        if api_client is None:
            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        else:
            self.insecure = api_client.insecure

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = queue.LifoQueue()
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()
        self._storage_classes_unsupported_warning = False
        self._default_classes = []
        self.num_prefetch_threads = num_prefetch_threads or 2
        self._prefetch_queue = None
        self._prefetch_threads = None

        if local_store:
            self.local_store = local_store
            self.head = self.local_store_head
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                proxy_uris = proxy.split()
                for i in range(len(proxy_uris)):
                    if not proxy_uris[i].endswith('/'):
                        proxy_uris[i] += '/'
                    # URL validation
                    url = urllib.parse.urlparse(proxy_uris[i])
                    if not (url.scheme and url.netloc):
                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': "00000-bi6l4-%015d" % idx,
                    'service_type': 'proxy',
                    '_service_root': uri,
                    } for idx, uri in enumerate(proxy_uris)]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
                try:
                    self._default_classes = [
                        k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
                except KeyError:
                    # We're talking to an old cluster
                    pass

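    # Illustrative configurations (not part of the module); the host and
    # token values are placeholders:
    #
    #   kc = KeepClient(api_client=arvados.api('v1'))        # normal use
    #   kc = KeepClient(api_token='xxx',
    #                   proxy='https://keep.example.com/')   # proxy only
    #   kc = KeepClient(local_store='/tmp/keep')             # testing mock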
    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise.  The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.

        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        if len(t) == 2:
            return (t[0] * (1 << attempt_number), t[1])
        else:
            return (t[0] * (1 << attempt_number), t[1], t[2])
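
    # Illustrative values (not part of the module): with the default
    # non-proxy timeout (2, 256, 32768) documented above, the connection
    # timeout doubles per attempt while the other fields stay fixed:
    #
    #   current_timeout(0) -> (2, 256, 32768)
    #   current_timeout(1) -> (4, 256, 32768)
    #   current_timeout(2) -> (8, 256, 32768)
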
    def _any_nondisk_services(self, service_list):
        return any(ks.get('service_type', 'disk') != 'disk'
                   for ks in service_list)

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.values():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            self._keep_services = [
                ks for ks in self._gateway_services.values()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1.
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()

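    # Illustrative sketch (not part of the module) of the rendezvous-style
    # ordering built on _service_weight(): every client computes the same
    # per-block weight for each service, so probe order is stable across
    # clients without coordination.  Here kc is a KeepClient, block_md5 a
    # block hash, and the UUIDs are made up:
    #
    #   uuids = ['zzzzz-bi6l4-000000000000001',
    #            'zzzzz-bi6l4-000000000000002']
    #   order = sorted(uuids, reverse=True,
    #                  key=lambda u: kc._service_weight(block_md5, u))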
    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # in that order.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots

    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects.  Poll for Keep services, and add any
        # new ones to roots_map.  Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
                    headers=headers,
                    insecure=self.insecure)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None

    def get_from_cache(self, loc_s):
        """Fetch a block only if it is in the cache, otherwise return None."""
        locator = KeepLocator(loc_s)
        slot = self.block_cache.get(locator.md5sum)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    def refresh_signature(self, loc):
        """Ask Keep to get the remote block and return its local signature"""
        now = datetime.datetime.utcnow().isoformat("T") + 'Z'
        return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})

    @retry.retry_method
    def head(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="HEAD", **kwargs)

    @retry.retry_method
    def get(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="GET", **kwargs)

1145     def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
1146         """Get data from Keep.
1147
1148         This method fetches one or more blocks of data from Keep.  It
1149         sends a request each Keep service registered with the API
1150         server (or the proxy provided when this client was
1151         instantiated), then each service named in location hints, in
1152         sequence.  As soon as one service provides the data, it's
1153         returned.
1154
1155         Arguments:
1156         * loc_s: A string of one or more comma-separated locators to fetch.
1157           This method returns the concatenation of these blocks.
1158         * num_retries: The number of times to retry GET requests to
1159           *each* Keep server if it returns temporary failures, with
1160           exponential backoff.  Note that, in each loop, the method may try
1161           to fetch data from every available Keep service, along with any
1162           that are named in location hints in the locator.  The default value
1163           is set when the KeepClient is initialized.
1164         """
1165         if ',' in loc_s:
1166             return ''.join(self.get(x) for x in loc_s.split(','))
1167
1168         self.get_counter.add(1)
1169
1170         request_id = (request_id or
1171                       (hasattr(self, 'api_client') and self.api_client.request_id) or
1172                       arvados.util.new_request_id())
1173         if headers is None:
1174             headers = {}
1175         headers['X-Request-Id'] = request_id
1176
1177         slot = None
1178         blob = None
1179         try:
1180             locator = KeepLocator(loc_s)
1181             if method == "GET":
1182                 while slot is None:
1183                     slot, first = self.block_cache.reserve_cache(locator.md5sum)
1184                     if first:
1185                         # Fresh and empty "first time it is used" slot
1186                         break
1187                     if prefetch:
1188                         # this is request for a prefetch to fill in
1189                         # the cache, don't need to wait for the
1190                         # result, so if it is already in flight return
1191                         # immediately.  Clear 'slot' to prevent
1192                         # finally block from calling slot.set()
1193                         slot = None
1194                         return None
1195
1196                     blob = slot.get()
1197                     if blob is not None:
1198                         self.hits_counter.add(1)
1199                         return blob
1200
1201                     # If blob is None, this means either
1202                     #
1203                     # (a) another thread was fetching this block and
1204                     # failed with an error or
1205                     #
1206                     # (b) cache thrashing caused the slot to be
1207                     # evicted (content set to None) by another thread
1208                     # between the call to reserve_cache() and get().
1209                     #
1210                     # We'll handle these cases by reserving a new slot
1211                     # and then doing a full GET request.
1212                     slot = None
1213
1214             self.misses_counter.add(1)
1215
1216             # If the locator has hints specifying a prefix (indicating a
1217             # remote keepproxy) or the UUID of a local gateway service,
1218             # read data from the indicated service(s) instead of the usual
1219             # list of local disk services.
1220             hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
1221                           for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1222             hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1223                                for hint in locator.hints if (
1224                                        hint.startswith('K@') and
1225                                        len(hint) == 29 and
1226                                        self._gateway_services.get(hint[2:])
1227                                        )])
1228             # Map root URLs to their KeepService objects.
1229             roots_map = {
1230                 root: self.KeepService(root, self._user_agent_pool,
1231                                        upload_counter=self.upload_counter,
1232                                        download_counter=self.download_counter,
1233                                        headers=headers,
1234                                        insecure=self.insecure)
1235                 for root in hint_roots
1236             }
1237
1238             # See #3147 for a discussion of the loop implementation.  Highlights:
1239             # * Refresh the list of Keep services after each failure, in case
1240             #   it's being updated.
1241             # * Retry until we succeed, we're out of retries, or every available
1242             #   service has returned permanent failure.
            sorted_roots = []
1245             loop = retry.RetryLoop(num_retries, self._check_loop_result,
1246                                    backoff_start=2)
1247             for tries_left in loop:
1248                 try:
1249                     sorted_roots = self.map_new_services(
1250                         roots_map, locator,
1251                         force_rebuild=(tries_left < num_retries),
1252                         need_writable=False,
1253                         headers=headers)
1254                 except Exception as error:
1255                     loop.save_result(error)
1256                     continue
1257
1258                 # Query KeepService objects that haven't returned
1259                 # permanent failure, in our specified shuffle order.
1260                 services_to_try = [roots_map[root]
1261                                    for root in sorted_roots
1262                                    if roots_map[root].usable()]
1263                 for keep_service in services_to_try:
1264                     blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1265                     if blob is not None:
1266                         break
1267                 loop.save_result((blob, len(services_to_try)))
1268
1269             # Always cache the result, then return it if we succeeded.
1270             if loop.success():
1271                 return blob
1272         finally:
1273             if slot is not None:
1274                 self.block_cache.set(slot, blob)
1275
1276         # Q: Including 403 is necessary for the Keep tests to continue
1277         # passing, but maybe they should expect KeepReadError instead?
1278         not_founds = sum(1 for key in sorted_roots
1279                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1280         service_errors = ((key, roots_map[key].last_result()['error'])
1281                           for key in sorted_roots)
1282         if not roots_map:
1283             raise arvados.errors.KeepReadError(
1284                 "[{}] failed to read {}: no Keep services available ({})".format(
1285                     request_id, loc_s, loop.last_result()))
1286         elif not_founds == len(sorted_roots):
1287             raise arvados.errors.NotFoundError(
1288                 "[{}] {} not found".format(request_id, loc_s), service_errors)
1289         else:
1290             raise arvados.errors.KeepReadError(
1291                 "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
1292
1293     @retry.retry_method
1294     def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
1295         """Save data in Keep.
1296
1297         This method will get a list of Keep services from the API server, and
1298         send the data to each one simultaneously in a new thread.  Once the
1299         uploads are finished, if enough copies are saved, this method returns
1300         the most recent HTTP response body.  If requests fail to upload
1301         enough copies, this method raises KeepWriteError.
1302
1303         Arguments:
        * data: The data to upload, as bytes.  A str will be encoded
          to bytes before upload.
1305         * copies: The number of copies that the user requires be saved.
1306           Default 2.
1307         * num_retries: The number of times to retry PUT requests to
1308           *each* Keep server if it returns temporary failures, with
1309           exponential backoff.  The default value is set when the
1310           KeepClient is initialized.
1311         * classes: An optional list of storage class names where copies should
1312           be written.
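
        Example (illustrative only; assumes ``kc`` is an initialized
        KeepClient and that the cluster defines a 'default' storage class):

            locator = kc.put(b'hello world', copies=2, classes=['default'])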
1313         """
1314
1315         classes = classes or self._default_classes
1316
1317         if not isinstance(data, bytes):
1318             data = data.encode()
1319
1320         self.put_counter.add(1)
1321
1322         data_hash = hashlib.md5(data).hexdigest()
1323         loc_s = data_hash + '+' + str(len(data))
1324         if copies < 1:
1325             return loc_s
1326         locator = KeepLocator(loc_s)
1327
1328         request_id = (request_id or
1329                       (hasattr(self, 'api_client') and self.api_client.request_id) or
1330                       arvados.util.new_request_id())
1331         headers = {
1332             'X-Request-Id': request_id,
1333             'X-Keep-Desired-Replicas': str(copies),
1334         }
1335         roots_map = {}
1336         loop = retry.RetryLoop(num_retries, self._check_loop_result,
1337                                backoff_start=2)
1338         done_copies = 0
1339         done_classes = []
1340         for tries_left in loop:
1341             try:
1342                 sorted_roots = self.map_new_services(
1343                     roots_map, locator,
1344                     force_rebuild=(tries_left < num_retries),
1345                     need_writable=True,
1346                     headers=headers)
1347             except Exception as error:
1348                 loop.save_result(error)
1349                 continue
1350
1351             pending_classes = []
1352             if done_classes is not None:
1353                 pending_classes = list(set(classes) - set(done_classes))
            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                          data_hash=data_hash,
                                                          copies=copies - done_copies,
                                                          max_service_replicas=self.max_replicas_per_service,
                                                          timeout=self.current_timeout(num_retries - tries_left),
                                                          classes=pending_classes)
1360             for service_root, ks in [(root, roots_map[root])
1361                                      for root in sorted_roots]:
1362                 if ks.finished():
1363                     continue
1364                 writer_pool.add_task(ks, service_root)
1365             writer_pool.join()
1366             pool_copies, pool_classes = writer_pool.done()
1367             done_copies += pool_copies
1368             if (done_classes is not None) and (pool_classes is not None):
1369                 done_classes += pool_classes
                loop.save_result(
                    (done_copies >= copies and set(done_classes) == set(classes),
                     writer_pool.total_task_nr))
1373             else:
1374                 # Old keepstore contacted without storage classes support:
1375                 # success is determined only by successful copies.
1376                 #
1377                 # Disable storage classes tracking from this point forward.
1378                 if not self._storage_classes_unsupported_warning:
1379                     self._storage_classes_unsupported_warning = True
1380                     _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
1381                 done_classes = None
1382                 loop.save_result(
1383                     (done_copies >= copies, writer_pool.total_task_nr))
1384
1385         if loop.success():
1386             return writer_pool.response()
1387         if not roots_map:
1388             raise arvados.errors.KeepWriteError(
1389                 "[{}] failed to write {}: no Keep services available ({})".format(
1390                     request_id, data_hash, loop.last_result()))
1391         else:
1392             service_errors = ((key, roots_map[key].last_result()['error'])
1393                               for key in sorted_roots
1394                               if roots_map[key].last_result()['error'])
1395             raise arvados.errors.KeepWriteError(
1396                 "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
1397                     request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
1398
1399     def _block_prefetch_worker(self):
1400         """The background downloader thread."""
1401         while True:
1402             try:
1403                 b = self._prefetch_queue.get()
1404                 if b is None:
1405                     return
1406                 self.get(b, prefetch=True)
1407             except Exception:
1408                 _logger.exception("Exception doing block prefetch")
1409
1410     def _start_prefetch_threads(self):
1411         if self._prefetch_threads is None:
1412             with self.lock:
1413                 if self._prefetch_threads is not None:
1414                     return
1415                 self._prefetch_queue = queue.Queue()
1416                 self._prefetch_threads = []
                for _ in range(self.num_prefetch_threads):
1418                     thread = threading.Thread(target=self._block_prefetch_worker)
1419                     self._prefetch_threads.append(thread)
1420                     thread.daemon = True
1421                     thread.start()
1422
1423     def block_prefetch(self, locator):
1424         """
1425         This relies on the fact that KeepClient implements a block cache,
1426         so repeated requests for the same block will not result in repeated
1427         downloads (unless the block is evicted from the cache.)  This method
1428         does not block.
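
        Example (illustrative only; assumes ``kc`` is an initialized
        KeepClient and ``loc`` is a valid block locator):

            kc.block_prefetch(loc)    # returns immediately
            data = kc.get(loc)        # likely served from the block cache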
1429         """
1430
1431         if self.block_cache.get(locator) is not None:
1432             return
1433
1434         self._start_prefetch_threads()
1435         self._prefetch_queue.put(locator)
1436
1437     def stop_prefetch_threads(self):
1438         with self.lock:
1439             if self._prefetch_threads is not None:
                for _ in self._prefetch_threads:
1441                     self._prefetch_queue.put(None)
1442                 for t in self._prefetch_threads:
1443                     t.join()
1444             self._prefetch_threads = None
1445             self._prefetch_queue = None
1446
    def local_store_put(self, data, copies=1, num_retries=None, classes=None):
1448         """A stub for put().
1449
1450         This method is used in place of the real put() method when
1451         using local storage (see constructor's local_store argument).
1452
        The copies, num_retries, and classes arguments are ignored:
        they are here only for the sake of offering the same call
        signature as put().
1456
1457         Data stored this way can be retrieved via local_store_get().
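
        Example (illustrative only; assumes ``kc`` was constructed with
        local_store pointing at a writable directory):

            locator = kc.local_store_put(b'hello')
            assert kc.local_store_get(locator) == b'hello'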
1458         """
        if not isinstance(data, bytes):
            data = data.encode()
        md5 = hashlib.md5(data).hexdigest()
1460         locator = '%s+%d' % (md5, len(data))
1461         with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1462             f.write(data)
1463         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1464                   os.path.join(self.local_store, md5))
1465         return locator
1466
1467     def local_store_get(self, loc_s, num_retries=None):
1468         """Companion to local_store_put()."""
1469         try:
1470             locator = KeepLocator(loc_s)
1471         except ValueError:
1472             raise arvados.errors.NotFoundError(
1473                 "Invalid data locator: '%s'" % loc_s)
1474         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1475             return b''
1476         with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1477             return f.read()
1478
1479     def local_store_head(self, loc_s, num_retries=None):
1480         """Companion to local_store_put()."""
1481         try:
1482             locator = KeepLocator(loc_s)
1483         except ValueError:
1484             raise arvados.errors.NotFoundError(
1485                 "Invalid data locator: '%s'" % loc_s)
1486         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1487             return True
1488         if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
1489             return True